Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using MySqlConnector;

using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
Expand All @@ -29,9 +30,10 @@ public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null)
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using System.Text;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;

using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
Expand Down Expand Up @@ -45,9 +43,9 @@ protected override void AppendOnConflictStatement(StringBuilder sql)
sql.Append("ON DUPLICATE KEY");
}

protected override void AppendDoNothing(StringBuilder sql, IProperty[] insertedProperties)
protected override void AppendDoNothing(StringBuilder sql, IEnumerable<PropertyMetadata> insertedProperties)
{
var columnName = insertedProperties[0].GetColumnName();
var columnName = insertedProperties.First().ColumnName;

sql.Append($"UPDATE {Quote(columnName)} = {GetExcludedColumnName(columnName)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using Npgsql;

using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
Expand All @@ -24,9 +25,9 @@ public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logge
/// <inheritdoc />
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";

private string GetBinaryImportCommand(DbContext context, Type entityType, string tableName)
private static string GetBinaryImportCommand(TableMetadata tableInfo, string tableName)
{
var columns = GetQuotedColumns(context, entityType, false);
var columns = tableInfo.GetProperties(false).Select(X => X.QuotedColumName);

return $"COPY {tableName} ({string.Join(", ", columns)}) FROM STDIN (FORMAT BINARY)";
}
Expand All @@ -35,15 +36,16 @@ private string GetBinaryImportCommand(DbContext context, Type entityType, string
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk) where T : class
CancellationToken ctk)
{
var connection = (NpgsqlConnection)context.Database.GetDbConnection();

var importCommand = GetBinaryImportCommand(context, typeof(T), tableName);
var importCommand = GetBinaryImportCommand(tableInfo, tableName);

var writer = sync
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
Expand All @@ -31,12 +32,12 @@ public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
)
CancellationToken ctk)
{
var connection = (SqlConnection) context.Database.GetDbConnection();
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction() as SqlTransaction;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System.Linq.Expressions;
using System.Text;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;

using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
Expand All @@ -17,29 +14,31 @@ internal class SqlServerDialectBuilder : SqlDialectBuilder

protected override bool SupportsMoveRows => false;

public override string BuildMoveDataSql<T>(DbContext context, string source,
public override string BuildMoveDataSql<T>(
TableMetadata source,
string target,
IProperty[] insertedProperties,
IProperty[] properties,
BulkInsertOptions options, OnConflictOptions? onConflict = null)
IReadOnlyList<PropertyMetadata> insertedProperties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
OnConflictOptions? onConflict = null)
{
var insertedColumns = insertedProperties.Select(p => Quote(p.GetColumnName())).ToArray();
var insertedColumns = insertedProperties.Select(x => x.QuotedColumName);
var insertedColumnList = string.Join(", ", insertedColumns);

var returnedColumns = properties.Select(p => $"INSERTED.{p.GetColumnName()} AS {p.GetColumnName()}");
var returnedColumns = properties.Select(p => $"INSERTED.{p.ColumnName} AS {p.ColumnName}");
var columnList = string.Join(", ", returnedColumns);

var q = new StringBuilder();

// Merge handling
if (onConflict is OnConflictOptions<T> onConflictTyped && onConflictTyped.Match != null)
{
var matchColumns = GetColumns(context, onConflictTyped.Match);
var matchColumns = GetColumns(source, onConflictTyped.Match);
var matchOn = string.Join(" AND ",
matchColumns.Select(col => $"TARGET.{col} = SOURCE.{col}"));

var updateSet = onConflictTyped.Update != null
? string.Join(", ", GetUpdates(context, insertedProperties, onConflictTyped.Update))
? string.Join(", ", GetUpdates(source, insertedProperties, onConflictTyped.Update))
: null;

q.AppendLine($"MERGE INTO {target} AS TARGET");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using PhenX.EntityFrameworkCore.BulkInsert.Extensions;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;
Expand Down Expand Up @@ -81,17 +81,17 @@ private static SqliteType GetSqliteType(Type clrType)
throw new InvalidOperationException("Unknown Sqlite type for " + clrType);
}

private DbCommand GetInsertCommand(DbContext context, Type entityType, string tableName,
private DbCommand GetInsertCommand(DbContext context, TableMetadata tableInfo, string tableName,
int batchSize)
{
var columns = context.GetProperties(entityType, false);
var columns = tableInfo.GetProperties(false);
var cmd = context.Database.GetDbConnection().CreateCommand();

var sqliteColumns = columns
.Select(c => new
{
Name = c.GetColumnName(),
Type = GetSqliteType(c.GetProviderClrType() ?? c.ClrType)
Name = c.ColumnName,
Type = GetSqliteType(c.ProviderClrType ?? c.ClrType)
})
.ToArray();

Expand Down Expand Up @@ -125,18 +125,19 @@ private DbCommand GetInsertCommand(DbContext context, Type entityType, string ta
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
PropertyAccessor[] properties,
IReadOnlyList<PropertyMetadata> properties,
BulkInsertOptions options,
CancellationToken ctk
) where T : class
{
const int maxParams = 1000;
var batchSize = options.BatchSize ?? 5;
batchSize = Math.Min(batchSize, maxParams / properties.Length);
batchSize = Math.Min(batchSize, maxParams / properties.Count);

await using var insertCommand = GetInsertCommand(context, typeof(T), tableName, batchSize);
await using var insertCommand = GetInsertCommand(context, tableInfo, tableName, batchSize);

foreach (var chunk in entities.Chunk(batchSize))
{
Expand All @@ -149,7 +150,7 @@ CancellationToken ctk
// Last chunk
else
{
var partialInsertCommand = GetInsertCommand(context, typeof(T), tableName, chunk.Length);
var partialInsertCommand = GetInsertCommand(context, tableInfo, tableName, chunk.Length);

FillValues(chunk, partialInsertCommand.Parameters, properties);
await ExecuteCommand(sync, partialInsertCommand, ctk);
Expand All @@ -170,7 +171,7 @@ private static async Task ExecuteCommand(bool sync, DbCommand insertCommand, Can
}
}

private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, PropertyAccessor[] properties) where T : class
private static void FillValues<T>(T[] chunk, DbParameterCollection parameters, IReadOnlyList<PropertyMetadata> properties) where T : class
{
var index = 0;
foreach (var entity in chunk)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Microsoft.EntityFrameworkCore;

using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert.Abstractions;
Expand All @@ -15,6 +17,7 @@ internal interface IBulkInsertProvider
internal Task<List<T>> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
Expand All @@ -27,9 +30,12 @@ internal Task<List<T>> BulkInsertReturnEntities<T>(
internal Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
CancellationToken ctk = default
) where T : class;

SqlDialectBuilder SqlDialect { get; }
}
Loading