diff --git a/.vscode/launch.json b/.vscode/launch.json
index 12c7f49f94..47bd54f286 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -411,7 +411,8 @@
         "MINIO_SECRET_KEY": "minioadmin",
         "PLATFORM_OPERATION_LOGGING": "true",
         "FRONT_URL": "http://localhost:8080",
-        "PORT": "3500"
+        "PORT": "3500",
+        "STATS_URL": "http://host.docker.internal:4900"
       },
       "runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
       "sourceMaps": true,
diff --git a/pods/server/src/__tests__/backup.spec.ts b/pods/server/src/__tests__/backup.spec.ts
index 7fdc9834b2..12f550241d 100644
--- a/pods/server/src/__tests__/backup.spec.ts
+++ b/pods/server/src/__tests__/backup.spec.ts
@@ -15,6 +15,8 @@
 const model = builder().getTxes()
 // const dbURL = 'postgresql://root@localhost:26257/defaultdb?sslmode=disable'
 const dbURL = 'postgresql://postgres:example@localhost:5432'
 const STORAGE_CONFIG = 'minio|localhost:9000?accessKey=minioadmin&secretKey=minioadmin&useSSL=false'
+
+// jest.setTimeout(4500000)
 describe.skip('test-backup-find', () => {
   it('check create/load/clean', async () => {
     const toolCtx = new MeasureMetricsContext('-', {})
@@ -53,6 +55,47 @@ describe.skip('test-backup-find', () => {
       const findDocs = await client.findAll(core.class.Tx, {})
       expect(findDocs.length).toBe(0)
+
+      //
+    } finally {
+      await pipeline.close()
+      await storageAdapter.close()
+    }
+  })
+  it('check traverse', async () => {
+    const toolCtx = new MeasureMetricsContext('-', {})
+    // We should set up a DB with documents and try to back them up.
+    const wsUrl = { name: 'testdb-backup-test', workspaceName: 'test', workspaceUrl: 'test' }
+    const { pipeline, storageAdapter } = await getServerPipeline(toolCtx, model, dbURL, wsUrl, {
+      storageConfig: STORAGE_CONFIG,
+      disableTriggers: true
+    })
+    try {
+      const client = wrapPipeline(toolCtx, pipeline, wsUrl)
+      const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage
+
+      // We need to create backup docs if they are missing.
+      await prepareTxes(lowLevel, toolCtx, 1500)
+
+      const iter = await lowLevel.traverse(DOMAIN_TX, {})
+
+      const allDocs: Doc[] = []
+
+      while (true) {
+        const docs = await iter.next(50)
+        if (docs == null || docs?.length === 0) {
+          break
+        }
+        await client.clean(
+          DOMAIN_TX,
+          docs.map((doc) => doc._id)
+        )
+        allDocs.push(...docs)
+      }
+      expect(allDocs.length).toBeGreaterThan(1449)
+
+      const findDocs = await client.findAll(core.class.Tx, {})
+      expect(findDocs.length).toBe(0)
+
+      //
     } finally {
       await pipeline.close()
@@ -60,12 +103,16 @@ describe.skip('test-backup-find', () => {
     }
   })
 })
-async function prepareTxes (lowLevel: LowLevelStorage, toolCtx: MeasureMetricsContext): Promise<void> {
+async function prepareTxes (
+  lowLevel: LowLevelStorage,
+  toolCtx: MeasureMetricsContext,
+  count: number = 500
+): Promise<void> {
   const docs = await lowLevel.rawFindAll(DOMAIN_TX, {})
-  if ((docs?.length ?? 0) < 500) {
+  if ((docs?.length ?? 0) < count) {
     // We need to fill some documents to be pressent
     const docs: TxCreateDoc<Doc>[] = []
-    for (let i = 0; i < 500; i++) {
+    for (let i = 0; i < count; i++) {
       docs.push({
         _class: core.class.TxCreateDoc,
         _id: generateId(),
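For context on the new `check traverse` test: it exercises a pull-style iterator rather than a materialized result set. The sketch below is inferred purely from the usage above — the authoritative typings live in `@hcengineering/core`, so treat the interface name and exact signatures as assumptions:

```ts
import { type Doc } from '@hcengineering/core'

// Assumed shape of the iterator returned by LowLevelStorage.traverse:
// next(count) resolves with up to `count` documents and signals
// exhaustion with null (or an empty batch); close() releases the
// database cursor backing the iteration.
interface TraverseIterator<T extends Doc> {
  next: (count: number) => Promise<T[] | null>
  close: () => Promise<void>
}

// Typical consumption, mirroring the loop in the test: drain fixed-size
// batches until exhaustion, then close the iterator in a finally block.
async function drain<T extends Doc> (iter: TraverseIterator<T>): Promise<T[]> {
  const all: T[] = []
  try {
    while (true) {
      const batch = await iter.next(50)
      if (batch == null || batch.length === 0) break
      all.push(...batch)
    }
  } finally {
    await iter.close()
  }
  return all
}
```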
diff --git a/server/middleware/src/triggers.ts b/server/middleware/src/triggers.ts
index 48188f78f9..7e107eb04f 100644
--- a/server/middleware/src/triggers.ts
+++ b/server/middleware/src/triggers.ts
@@ -225,8 +225,8 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
     )
     ctx.contextData = asyncContextData
-    if (!((ctx as MeasureContext).contextData.isAsyncContext ?? false)) {
-      ctx.id = generateId()
+    if ((ctx as MeasureContext).contextData.isAsyncContext ?? false) {
+      ctx.id = 'async_tr' + generateId()
     }
     const aresult = await this.triggers.apply(
       ctx,
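The storage.ts changes below are the core of this patch: a hand-managed server-side cursor (`BEGIN` / `DECLARE ... CURSOR` / `FETCH` / `CLOSE`) is replaced by the batching cursor built into the postgres.js driver, wrapped once in a reusable async generator (`createCursorGenerator`). A minimal standalone sketch of the driver primitive being adopted (the connection string matches the test setup above; the table name is a placeholder, not taken from the diff):

```ts
import postgres from 'postgres'

const sql = postgres('postgresql://postgres:example@localhost:5432')

// postgres.js can stream any query in fixed-size batches: .cursor(n)
// turns the query into an async iterable that fetches n rows per
// round trip instead of buffering the whole result set in memory.
for await (const rows of sql`SELECT * FROM tx`.cursor(50)) {
  console.log('received a batch of', rows.length, 'rows')
}

// Leaving the loop (normally or via break/throw) tears the cursor down
// and returns the connection, so no explicit CLOSE/COMMIT is needed.
await sql.end()
```

This is why the rewritten `traverse` below no longer needs `generateId()`-based cursor names or explicit transaction bookkeeping: the driver scopes the cursor to the reserved connection and cleans it up when iteration stops.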
diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts
index 4cd26029dd..88b8610a5a 100644
--- a/server/postgres/src/storage.ts
+++ b/server/postgres/src/storage.ts
@@ -27,7 +27,6 @@ import core, {
   DOMAIN_TX,
   type FindOptions,
   type FindResult,
-  generateId,
   groupByArray,
   type Hierarchy,
   isOperator,
@@ -64,7 +63,7 @@ import {
   type TxAdapter
 } from '@hcengineering/server-core'
 import type postgres from 'postgres'
-import { getDocFieldsByDomains, getSchema, translateDomain } from './schemas'
+import { getDocFieldsByDomains, getSchema, type Schema, translateDomain } from './schemas'
 import { type ValueType } from './types'
 import {
   convertDoc,
@@ -83,6 +82,31 @@ import {
   type PostgresClientReference
 } from './utils'
 
+async function * createCursorGenerator (
+  client: postgres.ReservedSql,
+  sql: string,
+  schema: Schema,
+  bulkSize = 50
+): AsyncGenerator<Doc[]> {
+  const cursor = client.unsafe(sql).cursor(bulkSize)
+  try {
+    let docs: Doc[] = []
+    for await (const part of cursor) {
+      docs.push(...part.filter((it) => it != null).map((it) => parseDoc(it as any, schema)))
+      if (docs.length > 0) {
+        yield docs
+        docs = []
+      }
+    }
+    if (docs.length > 0) {
+      yield docs
+      docs = []
+    }
+  } catch (err: any) {
+    console.error('failed to receive data', { err })
+  }
+}
+
 abstract class PostgresAdapterBase implements DbAdapter {
   protected readonly _helper: DBCollectionHelper
   protected readonly tableFields = new Map<string, string[]>()
@@ -174,49 +198,31 @@ abstract class PostgresAdapterBase implements DbAdapter {
   ): Promise<Iterator<T>> {
     const schema = getSchema(_domain)
     const client = await this.client.reserve()
-    let closed = false
-    const cursorName = `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(_domain)}_${generateId()}`
-    const close = async (cursorName: string): Promise<void> => {
-      if (closed) return
-      try {
-        await client.unsafe(`CLOSE ${cursorName}`)
-        await client.unsafe('COMMIT;')
-      } finally {
-        client.release()
-        closed = true
-      }
-    }
+    const tdomain = translateDomain(_domain)
 
-    const init = async (): Promise<void> => {
-      const domain = translateDomain(_domain)
-      const sqlChunks: string[] = [`CURSOR FOR SELECT * FROM ${domain}`]
-      sqlChunks.push(`WHERE ${this.buildRawQuery(domain, query, options)}`)
-      if (options?.sort !== undefined) {
-        sqlChunks.push(this.buildRawOrder(domain, options.sort))
-      }
-      if (options?.limit !== undefined) {
-        sqlChunks.push(`LIMIT ${options.limit}`)
-      }
-      const finalSql: string = sqlChunks.join(' ')
-      await client.unsafe('BEGIN;')
-      await client.unsafe(`DECLARE ${cursorName} ${finalSql}`)
+    const sqlChunks: string[] = [`SELECT * FROM ${tdomain}`]
+    sqlChunks.push(`WHERE ${this.buildRawQuery(tdomain, query, options)}`)
+    if (options?.sort !== undefined) {
+      sqlChunks.push(this.buildRawOrder(tdomain, options.sort))
     }
-
-    const next = async (count: number): Promise<T[] | null> => {
-      const result = await client.unsafe(`FETCH ${count} FROM ${cursorName}`)
-      if (result.length === 0) {
-        await close(cursorName)
-        return null
-      }
-      return result.map((p) => parseDoc(p as any, schema))
+    if (options?.limit !== undefined) {
+      sqlChunks.push(`LIMIT ${options.limit}`)
     }
+    const finalSql: string = sqlChunks.join(' ')
 
-    await init()
+    const cursor: AsyncGenerator<Doc[]> = createCursorGenerator(client, finalSql, schema)
     return {
-      next,
+      next: async (count: number): Promise<T[] | null> => {
+        const result = await cursor.next()
+        if (result.done === true || result.value.length === 0) {
+          return null
+        }
+        return result.value as T[]
+      },
       close: async () => {
-        await close(cursorName)
+        await cursor.return([])
+        client.release()
       }
     }
   }
@@ -384,8 +390,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
           params
         )
       })
-    } catch (err) {
+    } catch (err: any) {
       console.error(err, { domain, params, updates })
+      throw err
     } finally {
       conn.release()
     }
@@ -1158,17 +1165,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
 
     const workspaceId = this.workspaceId
 
-    async function * createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
-      const cursor = client
-        .unsafe(`SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`)
-        .cursor(limit)
-      try {
-        for await (const part of cursor) {
-          yield part.filter((it) => it != null).map((it) => parseDoc(it as any, schema))
-        }
-      } catch (err: any) {
-        ctx.error('failed to recieve data', { err })
-      }
+    function createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
+      const sql = `SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`
+
+      return createCursorGenerator(client, sql, schema, limit)
     }
     let bulk: AsyncGenerator<Doc[]>
     let forcedRecheck = false
@@ -1191,7 +1191,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
           initialized = true
           await flush(true) // We need to flush, so wrong id documents will be updated.
           bulk = createBulk('_id, "%hash%"', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
-          // bulk = createBulk('_id, "%hash%, data', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
         }
         let docs = await ctx.with('next', { mode }, () => bulk.next())
@@ -1235,6 +1234,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
       },
       close: async () => {
         await ctx.with('flush', {}, () => flush(true))
+        await bulk.return([]) // We need to close generator, just in case
        client?.release()
         ctx.end()
       }
@@ -1246,11 +1246,14 @@ abstract class PostgresAdapterBase implements DbAdapter {
     }
       if (docs.length === 0) {
         return []
       }
-      return await this.withConnection(ctx, async (connection) => {
+      const client = await this.client.reserve()
+      try {
         const res =
-          await connection`SELECT * FROM ${connection(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
+          await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
         return res.map((p) => parseDocWithProjection(p as any, domain))
-      })
+      } finally {
+        client.release()
+      }
     })
   }
@@ -1266,7 +1269,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
     }
     const insertStr = insertFields.join(', ')
     const onConflictStr = onConflict.join(', ')
-    await this.withConnection(ctx, async (connection) => {
+
+    const client = await this.client.reserve()
+    try {
       const domainFields = new Set(getDocFieldsByDomains(domain))
       const toUpload = [...docs]
       const tdomain = translateDomain(domain)
@@ -1299,7 +1304,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
        }
         const vals = vars.join(',')
-        await this.retryTxn(connection, (client) =>
+        await this.retryTxn(client, (client) =>
           client.unsafe(
             `INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals} ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
           )
         )
       }
-    })
+    } catch (err: any) {
+      ctx.error('failed to upload', { err })
+      throw err
+    } finally {
+      client.release()
+    }
   })
 }
diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts
index e89142bb80..f4a18f14e9 100644
--- a/server/postgres/src/utils.ts
+++ b/server/postgres/src/utils.ts
@@ -194,7 +194,7 @@ class PostgresClientReferenceImpl {
       void (async () => {
         this.onclose()
         const cl = await this.client
-        await cl.end()
+        await cl.end({ timeout: 1 })
       })()
     }
   }
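Two cleanup details in the changes above are easy to miss. First, `await bulk.return([])`: calling `.return()` on an async generator resumes it as though a `return` statement executed at the paused `yield`, so `finally` blocks inside the generator (and, here, the driver's cursor teardown) run before the connection is handed back. A minimal illustration of that semantics:

```ts
async function * batches (): AsyncGenerator<number[]> {
  try {
    yield [1, 2]
    yield [3, 4]
  } finally {
    // Runs even when the consumer stops early via .return() or break.
    console.log('generator cleanup')
  }
}

const gen = batches()
console.log(await gen.next()) // { value: [1, 2], done: false }
await gen.return([]) // logs 'generator cleanup' before resolving
```

Second, the utils.ts change: postgres.js resolves `end()` once all pending queries have finished and the connections are closed; passing `{ timeout: 1 }` (in seconds) force-closes the connections after that deadline, which keeps shutdown from hanging on a stuck query.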
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index 76ba55ca6e..e12932d046 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -12,9 +12,16 @@ services:
     image: postgres
     environment:
       - POSTGRES_PASSWORD=example
+      - PGUSER=postgres
     ports:
       - 5433:5432
     restart: unless-stopped
+    healthcheck:
+      # This command checks whether the database is ready, right on the DB server itself
+      test: [ "CMD-SHELL", "pg_isready" ]
+      interval: 5s
+      timeout: 5s
+      retries: 5
   minio:
     image: 'minio/minio'
     command: server /data --address ":9000" --console-address ":9001"