Skip to content

Commit 045033c

Browse files
author
fabien.menager
committed
Add sync mode (Fixes #4)
1 parent f00e9eb commit 045033c

9 files changed

Lines changed: 345 additions & 76 deletions

File tree

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ private string GetBinaryImportCommand(DbContext context, Type entityType, string
2828

2929
/// <inheritdoc />
3030
protected override async Task BulkInsert<T>(
31+
bool sync,
3132
DbContext context,
3233
IEnumerable<T> entities,
3334
string tableName,
@@ -39,20 +40,51 @@ protected override async Task BulkInsert<T>(
3940

4041
var importCommand = GetBinaryImportCommand(context, typeof(T), tableName);
4142

42-
await using var writer = await connection.BeginBinaryImportAsync(importCommand, ctk);
43+
var writer = sync
44+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
45+
? connection.BeginBinaryImport(importCommand)
46+
: await connection.BeginBinaryImportAsync(importCommand, ctk);
4347

4448
foreach (var entity in entities)
4549
{
46-
await writer.StartRowAsync(ctk);
50+
if (sync)
51+
{
52+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
53+
writer.StartRow();
54+
}
55+
else
56+
{
57+
await writer.StartRowAsync(ctk);
58+
}
4759

4860
foreach (var property in properties)
4961
{
5062
var value = property.GetValue(entity);
5163

52-
await writer.WriteAsync(value, ctk);
64+
if (sync)
65+
{
66+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
67+
writer.Write(value);
68+
}
69+
else
70+
{
71+
await writer.WriteAsync(value, ctk);
72+
}
5373
}
5474
}
5575

56-
await writer.CompleteAsync(ctk);
76+
if (sync)
77+
{
78+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
79+
writer.Complete();
80+
// ReSharper disable once MethodHasAsyncOverload
81+
writer.Dispose();
82+
}
83+
else
84+
{
85+
await writer.CompleteAsync(ctk);
86+
await writer.DisposeAsync();
87+
}
88+
5789
}
5890
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ internal class SqlServerBulkInsertProvider : BulkInsertProviderBase<SqlServerDia
2323
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2424

2525
/// <inheritdoc />
26-
protected override async Task BulkInsert<T>(DbContext context, IEnumerable<T> entities,
26+
protected override async Task BulkInsert<T>(
27+
bool sync,
28+
DbContext context,
29+
IEnumerable<T> entities,
2730
string tableName,
28-
PropertyAccessor[] properties, BulkInsertOptions options, CancellationToken ctk)
31+
PropertyAccessor[] properties,
32+
BulkInsertOptions options,
33+
CancellationToken ctk
34+
)
2935
{
3036
var connection = context.Database.GetDbConnection();
3137
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction;
@@ -40,6 +46,14 @@ protected override async Task BulkInsert<T>(DbContext context, IEnumerable<T> en
4046
bulkCopy.ColumnMappings.Add(prop.Name, SqlDialect.Quote(prop.ColumnName));
4147
}
4248

43-
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, properties), ctk);
49+
if (sync)
50+
{
51+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
52+
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, properties));
53+
}
54+
else
55+
{
56+
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, properties), ctk);
57+
}
4458
}
4559
}

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ internal class SqliteBulkInsertProvider : BulkInsertProviderBase<SqliteDialectBu
2525
protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite
2626

2727
/// <inheritdoc />
28-
protected override Task AddBulkInsertIdColumn<T>(DbContext context, CancellationToken cancellationToken,
29-
string tempTableName) where T : class
30-
{
31-
return Task.CompletedTask;
32-
}
28+
protected override Task AddBulkInsertIdColumn<T>(
29+
bool sync,
30+
DbContext context,
31+
string tempTableName,
32+
CancellationToken cancellationToken
33+
) where T : class => Task.CompletedTask;
3334

3435
/// <summary>
3536
/// Taken from https://github.com/dotnet/efcore/blob/667c569c49a1ab7e142621395d3f14f2af0508b4/src/Microsoft.Data.Sqlite.Core/SqliteValueBinder.cs#L231
@@ -116,8 +117,15 @@ private DbCommand GetInsertCommand(DbContext context, Type entityType, string ta
116117
}
117118

118119
/// <inheritdoc />
119-
protected override async Task BulkInsert<T>(DbContext context, IEnumerable<T> entities,
120-
string tableName, PropertyAccessor[] properties, BulkInsertOptions options, CancellationToken ctk) where T : class
120+
protected override async Task BulkInsert<T>(
121+
bool sync,
122+
DbContext context,
123+
IEnumerable<T> entities,
124+
string tableName,
125+
PropertyAccessor[] properties,
126+
BulkInsertOptions options,
127+
CancellationToken ctk
128+
) where T : class
121129
{
122130
const int maxParams = 1000;
123131
var batchSize = options.BatchSize ?? 5;
@@ -131,20 +139,32 @@ protected override async Task BulkInsert<T>(DbContext context, IEnumerable<T> en
131139
if (chunk.Length == batchSize)
132140
{
133141
FillValues(chunk, insertCommand.Parameters, properties);
134-
135-
await insertCommand.ExecuteNonQueryAsync(ctk);
142+
await ExecuteCommand(sync, insertCommand, ctk);
136143
}
137144
// Last chunk
138145
else
139146
{
140147
var partialInsertCommand = GetInsertCommand(context, typeof(T), tableName, chunk.Length);
141-
FillValues(chunk, partialInsertCommand.Parameters, properties);
142148

143-
await partialInsertCommand.ExecuteNonQueryAsync(ctk);
149+
FillValues(chunk, partialInsertCommand.Parameters, properties);
150+
await ExecuteCommand(sync, partialInsertCommand, ctk);
144151
}
145152
}
146153
}
147154

155+
private static async Task ExecuteCommand(bool sync, DbCommand insertCommand, CancellationToken ctk)
156+
{
157+
if (sync)
158+
{
159+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
160+
insertCommand.ExecuteNonQuery();
161+
}
162+
else
163+
{
164+
await insertCommand.ExecuteNonQueryAsync(ctk);
165+
}
166+
}
167+
148168
private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, PropertyAccessor[] properties) where T : class
149169
{
150170
var index = 0;

src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ internal interface IBulkInsertProvider
1212
/// <summary>
1313
/// Calls the provider to perform a bulk insert operation.
1414
/// </summary>
15-
internal Task<List<T>> BulkInsertWithIdentityAsync<T>(
15+
internal Task<List<T>> BulkInsertReturnEntities<T>(
16+
bool sync,
1617
DbContext context,
1718
IEnumerable<T> entities,
1819
BulkInsertOptions options,
@@ -23,7 +24,8 @@ internal Task<List<T>> BulkInsertWithIdentityAsync<T>(
2324
/// <summary>
2425
/// Calls the provider to perform a bulk insert operation without returning the inserted entities.
2526
/// </summary>
26-
internal Task BulkInsertWithoutReturnAsync<T>(
27+
internal Task BulkInsert<T>(
28+
bool sync,
2729
DbContext context,
2830
IEnumerable<T> entities,
2931
BulkInsertOptions options,

0 commit comments

Comments
 (0)