Skip to content

Commit 586505b

Browse files
Merge branch 'master' of github.com:SebastianStehle/PhenX.EntityFrameworkCore.BulkInsert into jsonb
# Conflicts: # src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs # src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs # src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs # src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs
2 parents 9a10ed2 + 8a6b02f commit 586505b

22 files changed

Lines changed: 785 additions & 250 deletions

File tree

README.md

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
# PhenX.EntityFrameworkCore.BulkInsert
22

3-
A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite.
3+
A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite and MySQL.
44

55
Its main purpose is to provide a fast way to perform simple bulk inserts in Entity Framework Core applications.
66

77
## Why this library?
88

99
- **Performance**: It is designed to be fast and memory efficient, making it suitable for high-performance applications.
10-
- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, and SQLite), allowing you to use it in different environments without changing your code.
10+
- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, SQLite and MySQL), allowing you to use it in different environments without changing your code.
1111
- **Simplicity**: The API is simple and easy to use, making it accessible for developers of all skill levels.
1212

1313
For now, it does not support navigation properties, complex types, owned types, shadow properties, or inheritance,
1414
but they are in [the roadmap](#roadmap).
1515

16+
## Packages
17+
18+
| Package Name | Description | NuGet Link |
19+
|---------------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
20+
| `PhenX.EntityFrameworkCore.BulkInsert.SqlServer` | For SQL Server | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.SqlServer.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.SqlServer) |
21+
| `PhenX.EntityFrameworkCore.BulkInsert.PostgreSql` | For PostgreSQL | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql) |
22+
| `PhenX.EntityFrameworkCore.BulkInsert.Sqlite` | For SQLite | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.Sqlite) |
23+
| `PhenX.EntityFrameworkCore.BulkInsert.MySql` | For MySql | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.MySql) |
24+
| `PhenX.EntityFrameworkCore.BulkInsert` | Common library | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert) |
25+
1626
## Installation
1727

1828
Install the NuGet package for your database provider:
@@ -26,6 +36,9 @@ Install-Package PhenX.EntityFrameworkCore.BulkInsert.PostgreSql
2636

