Skip to content

Commit 542eece

Browse files
Logging and telemetry.
1 parent aa6fa0a commit 542eece

4 files changed

Lines changed: 185 additions & 176 deletions

File tree

Lines changed: 130 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
using System.Data.Common;
2-
31
using Microsoft.EntityFrameworkCore;
42
using Microsoft.EntityFrameworkCore.Storage;
53
using Microsoft.Extensions.Logging;
@@ -12,10 +10,7 @@
1210

1311
namespace PhenX.EntityFrameworkCore.BulkInsert;
1412

15-
#pragma warning disable CS9113 // Parameter is unread.
16-
internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProviderBase<TDialect>>? logger = null) : IBulkInsertProvider
17-
#pragma warning restore CS9113 // Parameter is unread.
18-
where TDialect : SqlDialectBuilder, new()
13+
internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProviderBase<TDialect>>? logger = null) : IBulkInsertProvider where TDialect : SqlDialectBuilder, new()
1914
{
2015
protected readonly TDialect SqlDialect = new();
2116

@@ -27,121 +22,6 @@ internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProvi
2722

2823
SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect;
2924

30-
protected async Task<string> CreateTableCopyAsync<T>(
31-
bool sync,
32-
DbContext context,
33-
BulkInsertOptions options,
34-
TableMetadata tableInfo,
35-
CancellationToken ctk) where T : class
36-
{
37-
var tempTableName = SqlDialect.QuoteTableName(null, GetTempTableName(tableInfo.TableName));
38-
var tempColumns = tableInfo.GetProperties(options.CopyGeneratedColumns);
39-
40-
var query = SqlDialect.CreateTableCopySql(tempTableName, tableInfo, tempColumns);
41-
42-
await ExecuteAsync(sync, context, query, ctk);
43-
await AddBulkInsertIdColumn<T>(sync, context, tempTableName, ctk);
44-
45-
return tempTableName;
46-
}
47-
48-
protected virtual async Task AddBulkInsertIdColumn<T>(
49-
bool sync,
50-
DbContext context,
51-
string tempTableName,
52-
CancellationToken ctk) where T : class
53-
{
54-
var alterQuery = string.Format(AddTableCopyBulkInsertId, tempTableName);
55-
56-
await ExecuteAsync(sync, context, alterQuery, ctk);
57-
}
58-
59-
protected static async Task ExecuteAsync(
60-
bool sync,
61-
DbContext context,
62-
string query,
63-
CancellationToken ctk)
64-
{
65-
var command = context.Database.GetDbConnection().CreateCommand();
66-
command.Transaction = context.Database.CurrentTransaction!.GetDbTransaction();
67-
command.CommandText = query;
68-
69-
if (sync)
70-
{
71-
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
72-
command.ExecuteNonQuery();
73-
}
74-
else
75-
{
76-
await command.ExecuteNonQueryAsync(ctk);
77-
}
78-
}
79-
80-
public async Task<List<T>> CopyFromTempTableAsync<T>(
81-
bool sync,
82-
DbContext context,
83-
TableMetadata tableInfo,
84-
string tempTableName,
85-
bool returnData,
86-
BulkInsertOptions options,
87-
OnConflictOptions? onConflict,
88-
CancellationToken ctk) where T : class
89-
{
90-
return await CopyFromTempTableWithoutKeysAsync<T, T>(
91-
sync,
92-
context,
93-
tableInfo,
94-
tempTableName,
95-
returnData,
96-
options,
97-
onConflict,
98-
ctk);
99-
}
100-
101-
private async Task<List<TResult>> CopyFromTempTableWithoutKeysAsync<T, TResult>(
102-
bool sync,
103-
DbContext context,
104-
TableMetadata tableInfo,
105-
string tempTableName,
106-
bool returnData,
107-
BulkInsertOptions options,
108-
OnConflictOptions? onConflict,
109-
CancellationToken ctk) where T : class where TResult : class
110-
{
111-
var query =
112-
SqlDialect.BuildMoveDataSql<T>(
113-
tableInfo,
114-
tempTableName,
115-
tableInfo.GetProperties(options.CopyGeneratedColumns),
116-
returnData ? tableInfo.GetProperties() : [],
117-
options,
118-
onConflict);
119-
120-
if (returnData)
121-
{
122-
return await QueryAsync(sync, context, query, ctk);
123-
}
124-
125-
// If not returning data, just execute the command
126-
await ExecuteAsync(sync, context, query, ctk);
127-
return [];
128-
129-
static async Task<List<TResult>> QueryAsync(bool sync, DbContext context, string query, CancellationToken cancellationToken)
130-
{
131-
// Use EF to execute the query and return the results
132-
IQueryable<TResult> queryable = context
133-
.Set<TResult>()
134-
.FromSqlRaw(query);
135-
136-
if (sync)
137-
{
138-
return queryable.ToList();
139-
}
140-
141-
return await queryable.ToListAsync(cancellationToken: cancellationToken);
142-
}
143-
}
144-
14525
public virtual async Task<List<T>> BulkInsertReturnEntities<T>(
14626
bool sync,
14727
DbContext context,
@@ -151,24 +31,30 @@ public virtual async Task<List<T>> BulkInsertReturnEntities<T>(
15131
OnConflictOptions? onConflict,
15232
CancellationToken ctk) where T : class
15333
{
154-
List<T> result;
34+
using var activity = Telemetry.ActivitySource.StartActivity("BulkInsertReturnEntities");
35+
activity?.AddTag("tableName", tableInfo.TableName);
36+
activity?.AddTag("synchronous", true);
15537

15638
var connection = await context.GetConnection(sync, ctk);
15739
try
15840
{
159-
var (tableName, _) = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
41+
if (logger != null)
42+
{
43+
Log.UsingTempTablToReturnData(logger);
44+
}
45+
46+
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
16047

161-
result = await CopyFromTempTableAsync<T>(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk);
48+
var result = await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk);
16249

16350
// Commit the transaction if we own them.
16451
await connection.Commit(sync, ctk);
52+
return result;
16553
}
16654
finally
16755
{
16856
await connection.Close(sync, ctk);
16957
}
170-
171-
return result;
17258
}
17359

17460
public virtual async Task BulkInsert<T>(
@@ -180,30 +66,44 @@ public virtual async Task BulkInsert<T>(
18066
OnConflictOptions? onConflict,
18167
CancellationToken ctk) where T : class
18268
{
183-
if (onConflict != null)
69+
using var activity = Telemetry.ActivitySource.StartActivity("BulkInsert");
70+
activity?.AddTag("tableName", tableInfo.TableName);
71+
activity?.AddTag("synchronous", true);
72+
73+
var connection = await context.GetConnection(sync, ctk);
74+
try
18475
{
185-
var connection = await context.GetConnection(sync, ctk);
186-
try
76+
if (onConflict != null)
18777
{
188-
var (tableName, _) = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
78+
if (logger != null)
79+
{
80+
Log.UsingTempTableToResolveConflicts(logger);
81+
}
18982

190-
await CopyFromTempTableAsync<T>(sync, context, tableInfo, tableName, false, options, onConflict, ctk);
83+
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
19184

192-
// Commit the transaction if we own them.
193-
await connection.Commit(sync, ctk);
85+
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, options, onConflict, ctk);
19486
}
195-
finally
87+
else
19688
{
197-
await connection.Close(sync, ctk);
89+
if (logger != null)
90+
{
91+
Log.UsingDirectInsert(logger);
92+
}
93+
94+
await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk);
19895
}
96+
97+
// Commit the transaction if we own them.
98+
await connection.Commit(sync, ctk);
19999
}
200-
else
100+
finally
201101
{
202-
await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk);
102+
await connection.Close(sync, ctk);
203103
}
204104
}
205105

