forked from PhenX/PhenX.EntityFrameworkCore.BulkInsert
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPostgreSqlBulkInsertProvider.cs
More file actions
131 lines (111 loc) · 4.06 KB
/
PostgreSqlBulkInsertProvider.cs
File metadata and controls
131 lines (111 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
using System.Text;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.Mapping;
using NpgsqlTypes;
using PhenX.EntityFrameworkCore.BulkInsert.Metadata;
using PhenX.EntityFrameworkCore.BulkInsert.Options;
namespace PhenX.EntityFrameworkCore.BulkInsert.PostgreSql;
[UsedImplicitly]
internal class PostgreSqlBulkInsertProvider(ILogger<PostgreSqlBulkInsertProvider>? logger) : BulkInsertProviderBase<PostgreSqlDialectBuilder, BulkInsertOptions>(logger)
{
//language=sql
/// <inheritdoc />
protected override string AddTableCopyBulkInsertId => $"ALTER TABLE {{0}} ADD COLUMN {BulkInsertId} SERIAL PRIMARY KEY;";
private static string GetBinaryImportCommand(IReadOnlyList<ColumnMetadata> properties, string tableName)
{
var sql = new StringBuilder();
sql.Append($"COPY {tableName} (");
sql.AppendColumns(properties);
sql.Append(") FROM STDIN (FORMAT BINARY)");
return sql.ToString();
}
/// <inheritdoc />
protected override BulkInsertOptions CreateDefaultOptions() => new()
{
BatchSize = 50_000,
};
/// <inheritdoc />
protected override async Task BulkInsert<T>(
bool sync,
DbContext context,
TableMetadata tableInfo,
IEnumerable<T> entities,
string tableName,
IReadOnlyList<ColumnMetadata> columns,
BulkInsertOptions options,
CancellationToken ctk)
{
var connection = (NpgsqlConnection)context.Database.GetDbConnection();
var command = GetBinaryImportCommand(columns, tableName);
var writer = sync
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
? connection.BeginBinaryImport(command)
: await connection.BeginBinaryImportAsync(command, ctk);
// The type mapping can be null for obvious types like string.
var columnTypes = columns.Select(GetPostgreSqlType).ToArray();
foreach (var entity in entities)
{
if (sync)
{
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
writer.StartRow();
}
else
{
await writer.StartRowAsync(ctk);
}
var columnIndex = 0;
foreach (var column in columns)
{
var value = column.GetValue(entity);
// Get the actual type, so that the writer can do the conversation to the target type automatically.
var type = columnTypes[columnIndex];
if (sync)
{
if (type != null)
{
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
writer.Write(value, type.Value);
}
else
{
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
writer.Write(value);
}
}
else
{
if (type != null)
{
await writer.WriteAsync(value, type.Value, ctk);
}
else
{
await writer.WriteAsync(value, ctk);
}
}
columnIndex++;
}
}
if (sync)
{
// ReSharper disable once MethodHasAsyncOverloadWithCancellation
writer.Complete();
// ReSharper disable once MethodHasAsyncOverload
writer.Dispose();
}
else
{
await writer.CompleteAsync(ctk);
await writer.DisposeAsync();
}
}
private static NpgsqlDbType? GetPostgreSqlType(ColumnMetadata column)
{
var mapping = column.Property.GetRelationalTypeMapping() as NpgsqlTypeMapping;
return mapping?.NpgsqlDbType;
}
}