Skip to content

Commit 685ff84

Browse files
Jsonb (#29)
* Jsonb * FIx conversion. * No result / out mix. * Fix build
1 parent 8a6b02f commit 685ff84

24 files changed

Lines changed: 270 additions & 162 deletions

File tree

src/PhenX.EntityFrameworkCore.BulkInsert.MySql/MySqlBulkInsertProvider.cs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,8 @@
99

1010
namespace PhenX.EntityFrameworkCore.BulkInsert.MySql;
1111

12-
internal class MySqlBulkInsertProvider : BulkInsertProviderBase<MySqlServerDialectBuilder, MySqlBulkInsertOptions>
12+
internal class MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider> logger) : BulkInsertProviderBase<MySqlServerDialectBuilder, MySqlBulkInsertOptions>(logger)
1313
{
14-
public MySqlBulkInsertProvider(ILogger<MySqlBulkInsertProvider>? logger = null) : base(logger)
15-
{
16-
}
17-
1814
//language=sql
1915
/// <inheritdoc />
2016
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT AUTO_INCREMENT PRIMARY KEY;";
@@ -51,20 +47,16 @@ CancellationToken ctk
5147
)
5248
{
5349
var connection = (MySqlConnection)context.Database.GetDbConnection();
54-
55-
var sqlTransaction = context.Database.CurrentTransaction?.GetDbTransaction()
50+
var sqlTransaction = context.Database.CurrentTransaction!.GetDbTransaction()
5651
?? throw new InvalidOperationException("No open transaction found.");
57-
5852
if (sqlTransaction is not MySqlTransaction mySqlTransaction)
5953
{
6054
throw new InvalidOperationException($"Invalid transaction foud, got {sqlTransaction.GetType()}.");
6155
}
6256

63-
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction)
64-
{
65-
DestinationTableName = tableName,
66-
BulkCopyTimeout = options.GetCopyTimeoutInSeconds(),
67-
};
57+
var bulkCopy = new MySqlBulkCopy(connection, mySqlTransaction);
58+
bulkCopy.DestinationTableName = tableName;
59+
bulkCopy.BulkCopyTimeout = options.GetCopyTimeoutInSeconds();
6860

6961
var sourceOrdinal = 0;
7062
foreach (var prop in properties)

src/PhenX.EntityFrameworkCore.BulkInsert.PostgreSql/PostgreSqlBulkInsertProvider.cs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,18 @@
66
using Microsoft.Extensions.Logging;
77

88
using Npgsql;
9+
using Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.Mapping;
10+
11+
using NpgsqlTypes;
912

1013
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
1114
using PhenX.EntityFrameworkCore.BulkInsert.Options;
1215

1316
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
1417

1518
[UsedImplicitly]
16-
internal class PostgreSqlBulkInsertProvider : BulkInsertProviderBase<PostgreSqlDialectBuilder, BulkInsertOptions>
19+
internal class PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger) : BulkInsertProviderBase<PostgreSqlDialectBuilder, BulkInsertOptions>(logger)
1720
{
18-
public PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger = null) : base(logger)
19-
{
20-
}
21-
2221
//language=sql
2322
/// <inheritdoc />
2423
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";
@@ -57,6 +56,9 @@ protected override async Task BulkInsert<T>(
5756
? connection.BeginBinaryImport(command)
5857
: await connection.BeginBinaryImportAsync(command, ctk);
5958

59+
// The type mapping can be null for obvious types like string.
60+
var columnTypes = columns.Select(GetPostgreSqlType).ToArray();
61+
6062
foreach (var entity in entities)
6163
{
6264
if (sync)
@@ -69,19 +71,40 @@ protected override async Task BulkInsert<T>(
6971
await writer.StartRowAsync(ctk);
7072
}
7173

74+
var columnIndex = 0;
7275
foreach (var column in columns)
7376
{
7477
var value = column.GetValue(entity);
7578

79+
// Get the actual type, so that the writer can do the conversation to the target type automatically.
80+
var type = columnTypes[columnIndex];
81+
7682
if (sync)
7783
{
78-
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
79-
writer.Write(value);
84+
if (type != null)
85+
{
86+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
87+
writer.Write(value, type.Value);
88+
}
89+
else
90+
{
91+
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
92+
writer.Write(value);
93+
}
8094
}
8195
else
8296
{
83-
await writer.WriteAsync(value, ctk);
97+
if (type != null)
98+
{
99+
await writer.WriteAsync(value, type.Value, ctk);
100+
}
101+
else
102+
{
103+
await writer.WriteAsync(value, ctk);
104+
}
84105
}
106+
107+
columnIndex++;
85108
}
86109
}
87110

@@ -97,6 +120,12 @@ protected override async Task BulkInsert<T>(
97120
await writer.CompleteAsync(ctk);
98121
await writer.DisposeAsync();
99122
}
123+
}
124+
125+
private static NpgsqlDbType? GetPostgreSqlType(ColumnMetadata column)
126+
{
127+
var mapping = column.Property.GetRelationalTypeMapping() as NpgsqlTypeMapping;
100128

129+
return mapping?.NpgsqlDbType;
101130
}
102131
}

