Skip to content

Commit 90b3a2b

Browse files
author
fabien.menager
committed
Add BatchSize option to BulkInsertOptions and update bulk import methods for SQL Server and SQLite
1 parent 46247b4 commit 90b3a2b

5 files changed

Lines changed: 132 additions & 41 deletions

File tree

src/EntityFrameworkCore.ExecuteInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ DELETE FROM {tableName}
5858
INSERT INTO {targetTableName} ({insertedColumnList})
5959
SELECT {insertedColumnList}
6060
FROM {tableName}
61+
WHERE TRUE
6162
""");
6263

6364
if (onConflict is OnConflictOptions<T> onConflictTyped)
@@ -98,7 +99,7 @@ DELETE FROM {tableName}
9899
}
99100

100101
protected override async Task BulkImport<T>(DbContext context, DbConnection connection, IEnumerable<T> entities,
101-
string tableName, PropertyAccessor[] properties, CancellationToken ctk) where T : class
102+
string tableName, PropertyAccessor[] properties, BulkInsertOptions options, CancellationToken ctk) where T : class
102103
{
103104
var importCommand = GetBinaryImportCommand(context, typeof(T), tableName);
104105

src/EntityFrameworkCore.ExecuteInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ public class SqlServerBulkInsertProvider : BulkInsertProviderBase
2424

2525
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2626

27-
protected override async Task BulkImport<T>(DbContext context, DbConnection connection, IEnumerable<T> entities, string tableName,
28-
PropertyAccessor[] properties, CancellationToken ctk)
27+
protected override async Task BulkImport<T>(DbContext context, DbConnection connection, IEnumerable<T> entities,
28+
string tableName,
29+
PropertyAccessor[] properties, BulkInsertOptions options, CancellationToken ctk)
2930
{
3031
await using var t = (SqlTransaction) await connection.BeginTransactionAsync(ctk); // TODO option
3132

3233
using var bulkCopy = new SqlBulkCopy(connection as SqlConnection, SqlBulkCopyOptions.TableLock, t);
3334
bulkCopy.DestinationTableName = tableName;
34-
bulkCopy.BatchSize = 50_000; // TODO option
35+
bulkCopy.BatchSize = options.BatchSize ?? 50_000;
3536
bulkCopy.BulkCopyTimeout = 60;
3637

3738
foreach (var prop in properties)
Lines changed: 122 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Data.Common;
2-
using System.Linq.Expressions;
32
using System.Text;
3+
4+
using EntityFrameworkCore.ExecuteInsert.Helpers;
45
using EntityFrameworkCore.ExecuteInsert.OnConflict;
56
using Microsoft.EntityFrameworkCore;
67
using Microsoft.EntityFrameworkCore.Metadata;
@@ -24,12 +25,88 @@ protected override Task AddBulkInsertIdColumn<T>(DbConnection connection, Cancel
2425
return Task.CompletedTask;
2526
}
2627

27-
private string GetInsertCommand(DbContext context, Type entityType, string tableName)
28+
/// <summary>
29+
/// Taken from https://github.com/dotnet/efcore/blob/667c569c49a1ab7e142621395d3f14f2af0508b4/src/Microsoft.Data.Sqlite.Core/SqliteValueBinder.cs#L231
30+
/// As the method is not exposed in the public API, we need to copy it here.
31+
/// </summary>
32+
private static readonly Dictionary<Type, SqliteType> SqliteTypeMapping =
33+
new()
34+
{
35+
{ typeof(bool), SqliteType.Integer },
36+
{ typeof(byte), SqliteType.Integer },
37+
{ typeof(byte[]), SqliteType.Blob },
38+
{ typeof(char), SqliteType.Text },
39+
{ typeof(DateTime), SqliteType.Text },
40+
{ typeof(DateTimeOffset), SqliteType.Text },
41+
{ typeof(DateOnly), SqliteType.Text },
42+
{ typeof(TimeOnly), SqliteType.Text },
43+
{ typeof(DBNull), SqliteType.Text },
44+
{ typeof(decimal), SqliteType.Text },
45+
{ typeof(double), SqliteType.Real },
46+
{ typeof(float), SqliteType.Real },
47+
{ typeof(Guid), SqliteType.Text },
48+
{ typeof(int), SqliteType.Integer },
49+
{ typeof(long), SqliteType.Integer },
50+
{ typeof(sbyte), SqliteType.Integer },
51+
{ typeof(short), SqliteType.Integer },
52+
{ typeof(string), SqliteType.Text },
53+
{ typeof(TimeSpan), SqliteType.Text },
54+
{ typeof(uint), SqliteType.Integer },
55+
{ typeof(ulong), SqliteType.Integer },
56+
{ typeof(ushort), SqliteType.Integer }
57+
};
58+
59+
private static SqliteType GetSqliteType(Type clrType)
2860
{
29-
var columns = GetEscapedColumns(context, entityType, false);
30-
var placeholders = string.Join(", ", columns.Select((_, i) => $"@p{i}"));
61+
var type = Nullable.GetUnderlyingType(clrType) ?? clrType;
62+
type = type.IsEnum ? Enum.GetUnderlyingType(type) : type;
63+
64+
if (SqliteTypeMapping.TryGetValue(type, out var sqliteType))
65+
{
66+
return sqliteType;
67+
}
3168

32-
return $"INSERT INTO {tableName} ({string.Join(", ", columns)}) VALUES ({placeholders})";
69+
throw new InvalidOperationException("Unknown Sqlite type for " + clrType);
70+
}
71+
72+
private DbCommand GetInsertCommand(DbContext context, DbConnection connection, Type entityType, string tableName,
73+
int batchSize)
74+
{
75+
var columns = DatabaseHelper.GetProperties(context, entityType, false);
76+
var cmd = connection.CreateCommand();
77+
78+
var sqliteColumns = columns
79+
.Select(c => new
80+
{
81+
Name = c.GetColumnName(),
82+
Type = GetSqliteType(c.GetProviderClrType() ?? c.ClrType)
83+
})
84+
.ToArray();
85+
86+
var i = 0;
87+
var batches = Enumerable
88+
.Repeat(0, batchSize)
89+
.Select(_ =>
90+
{
91+
var cols = sqliteColumns.Select(column =>
92+
{
93+
var paramName = $"@p{i++}";
94+
95+
cmd.Parameters.Add(new SqliteParameter(paramName, column.Type));
96+
97+
return paramName;
98+
});
99+
100+
return $"({string.Join(",", cols)})";
101+
});
102+
103+
var sql = $"INSERT INTO {tableName} ({string.Join(",", sqliteColumns.Select(c => Escape(c.Name)))}) VALUES {string.Join(",", batches)}";
104+
105+
cmd.CommandText = sql;
106+
107+
cmd.Prepare();
108+
109+
return cmd;
33110
}
34111

35112
protected override string BuildInsertSelectQuery<T>(string tableName,
@@ -46,47 +123,39 @@ protected override string BuildInsertSelectQuery<T>(string tableName,
46123

47124
var q = new StringBuilder();
48125

49-
if (options.MoveRows)
50-
{
51-
q.AppendLine($"""
52-
WITH moved_rows AS (
53-
DELETE FROM {tableName}
54-
RETURNING {insertedColumnList}
55-
)
56-
""");
57-
tableName = "moved_rows";
58-
}
126+
// No support for moveRows in SQLite
59127

60128
q.AppendLine($"""
61129
INSERT INTO {targetTableName} ({insertedColumnList})
62130
SELECT {insertedColumnList}
63131
FROM {tableName}
132+
WHERE TRUE
64133
""");
65134

66135
if (onConflict is OnConflictOptions<T> onConflictTyped)
67136
{
68-
q.AppendLine("ON CONFLICT");
137+
q.Append("ON CONFLICT");
69138

70139
if (onConflictTyped.Update != null)
71140
{
72141
if (onConflictTyped.Match != null)
73142
{
74-
q.AppendLine($"({string.Join(", ", GetColumns(onConflictTyped.Match).Select(Escape))})");
143+
q.AppendLine($" ({string.Join(", ", GetColumns(onConflictTyped.Match).Select(Escape))})");
75144
}
76145

77146
if (onConflictTyped.Update != null)
78147
{
79-
q.AppendLine($"DO UPDATE SET {string.Join(", ", GetUpdates(onConflictTyped.Update))}");
148+
q.AppendLine($" DO UPDATE SET {string.Join(", ", GetUpdates(onConflictTyped.Update))}");
80149
}
81150

82151
if (onConflictTyped.Condition != null)
83152
{
84-
q.AppendLine($"WHERE {onConflictTyped.Condition}");
153+
q.AppendLine($" WHERE {onConflictTyped.Condition}");
85154
}
86155
}
87156
else
88157
{
89-
q.AppendLine("DO NOTHING");
158+
q.AppendLine(" DO NOTHING");
90159
}
91160
}
92161

@@ -101,33 +170,51 @@ DELETE FROM {tableName}
101170
}
102171

103172
protected override async Task BulkImport<T>(DbContext context, DbConnection connection, IEnumerable<T> entities,
104-
string tableName, PropertyAccessor[] properties, CancellationToken ctk) where T : class
173+
string tableName, PropertyAccessor[] properties, BulkInsertOptions options, CancellationToken ctk) where T : class
105174
{
106175
await using var transaction = await connection.BeginTransactionAsync(ctk);
107176

108-
var insertCommand = GetInsertCommand(context, typeof(T), tableName);
109-
await using var cmd = connection.CreateCommand();
110-
cmd.CommandText = insertCommand;
177+
const int maxParams = 1000;
178+
var batchSize = options.BatchSize ?? 5;
179+
batchSize = Math.Min(batchSize, maxParams / properties.Length);
111180

112-
for (var index = 0; index < properties.Length; index++)
181+
await using var insertCommand = GetInsertCommand(context, connection, typeof(T), tableName, batchSize);
182+
183+
foreach (var chunk in entities.Chunk(batchSize))
113184
{
114-
var param = cmd.CreateParameter();
115-
param.ParameterName = $"@p{index}";
185+
// Full chunks
186+
if (chunk.Length == batchSize)
187+
{
188+
FillValues(chunk, insertCommand.Parameters, properties);
189+
190+
await insertCommand.ExecuteNonQueryAsync(ctk);
191+
}
192+
// Last chunk
193+
else
194+
{
195+
var partialInsertCommand = GetInsertCommand(context, connection, typeof(T), tableName, chunk.Length);
196+
FillValues(chunk, partialInsertCommand.Parameters, properties);
116197

117-
cmd.Parameters.Add(param);
198+
await partialInsertCommand.ExecuteNonQueryAsync(ctk);
199+
}
118200
}
119201

120-
foreach (var entity in entities)
202+
await transaction.CommitAsync(ctk);
203+
}
204+
205+
private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, PropertyAccessor[] properties) where T : class
206+
{
207+
var index = 0;
208+
foreach (var entity in chunk)
121209
{
122-
for (var index = 0; index < properties.Length; index++)
210+
foreach (var property in properties)
123211
{
124-
var value = properties[index].GetValue(entity);
125-
cmd.Parameters[index].Value = value;
126-
}
212+
var value = property.GetValue(entity);
213+
parameters[index].Value = value;
127214

128-
await cmd.ExecuteNonQueryAsync(ctk);
215+
index++;
216+
}
129217
}
130-
await transaction.CommitAsync(ctk);
131218
}
132219
}
133220

src/EntityFrameworkCore.ExecuteInsert/BulkInsertOptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@ public class BulkInsertOptions
55
public bool Recursive { get; set; }
66

77
public bool MoveRows { get; set; }
8+
9+
public int? BatchSize { get; set; }
810
}

src/EntityFrameworkCore.ExecuteInsert/BulkInsertProviderBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public async Task BulkInsertWithoutReturnAsync<T>(
306306
.Select(p => new PropertyAccessor(p))
307307
.ToArray();
308308

309-
await BulkImport(context, connection, entities, tableName, properties, ctk);
309+
await BulkImport(context, connection, entities, tableName, properties, options, ctk);
310310

311311
if (wasClosed)
312312
{
@@ -317,7 +317,7 @@ public async Task BulkInsertWithoutReturnAsync<T>(
317317
}
318318

319319
protected abstract Task BulkImport<T>(DbContext context, DbConnection connection, IEnumerable<T> entities,
320-
string tableName, PropertyAccessor[] properties, CancellationToken ctk) where T : class;
320+
string tableName, PropertyAccessor[] properties, BulkInsertOptions options, CancellationToken ctk) where T : class;
321321

322322
private static async Task<(DbConnection connection, bool wasClosed)> GetConnection(DbContext context, CancellationToken ctk)
323323
{

0 commit comments

Comments
 (0)