Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected override async Task BulkInsert<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
IReadOnlyList<PropertyMetadata> properties,
IReadOnlyList<ColumnMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class MySqlServerDialectBuilder : SqlDialectBuilder

protected override bool SupportsMoveRows => false;

public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<PropertyMetadata> columns)
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
{
return $"CREATE TEMPORARY TABLE {tempNameName} SELECT * FROM {tableInfo.QuotedTableName} WHERE 1 = 0;";
}
Expand Down Expand Up @@ -46,9 +46,9 @@ protected override void AppendOnConflictStatement(StringBuilder sql)
sql.Append("ON DUPLICATE KEY");
}

protected override void AppendDoNothing(StringBuilder sql, IEnumerable<PropertyMetadata> insertedProperties)
protected override void AppendDoNothing(StringBuilder sql, IEnumerable<ColumnMetadata> insertedColumns)
{
var columnName = insertedProperties.First().ColumnName;
var columnName = insertedColumns.First().ColumnName;

sql.Append($"UPDATE {Quote(columnName)} = {GetExcludedColumnName(columnName)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logge
/// <inheritdoc />
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";

private static string GetBinaryImportCommand(IReadOnlyList<PropertyMetadata> properties, string tableName)
private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> properties, string tableName)
{
var sql = new StringBuilder();
sql.Append($"COPY {tableName} (");
Expand All @@ -39,18 +39,17 @@ protected override async Task BulkInsert<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
IReadOnlyList<PropertyMetadata> properties,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
CancellationToken ctk)
{
var connection = (NpgsqlConnection)context.Database.GetDbConnection();

var importCommand = GetBinaryImportCommand(properties, tableName);
var command = GetBinaryImportCommand(columns, tableName);

var writer = sync
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
? connection.BeginBinaryImport(importCommand)
: await connection.BeginBinaryImportAsync(importCommand, ctk);
? connection.BeginBinaryImport(command)
: await connection.BeginBinaryImportAsync(command, ctk);

foreach (var entity in entities)
{
Expand All @@ -64,9 +63,9 @@ protected override async Task BulkInsert<T>(
await writer.StartRowAsync(ctk);
}

foreach (var property in properties)
foreach (var column in columns)
{
var value = property.GetValue(entity);
var value = column.GetValue(entity);

if (sync)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal class PostgreSqlDialectBuilder : SqlDialectBuilder
protected override string OpenDelimiter => "\"";
protected override string CloseDelimiter => "\"";

public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<PropertyMetadata> columns)
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
{
return $"CREATE TEMPORARY TABLE {tempNameName} AS TABLE {tableInfo.QuotedTableName} WITH NO DATA;";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected override async Task BulkInsert<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
IReadOnlyList<PropertyMetadata> properties,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
CancellationToken ctk)
{
Expand All @@ -43,19 +43,19 @@ protected override async Task BulkInsert<T>(
bulkCopy.BatchSize = options.BatchSize ?? 50_000;
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();

foreach (var prop in properties)
foreach (var column in columns)
{
bulkCopy.ColumnMappings.Add(prop.Name, prop.ColumnName);
bulkCopy.ColumnMappings.Add(column.PropertyName, column.ColumnName);
}

if (sync)
{
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, properties));
bulkCopy.WriteToServer(new EnumerableDataReader<T>(entities, columns));
}
else
{
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, properties), ctk);
await bulkCopy.WriteToServerAsync(new EnumerableDataReader<T>(entities, columns), ctk);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class SqlServerDialectBuilder : SqlDialectBuilder

protected override bool SupportsMoveRows => false;

public override string CreateTableCopySql(string tempTableName, TableMetadata tableInfo, IReadOnlyList<PropertyMetadata> columns)
public override string CreateTableCopySql(string tempTableName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
{
var q = new StringBuilder();
q.Append($"CREATE TABLE {tempTableName} (");
Expand All @@ -37,8 +37,8 @@ public override string CreateTableCopySql(string tempTableName, TableMetadata ta
public override string BuildMoveDataSql<T>(
TableMetadata target,
string source,
IReadOnlyList<PropertyMetadata> insertedProperties,
IReadOnlyList<PropertyMetadata> returnedProperties,
IReadOnlyList<ColumnMetadata> insertedColumns,
IReadOnlyList<ColumnMetadata> returnedColumns,
BulkInsertOptions options,
OnConflictOptions? onConflict = null)
{
Expand Down Expand Up @@ -69,9 +69,9 @@ public override string BuildMoveDataSql<T>(
q.AppendLine($"MERGE INTO {target.QuotedTableName} AS TARGET");

q.Append("USING (SELECT ");
q.AppendColumns(insertedProperties);
q.AppendColumns(insertedColumns);
q.Append($" FROM {source}) AS SOURCE (");
q.AppendColumns(insertedProperties);
q.AppendColumns(insertedColumns);
q.AppendLine(")");

q.Append("ON ");
Expand All @@ -80,25 +80,25 @@ public override string BuildMoveDataSql<T>(

if (onConflictTyped.Update != null)
{
var properties = target.GetProperties(false);
var columns = target.GetColumns(false);

q.AppendLine($"WHEN MATCHED THEN UPDATE SET ");
q.AppendJoin(", ", GetUpdates(target, properties, onConflictTyped.Update));
q.AppendJoin(", ", GetUpdates(target, columns, onConflictTyped.Update));
q.AppendLine();
}

q.Append($"WHEN NOT MATCHED THEN INSERT (");
q.AppendColumns(insertedProperties);
q.AppendColumns(insertedColumns);
q.AppendLine(")");

q.Append("VALUES (");
q.AppendJoin(", ", insertedProperties, (b, col) => b.Append($"SOURCE.{col.QuotedColumName}"));
q.AppendJoin(", ", insertedColumns, (b, col) => b.Append($"SOURCE.{col.QuotedColumName}"));
q.AppendLine(")");

if (returnedProperties.Count != 0)
if (returnedColumns.Count != 0)
{
q.Append("OUTPUT ");
q.AppendJoin($", ", returnedProperties, (b, col) => b.Append($"INSERTED.{col.QuotedColumName} AS {col.QuotedColumName}"));
q.AppendJoin($", ", returnedColumns, (b, col) => b.Append($"INSERTED.{col.QuotedColumName} AS {col.QuotedColumName}"));
q.AppendLine();
}
}
Expand All @@ -107,18 +107,18 @@ public override string BuildMoveDataSql<T>(
else
{
q.Append($"INSERT INTO {target.QuotedTableName} (");
q.AppendColumns(insertedProperties);
q.AppendColumns(insertedColumns);
q.AppendLine(")");

if (returnedProperties.Count != 0)
if (returnedColumns.Count != 0)
{
q.Append("OUTPUT ");
q.AppendJoin($", ", returnedProperties, (b, col) => b.Append($"INSERTED.{col.QuotedColumName} AS {col.QuotedColumName}"));
q.AppendJoin($", ", returnedColumns, (b, col) => b.Append($"INSERTED.{col.QuotedColumName} AS {col.QuotedColumName}"));
q.AppendLine();
}

q.Append("SELECT ");
q.AppendColumns(insertedProperties);
q.AppendColumns(insertedColumns);
q.AppendLine();
q.Append($"FROM {source}");
q.AppendLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private static SqliteType GetSqliteType(Type clrType)
private static DbCommand GetInsertCommand(
DbContext context,
string tableName,
IReadOnlyList<PropertyMetadata> columns,
IReadOnlyList<ColumnMetadata> columns,
SqliteType[] columnTypes,
StringBuilder sb,
int batchSize)
Expand Down Expand Up @@ -136,19 +136,19 @@ protected override async Task BulkInsert<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
IReadOnlyList<PropertyMetadata> properties,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
CancellationToken ctk
) where T : class
{
const int maxParams = 1000;
var batchSize = options.BatchSize ?? 5;
batchSize = Math.Min(batchSize, maxParams / properties.Count);
batchSize = Math.Min(batchSize, maxParams / columns.Count);

// The StringBuilder can be resuse between the batches.
var sb = new StringBuilder();

var columnList = tableInfo.GetProperties(options.CopyGeneratedColumns);
var columnList = tableInfo.GetColumns(options.CopyGeneratedColumns);
var columnTypes = columnList.Select(c => GetSqliteType(c.ProviderClrType ?? c.ClrType)).ToArray();

await using var insertCommand =
Expand All @@ -165,7 +165,7 @@ CancellationToken ctk
// Full chunks
if (chunk.Length == batchSize)
{
FillValues(chunk, insertCommand.Parameters, properties);
FillValues(chunk, insertCommand.Parameters, columns);
await ExecuteCommand(sync, insertCommand, ctk);
}
// Last chunk
Expand All @@ -180,7 +180,7 @@ CancellationToken ctk
sb,
chunk.Length);

FillValues(chunk, partialInsertCommand.Parameters, properties);
FillValues(chunk, partialInsertCommand.Parameters, columns);
await ExecuteCommand(sync, partialInsertCommand, ctk);
}
}
Expand All @@ -199,14 +199,14 @@ private static async Task ExecuteCommand(bool sync, DbCommand insertCommand, Can
}
}

private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, IReadOnlyList<PropertyMetadata> properties) where T : class
private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, IReadOnlyList<ColumnMetadata> columns) where T : class
{
var p = 0;
foreach (var entity in chunk)
{
foreach (var property in properties)
foreach (var column in columns)
{
var value = property.GetValue(entity);
var value = column.GetValue(entity);
parameters[p].Value = value;
p++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal class SqliteDialectBuilder : SqlDialectBuilder
protected override bool SupportsMoveRows => false;

/// <inheritdoc />
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<PropertyMetadata> columns)
public override string CreateTableCopySql(string tempNameName, TableMetadata tableInfo, IReadOnlyList<ColumnMetadata> columns)
{
return $"CREATE TEMP TABLE {tempNameName} AS SELECT * FROM {tableInfo.QuotedTableName} WHERE 0;";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ private async Task<string> PerformBulkInsertAsync<T>(
? await CreateTableCopyAsync<T>(sync, context, options, tableInfo, ctk)
: tableInfo.QuotedTableName;

var properties = tableInfo.GetProperties(options.CopyGeneratedColumns);
var columns = tableInfo.GetColumns(options.CopyGeneratedColumns);

using var activity = Telemetry.ActivitySource.StartActivity("Insert");
activity?.AddTag("tempTable", tempTableRequired);
activity?.AddTag("synchronous", sync);

await BulkInsert(false, context, tableInfo, entities, tableName, properties, options, ctk);
await BulkInsert(false, context, tableInfo, entities, tableName, columns, options, ctk);
return tableName;
}

Expand All @@ -148,7 +148,7 @@ protected abstract Task BulkInsert<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
IReadOnlyList<PropertyMetadata> properties,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
CancellationToken ctk) where T : class;

Expand All @@ -160,7 +160,7 @@ protected async Task<string> CreateTableCopyAsync<T>(
CancellationToken ctk) where T : class
{
var tempTableName = SqlDialect.QuoteTableName(null, GetTempTableName(tableInfo.TableName));
var tempColumns = tableInfo.GetProperties(options.CopyGeneratedColumns);
var tempColumns = tableInfo.GetColumns(options.CopyGeneratedColumns);

var query = SqlDialect.CreateTableCopySql(tempTableName, tableInfo, tempColumns);

Expand Down Expand Up @@ -195,8 +195,8 @@ protected virtual async Task AddBulkInsertIdColumn<T>(
SqlDialect.BuildMoveDataSql<T>(
tableInfo,
tempTableName,
tableInfo.GetProperties(options.CopyGeneratedColumns),
returnData ? tableInfo.GetProperties() : [],
tableInfo.GetColumns(options.CopyGeneratedColumns),
returnData ? tableInfo.GetColumns() : [],
options,
onConflict);

Expand Down
Loading