-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPgsql.cs
365 lines (310 loc) · 15.8 KB
/
Pgsql.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
using Npgsql;
using System.Collections.Concurrent;
namespace Helpers
{
public class Pgsql
{
// Schema name wrapped in double quotes by Safe(), for use as an SQL identifier.
private readonly string _SafeSchema;
// Table name wrapped in double quotes by Safe(), for use as an SQL identifier.
private readonly string _SafeTable;
// Schema name exactly as passed to the constructor.
private readonly string _schema;
// Table name exactly as passed to the constructor.
private readonly string _table;
// Logger supplied by the caller; used for the debug/information traces in this class.
private ILogger _logger;
// Connection supplied by the caller; every command in this class is executed on it.
private readonly NpgsqlConnection _connection;
/* DATABASE RESOURCE LOCKS
These locks are important to prevent a storm of resource creation
on a highly parallel cold start. Yes, this does cause a performance
degradation, but it provides a more predictable & stable experience
without having to run all database transactions in SERIALIZABLE isolation mode.
Risk : The dictionaries will grow due to the unbounded nature of tenants.
To mitigate this, the dictionaries could potentially be replaced with MemoryCache
objects. (Assuming modern MemoryCache in .NET 6 is good to go)
*/
// One lock object per resource name ("S:<schema>" or "T:<schema>-<table>"); static, so shared by all instances in the process.
static private ConcurrentDictionary<string, object> _locks = new ConcurrentDictionary<string, object>();
// Resources this process has already created, keyed by resource name; the value is the UTC creation time as a string.
static private ConcurrentDictionary<string, string> _resourcesLedger = new ConcurrentDictionary<string, string>();
/// <summary>
/// Binds this helper to a single schema/table pair on the supplied connection.
/// </summary>
/// <param name="schema">Schema name; must be non-null and non-empty.</param>
/// <param name="table">Table name; must be non-null and non-empty.</param>
/// <param name="connection">Connection on which all commands are executed.</param>
/// <param name="logger">Logger used for diagnostic output.</param>
/// <exception cref="ArgumentException">Thrown when <paramref name="schema"/> or <paramref name="table"/> is null or empty.</exception>
public Pgsql(string schema, string table, NpgsqlConnection connection, ILogger logger)
{
    // Validate first; the schema check comes first so it wins when both names are missing.
    if (string.IsNullOrEmpty(schema))
    {
        throw new ArgumentException("'schema' is not set");
    }

    if (string.IsNullOrEmpty(table))
    {
        throw new ArgumentException("'table' is not set");
    }

    // Keep both the raw names and their quoted-identifier forms.
    _schema = schema;
    _table = table;
    _SafeSchema = Safe(schema);
    _SafeTable = Safe(table);

    _connection = connection;
    _logger = logger;
}
/// <summary>
/// Creates this instance's schema if it does not exist and (re)creates the two
/// plpgsql helper functions inside it: <c>delete_key_with_etag_v1</c> and <c>delete_key_v1</c>.
/// The schema and both functions are assigned to the <c>postgres</c> role.
/// </summary>
/// <param name="transaction">Optional transaction the command is enlisted in.</param>
public async Task CreateSchemaIfNotExistsAsync(NpgsqlTransaction transaction = null)
{
// One multi-statement script: schema, then the two delete helpers.
// Both helpers build a DELETE via format() against the regclass argument and return
// the RETURNING TRUE value through the OUT parameter (left NULL when no row is deleted).
// The etag variant additionally requires the row's xmin to equal the supplied xid.
var sql =
@$"CREATE SCHEMA IF NOT EXISTS {_SafeSchema}
AUTHORIZATION postgres;
CREATE OR REPLACE FUNCTION {_SafeSchema}.delete_key_with_etag_v1(
tbl regclass,
keyvalue text,
etagvalue xid,
OUT success boolean)
RETURNS boolean
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
BEGIN
EXECUTE format('
DELETE FROM %s
WHERE key = $1 AND xmin = $2
RETURNING TRUE', tbl)
USING keyvalue, etagvalue
INTO success;
RETURN; -- optional in this case
END
$BODY$;
ALTER FUNCTION {_SafeSchema}.delete_key_with_etag_v1(regclass, text, xid)
OWNER TO postgres;
CREATE OR REPLACE FUNCTION {_SafeSchema}.delete_key_v1(
tbl regclass,
keyvalue text,
OUT success boolean)
RETURNS boolean
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
BEGIN
EXECUTE format('
DELETE FROM %s
WHERE key = $1
RETURNING TRUE', tbl)
USING keyvalue
INTO success;
RETURN; -- optional in this case
END
$BODY$;
ALTER FUNCTION {_SafeSchema}.delete_key_v1(regclass, text)
OWNER TO postgres;
";
// The full script is traced at debug level before it is executed.
_logger.LogDebug($"{nameof(CreateSchemaIfNotExistsAsync)} - {sql}");
// The command is disposed as soon as the script has run.
await using (var cmd = new NpgsqlCommand(sql, _connection, transaction))
await cmd.ExecuteNonQueryAsync();
_logger.LogDebug($"{nameof(CreateSchemaIfNotExistsAsync)} - Schema Created : {_SafeSchema}");
}
/// <summary>
/// Creates this instance's key/value table if it does not exist, assigns it to the
/// <c>postgres</c> role, and registers the tenant in <c>"pluggable_metadata"."tenant"</c>
/// (an existing registration is left untouched).
/// </summary>
/// <param name="transaction">Optional transaction the command is enlisted in.</param>
public async Task CreateTableIfNotExistsAsync(NpgsqlTransaction transaction = null)
{
    // Identifiers cannot be bound as parameters, so the table name is still interpolated
    // (in its quoted form). The tenant registration values, however, are plain data and are
    // bound as parameters so a quote character in a schema/table name cannot alter the statement.
    var sql =
@$"CREATE TABLE IF NOT EXISTS {SchemaAndTable}
(
key text NOT NULL PRIMARY KEY COLLATE pg_catalog.""default""
,value jsonb NOT NULL
,insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
,updatedate TIMESTAMP WITH TIME ZONE NULL
,expiredate TIMESTAMP WITH TIME ZONE NULL
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS {SchemaAndTable} OWNER to postgres;
INSERT INTO ""pluggable_metadata"".""tenant"" (tenant_id, schema_id, table_id) VALUES (@tenant_id, @schema_id, @table_id) ON CONFLICT (tenant_id) DO NOTHING;
";
    _logger.LogDebug($"{nameof(CreateTableIfNotExistsAsync)} - SQL : [{sql}]");
    await using (var cmd = new NpgsqlCommand(sql, _connection, transaction))
    {
        // NOTE(review): assumes tenant_id / schema_id / table_id are text-compatible columns;
        // the original passed them as untyped string literals - confirm against the metadata table.
        cmd.Parameters.AddWithValue("tenant_id", NpgsqlTypes.NpgsqlDbType.Text, SchemaAndTable);
        cmd.Parameters.AddWithValue("schema_id", NpgsqlTypes.NpgsqlDbType.Text, _schema);
        cmd.Parameters.AddWithValue("table_id", NpgsqlTypes.NpgsqlDbType.Text, _table);
        await cmd.ExecuteNonQueryAsync();
    }
    _logger.LogDebug($"{nameof(CreateTableIfNotExistsAsync)} - Table Created : {SchemaAndTable}");
}
/// <summary>
/// Turns a raw name into a double-quoted SQL identifier. Embedded double quotes are
/// doubled so the name cannot terminate the quoted identifier early.
/// </summary>
/// <param name="input">Raw schema or table name; null is treated as empty.</param>
/// <returns>The name wrapped in double quotes, with any inner <c>"</c> written as <c>""</c>.</returns>
private static string Safe(string input)
{
    // Inside a delimited identifier a literal double quote is written as two double quotes.
    var escaped = (input ?? string.Empty).Replace("\"", "\"\"");
    return "\"" + escaped + "\"";
}
/// <summary>
/// Fully qualified table reference: the quoted schema and the quoted table joined by a dot.
/// </summary>
public string SchemaAndTable => string.Concat(_SafeSchema, ".", _SafeTable);
/// <summary>
/// Reads the value and etag stored under <paramref name="key"/>, ignoring rows whose
/// <c>expiredate</c> has passed.
/// </summary>
/// <param name="key">Key to look up; bound as the <c>@key</c> parameter.</param>
/// <param name="transaction">Optional transaction the command is enlisted in.</param>
/// <returns>
/// A (value, etag) tuple for the first matching row, where the etag is the row's
/// <c>xmin</c> rendered as text; a tuple of two nulls when no row matches.
/// </returns>
public async Task<Tuple<string,string>> GetAsync(string key, NpgsqlTransaction transaction = null)
{
    // Nothing has been read yet, so the value traced before the query is always empty.
    var value = string.Empty;
    string sql =
@$"SELECT
key
,value
,xmin::text
FROM {SchemaAndTable}
WHERE
key = (@key)
AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)
";
    _logger.LogInformation($"{nameof(GetAsync)} - key: [{key}], value: [{value}], sql: [{sql}]");

    await using var command = new NpgsqlCommand(sql, _connection, transaction);
    command.Parameters.AddWithValue("key", key);

    await using var rows = await command.ExecuteReaderAsync();

    // Only the first row is ever consumed.
    if (!await rows.ReadAsync())
    {
        return Tuple.Create<string, string>(null, null);
    }

    value = rows.GetString(1);
    var etag = rows.GetString(2);
    _logger.LogDebug($"{nameof(GetAsync)} - Result - key: {rows.GetString(0)}, value: {value}, etag : {etag}");
    return Tuple.Create(value, etag);
}
/// <summary>
/// Writes <paramref name="value"/> under <paramref name="key"/>, first making sure the
/// schema and table for this instance have been created.
/// </summary>
/// <param name="key">Key to write.</param>
/// <param name="value">Value to store.</param>
/// <param name="etag">Optional etag; forwarded unchanged to the insert/update step.</param>
/// <param name="ttl">Time-to-live in seconds; forwarded unchanged to the insert/update step.</param>
/// <param name="transaction">Optional transaction used for both resource creation and the write.</param>
public async Task UpsertAsync(string key, string value, string etag, int ttl, NpgsqlTransaction transaction = null)
{
    // The write itself, deferred until the resource check has completed.
    Task WriteAsync() => InsertOrUpdateAsync(key, value, etag, ttl, transaction);

    await EnsureDatabaseResourcesExistAsync(transaction, WriteAsync);
}
/// <summary>
/// Ensures the schema and table for this instance have been created (schema first, then
/// table), then runs <paramref name="onDatabaseResourcesExist"/>.
/// </summary>
/// <param name="transaction">Optional transaction passed to the schema/table creation calls.</param>
/// <param name="onDatabaseResourcesExist">Operation to run once both resources have been gated.</param>
/// <exception cref="PostgresException">
/// Any database error raised by the operation is propagated; an "undefined table" error
/// additionally triggers the cache-eviction callbacks before being rethrown.
/// </exception>
private async Task EnsureDatabaseResourcesExistAsync(NpgsqlTransaction transaction, Func<Task> onDatabaseResourcesExist)
{
    // `GateAccessToResourceCreationAsync` serialises creation of a given postgres object (schema or table)
    // so it is not created concurrently. In the original author's testing, IF NOT EXISTS schema/table creation
    // was not concurrency-safe on its own.
    // Each call hands back a callback intended to evict that resource from the in-memory ledger.
    var evictSchemaFromCache = await GateAccessToResourceCreationAsync($"S:{_schema}", () => CreateSchemaIfNotExistsAsync(transaction));
    var evictTableFromCache = await GateAccessToResourceCreationAsync($"T:{_schema}-{_table}", () => CreateTableIfNotExistsAsync(transaction));

    // The local ledger can drift from the database. Example: a write into a brand-new tenant fails, the
    // transaction (including the freshly created schema/table) is rolled back, yet the ledger still lists
    // them as created. When the operation fails because the table does not exist, the eviction callbacks
    // are invoked so the next operation for this schema/table gets a chance to create them again.
    try
    {
        await onDatabaseResourcesExist();
    }
    catch (PostgresException ex) when (ex.TableDoesNotExist())
    {
        evictSchemaFromCache();
        evictTableFromCache();

        // Bare rethrow keeps the original stack trace (`throw ex;` would reset it).
        throw;
    }
}
/// <summary>
/// Runs <paramref name="resourceFactory"/> at most once per <paramref name="resourceName"/>
/// in this process, serialising concurrent callers for the same name.
/// </summary>
/// <param name="resourceName">Ledger key identifying the resource (schema or table).</param>
/// <param name="resourceFactory">Creates the resource; only invoked when the ledger has no entry for the name.</param>
/// <returns>
/// A callback that removes <paramref name="resourceName"/> from the ledger, so a later call
/// will run the factory again.
/// </returns>
/// <remarks>
/// Exceptions thrown by <paramref name="resourceFactory"/> surface directly to the awaiting
/// caller (not wrapped in <see cref="AggregateException"/>), and the ledger is left unchanged.
/// </remarks>
private async Task<Action> GateAccessToResourceCreationAsync(string resourceName, Func<Task> resourceFactory)
{
    // Remove by key only. The ledger value is a creation timestamp, so the
    // TryRemove(KeyValuePair) overload with an empty-string value would never match an entry.
    Action removeFromLedger = () => _resourcesLedger.TryRemove(resourceName, out _);

    // Fast path: check the in-memory ledger to see if the resource has already been created.
    // (The ledger is static, i.e. per process - think pod instance - not global across instances.)
    if (_resourcesLedger.ContainsKey(resourceName))
        return removeFromLedger;

    // Get the gate for this particular resource. An async-compatible semaphore is used instead of
    // `lock` so the factory can be awaited while holding it. `_locks` stores values as `object`,
    // and this method is the only code in the class that writes to it, so the cast is safe.
    var gate = (System.Threading.SemaphoreSlim)_locks.GetOrAdd(resourceName, _ => new System.Threading.SemaphoreSlim(1, 1));

    // Wait until we have exclusive access to the resource...
    await gate.WaitAsync();
    try
    {
        // Check the ledger again in case a racing caller created the resource while we waited.
        if (_resourcesLedger.ContainsKey(resourceName))
            return removeFromLedger;

        // Resource is not in the ledger and nobody else holds the gate, so create it now.
        await resourceFactory();

        // Still holding the gate: record that the resource has been created.
        _resourcesLedger.TryAdd(resourceName, DateTime.UtcNow.ToString());
    }
    finally
    {
        gate.Release();
    }

    return removeFromLedger;
}
/// <summary>
/// Stores <paramref name="value"/> under <paramref name="key"/>.
/// Without an etag the row is inserted, or overwritten if the key already exists.
/// With an etag only an existing, non-expired row whose <c>xmin</c> equals the etag is updated.
/// </summary>
/// <param name="key">Key to write.</param>
/// <param name="value">Value, sent to the server as jsonb.</param>
/// <param name="etag">Optional etag; when present it must be a base-10 unsigned 32-bit integer.</param>
/// <param name="ttlInSeconds">When greater than zero the row expires that many seconds from now; otherwise the expiry is set to NULL.</param>
/// <param name="transaction">Optional transaction the command is enlisted in.</param>
/// <exception cref="Dapr.PluggableComponents.Components.StateStore.ETagInvalidException">The etag cannot be converted to an unsigned 32-bit integer.</exception>
/// <exception cref="Dapr.PluggableComponents.Components.StateStore.ETagMismatchException">An etag was supplied but no row was updated.</exception>
private async Task InsertOrUpdateAsync(string key, string value, string etag, int ttlInSeconds = 0, NpgsqlTransaction transaction = null)
{
    // Short id (tail of a fresh GUID) used to tie this call's log lines together.
    var correlationId = Guid.NewGuid().ToString("N").Substring(23);

    // SQL fragment for the expiry column; built from an int, so it is safe to inline.
    var queryExpiredate = ttlInSeconds > 0
        ? $"CURRENT_TIMESTAMP + interval '{ttlInSeconds} seconds'"
        : "NULL";

    if (String.IsNullOrEmpty(etag))
    {
        // No etag: unconditional upsert. This path never raises an etag mismatch.
        var upsertSql = @$"INSERT INTO {SchemaAndTable}
(
key
,value
,expiredate
)
VALUES
(
@1
,@2
,{queryExpiredate}
)
ON CONFLICT (key)
DO
UPDATE SET
value = @2
,updatedate = CURRENT_TIMESTAMP
,expiredate = {queryExpiredate}
;";
        _logger.LogDebug($"{nameof(InsertOrUpdateAsync)} ({correlationId}) - Etag not present - key: [{key}], value: [{value}], sql: [{upsertSql}]");
        await using (var upsert = new NpgsqlCommand(upsertSql, _connection, transaction))
        {
            upsert.Parameters.AddWithValue("1", NpgsqlTypes.NpgsqlDbType.Text, key);
            upsert.Parameters.AddWithValue("2", NpgsqlTypes.NpgsqlDbType.Jsonb, value);
            var upserted = await upsert.ExecuteNonQueryAsync();
            _logger.LogDebug($"{nameof(InsertOrUpdateAsync)} ({correlationId}) - Row inserted/updated: {upserted} ");
        }

        return;
    }

    // Etag supplied: it must parse as an xid-sized unsigned integer.
    uint expectedXmin;
    try
    {
        expectedXmin = Convert.ToUInt32(etag, 10);
    }
    catch (Exception)
    {
        throw new Dapr.PluggableComponents.Components.StateStore.ETagInvalidException();
    }

    // Conditional update: only touches a live row whose xmin still matches the etag.
    var updateSql = @$"
UPDATE {SchemaAndTable}
SET
value = @2
,updatedate = CURRENT_TIMESTAMP
,expiredate = {queryExpiredate}
WHERE
key = (@1)
AND xmin = (@3)
AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)
;";
    _logger.LogDebug($"{nameof(InsertOrUpdateAsync)} ({correlationId}) - Etag present - key: [{key}], value: [{value}], etag: [{etag}], sql: [{updateSql}]");

    int updated;
    await using (var update = new NpgsqlCommand(updateSql, _connection, transaction))
    {
        update.Parameters.AddWithValue("1", NpgsqlTypes.NpgsqlDbType.Text, key);
        update.Parameters.AddWithValue("2", NpgsqlTypes.NpgsqlDbType.Jsonb, value);
        update.Parameters.AddWithValue("3", NpgsqlTypes.NpgsqlDbType.Xid, expectedXmin);
        updated = await update.ExecuteNonQueryAsync();
        _logger.LogDebug($"{nameof(InsertOrUpdateAsync)} ({correlationId}) - Row updated: {updated}");
    }

    // Zero rows touched with an etag present means the etag no longer matches.
    if (updated == 0)
    {
        _logger.LogDebug($"{nameof(InsertOrUpdateAsync)} ({correlationId}) - Etag present but no rows modified, throwing EtagMismatchException");
        throw new Dapr.PluggableComponents.Components.StateStore.ETagMismatchException();
    }
}
/// <summary>
/// Deletes the row stored under <paramref name="key"/> by calling the schema's
/// <c>delete_key_v1</c> function, or <c>delete_key_with_etag_v1</c> when an etag is supplied.
/// </summary>
/// <param name="key">Key to delete; null is treated as an empty key.</param>
/// <param name="etag">Optional etag; when present it must be a base-10 unsigned 32-bit integer.</param>
/// <param name="transaction">Optional transaction the command is enlisted in.</param>
/// <exception cref="Dapr.PluggableComponents.Components.StateStore.ETagInvalidException">The etag is not an unsigned 32-bit integer.</exception>
/// <exception cref="Dapr.PluggableComponents.Components.StateStore.ETagMismatchException">An etag was supplied but no row was deleted.</exception>
public async Task DeleteAsync(string key, string etag, NpgsqlTransaction transaction = null)
{
    var hasEtag = !string.IsNullOrEmpty(etag);

    // Validate the etag up front, mirroring InsertOrUpdateAsync, instead of handing raw text to the server.
    uint etagValue = 0;
    if (hasEtag && !uint.TryParse(etag, System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out etagValue))
    {
        throw new Dapr.PluggableComponents.Components.StateStore.ETagInvalidException();
    }

    // Only the schema identifier is interpolated (identifiers cannot be bound); every value
    // is a bound parameter so the key and etag can never alter the statement.
    var sql = hasEtag
        ? $"SELECT * FROM {_SafeSchema}.delete_key_with_etag_v1(tbl := CAST(@tbl AS regclass), keyvalue := @key, etagvalue := @etag)"
        : $"SELECT * FROM {_SafeSchema}.delete_key_v1(tbl := CAST(@tbl AS regclass), keyvalue := @key)";
    _logger.LogDebug($"{nameof(DeleteAsync)} - Sql : [{sql}], key: [{key}], etag: [{etag}]");

    await using (var cmd = new NpgsqlCommand(sql, _connection, transaction))
    {
        // The quoted form is passed so the regclass lookup resolves the same case-sensitive
        // name that the table was created with.
        cmd.Parameters.AddWithValue("tbl", NpgsqlTypes.NpgsqlDbType.Text, SchemaAndTable);
        cmd.Parameters.AddWithValue("key", NpgsqlTypes.NpgsqlDbType.Text, key ?? string.Empty);
        if (hasEtag)
        {
            cmd.Parameters.AddWithValue("etag", NpgsqlTypes.NpgsqlDbType.Xid, etagValue);
        }

        // The function's OUT column is the first column of the single result row:
        // TRUE when a row was deleted, NULL when nothing matched.
        var result = await cmd.ExecuteScalarAsync();
        if (hasEtag && result is System.DBNull)
        {
            _logger.LogDebug($"{nameof(DeleteAsync)} - Etag present but no rows deleted, throwing EtagMismatchException");
            throw new Dapr.PluggableComponents.Components.StateStore.ETagMismatchException();
        }
        else if (result is System.DBNull)
        {
            _logger.LogDebug($"{nameof(DeleteAsync)} - Result : DBnull");
        }
        else if (result is true)
        {
            _logger.LogDebug($"{nameof(DeleteAsync)} - Result : {(bool)result}");
        }
    }
}
}
}
/// <summary>
/// Convenience predicates that classify a <see cref="PostgresException"/> by its SQLSTATE code.
/// </summary>
public static class PostgresExtensions
{
    // SQLSTATE values compared against PostgresException.SqlState.
    private const string TableDoesNotExistSqlState = "42P01";
    private const string FunctionDoesNotExistSqlState = "42883";

    /// <summary>True when the exception's SQLSTATE is 42P01.</summary>
    public static bool TableDoesNotExist(this PostgresException ex) => ex.SqlState == TableDoesNotExistSqlState;

    /// <summary>True when the exception's SQLSTATE is 42883.</summary>
    public static bool FunctionDoesNotExist(this PostgresException ex) => ex.SqlState == FunctionDoesNotExistSqlState;
}