Skip to content

Commit

Permalink
import
Browse files Browse the repository at this point in the history
  • Loading branch information
sliterok committed Mar 9, 2024
1 parent 8e38f68 commit db02d19
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 38 deletions.
18 changes: 18 additions & 0 deletions src/db/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,21 @@ export function getQueryFromCondition(type: string, val: string) {
[op!]: val,
}
}
export function batchReduce<T>(arr: T[], batchSize: number): T[][] {
return arr.reduce((batches, curr, i) => {
if (i % batchSize === 0) batches.push([])
batches[batches.length - 1].push(arr[i])
return batches
}, [] as T[][])
}

export function mergeUint8Arrays(size: number, ...arrays: Uint8Array[]): Uint8Array {
const merged = new Uint8Array(size)

arrays.forEach((array, i, arrays) => {
const offset = arrays.slice(0, i).reduce((acc, e) => acc + e.length, 0)
merged.set(array, offset)
})

return merged
}
145 changes: 115 additions & 30 deletions src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,59 @@ import {
IBasicRecord,
IEncoder,
IQueryOptions,
IImportInput,
} from './types'
import { batchReduce, mergeUint8Arrays } from './helpers'

const readFile = async (dir: FileSystemDirectoryHandle, fileName: string, encoder?: IEncoder) => {
const readFile = async (dir: FileSystemDirectoryHandle, fileName: string, encoder?: IEncoder | false) => {
try {
const fileHandle = await dir.getFileHandle(fileName)
const file: Blob = await fileHandle.getFile()
const buffer = await file.arrayBuffer()
if (encoder) {
const tag = encoder.decode(new Uint8Array(buffer))
const uintArray = new Uint8Array(buffer)
if (encoder === false) {
return uintArray
} else if (encoder) {
const tag = encoder.decode(uintArray)
return encoder.decodeKeys(tag)
} else {
return decode(new Uint8Array(buffer))
return decode(uintArray)
}

// const file: Blob = await fileHandle.getFile()
// const stream = await file.stream()

// const reader = stream.getReader()
// let state = new Uint8Array()
// let readerState = await reader.read()
// while (!readerState.done) {
// state = mergeUint8Arrays(state.length + readerState.value.length, state, readerState.value)
// readerState = await reader.read()
// }
// if (encoder === false) {
// return state
// } else if (encoder) {
// const tag = encoder.decode(state)
// const decoded = encoder.decodeKeys(tag)
// return decoded
// } else {
// return decode(state)
// }
} catch (error) {
console.error(error)
return null
}
}

const writeFile = async (dir: FileSystemDirectoryHandle, fileName: string, data: Record<string, any>, encoder?: IEncoder) => {
const writeFile = async (dir: FileSystemDirectoryHandle, fileName: string, data: Record<string, any> | Uint8Array, encoder?: IEncoder | false) => {
const fileHandle = await dir.getFileHandle(fileName, { create: true })
const writeHandle = await fileHandle.createWritable()
const encoded = encoder ? encoder.encode(data) : encode(data)
let encoded: Uint8Array

if (encoder === false) encoded = data as Uint8Array
else if (encoder) encoded = encoder.encode(data)
else encoded = encode(data)

await writeHandle.write(encoded)
await writeHandle.close()
}
Expand All @@ -53,8 +84,10 @@ export class FileStoreStrategy<K, V> extends SerializeStrategy<K, V> {
}

id(): number {
const random = Math.ceil(Math.random() * 1000000)
return random
const buffer = new BigUint64Array(1)
const [random] = crypto.getRandomValues(buffer)
const id = Math.floor(Number(random / 2048n))
return id
}

read(id: number): Promise<BPTreeNode<K, V>> {
Expand Down Expand Up @@ -121,6 +154,7 @@ export class OPFSDB<T extends IBasicRecord> {
},
options?: IQueryOptions
): Promise<T[] | string[]> {
// const start = performance.now()
let indexes = new Set<string>()
for (const key in queries) {
const tree = this.trees[key]
Expand All @@ -140,38 +174,82 @@ export class OPFSDB<T extends IBasicRecord> {
}
}
const indexArray = Array.from(indexes)
// const indexesFinish = performance.now()
if (options?.keys) return indexArray
const records: T[] = Array(indexes.size)
for (let i = 0; i < indexes.size; i++) {
records[i] = await this.read(indexArray[i])
}
return records
}

async filterByKey(key: string, query: BPTreeCondition<string | number>): Promise<T[]> {
const tree = this.trees[key]
if (!tree) throw new Error('No such index found')

const indexes = Array.from(await tree.keys(query))
const records = Array(indexes.length)
for (let i = 0; i < indexes.length; i++) {
records[i] = await this.read(indexes[i])
}
const records = await this.readMany(indexArray)
// const responsesLoaded = performance.now()
// console.log('indexes:', indexesFinish - start, 'records:', responsesLoaded - indexesFinish)
return records
}

async getByKey(key: string, query: BPTreeCondition<string | number>): Promise<T | void> {
const tree = this.trees[key]
if (!tree) throw new Error('No such index found')

const [index] = await tree.keys(query)
return index ? await this.read(index) : undefined
// async filterByKey(key: string, query: BPTreeCondition<string | number>): Promise<T[]> {
// const tree = this.trees[key]
// if (!tree) throw new Error('No such index found')

// const indexes = Array.from(await tree.keys(query))
// const records = Array(indexes.length)
// for (let i = 0; i < indexes.length; i++) {
// records[i] = await this.read(indexes[i])
// }
// return records
// }

// async getByKey(key: string, query: BPTreeCondition<string | number>): Promise<T | void> {
// const tree = this.trees[key]
// if (!tree) throw new Error('No such index found')

// const [index] = await tree.keys(query)
// return index ? await this.read(index) : undefined
// }

async readMany(ids: string[]): Promise<T[]> {
const result = batchReduce(ids, 20).map(async ids => {
let size = 0
const records = await Promise.all(
ids.map(async id => {
const file: Uint8Array = await readFile(this.recordsRoot, id, false)
size += file.length
return file
})
)
const merged = mergeUint8Arrays(size, ...records)
const decoded = this.encoder.decodeMultiple(merged) as T[]
return decoded
})
const response = await Promise.all(result)
// const rawRecords = await Promise.all()
return response.flat()
}

async read(id: string): Promise<T> {
return readFile(this.recordsRoot, id, this.encoder)
}

async import(records: { id: string; value: T }[]) {
await Promise.all([
...records.map(record => writeFile(this.recordsRoot, record.id, record.value, this.encoder)),
...Object.keys(this.trees).map(async key => {
const tree = this.trees[key]
for (const record of records) {
const val = record.value[key]
if (val === undefined || val === null) continue
await tree.insert(record.id, val)
}
}),
// (async () => {
// for (const record of records) {
// for (const key in this.trees) {
// const val = record.value[key]
// if (val === undefined || val === null) continue
// const tree = this.trees[key]
// await tree.insert(record.id, val)
// }
// }
// })(),
])
}

async insert(id: string, value: T, fullRecord?: boolean) {
const oldRecord = await this.read(id)

Expand Down Expand Up @@ -237,6 +315,10 @@ export const insertCommand = async ({ tableName, record, fullRecord }: ICommandI
await tables[tableName].insert(record.id, record, fullRecord)
}

export const importCommand = async ({ tableName, records }: ICommandInput<IImportInput>): Promise<void> => {
await tables[tableName].import(records.map(value => ({ id: value.id, value })))
}

export const deleteCommand = async ({ tableName, id }: ICommandInput<IDeleteInput>): Promise<void> => {
await tables[tableName].delete(id)
}
Expand All @@ -262,6 +344,9 @@ export const command = async <T extends IBasicRecord>(command: ICommandInputs<T>
case 'insert':
await insertCommand(command as IInsertInput<T>)
break
case 'import':
await importCommand(command as IImportInput<T>)
break
case 'delete':
await deleteCommand(command as IDeleteInput)
break
Expand All @@ -275,7 +360,7 @@ export const command = async <T extends IBasicRecord>(command: ICommandInputs<T>
throw new Error('unknown command')
}

return new Response(JSON.stringify(response!), { status: 200 })
return new Response(JSON.stringify(response! || {}), { status: 200 })
} catch (error) {
console.error(command.name, error)
return new Response(null, { status: 500, statusText: (error as Error).message })
Expand Down
6 changes: 6 additions & 0 deletions src/db/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ export interface IInsertInput<T extends IBasicRecord = IBasicRecord> extends IIn
record: T
}

export interface IImportInput<T extends IBasicRecord = IBasicRecord> extends IBaseInput {
name: 'import'
records: T[]
}

export interface IQueryOptions {
isAnd?: boolean
limit?: number
Expand Down Expand Up @@ -51,6 +56,7 @@ export type ICommandInputs<T extends IBasicRecord | never = IBasicRecord> =
| IDeleteInput
| IReadInput
| IDropInput
| IImportInput

export type ICommandInput<T extends ICommandInputs> = Omit<T, 'name'>
export type IFetchCommandInput<T extends ICommandInputs> = Omit<T, 'tableName' | 'name'>
Expand Down
39 changes: 31 additions & 8 deletions src/routes/UserGrid.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ClientSuspense, useMutation, useQuery } from 'rakkasjs'
import { useEffect, useState } from 'react'
import { ICommandInput, IDropInput, IFetchCommandInput, IInsertInput, IQueryInput } from 'src/db/types'
import { ICommandInput, IDropInput, IFetchCommandInput, IImportInput, IInsertInput, IQueryInput } from 'src/db/types'
import { IUser } from 'src/types'
import Chance from 'chance'
import { AgGridReact, AgGridReactProps } from 'ag-grid-react'
Expand All @@ -20,6 +20,14 @@ const columnDefs: AgGridReactProps['columnDefs'] = [
{ headerName: 'address', field: 'address', filter: true },
]

const generateUser = () => ({
name: chance.name(),
surname: chance.name_suffix(),
id: crypto.randomUUID(),
itemsBought: chance.integer({ min: 0, max: 500 }),
address: chance.address(),
})

export default function MainLayout() {
const [limit, setLimit] = useState(50)
const [isAndQuery, setIsAndQuery] = useState(true)
Expand Down Expand Up @@ -67,6 +75,20 @@ export default function MainLayout() {
await dbFetch<IDropInput>('/db/users/drop', {})
})

const importUsers = useMutation(async () => {
for (let i = 0; i < 15; i++) {
const records = Array(50)
.fill(true)
.map(() => generateUser())
// eslint-disable-next-line no-console
console.log(i)
await dbFetch<IImportInput<IUser>>('/db/users/import', {
records,
})
await new Promise(res => setTimeout(res, 20))
}
})

useEffect(() => {
const delayDebounceFn = setTimeout(() => {
usersQuery.refetch()
Expand All @@ -85,17 +107,18 @@ export default function MainLayout() {
<div>
<button
onClick={() => {
createUser.mutate({
name: chance.name(),
surname: chance.name_suffix(),
id: crypto.randomUUID(),
itemsBought: chance.integer({ min: 0, max: 500 }),
address: chance.address(),
})
createUser.mutate(generateUser())
}}
>
add user
</button>
<button
onClick={() => {
importUsers.mutate()
}}
>
add 10k users
</button>
<button
onClick={() => {
dropTable.mutate()
Expand Down

0 comments on commit db02d19

Please sign in to comment.