Skip to content

Commit b72d189

Browse files
author
fabien.menager
committed
Add support for provider specific options
1 parent bfa007e commit b72d189

9 files changed

Lines changed: 153 additions & 17 deletions

File tree

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
2525
/// <inheritdoc />
2626
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2727

28+
/// <inheritdoc />
29+
public override BulkInsertOptions GetDefaultOptions() => new();
30+
2831
/// <inheritdoc />
2932
public override Task<List<T>> BulkInsertReturnEntities<T>(
3033
bool sync,

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ private string GetBinaryImportCommand(DbContext context, Type entityType, string
3131
return $"COPY {tableName} ({string.Join(", ", columns)}) FROM STDIN (FORMAT BINARY)";
3232
}
3333

34+
/// <inheritdoc />
35+
public override BulkInsertOptions GetDefaultOptions() => new()
36+
{
37+
BatchSize = 50_000,
38+
};
39+
3440
/// <inheritdoc />
3541
protected override async Task BulkInsert<T>(
3642
bool sync,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Microsoft.Data.SqlClient;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
4+
5+
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
6+
7+
/// <summary>
8+
/// Options specific to SQL Server bulk insert.
9+
/// </summary>
10+
public class SqlServerBulkInsertOptions : BulkInsertOptions
11+
{
12+
/// <inheritdoc cref="SqlBulkCopyOptions"/>
13+
public SqlBulkCopyOptions CopyOptions { get; init; } = SqlBulkCopyOptions.Default;
14+
15+
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger
2727
/// <inheritdoc />
2828
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2929

30+
public override BulkInsertOptions GetDefaultOptions() => new SqlServerBulkInsertOptions
31+
{
32+
BatchSize = 50_000,
33+
};
34+
3035
/// <inheritdoc />
3136
protected override async Task BulkInsert<T>(
3237
bool sync,
@@ -43,7 +48,7 @@ CancellationToken ctk
4348

4449
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, sqlTransaction);
4550
bulkCopy.DestinationTableName = tableName;
46-
bulkCopy.BatchSize = options.BatchSize ?? 50_000;
51+
bulkCopy.BatchSize = options.BatchSize;
4752
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
4853

4954
foreach (var prop in properties)

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ public SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null
2929
/// <inheritdoc />
3030
protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite
3131

32+
/// <inheritdoc />
33+
public override BulkInsertOptions GetDefaultOptions() => new()
34+
{
35+
BatchSize = 5,
36+
};
37+
3238
/// <inheritdoc />
3339
protected override Task AddBulkInsertIdColumn<T>(
3440
bool sync,
@@ -134,7 +140,7 @@ CancellationToken ctk
134140
) where T : class
135141
{
136142
const int maxParams = 1000;
137-
var batchSize = options.BatchSize ?? 5;
143+
var batchSize = options.BatchSize;
138144
batchSize = Math.Min(batchSize, maxParams / properties.Length);
139145

140146
await using var insertCommand = GetInsertCommand(context, typeof(T), tableName, options, batchSize);

src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@ internal Task BulkInsert<T>(
3232
OnConflictOptions? onConflict = null,
3333
CancellationToken ctk = default
3434
) where T : class;
35+
36+
/// <summary>
37+
/// Make the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
38+
/// </summary>
39+
internal BulkInsertOptions GetDefaultOptions();
3540
}

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ public virtual async Task BulkInsert<T>(
216216
}
217217
}
218218

219+
public abstract BulkInsertOptions GetDefaultOptions();
220+
219221
private async Task<(string TableName, DbConnection Connection)> PerformBulkInsertAsync<T>(
220222
bool sync,
221223
DbContext context,

src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs

Lines changed: 106 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,60 @@ namespace PhenX.EntityFrameworkCore.BulkInsert.Extensions;
1212
public static class DbSetExtensions
1313
{
1414
/// <summary>
15-
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet.
15+
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet, with options which
16+
/// can be a subclass of <see cref="BulkInsertOptions"/>.
1617
/// </summary>
17-
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(
18+
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T, TOptions>(
1819
this DbSet<T> dbSet,
1920
IEnumerable<T> entities,
20-
Action<BulkInsertOptions>? configure = null,
21+
Action<TOptions> configure,
2122
OnConflictOptions? onConflict = null,
2223
CancellationToken ctk = default
23-
) where T : class
24+
)
25+
where T : class
26+
where TOptions : BulkInsertOptions
2427
{
2528
var provider = InitProvider(dbSet, configure, out var context, out var options);
2629

2730
return await provider.BulkInsertReturnEntities(false, context, entities, options, onConflict, ctk);
2831
}
2932

33+
/// <summary>
34+
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet without options.
35+
/// </summary>
36+
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(
37+
this DbSet<T> dbSet,
38+
IEnumerable<T> entities,
39+
Action<BulkInsertOptions> configure,
40+
OnConflictOptions? onConflict = null,
41+
CancellationToken ctk = default
42+
) where T : class
43+
=> await ExecuteBulkInsertReturnEntitiesAsync<T, BulkInsertOptions>(dbSet, entities, configure, onConflict, ctk);
44+
45+
46+
/// <summary>
47+
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet without options.
48+
/// </summary>
49+
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(
50+
this DbSet<T> dbSet,
51+
IEnumerable<T> entities,
52+
OnConflictOptions? onConflict = null,
53+
CancellationToken ctk = default
54+
) where T : class
55+
=> await ExecuteBulkInsertReturnEntitiesAsync<T, BulkInsertOptions>(dbSet, entities, _ => { }, onConflict, ctk);
56+
3057
/// <summary>
3158
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext.
3259
/// </summary>
33-
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(this DbContext dbContext, IEnumerable<T> entities, Action<BulkInsertOptions>? configure = null, OnConflictOptions? onConflict = null, CancellationToken cancellationToken = default) where T : class
60+
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T, TOptions>(
61+
this DbContext dbContext,
62+
IEnumerable<T> entities,
63+
Action<TOptions> configure,
64+
OnConflictOptions? onConflict = null,
65+
CancellationToken cancellationToken = default
66+
)
67+
where T : class
68+
where TOptions : BulkInsertOptions
3469
{
3570
var dbSet = dbContext.Set<T>();
3671
if (dbSet == null)
@@ -41,16 +76,41 @@ public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(this D
4176
return await dbSet.ExecuteBulkInsertReturnEntitiesAsync(entities, configure, onConflict, cancellationToken);
4277
}
4378

79+
/// <summary>
80+
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with generic options.
81+
/// </summary>
82+
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(
83+
this DbContext dbContext,
84+
IEnumerable<T> entities,
85+
Action<BulkInsertOptions> configure,
86+
OnConflictOptions? onConflict = null,
87+
CancellationToken cancellationToken = default
88+
) where T : class
89+
=> await ExecuteBulkInsertReturnEntitiesAsync<T, BulkInsertOptions>(dbContext, entities, configure, onConflict, cancellationToken);
90+
91+
/// <summary>
92+
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbContext, with generic options.
93+
/// </summary>
94+
public static async Task<List<T>> ExecuteBulkInsertReturnEntitiesAsync<T>(
95+
this DbContext dbContext,
96+
IEnumerable<T> entities,
97+
OnConflictOptions? onConflict = null,
98+
CancellationToken cancellationToken = default
99+
) where T : class =>
100+
await dbContext.ExecuteBulkInsertReturnEntitiesAsync<T, BulkInsertOptions>(entities, _ => { }, onConflict, cancellationToken);
101+
44102
/// <summary>
45103
/// Executes a bulk insert operation without returning the inserted/updated entities, from the DbSet.
46104
/// </summary>
47-
public static async Task ExecuteBulkInsertAsync<T>(
105+
public static async Task ExecuteBulkInsertAsync<T, TOptions>(
48106
this DbSet<T> dbSet,
49107
IEnumerable<T> entities,
50-
Action<BulkInsertOptions>? configure = null,
108+
Action<TOptions> configure,
51109
OnConflictOptions? onConflict = null,
52110
CancellationToken ctk = default
53-
) where T : class
111+
)
112+
where T : class
113+
where TOptions : BulkInsertOptions
54114
{
55115
var provider = InitProvider(dbSet, configure, out var context, out var options);
56116

@@ -60,7 +120,14 @@ public static async Task ExecuteBulkInsertAsync<T>(
60120
/// <summary>
61121
/// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext.
62122
/// </summary>
63-
public static async Task ExecuteBulkInsertAsync<T>(this DbContext dbContext, IEnumerable<T> entities, Action<BulkInsertOptions>? configure = null, OnConflictOptions? onConflict = null, CancellationToken cancellationToken = default) where T : class
123+
public static async Task ExecuteBulkInsertAsync<T>(
124+
this DbContext dbContext,
125+
IEnumerable<T> entities,
126+
Action<BulkInsertOptions> configure,
127+
OnConflictOptions? onConflict = null,
128+
CancellationToken cancellationToken = default
129+
)
130+
where T : class
64131
{
65132
var dbSet = dbContext.Set<T>();
66133
if (dbSet == null)
@@ -71,6 +138,25 @@ public static async Task ExecuteBulkInsertAsync<T>(this DbContext dbContext, IEn
71138
await dbSet.ExecuteBulkInsertAsync(entities, configure, onConflict, cancellationToken);
72139
}
73140

141+
/// <summary>
142+
/// Executes a bulk insert operation without returning the inserted/updated entities, from the DbContext.
143+
/// </summary>
144+
public static async Task ExecuteBulkInsertAsync<T>(
145+
this DbContext dbContext,
146+
IEnumerable<T> entities,
147+
OnConflictOptions? onConflict = null,
148+
CancellationToken cancellationToken = default
149+
) where T : class
150+
{
151+
var dbSet = dbContext.Set<T>();
152+
if (dbSet == null)
153+
{
154+
throw new InvalidOperationException($"DbSet of type {typeof(T).Name} not found in DbContext.");
155+
}
156+
157+
await dbSet.ExecuteBulkInsertAsync<T, BulkInsertOptions>(entities, _ => { }, onConflict, cancellationToken);
158+
}
159+
74160
/// <summary>
75161
/// Executes a bulk insert operation returning the inserted/updated entities, from the DbSet (synchronous variant).
76162
/// </summary>
@@ -145,14 +231,22 @@ private static DbContext GetDbContext<T>(this DbSet<T> dbSet) where T : class
145231
return (infrastructure.Instance.GetService(typeof(ICurrentDbContext)) as ICurrentDbContext)!.Context;
146232
}
147233

148-
private static IBulkInsertProvider InitProvider<T>(DbSet<T> dbSet, Action<BulkInsertOptions>? configure, out DbContext context,
149-
out BulkInsertOptions options) where T : class
234+
private static IBulkInsertProvider InitProvider<T, TOptions>(DbSet<T> dbSet, Action<TOptions>? configure, out DbContext context,
235+
out TOptions options) where T : class where TOptions : BulkInsertOptions
150236
{
151237
context = dbSet.GetDbContext();
152238
var provider = context.GetService<IBulkInsertProvider>();
153239

154-
options = new BulkInsertOptions();
240+
var defaultOptions = provider.GetDefaultOptions();
241+
242+
if (defaultOptions is not TOptions castedOptions)
243+
{
244+
throw new InvalidOperationException($"Options type mismatch. Expected {defaultOptions.GetType().Name}, but got {typeof(TOptions).Name}.");
245+
}
246+
247+
options = castedOptions;
155248
configure?.Invoke(options);
249+
156250
return provider;
157251
}
158252
}

src/PhenX.EntityFrameworkCore.BulkInsert/Options/BulkInsertOptions.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ public class BulkInsertOptions
3131
/// </item>
3232
/// </list>
3333
/// </summary>
34-
public int? BatchSize { get; set; }
34+
public int BatchSize { get; set; }
3535

3636
/// <summary>
3737
/// Indicates if also generated columns should be copied. This is useful for upsert operations.
3838
/// </summary>
3939
public bool CopyGeneratedColumns { get; set; }
40-
40+
4141
/// <summary>
4242
/// The timeout to copy records.
4343
/// </summary>
44-
public TimeSpan CopyTimeout = TimeSpan.FromMinutes(10);
44+
public TimeSpan CopyTimeout { get; set; } = TimeSpan.FromMinutes(10);
4545

4646
internal int GetCopyTimeoutInSeconds()
4747
{

0 commit comments

Comments
 (0)