Skip to content

Commit 8891199

Browse files
committed
Merge branch 'master' into feature/add-linq2db-benchmark
# Conflicts: # tests/PhenX.EntityFrameworkCore.BulkInsert.Benchmark/LibComparator.cs # tests/PhenX.EntityFrameworkCore.BulkInsert.Benchmark/PhenX.EntityFrameworkCore.BulkInsert.Benchmark.csproj
2 parents 4e54217 + b2991fb commit 8891199

50 files changed

Lines changed: 1841 additions & 771 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,6 @@ fabric.properties
8888

8989
# Nuget assets
9090
/nupkgs
91+
92+
# Visual Studio Files
93+
.vs

PhenX.EntityFrameworkCore.BulkInsert.sln

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.12.35707.178 d17.12
5+
MinimumVisualStudioVersion = 10.0.40219.1
36
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PhenX.EntityFrameworkCore.BulkInsert", "src\PhenX.EntityFrameworkCore.BulkInsert\PhenX.EntityFrameworkCore.BulkInsert.csproj", "{56CA0AE2-6EAB-4394-9E06-132558551251}"
47
EndProject
58
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PhenX.EntityFrameworkCore.BulkInsert.PostgreSql", "src\PhenX.EntityFrameworkCore.BulkInsert.PostgreSql\PhenX.EntityFrameworkCore.BulkInsert.PostgreSql.csproj", "{F37308A8-1C3C-44D2-9440-670DF76A8C31}"
@@ -30,6 +33,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "root", "root", "{45366E91-4
3033
README.md = README.md
3134
EndProjectSection
3235
EndProject
36+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PhenX.EntityFrameworkCore.BulkInsert.MySql", "src\PhenX.EntityFrameworkCore.BulkInsert.MySql\PhenX.EntityFrameworkCore.BulkInsert.MySql.csproj", "{17649766-EA68-4333-8DA8-47B014A8B2CC}"
37+
EndProject
3338
Global
3439
GlobalSection(SolutionConfigurationPlatforms) = preSolution
3540
Debug|Any CPU = Debug|Any CPU
@@ -60,6 +65,13 @@ Global
6065
{450E859C-411F-4D67-A0B4-4E02C3D30E14}.Debug|Any CPU.Build.0 = Debug|Any CPU
6166
{450E859C-411F-4D67-A0B4-4E02C3D30E14}.Release|Any CPU.ActiveCfg = Release|Any CPU
6267
{450E859C-411F-4D67-A0B4-4E02C3D30E14}.Release|Any CPU.Build.0 = Release|Any CPU
68+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
69+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
70+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
71+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Release|Any CPU.Build.0 = Release|Any CPU
72+
EndGlobalSection
73+
GlobalSection(SolutionProperties) = preSolution
74+
HideSolutionNode = FALSE
6375
EndGlobalSection
6476
GlobalSection(NestedProjects) = preSolution
6577
{56CA0AE2-6EAB-4394-9E06-132558551251} = {CBEBA2A8-79E0-412E-93C1-C88F4473D78B}
@@ -68,5 +80,6 @@ Global
6880
{EDCCED5F-D456-45E2-81A6-1077977F042B} = {F8A83782-311C-454D-8B97-B3FB86478BF4}
6981
{E4EB1C53-575C-45F8-924A-93DC42E8ACCA} = {F8A83782-311C-454D-8B97-B3FB86478BF4}
7082
{450E859C-411F-4D67-A0B4-4E02C3D30E14} = {CBEBA2A8-79E0-412E-93C1-C88F4473D78B}
83+
{17649766-EA68-4333-8DA8-47B014A8B2CC} = {CBEBA2A8-79E0-412E-93C1-C88F4473D78B}
7184
EndGlobalSection
7285
EndGlobal
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
using Microsoft.EntityFrameworkCore;
2+
using Microsoft.EntityFrameworkCore.Storage;
3+
using Microsoft.Extensions.Logging;
4+
5+
using MySqlConnector;
6+
7+
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
8+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
9+
10+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
11+
12+
internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder>
13+
{
14+
public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null) : base(logger)
15+
{
16+
}
17+
18+
//language=sql
19+
/// <inheritdoc />
20+
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT AUTO_INCREMENT PRIMARY KEY;";
21+
22+
/// <inheritdoc />
23+
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
24+
25+
/// <inheritdoc />
26+
public override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
27+
bool sync,
28+
DbContext context,
29+
TableMetadata tableInfo,
30+
IEnumerable<T> entities,
31+
BulkInsertOptions options,
32+
OnConflictOptions? onConflict = null,
33+
CancellationToken ctk = default)
34+
{
35+
throw new NotSupportedException("Provider does not support returning entities.");
36+
}
37+
38+
/// <inheritdoc />
39+
protected override async Task BulkInsert<T>(
40+
bool sync,
41+
DbContext context,
42+
TableMetadata tableInfo,
43+
IEnumerable<T> entities,
44+
string tableName,
45+
IReadOnlyList<ColumnMetadata> properties,
46+
BulkInsertOptions options,
47+
CancellationToken ctk
48+
)
49+
{
50+
var connection = (MySqlConnection)context.Database.GetDbConnection();
51+
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction()
52+
?? throw new InvalidOperationException("No open transaction found.");
53+
if (sqlTransaction is not MySqlTransaction mySqlTransaction)
54+
{
55+
throw new InvalidOperationException($"Invalid transaction foud, got {sqlTransaction.GetType()}.");
56+
}
57+
58+
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction);
59+
bulkCopy.DestinationTableName = tableName;
60+
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
61+
62+
var sourceOrdinal = 0;
63+
foreach (var prop in properties)
64+
{
65+
bulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(sourceOrdinal, prop.ColumnName));
66+
sourceOrdinal++;
67+
}
68+
69+
if (sync)
70+
{
71+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
72+
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, properties));
73+
}
74+
else
75+
{
76+
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, properties), ctk);
77+
}
78+
}
79+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using Microsoft.EntityFrameworkCore;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Extensions;
4+
5+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
6+
7+
/// <summary>
8+
/// DbContext options extension for MySql.
9+
/// </summary>
10+
public static class MySqlDbContextOptionsExtensions
11+
{
12+
/// <summary>
13+
/// Configures the DbContext to use the MySql bulk insert provider.
14+
/// </summary>
15+
public static DbContextOptionsBuilder UseBulkInsertMySql(this DbContextOptionsBuilder optionsBuilder)
16+
{
17+
return optionsBuilder.UseProvider<MySqlBulkInsertProvider>();
18+
}
19+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using System.Text;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
4+
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
5+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
6+
7+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
8+
9+
internal class MySqlServerDialectBuilder : SqlDialectBuilder
10+
{
11+
protected override string OpenDelimiter => "`";
12+
13+
protected override string CloseDelimiter => "`";
14+
15+
protected override bool SupportsMoveRows => false;
16+
17+
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
18+
{
19+
return $"CREATE TEMPORARY TABLE {tempNameName} SELECT * FROM {tableInfo.QuotedTableName} WHERE 1 = 0;";
20+
}
21+
22+
protected override void AppendConflictCondition<T>(StringBuilder sql, OnConflictOptions<T> onConflictTyped)
23+
{
24+
throw new NotSupportedException("Conflict conditions are not supported in MYSQL");
25+
}
26+
27+
protected override void AppendOnConflictUpdate(StringBuilder sql, IEnumerable<string> updates)
28+
{
29+
sql.AppendLine("UPDATE");
30+
31+
var i = 0;
32+
foreach (var update in updates)
33+
{
34+
if (i > 0)
35+
{
36+
sql.Append(", ");
37+
}
38+
39+
sql.Append(update);
40+
i++;
41+
}
42+
}
43+
44+
protected override void AppendOnConflictStatement(StringBuilder sql)
45+
{
46+
sql.Append("ON DUPLICATE KEY");
47+
}
48+
49+
protected override void AppendDoNothing(StringBuilder sql, IEnumerable<ColumnMetadata> insertedColumns)
50+
{
51+
var columnName = insertedColumns.First().ColumnName;
52+
53+
sql.Append($"UPDATE {Quote(columnName)} = {GetExcludedColumnName(columnName)}");
54+
}
55+
56+
protected override string GetExcludedColumnName(string columnName)
57+
{
58+
return $"VALUES({Quote(columnName)})";
59+
}
60+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<!-- Ignore error for preview version of Pomelo.EntityFrameworkCore.MySql -->
4+
<NoWarn>$(NoWarn);NU5104</NoWarn>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<ProjectReference Include="..\PhenX.EntityFrameworkCore.BulkInsert\PhenX.EntityFrameworkCore.BulkInsert.csproj" />
9+
</ItemGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Condition="'$(TargetFramework)' == 'net8.0'" Version="8.0.3" />
13+
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Condition="'$(TargetFramework)' == 'net9.0'" Version="9.0.0-preview.3.efcore.9.0.0" />
14+
</ItemGroup>
15+
16+
</Project>

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
using System.Text;
2+
13
using JetBrains.Annotations;
24

35
using Microsoft.EntityFrameworkCore;
46
using Microsoft.Extensions.Logging;
57

68
using Npgsql;
79

10+
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
811
using PhenX.EntityFrameworkCore.BulkInsert.Options;
912

1013
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
@@ -16,39 +19,37 @@ public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logge
1619
{
1720
}
1821

19-
//language=sql
20-
/// <inheritdoc />
21-
protected override string CreateTableCopySql => "CREATE TEMPORARY TABLE {0} AS TABLE {1} WITH NO DATA;";
22-
2322
//language=sql
2423
/// <inheritdoc />
2524
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";
2625

27-
private string GetBinaryImportCommand(DbContext context, Type entityType, string tableName)
26+
private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> properties, string tableName)
2827
{
29-
var columns = GetQuotedColumns(context, entityType, false);
30-
31-
return $"COPY {tableName} ({string.Join(", ", columns)}) FROM STDIN (FORMAT BINARY)";
28+
var sql = new StringBuilder();
29+
sql.Append($"COPY {tableName} (");
30+
sql.AppendColumns(properties);
31+
sql.Append(") FROM STDIN (FORMAT BINARY)");
32+
return sql.ToString();
3233
}
3334

3435
/// <inheritdoc />
3536
protected override async Task BulkInsert<T>(
3637
bool sync,
3738
DbContext context,
39+
TableMetadata tableInfo,
3840
IEnumerable<T> entities,
3941
string tableName,
40-
PropertyAccessor[] properties,
42+
IReadOnlyList<ColumnMetadata> columns,
4143
BulkInsertOptions options,
42-
CancellationToken ctk) where T : class
44+
CancellationToken ctk)
4345
{
4446
var connection = (NpgsqlConnection)context.Database.GetDbConnection();
45-
46-
var importCommand = GetBinaryImportCommand(context, typeof(T), tableName);
47+
var command = GetBinaryImportCommand(columns, tableName);
4748

4849
var writer = sync
4950
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
50-
? connection.BeginBinaryImport(importCommand)
51-
: await connection.BeginBinaryImportAsync(importCommand, ctk);
51+
? connection.BeginBinaryImport(command)
52+
: await connection.BeginBinaryImportAsync(command, ctk);
5253

5354
foreach (var entity in entities)
5455
{
@@ -62,9 +63,9 @@ protected override async Task BulkInsert<T>(
6263
await writer.StartRowAsync(ctk);
6364
}
6465

65-
foreach (var property in properties)
66+
foreach (var column in columns)
6667
{
67-
var value = property.GetValue(entity);
68+
var value = column.GetValue(entity);
6869

6970
if (sync)
7071
{
Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
using Microsoft.EntityFrameworkCore;
2-
using Microsoft.EntityFrameworkCore.Infrastructure;
1+
using Microsoft.EntityFrameworkCore;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Extensions;
34

45
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
56

@@ -13,10 +14,6 @@ public static class PostgreSqlDbContextOptionsExtensions
1314
/// </summary>
1415
public static DbContextOptionsBuilder UseBulkInsertPostgreSql(this DbContextOptionsBuilder optionsBuilder)
1516
{
16-
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<PostgreSqlBulkInsertProvider>>() ?? new BulkInsertOptionsExtension<PostgreSqlBulkInsertProvider>();
17-
18-
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);
19-
20-
return optionsBuilder;
17+
return optionsBuilder.UseProvider<PostgreSqlBulkInsertProvider>();
2118
}
2219
}
Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,37 @@
1-
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
1+
using System.Text;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
4+
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
5+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
26

37
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
48

59
internal class PostgreSqlDialectBuilder : SqlDialectBuilder
610
{
711
protected override string OpenDelimiter => "\"";
812
protected override string CloseDelimiter => "\"";
13+
14+
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
15+
{
16+
return $"CREATE TEMPORARY TABLE {tempNameName} AS TABLE {tableInfo.QuotedTableName} WITH NO DATA;";
17+
}
18+
19+
protected override void AppendConflictMatch<T>(StringBuilder sql, TableMetadata target, OnConflictOptions<T> conflict)
20+
{
21+
if (conflict.Match != null)
22+
{
23+
base.AppendConflictMatch(sql, target, conflict);
24+
}
25+
else if (target.PrimaryKey.Count > 0)
26+
{
27+
sql.Append(' ');
28+
sql.AppendLine("(");
29+
sql.AppendColumns(target.PrimaryKey);
30+
sql.AppendLine(")");
31+
}
32+
else
33+
{
34+
throw new InvalidOperationException("Table has no primary key that can be used for conflict detection.");
35+
}
36+
}
937
}

0 commit comments

Comments
 (0)