Skip to content

Commit

Permalink
[front] Reuse app result in worker (#8753)
Browse files Browse the repository at this point in the history
* Reuse app result in worker

* type
  • Loading branch information
tdraier authored Nov 20, 2024
1 parent 1023501 commit 9a82337
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 12 deletions.
10 changes: 10 additions & 0 deletions front/lib/api/data_sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ export async function upsertTable({
return csvRowsRes;
}

const detectedHeaders = csvRowsRes?.isOk()
? csvRowsRes.value.detectedHeaders
: undefined;

const enqueueRes = await enqueueUpsertTable({
upsertTable: {
workspaceId: auth.getNonNullableWorkspace().sId,
Expand All @@ -471,6 +475,7 @@ export async function upsertTable({
csv: csv ?? null,
truncate,
useAppForHeaderDetection: useApp,
detectedHeaders,
},
});
if (enqueueRes.isErr()) {
Expand Down Expand Up @@ -621,6 +626,10 @@ export async function handleDataSourceTableCSVUpsert({
});
}

const detectedHeaders = csvRowsRes?.isOk()
? csvRowsRes.value.detectedHeaders
: undefined;

const enqueueRes = await enqueueUpsertTable({
upsertTable: {
workspaceId: owner.sId,
Expand All @@ -634,6 +643,7 @@ export async function handleDataSourceTableCSVUpsert({
csv: csv ?? null,
truncate,
useAppForHeaderDetection,
detectedHeaders,
},
});
if (enqueueRes.isErr()) {
Expand Down
37 changes: 25 additions & 12 deletions front/lib/api/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type NotFoundError = {
message: string;
};

type DetectedHeadersType = { header: string[]; rowIndex: number };

export type TableOperationError =
| {
type: "internal_server_error";
Expand Down Expand Up @@ -130,6 +132,7 @@ export async function upsertTableFromCsv({
csv,
truncate,
useAppForHeaderDetection,
detectedHeaders,
}: {
auth: Authenticator;
dataSource: DataSourceResource;
Expand All @@ -142,9 +145,15 @@ export async function upsertTableFromCsv({
csv: string | null;
truncate: boolean;
useAppForHeaderDetection: boolean;
detectedHeaders?: DetectedHeadersType;
}): Promise<Result<{ table: CoreAPITable }, TableOperationError>> {
const csvRowsRes = csv
? await rowsFromCsv({ auth, csv, useAppForHeaderDetection })
? await rowsFromCsv({
auth,
csv,
useAppForHeaderDetection,
detectedHeaders,
})
: null;

const owner = auth.workspace();
Expand Down Expand Up @@ -188,7 +197,7 @@ export async function upsertTableFromCsv({
return new Err(errorDetails);
}

csvRows = csvRowsRes.value;
csvRows = csvRowsRes.value.rows;
}

if ((csvRows?.length ?? 0) > 500_000) {
Expand Down Expand Up @@ -321,11 +330,18 @@ export async function rowsFromCsv({
auth,
csv,
useAppForHeaderDetection,
detectedHeaders,
}: {
auth: Authenticator;
csv: string;
useAppForHeaderDetection: boolean;
}): Promise<Result<CoreAPIRow[], CsvParsingError>> {
detectedHeaders?: DetectedHeadersType;
}): Promise<
Result<
{ detectedHeaders: DetectedHeadersType; rows: CoreAPIRow[] },
CsvParsingError
>
> {
const delimiter = await guessDelimiter(csv);
if (!delimiter) {
return new Err({
Expand All @@ -334,12 +350,9 @@ export async function rowsFromCsv({
});
}

const headerRes = await detectHeaders(
auth,
csv,
delimiter,
useAppForHeaderDetection
);
const headerRes = detectedHeaders
? new Ok(detectedHeaders)
: await detectHeaders(auth, csv, delimiter, useAppForHeaderDetection);

if (headerRes.isErr()) {
return headerRes;
Expand Down Expand Up @@ -477,12 +490,12 @@ export async function rowsFromCsv({
rows.push({ row_id: rowId, value: record });
}

return new Ok(rows);
return new Ok({ detectedHeaders: { header, rowIndex }, rows });
}

async function staticHeaderDetection(
firstRow: string[]
): Promise<Result<{ header: string[]; rowIndex: number }, CsvParsingError>> {
): Promise<Result<DetectedHeadersType, CsvParsingError>> {
const firstRecordCells = firstRow.map(
(h, i) => h.trim().toLocaleLowerCase() || `col_${i}`
);
Expand All @@ -500,7 +513,7 @@ async function detectHeaders(
csv: string,
delimiter: string,
useAppForHeaderDetection: boolean
): Promise<Result<{ header: string[]; rowIndex: number }, CsvParsingError>> {
): Promise<Result<DetectedHeadersType, CsvParsingError>> {
const headParser = parse(csv, { delimiter });
const records = [];
for await (const anyRecord of headParser) {
Expand Down
6 changes: 6 additions & 0 deletions front/lib/upsert_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ export const EnqueueUpsertDocument = t.type({
upsertContext: t.union([UpsertContextSchema, t.null]),
});

const DetectedHeaders = t.type({
header: t.array(t.string),
rowIndex: t.number,
});

export const EnqueueUpsertTable = t.type({
workspaceId: t.string,
dataSourceId: t.string,
Expand All @@ -51,6 +56,7 @@ export const EnqueueUpsertTable = t.type({
csv: t.union([t.string, t.null]),
truncate: t.boolean,
useAppForHeaderDetection: t.union([t.boolean, t.undefined, t.null]),
detectedHeaders: t.union([DetectedHeaders, t.undefined]),
});

type EnqueueUpsertDocumentType = t.TypeOf<typeof EnqueueUpsertDocument>;
Expand Down
1 change: 1 addition & 0 deletions front/temporal/upsert_tables/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export async function upsertTableActivity(
csv: upsertQueueItem.csv,
truncate: upsertQueueItem.truncate,
useAppForHeaderDetection: upsertQueueItem.useAppForHeaderDetection ?? false,
detectedHeaders: upsertQueueItem.detectedHeaders,
});

if (tableRes.isErr()) {
Expand Down

0 comments on commit 9a82337

Please sign in to comment.