Skip to content

Commit 1429edc

Browse files
authored
Add progress callback (#58)
1 parent 112dc6e commit 1429edc

8 files changed

Lines changed: 192 additions & 3 deletions

File tree

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,34 @@ CancellationToken ctk
6464
bulkCopy.DestinationTableName = tableName;
6565
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
6666

67+
// Handle progress notifications
68+
if (options is { NotifyProgressAfter: not null, OnProgress: not null })
69+
{
70+
bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value;
71+
72+
bulkCopy.MySqlRowsCopied += (sender, e) =>
73+
{
74+
options.OnProgress(e.RowsCopied);
75+
76+
if (ctk.IsCancellationRequested)
77+
{
78+
e.Abort = true;
79+
}
80+
};
81+
}
82+
83+
// If no progress notification is set, we still need to handle cancellation.
84+
else
85+
{
86+
bulkCopy.MySqlRowsCopied += (sender, e) =>
87+
{
88+
if (ctk.IsCancellationRequested)
89+
{
90+
e.Abort = true;
91+
}
92+
};
93+
}
94+
6795
var sourceOrdinal = 0;
6896
foreach (var prop in properties)
6997
{
@@ -72,7 +100,7 @@ CancellationToken ctk
72100
}
73101

74102
var dataReader = new EnumerableDataReader<T>(entities, properties, options);
75-
103+
76104
if (sync)
77105
{
78106
// ReSharper disable once MethodHasAsyncOverloadWithCancellation

src/PhenX.EntityFrameworkCore.BulkInsert.Oracle/OracleBulkInsertProvider.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,34 @@ protected override Task BulkInsert<T>(
6363
bulkCopy.BatchSize = options.BatchSize;
6464
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
6565

66+
// Handle progress notifications
67+
if (options is { NotifyProgressAfter: not null, OnProgress: not null })
68+
{
69+
bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value;
70+
71+
bulkCopy.OracleRowsCopied += (sender, e) =>
72+
{
73+
options.OnProgress(e.RowsCopied);
74+
75+
if (ctk.IsCancellationRequested)
76+
{
77+
e.Abort = true;
78+
}
79+
};
80+
}
81+
82+
// If no progress notification is set, we still need to handle cancellation.
83+
else
84+
{
85+
bulkCopy.OracleRowsCopied += (sender, e) =>
86+
{
87+
if (ctk.IsCancellationRequested)
88+
{
89+
e.Abort = true;
90+
}
91+
};
92+
}
93+
6694
foreach (var column in columns)
6795
{
6896
bulkCopy.ColumnMappings.Add(column.PropertyName, column.QuotedColumName);

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> prope
3333
/// <inheritdoc />
3434
protected override PostgreSqlBulkInsertOptions CreateDefaultOptions() => new()
3535
{
36-
BatchSize = 50_000,
3736
Converters = [PostgreSqlGeometryConverter.Instance],
3837
};
3938

@@ -59,6 +58,7 @@ protected override async Task BulkInsert<T>(
5958
// The type mapping can be null for obvious types like string.
6059
var columnTypes = columns.Select(c => GetPostgreSqlType(c, options)).ToArray();
6160

61+
long rowsCopied = 0;
6262
foreach (var entity in entities)
6363
{
6464
if (sync)
@@ -103,6 +103,8 @@ protected override async Task BulkInsert<T>(
103103
}
104104
}
105105
}
106+
107+
options.HandleOnProgress(ref rowsCopied);
106108
}
107109

108110
if (sync)

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,34 @@ protected override async Task BulkInsert<T>(
4646
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
4747
bulkCopy.EnableStreaming = options.EnableStreaming;
4848

49+
// Handle progress notifications
50+
if (options is { NotifyProgressAfter: not null, OnProgress: not null })
51+
{
52+
bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value;
53+
54+
bulkCopy.SqlRowsCopied += (sender, e) =>
55+
{
56+
options.OnProgress(e.RowsCopied);
57+
58+
if (ctk.IsCancellationRequested)
59+
{
60+
e.Abort = true;
61+
}
62+
};
63+
}
64+
65+
// If no progress notification is set, we still need to handle cancellation.
66+
else
67+
{
68+
bulkCopy.SqlRowsCopied += (sender, e) =>
69+
{
70+
if (ctk.IsCancellationRequested)
71+
{
72+
e.Abort = true;
73+
}
74+
};
75+
}
76+
4977
foreach (var column in columns)
5078
{
5179
bulkCopy.ColumnMappings.Add(column.PropertyName, column.ColumnName);

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ CancellationToken ctk
138138
{
139139
var batchSize = Math.Min(options.BatchSize, MaxParams / columns.Count);
140140

141+
long rowsCopied = 0;
142+
141143
// The StringBuilder can be reused between the batches.
142144
var sb = new StringBuilder();
143145

@@ -179,6 +181,12 @@ CancellationToken ctk
179181
FillValues(chunk, partialInsertCommand.Parameters, columns, options);
180182
await ExecuteCommand(sync, partialInsertCommand, ctk);
181183
}
184+
185+
// Notify progress after each chunk
186+
for (var i = 0; i < chunk.Length; i++)
187+
{
188+
options.HandleOnProgress(ref rowsCopied);
189+
}
182190
}
183191
}
184192
finally

src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Options;
77
/// </summary>
88
public class BulkInsertOptions
99
{
10+
/// <summary>
11+
/// Progress callback delegate to notify about the number of rows copied.
12+
/// </summary>
13+
public delegate void ProgressCallback(long rowsCopied);
14+
1015
/// <summary>
1116
/// Move rows between tables instead of inserting them.
1217
/// Only supported for PostgreSQL.
@@ -31,6 +36,10 @@ public class BulkInsertOptions
3136
/// <term>SQLite</term>
3237
/// <description>5</description>
3338
/// </item>
39+
/// <item>
40+
/// <term>Oracle</term>
41+
/// <description>50 000</description>
42+
/// </item>
3443
/// </list>
3544
/// </summary>
3645
public int BatchSize { get; set; }
@@ -55,8 +64,30 @@ public class BulkInsertOptions
5564
/// </summary>
5665
public int SRID { get; set; } = 4326;
5766

67+
/// <summary>
68+
/// Number of rows after which the progress callback is invoked.
69+
/// </summary>
70+
public int? NotifyProgressAfter { get; set; }
71+
72+
/// <summary>
73+
/// Callback to notify about the progress of the bulk insert operation.
74+
/// </summary>
75+
public ProgressCallback? OnProgress { get; set; }
76+
5877
internal int GetCopyTimeoutInSeconds()
5978
{
6079
return Math.Max(0, (int)CopyTimeout.TotalSeconds);
6180
}
81+
82+
internal void HandleOnProgress(ref long rowsCopied)
83+
{
84+
rowsCopied++;
85+
86+
if (OnProgress == null || NotifyProgressAfter == null || NotifyProgressAfter <= 0 || rowsCopied % NotifyProgressAfter != 0)
87+
{
88+
return;
89+
}
90+
91+
OnProgress(rowsCopied);
92+
}
6293
}

tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/DbContext/Extensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Tests.DbContext;
99
public static class Extensions
1010
{
1111
public static PropertyBuilder<T> AsJsonString<T>(this PropertyBuilder<T> propertyBuilder, string? columnType)
12-
where T : class
12+
where T : class?
1313
{
1414
var converter = new ValueConverter<T, string>(
1515
v => JsonSerializer.Serialize(v, (JsonSerializerOptions?)null),

tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,4 +291,68 @@ public async Task InsertEntities_WithGeneratedGuidId(InsertStrategy strategy)
291291
.Excluding(e => e.Id)
292292
);
293293
}
294+
295+
[SkippableTheory]
296+
[CombinatorialData]
297+
public async Task HandleProgress(InsertStrategy strategy)
298+
{
299+
// Arrange
300+
var entities = new List<TestEntity>
301+
{
302+
new TestEntity { Name = $"{_run}_Entity1" },
303+
new TestEntity { Name = $"{_run}_Entity2" },
304+
new TestEntity { Name = $"{_run}_Entity3" },
305+
new TestEntity { Name = $"{_run}_Entity4" },
306+
new TestEntity { Name = $"{_run}_Entity5" },
307+
new TestEntity { Name = $"{_run}_Entity6" },
308+
new TestEntity { Name = $"{_run}_Entity7" },
309+
new TestEntity { Name = $"{_run}_Entity8" },
310+
new TestEntity { Name = $"{_run}_Entity9" },
311+
new TestEntity { Name = $"{_run}_Entity10" },
312+
};
313+
314+
long progressCount = 0;
315+
var callCount = 0;
316+
317+
// Act
318+
await _context.InsertWithStrategyAsync(strategy, entities, o =>
319+
{
320+
o.NotifyProgressAfter = 2;
321+
o.OnProgress = count =>
322+
{
323+
progressCount = count;
324+
callCount++;
325+
};
326+
});
327+
328+
// Assert
329+
Assert.Equal(10, progressCount);
330+
Assert.Equal(5, callCount);
331+
}
332+
333+
[SkippableTheory]
334+
[CombinatorialData]
335+
public async Task HandleNoProgress(InsertStrategy strategy)
336+
{
337+
// Arrange
338+
var entities = new List<TestEntity>
339+
{
340+
new TestEntity { Name = $"{_run}_Entity1" },
341+
new TestEntity { Name = $"{_run}_Entity2" },
342+
new TestEntity { Name = $"{_run}_Entity3" },
343+
new TestEntity { Name = $"{_run}_Entity4" },
344+
};
345+
346+
var callCount = 0;
347+
348+
// Act
349+
await _context.InsertWithStrategyAsync(strategy, entities, o =>
350+
{
351+
// NotifyProgressAfter not set, so no progress callback should be invoked
352+
o.OnProgress = _ => callCount++;
353+
});
354+
355+
// Assert
356+
Assert.Equal(0, callCount);
357+
}
294358
}

0 commit comments

Comments
 (0)