1- using System . Data . Common ;
21using System . Runtime . CompilerServices ;
32
43using Microsoft . EntityFrameworkCore ;
1312
1413namespace PhenX . EntityFrameworkCore . BulkInsert ;
1514
16- #pragma warning disable CS9113 // Parameter is unread.
17- internal abstract class BulkInsertProviderBase < TDialect > ( ILogger < BulkInsertProviderBase < TDialect > > ? logger = null ) : IBulkInsertProvider
18- #pragma warning restore CS9113 // Parameter is unread.
19- where TDialect : SqlDialectBuilder , new ( )
15+ internal abstract class BulkInsertProviderBase < TDialect > ( ILogger < BulkInsertProviderBase < TDialect > > ? logger = null ) : IBulkInsertProvider where TDialect : SqlDialectBuilder , new ( )
2016{
2117 protected readonly TDialect SqlDialect = new ( ) ;
2218
@@ -28,116 +24,6 @@ internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProvi
2824
2925 SqlDialectBuilder IBulkInsertProvider . SqlDialect => SqlDialect ;
3026
31- protected async Task < string > CreateTableCopyAsync < T > (
32- bool sync ,
33- DbContext context ,
34- BulkInsertOptions options ,
35- TableMetadata tableInfo ,
36- CancellationToken ctk ) where T : class
37- {
38- var tempTableName = SqlDialect . QuoteTableName ( null , GetTempTableName ( tableInfo . TableName ) ) ;
39- var tempColumns = tableInfo . GetProperties ( options . CopyGeneratedColumns ) ;
40-
41- var query = SqlDialect . CreateTableCopySql ( tempTableName , tableInfo , tempColumns ) ;
42-
43- await ExecuteAsync ( sync , context , query , ctk ) ;
44- await AddBulkInsertIdColumn < T > ( sync , context , tempTableName , ctk ) ;
45-
46- return tempTableName ;
47- }
48-
49- protected virtual async Task AddBulkInsertIdColumn < T > (
50- bool sync ,
51- DbContext context ,
52- string tempTableName ,
53- CancellationToken ctk ) where T : class
54- {
55- var alterQuery = string . Format ( AddTableCopyBulkInsertId , tempTableName ) ;
56-
57- await ExecuteAsync ( sync , context , alterQuery , ctk ) ;
58- }
59-
60- protected static async Task ExecuteAsync (
61- bool sync ,
62- DbContext context ,
63- string query ,
64- CancellationToken ctk )
65- {
66- var command = context . Database . GetDbConnection ( ) . CreateCommand ( ) ;
67- command . Transaction = context . Database . CurrentTransaction ! . GetDbTransaction ( ) ;
68- command . CommandText = query ;
69-
70- if ( sync )
71- {
72- // ReSharper disable once MethodHasAsyncOverloadWithCancellation
73- command . ExecuteNonQuery ( ) ;
74- }
75- else
76- {
77- await command . ExecuteNonQueryAsync ( ctk ) ;
78- }
79- }
80-
81- public async Task < IAsyncEnumerable < T > ? > CopyFromTempTableAsync < T > (
82- bool sync ,
83- DbContext context ,
84- TableMetadata tableInfo ,
85- string tempTableName ,
86- bool returnData ,
87- BulkInsertOptions options ,
88- OnConflictOptions ? onConflict ,
89- CancellationToken ctk ) where T : class
90- {
91- return await CopyFromTempTableWithoutKeysAsync < T , T > (
92- sync ,
93- context ,
94- tableInfo ,
95- tempTableName ,
96- returnData ,
97- options ,
98- onConflict ,
99- ctk ) ;
100- }
101-
102- private async Task < IAsyncEnumerable < TResult > ? > CopyFromTempTableWithoutKeysAsync < T , TResult > (
103- bool sync ,
104- DbContext context ,
105- TableMetadata tableInfo ,
106- string tempTableName ,
107- bool returnData ,
108- BulkInsertOptions options ,
109- OnConflictOptions ? onConflict ,
110- CancellationToken ctk ) where T : class where TResult : class
111- {
112- var query =
113- SqlDialect . BuildMoveDataSql < T > (
114- tableInfo ,
115- tempTableName ,
116- tableInfo . GetProperties ( options . CopyGeneratedColumns ) ,
117- returnData ? tableInfo . GetProperties ( ) : [ ] ,
118- options ,
119- onConflict ) ;
120-
121- if ( returnData )
122- {
123- return Query ( context , query ) ;
124- }
125-
126- // If not returning data, just execute the command
127- await ExecuteAsync ( sync , context , query , ctk ) ;
128- return null ;
129-
130- static IAsyncEnumerable < TResult > Query ( DbContext context , string query )
131- {
132- // Use EF to execute the query and return the results
133- IQueryable < TResult > queryable = context
134- . Set < TResult > ( )
135- . FromSqlRaw ( query ) ;
136-
137- return queryable . AsAsyncEnumerable ( ) ;
138- }
139- }
140-
14127 public virtual async IAsyncEnumerable < T > BulkInsertReturnEntities < T > (
14228 bool sync ,
14329 DbContext context ,
@@ -147,15 +33,25 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
14733 OnConflictOptions ? onConflict ,
14834 [ EnumeratorCancellation ] CancellationToken ctk ) where T : class
14935 {
36+ using var activity = Telemetry . ActivitySource . StartActivity ( "BulkInsertReturnEntities" ) ;
37+ activity ? . AddTag ( "tableName" , tableInfo . TableName ) ;
38+ activity ? . AddTag ( "synchronous" , sync ) ;
39+
15040 var connection = await context . GetConnection ( sync , ctk ) ;
15141 try
15242 {
153- var ( tableName , _) = await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : true , ctk : ctk ) ;
43+ if ( logger != null )
44+ {
45+ Log . UsingTempTablToReturnData ( logger ) ;
46+ }
15447
155- var result = await CopyFromTempTableAsync < T > ( sync , context , tableInfo , tableName , true , options , onConflict , ctk )
156- ?? throw new InvalidOperationException ( "Failed to get async enumerable." ) ;
48+ var tableName = await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : true , ctk : ctk ) ;
15749
158- await foreach ( var item in result )
50+ var result =
51+ await CopyFromTempTableAsync < T , T > ( sync , context , tableInfo , tableName , true , options , onConflict , ctk : ctk )
52+ ?? throw new InvalidOperationException ( "Copy returns null enumerable." ) ;
53+
54+ await foreach ( var item in result . WithCancellation ( ctk ) )
15955 {
16056 yield return item ;
16157 }
@@ -178,30 +74,44 @@ public virtual async Task BulkInsert<T>(
17874 OnConflictOptions ? onConflict ,
17975 CancellationToken ctk ) where T : class
18076 {
181- if ( onConflict != null )
77+ using var activity = Telemetry . ActivitySource . StartActivity ( "BulkInsert" ) ;
78+ activity ? . AddTag ( "tableName" , tableInfo . TableName ) ;
79+ activity ? . AddTag ( "synchronous" , sync ) ;
80+
81+ var connection = await context . GetConnection ( sync , ctk ) ;
82+ try
18283 {
183- var connection = await context . GetConnection ( sync , ctk ) ;
184- try
84+ if ( onConflict != null )
18585 {
186- var ( tableName , _) = await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : true , ctk : ctk ) ;
86+ if ( logger != null )
87+ {
88+ Log . UsingTempTableToResolveConflicts ( logger ) ;
89+ }
18790
188- await CopyFromTempTableAsync < T > ( sync , context , tableInfo , tableName , false , options , onConflict , ctk ) ;
91+ var tableName = await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : true , ctk : ctk ) ;
18992
190- // Commit the transaction if we own them.
191- await connection . Commit ( sync , ctk ) ;
93+ await CopyFromTempTableAsync < T , T > ( sync , context , tableInfo , tableName , false , options , onConflict , ctk ) ;
19294 }
193- finally
95+ else
19496 {
195- await connection . Close ( sync , ctk ) ;
97+ if ( logger != null )
98+ {
99+ Log . UsingDirectInsert ( logger ) ;
100+ }
101+
102+ await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : false , ctk : ctk ) ;
196103 }
104+
105+ // Commit the transaction if we own them.
106+ await connection . Commit ( sync , ctk ) ;
197107 }
198- else
108+ finally
199109 {
200- await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : false , ctk : ctk ) ;
110+ await connection . Close ( sync , ctk ) ;
201111 }
202112 }
203113
204- private async Task < ( string TableName , DbConnection Connection ) > PerformBulkInsertAsync < T > (
114+ private async Task < string > PerformBulkInsertAsync < T > (
205115 bool sync ,
206116 DbContext context ,
207117 TableMetadata tableInfo ,
@@ -215,27 +125,18 @@ public virtual async Task BulkInsert<T>(
215125 throw new InvalidOperationException ( "No entities to insert." ) ;
216126 }
217127
218- var connection = await context . GetConnection ( sync , ctk ) ;
219-
220128 var tableName = tempTableRequired
221129 ? await CreateTableCopyAsync < T > ( sync , context , options , tableInfo , ctk )
222130 : tableInfo . QuotedTableName ;
223131
224132 var properties = tableInfo . GetProperties ( options . CopyGeneratedColumns ) ;
225133
226- try
227- {
228- await BulkInsert ( false , context , tableInfo , entities , tableName , properties , options , ctk ) ;
134+ using var activity = Telemetry . ActivitySource . StartActivity ( "Insert" ) ;
135+ activity ? . AddTag ( "tempTable" , tempTableRequired ) ;
136+ activity ? . AddTag ( "synchronous" , sync ) ;
229137
230- // Commit the transaction if we own them.
231- await connection . Commit ( sync , ctk ) ;
232- }
233- finally
234- {
235- await connection . Close ( sync , ctk ) ;
236- }
237-
238- return ( tableName , connection . Connection ) ;
138+ await BulkInsert ( false , context , tableInfo , entities , tableName , properties , options , ctk ) ;
139+ return tableName ;
239140 }
240141
241142 /// <summary>
@@ -250,4 +151,84 @@ protected abstract Task BulkInsert<T>(
250151 IReadOnlyList < PropertyMetadata > properties ,
251152 BulkInsertOptions options ,
252153 CancellationToken ctk ) where T : class ;
154+
155+ protected async Task < string > CreateTableCopyAsync < T > (
156+ bool sync ,
157+ DbContext context ,
158+ BulkInsertOptions options ,
159+ TableMetadata tableInfo ,
160+ CancellationToken ctk ) where T : class
161+ {
162+ var tempTableName = SqlDialect . QuoteTableName ( null , GetTempTableName ( tableInfo . TableName ) ) ;
163+ var tempColumns = tableInfo . GetProperties ( options . CopyGeneratedColumns ) ;
164+
165+ var query = SqlDialect . CreateTableCopySql ( tempTableName , tableInfo , tempColumns ) ;
166+
167+ await ExecuteAsync ( sync , context , query , ctk ) ;
168+ await AddBulkInsertIdColumn < T > ( sync , context , tempTableName , ctk ) ;
169+
170+ return tempTableName ;
171+ }
172+
173+ protected virtual async Task AddBulkInsertIdColumn < T > (
174+ bool sync ,
175+ DbContext context ,
176+ string tempTableName ,
177+ CancellationToken ctk ) where T : class
178+ {
179+ var alterQuery = string . Format ( AddTableCopyBulkInsertId , tempTableName ) ;
180+
181+ await ExecuteAsync ( sync , context , alterQuery , ctk ) ;
182+ }
183+
184+ private async Task < IAsyncEnumerable < TResult > ? > CopyFromTempTableAsync < T , TResult > (
185+ bool sync ,
186+ DbContext context ,
187+ TableMetadata tableInfo ,
188+ string tempTableName ,
189+ bool returnData ,
190+ BulkInsertOptions options ,
191+ OnConflictOptions ? onConflict ,
192+ CancellationToken ctk ) where T : class where TResult : class
193+ {
194+ var query =
195+ SqlDialect . BuildMoveDataSql < T > (
196+ tableInfo ,
197+ tempTableName ,
198+ tableInfo . GetProperties ( options . CopyGeneratedColumns ) ,
199+ returnData ? tableInfo . GetProperties ( ) : [ ] ,
200+ options ,
201+ onConflict ) ;
202+
203+ if ( returnData )
204+ {
205+ // Use EF to execute the query and return the results
206+ return context . Set < TResult > ( ) . FromSqlRaw ( query ) . AsAsyncEnumerable ( ) ;
207+ }
208+
209+ // If not returning data, just execute the command
210+ await ExecuteAsync ( sync , context , query , ctk ) ;
211+ return null ;
212+ }
213+
214+ protected static async Task ExecuteAsync (
215+ bool sync ,
216+ DbContext context ,
217+ string query ,
218+ CancellationToken ctk )
219+ {
220+ var command = context . Database . GetDbConnection ( ) . CreateCommand ( ) ;
221+ command . Transaction = context . Database . CurrentTransaction ! . GetDbTransaction ( ) ;
222+ command . CommandText = query ;
223+
224+ if ( sync )
225+ {
226+ // ReSharper disable once MethodHasAsyncOverloadWithCancellation
227+ command . ExecuteNonQuery ( ) ;
228+ }
229+ else
230+ {
231+ await command . ExecuteNonQueryAsync ( ctk ) ;
232+ }
233+ }
253234}
0 commit comments