2737
# For SQLite
2838
Install-Package PhenX.EntityFrameworkCore.BulkInsert.Sqlite
39+
40+
# For MySql
41+
Install-Package PhenX.EntityFrameworkCore.BulkInsert.MySql
2942
```
3043

3144
## Usage
@@ -43,6 +56,8 @@ services.AddDbContext<MyDbContext>(options =>
4356
.UseBulkInsertSqlServer()
4457
// OR
4558
.UseBulkInsertSqlite()
59+
// OR
60+
.UseBulkInsertMySql()
4661
;
4762
});
4863
```
@@ -57,13 +72,35 @@ await dbContext.ExecuteBulkInsertAsync(entities);
5772
dbContext.ExecuteBulkInsert(entities);
5873
```
5974

60-
3. Optionally, you can configure the bulk insert options:
75+
3. You can also configure the bulk insert options:
6176

6277
```csharp
78+
// Common options
6379
await dbContext.ExecuteBulkInsertAsync(entities, options =>
6480
{
6581
options.BatchSize = 1000; // Set the batch size for the insert operation, the default value is different for each provider
6682
});
83+
84+
// Provider specific options, when available, example for SQL Server
85+
await dbContext.ExecuteBulkInsertAsync(entities, (SqlServerBulkInsertOptions o) => // <<< here specify the SQL Server options class
86+
{
87+
options.EnableStreaming = true; // Enable streaming for SQL Server
88+
});
89+
90+
// Provider specific options, supporting multiple providers
91+
await dbContext.ExecuteBulkInsertAsync(entities, o =>
92+
{
93+
o.MoveRows = true;
94+
95+
if (o is SqlServerBulkInsertOptions sqlServerOptions)
96+
{
97+
sqlServerOptions.EnableStreaming = true;
98+
}
99+
else if (o is MySqlBulkInsertOptions mysqlOptions)
100+
{
101+
mysqlOptions.BatchSize = 1000;
102+
}
103+
});
67104
```
68105

69106
4. You can also return the inserted entities (slower):
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
2+
3+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
4+
5+
/// <summary>
6+
/// Options specific to MySQL bulk insert.
7+
/// </summary>
8+
public class MySqlBulkInsertOptions : BulkInsertOptions
9+
{
10+
11+
}

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
1111

12-
internal class MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null) : BulkInsertProviderBase<MySqlServerDialectBuilder>(logger)
12+
internal class MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider> logger) : BulkInsertProviderBase<MySqlServerDialectBuilder, MySqlBulkInsertOptions>(logger)
1313
{
1414
//language=sql
1515
/// <inheritdoc />
@@ -18,6 +18,9 @@ internal class MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger
1818
/// <inheritdoc />
1919
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2020

21+
/// <inheritdoc />
22+
protected override MySqlBulkInsertOptions CreateDefaultOptions() => new();
23+
2124
/// <inheritdoc />
2225
public override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
2326
bool sync,
@@ -39,7 +42,7 @@ protected override async Task BulkInsert<T>(
3942
IEnumerable<T> entities,
4043
string tableName,
4144
IReadOnlyList<ColumnMetadata> properties,
42-
BulkInsertOptions options,
45+
MySqlBulkInsertOptions options,
4346
CancellationToken ctk
4447
)
4548
{

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
1717

1818
[UsedImplicitly]
19-
internal class PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger = null) : BulkInsertProviderBase<PostgreSqlDialectBuilder>(logger)
19+
internal class PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger) : BulkInsertProviderBase<PostgreSqlDialectBuilder, BulkInsertOptions>(logger)
2020
{
2121
//language=sql
2222
/// <inheritdoc />
@@ -31,6 +31,12 @@ private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> prope
3131
return sql.ToString();
3232
}
3333

34+
/// <inheritdoc />
35+
protected override BulkInsertOptions CreateDefaultOptions() => new()
36+
{
37+
BatchSize = 50_000,
38+
};
39+
3440
/// <inheritdoc />
3541
protected override async Task BulkInsert<T>(
3642
bool sync,
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Microsoft.Data.SqlClient;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
4+
5+
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
6+
7+
/// <summary>
8+
/// Options specific to SQL Server bulk insert.
9+
/// </summary>
10+
public class SqlServerBulkInsertOptions : BulkInsertOptions
11+
{
12+
/// <inheritdoc cref="SqlBulkCopyOptions"/>
13+
public SqlBulkCopyOptions CopyOptions { get; set; } = SqlBulkCopyOptions.Default;
14+
15+
/// <inheritdoc cref="SqlBulkCopy.EnableStreaming"/>
16+
public bool EnableStreaming { get; set; } = false;
17+
18+
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,24 @@
66
using Microsoft.Extensions.Logging;
77

88
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
9-
using PhenX.EntityFrameworkCore.BulkInsert.Options;
109

1110
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
1211

1312
[UsedImplicitly]
14-
internal class SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger = null) : BulkInsertProviderBase<SqlServerDialectBuilder>(logger)
13+
internal class SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger) : BulkInsertProviderBase<SqlServerDialectBuilder, SqlServerBulkInsertOptions>(logger)
1514
{
16-
1715
//language=sql
1816
/// <inheritdoc />
1917
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT IDENTITY PRIMARY KEY;";
2018

2119
/// <inheritdoc />
2220
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2321

22+
protected override SqlServerBulkInsertOptions CreateDefaultOptions() => new()
23+
{
24+
BatchSize = 50_000,
25+
};
26+
2427
/// <inheritdoc />
2528
protected override async Task BulkInsert<T>(
2629
bool sync,
@@ -29,16 +32,18 @@ protected override async Task BulkInsert<T>(
2932
IEnumerable<T> entities,
3033
string tableName,
3134
IReadOnlyList<ColumnMetadata> columns,
32-
BulkInsertOptions options,
35+
SqlServerBulkInsertOptions options,
3336
CancellationToken ctk)
3437
{
3538
var connection = (SqlConnection) context.Database.GetDbConnection();
3639
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction;
3740

38-
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, sqlTransaction);
41+
using var bulkCopy = new SqlBulkCopy(connection, options.CopyOptions, sqlTransaction);
42+
3943
bulkCopy.DestinationTableName = tableName;
40-
bulkCopy.BatchSize = options.BatchSize ?? 50_000;
44+
bulkCopy.BatchSize = options.BatchSize;
4145
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
46+
bulkCopy.EnableStreaming = options.EnableStreaming;
4247

4348
foreach (var column in columns)
4449
{

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;
1414

1515
[UsedImplicitly]
16-
internal class SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null) : BulkInsertProviderBase<SqliteDialectBuilder>(logger)
16+
internal class SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger) : BulkInsertProviderBase<SqliteDialectBuilder, BulkInsertOptions>(logger)
1717
{
1818
private const int MaxParams = 1000;
1919

@@ -24,6 +24,12 @@ internal class SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logge
2424
/// <inheritdoc />
2525
protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite
2626

27+
/// <inheritdoc />
28+
protected override BulkInsertOptions CreateDefaultOptions() => new()
29+
{
30+
BatchSize = 5,
31+
};
32+
2733
/// <inheritdoc />
2834
protected override Task AddBulkInsertIdColumn<T>(
2935
bool sync,
@@ -121,9 +127,9 @@ protected override async Task BulkInsert<T>(
121127
CancellationToken ctk
122128
) where T : class
123129
{
124-
var batchSize = Math.Min(options.BatchSize ?? 5, MaxParams / columns.Count);
130+
var batchSize = Math.Min(options.BatchSize, MaxParams / columns.Count);
125131

126-
// The StringBuilder can be resuse between the batches.
132+
// The StringBuilder can be resuse between the batches.
127133
var sb = new StringBuilder();
128134

129135
var columnList = tableInfo.GetColumns(options.CopyGeneratedColumns);

src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ internal Task BulkInsert<T>(
3838
) where T : class;
3939

4040
SqlDialectBuilder SqlDialect { get; }
41+
42+
/// <summary>
43+
/// Make the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
44+
/// </summary>
45+
internal BulkInsertOptions InternalCreateDefaultOptions();
4146
}

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertOptionsExtension.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
using Microsoft.EntityFrameworkCore.Infrastructure;
1+
using Microsoft.EntityFrameworkCore.Infrastructure;
22
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.DependencyInjection.Extensions;
4+
using Microsoft.Extensions.Logging;
5+
using Microsoft.Extensions.Logging.Abstractions;
36

47
using PhenX.EntityFrameworkCore.BulkInsert.Abstractions;
58

@@ -13,6 +16,7 @@ public DbContextOptionsExtensionInfo Info
1316

1417
public void ApplyServices(IServiceCollection services)
1518
{
19+
services.TryAddSingleton(typeof(ILogger<>), typeof(NullLogger<>));
1620
services.AddSingleton<IBulkInsertProvider, TProvider>();
1721
}
1822

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212

1313
namespace PhenX.EntityFrameworkCore.BulkInsert;
1414

15-
internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProviderBase<TDialect>>? logger = null) : IBulkInsertProvider where TDialect : SqlDialectBuilder, new()
15+
internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger<BulkInsertProviderBase<TDialect, TOptions>> logger) : IBulkInsertProvider
16+
where TDialect : SqlDialectBuilder, new()
17+
where TOptions : BulkInsertOptions, new()
1618
{
1719
protected readonly TDialect SqlDialect = new();
1820

@@ -24,6 +26,13 @@ namespace PhenX.EntityFrameworkCore.BulkInsert;
2426

2527
SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect;
2628

29+
public BulkInsertOptions InternalCreateDefaultOptions() => CreateDefaultOptions();
30+
31+
/// <summary>
32+
/// Create the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
33+
/// </summary>
34+
protected abstract TOptions CreateDefaultOptions();
35+
2736
public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
2837
bool sync,
2938
DbContext context,
@@ -33,6 +42,11 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
3342
OnConflictOptions? onConflict,
3443
[EnumeratorCancellation] CancellationToken ctk) where T : class
3544
{
45+
if (options is not TOptions providerOptions)
46+
{
47+
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
48+
}
49+
3650
using var activity = Telemetry.ActivitySource.StartActivity("BulkInsertReturnEntities");
3751
activity?.AddTag("tableName", tableInfo.TableName);
3852
activity?.AddTag("synchronous", sync);
@@ -42,13 +56,13 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
4256
{
4357
if (logger != null)
4458
{
45-
Log.UsingTempTablToReturnData(logger);
59+
Log.UsingTempTableToReturnData(logger);
4660
}
4761

48-
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
62+
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);
4963

5064
var result =
51-
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk)
65+
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, providerOptions, onConflict, ctk: ctk)
5266
?? throw new InvalidOperationException("Copy returns null enumerable.");
5367

5468
await foreach (var item in result.WithCancellation(ctk))
@@ -74,6 +88,11 @@ public virtual async Task BulkInsert<T>(
7488
OnConflictOptions? onConflict,
7589
CancellationToken ctk) where T : class
7690
{
91+
if (options is not TOptions providerOptions)
92+
{
93+
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
94+
}
95+
7796
using var activity = Telemetry.ActivitySource.StartActivity("BulkInsert");
7897
activity?.AddTag("tableName", tableInfo.TableName);
7998
activity?.AddTag("synchronous", sync);
@@ -88,9 +107,9 @@ public virtual async Task BulkInsert<T>(
88107
Log.UsingTempTableToResolveConflicts(logger);
89108
}
90109

91-
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
110+
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);
92111

93-
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, options, onConflict, ctk);
112+
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, providerOptions, onConflict, ctk);
94113
}
95114
else
96115
{
@@ -99,7 +118,7 @@ public virtual async Task BulkInsert<T>(
99118
Log.UsingDirectInsert(logger);
100119
}
101120

102-
await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk);
121+
await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: false, ctk: ctk);
103122
}
104123

105124
// Commit the transaction if we own them.
@@ -116,7 +135,7 @@ private async Task<string> PerformBulkInsertAsync<T>(
116135
DbContext context,
117136
TableMetadata tableInfo,
118137
IEnumerable<T> entities,
119-
BulkInsertOptions options,
138+
TOptions options,
120139
bool tempTableRequired,
121140
CancellationToken ctk) where T : class
122141
{
@@ -149,7 +168,7 @@ protected abstract Task BulkInsert<T>(
149168
IEnumerable<T> entities,
150169
string tableName,
151170
IReadOnlyList<ColumnMetadata> columns,
152-
BulkInsertOptions options,
171+
TOptions options,
153172
CancellationToken ctk) where T : class;
154173

155174
protected async Task<string> CreateTableCopyAsync<T>(
@@ -187,7 +206,7 @@ protected virtual async Task AddBulkInsertIdColumn<T>(
187206
TableMetadata tableInfo,
188207
string tempTableName,
189208
bool returnData,
190-
BulkInsertOptions options,
209+
TOptions options,
191210
OnConflictOptions? onConflict,
192211
CancellationToken ctk) where T : class where TResult : class
193212
{

0 commit comments

Comments
 (0)