Skip to content

Commit bfa007e

Browse files
authored
Merge pull request #15 from SebastianStehle/mysql
MySql support.
2 parents 400da75 + 303e0b7 commit bfa007e

29 files changed

Lines changed: 774 additions & 200 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,6 @@ fabric.properties
8888

8989
# Nuget assets
9090
/nupkgs
91+
92+
# Visual Studio Files
93+
.vs

PhenX.EntityFrameworkCore.BulkInsert.sln

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.12.35707.178 d17.12
5+
MinimumVisualStudioVersion = 10.0.40219.1
36
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PhenX.EntityFrameworkCore.BulkInsert", "src\PhenX.EntityFrameworkCore.BulkInsert\PhenX.EntityFrameworkCore.BulkInsert.csproj", "{56CA0AE2-6EAB-4394-9E06-132558551251}"
47
EndProject
58
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PhenX.EntityFrameworkCore.BulkInsert.PostgreSql", "src\PhenX.EntityFrameworkCore.BulkInsert.PostgreSql\PhenX.EntityFrameworkCore.BulkInsert.PostgreSql.csproj", "{F37308A8-1C3C-44D2-9440-670DF76A8C31}"
@@ -30,6 +33,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "root", "root", "{45366E91-4
3033
README.md = README.md
3134
EndProjectSection
3235
EndProject
36+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PhenX.EntityFrameworkCore.BulkInsert.MySql", "src\PhenX.EntityFrameworkCore.BulkInsert.MySql\PhenX.EntityFrameworkCore.BulkInsert.MySql.csproj", "{17649766-EA68-4333-8DA8-47B014A8B2CC}"
37+
EndProject
3338
Global
3439
GlobalSection(SolutionConfigurationPlatforms) = preSolution
3540
Debug|Any CPU = Debug|Any CPU
@@ -60,6 +65,13 @@ Global
6065
{450E859C-411F-4D67-A0B4-4E02C3D30E14}.Debug|Any CPU.Build.0 = Debug|Any CPU
6166
{450E859C-411F-4D67-A0B4-4E02C3D30E14}.Release|Any CPU.ActiveCfg = Release|Any CPU
6267
{450E859C-411F-4D67-A0B4-4E02C3D30E14}.Release|Any CPU.Build.0 = Release|Any CPU
68+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
69+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
70+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
71+
{17649766-EA68-4333-8DA8-47B014A8B2CC}.Release|Any CPU.Build.0 = Release|Any CPU
72+
EndGlobalSection
73+
GlobalSection(SolutionProperties) = preSolution
74+
HideSolutionNode = FALSE
6375
EndGlobalSection
6476
GlobalSection(NestedProjects) = preSolution
6577
{56CA0AE2-6EAB-4394-9E06-132558551251} = {CBEBA2A8-79E0-412E-93C1-C88F4473D78B}
@@ -68,5 +80,6 @@ Global
6880
{EDCCED5F-D456-45E2-81A6-1077977F042B} = {F8A83782-311C-454D-8B97-B3FB86478BF4}
6981
{E4EB1C53-575C-45F8-924A-93DC42E8ACCA} = {F8A83782-311C-454D-8B97-B3FB86478BF4}
7082
{450E859C-411F-4D67-A0B4-4E02C3D30E14} = {CBEBA2A8-79E0-412E-93C1-C88F4473D78B}
83+
{17649766-EA68-4333-8DA8-47B014A8B2CC} = {CBEBA2A8-79E0-412E-93C1-C88F4473D78B}
7184
EndGlobalSection
7285
EndGlobal
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
using Microsoft.EntityFrameworkCore;
2+
using Microsoft.EntityFrameworkCore.Storage;
3+
using Microsoft.Extensions.Logging;
4+
5+
using MySqlConnector;
6+
7+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
8+
9+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
10+
11+
internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder>
12+
{
13+
public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null) : base(logger)
14+
{
15+
}
16+
17+
//language=sql
18+
/// <inheritdoc />
19+
protected override string CreateTableCopySql => "CREATE TEMPORARY TABLE {0} SELECT * FROM {1} WHERE 1 = 0;";
20+
21+
//language=sql
22+
/// <inheritdoc />
23+
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT AUTO_INCREMENT PRIMARY KEY;";
24+
25+
/// <inheritdoc />
26+
protected override string GetTempTableName(string tableName) => $"#_temp_bulk_insert_{tableName}";
27+
28+
/// <inheritdoc />
29+
public override Task<List<T>> BulkInsertReturnEntities<T>(
30+
bool sync,
31+
DbContext context,
32+
IEnumerable<T> entities,
33+
BulkInsertOptions options,
34+
OnConflictOptions? onConflict = null,
35+
CancellationToken ctk = default)
36+
{
37+
throw new NotSupportedException("Provider does not support returning entities.");
38+
}
39+
40+
/// <inheritdoc />
41+
protected override async Task BulkInsert<T>(
42+
bool sync,
43+
DbContext context,
44+
IEnumerable<T> entities,
45+
string tableName,
46+
PropertyAccessor[] properties,
47+
BulkInsertOptions options,
48+
CancellationToken ctk
49+
)
50+
{
51+
var connection = (MySqlConnection)context.Database.GetDbConnection();
52+
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction()
53+
?? throw new InvalidOperationException("No open transaction found.");
54+
if (sqlTransaction is not MySqlTransaction mySqlTransaction)
55+
{
56+
throw new InvalidOperationException($"Invalid transaction foud, got {sqlTransaction.GetType()}.");
57+
}
58+
59+
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction);
60+
bulkCopy.DestinationTableName = tableName;
61+
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
62+
63+
var sourceOrdinal = 0;
64+
foreach (var prop in properties)
65+
{
66+
bulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(sourceOrdinal, prop.ColumnName));
67+
sourceOrdinal++;
68+
}
69+
70+
if (sync)
71+
{
72+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
73+
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, properties));
74+
}
75+
else
76+
{
77+
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, properties), ctk);
78+
}
79+
}
80+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using Microsoft.EntityFrameworkCore;
2+
using Microsoft.EntityFrameworkCore.Infrastructure;
3+
4+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
5+
6+
/// <summary>
7+
/// DbContext options extension for MySql.
8+
/// </summary>
9+
public static class MySqlDbContextOptionsExtensions
10+
{
11+
/// <summary>
12+
/// Configures the DbContext to use the MySql bulk insert provider.
13+
/// </summary>
14+
public static DbContextOptionsBuilder UseBulkInsertMySql(this DbContextOptionsBuilder optionsBuilder)
15+
{
16+
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<MySqlBulkInsertProvider>>() ?? new BulkInsertOptionsExtension<MySqlBulkInsertProvider>();
17+
18+
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);
19+
20+
return optionsBuilder;
21+
}
22+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System.Text;
2+
3+
using Microsoft.EntityFrameworkCore;
4+
using Microsoft.EntityFrameworkCore.Metadata;
5+
6+
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
7+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
8+
9+
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
10+
11+
internal class MySqlServerDialectBuilder : SqlDialectBuilder
12+
{
13+
protected override string OpenDelimiter => "`";
14+
15+
protected override string CloseDelimiter => "`";
16+
17+
protected override bool SupportsMoveRows => false;
18+
19+
protected override void AppendConflictCondition<T>(StringBuilder sql, OnConflictOptions<T> onConflictTyped)
20+
{
21+
throw new NotSupportedException("Conflict conditions are not supported in MYSQL");
22+
}
23+
24+
protected override void AppendOnConflictUpdate(StringBuilder sql, IEnumerable<string> updates)
25+
{
26+
sql.AppendLine("UPDATE");
27+
28+
var i = 0;
29+
foreach (var update in updates)
30+
{
31+
if (i > 0)
32+
{
33+
sql.Append(", ");
34+
}
35+
36+
sql.Append(update);
37+
i++;
38+
}
39+
}
40+
41+
protected override void AppendOnConflictStatement(StringBuilder sql)
42+
{
43+
sql.Append("ON DUPLICATE KEY");
44+
}
45+
46+
protected override void AppendDoNothing(StringBuilder sql, IProperty[] insertedProperties)
47+
{
48+
var columnName = insertedProperties[0].GetColumnName();
49+
50+
sql.Append($"UPDATE {Quote(columnName)} = {GetExcludedColumnName(columnName)}");
51+
}
52+
53+
protected override string GetExcludedColumnName(string columnName)
54+
{
55+
return $"VALUES({Quote(columnName)})";
56+
}
57+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<ItemGroup>
4+
<ProjectReference Include="..\PhenX.EntityFrameworkCore.BulkInsert\PhenX.EntityFrameworkCore.BulkInsert.csproj" />
5+
</ItemGroup>
6+
7+
<ItemGroup>
8+
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Condition="'$(TargetFramework)' == 'net8.0'" Version="8.0.3" />
9+
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Condition="'$(TargetFramework)' == 'net9.0'" Version="9.0.0-preview.3.efcore.9.0.0" />
10+
</ItemGroup>
11+
12+
</Project>

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ CancellationToken ctk
4848

4949
foreach (var prop in properties)
5050
{
51-
bulkCopy.ColumnMappings.Add(prop.Name, SqlDialect.Quote(prop.ColumnName));
51+
bulkCopy.ColumnMappings.Add(prop.Name, prop.ColumnName);
5252
}
5353

5454
if (sync)

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerDialectBuilder.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System.Linq.Expressions;
1+
using System.Linq.Expressions;
22
using System.Text;
33

44
using Microsoft.EntityFrameworkCore;
@@ -31,6 +31,11 @@ public override string BuildMoveDataSql<T>(DbContext context, string source,
3131

3232
var q = new StringBuilder();
3333

34+
if (options.CopyGeneratedColumns)
35+
{
36+
q.AppendLine($"SET IDENTITY_INSERT {target} ON;");
37+
}
38+
3439
// Merge handling
3540
if (onConflict is OnConflictOptions<T> onConflictTyped && onConflictTyped.Match != null)
3641
{
@@ -39,7 +44,7 @@ public override string BuildMoveDataSql<T>(DbContext context, string source,
3944
matchColumns.Select(col => $"TARGET.{col} = SOURCE.{col}"));
4045

4146
var updateSet = onConflictTyped.Update != null
42-
? string.Join(", ", GetUpdates(context, onConflictTyped.Update))
47+
? string.Join(", ", GetUpdates(context, insertedProperties, onConflictTyped.Update))
4348
: null;
4449

4550
q.AppendLine($"MERGE INTO {target} AS TARGET");
@@ -79,11 +84,16 @@ public override string BuildMoveDataSql<T>(DbContext context, string source,
7984

8085
q.AppendLine(";");
8186

87+
if (options.CopyGeneratedColumns)
88+
{
89+
q.AppendLine($"SET IDENTITY_INSERT {target} OFF;");
90+
}
91+
8292
return q.ToString();
8393
}
8494

85-
protected override string GetExcludedColumnName<TEntity>(DbContext context, MemberExpression member)
95+
protected override string GetExcludedColumnName(string columnName)
8696
{
87-
return $"SOURCE.{GetColumnName<TEntity>(context, member.Member.Name)}";
97+
return $"SOURCE.{columnName}";
8898
}
8999
}

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@ private static SqliteType GetSqliteType(Type clrType)
8282
}
8383

8484
private DbCommand GetInsertCommand(DbContext context, Type entityType, string tableName,
85+
BulkInsertOptions options,
8586
int batchSize)
8687
{
87-
var columns = context.GetProperties(entityType, false);
88+
var columns = context.GetProperties(entityType, options.CopyGeneratedColumns);
8889
var cmd = context.Database.GetDbConnection().CreateCommand();
8990

9091
var sqliteColumns = columns
@@ -136,7 +137,7 @@ CancellationToken ctk
136137
var batchSize = options.BatchSize ?? 5;
137138
batchSize = Math.Min(batchSize, maxParams / properties.Length);
138139

139-
await using var insertCommand = GetInsertCommand(context, typeof(T), tableName, batchSize);
140+
await using var insertCommand = GetInsertCommand(context, typeof(T), tableName, options, batchSize);
140141

141142
foreach (var chunk in entities.Chunk(batchSize))
142143
{
@@ -149,7 +150,7 @@ CancellationToken ctk
149150
// Last chunk
150151
else
151152
{
152-
var partialInsertCommand = GetInsertCommand(context, typeof(T), tableName, chunk.Length);
153+
var partialInsertCommand = GetInsertCommand(context, typeof(T), tableName, options, chunk.Length);
153154

154155
FillValues(chunk, partialInsertCommand.Parameters, properties);
155156
await ExecuteCommand(sync, partialInsertCommand, ctk);

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ protected BulkInsertProviderBase(ILogger<BulkInsertProviderBase<TDialect>>? logg
3131
protected async Task<string> CreateTableCopyAsync<T>(
3232
bool sync,
3333
DbContext context,
34+
BulkInsertOptions options,
3435
CancellationToken cancellationToken = default) where T : class
3536
{
3637
var tableInfo = GetTableInfo(context, typeof(T));
3738
var tableName = QuoteTableName(tableInfo.SchemaName, tableInfo.TableName);
3839
var tempTableName = QuoteTableName(null, GetTempTableName(tableInfo.TableName));
3940

40-
var keptColumns = string.Join(", ", GetQuotedColumns(context, typeof(T), false));
41+
var keptColumns = string.Join(", ", GetQuotedColumns(context, typeof(T), options.CopyGeneratedColumns));
4142
var query = string.Format(CreateTableCopySql, tempTableName, tableName, keptColumns);
4243

4344
await ExecuteAsync(sync, context, query, cancellationToken);
@@ -108,13 +109,21 @@ private async Task<List<TResult>> CopyFromTempTableWithoutKeysAsync<T, TResult>(
108109
{
109110
var (schemaName, tableName, _) = GetTableInfo(context, typeof(T));
110111
var quotedTableName = QuoteTableName(schemaName, tableName);
111-
112-
var movedProperties = context.GetProperties(typeof(T), false);
112+
var movedProperties = context.GetProperties(typeof(T), options.CopyGeneratedColumns);
113113
var returnedProperties = returnData ? context.GetProperties(typeof(T)) : [];
114114

115115
var query = SqlDialect.BuildMoveDataSql<T>(context, tempTableName, quotedTableName, movedProperties, returnedProperties, options, onConflict);
116116

117117
if (returnData)
118+
{
119+
return await QueryAsync(sync, context, query, cancellationToken);
120+
}
121+
122+
// If not returning data, just execute the command
123+
await ExecuteAsync(sync, context, query, cancellationToken);
124+
return [];
125+
126+
static async Task<List<TResult>> QueryAsync(bool sync, DbContext context, string query, CancellationToken cancellationToken)
118127
{
119128
// Use EF to execute the query and return the results
120129
IQueryable<TResult> queryable = context
@@ -128,13 +137,9 @@ private async Task<List<TResult>> CopyFromTempTableWithoutKeysAsync<T, TResult>(
128137

129138
return await queryable.ToListAsync(cancellationToken: cancellationToken);
130139
}
131-
132-
// If not returning data, just execute the command
133-
await ExecuteAsync(sync, context, query, cancellationToken);
134-
return [];
135140
}
136141

137-
public async Task<List<T>> BulkInsertReturnEntities<T>(
142+
public virtual async Task<List<T>> BulkInsertReturnEntities<T>(
138143
bool sync,
139144
DbContext context,
140145
IEnumerable<T> entities,
@@ -163,10 +168,12 @@ private static async Task Finish(bool sync, DbConnection connection, bool wasClo
163168
{
164169
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
165170
transaction.Commit();
171+
transaction.Dispose();
166172
}
167173
else
168174
{
169175
await transaction.CommitAsync(ctk);
176+
await transaction.DisposeAsync();
170177
}
171178
}
172179

@@ -184,7 +191,7 @@ private static async Task Finish(bool sync, DbConnection connection, bool wasClo
184191
}
185192
}
186193

187-
public async Task BulkInsert<T>(
194+
public virtual async Task BulkInsert<T>(
188195
bool sync,
189196
DbContext context,
190197
IEnumerable<T> entities,
@@ -225,11 +232,11 @@ public async Task BulkInsert<T>(
225232
var (connection, wasClosed, transaction, wasBegan) = await context.GetConnection(sync, ctk);
226233

227234
var tableName = tempTableRequired
228-
? await CreateTableCopyAsync<T>(sync, context, ctk)
235+
? await CreateTableCopyAsync<T>(sync, context, options, ctk)
229236
: GetQuotedTableName(context, typeof(T));
230237

231238
var properties = context
232-
.GetProperties(typeof(T), false)
239+
.GetProperties(typeof(T), options.CopyGeneratedColumns)
233240
.Select(p => new PropertyAccessor(p))
234241
.ToArray();
235242

0 commit comments

Comments
 (0)