Skip to content

Commit c02b17c

Browse files
author
fabien.menager
committed
Merge branch 'master' into feature/provider-options
# Conflicts: # src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs # src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs # src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs # src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs # src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/DbSetExtensions.cs # tests/PhenX.EntityFrameworkCore.BulkInsert.Tests/Tests/Basic/BasicTestsBase.cs
2 parents 200cc0a + b2991fb commit c02b17c

24 files changed

Lines changed: 812 additions & 658 deletions

File tree

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
1515
{
1616
}
1717

18-
//language=sql
19-
/// <inheritdoc />
20-
protected override string CreateTableCopySql => "CREATE TEMPORARY TABLE {0} SELECT * FROM {1} WHERE 1 = 0;";
21-
2218
//language=sql
2319
/// <inheritdoc />
2420
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT AUTO_INCREMENT PRIMARY KEY;";
@@ -30,7 +26,7 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
3026
protected override MySqlBulkInsertOptions CreateDefaultOptions() => new();
3127

3228
/// <inheritdoc />
33-
public override Task<List<T>> BulkInsertReturnEntities<T>(
29+
public override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
3430
bool sync,
3531
DbContext context,
3632
TableMetadata tableInfo,
@@ -49,7 +45,7 @@ protected override async Task BulkInsert<T>(
4945
TableMetadata tableInfo,
5046
IEnumerable<T> entities,
5147
string tableName,
52-
IReadOnlyList<PropertyMetadata> properties,
48+
IReadOnlyList<ColumnMetadata> properties,
5349
MySqlBulkInsertOptions options,
5450
CancellationToken ctk
5551
)

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlDialectBuilder.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ internal class MySqlServerDialectBuilder : SqlDialectBuilder
1414

1515
protected override bool SupportsMoveRows => false;
1616

17+
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
18+
{
19+
return $"CREATE TEMPORARY TABLE {tempNameName} SELECT * FROM {tableInfo.QuotedTableName} WHERE 1 = 0;";
20+
}
21+
1722
protected override void AppendConflictCondition<T>(StringBuilder sql, OnConflictOptions<T> onConflictTyped)
1823
{
1924
throw new NotSupportedException("Conflict conditions are not supported in MYSQL");
@@ -41,9 +46,9 @@ protected override void AppendOnConflictStatement(StringBuilder sql)
4146
sql.Append("ON DUPLICATE KEY");
4247
}
4348

44-
protected override void AppendDoNothing(StringBuilder sql, IEnumerable<PropertyMetadata> insertedProperties)
49+
protected override void AppendDoNothing(StringBuilder sql, IEnumerable<ColumnMetadata> insertedColumns)
4550
{
46-
var columnName = insertedProperties.First().ColumnName;
51+
var columnName = insertedColumns.First().ColumnName;
4752

4853
sql.Append($"UPDATE {Quote(columnName)} = {GetExcludedColumnName(columnName)}");
4954
}

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/PhenX.EntityFrameworkCore.BulkInsert.MySql.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<!-- Ignore error for preview version of Pomelo.EntityFrameworkCore.MySql -->
4+
<NoWarn>$(NoWarn);NU5104</NoWarn>
5+
</PropertyGroup>
26

37
<ItemGroup>
48
<ProjectReference Include="..\PhenX.EntityFrameworkCore.BulkInsert\PhenX.EntityFrameworkCore.BulkInsert.csproj" />

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Text;
2+
13
using JetBrains.Annotations;
24

35
using Microsoft.EntityFrameworkCore;
@@ -17,19 +19,17 @@ public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logge
1719
{
1820
}
1921

20-
//language=sql
21-
/// <inheritdoc />
22-
protected override string CreateTableCopySql => "CREATE TEMPORARY TABLE {0} AS TABLE {1} WITH NO DATA;";
23-
2422
//language=sql
2523
/// <inheritdoc />
2624
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";
2725

28-
private static string GetBinaryImportCommand(TableMetadata tableInfo, string tableName)
26+
private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> properties, string tableName)
2927
{
30-
var columns = tableInfo.GetProperties(false).Select(X => X.QuotedColumName);
31-
32-
return $"COPY {tableName} ({string.Join(", ", columns)}) FROM STDIN (FORMAT BINARY)";
28+
var sql = new StringBuilder();
29+
sql.Append($"COPY {tableName} (");
30+
sql.AppendColumns(properties);
31+
sql.Append(") FROM STDIN (FORMAT BINARY)");
32+
return sql.ToString();
3333
}
3434

