Skip to content

Commit 9b3aa67

Browse files
author
fabien.menager
committed
Merge branch 'master' into feature/add-linq2db-benchmark
2 parents 59d68fc + 8a6b02f commit 9b3aa67

19 files changed

Lines changed: 778 additions & 245 deletions

File tree

README.md

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
# PhenX.EntityFrameworkCore.BulkInsert
22

3-
A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite.
3+
A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite and MySQL.
44

55
Its main purpose is to provide a fast way to perform simple bulk inserts in Entity Framework Core applications.
66

77
## Why this library?
88

99
- **Performance**: It is designed to be fast and memory efficient, making it suitable for high-performance applications.
10-
- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, and SQLite), allowing you to use it in different environments without changing your code.
10+
- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, SQLite and MySQL), allowing you to use it in different environments without changing your code.
1111
- **Simplicity**: The API is simple and easy to use, making it accessible for developers of all skill levels.
1212

1313
For now, it does not support navigation properties, complex types, owned types, shadow properties, or inheritance,
1414
but they are in [the roadmap](#roadmap).
1515

16+
## Packages
17+
18+
| Package Name | Description | NuGet Link |
19+
|---------------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
20+
| `PhenX.EntityFrameworkCore.BulkInsert.SqlServer` | For SQL Server | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.SqlServer.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.SqlServer) |
21+
| `PhenX.EntityFrameworkCore.BulkInsert.PostgreSql` | For PostgreSQL | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql) |
22+
| `PhenX.EntityFrameworkCore.BulkInsert.Sqlite` | For SQLite | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.Sqlite) |
23+
| `PhenX.EntityFrameworkCore.BulkInsert.MySql` | For MySql | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.MySql) |
24+
| `PhenX.EntityFrameworkCore.BulkInsert` | Common library | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert) |
25+
1626
## Installation
1727

1828
Install the NuGet package for your database provider:
@@ -26,6 +36,9 @@ Install-Package PhenX.EntityFrameworkCore.BulkInsert.PostgreSql
2636

