Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using MySqlConnector;

using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
Expand All @@ -29,6 +30,7 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
public override Task<List<T>> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
Expand All @@ -41,9 +43,10 @@ public override Task<List<T>> BulkInsertReturnEntities<T>(
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;

using PhenX.EntityFrameworkCore.BulkInsert.Extensions;

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;

Expand All @@ -13,10 +14,6 @@ public static class MySqlDbContextOptionsExtensions
/// </summary>
public static DbContextOptionsBuilder UseBulkInsertMySql(this DbContextOptionsBuilder optionsBuilder)
{
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<MySqlBulkInsertProvider>>() ?? new BulkInsertOptionsExtension<MySqlBulkInsertProvider>();

((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);

return optionsBuilder;
return optionsBuilder.UseProvider<MySqlBulkInsertProvider>();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using System.Text;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;

using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
Expand Down Expand Up @@ -43,9 +41,9 @@ protected override void AppendOnConflictStatement(StringBuilder sql)
sql.Append("ON DUPLICATE KEY");
}

protected override void AppendDoNothing(StringBuilder sql, IProperty[] insertedProperties)
protected override void AppendDoNothing(StringBuilder sql, IEnumerable<PropertyMetadata> insertedProperties)
{
var columnName = insertedProperties[0].GetColumnName();
var columnName = insertedProperties.First().ColumnName;

sql.Append($"UPDATE {Quote(columnName)} = {GetExcludedColumnName(columnName)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using Npgsql;

using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
Expand All @@ -24,9 +25,9 @@ public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logge
/// <inheritdoc />
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";

private string GetBinaryImportCommand(DbContext context, Type entityType, string tableName)
private static string GetBinaryImportCommand(TableMetadata tableInfo, string tableName)
{
var columns = GetQuotedColumns(context, entityType, false);
var columns = tableInfo.GetProperties(false).Select(X => X.QuotedColumName);

return $"COPY {tableName} ({string.Join(", ", columns)}) FROM STDIN (FORMAT BINARY)";
}
Expand All @@ -35,15 +36,16 @@ private string GetBinaryImportCommand(DbContext context, Type entityType, string
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk) where T : class
CancellationToken ctk)
{
var connection = (NpgsqlConnection)context.Database.GetDbConnection();

var importCommand = GetBinaryImportCommand(context, typeof(T), tableName);
var importCommand = GetBinaryImportCommand(tableInfo, tableName);

var writer = sync
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore;

using PhenX.EntityFrameworkCore.BulkInsert.Extensions;

namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;

Expand All @@ -13,10 +14,6 @@ public static class PostgreSqlDbContextOptionsExtensions
/// </summary>
public static DbContextOptionsBuilder UseBulkInsertPostgreSql(this DbContextOptionsBuilder optionsBuilder)
{
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<PostgreSqlBulkInsertProvider>>() ?? new BulkInsertOptionsExtension<PostgreSqlBulkInsertProvider>();

((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);

return optionsBuilder;
return optionsBuilder.UseProvider<PostgreSqlBulkInsertProvider>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
Expand All @@ -31,12 +32,12 @@ public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
)
CancellationToken ctk)
{
var connection = (SqlConnection) context.Database.GetDbConnection();
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore;

using PhenX.EntityFrameworkCore.BulkInsert.Extensions;

namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;

Expand All @@ -13,10 +14,6 @@ public static class SqlServerDbContextOptionsExtensions
/// </summary>
public static DbContextOptionsBuilder UseBulkInsertSqlServer(this DbContextOptionsBuilder optionsBuilder)
{
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<SqlServerBulkInsertProvider>>() ?? new BulkInsertOptionsExtension<SqlServerBulkInsertProvider>();

((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);

return optionsBuilder;
return optionsBuilder.UseProvider<SqlServerBulkInsertProvider>();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System.Linq.Expressions;
using System.Text;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;

using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
Expand All @@ -17,37 +14,39 @@ internal class SqlServerDialectBuilder : SqlDialectBuilder

protected override bool SupportsMoveRows => false;

public override string BuildMoveDataSql<T>(DbContext context, string source,
string target,
IProperty[] insertedProperties,
IProperty[] properties,
BulkInsertOptions options, OnConflictOptions? onConflict = null)
public override string BuildMoveDataSql<T>(
TableMetadata target,
string source,
IReadOnlyList<PropertyMetadata> insertedProperties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
OnConflictOptions? onConflict = null)
{
var insertedColumns = insertedProperties.Select(p => Quote(p.GetColumnName())).ToArray();
var insertedColumns = insertedProperties.Select(x => x.QuotedColumName);
var insertedColumnList = string.Join(", ", insertedColumns);

var returnedColumns = properties.Select(p => $"INSERTED.{p.GetColumnName()} AS {p.GetColumnName()}");
var returnedColumns = properties.Select(p => $"INSERTED.{p.ColumnName} AS {p.ColumnName}");
var columnList = string.Join(", ", returnedColumns);

var q = new StringBuilder();

if (options.CopyGeneratedColumns)
{
q.AppendLine($"SET IDENTITY_INSERT {target} ON;");
q.AppendLine($"SET IDENTITY_INSERT {target.QuotedTableName} ON;");
}

// Merge handling
if (onConflict is OnConflictOptions<T> onConflictTyped && onConflictTyped.Match != null)
{
var matchColumns = GetColumns(context, onConflictTyped.Match);
var matchColumns = GetColumns(target, onConflictTyped.Match);
var matchOn = string.Join(" AND ",
matchColumns.Select(col => $"TARGET.{col} = SOURCE.{col}"));

var updateSet = onConflictTyped.Update != null
? string.Join(", ", GetUpdates(context, insertedProperties, onConflictTyped.Update))
? string.Join(", ", GetUpdates(target, insertedProperties, onConflictTyped.Update))
: null;

q.AppendLine($"MERGE INTO {target} AS TARGET");
q.AppendLine($"MERGE INTO {target.QuotedTableName} AS TARGET");
q.AppendLine(
$"USING (SELECT {string.Join(", ", insertedColumns)} FROM {source}) AS SOURCE ({insertedColumnList})");
q.AppendLine($"ON {matchOn}");
Expand All @@ -69,7 +68,7 @@ public override string BuildMoveDataSql<T>(DbContext context, string source,
// No conflict handling
else
{
q.AppendLine($"INSERT INTO {target} ({insertedColumnList})");
q.AppendLine($"INSERT INTO {target.QuotedTableName} ({insertedColumnList})");

if (columnList.Length != 0)
{
Expand All @@ -86,7 +85,7 @@ public override string BuildMoveDataSql<T>(DbContext context, string source,

if (options.CopyGeneratedColumns)
{
q.AppendLine($"SET IDENTITY_INSERT {target} OFF;");
q.AppendLine($"SET IDENTITY_INSERT {target.QuotedTableName} OFF;");
}

return q.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using PhenX.EntityFrameworkCore.BulkInsert.Extensions;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;
Expand Down Expand Up @@ -81,18 +81,18 @@ private static SqliteType GetSqliteType(Type clrType)
throw new InvalidOperationException("Unknown Sqlite type for " + clrType);
}

private DbCommand GetInsertCommand(DbContext context, Type entityType, string tableName,
private DbCommand GetInsertCommand(DbContext context, TableMetadata tableInfo, string tableName,
BulkInsertOptions options,
int batchSize)
{
var columns = context.GetProperties(entityType, options.CopyGeneratedColumns);
var columns = tableInfo.GetProperties(options.CopyGeneratedColumns);
var cmd = context.Database.GetDbConnection().CreateCommand();

var sqliteColumns = columns
.Select(c => new
{
Name = c.GetColumnName(),
Type = GetSqliteType(c.GetProviderClrType() ?? c.ClrType)
Name = c.ColumnName,
Type = GetSqliteType(c.ProviderClrType ?? c.ClrType)
})
.ToArray();

Expand Down Expand Up @@ -126,18 +126,19 @@ private DbCommand GetInsertCommand(DbContext context, Type entityType, string ta
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
) where T : class
{
const int maxParams = 1000;
var batchSize = options.BatchSize ?? 5;
batchSize = Math.Min(batchSize, maxParams / properties.Length);
batchSize = Math.Min(batchSize, maxParams / properties.Count);

await using var insertCommand = GetInsertCommand(context, typeof(T), tableName, options, batchSize);
await using var insertCommand = GetInsertCommand(context, tableInfo, tableName, options, batchSize);

foreach (var chunk in entities.Chunk(batchSize))
{
Expand All @@ -150,7 +151,7 @@ CancellationToken ctk
// Last chunk
else
{
var partialInsertCommand = GetInsertCommand(context, typeof(T), tableName, options, chunk.Length);
var partialInsertCommand = GetInsertCommand(context, tableInfo, tableName, options, chunk.Length);

FillValues(chunk, partialInsertCommand.Parameters, properties);
await ExecuteCommand(sync, partialInsertCommand, ctk);
Expand All @@ -171,7 +172,7 @@ private static async Task ExecuteCommand(bool sync, DbCommand insertCommand, Can
}
}

private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, PropertyAccessor[] properties) where T : class
private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, IReadOnlyList<PropertyMetadata> properties) where T : class
{
var index = 0;
foreach (var entity in chunk)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;

using PhenX.EntityFrameworkCore.BulkInsert.Extensions;

namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;

Expand All @@ -13,9 +14,7 @@ public static class SqliteDbContextOptionsExtensions
/// </summary>
public static DbContextOptionsBuilder UseBulkInsertSqlite(this DbContextOptionsBuilder optionsBuilder)
{
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<SqliteBulkInsertProvider>>() ?? new BulkInsertOptionsExtension<SqliteBulkInsertProvider>();
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);
return optionsBuilder;
return optionsBuilder.UseProvider<SqliteBulkInsertProvider>();
}
}

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Microsoft.EntityFrameworkCore;

using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.Abstractions;
Expand All @@ -15,6 +17,7 @@ internal interface IBulkInsertProvider
internal Task<List<T>> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
Expand All @@ -27,9 +30,12 @@ internal Task<List<T>> BulkInsertReturnEntities<T>(
internal Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
CancellationToken ctk = default
) where T : class;

SqlDialectBuilder SqlDialect { get; }
}
Loading