3535
/// <inheritdoc />
@@ -45,18 +45,17 @@ protected override async Task BulkInsert<T>(
4545
TableMetadata tableInfo,
4646
IEnumerable<T> entities,
4747
string tableName,
48-
IReadOnlyList<PropertyMetadata> properties,
48+
IReadOnlyList<ColumnMetadata> columns,
4949
BulkInsertOptions options,
5050
CancellationToken ctk)
5151
{
5252
var connection = (NpgsqlConnection)context.Database.GetDbConnection();
53-
54-
var importCommand = GetBinaryImportCommand(tableInfo, tableName);
53+
var command = GetBinaryImportCommand(columns, tableName);
5554

5655
var writer = sync
5756
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
58-
? connection.BeginBinaryImport(importCommand)
59-
: await connection.BeginBinaryImportAsync(importCommand, ctk);
57+
? connection.BeginBinaryImport(command)
58+
: await connection.BeginBinaryImportAsync(command, ctk);
6059

6160
foreach (var entity in entities)
6261
{
@@ -70,9 +69,9 @@ protected override async Task BulkInsert<T>(
7069
await writer.StartRowAsync(ctk);
7170
}
7271

73-
foreach (var property in properties)
72+
foreach (var column in columns)
7473
{
75-
var value = property.GetValue(entity);
74+
var value = column.GetValue(entity);
7675

7776
if (sync)
7877
{
Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,37 @@
1-
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
1+
using System.Text;
2+
3+
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
4+
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
5+
using PhenX.EntityFrameworkCore.BulkInsert.Options;
26

37
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
48

59
internal class PostgreSqlDialectBuilder : SqlDialectBuilder
610
{
711
protected override string OpenDelimiter => "\"";
812
protected override string CloseDelimiter => "\"";
13+
14+
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
15+
{
16+
return $"CREATE TEMPORARY TABLE {tempNameName} AS TABLE {tableInfo.QuotedTableName} WITH NO DATA;";
17+
}
18+
19+
protected override void AppendConflictMatch<T>(StringBuilder sql, TableMetadata target, OnConflictOptions<T> conflict)
20+
{
21+
if (conflict.Match != null)
22+
{
23+
base.AppendConflictMatch(sql, target, conflict);
24+
}
25+
else if (target.PrimaryKey.Count > 0)
26+
{
27+
sql.Append(' ');
28+
sql.AppendLine("(");
29+
sql.AppendColumns(target.PrimaryKey);
30+
sql.AppendLine(")");
31+
}
32+
else
33+
{
34+
throw new InvalidOperationException("Table has no primary key that can be used for conflict detection.");
35+
}
36+
}
937
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger
1717
{
1818
}
1919

20-
//language=sql
21-
/// <inheritdoc />
22-
protected override string CreateTableCopySql => "SELECT {2} INTO {0} FROM {1} WHERE 1 = 0;";
23-
2420
//language=sql
2521
/// <inheritdoc />
2622
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT IDENTITY PRIMARY KEY;";
@@ -40,7 +36,7 @@ protected override async Task BulkInsert<T>(
4036
TableMetadata tableInfo,
4137
IEnumerable<T> entities,
4238
string tableName,
43-
IReadOnlyList<PropertyMetadata> properties,
39+
IReadOnlyList<ColumnMetadata> columns,
4440
SqlServerBulkInsertOptions options,
4541
CancellationToken ctk)
4642
{
@@ -54,19 +50,19 @@ protected override async Task BulkInsert<T>(
5450
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
5551
bulkCopy.EnableStreaming = options.EnableStreaming;
5652

57-
foreach (var prop in properties)
53+
foreach (var column in columns)
5854
{
59-
bulkCopy.ColumnMappings.Add(prop.Name, prop.ColumnName);
55+
bulkCopy.ColumnMappings.Add(column.PropertyName, column.ColumnName);
6056
}
6157

6258
if (sync)
6359
{
6460
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
65-
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, properties));
61+
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, columns));
6662
}
6763
else
6864
{
69-
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, properties), ctk);
65+
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, columns), ctk);
7066
}
7167
}
7268
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerDialectBuilder.cs

Lines changed: 77 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,34 @@ internal class SqlServerDialectBuilder : SqlDialectBuilder
1414

1515
protected override bool SupportsMoveRows => false;
1616

