@@ -24,39 +24,44 @@ internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProvi
2424
2525 protected abstract string AddTableCopyBulkInsertId { get ; }
2626
27+ protected virtual string GetTempTableName ( string tableName ) => $ "_temp_bulk_insert_{ tableName } ";
28+
2729 SqlDialectBuilder IBulkInsertProvider . SqlDialect => SqlDialect ;
2830
2931 protected async Task < string > CreateTableCopyAsync < T > (
3032 bool sync ,
3133 DbContext context ,
3234 BulkInsertOptions options ,
3335 TableMetadata tableInfo ,
34- CancellationToken cancellationToken = default ) where T : class
36+ CancellationToken ctk ) where T : class
3537 {
3638 var tempTableName = SqlDialect . QuoteTableName ( null , GetTempTableName ( tableInfo . TableName ) ) ;
3739 var tempColumns = tableInfo . GetProperties ( options . CopyGeneratedColumns ) ;
3840
3941 var query = SqlDialect . CreateTableCopySql ( tempTableName , tableInfo , tempColumns ) ;
4042
41- await ExecuteAsync ( sync , context , query , cancellationToken ) ;
42- await AddBulkInsertIdColumn < T > ( sync , context , tempTableName , cancellationToken ) ;
43+ await ExecuteAsync ( sync , context , query , ctk ) ;
44+ await AddBulkInsertIdColumn < T > ( sync , context , tempTableName , ctk ) ;
4345
4446 return tempTableName ;
4547 }
4648
47- protected virtual async Task AddBulkInsertIdColumn < T > ( bool sync , DbContext context ,
48- string tempTableName , CancellationToken cancellationToken ) where T : class
49+ protected virtual async Task AddBulkInsertIdColumn < T > (
50+ bool sync ,
51+ DbContext context ,
52+ string tempTableName ,
53+ CancellationToken ctk ) where T : class
4954 {
5055 var alterQuery = string . Format ( AddTableCopyBulkInsertId , tempTableName ) ;
5156
52- await ExecuteAsync ( sync , context , alterQuery , cancellationToken ) ;
57+ await ExecuteAsync ( sync , context , alterQuery , ctk ) ;
5358 }
5459
55- protected virtual string GetTempTableName ( string tableName ) => $ "_temp_bulk_insert_ { tableName } " ;
56-
57- protected string Quote ( string name ) => SqlDialect . Quote ( name ) ;
58-
59- protected static async Task ExecuteAsync ( bool sync , DbContext context , string query , CancellationToken cancellationToken = default )
60+ protected static async Task ExecuteAsync (
61+ bool sync ,
62+ DbContext context ,
63+ string query ,
64+ CancellationToken ctk )
6065 {
6166 var command = context . Database . GetDbConnection ( ) . CreateCommand ( ) ;
6267 command . Transaction = context . Database . CurrentTransaction ! . GetDbTransaction ( ) ;
@@ -69,7 +74,7 @@ protected static async Task ExecuteAsync(bool sync, DbContext context, string qu
6974 }
7075 else
7176 {
72- await command . ExecuteNonQueryAsync ( cancellationToken ) ;
77+ await command . ExecuteNonQueryAsync ( ctk ) ;
7378 }
7479 }
7580
@@ -80,8 +85,8 @@ public async Task<List<T>> CopyFromTempTableAsync<T>(
8085 string tempTableName ,
8186 bool returnData ,
8287 BulkInsertOptions options ,
83- OnConflictOptions ? onConflict = null ,
84- CancellationToken cancellationToken = default ) where T : class
88+ OnConflictOptions ? onConflict ,
89+ CancellationToken ctk ) where T : class
8590 {
8691 return await CopyFromTempTableWithoutKeysAsync < T , T > (
8792 sync ,
@@ -91,7 +96,7 @@ public async Task<List<T>> CopyFromTempTableAsync<T>(
9196 returnData ,
9297 options ,
9398 onConflict ,
94- cancellationToken : cancellationToken ) ;
99+ ctk ) ;
95100 }
96101
97102 private async Task < List < TResult > > CopyFromTempTableWithoutKeysAsync < T , TResult > (
@@ -101,10 +106,8 @@ private async Task<List<TResult>> CopyFromTempTableWithoutKeysAsync<T, TResult>(
101106 string tempTableName ,
102107 bool returnData ,
103108 BulkInsertOptions options ,
104- OnConflictOptions ? onConflict = null ,
105- CancellationToken cancellationToken = default )
106- where T : class
107- where TResult : class
109+ OnConflictOptions ? onConflict ,
110+ CancellationToken ctk ) where T : class where TResult : class
108111 {
109112 var query =
110113 SqlDialect . BuildMoveDataSql < T > (
@@ -117,11 +120,11 @@ private async Task<List<TResult>> CopyFromTempTableWithoutKeysAsync<T, TResult>(
117120
118121 if ( returnData )
119122 {
120- return await QueryAsync ( sync , context , query , cancellationToken ) ;
123+ return await QueryAsync ( sync , context , query , ctk ) ;
121124 }
122125
123126 // If not returning data, just execute the command
124- await ExecuteAsync ( sync , context , query , cancellationToken ) ;
127+ await ExecuteAsync ( sync , context , query , ctk ) ;
125128 return [ ] ;
126129
127130 static async Task < List < TResult > > QueryAsync ( bool sync , DbContext context , string query , CancellationToken cancellationToken )
@@ -146,9 +149,8 @@ public virtual async Task<List<T>> BulkInsertReturnEntities<T>(
146149 TableMetadata tableInfo ,
147150 IEnumerable < T > entities ,
148151 BulkInsertOptions options ,
149- OnConflictOptions ? onConflict = null ,
150- CancellationToken ctk = default
151- ) where T : class
152+ OnConflictOptions ? onConflict ,
153+ CancellationToken ctk ) where T : class
152154 {
153155 List < T > result ;
154156
@@ -157,77 +159,27 @@ public virtual async Task<List<T>> BulkInsertReturnEntities<T>(
157159 {
158160 var ( tableName , _) = await PerformBulkInsertAsync ( sync , context , tableInfo , entities , options , tempTableRequired : true , ctk : ctk ) ;
159161
160- result = await CopyFromTempTableAsync < T > ( sync , context , tableInfo , tableName , true , options , onConflict , cancellationToken : ctk ) ;
162+ result = await CopyFromTempTableAsync < T > ( sync , context , tableInfo , tableName , true , options , onConflict , ctk : ctk ) ;
161163
162164 // Commit the transaction if we own them.
163- await Commit ( sync , connectionInfo , ctk ) ;
165+ await connectionInfo . Commit ( sync , ctk ) ;
164166 }
165167 finally
166168 {
167- await Finish ( sync , connectionInfo , ctk ) ;
169+ await connectionInfo . Close ( sync , ctk ) ;
168170 }
169171
170172 return result ;
171173 }
172174
173- private static async Task Commit ( bool sync , ConnectionInfo connectionInfo , CancellationToken ctk )
174- {
175- var ( _, _, transaction , wasBegan ) = connectionInfo ;
176-
177- if ( ! wasBegan )
178- {
179- if ( sync )
180- {
181- // ReSharper disable once MethodHasAsyncOverloadWithCancellation
182- transaction . Commit ( ) ;
183- }
184- else
185- {
186- await transaction . CommitAsync ( ctk ) ;
187- }
188- }
189- }
190-
191- private static async Task Finish ( bool sync , ConnectionInfo connectionInfo , CancellationToken ctk )
192- {
193- var ( connection , wasClosed , transaction , wasBegan ) = connectionInfo ;
194-
195- if ( ! wasBegan )
196- {
197- if ( sync )
198- {
199- // ReSharper disable once MethodHasAsyncOverloadWithCancellation
200- transaction . Dispose ( ) ;
201- }
202- else
203- {
204- await transaction . DisposeAsync ( ) ;
205- }
206- }
207-
208- if ( wasClosed )
209- {
210- if ( sync )
211- {
212- // ReSharper disable once MethodHasAsyncOverload
213- connection . Close ( ) ;
214- }
215- else
216- {
217- await connection . CloseAsync ( ) ;
218- }
219- }
220- }
221-
222175 public virtual async Task BulkInsert < T > (
223176 bool sync ,
224177 DbContext context ,
225178 TableMetadata tableInfo ,
226179 IEnumerable < T > entities ,
227180 BulkInsertOptions options ,
228- OnConflictOptions ? onConflict = null ,
229- CancellationToken ctk = default
230- ) where T : class
181+ OnConflictOptions ? onConflict ,
182+ CancellationToken ctk ) where T : class
231183 {
232184 if ( onConflict != null )
233185 {
@@ -239,11 +191,11 @@ public virtual async Task BulkInsert<T>(
239191 await CopyFromTempTableAsync < T > ( sync , context , tableInfo , tableName , false , options , onConflict , ctk ) ;
240192
241193 // Commit the transaction if we own them.
242- await Commit ( sync , connectionInfo , ctk ) ;
194+ await connectionInfo . Commit ( sync , ctk ) ;
243195 }
244196 finally
245197 {
246- await Finish ( sync , connectionInfo , ctk ) ;
198+ await connectionInfo . Close ( sync , ctk ) ;
247199 }
248200 }
249201 else
@@ -259,7 +211,7 @@ public virtual async Task BulkInsert<T>(
259211 IEnumerable < T > entities ,
260212 BulkInsertOptions options ,
261213 bool tempTableRequired ,
262- CancellationToken ctk = default ) where T : class
214+ CancellationToken ctk ) where T : class
263215 {
264216 if ( entities . TryGetNonEnumeratedCount ( out var count ) && count == 0 )
265217 {
@@ -279,11 +231,11 @@ public virtual async Task BulkInsert<T>(
279231 await BulkInsert ( false , context , tableInfo , entities , tableName , properties , options , ctk ) ;
280232
281233 // Commit the transaction if we own them.
282- await Commit ( sync , connectionInfo , ctk ) ;
234+ await connectionInfo . Commit ( sync , ctk ) ;
283235 }
284236 finally
285237 {
286- await Finish ( sync , connectionInfo , ctk ) ;
238+ await connectionInfo . Close ( sync , ctk ) ;
287239 }
288240
289241 return ( tableName , connectionInfo . Connection ) ;
@@ -300,6 +252,5 @@ protected abstract Task BulkInsert<T>(
300252 string tableName ,
301253 IReadOnlyList < PropertyMetadata > properties ,
302254 BulkInsertOptions options ,
303- CancellationToken ctk
304- ) where T : class ;
255+ CancellationToken ctk ) where T : class ;
305256}
0 commit comments