forked from PhenX/PhenX.EntityFrameworkCore.BulkInsert
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPostgreSqlBulkInsertProvider.cs
More file actions
141 lines (121 loc) · 4.55 KB
/
PostgreSqlBulkInsertProvider.cs
File metadata and controls
141 lines (121 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
using System.Text;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.Mapping;
using NpgsqlTypes;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
/// <summary>
/// Bulk insert provider for PostgreSQL. Rows are streamed to the server through
/// Npgsql's binary <c>COPY ... FROM STDIN</c> import.
/// </summary>
[UsedImplicitly]
internal class PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger) : BulkInsertProviderBase<PostgreSqlDialectBuilder, PostgreSqlBulkInsertOptions>(logger)
{
    //language=sql
    /// <inheritdoc />
    protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";

    /// <summary>
    /// Builds the <c>COPY</c> statement used to start a binary import into <paramref name="tableName"/>.
    /// </summary>
    /// <param name="properties">Columns to import, in the order the values will be written.</param>
    /// <param name="tableName">Target table name, inserted into the statement as-is.</param>
    /// <returns>A statement of the form <c>COPY table (columns) FROM STDIN (FORMAT BINARY)</c>.</returns>
    private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> properties, string tableName)
    {
        var sql = new StringBuilder();

        sql.Append($"COPY {tableName} (");
        sql.AppendColumns(properties);
        sql.Append(") FROM STDIN (FORMAT BINARY)");

        return sql.ToString();
    }

    /// <inheritdoc />
    protected override PostgreSqlBulkInsertOptions CreateDefaultOptions() => new()
    {
        BatchSize = 50_000,
        Converters = [PostgreSqlGeometryConverter.Instance],
        TypeProviders = [PostgreSqlGeometryConverter.Instance],
    };

    /// <inheritdoc />
    protected override async Task BulkInsert<T>(
        bool sync,
        DbContext context,
        TableMetadata tableInfo,
        IEnumerable<T> entities,
        string tableName,
        IReadOnlyList<ColumnMetadata> columns,
        PostgreSqlBulkInsertOptions options,
        CancellationToken ctk)
    {
        var connection = (NpgsqlConnection)context.Database.GetDbConnection();
        var command = GetBinaryImportCommand(columns, tableName);

        var writer = sync
            // ReSharper disable once MethodHasAsyncOverloadWithCancellation
            ? connection.BeginBinaryImport(command)
            : await connection.BeginBinaryImportAsync(command, ctk);

        // The importer must be disposed even when enumerating the entities, reading a value,
        // writing a row or completing the import throws (or the operation is cancelled).
        // Otherwise the connection, which is owned by the DbContext, is left in COPY mode.
        // Disposing without a successful Complete cancels the import.
        try
        {
            // The type mapping can be null for obvious types like string.
            var columnTypes = columns.Select(c => GetPostgreSqlType(c, options)).ToArray();

            foreach (var entity in entities)
            {
                if (sync)
                {
                    // ReSharper disable once MethodHasAsyncOverloadWithCancellation
                    writer.StartRow();
                }
                else
                {
                    await writer.StartRowAsync(ctk);
                }

                for (var columnIndex = 0; columnIndex < columns.Count; columnIndex++)
                {
                    var value = columns[columnIndex].GetValue(entity, options);

                    // Get the actual type, so that the writer can do the conversion to the target type automatically.
                    var type = columnTypes[columnIndex];

                    if (sync)
                    {
                        if (type != null)
                        {
                            // ReSharper disable once MethodHasAsyncOverloadWithCancellation
                            writer.Write(value, type.Value);
                        }
                        else
                        {
                            // ReSharper disable once MethodHasAsyncOverloadWithCancellation
                            writer.Write(value);
                        }
                    }
                    else
                    {
                        if (type != null)
                        {
                            await writer.WriteAsync(value, type.Value, ctk);
                        }
                        else
                        {
                            await writer.WriteAsync(value, ctk);
                        }
                    }
                }
            }

            if (sync)
            {
                // ReSharper disable once MethodHasAsyncOverloadWithCancellation
                writer.Complete();
            }
            else
            {
                await writer.CompleteAsync(ctk);
            }
        }
        finally
        {
            if (sync)
            {
                // ReSharper disable once MethodHasAsyncOverload
                writer.Dispose();
            }
            else
            {
                await writer.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Resolves the <see cref="NpgsqlDbType"/> to use when writing values of <paramref name="column"/>.
    /// </summary>
    /// <param name="column">Column whose PostgreSQL type is requested.</param>
    /// <param name="options">Bulk insert options; its type providers are consulted first, in order.</param>
    /// <returns>
    /// The type returned by the first type provider that handles the column's property; otherwise the
    /// type of the property's relational mapping when that mapping is an <see cref="NpgsqlTypeMapping"/>;
    /// otherwise <c>null</c>.
    /// </returns>
    private static NpgsqlDbType? GetPostgreSqlType(ColumnMetadata column, PostgreSqlBulkInsertOptions options)
    {
        var typeProviders = options.TypeProviders;
        if (typeProviders is { Count: > 0 })
        {
            foreach (var typeProvider in typeProviders)
            {
                if (typeProvider.TryGetType(column.Property, out var type))
                {
                    return type;
                }
            }
        }

        // Fall back to the EF Core mapping; a non-Npgsql mapping yields null.
        var mapping = column.Property.GetRelationalTypeMapping() as NpgsqlTypeMapping;

        return mapping?.NpgsqlDbType;
    }
}