17+
public override string CreateTableCopySql(string tempTableName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
18+
{
19+
var q = new StringBuilder();
20+
q.Append($"CREATE TABLE {tempTableName} (");
21+
22+
foreach (var column in columns)
23+
{
24+
q.Append($"{column.QuotedColumName} {column.StoreDefinition}");
25+
if (column != columns[^1])
26+
{
27+
q.Append(',');
28+
}
29+
q.AppendLine();
30+
}
31+
32+
q.AppendLine(")");
33+
34+
return q.ToString();
35+
}
36+
1737
public override string BuildMoveDataSql<T>(
1838
TableMetadata target,
1939
string source,
20-
IReadOnlyList<PropertyMetadata> insertedProperties,
21-
IReadOnlyList<PropertyMetadata> properties,
40+
IReadOnlyList<ColumnMetadata> insertedColumns,
41+
IReadOnlyList<ColumnMetadata> returnedColumns,
2242
BulkInsertOptions options,
2343
OnConflictOptions? onConflict = null)
2444
{
25-
var insertedColumns = insertedProperties.Select(x => x.QuotedColumName);
26-
var insertedColumnList = string.Join(", ", insertedColumns);
27-
28-
var returnedColumns = properties.Select(p => $"INSERTED.{p.ColumnName} AS {p.ColumnName}");
29-
var columnList = string.Join(", ", returnedColumns);
30-
3145
var q = new StringBuilder();
3246

3347
if (options.CopyGeneratedColumns)
@@ -36,49 +50,78 @@ public override string BuildMoveDataSql<T>(
3650
}
3751

3852
// Merge handling
39-
if (onConflict is OnConflictOptions<T> onConflictTyped && onConflictTyped.Match != null)
53+
if (onConflict is OnConflictOptions<T> onConflictTyped)
4054
{
41-
var matchColumns = GetColumns(target, onConflictTyped.Match);
42-
var matchOn = string.Join(" AND ",
43-
matchColumns.Select(col => $"TARGET.{col} = SOURCE.{col}"));
44-
45-
var updateSet = onConflictTyped.Update != null
46-
? string.Join(", ", GetUpdates(target, insertedProperties, onConflictTyped.Update))
47-
: null;
55+
IEnumerable<string> matchColumns;
56+
if (onConflictTyped.Match != null)
57+
{
58+
matchColumns = GetColumns(target, onConflictTyped.Match);
59+
}
60+
else if (target.PrimaryKey.Count > 0)
61+
{
62+
matchColumns = target.PrimaryKey.Select(x => x.QuotedColumName);
63+
}
64+
else
65+
{
66+
throw new InvalidOperationException("Table has no primary key that can be used for conflict detection.");
67+
}
4868

4969
q.AppendLine($"MERGE INTO {target.QuotedTableName} AS TARGET");
50-
q.AppendLine(
51-
$"USING (SELECT {string.Join(", ", insertedColumns)} FROM {source}) AS SOURCE ({insertedColumnList})");
52-
q.AppendLine($"ON {matchOn}");
5370

54-
if (updateSet != null)
71+
q.Append("USING (SELECT ");
72+
q.AppendColumns(insertedColumns);
73+
q.Append($" FROM {source}) AS SOURCE (");
74+
q.AppendColumns(insertedColumns);
75+
q.AppendLine(")");
76+
77+
q.Append("ON ");
78+
q.AppendJoin($" AND ", matchColumns, (b, col) => b.Append($"TARGET.{col} = SOURCE.{col}"));
79+
q.AppendLine();
80+
81+
if (onConflictTyped.Update != null)
5582
{
56-
q.AppendLine($"WHEN MATCHED THEN UPDATE SET {updateSet}");
83+
var columns = target.GetColumns(false);
84+
85+
q.AppendLine($"WHEN MATCHED THEN UPDATE SET ");
86+
q.AppendJoin(", ", GetUpdates(target, columns, onConflictTyped.Update));
87+
q.AppendLine();
5788
}
5889

59-
q.AppendLine(
60-
$"WHEN NOT MATCHED THEN INSERT ({insertedColumnList}) VALUES ({string.Join(", ", insertedColumns.Select(c => $"SOURCE.{c}"))})");
90+
q.Append($"WHEN NOT MATCHED THEN INSERT (");
91+
q.AppendColumns(insertedColumns);
92+
q.AppendLine(")");
93+
94+
q.Append("VALUES (");
95+
q.AppendJoin(", ", insertedColumns, (b, col) => b.Append($"SOURCE.{col.QuotedColumName}"));
96+
q.AppendLine(")");
6197

62-
if (columnList.Length != 0)
98+
if (returnedColumns.Count != 0)
6399
{
64-
q.AppendLine($"OUTPUT {columnList}");
100+
q.Append("OUTPUT ");
101+
q.AppendJoin($", ", returnedColumns, (b, col) => b.Append($"INSERTED.{col.QuotedColumName} AS {col.QuotedColumName}"));
102+
q.AppendLine();
65103
}
66104
}
67105

68106
// No conflict handling
69107
else
70108
{
71-
q.AppendLine($"INSERT INTO {target.QuotedTableName} ({insertedColumnList})");
109+
q.Append($"INSERT INTO {target.QuotedTableName} (");
110+
q.AppendColumns(insertedColumns);
111+
q.AppendLine(")");
72112

73-
if (columnList.Length != 0)
113+
if (returnedColumns.Count != 0)
74114
{
75-
q.AppendLine($"OUTPUT {columnList}");
115+
q.Append("OUTPUT ");
116+
q.AppendJoin($", ", returnedColumns, (b, col) => b.Append($"INSERTED.{col.QuotedColumName} AS {col.QuotedColumName}"));
117+
q.AppendLine();
76118
}
77119

78-
q.AppendLine($"""
79-
SELECT {insertedColumnList}
80-
FROM {source}
81-
""");
120+
q.Append("SELECT ");
121+
q.AppendColumns(insertedColumns);
122+
q.AppendLine();
123+
q.Append($"FROM {source}");
124+
q.AppendLine();
82125
}
83126

84127
q.AppendLine(";");
@@ -88,7 +131,8 @@ public override string BuildMoveDataSql<T>(
88131
q.AppendLine($"SET IDENTITY_INSERT {target.QuotedTableName} OFF;");
89132
}
90133

91-
return q.ToString();
134+
var result = q.ToString();
135+
return result;
92136
}
93137

94138
protected override string GetExcludedColumnName(string columnName)

0 commit comments

Comments
 (0)