Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,8 @@ dotnet_naming_style.camel_case_underscore_style.capitalization = camel_case
dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion
dotnet_style_predefined_type_for_member_access = true:suggestion

# IDE0090: Use 'new(...)'
dotnet_diagnostic.IDE0090.severity = none

[*.{csproj,proj,targets}]
indent_size = 2
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using JetBrains.Annotations;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
Expand All @@ -9,6 +11,7 @@

namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;

[UsedImplicitly]
internal class MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider> logger) : BulkInsertProviderBase<MySqlServerDialectBuilder, MySqlBulkInsertOptions>(logger)
{
//language=sql
Expand All @@ -22,13 +25,13 @@ internal class MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider> logger)
protected override MySqlBulkInsertOptions CreateDefaultOptions() => new();

/// <inheritdoc />
public override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
protected override IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
MySqlBulkInsertOptions options,
OnConflictOptions<T>? onConflict = null,
CancellationToken ctk = default)
{
throw new NotSupportedException("Provider does not support returning entities.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
OnConflictOptions<T>? onConflict = null,
CancellationToken ctk = default
) where T : class;

Expand All @@ -33,7 +33,7 @@ internal Task BulkInsert<T>(
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict = null,
OnConflictOptions<T>? onConflict = null,
CancellationToken ctk = default
) where T : class;

Expand All @@ -42,5 +42,5 @@ internal Task BulkInsert<T>(
/// <summary>
/// Make the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
/// </summary>
internal BulkInsertOptions InternalCreateDefaultOptions();
internal BulkInsertOptions CreateDefaultOptions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public void Validate(IDbContextOptions options)

private class BulkInsertOptionsExtensionInfo(IDbContextOptionsExtension extension) : DbContextOptionsExtensionInfo(extension)
{

/// <inheritdoc />
public override int GetServiceProviderHashCode() => 0;

Expand Down
48 changes: 13 additions & 35 deletions src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,32 @@
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

using PhenX.EntityFrameworkCore.BulkInsert.Abstractions;
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Extensions;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert;

internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger<BulkInsertProviderBase<TDialect, TOptions>> logger) : IBulkInsertProvider
internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger? logger) : BulkInsertProviderUntyped<TDialect, TOptions>
where TDialect : SqlDialectBuilder, new()
where TOptions : BulkInsertOptions, new()
{
protected readonly TDialect SqlDialect = new();

protected virtual string BulkInsertId => "_bulk_insert_id";

protected abstract string AddTableCopyBulkInsertId { get; }

protected virtual string GetTempTableName(string tableName) => $"_temp_bulk_insert_{tableName}";

SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect;

public BulkInsertOptions InternalCreateDefaultOptions() => CreateDefaultOptions();

/// <summary>
/// Create the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
/// </summary>
protected abstract TOptions CreateDefaultOptions();

public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
protected override async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict,
TOptions options,
OnConflictOptions<T>? onConflict,
[EnumeratorCancellation] CancellationToken ctk) where T : class
{
if (options is not TOptions providerOptions)
{
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
}

using var activity = Telemetry.ActivitySource.StartActivity("BulkInsertReturnEntities");
activity?.AddTag("tableName", tableInfo.TableName);
activity?.AddTag("synchronous", sync);
Expand All @@ -59,10 +42,10 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
Log.UsingTempTableToReturnData(logger);
}

var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);

var result =
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, providerOptions, onConflict, ctk: ctk)
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, options, onConflict, ctk: ctk)
?? throw new InvalidOperationException("Copy returns null enumerable.");

await foreach (var item in result.WithCancellation(ctk))
Expand All @@ -79,20 +62,15 @@ await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, true, pr
}
}

public virtual async Task BulkInsert<T>(
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions? onConflict,
TOptions options,
OnConflictOptions<T>? onConflict,
CancellationToken ctk) where T : class
{
if (options is not TOptions providerOptions)
{
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
}

using var activity = Telemetry.ActivitySource.StartActivity("BulkInsert");
activity?.AddTag("tableName", tableInfo.TableName);
activity?.AddTag("synchronous", sync);
Expand All @@ -107,9 +85,9 @@ public virtual async Task BulkInsert<T>(
Log.UsingTempTableToResolveConflicts(logger);
}

var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: true, ctk: ctk);

await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, providerOptions, onConflict, ctk);
await CopyFromTempTableAsync<T, T>(sync, context, tableInfo, tableName, false, options, onConflict, ctk);
}
else
{
Expand All @@ -118,7 +96,7 @@ public virtual async Task BulkInsert<T>(
Log.UsingDirectInsert(logger);
}

await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: false, ctk: ctk);
await PerformBulkInsertAsync(sync, context, tableInfo, entities, options, tempTableRequired: false, ctk: ctk);
}

// Commit the transaction if we own them.
Expand Down Expand Up @@ -207,7 +185,7 @@ protected virtual async Task AddBulkInsertIdColumn<T>(
string tempTableName,
bool returnData,
TOptions options,
OnConflictOptions? onConflict,
OnConflictOptions<T>? onConflict,
CancellationToken ctk) where T : class where TResult : class
{
var query =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using Microsoft.EntityFrameworkCore;

using PhenX.EntityFrameworkCore.BulkInsert.Abstractions;
using PhenX.EntityFrameworkCore.BulkInsert.Dialect;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;

namespace PhenX.EntityFrameworkCore.BulkInsert;

internal abstract class BulkInsertProviderUntyped<TDialect, TOptions> : IBulkInsertProvider
where TDialect : SqlDialectBuilder, new()
where TOptions : BulkInsertOptions, new()
{
protected readonly TDialect SqlDialect = new();

SqlDialectBuilder IBulkInsertProvider.SqlDialect => SqlDialect;

BulkInsertOptions IBulkInsertProvider.CreateDefaultOptions() => CreateDefaultOptions();

/// <summary>
/// Create the default options for the provider, can be a subclass of <see cref="BulkInsertOptions"/>.
/// </summary>
protected abstract TOptions CreateDefaultOptions();

public IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions<T>? onConflict,
CancellationToken ctk) where T : class
{
if (options is not TOptions providerOptions)
{
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
}

return BulkInsertReturnEntities(sync, context, tableInfo, entities, providerOptions, onConflict, ctk);
}

protected abstract IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
TOptions options,
OnConflictOptions<T>? onConflict,
CancellationToken ctk) where T : class;

public Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
BulkInsertOptions options,
OnConflictOptions<T>? onConflict,
CancellationToken ctk) where T : class
{
if (options is not TOptions providerOptions)
{
throw new InvalidOperationException($"Invalid options type: {options.GetType().Name}. Expected: {typeof(TOptions).Name}");
}

return BulkInsert(sync, context, tableInfo, entities, providerOptions, onConflict, ctk);
}

protected abstract Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
TOptions options,
OnConflictOptions<T>? onConflict,
CancellationToken ctk) where T : class;
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public string QuoteTableName(string? schema, string tableName)
/// <summary>
/// Gets column names for the insert statement, from an object initializer.
/// </summary>
protected string[] GetColumns<T>(TableMetadata table, Expression<Func<T, object>> columns)
protected static string[] GetColumns<T>(TableMetadata table, Expression<Func<T, object>> columns)
{
return columns.Body switch
{
Expand Down
Loading