Skip to content

Commit

Permalink
Fixes for traverse/load in backups (#7221)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiodo authored Nov 22, 2024
1 parent fa8edd5 commit 4c52a43
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 64 deletions.
3 changes: 2 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@
"MINIO_SECRET_KEY": "minioadmin",
"PLATFORM_OPERATION_LOGGING": "true",
"FRONT_URL": "http://localhost:8080",
"PORT": "3500"
"PORT": "3500",
"STATS_URL": "http://host.docker.internal:4900"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
Expand Down
53 changes: 50 additions & 3 deletions pods/server/src/__tests__/backup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const model = builder().getTxes()
// const dbURL = 'postgresql://root@localhost:26257/defaultdb?sslmode=disable'
const dbURL = 'postgresql://postgres:example@localhost:5432'
const STORAGE_CONFIG = 'minio|localhost:9000?accessKey=minioadmin&secretKey=minioadmin&useSSL=false'

// jest.setTimeout(4500000)
describe.skip('test-backup-find', () => {
it('check create/load/clean', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
Expand Down Expand Up @@ -53,19 +55,64 @@ describe.skip('test-backup-find', () => {
const findDocs = await client.findAll(core.class.Tx, {})
expect(findDocs.length).toBe(0)

//
} finally {
await pipeline.close()
await storageAdapter.close()
}
})
it('check traverse', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
// We should set up a DB with documents and try to back them up.
const wsUrl = { name: 'testdb-backup-test', workspaceName: 'test', workspaceUrl: 'test' }
const { pipeline, storageAdapter } = await getServerPipeline(toolCtx, model, dbURL, wsUrl, {
storageConfig: STORAGE_CONFIG,
disableTriggers: true
})
try {
const client = wrapPipeline(toolCtx, pipeline, wsUrl)
const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage

// We need to create a backup docs if they are missing.
await prepareTxes(lowLevel, toolCtx, 1500)

const iter = await lowLevel.traverse(DOMAIN_TX, {})

const allDocs: Doc[] = []

while (true) {
const docs = await iter.next(50)
if (docs == null || docs?.length === 0) {
break
}
await client.clean(
DOMAIN_TX,
docs.map((doc) => doc._id)
)
allDocs.push(...docs)
}
expect(allDocs.length).toBeGreaterThan(1449)

const findDocs = await client.findAll(core.class.Tx, {})
expect(findDocs.length).toBe(0)

//
} finally {
await pipeline.close()
await storageAdapter.close()
}
})
})
async function prepareTxes (lowLevel: LowLevelStorage, toolCtx: MeasureMetricsContext): Promise<void> {
async function prepareTxes (
lowLevel: LowLevelStorage,
toolCtx: MeasureMetricsContext,
count: number = 500
): Promise<void> {
const docs = await lowLevel.rawFindAll(DOMAIN_TX, {})
if ((docs?.length ?? 0) < 500) {
if ((docs?.length ?? 0) < count) {
// We need to fill some documents to be present
const docs: TxCreateDoc<Doc>[] = []
for (let i = 0; i < 500; i++) {
for (let i = 0; i < count; i++) {
docs.push({
_class: core.class.TxCreateDoc,
_id: generateId(),
Expand Down
4 changes: 2 additions & 2 deletions server/middleware/src/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
)
ctx.contextData = asyncContextData

if (!((ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false)) {
ctx.id = generateId()
if ((ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false) {
ctx.id = 'async_tr' + generateId()
}
const aresult = await this.triggers.apply(
ctx,
Expand Down
124 changes: 67 additions & 57 deletions server/postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import core, {
DOMAIN_TX,
type FindOptions,
type FindResult,
generateId,
groupByArray,
type Hierarchy,
isOperator,
Expand Down Expand Up @@ -64,7 +63,7 @@ import {
type TxAdapter
} from '@hcengineering/server-core'
import type postgres from 'postgres'
import { getDocFieldsByDomains, getSchema, translateDomain } from './schemas'
import { getDocFieldsByDomains, getSchema, type Schema, translateDomain } from './schemas'
import { type ValueType } from './types'
import {
convertDoc,
Expand All @@ -83,6 +82,31 @@ import {
type PostgresClientReference
} from './utils'

/**
 * Streams the results of a raw SQL query as batches of parsed documents.
 *
 * Uses the postgres.js cursor API to fetch rows `bulkSize` at a time, filters
 * out null rows, and converts each raw row into a `Doc` via the domain schema.
 *
 * @param client - a reserved postgres.js connection (caller owns its release)
 * @param sql - raw SQL to execute; executed via `unsafe`, so it must be trusted
 * @param schema - domain schema used by `parseDoc` to decode raw rows
 * @param bulkSize - number of rows fetched per cursor round-trip (default 50)
 * @returns async generator yielding non-empty arrays of parsed documents
 */
async function * createCursorGenerator (
  client: postgres.ReservedSql,
  sql: string,
  schema: Schema,
  bulkSize = 50
): AsyncGenerator<Doc[]> {
  const cursor = client.unsafe(sql).cursor(bulkSize)
  try {
    let docs: Doc[] = []
    for await (const part of cursor) {
      // Rows may contain nulls; drop them before parsing into Docs.
      docs.push(...part.filter((it) => it != null).map((it) => parseDoc(it as any, schema)))
      if (docs.length > 0) {
        yield docs
        docs = []
      }
    }
    // Flush whatever accumulated after the last cursor chunk.
    if (docs.length > 0) {
      yield docs
    }
  } catch (err: any) {
    // Best-effort: log and end iteration; callers observe a normal end-of-data.
    console.error('failed to receive data', { err })
  }
}

abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
Expand Down Expand Up @@ -174,49 +198,31 @@ abstract class PostgresAdapterBase implements DbAdapter {
): Promise<Iterator<T>> {
const schema = getSchema(_domain)
const client = await this.client.reserve()
let closed = false
const cursorName = `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(_domain)}_${generateId()}`

const close = async (cursorName: string): Promise<void> => {
if (closed) return
try {
await client.unsafe(`CLOSE ${cursorName}`)
await client.unsafe('COMMIT;')
} finally {
client.release()
closed = true
}
}
const tdomain = translateDomain(_domain)

const init = async (): Promise<void> => {
const domain = translateDomain(_domain)
const sqlChunks: string[] = [`CURSOR FOR SELECT * FROM ${domain}`]
sqlChunks.push(`WHERE ${this.buildRawQuery(domain, query, options)}`)
if (options?.sort !== undefined) {
sqlChunks.push(this.buildRawOrder(domain, options.sort))
}
if (options?.limit !== undefined) {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = sqlChunks.join(' ')
await client.unsafe('BEGIN;')
await client.unsafe(`DECLARE ${cursorName} ${finalSql}`)
const sqlChunks: string[] = [`SELECT * FROM ${tdomain}`]
sqlChunks.push(`WHERE ${this.buildRawQuery(tdomain, query, options)}`)
if (options?.sort !== undefined) {
sqlChunks.push(this.buildRawOrder(tdomain, options.sort))
}

const next = async (count: number): Promise<T[] | null> => {
const result = await client.unsafe(`FETCH ${count} FROM ${cursorName}`)
if (result.length === 0) {
await close(cursorName)
return null
}
return result.map((p) => parseDoc(p as any, schema))
if (options?.limit !== undefined) {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = sqlChunks.join(' ')

await init()
const cursor: AsyncGenerator<Doc[]> = createCursorGenerator(client, finalSql, schema)
return {
next,
next: async (count: number): Promise<T[] | null> => {
const result = await cursor.next()
if (result.done === true || result.value.length === 0) {
return null
}
return result.value as T[]
},
close: async () => {
await close(cursorName)
await cursor.return([])
client.release()
}
}
}
Expand Down Expand Up @@ -384,8 +390,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
params
)
})
} catch (err) {
} catch (err: any) {
console.error(err, { domain, params, updates })
throw err
} finally {
conn.release()
}
Expand Down Expand Up @@ -1158,17 +1165,10 @@ abstract class PostgresAdapterBase implements DbAdapter {

const workspaceId = this.workspaceId

async function * createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
const cursor = client
.unsafe(`SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`)
.cursor(limit)
try {
for await (const part of cursor) {
yield part.filter((it) => it != null).map((it) => parseDoc(it as any, schema))
}
} catch (err: any) {
ctx.error('failed to recieve data', { err })
}
// Builds a batched async iterator over documents of this domain that belong to
// the current workspace and match `query`, selecting only the `projection`
// columns. Streaming is delegated to createCursorGenerator in chunks of `limit`.
// NOTE(review): `projection` and `query` are interpolated directly into the SQL
// string — callers must pass trusted fragments only; verify no untrusted input
// reaches this helper.
function createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
  const sql = `SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`

  return createCursorGenerator(client, sql, schema, limit)
}
let bulk: AsyncGenerator<Doc[]>
let forcedRecheck = false
Expand All @@ -1191,7 +1191,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
initialized = true
await flush(true) // We need to flush, so wrong id documents will be updated.
bulk = createBulk('_id, "%hash%"', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
// bulk = createBulk('_id, "%hash%, data', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
}

let docs = await ctx.with('next', { mode }, () => bulk.next())
Expand Down Expand Up @@ -1235,6 +1234,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
},
close: async () => {
await ctx.with('flush', {}, () => flush(true))
await bulk.return([]) // We need to close generator, just in case
client?.release()
ctx.end()
}
Expand All @@ -1246,11 +1246,14 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (docs.length === 0) {
return []
}
return await this.withConnection(ctx, async (connection) => {
const client = await this.client.reserve()
try {
const res =
await connection`SELECT * FROM ${connection(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
return res.map((p) => parseDocWithProjection(p as any, domain))
})
} finally {
client.release()
}
})
}

Expand All @@ -1266,7 +1269,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
await this.withConnection(ctx, async (connection) => {

const client = await this.client.reserve()
try {
const domainFields = new Set(getDocFieldsByDomains(domain))
const toUpload = [...docs]
const tdomain = translateDomain(domain)
Expand Down Expand Up @@ -1299,15 +1304,20 @@ abstract class PostgresAdapterBase implements DbAdapter {
}

const vals = vars.join(',')
await this.retryTxn(connection, (client) =>
await this.retryTxn(client, (client) =>
client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values
)
)
}
})
} catch (err: any) {
ctx.error('failed to upload', { err })
throw err
} finally {
client.release()
}
})
}

Expand Down
2 changes: 1 addition & 1 deletion server/postgres/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class PostgresClientReferenceImpl {
void (async () => {
this.onclose()
const cl = await this.client
await cl.end()
await cl.end({ timeout: 1 })
})()
}
}
Expand Down
7 changes: 7 additions & 0 deletions tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ services:
image: postgres
environment:
- POSTGRES_PASSWORD=example
- PGUSER=postgres
ports:
- 5433:5432
restart: unless-stopped
healthcheck:
# Healthcheck: pg_isready reports whether the Postgres server is accepting connections
test: [ "CMD-SHELL", "pg_isready" ]
interval: 5s
timeout: 5s
retries: 5
minio:
image: 'minio/minio'
command: server /data --address ":9000" --console-address ":9001"
Expand Down

0 comments on commit 4c52a43

Please sign in to comment.