Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,34 @@ CancellationToken ctk
bulkCopy.DestinationTableName = tableName;
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();

// Handle progress notifications
if (options is { NotifyProgressAfter: not null, OnProgress: not null })
{
bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value;

bulkCopy.MySqlRowsCopied += (sender, e) =>
{
options.OnProgress(e.RowsCopied);

if (ctk.IsCancellationRequested)
{
e.Abort = true;
}
};
}

// If no progress notification is set, we still need to handle cancellation.
else
{
bulkCopy.MySqlRowsCopied += (sender, e) =>
{
if (ctk.IsCancellationRequested)
{
e.Abort = true;
}
};
}

var sourceOrdinal = 0;
foreach (var prop in properties)
{
Expand All @@ -72,7 +100,7 @@ CancellationToken ctk
}

var dataReader = new EnumerableDataReader<T>(entities, properties, options);

if (sync)
{
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@ protected override Task BulkInsert<T>(
bulkCopy.BatchSize = options.BatchSize;
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();

// Handle progress notifications
if (options is { NotifyProgressAfter: not null, OnProgress: not null })
{
bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value;

bulkCopy.OracleRowsCopied += (sender, e) =>
{
options.OnProgress(e.RowsCopied);

if (ctk.IsCancellationRequested)
{
e.Abort = true;
}
};
}

// If no progress notification is set, we still need to handle cancellation.
else
{
bulkCopy.OracleRowsCopied += (sender, e) =>
{
if (ctk.IsCancellationRequested)
{
e.Abort = true;
}
};
}

foreach (var column in columns)
{
bulkCopy.ColumnMappings.Add(column.PropertyName, column.QuotedColumName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> prope
/// <inheritdoc />
protected override PostgreSqlBulkInsertOptions CreateDefaultOptions() => new()
{
BatchSize = 50_000,
Converters = [PostgreSqlGeometryConverter.Instance],
};

Expand All @@ -59,6 +58,7 @@ protected override async Task BulkInsert<T>(
// The type mapping can be null for obvious types like string.
var columnTypes = columns.Select(c => GetPostgreSqlType(c, options)).ToArray();

long rowsCopied = 0;
foreach (var entity in entities)
{
if (sync)
Expand Down Expand Up @@ -103,6 +103,8 @@ protected override async Task BulkInsert<T>(
}
}
}

options.HandleOnProgress(ref rowsCopied);
}

if (sync)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,34 @@ protected override async Task BulkInsert<T>(
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
bulkCopy.EnableStreaming = options.EnableStreaming;

// Handle progress notifications
if (options is { NotifyProgressAfter: not null, OnProgress: not null })
{
bulkCopy.NotifyAfter = options.NotifyProgressAfter.Value;

bulkCopy.SqlRowsCopied += (sender, e) =>
{
options.OnProgress(e.RowsCopied);

if (ctk.IsCancellationRequested)
{
e.Abort = true;
}
};
}

// If no progress notification is set, we still need to handle cancellation.
else
{
bulkCopy.SqlRowsCopied += (sender, e) =>
{
if (ctk.IsCancellationRequested)
{
e.Abort = true;
}
};
}

foreach (var column in columns)
{
bulkCopy.ColumnMappings.Add(column.PropertyName, column.ColumnName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ CancellationToken ctk
{
var batchSize = Math.Min(options.BatchSize, MaxParams / columns.Count);

long rowsCopied = 0;

// The StringBuilder can be reused between the batches.
var sb = new StringBuilder();

Expand Down Expand Up @@ -179,6 +181,12 @@ CancellationToken ctk
FillValues(chunk, partialInsertCommand.Parameters, columns, options);
await ExecuteCommand(sync, partialInsertCommand, ctk);
}

// Notify progress after each chunk
for (var i = 0; i < chunk.Length; i++)
{
options.HandleOnProgress(ref rowsCopied);
}
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Options;
/// </summary>
public class BulkInsertOptions
{
/// <summary>
/// Progress callback delegate to notify about the number of rows copied.
/// </summary>
public delegate void ProgressCallback(long rowsCopied);

/// <summary>
/// Move rows between tables instead of inserting them.
/// Only supported for PostgreSQL.
Expand All @@ -31,6 +36,10 @@ public class BulkInsertOptions
/// <term>SQLite</term>
/// <description>5</description>
/// </item>
/// <item>
/// <term>Oracle</term>
/// <description>50 000</description>
/// </item>
/// </list>
/// </summary>
public int BatchSize { get; set; }
Expand All @@ -55,8 +64,30 @@ public class BulkInsertOptions
/// </summary>
public int SRID { get; set; } = 4326;

/// <summary>
/// Number of rows after which the progress callback is invoked.
/// </summary>
public int? NotifyProgressAfter { get; set; }
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is advantage of a nullable here?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No real advantages, I just could not decide which value was better


/// <summary>
/// Callback to notify about the progress of the bulk insert operation.
/// </summary>
public ProgressCallback? OnProgress { get; set; }

internal int GetCopyTimeoutInSeconds()
{
return Math.Max(0, (int)CopyTimeout.TotalSeconds);
}

internal void HandleOnProgress(ref long rowsCopied)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have really missed this PR. I think the ref is super weird here. Just to save a increment call on the provider level...

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just a design choice on internal code, I find not so hard to understand and skips a few lines of code

{
rowsCopied++;

if (OnProgress == null || NotifyProgressAfter == null || NotifyProgressAfter <= 0 || rowsCopied % NotifyProgressAfter != 0)
{
return;
}

OnProgress(rowsCopied);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Tests.DbContext;
public static class Extensions
{
public static PropertyBuilder<T> AsJsonString<T>(this PropertyBuilder<T> propertyBuilder, string? columnType)
where T : class
where T : class?
{
var converter = new ValueConverter<T, string>(
v => JsonSerializer.Serialize(v, (JsonSerializerOptions?)null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,68 @@ public async Task InsertEntities_WithGeneratedGuidId(InsertStrategy strategy)
.Excluding(e => e.Id)
);
}

[SkippableTheory]
[CombinatorialData]
public async Task HandleProgress(InsertStrategy strategy)
{
// Arrange
var entities = new List<TestEntity>
{
new TestEntity { Name = $"{_run}_Entity1" },
new TestEntity { Name = $"{_run}_Entity2" },
new TestEntity { Name = $"{_run}_Entity3" },
new TestEntity { Name = $"{_run}_Entity4" },
new TestEntity { Name = $"{_run}_Entity5" },
new TestEntity { Name = $"{_run}_Entity6" },
new TestEntity { Name = $"{_run}_Entity7" },
new TestEntity { Name = $"{_run}_Entity8" },
new TestEntity { Name = $"{_run}_Entity9" },
new TestEntity { Name = $"{_run}_Entity10" },
};

long progressCount = 0;
var callCount = 0;

// Act
await _context.InsertWithStrategyAsync(strategy, entities, o =>
{
o.NotifyProgressAfter = 2;
o.OnProgress = count =>
{
progressCount = count;
callCount++;
};
});

// Assert
Assert.Equal(10, progressCount);
Assert.Equal(5, callCount);
}

[SkippableTheory]
[CombinatorialData]
public async Task HandleNoProgress(InsertStrategy strategy)
{
// Arrange
var entities = new List<TestEntity>
{
new TestEntity { Name = $"{_run}_Entity1" },
new TestEntity { Name = $"{_run}_Entity2" },
new TestEntity { Name = $"{_run}_Entity3" },
new TestEntity { Name = $"{_run}_Entity4" },
};

var callCount = 0;

// Act
await _context.InsertWithStrategyAsync(strategy, entities, o =>
{
// NotifyProgressAfter not set, so no progress callback should be invoked
o.OnProgress = _ => callCount++;
});

// Assert
Assert.Equal(0, callCount);
}
}