diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs index 5d9a2d3..42be248 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs @@ -64,6 +64,34 @@ CancellationToken ctk bulkCopy.DestinationTableName = tableName; bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds(); + // Handle progress notifications + if (options is { NotifyProgressAfter: not null, OnProgress: not null }) + { + bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value; + + bulkCopy.MySqlRowsCopied += (sender, e) => + { + options.OnProgress(e.RowsCopied); + + if (ctk.IsCancellationRequested) + { + e.Abort = true; + } + }; + } + + // If no progress notification is set, we still need to handle cancellation. + else + { + bulkCopy.MySqlRowsCopied += (sender, e) => + { + if (ctk.IsCancellationRequested) + { + e.Abort = true; + } + }; + } + var sourceOrdinal = 0; foreach (var prop in properties) { @@ -72,7 +100,7 @@ CancellationToken ctk } var dataReader = new EnumerableDataReader(entities, properties, options); - + if (sync) { // ReSharper disable once MethodHasAsyncOverloadWithCancellation diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.Oracle/OracleBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.Oracle/OracleBulkInsertProvider.cs index 35ee9b2..6f035ef 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.Oracle/OracleBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.Oracle/OracleBulkInsertProvider.cs @@ -63,6 +63,34 @@ protected override Task BulkInsert( bulkCopy.BatchSize = options.BatchSize; bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds(); + // Handle progress notifications + if (options is { NotifyProgressAfter: not null, OnProgress: not null }) + { + bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value; + + bulkCopy.OracleRowsCopied += (sender, e) => + { + options.OnProgress(e.RowsCopied); + + if (ctk.IsCancellationRequested) + { + e.Abort = true; + } + }; + } + + // If no progress notification is set, we still need to handle cancellation. + else + { + bulkCopy.OracleRowsCopied += (sender, e) => + { + if (ctk.IsCancellationRequested) + { + e.Abort = true; + } + }; + } + foreach (var column in columns) { bulkCopy.ColumnMappings.Add(column.PropertyName, column.QuotedColumName); diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs index d7a6e35..9f8ee0b 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs @@ -33,7 +33,6 @@ private static string GetBinaryImportCommand(IReadOnlyList prope /// protected override PostgreSqlBulkInsertOptions CreateDefaultOptions() => new() { - BatchSize = 50_000, Converters = [PostgreSqlGeometryConverter.Instance], }; @@ -59,6 +58,7 @@ protected override async Task BulkInsert( // The type mapping can be null for obvious types like string. var columnTypes = columns.Select(c => GetPostgreSqlType(c, options)).ToArray(); + long rowsCopied = 0; foreach (var entity in entities) { if (sync) @@ -103,6 +103,8 @@ protected override async Task BulkInsert( } } } + + options.HandleOnProgress(ref rowsCopied); } if (sync) diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs index 496217b..1465530 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs @@ -46,6 +46,34 @@ protected override async Task BulkInsert( bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds(); bulkCopy.EnableStreaming = options.EnableStreaming; + // Handle progress notifications + if (options is { NotifyProgressAfter: not null, OnProgress: not null }) + { + bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value; + + bulkCopy.SqlRowsCopied += (sender, e) => + { + options.OnProgress(e.RowsCopied); + + if (ctk.IsCancellationRequested) + { + e.Abort = true; + } + }; + } + + // If no progress notification is set, we still need to handle cancellation. + else + { + bulkCopy.SqlRowsCopied += (sender, e) => + { + if (ctk.IsCancellationRequested) + { + e.Abort = true; + } + }; + } + foreach (var column in columns) { bulkCopy.ColumnMappings.Add(column.PropertyName, column.ColumnName); diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs index cb670e2..d432ad5 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs @@ -138,6 +138,8 @@ CancellationToken ctk { var batchSize = Math.Min(options.BatchSize, MaxParams / columns.Count); + long rowsCopied = 0; + // The StringBuilder can be reused between the batches. var sb = new StringBuilder(); @@ -179,6 +181,12 @@ CancellationToken ctk FillValues(chunk, partialInsertCommand.Parameters, columns, options); await ExecuteCommand(sync, partialInsertCommand, ctk); } + + // Notify progress after each chunk + for (var i = 0; i < chunk.Length; i++) + { + options.HandleOnProgress(ref rowsCopied); + } } } finally diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs index ffa775f..13535b3 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs @@ -7,6 +7,11 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Options; /// public class BulkInsertOptions { + /// + /// Progress callback delegate to notify about the number of rows copied. + /// + public delegate void ProgressCallback(long rowsCopied); + /// /// Move rows between tables instead of inserting them. /// Only supported for PostgreSQL. @@ -31,6 +36,10 @@ public class BulkInsertOptions /// SQLite /// 5 /// + /// + /// Oracle + /// 50 000 + /// /// /// public int BatchSize { get; set; } @@ -55,8 +64,30 @@ public class BulkInsertOptions /// public int SRID { get; set; } = 4326; + /// + /// Number of rows after which the progress callback is invoked. + /// + public int? NotifyProgressAfter { get; set; } + + /// + /// Callback to notify about the progress of the bulk insert operation. + /// + public ProgressCallback? OnProgress { get; set; } + internal int GetCopyTimeoutInSeconds() { return Math.Max(0, (int)CopyTimeout.TotalSeconds); } + + internal void HandleOnProgress(ref long rowsCopied) + { + rowsCopied++; + + if (OnProgress == null || NotifyProgressAfter == null || NotifyProgressAfter <= 0 || rowsCopied % NotifyProgressAfter != 0) + { + return; + } + + OnProgress(rowsCopied); + } } diff --git a/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/DbContext/Extensions.cs b/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/DbContext/Extensions.cs index 5ec25a0..85659b2 100644 --- a/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/DbContext/Extensions.cs +++ b/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/DbContext/Extensions.cs @@ -9,7 +9,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Tests.DbContext; public static class Extensions { public static PropertyBuilder AsJsonString(this PropertyBuilder propertyBuilder, string? columnType) - where T : class + where T : class? { var converter = new ValueConverter( v => JsonSerializer.Serialize(v, (JsonSerializerOptions?)null), diff --git a/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs b/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs index 2e2cb41..cfe2c27 100644 --- a/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs +++ b/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs @@ -291,4 +291,68 @@ public async Task InsertEntities_WithGeneratedGuidId(InsertStrategy strategy) .Excluding(e => e.Id) ); } + + [SkippableTheory] + [CombinatorialData] + public async Task HandleProgress(InsertStrategy strategy) + { + // Arrange + var entities = new List + { + new TestEntity { Name = $"{_run}_Entity1" }, + new TestEntity { Name = $"{_run}_Entity2" }, + new TestEntity { Name = $"{_run}_Entity3" }, + new TestEntity { Name = $"{_run}_Entity4" }, + new TestEntity { Name = $"{_run}_Entity5" }, + new TestEntity { Name = $"{_run}_Entity6" }, + new TestEntity { Name = $"{_run}_Entity7" }, + new TestEntity { Name = $"{_run}_Entity8" }, + new TestEntity { Name = $"{_run}_Entity9" }, + new TestEntity { Name = $"{_run}_Entity10" }, + }; + + long progressCount = 0; + var callCount = 0; + + // Act + await _context.InsertWithStrategyAsync(strategy, entities, o => + { + o.NotifyProgressAfter = 2; + o.OnProgress = count => + { + progressCount = count; + callCount++; + }; + }); + + // Assert + Assert.Equal(10, progressCount); + Assert.Equal(5, callCount); + } + + [SkippableTheory] + [CombinatorialData] + public async Task HandleNoProgress(InsertStrategy strategy) + { + // Arrange + var entities = new List + { + new TestEntity { Name = $"{_run}_Entity1" }, + new TestEntity { Name = $"{_run}_Entity2" }, + new TestEntity { Name = $"{_run}_Entity3" }, + new TestEntity { Name = $"{_run}_Entity4" }, + }; + + var callCount = 0; + + // Act + await _context.InsertWithStrategyAsync(strategy, entities, o => + { + // NotifyProgressAfter not set, so no progress callback should be invoked + o.OnProgress = _ => callCount++; + }); + + // Assert + Assert.Equal(0, callCount); + } }