206-
private async Task<(string TableName, DbConnection Connection)> PerformBulkInsertAsync<T>(
106+
private async Task<string> PerformBulkInsertAsync<T>(
207107
bool sync,
208108
DbContext context,
209109
TableMetadata tableInfo,
@@ -217,27 +117,18 @@ public virtual async Task BulkInsert<T>(
217117
throw new InvalidOperationException("No entities to insert.");
218118
}
219119

220-
var connection = await context.GetConnection(sync, ctk);
221-
222120
var tableName = tempTableRequired
223121
? await CreateTableCopyAsync<T>(sync, context, options, tableInfo, ctk)
224122
: tableInfo.QuotedTableName;
225123

226124
var properties = tableInfo.GetProperties(options.CopyGeneratedColumns);
227125

228-
try
229-
{
230-
await BulkInsert(false, context, tableInfo, entities, tableName, properties, options, ctk);
231-
232-
// Commit the transaction if we own them.
233-
await connection.Commit(sync, ctk);
234-
}
235-
finally
236-
{
237-
await connection.Close(sync, ctk);
238-
}
126+
using var activity = Telemetry.ActivitySource.StartActivity("Insert");
127+
activity?.AddTag("tempTable", tempTableRequired);
128+
activity?.AddTag("synchronous", true);
239129

240-
return (tableName, connection.Connection);
130+
await BulkInsert(false, context, tableInfo, entities, tableName, properties, options, ctk);
131+
return tableName;
241132
}
242133

243134
/// <summary>
@@ -252,4 +143,91 @@ protected abstract Task BulkInsert<T>(
252143
IReadOnlyList<PropertyMetadata> properties,
253144
BulkInsertOptions options,
254145
CancellationToken ctk) where T : class;
146+
147+
protected async Task<string> CreateTableCopyAsync<T>(
148+
bool sync,
149+
DbContext context,
150+
BulkInsertOptions options,
151+
TableMetadata tableInfo,
152+
CancellationToken ctk) where T : class
153+
{
154+
var tempTableName = SqlDialect.QuoteTableName(null, GetTempTableName(tableInfo.TableName));
155+
var tempColumns = tableInfo.GetProperties(options.CopyGeneratedColumns);
156+
157+
var query = SqlDialect.CreateTableCopySql(tempTableName, tableInfo, tempColumns);
158+
159+
await ExecuteAsync(sync, context, query, ctk);
160+
await AddBulkInsertIdColumn<T>(sync, context, tempTableName, ctk);
161+
162+
return tempTableName;
163+
}
164+
165+
protected virtual async Task AddBulkInsertIdColumn<T>(
166+
bool sync,
167+
DbContext context,
168+
string tempTableName,
169+
CancellationToken ctk) where T : class
170+
{
171+
var alterQuery = string.Format(AddTableCopyBulkInsertId, tempTableName);
172+
173+
await ExecuteAsync(sync, context, alterQuery, ctk);
174+
}
175+
176+
private async Task<List<TResult>> CopyFromTempTableAsync<T, TResult>(
177+
bool sync,
178+
DbContext context,
179+
TableMetadata tableInfo,
180+
string tempTableName,
181+
bool returnData,
182+
BulkInsertOptions options,
183+
OnConflictOptions? onConflict,
184+
CancellationToken ctk) where T : class where TResult : class
185+
{
186+
var query =
187+
SqlDialect.BuildMoveDataSql<T>(
188+
tableInfo,
189+
tempTableName,
190+
tableInfo.GetProperties(options.CopyGeneratedColumns),
191+
returnData ? tableInfo.GetProperties() : [],
192+
options,
193+
onConflict);
194+
195+
if (returnData)
196+
{
197+
// Use EF to execute the query and return the results
198+
var queryable = context.Set<TResult>().FromSqlRaw(query);
199+
200+
if (sync)
201+
{
202+
return [.. queryable];
203+
}
204+
205+
return await queryable.ToListAsync(ctk);
206+
}
207+
208+
// If not returning data, just execute the command
209+
await ExecuteAsync(sync, context, query, ctk);
210+
return [];
211+
}
212+
213+
protected static async Task ExecuteAsync(
214+
bool sync,
215+
DbContext context,
216+
string query,
217+
CancellationToken ctk)
218+
{
219+
var command = context.Database.GetDbConnection().CreateCommand();
220+
command.Transaction = context.Database.CurrentTransaction!.GetDbTransaction();
221+
command.CommandText = query;
222+
223+
if (sync)
224+
{
225+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
226+
command.ExecuteNonQuery();
227+
}
228+
else
229+
{
230+
await command.ExecuteNonQueryAsync(ctk);
231+
}
232+
}
255233
}

0 commit comments

Comments
 (0)