From b72d18978e77551ccc495cea0ad1543d4021cc0a Mon Sep 17 00:00:00 2001 From: "fabien.menager" Date: Thu, 22 May 2025 07:51:53 +0200 Subject: [PATCH 1/6] Add support for provider specific options --- .../MySqlBulkInsertProvider.cs | 3 + .../PostgreSqlBulkInsertProvider.cs | 6 + .../SqlServerBulkInsertOptions.cs | 15 +++ .../SqlServerBulkInsertProvider.cs | 7 +- .../SqliteBulkInsertProvider.cs | 8 +- .../Abstractions/IBulkInsertProvider.cs | 5 + .../BulkInsertProviderBase.cs | 2 + .../Extensions/DbSetExtensions.cs | 118 ++++++++++++++++-- .../Options/BulkInsertOptions.cs | 6 +- 9 files changed, 153 insertions(+), 17 deletions(-) create mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs index d6eae56..c8b6e11 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs @@ -25,6 +25,9 @@ public MySqlBulkInsertProvider(ILogger? logger = null) /// protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}"; + /// + public override BulkInsertOptions GetDefaultOptions() => new(); + /// public override Task> BulkInsertReturnEntities( bool sync, diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs index d0d4c24..e8ac63a 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs @@ -31,6 +31,12 @@ private string GetBinaryImportCommand(DbContext context, Type entityType, string return $"COPY {tableName} ({string.Join(", ", columns)}) FROM STDIN (FORMAT BINARY)"; } + /// + public override BulkInsertOptions GetDefaultOptions() => new() + { + BatchSize = 50_000, + }; + /// protected override async Task BulkInsert( bool sync, diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs new file mode 100644 index 0000000..aad1a32 --- /dev/null +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs @@ -0,0 +1,15 @@ +using Microsoft.Data.SqlClient; + +using PhenX.EntityFrameworkCore.BulkInsert.Options; + +namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer; + +/// +/// Options specific to SQL Server bulk insert. +/// +public class SqlServerBulkInsertOptions : BulkInsertOptions +{ + /// + public SqlBulkCopyOptions CopyOptions { get; init; } = SqlBulkCopyOptions.Default; + +} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs index 51425ff..e9c3ff4 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs @@ -27,6 +27,11 @@ public SqlServerBulkInsertProvider(ILogger? logger /// protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}"; + public override BulkInsertOptions GetDefaultOptions() => new SqlServerBulkInsertOptions + { + BatchSize = 50_000, + }; + /// protected override async Task BulkInsert( bool sync, @@ -43,7 +48,7 @@ CancellationToken ctk using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, sqlTransaction); bulkCopy.DestinationTableName = tableName; - bulkCopy.BatchSize = options.BatchSize ?? 50_000; + bulkCopy.BatchSize = options.BatchSize; bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds(); foreach (var prop in properties) diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs index 0eea33f..8bdd99e 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs @@ -29,6 +29,12 @@ public SqliteBulkInsertProvider(ILogger? logger = null /// protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite + /// + public override BulkInsertOptions GetDefaultOptions() => new() + { + BatchSize = 5, + }; + /// protected override Task AddBulkInsertIdColumn( bool sync, @@ -134,7 +140,7 @@ CancellationToken ctk ) where T : class { const int maxParams = 1000; - var batchSize = options.BatchSize ?? 5; + var batchSize = options.BatchSize; batchSize = Math.Min(batchSize, maxParams / properties.Length); await using var insertCommand = GetInsertCommand(context, typeof(T), tableName, options, batchSize); diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs index 344cebd..c870460 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs @@ -32,4 +32,9 @@ internal Task BulkInsert( OnConflictOptions? onConflict = null, CancellationToken ctk = default ) where T : class; + + /// + /// Make the default options for the provider, can be a subclass of . + /// + internal BulkInsertOptions GetDefaultOptions(); } diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs index 119e004..077bc91 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs @@ -216,6 +216,8 @@ public virtual async Task BulkInsert( } } + public abstract BulkInsertOptions GetDefaultOptions(); + private async Task<(string TableName, DbConnection Connection)> PerformBulkInsertAsync( bool sync, DbContext context, diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs index 4d76cee..1d771f4 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs @@ -12,25 +12,60 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; public static class DbSetExtensions { /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet. + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, with options which + /// can be a subclass of . /// - public static async Task> ExecuteBulkInsertReturnEntitiesAsync( + public static async Task> ExecuteBulkInsertReturnEntitiesAsync( this DbSet dbSet, IEnumerable entities, - Action? configure = null, + Action configure, OnConflictOptions? onConflict = null, CancellationToken ctk = default - ) where T : class + ) + where T : class + where TOptions : BulkInsertOptions { var provider = InitProvider(dbSet, configure, out var context, out var options); return await provider.BulkInsertReturnEntities(false, context, entities, options, onConflict, ctk); } + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet without options. + /// + public static async Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) where T : class + => await ExecuteBulkInsertReturnEntitiesAsync(dbSet, entities, configure, onConflict, ctk); + + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet without options. + /// + public static async Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbSet dbSet, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) where T : class + => await ExecuteBulkInsertReturnEntitiesAsync(dbSet, entities, _ => { }, onConflict, ctk); + /// /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext. /// - public static async Task> ExecuteBulkInsertReturnEntitiesAsync(this DbContext dbContext, IEnumerable entities, Action? configure = null, OnConflictOptions? onConflict = null, CancellationToken cancellationToken = default) where T : class + public static async Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken cancellationToken = default + ) + where T : class + where TOptions : BulkInsertOptions { var dbSet = dbContext.Set(); if (dbSet == null) @@ -41,16 +76,41 @@ public static async Task> ExecuteBulkInsertReturnEntitiesAsync(this D return await dbSet.ExecuteBulkInsertReturnEntitiesAsync(entities, configure, onConflict, cancellationToken); } + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with generic options. + /// + public static async Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken cancellationToken = default + ) where T : class + => await ExecuteBulkInsertReturnEntitiesAsync(dbContext, entities, configure, onConflict, cancellationToken); + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with generic options. + /// + public static async Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken cancellationToken = default + ) where T : class => + await dbContext.ExecuteBulkInsertReturnEntitiesAsync(entities, _ => { }, onConflict, cancellationToken); + /// /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet. /// - public static async Task ExecuteBulkInsertAsync( + public static async Task ExecuteBulkInsertAsync( this DbSet dbSet, IEnumerable entities, - Action? configure = null, + Action configure, OnConflictOptions? onConflict = null, CancellationToken ctk = default - ) where T : class + ) + where T : class + where TOptions : BulkInsertOptions { var provider = InitProvider(dbSet, configure, out var context, out var options); @@ -60,7 +120,14 @@ public static async Task ExecuteBulkInsertAsync( /// /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext. /// - public static async Task ExecuteBulkInsertAsync(this DbContext dbContext, IEnumerable entities, Action? configure = null, OnConflictOptions? onConflict = null, CancellationToken cancellationToken = default) where T : class + public static async Task ExecuteBulkInsertAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken cancellationToken = default + ) + where T : class { var dbSet = dbContext.Set(); if (dbSet == null) @@ -71,6 +138,25 @@ public static async Task ExecuteBulkInsertAsync(this DbContext dbContext, IEn await dbSet.ExecuteBulkInsertAsync(entities, configure, onConflict, cancellationToken); } + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext. + /// + public static async Task ExecuteBulkInsertAsync( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken cancellationToken = default + ) where T : class + { + var dbSet = dbContext.Set(); + if (dbSet == null) + { + throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); + } + + await dbSet.ExecuteBulkInsertAsync(entities, _ => { }, onConflict, cancellationToken); + } + /// /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet (synchronous variant). /// @@ -145,14 +231,22 @@ private static DbContext GetDbContext(this DbSet dbSet) where T : class return (infrastructure.Instance.GetService(typeof(ICurrentDbContext)) as ICurrentDbContext)!.Context; } - private static IBulkInsertProvider InitProvider(DbSet dbSet, Action? configure, out DbContext context, - out BulkInsertOptions options) where T : class + private static IBulkInsertProvider InitProvider(DbSet dbSet, Action? configure, out DbContext context, + out TOptions options) where T : class where TOptions : BulkInsertOptions { context = dbSet.GetDbContext(); var provider = context.GetService(); - options = new BulkInsertOptions(); + var defaultOptions = provider.GetDefaultOptions(); + + if (defaultOptions is not TOptions castedOptions) + { + throw new InvalidOperationException($"Options type mismatch. Expected {defaultOptions.GetType().Name}, but got {typeof(TOptions).Name}."); + } + + options = castedOptions; configure?.Invoke(options); + return provider; } } diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs index c72f919..19f4835 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs @@ -31,17 +31,17 @@ public class BulkInsertOptions /// /// /// - public int? BatchSize { get; set; } + public int BatchSize { get; set; } /// /// Indicates if also generated columns should be copied. This is useful for upsert operations. /// public bool CopyGeneratedColumns { get; set; } - + /// /// The timeout to copy records. /// - public TimeSpan CopyTimeout = TimeSpan.FromMinutes(10); + public TimeSpan CopyTimeout { get; set; } = TimeSpan.FromMinutes(10); internal int GetCopyTimeoutInSeconds() { From f037a4ce86b07a7e9cb1e1dc4f29a0a05d0ae3a5 Mon Sep 17 00:00:00 2001 From: "fabien.menager" Date: Thu, 22 May 2025 16:37:42 +0200 Subject: [PATCH 2/6] Add support for provider-specifig options --- .../MySqlBulkInsertOptions.cs | 11 ++++ .../MySqlBulkInsertProvider.cs | 18 +++--- .../PostgreSqlBulkInsertProvider.cs | 4 +- .../SqlServerBulkInsertOptions.cs | 5 +- .../SqlServerBulkInsertProvider.cs | 10 ++-- .../SqliteBulkInsertProvider.cs | 4 +- .../Abstractions/IBulkInsertProvider.cs | 8 ++- .../BulkInsertProviderBase.cs | 35 ++++++++---- .../Enums/ProviderType.cs | 27 +++++++++ .../Extensions/DbContextExtensions.cs | 15 +++++ .../Extensions/DbSetExtensions.cs | 31 ++++++----- ...henX.EntityFrameworkCore.BulkInsert.csproj | 1 + .../Tests/Basic/BasicTestsBase.cs | 55 +++++++++++++++---- 13 files changed, 171 insertions(+), 53 deletions(-) create mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertOptions.cs create mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert/Enums/ProviderType.cs diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertOptions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertOptions.cs new file mode 100644 index 0000000..d1e8d60 --- /dev/null +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertOptions.cs @@ -0,0 +1,11 @@ +using PhenX.EntityFrameworkCore.BulkInsert.Options; + +namespace PhenX.EntityFrameworkCore.BulkInsert.MySql; + +/// +/// Options specific to MySQL bulk insert. +/// +public class MySqlBulkInsertOptions : BulkInsertOptions +{ + +} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs index c8b6e11..cc69c24 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs @@ -8,7 +8,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.MySql; -internal class MySqlBulkInsertProvider : BulkInsertProviderBase +internal class MySqlBulkInsertProvider : BulkInsertProviderBase { public MySqlBulkInsertProvider(ILogger? logger = null) : base(logger) { @@ -26,7 +26,7 @@ public MySqlBulkInsertProvider(ILogger? logger = null) protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}"; /// - public override BulkInsertOptions GetDefaultOptions() => new(); + protected override MySqlBulkInsertOptions GetDefaultOptions() => new(); /// public override Task> BulkInsertReturnEntities( @@ -47,21 +47,25 @@ protected override async Task BulkInsert( IEnumerable entities, string tableName, PropertyAccessor[] properties, - BulkInsertOptions options, + MySqlBulkInsertOptions options, CancellationToken ctk ) { var connection = (MySqlConnection)context.Database.GetDbConnection(); - var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() + + var sqlTransaction = context.Database.CurrentTransaction?.GetDbTransaction() ?? throw new InvalidOperationException("No open transaction found."); + if (sqlTransaction is not MySqlTransaction mySqlTransaction) { throw new InvalidOperationException($"Invalid transaction foud, got {sqlTransaction.GetType()}."); } - var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction); - bulkCopy.DestinationTableName = tableName; - bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds(); + var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction) + { + DestinationTableName = tableName, + BulkCopyTimeout = options.GetCopyTimeoutInSeconds(), + }; var sourceOrdinal = 0; foreach (var prop in properties) diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs index e8ac63a..e57e600 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs @@ -10,7 +10,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql; [UsedImplicitly] -internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase +internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase { public PostgreSqlBulkInsertProvider(ILogger? logger = null) : base(logger) { @@ -32,7 +32,7 @@ private string GetBinaryImportCommand(DbContext context, Type entityType, string } /// - public override BulkInsertOptions GetDefaultOptions() => new() + protected override BulkInsertOptions GetDefaultOptions() => new() { BatchSize = 50_000, }; diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs index aad1a32..b31b6cc 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertOptions.cs @@ -10,6 +10,9 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer; public class SqlServerBulkInsertOptions : BulkInsertOptions { /// - public SqlBulkCopyOptions CopyOptions { get; init; } = SqlBulkCopyOptions.Default; + public SqlBulkCopyOptions CopyOptions { get; set; } = SqlBulkCopyOptions.Default; + + /// + public bool EnableStreaming { get; set; } = false; } diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs index e9c3ff4..ff6fb48 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs @@ -10,7 +10,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer; [UsedImplicitly] -internal class SqlServerBulkInsertProvider : BulkInsertProviderBase +internal class SqlServerBulkInsertProvider : BulkInsertProviderBase { public SqlServerBulkInsertProvider(ILogger? logger = null) : base(logger) { @@ -27,7 +27,7 @@ public SqlServerBulkInsertProvider(ILogger? logger /// protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}"; - public override BulkInsertOptions GetDefaultOptions() => new SqlServerBulkInsertOptions + protected override SqlServerBulkInsertOptions GetDefaultOptions() => new() { BatchSize = 50_000, }; @@ -39,17 +39,19 @@ protected override async Task BulkInsert( IEnumerable entities, string tableName, PropertyAccessor[] properties, - BulkInsertOptions options, + SqlServerBulkInsertOptions options, CancellationToken ctk ) { var connection = (SqlConnection) context.Database.GetDbConnection(); var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction; - using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, sqlTransaction); + using var bulkCopy = new SqlBulkCopy(connection, options.CopyOptions, sqlTransaction); + bulkCopy.DestinationTableName = tableName; bulkCopy.BatchSize = options.BatchSize; bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds(); + bulkCopy.EnableStreaming = options.EnableStreaming; foreach (var prop in properties) { diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs index 8bdd99e..40d72bf 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs @@ -12,7 +12,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite; [UsedImplicitly] -internal class SqliteBulkInsertProvider : BulkInsertProviderBase +internal class SqliteBulkInsertProvider : BulkInsertProviderBase { public SqliteBulkInsertProvider(ILogger? logger = null) : base(logger) { @@ -30,7 +30,7 @@ public SqliteBulkInsertProvider(ILogger? logger = null protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite /// - public override BulkInsertOptions GetDefaultOptions() => new() + protected override BulkInsertOptions GetDefaultOptions() => new() { BatchSize = 5, }; diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs index c870460..2728ad4 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs @@ -19,7 +19,8 @@ internal Task> BulkInsertReturnEntities( BulkInsertOptions options, OnConflictOptions? onConflict = null, CancellationToken ctk = default - ) where T : class; + ) + where T : class; /// /// Calls the provider to perform a bulk insert operation without returning the inserted entities. @@ -31,10 +32,11 @@ internal Task BulkInsert( BulkInsertOptions options, OnConflictOptions? onConflict = null, CancellationToken ctk = default - ) where T : class; + ) + where T : class; /// /// Make the default options for the provider, can be a subclass of . /// - internal BulkInsertOptions GetDefaultOptions(); + internal BulkInsertOptions InternalGetDefaultOptions(); } diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs index 077bc91..39e0968 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs @@ -12,18 +12,19 @@ namespace PhenX.EntityFrameworkCore.BulkInsert; -internal abstract class BulkInsertProviderBase : IBulkInsertProvider +internal abstract class BulkInsertProviderBase : IBulkInsertProvider where TDialect : SqlDialectBuilder, new() + where TOptions : BulkInsertOptions, new() { protected readonly TDialect SqlDialect = new(); - private readonly ILogger>? Logger; + private readonly ILogger>? Logger; protected virtual string BulkInsertId => "_bulk_insert_id"; protected abstract string CreateTableCopySql { get; } protected abstract string AddTableCopyBulkInsertId { get; } - protected BulkInsertProviderBase(ILogger>? logger = null) + protected BulkInsertProviderBase(ILogger>? logger = null) { Logger = logger; } @@ -31,7 +32,7 @@ protected BulkInsertProviderBase(ILogger>? logg protected async Task CreateTableCopyAsync( bool sync, DbContext context, - BulkInsertOptions options, + TOptions options, CancellationToken cancellationToken = default) where T : class { var tableInfo = GetTableInfo(context, typeof(T)); @@ -148,11 +149,16 @@ public virtual async Task> BulkInsertReturnEntities( CancellationToken ctk = default ) where T : class { + if (options is not TOptions providerOptions) + { + throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}"); + } + var (connection, wasClosed, transaction, wasBegan) = await context.GetConnection(sync, ctk); - var (tableName, _) = await PerformBulkInsertAsync(sync, context, entities, options, tempTableRequired: true, ctk: ctk); + var (tableName, _) = await PerformBulkInsertAsync(sync, context, entities, providerOptions, tempTableRequired: true, ctk: ctk); - var result = await CopyFromTempTableAsync(sync, context, tableName, true, options, onConflict, cancellationToken: ctk); + var result = await CopyFromTempTableAsync(sync, context, tableName, true, providerOptions, onConflict, cancellationToken: ctk); await Finish(sync, connection, wasClosed, transaction, wasBegan, ctk); @@ -200,11 +206,16 @@ public virtual async Task BulkInsert( CancellationToken ctk = default ) where T : class { + if (options is not TOptions providerOptions) + { + throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}"); + } + if (onConflict != null) { var (connection, wasClosed, transaction, wasBegan) = await context.GetConnection(sync, ctk); - var (tableName, _) = await PerformBulkInsertAsync(sync, context, entities, options, tempTableRequired: true, ctk: ctk); + var (tableName, _) = await PerformBulkInsertAsync(sync, context, entities, providerOptions, tempTableRequired: true, ctk: ctk); await CopyFromTempTableAsync(sync, context, tableName, false, options, onConflict, ctk); @@ -212,17 +223,19 @@ public virtual async Task BulkInsert( } else { - await PerformBulkInsertAsync(sync, context, entities, options, tempTableRequired: false, ctk: ctk); + await PerformBulkInsertAsync(sync, context, entities, providerOptions, tempTableRequired: false, ctk: ctk); } } - public abstract BulkInsertOptions GetDefaultOptions(); + public BulkInsertOptions InternalGetDefaultOptions() => GetDefaultOptions(); + + protected abstract TOptions GetDefaultOptions(); private async Task<(string TableName, DbConnection Connection)> PerformBulkInsertAsync( bool sync, DbContext context, IEnumerable entities, - BulkInsertOptions options, + TOptions options, bool tempTableRequired, CancellationToken ctk = default) where T : class { @@ -258,7 +271,7 @@ protected abstract Task BulkInsert( IEnumerable entities, string tableName, PropertyAccessor[] properties, - BulkInsertOptions options, + TOptions options, CancellationToken ctk ) where T : class; diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Enums/ProviderType.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Enums/ProviderType.cs new file mode 100644 index 0000000..1010c28 --- /dev/null +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Enums/ProviderType.cs @@ -0,0 +1,27 @@ +namespace PhenX.EntityFrameworkCore.BulkInsert.Enums; + +/// +/// Enumeration of supported database providers. +/// +public enum ProviderType +{ + /// + /// SQL Server provider. + /// + SqlServer, + + /// + /// PostgreSQL provider. + /// + PostgreSql, + + /// + /// SQLite provider. + /// + Sqlite, + + /// + /// MySQL provider. + /// + MySql, +} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs index 666b120..af377fa 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs @@ -5,6 +5,8 @@ using Microsoft.EntityFrameworkCore.Metadata; using Microsoft.EntityFrameworkCore.Storage; +using PhenX.EntityFrameworkCore.BulkInsert.Enums; + namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; internal static class DbContextExtensions @@ -61,4 +63,17 @@ internal static IProperty[] GetProperties(this DbContext context, Type entityTyp return (connection, wasClosed, transaction, wasBegan); } + + /// + /// Tells if the current provider is the specified provider type. + /// + internal static bool IsProvider(this DbContext context, ProviderType providerType) + { + if (context.Database.ProviderName == null) + { + throw new InvalidOperationException("Database provider name is null."); + } + + return context.Database.ProviderName.Contains(providerType.ToString(), StringComparison.OrdinalIgnoreCase); + } } diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs index 1d771f4..4443d05 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs @@ -126,17 +126,8 @@ public static async Task ExecuteBulkInsertAsync( Action configure, OnConflictOptions? onConflict = null, CancellationToken cancellationToken = default - ) - where T : class - { - var dbSet = dbContext.Set(); - if (dbSet == null) - { - throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); - } - - await dbSet.ExecuteBulkInsertAsync(entities, configure, onConflict, cancellationToken); - } + ) where T : class + => await ExecuteBulkInsertAsync(dbContext, entities, configure, onConflict, cancellationToken); /// /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext. @@ -147,6 +138,20 @@ public static async Task ExecuteBulkInsertAsync( OnConflictOptions? onConflict = null, CancellationToken cancellationToken = default ) where T : class + => await ExecuteBulkInsertAsync(dbContext, entities, _ => { }, onConflict, cancellationToken); + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext. + /// + public static async Task ExecuteBulkInsertAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken cancellationToken = default + ) + where T : class + where TOptions : BulkInsertOptions { var dbSet = dbContext.Set(); if (dbSet == null) @@ -154,7 +159,7 @@ public static async Task ExecuteBulkInsertAsync( throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); } - await dbSet.ExecuteBulkInsertAsync(entities, _ => { }, onConflict, cancellationToken); + await dbSet.ExecuteBulkInsertAsync(entities, configure, onConflict, cancellationToken); } /// @@ -237,7 +242,7 @@ private static IBulkInsertProvider InitProvider(DbSet dbSet, Act context = dbSet.GetDbContext(); var provider = context.GetService(); - var defaultOptions = provider.GetDefaultOptions(); + var defaultOptions = provider.InternalGetDefaultOptions(); if (defaultOptions is not TOptions castedOptions) { diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/PhenX.EntityFrameworkCore.BulkInsert.csproj b/src/PhenX.EntityFrameworkCore.BulkInsert/PhenX.EntityFrameworkCore.BulkInsert.csproj index b59ec6a..67330af 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/PhenX.EntityFrameworkCore.BulkInsert.csproj +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/PhenX.EntityFrameworkCore.BulkInsert.csproj @@ -6,6 +6,7 @@ + diff --git a/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs b/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs index 62f4bc2..92fcac2 100644 --- a/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs +++ b/tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs @@ -1,5 +1,8 @@ +using PhenX.EntityFrameworkCore.BulkInsert.Enums; using PhenX.EntityFrameworkCore.BulkInsert.Extensions; +using PhenX.EntityFrameworkCore.BulkInsert.MySql; using PhenX.EntityFrameworkCore.BulkInsert.Options; +using PhenX.EntityFrameworkCore.BulkInsert.SqlServer; using PhenX.EntityFrameworkCore.BulkInsert.Tests.DbContainer; using PhenX.EntityFrameworkCore.BulkInsert.Tests.DbContext; @@ -74,7 +77,7 @@ public void InsertsEntitiesSuccessfully_Sync() [SkippableFact] public async Task InsertsEntitiesAndReturn() { - Skip.If(_context.Database.ProviderName!.Contains("Mysql", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.MySql)); // Arrange var entities = new List @@ -95,7 +98,7 @@ public async Task InsertsEntitiesAndReturn() [SkippableFact] public void InsertsEntitiesAndReturn_Sync() { - Skip.If(_context.Database.ProviderName!.Contains("Mysql", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.MySql)); // Arrange var entities = new List @@ -116,8 +119,8 @@ public void InsertsEntitiesAndReturn_Sync() [SkippableFact] public async Task InsertsEntities_MultipleTimes() { - Skip.If(_context.Database.ProviderName!.Contains("Postgres", StringComparison.InvariantCultureIgnoreCase)); - Skip.If(_context.Database.ProviderName!.Contains("SqlServer", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.PostgreSql)); + Skip.If(_context.IsProvider(ProviderType.SqlServer)); // Arrange var entities = new List @@ -150,8 +153,8 @@ await _context.ExecuteBulkInsertAsync(entities, [SkippableFact] public async Task InsertsEntities_MultipleTimes_With_Conflict_On_Id() { - Skip.If(_context.Database.ProviderName!.Contains("Postgres", StringComparison.InvariantCultureIgnoreCase)); - Skip.If(_context.Database.ProviderName!.Contains("SqlServer", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.PostgreSql)); + Skip.If(_context.IsProvider(ProviderType.SqlServer)); // Arrange var entities = new List @@ -209,7 +212,7 @@ await _context.ExecuteBulkInsertAsync(entities, o => [SkippableFact] public async Task InsertsEntitiesWithConflict_SingleColumn() { - Skip.If(_context.Database.ProviderName!.Contains("Mysql", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.MySql)); _context.TestEntities.Add(new TestEntity { TestRun = _run, Name = $"{_run}_Entity1" }); await _context.SaveChangesAsync(); @@ -248,7 +251,7 @@ await _context.ExecuteBulkInsertAsync(entities, o => [SkippableFact] public async Task InsertsEntitiesWithConflict_DoNothing() { - Skip.If(_context.Database.ProviderName!.Contains("Mysql", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.MySql)); _context.TestEntities.Add(new TestEntity { TestRun = _run, Name = $"{_run}_Entity1" }); await _context.SaveChangesAsync(); @@ -278,7 +281,7 @@ await _context.ExecuteBulkInsertAsync(entities, o => [SkippableFact] public async Task InsertsEntitiesWithConflict_Condition() { - Skip.If(_context.Database.ProviderName!.Contains("Mysql", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.MySql)); _context.TestEntities.Add(new TestEntity { TestRun = _run, Name = $"{_run}_Entity1", Price = 10 }); await _context.SaveChangesAsync(); @@ -309,7 +312,7 @@ await _context.ExecuteBulkInsertAsync(entities, o => [SkippableFact] public async Task InsertsEntitiesWithConflict_MultipleColumns() { - Skip.If(_context.Database.ProviderName!.Contains("Mysql", StringComparison.InvariantCultureIgnoreCase)); + Skip.If(_context.IsProvider(ProviderType.MySql)); _context.TestEntities.Add(new TestEntity { TestRun = _run, Name = $"{_run}_Entity1", Price = 10 }); await _context.SaveChangesAsync(); @@ -494,4 +497,36 @@ public void BulkInsert_WithOpenTransaction_RollsBackOnFailure_Sync() Assert.DoesNotContain(insertedEntities, e => e.Name == $"{_run}_EntityWithTxFail1"); Assert.DoesNotContain(insertedEntities, e => e.Name == $"{_run}_EntityWithTxFail2"); } + + [SkippableFact] + public async Task ThrowsWhenUsingWrongConfigurationType() + { + // Skip for providers that don't support this feature + Skip.If(_context.IsProvider(ProviderType.PostgreSql)); + Skip.If(_context.IsProvider(ProviderType.Sqlite)); + + // Arrange + var entities = new List + { + new TestEntity { TestRun = _run, Name = $"{_run}_Entity1" }, + new TestEntity { TestRun = _run, Name = $"{_run}_Entity2" } + }; + + // Act & Assert + if (_context.IsProvider(ProviderType.SqlServer)) + { + await Assert.ThrowsAsync(async () => + await _context.ExecuteBulkInsertAsync(entities, (MySqlBulkInsertOptions o) => + { + })); + } + + if (_context.IsProvider(ProviderType.MySql)) + { + await Assert.ThrowsAsync(async () => + await _context.ExecuteBulkInsertAsync(entities, (SqlServerBulkInsertOptions o) => + { + })); + } + } } From 99dec5e519a5c6cffb7fb72a9eff181a156eaa45 Mon Sep 17 00:00:00 2001 From: "fabien.menager" Date: Thu, 22 May 2025 18:02:35 +0200 Subject: [PATCH 3/6] Fix merge issues --- .../BulkInsertProviderBase.cs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs index 9b19a44..6052841 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs @@ -13,7 +13,7 @@ namespace PhenX.EntityFrameworkCore.BulkInsert; #pragma warning disable CS9113 // Parameter is unread. -internal abstract class BulkInsertProviderBase(ILogger>? logger = null) : IBulkInsertProvider +internal abstract class BulkInsertProviderBase(ILogger>? logger = null) : IBulkInsertProvider #pragma warning restore CS9113 // Parameter is unread. where TDialect : SqlDialectBuilder, new() where TOptions : BulkInsertOptions, new() @@ -24,7 +24,7 @@ internal abstract class BulkInsertProviderBase(ILogger SqlDialect; protected async Task CreateTableCopyAsync( @@ -151,10 +151,8 @@ public virtual async Task> BulkInsertReturnEntities( { throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}"); } - - List result; - var (connection, wasClosed, transaction, wasBegan) = await context.GetConnection(sync, ctk); + List result; var connectionInfo = await context.GetConnection(sync, ctk); try @@ -243,7 +241,7 @@ public virtual async Task BulkInsert( var connectionInfo = await context.GetConnection(sync, ctk); try { - var (tableName, _) = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk); + var (tableName, _) = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk); await CopyFromTempTableAsync(sync, context, tableInfo, tableName, false, providerOptions, onConflict, ctk); From 7c337dacb95ffba8628fa25ee884d68623ac4f97 Mon Sep 17 00:00:00 2001 From: "fabien.menager" Date: Fri, 23 May 2025 17:29:21 +0200 Subject: [PATCH 4/6] Rename methods to clarify purpose of default options creation --- .../MySqlBulkInsertProvider.cs | 2 +- .../PostgreSqlBulkInsertProvider.cs | 2 +- .../SqlServerBulkInsertProvider.cs | 2 +- .../SqliteBulkInsertProvider.cs | 2 +- .../Abstractions/IBulkInsertProvider.cs | 2 +- .../BulkInsertProviderBase.cs | 7 +++++-- .../Extensions/DbSetExtensions.cs | 2 +- 7 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs index 687473f..4c4661d 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs @@ -27,7 +27,7 @@ public MySqlBulkInsertProvider(ILogger? logger = null) protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}"; /// - protected override MySqlBulkInsertOptions GetDefaultOptions() => new(); + protected override MySqlBulkInsertOptions CreateDefaultOptions() => new(); /// public override Task> BulkInsertReturnEntities( diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs index 58d7fa0..ef63829 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs @@ -33,7 +33,7 @@ private static string GetBinaryImportCommand(TableMetadata tableInfo, string tab } /// - protected override BulkInsertOptions GetDefaultOptions() => new() + protected override BulkInsertOptions CreateDefaultOptions() => new() { BatchSize = 50_000, }; diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs index 706e89c..e5d952b 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs @@ -28,7 +28,7 @@ public SqlServerBulkInsertProvider(ILogger? logger /// protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}"; - protected override SqlServerBulkInsertOptions GetDefaultOptions() => new() + protected override SqlServerBulkInsertOptions CreateDefaultOptions() => new() { BatchSize = 50_000, }; diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs index 8ad41ce..88ff03e 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs @@ -30,7 +30,7 @@ public SqliteBulkInsertProvider(ILogger? logger = null protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite /// - protected override BulkInsertOptions GetDefaultOptions() => new() + protected override BulkInsertOptions CreateDefaultOptions() => new() { BatchSize = 5, }; diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs index feb59d3..17aa99b 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs @@ -42,5 +42,5 @@ internal Task BulkInsert( /// /// Make the default options for the provider, can be a subclass of . /// - internal BulkInsertOptions InternalGetDefaultOptions(); + internal BulkInsertOptions InternalCreateDefaultOptions(); } diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs index 6052841..bd0d117 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs @@ -259,9 +259,12 @@ public virtual async Task BulkInsert( } } - public BulkInsertOptions InternalGetDefaultOptions() => GetDefaultOptions(); + public BulkInsertOptions InternalCreateDefaultOptions() => CreateDefaultOptions(); - protected abstract TOptions GetDefaultOptions(); + /// + /// Create the default options for the provider, can be a subclass of . + /// + protected abstract TOptions CreateDefaultOptions(); private async Task<(string TableName, DbConnection Connection)> PerformBulkInsertAsync( bool sync, diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs index 336ac38..f6e2d1b 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs @@ -246,7 +246,7 @@ private static IBulkInsertProvider InitProvider(DbSet dbSet, Act context = dbSet.GetDbContext(); var provider = context.GetService(); - var defaultOptions = provider.InternalGetDefaultOptions(); + var defaultOptions = provider.InternalCreateDefaultOptions(); if (defaultOptions is not TOptions castedOptions) { From 200cc0a3305705fc7d53d52c06d984f3a5a94ebf Mon Sep 17 00:00:00 2001 From: "fabien.menager" Date: Fri, 23 May 2025 17:45:36 +0200 Subject: [PATCH 5/6] Update readme for MySQL and the provider options --- README.md | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 227a927..62902e2 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,28 @@ # PhenX.EntityFrameworkCore.BulkInsert -A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite. +A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite and MySQL. Its main purpose is to provide a fast way to perform simple bulk inserts in Entity Framework Core applications. ## Why this library? - **Performance**: It is designed to be fast and memory efficient, making it suitable for high-performance applications. -- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, and SQLite), allowing you to use it in different environments without changing your code. +- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, SQLite and MySQL), allowing you to use it in different environments without changing your code. - **Simplicity**: The API is simple and easy to use, making it accessible for developers of all skill levels. For now, it does not support navigation properties, complex types, owned types, shadow properties, or inheritance, but they are in [the roadmap](#roadmap). +## Packages + +| Package Name | Description | NuGet Link | +|---------------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `PhenX.EntityFrameworkCore.BulkInsert.SqlServer` | For SQL Server | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.SqlServer.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.SqlServer) | +| `PhenX.EntityFrameworkCore.BulkInsert.PostgreSql` | For PostgreSQL | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql) | +| `PhenX.EntityFrameworkCore.BulkInsert.Sqlite` | For SQLite | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.Sqlite) | +| `PhenX.EntityFrameworkCore.BulkInsert.MySql` | For MySql | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.MySql) | +| `PhenX.EntityFrameworkCore.BulkInsert` | Common library | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert) | + ## Installation Install the NuGet package for your database provider: @@ -26,6 +36,9 @@ Install-Package PhenX.EntityFrameworkCore.BulkInsert.PostgreSql # For SQLite Install-Package PhenX.EntityFrameworkCore.BulkInsert.Sqlite + +# For MySql +Install-Package PhenX.EntityFrameworkCore.BulkInsert.MySql ``` ## Usage @@ -43,6 +56,8 @@ services.AddDbContext(options => .UseBulkInsertSqlServer() // OR .UseBulkInsertSqlite() + // OR + .UseBulkInsertMySql() ; }); ``` @@ -57,13 +72,35 @@ await dbContext.ExecuteBulkInsertAsync(entities); dbContext.ExecuteBulkInsert(entities); ``` -3. Optionally, you can configure the bulk insert options: +3. You can also configure the bulk insert options: ```csharp +// Common options await dbContext.ExecuteBulkInsertAsync(entities, options => { options.BatchSize = 1000; // Set the batch size for the insert operation, the default value is different for each provider }); + +// Provider specific options, when available, example for SQL Server +await dbContext.ExecuteBulkInsertAsync(entities, (SqlServerBulkInsertOptions o) => // <<< here specify the SQL Server options class +{ + options.EnableStreaming = true; // Enable streaming for SQL Server +}); + +// Provider specific options, supporting multiple providers +await dbContext.ExecuteBulkInsertAsync(entities, o => +{ + o.MoveRows = true; + + if (o is SqlServerBulkInsertOptions sqlServerOptions) + { + sqlServerOptions.EnableStreaming = true; + } + else if (o is MySqlBulkInsertOptions mysqlOptions) + { + mysqlOptions.BatchSize = 1000; + } +}); ``` 4. You can also return the inserted entities (slower): From 8a6b02ff207098d40b9328cca8c0655d1a1b222b Mon Sep 17 00:00:00 2001 From: "fabien.menager" Date: Sat, 24 May 2025 08:36:31 +0200 Subject: [PATCH 6/6] Split public extensions in partials and finish merge --- .../BulkInsertProviderBase.cs | 37 ++- .../Extensions/DbSetExtensions.cs | 204 --------------- ...extExtensions.cs => InternalExtensions.cs} | 6 +- .../Extensions/PublicExtensions.DbContext.cs | 239 ++++++++++++++++++ .../Extensions/PublicExtensions.DbSet.cs | 237 +++++++++++++++++ .../Extensions/PublicExtensions.cs | 67 +++++ .../ConnectionInfo.cs | 2 +- 7 files changed, 575 insertions(+), 217 deletions(-) delete mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs rename src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/{DbContextExtensions.cs => InternalExtensions.cs} (92%) create mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbContext.cs create mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbSet.cs create mode 100644 src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.cs rename src/PhenX.EntityFrameworkCore.BulkInsert/{Extensions => Metadata}/ConnectionInfo.cs (95%) diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs index ace778a..48fa384 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs @@ -12,7 +12,9 @@ namespace PhenX.EntityFrameworkCore.BulkInsert; -internal abstract class BulkInsertProviderBase(ILogger>? logger = null) : IBulkInsertProvider where TDialect : SqlDialectBuilder, new() +internal abstract class BulkInsertProviderBase(ILogger>? logger = null) : IBulkInsertProvider + where TDialect : SqlDialectBuilder, new() + where TOptions : BulkInsertOptions, new() { protected readonly TDialect SqlDialect = new(); @@ -24,6 +26,13 @@ namespace PhenX.EntityFrameworkCore.BulkInsert; SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect; + public BulkInsertOptions InternalCreateDefaultOptions() => CreateDefaultOptions(); + + /// + /// Create the default options for the provider, can be a subclass of . + /// + protected abstract TOptions CreateDefaultOptions(); + public virtual async IAsyncEnumerable BulkInsertReturnEntities( bool sync, DbContext context, @@ -33,6 +42,11 @@ public virtual async IAsyncEnumerable BulkInsertReturnEntities( OnConflictOptions? onConflict, [EnumeratorCancellation] CancellationToken ctk) where T : class { + if (options is not TOptions providerOptions) + { + throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}"); + } + using var activity = Telemetry.ActivitySource.StartActivity("BulkInsertReturnEntities"); activity?.AddTag("tableName", tableInfo.TableName); activity?.AddTag("synchronous", sync); @@ -45,10 +59,10 @@ public virtual async IAsyncEnumerable BulkInsertReturnEntities( Log.UsingTempTablToReturnData(logger); } - var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk); + var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk); var result = - await CopyFromTempTableAsync(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk) + await CopyFromTempTableAsync(sync, context, tableInfo, tableName, true, providerOptions, onConflict, ctk: ctk) ?? throw new InvalidOperationException("Copy returns null enumerable."); await foreach (var item in result.WithCancellation(ctk)) @@ -74,6 +88,11 @@ public virtual async Task BulkInsert( OnConflictOptions? onConflict, CancellationToken ctk) where T : class { + if (options is not TOptions providerOptions) + { + throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}"); + } + using var activity = Telemetry.ActivitySource.StartActivity("BulkInsert"); activity?.AddTag("tableName", tableInfo.TableName); activity?.AddTag("synchronous", sync); @@ -88,9 +107,9 @@ public virtual async Task BulkInsert( Log.UsingTempTableToResolveConflicts(logger); } - var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk); + var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk); - await CopyFromTempTableAsync(sync, context, tableInfo, tableName, false, options, onConflict, ctk); + await CopyFromTempTableAsync(sync, context, tableInfo, tableName, false, providerOptions, onConflict, ctk); } else { @@ -99,7 +118,7 @@ public virtual async Task BulkInsert( Log.UsingDirectInsert(logger); } - await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk); + await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: false, ctk: ctk); } // Commit the transaction if we own them. @@ -116,7 +135,7 @@ private async Task PerformBulkInsertAsync( DbContext context, TableMetadata tableInfo, IEnumerable entities, - BulkInsertOptions options, + TOptions options, bool tempTableRequired, CancellationToken ctk) where T : class { @@ -149,7 +168,7 @@ protected abstract Task BulkInsert( IEnumerable entities, string tableName, IReadOnlyList columns, - BulkInsertOptions options, + TOptions options, CancellationToken ctk) where T : class; protected async Task CreateTableCopyAsync( @@ -187,7 +206,7 @@ protected virtual async Task AddBulkInsertIdColumn( TableMetadata tableInfo, string tempTableName, bool returnData, - BulkInsertOptions options, + TOptions options, OnConflictOptions? onConflict, CancellationToken ctk) where T : class where TResult : class { diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs deleted file mode 100644 index 7e773a9..0000000 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs +++ /dev/null @@ -1,204 +0,0 @@ -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; - -using PhenX.EntityFrameworkCore.BulkInsert.Abstractions; -using PhenX.EntityFrameworkCore.BulkInsert.Options; - -namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; - -/// -/// DbSet extensions for bulk insert operations. -/// -public static class DbSetExtensions -{ - /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet (synchronous variant). - /// - public static List ExecuteBulkInsertReturnEntities( - this DbSet dbSet, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null - ) where T : class - { - return dbSet.ExecuteBulkInsertReturnEntitiesCoreAsync(true, entities, configure, onConflict, default).GetAwaiter().GetResult(); - } - - /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext (synchronous variant). - /// - public static List ExecuteBulkInsertReturnEntities( - this DbContext dbContext, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null - ) where T : class - { - var dbSet = dbContext.Set() ?? throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); - - return dbSet.ExecuteBulkInsertReturnEntitiesCoreAsync(true, entities, configure, onConflict, default).GetAwaiter().GetResult(); - } - - /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext. - /// - public static Task> ExecuteBulkInsertReturnEntitiesAsync( - this DbContext dbContext, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null, - CancellationToken ctk = default - ) where T : class - { - var dbSet = dbContext.Set() ?? throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); - - return dbSet.ExecuteBulkInsertReturnEntitiesCoreAsync(false, entities, configure, onConflict, ctk); - } - - /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet. - /// - public static Task> ExecuteBulkInsertReturnEntitiesAsync( - this DbSet dbSet, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null, - CancellationToken ctk = default - ) where T : class - { - return dbSet.ExecuteBulkInsertReturnEntitiesCoreAsync(false, entities, configure, onConflict, ctk); - } - - private static async Task> ExecuteBulkInsertReturnEntitiesCoreAsync( - this DbSet dbSet, - bool sync, - IEnumerable entities, - Action? configure, - OnConflictOptions? onConflict, - CancellationToken ctk - ) where T : class - { - var provider = InitProvider(dbSet, configure, out var context, out var options); - - var enumerable = provider.BulkInsertReturnEntities(sync, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict, ctk); - - var result = new List(); - await foreach (var item in enumerable.WithCancellation(ctk)) - { - result.Add(item); - } - - return result; - } - - /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext. - /// - public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( - this DbContext dbContext, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null, - CancellationToken ctk = default - ) where T : class - { - var dbSet = dbContext.Set() ?? throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); - - return dbSet.ExecuteBulkInsertReturnEnumerableAsync(entities, configure, onConflict, ctk); - } - - /// - /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet. - /// - public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( - this DbSet dbSet, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null, - CancellationToken ctk = default - ) where T : class - { - var provider = InitProvider(dbSet, configure, out var context, out var options); - - return provider.BulkInsertReturnEntities(false, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict, ctk); - } - - /// - /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext. - /// - public static async Task ExecuteBulkInsertAsync( - this DbContext dbContext, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null, - CancellationToken ctk = default - ) where T : class - { - var dbSet = dbContext.Set() ?? throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); - - await dbSet.ExecuteBulkInsertAsync(entities, configure, onConflict, ctk); - } - - /// - /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet. - /// - public static async Task ExecuteBulkInsertAsync( - this DbSet dbSet, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null, - CancellationToken ctk = default - ) where T : class - { - var provider = InitProvider(dbSet, configure, out var context, out var options); - - await provider.BulkInsert(false, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict, ctk); - } - - /// - /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext (synchronous variant). - /// - public static void ExecuteBulkInsert( - this DbContext dbContext, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null - ) where T : class - { - var dbSet = dbContext.Set() ?? throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); - - dbSet.ExecuteBulkInsert(entities, configure, onConflict); - } - - /// - /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet (synchronous variant). - /// - public static void ExecuteBulkInsert( - this DbSet dbSet, - IEnumerable entities, - Action? configure = null, - OnConflictOptions? onConflict = null - ) where T : class - { - var provider = InitProvider(dbSet, configure, out var context, out var options); - - provider.BulkInsert(true, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict).GetAwaiter().GetResult(); - } - - private static DbContext GetDbContext(this DbSet dbSet) where T : class - { - IInfrastructure infrastructure = dbSet; - return (infrastructure.Instance.GetService(typeof(ICurrentDbContext)) as ICurrentDbContext)!.Context; - } - - private static IBulkInsertProvider InitProvider(DbSet dbSet, Action? configure, out DbContext context, - out BulkInsertOptions options) where T : class - { - context = dbSet.GetDbContext(); - var provider = context.GetService(); - - options = new BulkInsertOptions(); - configure?.Invoke(options); - return provider; - } -} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/InternalExtensions.cs similarity index 92% rename from src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs rename to src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/InternalExtensions.cs index f893dac..16443b7 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbContextExtensions.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/InternalExtensions.cs @@ -10,16 +10,16 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; -internal static class DbContextExtensions +internal static class InternalExtensions { - public static TableMetadata GetTableInfo(this DbContext context) + internal static TableMetadata GetTableInfo(this DbContext context) { var provider = context.GetService(); return provider.GetTableInfo(context); } - public static DbContextOptionsBuilder UseProvider(this DbContextOptionsBuilder optionsBuilder) + internal static DbContextOptionsBuilder UseProvider(this DbContextOptionsBuilder optionsBuilder) where TProvider : class, IBulkInsertProvider { var extension = optionsBuilder.Options.FindExtension>() ?? new BulkInsertOptionsExtension(); diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbContext.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbContext.cs new file mode 100644 index 0000000..9bc410b --- /dev/null +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbContext.cs @@ -0,0 +1,239 @@ +using Microsoft.EntityFrameworkCore; + +using PhenX.EntityFrameworkCore.BulkInsert.Options; + +namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; + +public static partial class PublicExtensions +{ + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext (synchronous variant), with provider specific options. + /// + public static List ExecuteBulkInsertReturnEntities( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + where TConfig : BulkInsertOptions + { + var dbSet = dbContext.Set() ?? + throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); + + return ExecuteBulkInsertReturnEntitiesCoreAsync(dbSet, true, entities, configure, onConflict, CancellationToken.None) + .GetAwaiter().GetResult(); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext (synchronous variant), with common options. + /// + public static List ExecuteBulkInsertReturnEntities( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + { + return ExecuteBulkInsertReturnEntities(dbContext, entities, configure, onConflict); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext (synchronous variant), without options. + /// + public static List ExecuteBulkInsertReturnEntities( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null + ) + where T : class + { + return ExecuteBulkInsertReturnEntities(dbContext, entities, _ => { }, onConflict); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with provider specific options. + /// + public static Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + where TConfig : BulkInsertOptions + { + var dbSet = dbContext.Set() ?? + throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); + + return ExecuteBulkInsertReturnEntitiesCoreAsync(dbSet,false, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with common options. + /// + public static Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + return ExecuteBulkInsertReturnEntitiesAsync(dbContext, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, without options. + /// + public static Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + return ExecuteBulkInsertReturnEntitiesAsync(dbContext, entities, _ => { }, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with provider specific options. + /// + public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + where TConfig : BulkInsertOptions + { + var dbSet = dbContext.Set() ?? + throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); + + return dbSet.ExecuteBulkInsertReturnEnumerableAsync(entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with common options. + /// + public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) where T : class + { + return ExecuteBulkInsertReturnEnumerableAsync(dbContext, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, without options. + /// + public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) where T : class + { + return ExecuteBulkInsertReturnEnumerableAsync(dbContext, entities, _ => { }, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext, with provider specific options. + /// + public static async Task ExecuteBulkInsertAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + where TConfig : BulkInsertOptions + { + var dbSet = dbContext.Set() ?? + throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); + + await dbSet.ExecuteBulkInsertAsync(entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext, with common options. + /// + public static async Task ExecuteBulkInsertAsync( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + await ExecuteBulkInsertAsync(dbContext, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext, without options. + /// + public static async Task ExecuteBulkInsertAsync( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + await ExecuteBulkInsertAsync(dbContext, entities, _ => { }, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext (synchronous variant), with provider specific options. + /// + public static void ExecuteBulkInsert( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + where TConfig : BulkInsertOptions + { + var dbSet = dbContext.Set() ?? + throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext."); + + dbSet.ExecuteBulkInsert(entities, configure, onConflict); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext (synchronous variant), with common options. + /// + public static void ExecuteBulkInsert( + this DbContext dbContext, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) where T : class + { + ExecuteBulkInsert(dbContext, entities, configure, onConflict); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext (synchronous variant), without options. + /// + public static void ExecuteBulkInsert( + this DbContext dbContext, + IEnumerable entities, + OnConflictOptions? onConflict = null + ) where T : class + { + ExecuteBulkInsert(dbContext, entities, _ => { }, onConflict); + } +} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbSet.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbSet.cs new file mode 100644 index 0000000..c7cd183 --- /dev/null +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbSet.cs @@ -0,0 +1,237 @@ +using Microsoft.EntityFrameworkCore; + +using PhenX.EntityFrameworkCore.BulkInsert.Options; + +namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; + +public static partial class PublicExtensions +{ + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet (synchronous variant), with provider specific options. + /// + public static List ExecuteBulkInsertReturnEntities( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + where TOptions : BulkInsertOptions + { + return ExecuteBulkInsertReturnEntitiesCoreAsync(dbSet, true, entities, configure, onConflict, CancellationToken.None) + .GetAwaiter().GetResult(); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet (synchronous variant), with common options. + /// + public static List ExecuteBulkInsertReturnEntities( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + { + return ExecuteBulkInsertReturnEntities(dbSet, entities, configure, onConflict); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet (synchronous variant), without options. + /// + public static List ExecuteBulkInsertReturnEntities( + this DbSet dbSet, + IEnumerable entities, + OnConflictOptions? onConflict = null + ) + where T : class + { + return ExecuteBulkInsertReturnEntities(dbSet, entities, _ => { }, onConflict); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, with provider specific options. + /// + public static Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + where TOptions : BulkInsertOptions + { + return ExecuteBulkInsertReturnEntitiesCoreAsync(dbSet, false, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, with common options. + /// + public static Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + return ExecuteBulkInsertReturnEntitiesAsync(dbSet, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, without options. + /// + public static Task> ExecuteBulkInsertReturnEntitiesAsync( + this DbSet dbSet, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + return ExecuteBulkInsertReturnEntitiesAsync(dbSet, entities, _ => { }, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, with provider specific options. + /// + public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + where TOptions : BulkInsertOptions + { + var provider = InitProvider(dbSet, configure, out var context, out var options); + + return provider.BulkInsertReturnEntities(false, context, dbSet.GetDbContext().GetTableInfo(), entities, + options, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, with common options. + /// + public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + return ExecuteBulkInsertReturnEnumerableAsync(dbSet, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, without options. + /// + public static IAsyncEnumerable ExecuteBulkInsertReturnEnumerableAsync( + this DbSet dbSet, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + return ExecuteBulkInsertReturnEnumerableAsync(dbSet, entities, _ => { }, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet, with provider specific options. + /// + public static async Task ExecuteBulkInsertAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + where TOptions : BulkInsertOptions + { + var provider = InitProvider(dbSet, configure, out var context, out var options); + + await provider.BulkInsert(false, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict, + ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet, with common options. + /// + public static async Task ExecuteBulkInsertAsync( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + await ExecuteBulkInsertAsync(dbSet, entities, configure, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet, without options. + /// + public static async Task ExecuteBulkInsertAsync( + this DbSet dbSet, + IEnumerable entities, + OnConflictOptions? onConflict = null, + CancellationToken ctk = default + ) + where T : class + { + await ExecuteBulkInsertAsync(dbSet, entities, _ => { }, onConflict, ctk); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet (synchronous variant), with provider specific options. + /// + public static void ExecuteBulkInsert( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + where TOptions : BulkInsertOptions + { + var provider = InitProvider(dbSet, configure, out var context, out var options); + + provider.BulkInsert(true, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict) + .GetAwaiter().GetResult(); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet (synchronous variant), with common options. + /// + public static void ExecuteBulkInsert( + this DbSet dbSet, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict = null + ) + where T : class + { + ExecuteBulkInsert(dbSet, entities, configure, onConflict); + } + + /// + /// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet (synchronous variant), without options. + /// + public static void ExecuteBulkInsert( + this DbSet dbSet, + IEnumerable entities, + OnConflictOptions? onConflict = null + ) + where T : class + { + ExecuteBulkInsert(dbSet, entities, _ => { }, onConflict); + } +} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.cs new file mode 100644 index 0000000..4dcc850 --- /dev/null +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.cs @@ -0,0 +1,67 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; + +using PhenX.EntityFrameworkCore.BulkInsert.Abstractions; +using PhenX.EntityFrameworkCore.BulkInsert.Options; + +namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; + +/// +/// DbSet extensions for bulk insert operations. +/// +public static partial class PublicExtensions +{ + private static async Task> ExecuteBulkInsertReturnEntitiesCoreAsync( + this DbSet dbSet, + bool sync, + IEnumerable entities, + Action configure, + OnConflictOptions? onConflict, + CancellationToken ctk + ) + where T : class + where TOptions : BulkInsertOptions + { + var provider = InitProvider(dbSet, configure, out var context, out var options); + + var enumerable = provider.BulkInsertReturnEntities(sync, context, dbSet.GetDbContext().GetTableInfo(), entities, options, onConflict, ctk); + + var result = new List(); + await foreach (var item in enumerable.WithCancellation(ctk)) + { + result.Add(item); + } + + return result; + } + + private static DbContext GetDbContext(this DbSet dbSet) where T : class + { + IInfrastructure infrastructure = dbSet; + return (infrastructure.Instance.GetService(typeof(ICurrentDbContext)) as ICurrentDbContext)!.Context; + } + + private static IBulkInsertProvider InitProvider( + DbSet dbSet, + Action? configure, + out DbContext context, + out TOptions options + ) + where T : class where TOptions : BulkInsertOptions + { + context = dbSet.GetDbContext(); + var provider = context.GetService(); + + var defaultOptions = provider.InternalCreateDefaultOptions(); + + if (defaultOptions is not TOptions castedOptions) + { + throw new InvalidOperationException($"Options type mismatch. Expected {defaultOptions.GetType().Name}, but got {typeof(TOptions).Name}."); + } + + options = castedOptions; + configure?.Invoke(options); + + return provider; + } +} diff --git a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/ConnectionInfo.cs b/src/PhenX.EntityFrameworkCore.BulkInsert/Metadata/ConnectionInfo.cs similarity index 95% rename from src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/ConnectionInfo.cs rename to src/PhenX.EntityFrameworkCore.BulkInsert/Metadata/ConnectionInfo.cs index 33ecc9b..cbb52e1 100644 --- a/src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/ConnectionInfo.cs +++ b/src/PhenX.EntityFrameworkCore.BulkInsert/Metadata/ConnectionInfo.cs @@ -2,7 +2,7 @@ using Microsoft.EntityFrameworkCore.Storage; -namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions; +namespace PhenX.EntityFrameworkCore.BulkInsert.Metadata; internal readonly record struct ConnectionInfo(DbConnection Connection, bool WasClosed, IDbContextTransaction Transaction, bool WasBegan) {