src/PhenX.EntityFrameworkCore.BulkInsert.SqlServer/SqlServerBulkInsertProvider.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,12 @@
66
using Microsoft.Extensions.Logging;
77

88
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
9-
using PhenX.EntityFrameworkCore.BulkInsert.Options;
109

1110
namespace PhenX.EntityFrameworkCore.BulkInsert.SqlServer;
1211

1312
[UsedImplicitly]
14-
internal class SqlServerBulkInsertProvider : BulkInsertProviderBase<SqlServerDialectBuilder, SqlServerBulkInsertOptions>
13+
internal class SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger) : BulkInsertProviderBase<SqlServerDialectBuilder, SqlServerBulkInsertOptions>(logger)
1514
{
16-
public SqlServerBulkInsertProvider(ILogger<SqlServerBulkInsertProvider>? logger = null) : base(logger)
17-
{
18-
}
19-
2015
//language=sql
2116
/// <inheritdoc />
2217
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD {BulkInsertId} INT IDENTITY PRIMARY KEY;";

src/PhenX.EntityFrameworkCore.BulkInsert.Sqlite/SqliteBulkInsertProvider.cs

Lines changed: 24 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,9 @@
1313
namespace PhenX.EntityFrameworkCore.BulkInsert.Sqlite;
1414

1515
[UsedImplicitly]
16-
internal class SqliteBulkInsertProvider : BulkInsertProviderBase<SqliteDialectBuilder, BulkInsertOptions>
16+
internal class SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger) : BulkInsertProviderBase<SqliteDialectBuilder, BulkInsertOptions>(logger)
1717
{
18-
public SqliteBulkInsertProvider(ILogger<SqliteBulkInsertProvider>? logger = null) : base(logger)
19-
{
20-
}
18+
private const int MaxParams = 1000;
2119

2220
/// <inheritdoc />
2321
protected override string BulkInsertId => "rowid";
@@ -40,48 +38,30 @@ protected override Task AddBulkInsertIdColumn<T>(
4038
CancellationToken cancellationToken
4139
) where T : class => Task.CompletedTask;
4240

43-
/// <summary>
44-
/// Taken from https://github.com/dotnet/efcore/blob/667c569c49a1ab7e142621395d3f14f2af0508b4/src/Microsoft.Data.Sqlite.Core/SqliteValueBinder.cs#L231
45-
/// As the method is not exposed in the public API, we need to copy it here.
46-
/// </summary>
47-
private static readonly Dictionary<Type, SqliteType> SqliteTypeMapping =
48-
new()
49-
{
50-
{ typeof(bool), SqliteType.Integer },
51-
{ typeof(byte), SqliteType.Integer },
52-
{ typeof(byte[]), SqliteType.Blob },
53-
{ typeof(char), SqliteType.Text },
54-
{ typeof(DateTime), SqliteType.Text },
55-
{ typeof(DateTimeOffset), SqliteType.Text },
56-
{ typeof(DateOnly), SqliteType.Text },
57-
{ typeof(TimeOnly), SqliteType.Text },
58-
{ typeof(DBNull), SqliteType.Text },
59-
{ typeof(decimal), SqliteType.Text },
60-
{ typeof(double), SqliteType.Real },
61-
{ typeof(float), SqliteType.Real },
62-
{ typeof(Guid), SqliteType.Text },
63-
{ typeof(int), SqliteType.Integer },
64-
{ typeof(long), SqliteType.Integer },
65-
{ typeof(sbyte), SqliteType.Integer },
66-
{ typeof(short), SqliteType.Integer },
67-
{ typeof(string), SqliteType.Text },
68-
{ typeof(TimeSpan), SqliteType.Text },
69-
{ typeof(uint), SqliteType.Integer },
70-
{ typeof(ulong), SqliteType.Integer },
71-
{ typeof(ushort), SqliteType.Integer }
72-
};
73-
74-
private static SqliteType GetSqliteType(Type clrType)
41+
private static SqliteType GetSqliteType(ColumnMetadata column)
7542
{
76-
var type = Nullable.GetUnderlyingType(clrType) ?? clrType;
77-
type = type.IsEnum ? Enum.GetUnderlyingType(type) : type;
43+
var storeType = column.Property.GetRelationalTypeMapping().StoreType;
7844

79-
if (SqliteTypeMapping.TryGetValue(type, out var sqliteType))
45+
if (string.Equals(storeType, "INTEGER", StringComparison.OrdinalIgnoreCase))
8046
{
81-
return sqliteType;
47+
return SqliteType.Integer;
48+
}
49+
else if (string.Equals(storeType, "FLOAT", StringComparison.OrdinalIgnoreCase))
50+
{
51+
return SqliteType.Real;
52+
}
53+
else if (string.Equals(storeType, "TEXT", StringComparison.OrdinalIgnoreCase))
54+
{
55+
return SqliteType.Text;
56+
}
57+
else if (string.Equals(storeType, "BLOB", StringComparison.OrdinalIgnoreCase))
58+
{
59+
return SqliteType.Blob;
60+
}
61+
else
62+
{
63+
throw new NotSupportedException($"Invalid store type '{storeType}' for property '{column.PropertyName}'");
8264
}
83-
84-
throw new InvalidOperationException($"Unknown Sqlite type for {clrType}");
8565
}
8666

8767
private static DbCommand GetInsertCommand(
@@ -147,15 +127,13 @@ protected override async Task BulkInsert<T>(
147127
CancellationToken ctk
148128
) where T : class
149129
{
150-
const int maxParams = 1000;
151-
var batchSize = options.BatchSize;
152-
batchSize = Math.Min(batchSize, maxParams / columns.Count);
130+
var batchSize = Math.Min(options.BatchSize, MaxParams / columns.Count);
153131

154132
// The StringBuilder can be resuse between the batches.
155133
var sb = new StringBuilder();
156134

157135
var columnList = tableInfo.GetColumns(options.CopyGeneratedColumns);
158-
var columnTypes = columnList.Select(c => GetSqliteType(c.ProviderClrType ?? c.ClrType)).ToArray();
136+
var columnTypes = columnList.Select(GetSqliteType).ToArray();
159137

160138
await using var insertCommand =
161139
GetInsertCommand(

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertOptionsExtension.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
using Microsoft.EntityFrameworkCore.Infrastructure;
1+
using Microsoft.EntityFrameworkCore.Infrastructure;
22
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.DependencyInjection.Extensions;
4+
using Microsoft.Extensions.Logging;
5+
using Microsoft.Extensions.Logging.Abstractions;
36

47
using PhenX.EntityFrameworkCore.BulkInsert.Abstractions;
58

@@ -13,17 +16,16 @@ public DbContextOptionsExtensionInfo Info
1316

1417
public void ApplyServices(IServiceCollection services)
1518
{
19+
services.TryAddSingleton(typeof(ILogger<>), typeof(NullLogger<>));
1620
services.AddSingleton<IBulkInsertProvider, TProvider>();
1721
}
1822

1923
public void Validate(IDbContextOptions options)
2024
{
2125
}
2226

23-
private class BulkInsertOptionsExtensionInfo : DbContextOptionsExtensionInfo
27+
private class BulkInsertOptionsExtensionInfo(IDbContextOptionsExtension extension) : DbContextOptionsExtensionInfo(extension)
2428
{
25-
public BulkInsertOptionsExtensionInfo(IDbContextOptionsExtension extension)
26-
: base(extension) { }
2729

2830
/// <inheritdoc />
2931
public override int GetServiceProviderHashCode() => 0;

src/PhenX.EntityFrameworkCore.BulkInsert/BulkInsertProviderBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace PhenX.EntityFrameworkCore.BulkInsert;
1414

15-
internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger<BulkInsertProviderBase<TDialect, TOptions>>? logger = null) : IBulkInsertProvider
15+
internal abstract class BulkInsertProviderBase<TDialect, TOptions>(ILogger<BulkInsertProviderBase<TDialect, TOptions>> logger) : IBulkInsertProvider
1616
where TDialect : SqlDialectBuilder, new()
1717
where TOptions : BulkInsertOptions, new()
1818
{
@@ -56,7 +56,7 @@ public virtual async IAsyncEnumerable<T> BulkInsertReturnEntities<T>(
5656
{
5757
if (logger != null)
5858
{
59-
Log.UsingTempTablToReturnData(logger);
59+
Log.UsingTempTableToReturnData(logger);
6060
}
6161

6262
var tableName = await PerformBulkInsertAsync(sync, context, tableInfo, entities, providerOptions, tempTableRequired: true, ctk: ctk);

src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/InternalExtensions.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ internal static TableMetadata GetTableInfo<T>(this DbContext context)
2222
internal static DbContextOptionsBuilder UseProvider<TProvider>(this DbContextOptionsBuilder optionsBuilder)
2323
where TProvider : class, IBulkInsertProvider
2424
{
25-
var extension = optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<TProvider>>() ?? new BulkInsertOptionsExtension<TProvider>();
26-
27-
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(extension);
25+
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(
26+
optionsBuilder.Options.FindExtension<BulkInsertOptionsExtension<TProvider>>() ?? new());
2827

2928
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(
3029
optionsBuilder.Options.FindExtension<MetadataProviderExtension>() ?? new());

src/PhenX.EntityFrameworkCore.BulkInsert/Extensions/PublicExtensions.DbSet.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using Microsoft.EntityFrameworkCore;
1+
using Microsoft.EntityFrameworkCore;
22

33
using PhenX.EntityFrameworkCore.BulkInsert.Options;
44

@@ -107,7 +107,7 @@ public static IAsyncEnumerable<T> ExecuteBulkInsertReturnEnumerableAsync<T, TOpt
107107
where T : class
108108
where TOptions : BulkInsertOptions
109109
{
110-
var provider = InitProvider(dbSet, configure, out var context, out var options);
110+
var (provider, context, options) = InitProvider(dbSet, configure);
111111

112112
return provider.BulkInsertReturnEntities(false, context, dbSet.GetDbContext().GetTableInfo<T>(), entities,
113113
options, onConflict, ctk);
@@ -155,7 +155,7 @@ public static async Task ExecuteBulkInsertAsync<T, TOptions>(
155155
where T : class
156156
where TOptions : BulkInsertOptions
157157
{
158-
var provider = InitProvider(dbSet, configure, out var context, out var options);
158+
var (provider, context, options) = InitProvider(dbSet, configure);
159159

160160
await provider.BulkInsert(false, context, dbSet.GetDbContext().GetTableInfo<T>(), entities, options, onConflict,
161161
ctk);
@@ -202,7 +202,7 @@ public static void ExecuteBulkInsert<T, TOptions>(
202202
where T : class
203203
where TOptions : BulkInsertOptions
204204
{
205-
var provider = InitProvider(dbSet, configure, out var context, out var options);
205+
var (provider, context, options) = InitProvider(dbSet, configure);
206206

207207
provider.BulkInsert(true, context, dbSet.GetDbContext().GetTableInfo<T>(), entities, options, onConflict)
208208
.GetAwaiter().GetResult();

0 commit comments

Comments
 (0)