From db02d19d7e1177ce93db7d0a286d57ee090d70d5 Mon Sep 17 00:00:00 2001 From: sliterok <12751644+sliterok@users.noreply.github.com> Date: Sat, 9 Mar 2024 19:34:59 +0100 Subject: [PATCH] import --- src/db/helpers.ts | 18 +++++ src/db/index.ts | 145 +++++++++++++++++++++++++++++++--------- src/db/types.ts | 6 ++ src/routes/UserGrid.tsx | 39 ++++++++--- 4 files changed, 170 insertions(+), 38 deletions(-) diff --git a/src/db/helpers.ts b/src/db/helpers.ts index 1a79d10..e2908fa 100644 --- a/src/db/helpers.ts +++ b/src/db/helpers.ts @@ -45,3 +45,21 @@ export function getQueryFromCondition(type: string, val: string) { [op!]: val, } } +export function batchReduce(arr: T[], batchSize: number): T[][] { + return arr.reduce((batches, curr, i) => { + if (i % batchSize === 0) batches.push([]) + batches[batches.length - 1].push(arr[i]) + return batches + }, [] as T[][]) +} + +export function mergeUint8Arrays(size: number, ...arrays: Uint8Array[]): Uint8Array { + const merged = new Uint8Array(size) + + arrays.forEach((array, i, arrays) => { + const offset = arrays.slice(0, i).reduce((acc, e) => acc + e.length, 0) + merged.set(array, offset) + }) + + return merged +} diff --git a/src/db/index.ts b/src/db/index.ts index 7e114e9..d75736a 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -17,28 +17,59 @@ import { IBasicRecord, IEncoder, IQueryOptions, + IImportInput, } from './types' +import { batchReduce, mergeUint8Arrays } from './helpers' -const readFile = async (dir: FileSystemDirectoryHandle, fileName: string, encoder?: IEncoder) => { +const readFile = async (dir: FileSystemDirectoryHandle, fileName: string, encoder?: IEncoder | false) => { try { const fileHandle = await dir.getFileHandle(fileName) const file: Blob = await fileHandle.getFile() const buffer = await file.arrayBuffer() - if (encoder) { - const tag = encoder.decode(new Uint8Array(buffer)) + const uintArray = new Uint8Array(buffer) + if (encoder === false) { + return uintArray + } else if (encoder) { + const tag = encoder.decode(uintArray) return encoder.decodeKeys(tag) } else { - return decode(new Uint8Array(buffer)) + return decode(uintArray) } + + // const file: Blob = await fileHandle.getFile() + // const stream = await file.stream() + + // const reader = stream.getReader() + // let state = new Uint8Array() + // let readerState = await reader.read() + // while (!readerState.done) { + // state = mergeUint8Arrays(state.length + readerState.value.length, state, readerState.value) + // readerState = await reader.read() + // } + // if (encoder === false) { + // return state + // } else if (encoder) { + // const tag = encoder.decode(state) + // const decoded = encoder.decodeKeys(tag) + // return decoded + // } else { + // return decode(state) + // } } catch (error) { + console.error(error) return null } } -const writeFile = async (dir: FileSystemDirectoryHandle, fileName: string, data: Record, encoder?: IEncoder) => { +const writeFile = async (dir: FileSystemDirectoryHandle, fileName: string, data: Record | Uint8Array, encoder?: IEncoder | false) => { const fileHandle = await dir.getFileHandle(fileName, { create: true }) const writeHandle = await fileHandle.createWritable() - const encoded = encoder ? encoder.encode(data) : encode(data) + let encoded: Uint8Array + + if (encoder === false) encoded = data as Uint8Array + else if (encoder) encoded = encoder.encode(data) + else encoded = encode(data) + await writeHandle.write(encoded) await writeHandle.close() } @@ -53,8 +84,10 @@ export class FileStoreStrategy extends SerializeStrategy { } id(): number { - const random = Math.ceil(Math.random() * 1000000) - return random + const buffer = new BigUint64Array(1) + const [random] = crypto.getRandomValues(buffer) + const id = Math.floor(Number(random / 2048n)) + return id } read(id: number): Promise> { @@ -121,6 +154,7 @@ export class OPFSDB { }, options?: IQueryOptions ): Promise { + // const start = performance.now() let indexes = new Set() for (const key in queries) { const tree = this.trees[key] @@ -140,38 +174,82 @@ export class OPFSDB { } } const indexArray = Array.from(indexes) + // const indexesFinish = performance.now() if (options?.keys) return indexArray - const records: T[] = Array(indexes.size) - for (let i = 0; i < indexes.size; i++) { - records[i] = await this.read(indexArray[i]) - } - return records - } - async filterByKey(key: string, query: BPTreeCondition): Promise { - const tree = this.trees[key] - if (!tree) throw new Error('No such index found') - - const indexes = Array.from(await tree.keys(query)) - const records = Array(indexes.length) - for (let i = 0; i < indexes.length; i++) { - records[i] = await this.read(indexes[i]) - } + const records = await this.readMany(indexArray) + // const responsesLoaded = performance.now() + // console.log('indexes:', indexesFinish - start, 'records:', responsesLoaded - indexesFinish) return records } - async getByKey(key: string, query: BPTreeCondition): Promise { - const tree = this.trees[key] - if (!tree) throw new Error('No such index found') - - const [index] = await tree.keys(query) - return index ? await this.read(index) : undefined + // async filterByKey(key: string, query: BPTreeCondition): Promise { + // const tree = this.trees[key] + // if (!tree) throw new Error('No such index found') + + // const indexes = Array.from(await tree.keys(query)) + // const records = Array(indexes.length) + // for (let i = 0; i < indexes.length; i++) { + // records[i] = await this.read(indexes[i]) + // } + // return records + // } + + // async getByKey(key: string, query: BPTreeCondition): Promise { + // const tree = this.trees[key] + // if (!tree) throw new Error('No such index found') + + // const [index] = await tree.keys(query) + // return index ? await this.read(index) : undefined + // } + + async readMany(ids: string[]): Promise { + const result = batchReduce(ids, 20).map(async ids => { + let size = 0 + const records = await Promise.all( + ids.map(async id => { + const file: Uint8Array = await readFile(this.recordsRoot, id, false) + size += file.length + return file + }) + ) + const merged = mergeUint8Arrays(size, ...records) + const decoded = this.encoder.decodeMultiple(merged) as T[] + return decoded + }) + const response = await Promise.all(result) + // const rawRecords = await Promise.all() + return response.flat() } async read(id: string): Promise { return readFile(this.recordsRoot, id, this.encoder) } + async import(records: { id: string; value: T }[]) { + await Promise.all([ + ...records.map(record => writeFile(this.recordsRoot, record.id, record.value, this.encoder)), + ...Object.keys(this.trees).map(async key => { + const tree = this.trees[key] + for (const record of records) { + const val = record.value[key] + if (val === undefined || val === null) continue + await tree.insert(record.id, val) + } + }), + // (async () => { + // for (const record of records) { + // for (const key in this.trees) { + // const val = record.value[key] + // if (val === undefined || val === null) continue + // const tree = this.trees[key] + // await tree.insert(record.id, val) + // } + // } + // })(), + ]) + } + async insert(id: string, value: T, fullRecord?: boolean) { const oldRecord = await this.read(id) @@ -237,6 +315,10 @@ export const insertCommand = async ({ tableName, record, fullRecord }: ICommandI await tables[tableName].insert(record.id, record, fullRecord) } +export const importCommand = async ({ tableName, records }: ICommandInput): Promise => { + await tables[tableName].import(records.map(value => ({ id: value.id, value }))) +} + export const deleteCommand = async ({ tableName, id }: ICommandInput): Promise => { await tables[tableName].delete(id) } @@ -262,6 +344,9 @@ export const command = async (command: ICommandInputs case 'insert': await insertCommand(command as IInsertInput) break + case 'import': + await importCommand(command as IImportInput) + break case 'delete': await deleteCommand(command as IDeleteInput) break @@ -275,7 +360,7 @@ export const command = async (command: ICommandInputs throw new Error('unknown command') } - return new Response(JSON.stringify(response!), { status: 200 }) + return new Response(JSON.stringify(response! || {}), { status: 200 }) } catch (error) { console.error(command.name, error) return new Response(null, { status: 500, statusText: (error as Error).message }) diff --git a/src/db/types.ts b/src/db/types.ts index 7a1568e..3bf0954 100644 --- a/src/db/types.ts +++ b/src/db/types.ts @@ -13,6 +13,11 @@ export interface IInsertInput extends IIn record: T } +export interface IImportInput extends IBaseInput { + name: 'import' + records: T[] +} + export interface IQueryOptions { isAnd?: boolean limit?: number @@ -51,6 +56,7 @@ export type ICommandInputs = | IDeleteInput | IReadInput | IDropInput + | IImportInput export type ICommandInput = Omit export type IFetchCommandInput = Omit diff --git a/src/routes/UserGrid.tsx b/src/routes/UserGrid.tsx index 6a096e4..62ba48b 100644 --- a/src/routes/UserGrid.tsx +++ b/src/routes/UserGrid.tsx @@ -1,6 +1,6 @@ import { ClientSuspense, useMutation, useQuery } from 'rakkasjs' import { useEffect, useState } from 'react' -import { ICommandInput, IDropInput, IFetchCommandInput, IInsertInput, IQueryInput } from 'src/db/types' +import { ICommandInput, IDropInput, IFetchCommandInput, IImportInput, IInsertInput, IQueryInput } from 'src/db/types' import { IUser } from 'src/types' import Chance from 'chance' import { AgGridReact, AgGridReactProps } from 'ag-grid-react' @@ -20,6 +20,14 @@ const columnDefs: AgGridReactProps['columnDefs'] = [ { headerName: 'address', field: 'address', filter: true }, ] +const generateUser = () => ({ + name: chance.name(), + surname: chance.name_suffix(), + id: crypto.randomUUID(), + itemsBought: chance.integer({ min: 0, max: 500 }), + address: chance.address(), +}) + export default function MainLayout() { const [limit, setLimit] = useState(50) const [isAndQuery, setIsAndQuery] = useState(true) @@ -67,6 +75,20 @@ export default function MainLayout() { await dbFetch('/db/users/drop', {}) }) + const importUsers = useMutation(async () => { + for (let i = 0; i < 15; i++) { + const records = Array(50) + .fill(true) + .map(() => generateUser()) + // eslint-disable-next-line no-console + console.log(i) + await dbFetch>('/db/users/import', { + records, + }) + await new Promise(res => setTimeout(res, 20)) + } + }) + useEffect(() => { const delayDebounceFn = setTimeout(() => { usersQuery.refetch() @@ -85,17 +107,18 @@ export default function MainLayout() {
+