43 changes: 40 additions & 3 deletions README.md
@@ -1,18 +1,28 @@
# PhenX.EntityFrameworkCore.BulkInsert

A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite.
A high-performance, provider-agnostic bulk insert extension for Entity Framework Core 8+. Supports SQL Server, PostgreSQL, SQLite and MySQL.

Its main purpose is to provide a fast way to perform simple bulk inserts in Entity Framework Core applications.

## Why this library?

- **Performance**: It is designed to be fast and memory efficient, making it suitable for high-performance applications.
- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, and SQLite), allowing you to use it in different environments without changing your code.
- **Provider-agnostic**: It works with multiple database providers (SQL Server, PostgreSQL, SQLite and MySQL), allowing you to use it in different environments without changing your code.
- **Simplicity**: The API is simple and easy to use, making it accessible for developers of all skill levels.

For now, it does not support navigation properties, complex types, owned types, shadow properties, or inheritance,
but they are on [the roadmap](#roadmap).

## Packages

| Package Name | Description | NuGet Link |
|---------------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `PhenX.EntityFrameworkCore.BulkInsert.SqlServer` | For SQL Server | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.SqlServer.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.SqlServer) |
| `PhenX.EntityFrameworkCore.BulkInsert.PostgreSql` | For PostgreSQL | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql) |
| `PhenX.EntityFrameworkCore.BulkInsert.Sqlite` | For SQLite | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.Sqlite.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.Sqlite) |
| `PhenX.EntityFrameworkCore.BulkInsert.MySql` | For MySQL | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.MySql.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert.MySql) |
| `PhenX.EntityFrameworkCore.BulkInsert` | Common library | [![NuGet](https://img.shields.io/nuget/v/PhenX.EntityFrameworkCore.BulkInsert.svg)](https://www.nuget.org/packages/PhenX.EntityFrameworkCore.BulkInsert) |

## Installation

Install the NuGet package for your database provider:
@@ -26,6 +36,9 @@ Install-Package PhenX.EntityFrameworkCore.BulkInsert.PostgreSql

# For SQLite
Install-Package PhenX.EntityFrameworkCore.BulkInsert.Sqlite

# For MySQL
Install-Package PhenX.EntityFrameworkCore.BulkInsert.MySql
```

## Usage
@@ -43,6 +56,8 @@ services.AddDbContext<MyDbContext>(options =>
.UseBulkInsertSqlServer()
// OR
.UseBulkInsertSqlite()
// OR
.UseBulkInsertMySql()
;
});
```
@@ -57,13 +72,35 @@ await dbContext.ExecuteBulkInsertAsync(entities);
dbContext.ExecuteBulkInsert(entities);
```

3. Optionally, you can configure the bulk insert options:
3. You can also configure the bulk insert options:

```csharp
// Common options
await dbContext.ExecuteBulkInsertAsync(entities, options =>
{
options.BatchSize = 1000; // Batch size for the insert operation; the default differs per provider
});

// Provider specific options, when available, example for SQL Server
await dbContext.ExecuteBulkInsertAsync(entities, (SqlServerBulkInsertOptions o) => // <<< specify the SQL Server options class here
{
o.EnableStreaming = true; // Enable streaming for SQL Server
});

// Provider specific options, supporting multiple providers
await dbContext.ExecuteBulkInsertAsync(entities, o =>
{
o.MoveRows = true;

if (o is SqlServerBulkInsertOptions sqlServerOptions)
{
sqlServerOptions.EnableStreaming = true;
}
else if (o is MySqlBulkInsertOptions mysqlOptions)
{
mysqlOptions.BatchSize = 1000;
}
});
```

4. You can also return the inserted entities (slower):
@@ -0,0 +1,11 @@
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;

/// <summary>
/// Options specific to MySQL bulk insert.
/// </summary>
public class MySqlBulkInsertOptions : BulkInsertOptions
{

}
@@ -9,7 +9,7 @@

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;

internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder>
internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder, MySqlBulkInsertOptions>
{
public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null) : base(logger)
{
@@ -22,6 +22,9 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
/// <inheritdoc />
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";

/// <inheritdoc />
protected override MySqlBulkInsertOptions CreateDefaultOptions() => new();

/// <inheritdoc />
public override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
bool sync,
@@ -43,21 +46,25 @@ protected override async Task BulkInsert<T>(
IEnumerable<T> entities,
string tableName,
IReadOnlyList<ColumnMetadata> properties,
BulkInsertOptions options,
MySqlBulkInsertOptions options,
CancellationToken ctk
)
{
var connection = (MySqlConnection)context.Database.GetDbConnection();
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction()

var sqlTransaction = context.Database.CurrentTransaction?.GetDbTransaction()
?? throw new InvalidOperationException("No open transaction found.");

if (sqlTransaction is not MySqlTransaction mySqlTransaction)
{
throw new InvalidOperationException($"Invalid transaction found, got {sqlTransaction.GetType()}.");
}

var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction);
bulkCopy.DestinationTableName = tableName;
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction)
{
DestinationTableName = tableName,
BulkCopyTimeout = options.GetCopyTimeoutInSeconds(),
};

var sourceOrdinal = 0;
foreach (var prop in properties)
@@ -13,7 +13,7 @@
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;

[UsedImplicitly]
internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase<PostgreSqlDialectBuilder>
internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase<PostgreSqlDialectBuilder, BulkInsertOptions>
{
public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger = null) : base(logger)
{
@@ -32,6 +32,12 @@ private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> prope
return sql.ToString();
}

/// <inheritdoc />
protected override BulkInsertOptions CreateDefaultOptions() => new()
{
BatchSize = 50_000,
};

/// <inheritdoc />
protected override async Task BulkInsert<T>(
bool sync,
@@ -0,0 +1,18 @@
using Microsoft.Data.SqlClient;

using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;

/// <summary>
/// Options specific to SQL Server bulk insert.
/// </summary>
public class SqlServerBulkInsertOptions : BulkInsertOptions
{
/// <inheritdoc cref="SqlBulkCopyOptions"/>
public SqlBulkCopyOptions CopyOptions { get; set; } = SqlBulkCopyOptions.Default;

/// <inheritdoc cref="SqlBulkCopy.EnableStreaming"/>
public bool EnableStreaming { get; set; } = false;

}
@@ -11,7 +11,7 @@
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;

[UsedImplicitly]
internal class SqlServerBulkInsertProvider : BulkInsertProviderBase<SqlServerDialectBuilder>
internal class SqlServerBulkInsertProvider : BulkInsertProviderBase<SqlServerDialectBuilder, SqlServerBulkInsertOptions>
{
public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger = null) : base(logger)
{
@@ -24,6 +24,11 @@ public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger
/// <inheritdoc />
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";

protected override SqlServerBulkInsertOptions CreateDefaultOptions() => new()
{
BatchSize = 50_000,
};

/// <inheritdoc />
protected override async Task BulkInsert<T>(
bool sync,
@@ -32,16 +37,18 @@ protected override async Task BulkInsert<T>(
IEnumerable<T> entities,
string tableName,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
SqlServerBulkInsertOptions options,
CancellationToken ctk)
{
var connection = (SqlConnection) context.Database.GetDbConnection();
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction;

using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, sqlTransaction);
using var bulkCopy = new SqlBulkCopy(connection, options.CopyOptions, sqlTransaction);

bulkCopy.DestinationTableName = tableName;
bulkCopy.BatchSize = options.BatchSize ?? 50_000;
bulkCopy.BatchSize = options.BatchSize;
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
bulkCopy.EnableStreaming = options.EnableStreaming;

foreach (var column in columns)
{
@@ -13,7 +13,7 @@
namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;

[UsedImplicitly]
internal class SqliteBulkInsertProvider : BulkInsertProviderBase<SqliteDialectBuilder>
internal class SqliteBulkInsertProvider : BulkInsertProviderBase<SqliteDialectBuilder, BulkInsertOptions>
{
public SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null) : base(logger)
{
@@ -26,6 +26,12 @@ public SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null
/// <inheritdoc />
protected override string AddTableCopyBulkInsertId => "--"; // No need to add an ID column in SQLite

/// <inheritdoc />
protected override BulkInsertOptions CreateDefaultOptions() => new()
{
BatchSize = 5,
};

/// <inheritdoc />
protected override Task AddBulkInsertIdColumn<T>(
bool sync,
@@ -142,10 +148,10 @@ CancellationToken ctk
) where T : class
{
const int maxParams = 1000;
var batchSize = options.BatchSize ?? 5;
var batchSize = options.BatchSize;
batchSize = Math.Min(batchSize, maxParams / columns.Count);

// The StringBuilder can be reused between the batches.
// The StringBuilder can be reused between the batches.
var sb = new StringBuilder();

var columnList = tableInfo.GetColumns(options.CopyGeneratedColumns);
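The SQLite hunk above clamps the requested batch size so that rows times columns stays within the `maxParams` budget of 1000. The following is a minimal sketch of that arithmetic only; the class and method names (`SqliteBatching`, `EffectiveBatchSize`) are hypothetical and do not exist in the library, and it assumes one bound parameter per column per row.

```csharp
using System;

// Hypothetical helper that mirrors the clamp shown in the diff; not library code.
public static class SqliteBatching
{
    // Same value as the `maxParams` constant in the diff.
    private const int MaxParams = 1000;

    // Returns the number of rows per batch after applying the parameter budget.
    // Integer division rounds down, so the result is 0 when columnCount > MaxParams.
    public static int EffectiveBatchSize(int requestedBatchSize, int columnCount)
        => Math.Min(requestedBatchSize, MaxParams / columnCount);

    public static void Main()
    {
        // 1000 / 12 = 83, so the provider default of 5 is used unchanged.
        Console.WriteLine(EffectiveBatchSize(5, 12));
        // A requested batch of 500 rows is reduced to 83 rows.
        Console.WriteLine(EffectiveBatchSize(500, 12));
        // 1000 / 300 = 3, so even the default of 5 is reduced.
        Console.WriteLine(EffectiveBatchSize(5, 300));
    }
}
```

With the new default of `BatchSize = 5`, the clamp only takes effect for tables with more than 200 columns; it matters mostly when a caller raises `BatchSize` explicitly.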
@@ -38,4 +38,9 @@ internal Task BulkInsert<T>(
) where T : class;

SqlDialectBuilder SqlDialect { get; }

/// <summary>
/// Creates the default options for the provider; the result may be a subclass of <see cref="BulkInsertOptions"/>.
/// </summary>
internal BulkInsertOptions InternalCreateDefaultOptions();
}
37 changes: 28 additions & 9 deletions src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs
@@ -12,7 +12,9 @@

namespace PhenX.EntityFrameworkCore.BulkInsert;

internal abstract class BulkInsertProviderBase<TDialect>(ILogger<BulkInsertProviderBase<TDialect>>? logger = null) : IBulkInsertProvider where TDialect : SqlDialectBuilder, new()
internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger<BulkInsertProviderBase<TDialect, TOptions>>? logger = null) : IBulkInsertProvider
where TDialect : SqlDialectBuilder, new()
where TOptions : BulkInsertOptions, new()
{
protected readonly TDialect SqlDialect = new();

@@ -24,6 +26,13 @@ namespace PhenX.EntityFrameworkCore.BulkInsert;

SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect;

public BulkInsertOptions InternalCreateDefaultOptions() => CreateDefaultOptions();

/// <summary>
/// Creates the default options for the provider; the result may be a subclass of <see cref="BulkInsertOptions"/>.
/// </summary>
protected abstract TOptions CreateDefaultOptions();

public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
@@ -33,6 +42,11 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
OnConflictOptions? onConflict,
[EnumeratorCancellation] CancellationToken ctk) where T : class
{
if (options is not TOptions providerOptions)
{
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
}

using var activity = Telemetry.ActivitySource.StartActivity("BulkInsertReturnEntities");
activity?.AddTag("tableName", tableInfo.TableName);
activity?.AddTag("synchronous", sync);
@@ -45,10 +59,10 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
Log.UsingTempTablToReturnData(logger);
}

var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);

var result =
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk)
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, providerOptions, onConflict, ctk: ctk)
?? throw new InvalidOperationException("Copy returns null enumerable.");

await foreach (var item in result.WithCancellation(ctk))
@@ -74,6 +88,11 @@ public virtual async Task BulkInsert<T>(
OnConflictOptions? onConflict,
CancellationToken ctk) where T : class
{
if (options is not TOptions providerOptions)
{
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
}

using var activity = Telemetry.ActivitySource.StartActivity("BulkInsert");
activity?.AddTag("tableName", tableInfo.TableName);
activity?.AddTag("synchronous", sync);
@@ -88,9 +107,9 @@ public virtual async Task BulkInsert<T>(
Log.UsingTempTableToResolveConflicts(logger);
}

var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);

await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, options, onConflict, ctk);
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, providerOptions, onConflict, ctk);
}
else
{
@@ -99,7 +118,7 @@ public virtual async Task BulkInsert<T>(
Log.UsingDirectInsert(logger);
}

await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk);
await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: false, ctk: ctk);
}

// Commit the transaction if we own it.
@@ -116,7 +135,7 @@ private async Task<string> PerformBulkInsertAsync<T>(
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
TOptions options,
bool tempTableRequired,
CancellationToken ctk) where T : class
{
Expand Down Expand Up @@ -149,7 +168,7 @@ protected abstract Task BulkInsert<T>(
IEnumerable<T> entities,
string tableName,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
TOptions options,
CancellationToken ctk) where T : class;

protected async Task<string> CreateTableCopyAsync<T>(
Expand Down Expand Up @@ -187,7 +206,7 @@ protected virtual async Task AddBulkInsertIdColumn<T>(
TableMetadata tableInfo,
string tempTableName,
bool returnData,
BulkInsertOptions options,
TOptions options,
OnConflictOptions? onConflict,
CancellationToken ctk) where T : class where TResult : class
{
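The core of this change is that each provider now declares its own options type, while the public entry points still receive the base `BulkInsertOptions` and check its runtime type. The sketch below shows that pattern in isolation. Every type in it (`IProvider`, `ProviderBase<TOptions>`, `SqlServerLikeOptions`, `SqlServerLikeProvider`, `InsertCore`) is a simplified stand-in written for illustration, not the library's actual classes or signatures.

```csharp
using System;

// Stand-in for the library's base options class (illustration only).
public class BulkInsertOptions
{
    public int BatchSize { get; set; }
}

// Stand-in for a provider-specific options subclass.
public sealed class SqlServerLikeOptions : BulkInsertOptions
{
    public bool EnableStreaming { get; set; }
}

// Non-generic interface: callers only ever see the base options type.
public interface IProvider
{
    BulkInsertOptions CreateOptions();
    string Insert(BulkInsertOptions options);
}

public abstract class ProviderBase<TOptions> : IProvider
    where TOptions : BulkInsertOptions, new()
{
    // Each provider supplies its own typed defaults.
    protected abstract TOptions CreateDefaultOptions();

    // Bridges the typed factory to the non-generic interface.
    public BulkInsertOptions CreateOptions() => CreateDefaultOptions();

    public string Insert(BulkInsertOptions options)
    {
        // Runtime check: reject options created for another provider.
        if (options is not TOptions typed)
        {
            throw new InvalidOperationException(
                $"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
        }

        return InsertCore(typed);
    }

    // Provider code receives the concrete options type, no casts needed.
    protected abstract string InsertCore(TOptions options);
}

public sealed class SqlServerLikeProvider : ProviderBase<SqlServerLikeOptions>
{
    protected override SqlServerLikeOptions CreateDefaultOptions() => new() { BatchSize = 50_000 };

    protected override string InsertCore(SqlServerLikeOptions options)
        => $"batch={options.BatchSize}, streaming={options.EnableStreaming}";
}

public static class Demo
{
    public static void Main()
    {
        IProvider provider = new SqlServerLikeProvider();

        // Defaults come from the provider, then the caller adjusts them.
        var options = provider.CreateOptions();
        if (options is SqlServerLikeOptions sqlServerOptions)
        {
            sqlServerOptions.EnableStreaming = true;
        }

        Console.WriteLine(provider.Insert(options));

        // Passing plain base options to this provider fails the type check.
        try
        {
            provider.Insert(new BulkInsertOptions());
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Because the defaults are created by the provider rather than by the caller, `BatchSize` no longer has to be nullable, which matches the removal of the `?? 50_000` and `?? 5` fallbacks in the provider hunks.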