2737
# For SQLite
2838
Install-Package PhenX.EntityFrameworkCore.BulkInsert.Sqlite
39+
40+
# For MySql
41+
Install-Package PhenX.EntityFrameworkCore.BulkInsert.MySql
2942
```
3043

3144
## Usage
@@ -43,6 +56,8 @@ services.AddDbContext<MyDbContext>(options =>
4356
.UseBulkInsertSqlServer()
4457
// OR
4558
.UseBulkInsertSqlite()
59+
// OR
60+
.UseBulkInsertMySql()
4661
;
4762
});
4863
```
@@ -57,13 +72,35 @@ await dbContext.ExecuteBulkInsertAsync(entities);
5772
dbContext.ExecuteBulkInsert(entities);
5873
```
5974

60-
3. Optionally, you can configure the bulk insert options:
75+
3. You can also configure the bulk insert options:
6176

6277
```csharp
78+
// Common options
6379
await dbContext.ExecuteBulkInsertAsync(entities, options =>
6480
{
6581
options.BatchSize = 1000; // Set the batch size for the insert operation, the default value is different for each provider
6682
});
83+
84+
// Provider specific options, when available, example for SQL Server
85+
await dbContext.ExecuteBulkInsertAsync(entities, (SqlServerBulkInsertOptions o) => // <<< here specify the SQL Server options class
86+
{
87+
options.EnableStreaming = true; // Enable streaming for SQL Server
88+
});
89+
90+
// Provider specific options, supporting multiple providers
91+
await dbContext.ExecuteBulkInsertAsync(entities, o =>
92+
{
93+
o.MoveRows = true;
94+
95+
if (o is SqlServerBulkInsertOptions sqlServerOptions)
96+
{
97+
sqlServerOptions.EnableStreaming = true;
98+
}
99+
else if (o is MySqlBulkInsertOptions mysqlOptions)
100+
{
101+
mysqlOptions.BatchSize = 1000;
102+
}
103+
});
67104
```
68105

69106
4. You can also return the inserted entities (slower):
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
2+
3+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
4+
5+
/// <summary>
6+
/// Options specific to MySQL bulk insert.
7+
/// </summary>
8+
public class MySqlBulkInsertOptions : BulkInsertOptions
9+
{
10+
11+
}

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
1111

12-
internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder>
12+
internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder, MySqlBulkInsertOptions>
1313
{
1414
public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null) : base(logger)
1515
{
@@ -22,6 +22,9 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
2222
/// <inheritdoc />
2323
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2424

25+
/// <inheritdoc />
26+
protected override MySqlBulkInsertOptions CreateDefaultOptions() => new();
27+
2528
/// <inheritdoc />
2629
public override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
2730
bool sync,
@@ -43,21 +46,25 @@ protected override async Task BulkInsert<T>(
4346
IEnumerable<T> entities,
4447
string tableName,
4548
IReadOnlyList<ColumnMetadata> properties,
46-
BulkInsertOptions options,
49+
MySqlBulkInsertOptions options,
4750
CancellationToken ctk
4851
)
4952
{
5053
var connection = (MySqlConnection)context.Database.GetDbConnection();
51-
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction()
54+
55+
var sqlTransaction = context.Database.CurrentTransaction?.GetDbTransaction()
5256
?? throw new InvalidOperationException("No open transaction found.");
57+
5358
if (sqlTransaction is not MySqlTransaction mySqlTransaction)
5459
{
5560
throw new InvalidOperationException($"Invalid transaction foud, got {sqlTransaction.GetType()}.");
5661
}
5762

58-
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction);
59-
bulkCopy.DestinationTableName = tableName;
60-
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
63+
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction)
64+
{
65+
DestinationTableName = tableName,
66+
BulkCopyTimeout = options.GetCopyTimeoutInSeconds(),
67+
};
6168

6269
var sourceOrdinal = 0;
6370
foreach (var prop in properties)

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
1414

1515
[UsedImplicitly]
16-
internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase<PostgreSqlDialectBuilder>
16+
internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase<PostgreSqlDialectBuilder, BulkInsertOptions>
1717
{
1818
public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger = null) : base(logger)
1919
{
@@ -32,6 +32,12 @@ private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> prope
3232
return sql.ToString();
3333
}
3434

35+
/// <inheritdoc />
36+
protected override BulkInsertOptions CreateDefaultOptions() => new()
37+
{
38+
BatchSize = 50_000,
39+
};
40+
3541
/// <inheritdoc />
3642
protected override async Task BulkInsert<T>(
3743
bool sync,
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Microsoft.Data.SqlClient;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
4+
5+
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
6+
7+
/// <summary>
8+
/// Options specific to SQL Server bulk insert.
9+
/// </summary>
10+
public class SqlServerBulkInsertOptions : BulkInsertOptions
11+
{
12+
/// <inheritdoc cref="SqlBulkCopyOptions"/>
13+
public SqlBulkCopyOptions CopyOptions { get; set; } = SqlBulkCopyOptions.Default;
14+
15+
/// <inheritdoc cref="SqlBulkCopy.EnableStreaming"/>
16+
public bool EnableStreaming { get; set; } = false;
17+
18+
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
1212

1313
[UsedImplicitly]
14-
internal class SqlServerBulkInsertProvider : BulkInsertProviderBase<SqlServerDialectBuilder>
14+
internal class SqlServerBulkInsertProvider : BulkInsertProviderBase<SqlServerDialectBuilder, SqlServerBulkInsertOptions>
1515
{
1616
public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger = null) : base(logger)
1717
{
@@ -24,6 +24,11 @@ public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger
2424
/// <inheritdoc />
2525
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
2626

27+
protected override SqlServerBulkInsertOptions CreateDefaultOptions() => new()
28+
{
29+
BatchSize = 50_000,
30+
};
31+
2732
/// <inheritdoc />
2833
protected override async Task BulkInsert<T>(
2934
bool sync,
@@ -32,16 +37,18 @@ protected override async Task BulkInsert<T>(
3237
IEnumerable<T> entities,
3338
string tableName,
3439
IReadOnlyList<ColumnMetadata> columns,
35-
BulkInsertOptions options,
40+
SqlServerBulkInsertOptions options,
3641
CancellationToken ctk)
3742
{
3843
var connection = (SqlConnection) context.Database.GetDbConnection();
3944
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction;
4045

41-
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, sqlTransaction);
46+
using var bulkCopy = new SqlBulkCopy(connection, options.CopyOptions, sqlTransaction);
47+
4248
bulkCopy.DestinationTableName = tableName;
43-
bulkCopy.BatchSize = options.BatchSize ?? 50_000;
49+
bulkCopy.BatchSize = options.BatchSize;
4450
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
51+
bulkCopy.EnableStreaming = options.EnableStreaming;
4552

4653
foreach (var column in columns)
4754
{

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;
1414

1515
[UsedImplicitly]
16-
internal class SqliteBulkInsertProvider : BulkInsertProviderBase<SqliteDialectBuilder>
16+
internal class SqliteBulkInsertProvider : BulkInsertProviderBase<SqliteDialectBuilder, BulkInsertOptions>
1717
{
1818
public SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null) : base(logger)
1919
{
@@ -26,6 +26,12 @@ public SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null
2626
/// <inheritdoc />
2727
protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite
2828

29+
/// <inheritdoc />
30+
protected override BulkInsertOptions CreateDefaultOptions() => new()
31+
{
32+
BatchSize = 5,
33+
};
34+
2935
/// <inheritdoc />
3036
protected override Task AddBulkInsertIdColumn<T>(
3137
bool sync,
@@ -142,10 +148,10 @@ CancellationToken ctk
142148
) where T : class
143149
{
144150
const int maxParams = 1000;
145-
var batchSize = options.BatchSize ?? 5;
151+
var batchSize = options.BatchSize;
146152
batchSize = Math.Min(batchSize, maxParams / columns.Count);
147153

148-
// The StringBuilder can be resuse between the batches.
154+
// The StringBuilder can be resuse between the batches.
149155
var sb = new StringBuilder();
150156

151157
var columnList = tableInfo.GetColumns(options.CopyGeneratedColumns);

src/PhenX.EntityFrameworkCore.BulkInsert/Abstractions/IBulkInsertProvider.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ internal Task BulkInsert<T>(
3838
) where T : class;
3939

4040
SqlDialectBuilder SqlDialect { get; }
41+
42+
/// <summary>
43+
/// Make the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
44+
/// </summary>
45+
internal BulkInsertOptions InternalCreateDefaultOptions();
4146
}

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212

1313
namespace PhenX.EntityFrameworkCore.BulkInsert;
1414

15-
internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProviderBase<TDialect>>? logger = null) : IBulkInsertProvider where TDialect : SqlDialectBuilder, new()
15+
internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger<BulkInsertProviderBase<TDialect, TOptions>>? logger = null) : IBulkInsertProvider
16+
where TDialect : SqlDialectBuilder, new()
17+
where TOptions : BulkInsertOptions, new()
1618
{
1719
protected readonly TDialect SqlDialect = new();
1820

@@ -24,6 +26,13 @@ namespace PhenX.EntityFrameworkCore.BulkInsert;
2426

2527
SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect;
2628

29+
public BulkInsertOptions InternalCreateDefaultOptions() => CreateDefaultOptions();
30+
31+
/// <summary>
32+
/// Create the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
33+
/// </summary>
34+
protected abstract TOptions CreateDefaultOptions();
35+
2736
public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
2837
bool sync,
2938
DbContext context,
@@ -33,6 +42,11 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
3342
OnConflictOptions? onConflict,
3443
[EnumeratorCancellation] CancellationToken ctk) where T : class
3544
{
45+
if (options is not TOptions providerOptions)
46+
{
47+
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
48+
}
49+
3650
using var activity = Telemetry.ActivitySource.StartActivity("BulkInsertReturnEntities");
3751
activity?.AddTag("tableName", tableInfo.TableName);
3852
activity?.AddTag("synchronous", sync);
@@ -45,10 +59,10 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
4559
Log.UsingTempTablToReturnData(logger);
4660
}
4761

48-
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
62+
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);
4963

5064
var result =
51-
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk)
65+
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, providerOptions, onConflict, ctk: ctk)
5266
?? throw new InvalidOperationException("Copy returns null enumerable.");
5367

5468
await foreach (var item in result.WithCancellation(ctk))
@@ -74,6 +88,11 @@ public virtual async Task BulkInsert<T>(
7488
OnConflictOptions? onConflict,
7589
CancellationToken ctk) where T : class
7690
{
91+
if (options is not TOptions providerOptions)
92+
{
93+
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
94+
}
95+
7796
using var activity = Telemetry.ActivitySource.StartActivity("BulkInsert");
7897
activity?.AddTag("tableName", tableInfo.TableName);
7998
activity?.AddTag("synchronous", sync);
@@ -88,9 +107,9 @@ public virtual async Task BulkInsert<T>(
88107
Log.UsingTempTableToResolveConflicts(logger);
89108
}
90109

91-
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
110+
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);
92111

93-
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, options, onConflict, ctk);
112+
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, providerOptions, onConflict, ctk);
94113
}
95114
else
96115
{
@@ -99,7 +118,7 @@ public virtual async Task BulkInsert<T>(
99118
Log.UsingDirectInsert(logger);
100119
}
101120

102-
await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk);
121+
await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: false, ctk: ctk);
103122
}
104123

105124
// Commit the transaction if we own them.
@@ -116,7 +135,7 @@ private async Task<string> PerformBulkInsertAsync<T>(
116135
DbContext context,
117136
TableMetadata tableInfo,
118137
IEnumerable<T> entities,
119-
BulkInsertOptions options,
138+
TOptions options,
120139
bool tempTableRequired,
121140
CancellationToken ctk) where T : class
122141
{
@@ -149,7 +168,7 @@ protected abstract Task BulkInsert<T>(
149168
IEnumerable<T> entities,
150169
string tableName,
151170
IReadOnlyList<ColumnMetadata> columns,
152-
BulkInsertOptions options,
171+
TOptions options,
153172
CancellationToken ctk) where T : class;
154173

155174
protected async Task<string> CreateTableCopyAsync<T>(
@@ -187,7 +206,7 @@ protected virtual async Task AddBulkInsertIdColumn<T>(
187206
TableMetadata tableInfo,
188207
string tempTableName,
189208
bool returnData,
190-
BulkInsertOptions options,
209+
TOptions options,
191210
OnConflictOptions? onConflict,
192211
CancellationToken ctk) where T : class where TResult : class
193212
{

0 commit comments

Comments
 (0)