diff --git a/front/lib/api/data_sources.ts b/front/lib/api/data_sources.ts index 2882cea2117a..787b42e3cc4f 100644 --- a/front/lib/api/data_sources.ts +++ b/front/lib/api/data_sources.ts @@ -458,6 +458,10 @@ export async function upsertTable({ return csvRowsRes; } + const detectedHeaders = csvRowsRes?.isOk() + ? csvRowsRes.value.detectedHeaders + : undefined; + const enqueueRes = await enqueueUpsertTable({ upsertTable: { workspaceId: auth.getNonNullableWorkspace().sId, @@ -471,6 +475,7 @@ export async function upsertTable({ csv: csv ?? null, truncate, useAppForHeaderDetection: useApp, + detectedHeaders, }, }); if (enqueueRes.isErr()) { @@ -621,6 +626,10 @@ export async function handleDataSourceTableCSVUpsert({ }); } + const detectedHeaders = csvRowsRes?.isOk() + ? csvRowsRes.value.detectedHeaders + : undefined; + const enqueueRes = await enqueueUpsertTable({ upsertTable: { workspaceId: owner.sId, @@ -634,6 +643,7 @@ export async function handleDataSourceTableCSVUpsert({ csv: csv ?? null, truncate, useAppForHeaderDetection, + detectedHeaders, }, }); if (enqueueRes.isErr()) { diff --git a/front/lib/api/tables.ts b/front/lib/api/tables.ts index 1d255d2dd92a..0f4c58b3b3a8 100644 --- a/front/lib/api/tables.ts +++ b/front/lib/api/tables.ts @@ -50,6 +50,8 @@ type NotFoundError = { message: string; }; +type DetectedHeadersType = { header: string[]; rowIndex: number }; + export type TableOperationError = | { type: "internal_server_error"; @@ -130,6 +132,7 @@ export async function upsertTableFromCsv({ csv, truncate, useAppForHeaderDetection, + detectedHeaders, }: { auth: Authenticator; dataSource: DataSourceResource; @@ -142,9 +145,15 @@ export async function upsertTableFromCsv({ csv: string | null; truncate: boolean; useAppForHeaderDetection: boolean; + detectedHeaders?: DetectedHeadersType; }): Promise> { const csvRowsRes = csv - ? await rowsFromCsv({ auth, csv, useAppForHeaderDetection }) + ? await rowsFromCsv({ + auth, + csv, + useAppForHeaderDetection, + detectedHeaders, + }) : null; const owner = auth.workspace(); @@ -188,7 +197,7 @@ export async function upsertTableFromCsv({ return new Err(errorDetails); } - csvRows = csvRowsRes.value; + csvRows = csvRowsRes.value.rows; } if ((csvRows?.length ?? 0) > 500_000) { @@ -321,11 +330,18 @@ export async function rowsFromCsv({ auth, csv, useAppForHeaderDetection, + detectedHeaders, }: { auth: Authenticator; csv: string; useAppForHeaderDetection: boolean; -}): Promise> { + detectedHeaders?: DetectedHeadersType; +}): Promise< + Result< + { detectedHeaders: DetectedHeadersType; rows: CoreAPIRow[] }, + CsvParsingError + > +> { const delimiter = await guessDelimiter(csv); if (!delimiter) { return new Err({ @@ -334,12 +350,9 @@ export async function rowsFromCsv({ }); } - const headerRes = await detectHeaders( - auth, - csv, - delimiter, - useAppForHeaderDetection - ); + const headerRes = detectedHeaders + ? new Ok(detectedHeaders) + : await detectHeaders(auth, csv, delimiter, useAppForHeaderDetection); if (headerRes.isErr()) { return headerRes; @@ -477,12 +490,12 @@ export async function rowsFromCsv({ rows.push({ row_id: rowId, value: record }); } - return new Ok(rows); + return new Ok({ detectedHeaders: { header, rowIndex }, rows }); } async function staticHeaderDetection( firstRow: string[] -): Promise> { +): Promise> { const firstRecordCells = firstRow.map( (h, i) => h.trim().toLocaleLowerCase() || `col_${i}` ); @@ -500,7 +513,7 @@ async function detectHeaders( csv: string, delimiter: string, useAppForHeaderDetection: boolean -): Promise> { +): Promise> { const headParser = parse(csv, { delimiter }); const records = []; for await (const anyRecord of headParser) { diff --git a/front/lib/upsert_queue.ts b/front/lib/upsert_queue.ts index a6bbb481469a..090280283d2e 100644 --- a/front/lib/upsert_queue.ts +++ b/front/lib/upsert_queue.ts @@ -39,6 +39,11 @@ export const EnqueueUpsertDocument = t.type({ upsertContext: t.union([UpsertContextSchema, t.null]), }); +const DetectedHeaders = t.type({ + header: t.array(t.string), + rowIndex: t.number, +}); + export const EnqueueUpsertTable = t.type({ workspaceId: t.string, dataSourceId: t.string, @@ -51,6 +56,7 @@ export const EnqueueUpsertTable = t.type({ csv: t.union([t.string, t.null]), truncate: t.boolean, useAppForHeaderDetection: t.union([t.boolean, t.undefined, t.null]), + detectedHeaders: t.union([DetectedHeaders, t.undefined]), }); type EnqueueUpsertDocumentType = t.TypeOf; diff --git a/front/temporal/upsert_tables/activities.ts b/front/temporal/upsert_tables/activities.ts index 7c3e09c520ff..876888141872 100644 --- a/front/temporal/upsert_tables/activities.ts +++ b/front/temporal/upsert_tables/activities.ts @@ -88,6 +88,7 @@ export async function upsertTableActivity( csv: upsertQueueItem.csv, truncate: upsertQueueItem.truncate, useAppForHeaderDetection: upsertQueueItem.useAppForHeaderDetection ?? false, + detectedHeaders: upsertQueueItem.detectedHeaders, }); if (tableRes.isErr()) {