Skip to content

Commit

Permalink
Extract: Manage Big docs
Browse files Browse the repository at this point in the history
  • Loading branch information
PopDaph committed Aug 31, 2023
1 parent da65a64 commit 14ad0a6
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 146 deletions.
4 changes: 2 additions & 2 deletions front/lib/actions/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ export const DustProdActionRegistry = createActionRegistry({
"extract-events": {
app: {
workspaceId: PRODUCTION_DUST_APPS_WORKSPACE_ID,
appId: "edf70ecf98",
appId: "d4f31b6a63",
appHash:
"6c32adac64c0876301614c17e7ef98091f7ab52afaacfc11549b3efe09a65ffc",
"73215c9d3fb6819c979d83bae86681313a41ea21760ffd9372d7f2a711387d18",
},
config: {
MODEL: {
Expand Down
113 changes: 113 additions & 0 deletions front/lib/extract_event_app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import {
cloneBaseConfig,
DustProdActionRegistry,
} from "@app/lib/actions/registry";
import { runAction } from "@app/lib/actions/server";
import { Authenticator } from "@app/lib/auth";
import logger from "@app/logger/logger";
import { EventSchemaType } from "@app/types/extract";

import { CoreAPI } from "./core_api";
import { ExtractEventAppResponseResults } from "./extract_events";
import { formatPropertiesForModel } from "./extract_events_properties";

const EXTRACT_MAX_NUMBER_TOKENS_TO_PROCESS = 6000;

/**
* Runs the Extract event app and returns just only the results in which extracted events are found
* @param auth
* @param inputs
*/
export async function _runExtractEventApp({
auth,
content,
marker,
schema,
}: {
auth: Authenticator;
content: string;
marker: string;
schema: EventSchemaType;
}): Promise<string> {
const inputs = [
{
content: content,
marker: marker,
properties_to_extract: formatPropertiesForModel(schema.properties),
},
];

const ACTION_NAME = "extract-events";
const config = cloneBaseConfig(DustProdActionRegistry[ACTION_NAME]?.config);
const response = await runAction(auth, ACTION_NAME, config, inputs);

if (response.isErr()) {
logger.error(
{ error: response.error },
`api_error: ${JSON.stringify(response.error)}`
);
return "";
}

const successResponse = response as ExtractEventAppResponseResults;
const successResponseValue = successResponse.value.results[0][0].value;

logger.info(
{ value: successResponseValue },
"[Extract Event] Extract event app ran successfully."
);

return successResponseValue;
}

/**
* Return the content to process by the Extract Event app.
* If the document is too big, we send only part of it to the Dust App.
* @param fullDocumentText
* @param marker
*/
export async function _getMaxTextContentToProcess({
fullDocumentText,
marker,
}: {
fullDocumentText: string;
marker: string;
}): Promise<string> {
const tokensInDocumentText = await CoreAPI.tokenize({
text: fullDocumentText,
modelId: "text-embedding-ada-002",
providerId: "openai",
});
if (tokensInDocumentText.isErr()) {
{
tokensInDocumentText.error;
}
logger.error(
"Could not get number of tokens for document, trying with full doc."
);
return fullDocumentText;
}

const numberOfTokens = tokensInDocumentText.value.tokens.length;
let documentTextToProcess: string;

if (numberOfTokens > EXTRACT_MAX_NUMBER_TOKENS_TO_PROCESS) {
// Document is too big, we need to send only part of it to the Dust App.
const markerIndex = fullDocumentText.indexOf(marker);
const markerLength = marker.length;
const start = Math.max(
0,
markerIndex - EXTRACT_MAX_NUMBER_TOKENS_TO_PROCESS / 2
);
const end = Math.min(
fullDocumentText.length,
markerIndex + markerLength + EXTRACT_MAX_NUMBER_TOKENS_TO_PROCESS / 2
);
documentTextToProcess = fullDocumentText.substring(start, end);
} else {
// Document is small enough, we send the whole text.
documentTextToProcess = fullDocumentText;
}

return documentTextToProcess;
}
21 changes: 0 additions & 21 deletions front/lib/extract_event_markers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,6 @@ export function getRawExtractEventMarkersFromText(text: string): string[] {
}
}

/**
* We can use [[idea]] or [[idea:2]] in a document to mark 2 events of the same type.
* This function will return a dict of markers with the same name.
* @param rawMarkers string[]
* @returns ExtractedMarkersType
* @example ["idea", "idea:2", "idea:3", "goals"] returns { "idea": ["idea", "idea:2", "idea:3"], "goals": ["goals"] }
*/
export function sanitizeRawExtractEventMarkers(
rawMarkers: string[]
): ExtractedMarkersType {
const markers: { [key: string]: string[] } = {};
rawMarkers.map((m) => {
const [key] = m.split(":");
if (!markers[key]) {
markers[key] = [];
}
markers[key].push(m);
});
return markers;
}

/**
* Get the markers from the doc and returns only the ones that are not already in the DB
* @param documentId
Expand Down
160 changes: 68 additions & 92 deletions front/lib/extract_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@ import {
DocumentsPostProcessHookOnUpsertParams,
} from "@app/documents_post_process_hooks/hooks";
import { getDatasource } from "@app/documents_post_process_hooks/hooks/lib/data_source_helpers";
import {
cloneBaseConfig,
DustProdActionRegistry,
} from "@app/lib/actions/registry";
import { runAction } from "@app/lib/actions/server";
import { Authenticator } from "@app/lib/auth";
import {
_getMaxTextContentToProcess,
_runExtractEventApp,
} from "@app/lib/extract_event_app";
import {
getExtractEventMarkersToProcess,
getRawExtractEventMarkersFromText,
hasExtractEventMarker,
sanitizeRawExtractEventMarkers,
} from "@app/lib/extract_event_markers";
import { formatPropertiesForModel } from "@app/lib/extract_events_properties";
import { EventSchema, ExtractedEvent } from "@app/lib/models";
import { generateModelSId } from "@app/lib/utils";
import logger from "@app/logger/logger";
Expand Down Expand Up @@ -91,20 +88,18 @@ export async function processExtractEvents({
}

// Getting the markers from the doc and keeping only those not already in the DB
const rawMarkers = await getExtractEventMarkersToProcess({
const markersToProcess = await getExtractEventMarkersToProcess({
documentId,
dataSourceName: dataSourceName,
documentText,
});
const markers = sanitizeRawExtractEventMarkers(rawMarkers);

await Promise.all(
Object.keys(markers).map((marker) => {
return _processExtractEvent({
markersToProcess.map((marker) => {
return _processExtractEventsForMarker({
auth: auth,
dataSourceName: dataSourceName,
sanitizedMarker: marker,
markers: markers[marker],
marker: marker,
documentText: documentText,
documentId: documentId,
documentSourceUrl: documentSourceUrl || null,
Expand All @@ -114,31 +109,31 @@ export async function processExtractEvents({
}

/**
* Gets the schema for the marker, runs the Dust app to extract properties, and saves the event(s)
* 1/ Gets the schema for the marker,
* 2/ Checks that the document is not too big for the Dust app,
* 3/ Runs the Dust app to extract schema properties from the document,
* 4/ Saves the event(s) in the DB.
*/
async function _processExtractEvent(data: {
async function _processExtractEventsForMarker({
auth,
dataSourceName,
marker,
documentId,
documentSourceUrl,
documentText,
}: {
auth: Authenticator;
dataSourceName: string;
sanitizedMarker: string;
markers: string[];
marker: string;
documentText: string;
documentId: string;
documentSourceUrl: string | null;
}) {
const {
auth,
dataSourceName,
sanitizedMarker,
markers,
documentId,
documentSourceUrl,
documentText,
} = data;

// 1/ Get the schema for the marker
const schema: EventSchema | null = await EventSchema.findOne({
where: {
workspaceId: auth.workspace()?.id,
marker: sanitizedMarker,
marker: marker.split(":")[0],
status: "active",
},
});
Expand All @@ -147,84 +142,65 @@ async function _processExtractEvent(data: {
return;
}

const inputsForApp = [
{
document_text: documentText,
markers: markers,
schema_properties: formatPropertiesForModel(schema.properties),
},
];

const results = await _runExtractEventApp(auth, inputsForApp);
results.map(async (result: string) => {
const properties = JSON.parse(result);
if (!properties.marker) {
logger.error(
{ properties, marker: schema.marker, documentSourceUrl, documentId },
"Extract event app did not return a marker. Skipping."
);
return;
}
// 2/ Check that the document is not to big for the Dust App.
const contentToProcess = await _getMaxTextContentToProcess({
fullDocumentText: documentText,
marker: marker,
});

const event = await ExtractedEvent.create({
sId: generateModelSId(),
documentId: documentId,
properties: properties,
status: "pending",
eventSchemaId: schema.id,
dataSourceName: dataSourceName,
documentSourceUrl: documentSourceUrl || null,
marker: properties.marker,
});
// 3/ Run the Dust app to extract properties
const result = await _runExtractEventApp({
auth,
content: contentToProcess,
marker: marker,
schema: schema,
});

// Temp: we log on slack events that are extracted from the Dust workspace
if (schema.debug === true) {
await _logDebugEventOnSlack({ event, schema, documentSourceUrl });
}
if (result.length === 0) {
logger.info(
{ properties, marker: schema.marker, documentSourceUrl, documentId },
"[Extract Event] Event saved and logged."
{ marker: schema.marker, documentSourceUrl, documentId },
"[Extract Event] No event extracted."
);
});
}

type ExtractEventAppResponseResults = {
value: {
results: { value: string[] }[][];
};
};

/**
* Runs the Extract event app and returns just only the results in which extracted events are found
* @param auth
* @param inputs
*/
async function _runExtractEventApp(
auth: Authenticator,
inputs: { document_text: string; markers: string[]; schema_properties: any }[]
): Promise<string[]> {
const ACTION_NAME = "extract-events";
const config = cloneBaseConfig(DustProdActionRegistry[ACTION_NAME]?.config);
const response = await runAction(auth, ACTION_NAME, config, inputs);
return;
}

if (response.isErr()) {
// 4/ Save the event(s) in the DB
const properties = JSON.parse(result);
if (!properties.marker) {
logger.error(
{ error: response.error },
`api_error: ${JSON.stringify(response.error)}`
{ properties, marker: schema.marker, documentSourceUrl, documentId },
"Extract event app did not return a marker. Skipping."
);
return [];
return;
}

const successResponse = response as ExtractEventAppResponseResults;
const event = await ExtractedEvent.create({
sId: generateModelSId(),
documentId: documentId,
properties: properties,
status: "pending",
eventSchemaId: schema.id,
dataSourceName: dataSourceName,
documentSourceUrl: documentSourceUrl || null,
marker: properties.marker,
});

// 5/ Temp: we log on slack events that are extracted from the Dust workspace
if (schema.debug === true) {
await _logDebugEventOnSlack({ event, schema, documentSourceUrl });
}
logger.info(
{ response: successResponse.value },
"[Extract Event] Extract event app ran successfully."
{ properties, marker: schema.marker, documentSourceUrl, documentId },
"[Extract Event] Event saved and logged."
);

return successResponse.value.results[0][0].value;
}

export type ExtractEventAppResponseResults = {
value: {
results: { value: string }[][];
};
};

/**
* Logs the event on Dust's Slack if the schema is in debug mode
* Temporary, until we have a better way to extract events from the table.
Expand Down
4 changes: 2 additions & 2 deletions front/pages/w/[wId]/u/extract/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export default function AppExtractEvents({
<tr key={schema.marker} className="border-y">
<td className="whitespace-nowrap px-3 py-4 font-medium">
<Link
href={`/w/${owner.sId}/u/extract/templates/${schema.sId}`}
href={`/w/${owner.sId}/u/extract/templates/${schema.sId}/edit`}
className="block"
>
[[{schema.marker}]]
Expand All @@ -92,7 +92,7 @@ export default function AppExtractEvents({
{" "}
{/* Set the second column to take max width */}
<Link
href={`/w/${owner.sId}/u/extract/templates/${schema.sId}`}
href={`/w/${owner.sId}/u/extract/templates/${schema.sId}/edit`}
className="block"
>
{schema.description}
Expand Down
Loading

0 comments on commit 14ad0a6

Please sign in to comment.