diff --git a/.env.example b/.env.example index 2aa11780748..5f0285650a9 100644 --- a/.env.example +++ b/.env.example @@ -14,7 +14,7 @@ SOCKET_PORT='3001' # AI MODELS AI_EMBEDDING_MODELS='[{"model": "text-embeddings-inference:llmrails/ember-v1", "url": "http://localhost:3040/"}]' AI_GENERATION_MODELS='[{"model": "text-generation-inference:TheBloke/zephyr-7b-beta", "url": "http://localhost:3050/"}]' -AI_EMBEDDER_ENABLED='true' +AI_EMBEDDER_WORKERS='1' # APPLICATION # AMPLITUDE_WRITE_KEY='key_AMPLITUDE_WRITE_KEY' diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 709f0a2d818..4da61b03eac 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,7 +22,7 @@ jobs: id-token: "write" services: postgres: - image: pgvector/pgvector:pg15 + image: pgvector/pgvector:0.6.2-pg15 # This env variables must be the same in the file PARABOL_BUILD_ENV_PATH env: POSTGRES_PASSWORD: "temppassword" @@ -143,6 +143,6 @@ jobs: uses: ravsamhq/notify-slack-action@v2 with: status: ${{ job.status }} - notify_when: 'failure' + notify_when: "failure" env: SLACK_WEBHOOK_URL: ${{ secrets.SLACK_GH_ACTIONS_NOTIFICATIONS }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8378696b622..171e080964f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: id-token: "write" services: postgres: - image: postgres:15.4 + image: pgvector/pgvector:0.6.2-pg15 # This env variables must be the same in the file PARABOL_BUILD_ENV_PATH env: POSTGRES_PASSWORD: "temppassword" @@ -78,7 +78,6 @@ jobs: yarn db:migrate yarn pg:migrate up yarn pg:build - yarn pg:generate - name: Build for testing run: yarn build @@ -86,9 +85,6 @@ jobs: - name: Verify source is clean run: git diff --quiet HEAD || (echo "Changes in generated files detected"; git diff; exit 1) - - name: Check Code Quality - run: yarn codecheck - - name: Run Predeploy for Testing run: yarn predeploy @@ -100,6 +96,12 @@ jobs: wait-on: | http://localhost:3000/graphql + - name: Kysely Codegen + run: yarn pg:generate + + - name: Check Code Quality + run: yarn codecheck + - name: Run server tests run: yarn test:server -- --reporters=default --reporters=jest-junit env: @@ -139,6 +141,6 @@ jobs: uses: ravsamhq/notify-slack-action@v2 with: status: ${{ job.status }} - notify_when: 'failure' + notify_when: "failure" env: SLACK_WEBHOOK_URL: ${{ secrets.SLACK_GH_ACTIONS_NOTIFICATIONS }} diff --git a/docker/images/parabol-ubi/README.md b/docker/images/parabol-ubi/README.md index 920c6d48fd8..518269140bc 100644 --- a/docker/images/parabol-ubi/README.md +++ b/docker/images/parabol-ubi/README.md @@ -16,21 +16,21 @@ Recommended: ## Variables -| Name | Description | Possible values | Recommended value | -| -------------------- | ----------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------- | ------------------------------------------------------------------- | -| `postgresql_tag` | PostgreSQL version from the [Docker image](https://hub.docker.com/_/postgres) | `Any tag` | `15.4` | -| `rethinkdb_tag` | RethinkDB version from the [Docker image](https://hub.docker.com/_/rethinkdb) | `Any tag` | `2.4.2` | -| `redis_tag` | Redis version from the [Docker image](https://hub.docker.com/_/redis) | `Any tag` | `7.0-alpine` | -| `_BUILD_ENV_PATH` | File `.env` used by the application during the build process | `Relative path from the root level of the repository` | 
`docker/parabol-ubi/environments/basic-env` | -| `_NODE_VERSION` | Node version, used by Docker to use the Docker image node:\_NODE_VERSION as base image to build | `Same as in root package.json` | | -| `_DOCKERFILE` | Dockerfile used to build the image | `Relative path from the root level of the repository` | `./docker/parabol-ubi/dockerfiles/basic.dockerfile` | -| `_DOCKER_REPOSITORY` | The destination repository | `String` | `parabol` | -| `_DOCKER_TAG` | Tag for the produced image | `String` | | +| Name | Description | Possible values | Recommended value | +| -------------------- | ----------------------------------------------------------------------------------------------- | ----------------------------------------------------- | --------------------------------------------------- | +| `postgresql_tag` | PostgreSQL version from the [Docker image](https://hub.docker.com/r/pgvector/pgvector) | `Any tag` | `0.6.2-pg15` | +| `rethinkdb_tag` | RethinkDB version from the [Docker image](https://hub.docker.com/_/rethinkdb) | `Any tag` | `2.4.2` | +| `redis_tag` | Redis version from the [Docker image](https://hub.docker.com/_/redis) | `Any tag` | `7.0-alpine` | +| `_BUILD_ENV_PATH` | File `.env` used by the application during the build process | `Relative path from the root level of the repository` | `docker/parabol-ubi/environments/basic-env` | +| `_NODE_VERSION` | Node version, used by Docker to use the Docker image node:\_NODE_VERSION as base image to build | `Same as in root package.json` | | +| `_DOCKERFILE` | Dockerfile used to build the image | `Relative path from the root level of the repository` | `./docker/parabol-ubi/dockerfiles/basic.dockerfile` | +| `_DOCKER_REPOSITORY` | The destination repository | `String` | `parabol` | +| `_DOCKER_TAG` | Tag for the produced image | `String` | | Example of variables: ```commandLine -export postgresql_tag=15.4; \ +export postgresql_tag=0.6.2-pg15; \ export rethinkdb_tag=2.4.2; \ export redis_tag=7.0-alpine; \ export _BUILD_ENV_PATH=docker/parabol-ubi/environments/basic-env; \ @@ -61,7 +61,7 @@ cp $_BUILD_ENV_PATH ./.env > :warning: Stop all database containers you might have running before executing the following command. If other database containers are running, some ports might be already taken. 
```commandLine -docker run --name temp-postgres -e POSTGRES_PASSWORD=temppassword -e POSTGRES_USER=tempuser -e POSTGRES_DB=tempdb -d -p 5432:5432 postgres:$postgresql_tag && \ +docker run --name temp-postgres -e POSTGRES_PASSWORD=temppassword -e POSTGRES_USER=tempuser -e POSTGRES_DB=tempdb -d -p 5432:5432 pgvector/pgvector:$postgresql_tag && \ docker run --name temp-rethinkdb -d -p 28015:28015 -p 29015:29015 -p 8080:8080 rethinkdb:$rethinkdb_tag && \ docker run --name temp-redis -d -p 6379:6379 redis:$redis_tag ``` diff --git a/docker/images/parabol-ubi/environments/pipeline b/docker/images/parabol-ubi/environments/pipeline index cfc707c746b..cd111fab88b 100644 --- a/docker/images/parabol-ubi/environments/pipeline +++ b/docker/images/parabol-ubi/environments/pipeline @@ -54,3 +54,7 @@ STRIPE_PUBLISHABLE_KEY='pk_test_MNoKbCzQX0lhktuxxI7M14wd' STRIPE_SECRET_KEY='' STRIPE_WEBHOOK_SECRET='' HUBSPOT_API_KEY='' +AI_EMBEDDING_MODELS='[{"model": "text-embeddings-inference:llmrails/ember-v1", "url": "http://localhost:3040/"}]' +AI_GENERATION_MODELS='[{"model": "text-generation-inference:TheBloke/zephyr-7b-beta", "url": "http://localhost:3050/"}]' +AI_EMBEDDER_WORKERS='1' +POSTGRES_USE_PGVECTOR='true' diff --git a/docker/stacks/development/docker-compose.yml b/docker/stacks/development/docker-compose.yml index 91adbbb6578..a4509d051b9 100644 --- a/docker/stacks/development/docker-compose.yml +++ b/docker/stacks/development/docker-compose.yml @@ -70,7 +70,7 @@ services: networks: parabol-network: text-embeddings-inference: - image: ghcr.io/huggingface/text-embeddings-inference:cpu-0.6 + image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.2 command: - "--model-id=llmrails/ember-v1" platform: linux/x86_64 diff --git a/docker/stacks/single-tenant-host/docker-compose.yaml b/docker/stacks/single-tenant-host/docker-compose.yml similarity index 98% rename from docker/stacks/single-tenant-host/docker-compose.yaml rename to docker/stacks/single-tenant-host/docker-compose.yml index e5f662ed74a..a336201bfe3 100644 --- a/docker/stacks/single-tenant-host/docker-compose.yaml +++ b/docker/stacks/single-tenant-host/docker-compose.yml @@ -17,7 +17,7 @@ services: postgres: container_name: postgres profiles: ["databases"] - image: postgres:15.4 + image: pgvector/pgvector:0.6.2-pg15 restart: always env_file: .env environment: diff --git a/package.json b/package.json index 82179da713f..7312d1f47d3 100644 --- a/package.json +++ b/package.json @@ -103,7 +103,7 @@ "html-webpack-plugin": "^5.5.0", "husky": "^7.0.4", "jscodeshift": "^0.14.0", - "kysely": "^0.27.2", + "kysely": "^0.27.3", "kysely-codegen": "^0.11.0", "lerna": "^6.4.1", "mini-css-extract-plugin": "^2.7.2", diff --git a/packages/client/shared/gqlIds/EmbedderChannelId.ts b/packages/client/shared/gqlIds/EmbedderChannelId.ts new file mode 100644 index 00000000000..f1fb49341ef --- /dev/null +++ b/packages/client/shared/gqlIds/EmbedderChannelId.ts @@ -0,0 +1,9 @@ +export const EmbedderChannelId = { + join: (serverId: string) => `embedder:${serverId}`, + split: (id: string) => { + const [, serverId] = id.split(':') + return serverId + } +} + +export default EmbedderChannelId diff --git a/packages/client/types/generics.ts b/packages/client/types/generics.ts index 663bbd7ac52..7a1d76dafb0 100644 --- a/packages/client/types/generics.ts +++ b/packages/client/types/generics.ts @@ -100,6 +100,9 @@ export type WithFieldsAsType = { : TObj[K] } +export type Tuple = R['length'] extends N ? R : Tuple +export type ParseInt = T extends `${infer Digit extends number}` ? 
Digit : never
+
 declare global {
   interface Array<T> {
     findLastIndex(predicate: (value: T, index: number, obj: T[]) => unknown, thisArg?: any): number
diff --git a/packages/embedder/EMBEDDER_JOB_PRIORITY.ts b/packages/embedder/EMBEDDER_JOB_PRIORITY.ts
new file mode 100644
index 00000000000..a54e4b5c67d
--- /dev/null
+++ b/packages/embedder/EMBEDDER_JOB_PRIORITY.ts
@@ -0,0 +1,6 @@
+export const EMBEDDER_JOB_PRIORITY = {
+  MEETING: 40,
+  DEFAULT: 50,
+  TOPIC_HISTORY: 80,
+  NEW_MODEL: 90
+} as const
diff --git a/packages/embedder/EmbeddingsJobQueueStream.ts b/packages/embedder/EmbeddingsJobQueueStream.ts
new file mode 100644
index 00000000000..7f5b1bde03d
--- /dev/null
+++ b/packages/embedder/EmbeddingsJobQueueStream.ts
@@ -0,0 +1,72 @@
+import {Selectable, sql} from 'kysely'
+import ms from 'ms'
+import sleep from 'parabol-client/utils/sleep'
+import 'parabol-server/initSentry'
+import getKysely from 'parabol-server/postgres/getKysely'
+import {DB} from 'parabol-server/postgres/pg'
+import RootDataLoader from '../server/dataloader/RootDataLoader'
+import {processJob} from './processJob'
+import {Logger} from '../server/utils/Logger'
+
+export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
+export type EmbedJob = DBJob & {
+  jobType: 'embed'
+  jobData: {
+    embeddingsMetadataId: number
+    model: string
+  }
+}
+export type RerankJob = DBJob & {jobType: 'rerank'; jobData: {discussionIds: string[]}}
+export type Job = EmbedJob | RerankJob
+
+export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
+  [Symbol.asyncIterator]() {
+    return this
+  }
+  dataLoader = new RootDataLoader({maxBatchSize: 1000})
+  async next(): Promise<IteratorResult<Job>> {
+    const pg = getKysely()
+    const getJob = (isFailed: boolean) => {
+      return pg
+        .with(
+          (cte) => cte('ids').materialized(),
+          (db) =>
+            db
+              .selectFrom('EmbeddingsJobQueue')
+              .select('id')
+              .orderBy(['priority'])
+              .$if(!isFailed, (db) => db.where('state', '=', 'queued'))
+              .$if(isFailed, (db) =>
+                db.where('state', '=', 'failed').where('retryAfter', '<', new Date())
+              )
+              .limit(1)
+              .forUpdate()
+              .skipLocked()
+        )
+        .updateTable('EmbeddingsJobQueue')
+        .set({state: 'running', startAt: new Date()})
+        .where('id', '=', sql`ANY(SELECT id FROM ids)`)
+        .returningAll()
+        .executeTakeFirst()
+    }
+    const job = (await getJob(false)) || (await getJob(true))
+    if (!job) {
+      Logger.log('JobQueueStream: no jobs found')
+      // queue is empty, so sleep for a while
+      await sleep(ms('1m'))
+      return this.next()
+    }
+
+    const isSuccessful = await processJob(job as Job, this.dataLoader)
+    if (isSuccessful) {
+      await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', job.id).executeTakeFirstOrThrow()
+    }
+    return {done: false, value: job as Job}
+  }
+  return() {
+    return Promise.resolve({done: true as const, value: undefined})
+  }
+  throw(error: any) {
+    return Promise.resolve({done: true, value: error})
+  }
+}
diff --git a/packages/embedder/README.md b/packages/embedder/README.md
index fc3fc68f335..36bb8e2ea50 100644
--- a/packages/embedder/README.md
+++ b/packages/embedder/README.md
@@ -3,27 +3,14 @@
 This service builds embedding vectors for semantic search and for other AI/ML
 use cases. It does so by:
 
-1. Updating a list of all possible items to create embedding vectors for and
-   storing that list in the `EmbeddingsMetadata` table
-2. Adding these items in batches to the `EmbeddingsJobQueue` table and a redis
-   priority queue called `embedder:queue`
-3. 
Allowing one or more parallel embedding services to calculate embedding - vectors (EmbeddingJobQueue states transistion from `queued` -> `embedding`, - then `embedding` -> [deleting the `EmbeddingJobQueue` row] - - In addition to deleteing the `EmbeddingJobQueue` row, when a job completes - successfully: - - - A row is added to the model table with the embedding vector; the - `EmbeddingMetadataId` field on this row points the appropriate - metadata row on `EmbeddingsMetadata` - - The `EmbeddingsMetadata.models` array is updated with the name of the - table that the embedding has been generated for - -4. This process repeats forever using a silly polling loop - -In the future, it would be wonderful to enhance this service such that it were -event driven. +1. Homogenizes different types of data into a single `EmbeddingsMetadata` table +2. Each new row in `EmbeddingsMetadata` creates a new row in `EmbeddingsJobQueue` for each model +3. Uses PG to pick a job from the queue and sets the job from `queued` -> `embedding`, + then `embedding` -> [deleting the `EmbeddingJobQueue` row] +4. Embedding involves creating a `fullText` from the work item and then a vector from that `fullText` +5. New jobs to add metadata are sent via redis streams from the GQL Executor +6. If embedding fails, the application increments the `retryCount` and increases the `retryAfter` if a retry is desired +7. If a job gets stalled, a process that runs every 5 minutes will look for jobs older than 5 minutes and reset them to `queued` ## Prerequisites @@ -37,10 +24,9 @@ The predeploy script checks for an environment variable The Embedder service takes no arguments and is controlled by the following environment variables, here given with example configuration: -- `AI_EMBEDDER_ENABLE`: enable/disable the embedder service from - performing work, or sleeping indefinitely +- `AI_EMBEDDER_WORKERS`: How many workers should simultaneously pick jobs from the queue. If less than 1, disabled. -`AI_EMBEDDER_ENABLED='true'` +`AI_EMBEDDER_WORKERS='1'` - `AI_EMBEDDING_MODELS`: JSON configuration for which embedding models are enabled. Each model in the array will be instantiated by @@ -69,3 +55,10 @@ environment variables, here given with example configuration: The Embedder service is stateless and takes no arguments. Multiple instances of the service may be started in order to match embedding load, or to catch up on history more quickly. 
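+
+Under the hood, each worker claims a job with `SELECT ... FOR UPDATE SKIP LOCKED`,
+so concurrent workers never grab the same row (see the resources below). As a
+rough sketch (not the literal compiled output), the claim query in
+`EmbeddingsJobQueueStream.ts` is equivalent to SQL along these lines:
+
+```sql
+WITH ids AS MATERIALIZED (
+  SELECT id FROM "EmbeddingsJobQueue"
+  WHERE state = 'queued'
+  ORDER BY priority
+  LIMIT 1
+  FOR UPDATE SKIP LOCKED
+)
+UPDATE "EmbeddingsJobQueue"
+SET state = 'running', "startAt" = now()
+WHERE id = ANY (SELECT id FROM ids)
+RETURNING *;
+```
+
+A second pass applies the same pattern to `failed` jobs whose `retryAfter` has
+elapsed.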
+
+## Resources
+
+### PG as a Job Queue
+
+- https://leontrolski.github.io/postgres-as-queue.html
+- https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/
diff --git a/packages/embedder/addEmbeddingsMetadata.ts b/packages/embedder/addEmbeddingsMetadata.ts
new file mode 100644
index 00000000000..214fecc0409
--- /dev/null
+++ b/packages/embedder/addEmbeddingsMetadata.ts
@@ -0,0 +1,15 @@
+import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
+import {MessageToEmbedder} from './custom'
+
+export const addEmbeddingsMetadata = async ({objectTypes, ...options}: MessageToEmbedder) => {
+  return Promise.all(
+    objectTypes.map((type) => {
+      switch (type) {
+        case 'retrospectiveDiscussionTopic':
+          return addEmbeddingsMetadataForRetrospectiveDiscussionTopic(options)
+        default:
+          throw new Error(`Invalid object type: ${type}`)
+      }
+    })
+  )
+}
diff --git a/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts b/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts
new file mode 100644
index 00000000000..724aec6fb13
--- /dev/null
+++ b/packages/embedder/addEmbeddingsMetadataForRetrospectiveDiscussionTopic.ts
@@ -0,0 +1,143 @@
+import {ExpressionOrFactory, SqlBool, sql} from 'kysely'
+import getRethink from 'parabol-server/database/rethinkDriver'
+import {RDatum} from 'parabol-server/database/stricterR'
+import getKysely from 'parabol-server/postgres/getKysely'
+import {DB} from 'parabol-server/postgres/pg'
+import {Logger} from 'parabol-server/utils/Logger'
+import {EMBEDDER_JOB_PRIORITY} from './EMBEDDER_JOB_PRIORITY'
+import getModelManager from './ai_models/ModelManager'
+import {EmbedderOptions} from './custom'
+
+interface DiscussionMeta {
+  id: string
+  teamId: string
+  createdAt: Date
+}
+
+const validateDiscussions = async (discussions: (DiscussionMeta & {meetingId: string})[]) => {
+  const r = await getRethink()
+  if (discussions.length === 0) return discussions
+  // Exclude discussions that belong to an unfinished meeting
+  const meetingIds = [...new Set(discussions.map(({meetingId}) => meetingId))]
+  const endedMeetingIds = await r
+    .table('NewMeeting')
+    .getAll(r.args(meetingIds), {index: 'id'})
+    .filter((row: RDatum) => row('endedAt').default(null).ne(null))('id')
+    .distinct()
+    .run()
+  const endedMeetingIdsSet = new Set(endedMeetingIds)
+  return discussions.filter(({meetingId}) => endedMeetingIdsSet.has(meetingId))
+}
+
+const insertDiscussionsIntoMetadata = async (discussions: DiscussionMeta[], priority: number) => {
+  const pg = getKysely()
+  const metadataRows = discussions.map(({id, teamId, createdAt}) => ({
+    refId: id,
+    objectType: 'retrospectiveDiscussionTopic' as const,
+    teamId,
+    // Not technically updatedAt since discussions can be updated after they get created
+    refUpdatedAt: createdAt
+  }))
+  if (!metadataRows[0]) return
+
+  const modelManager = getModelManager()
+  const models = modelManager.embeddingModels.map((m) => m.tableName)
+  return (
+    pg
+      .with('Insert', (qc) =>
+        qc
+          .insertInto('EmbeddingsMetadata')
+          .values(metadataRows)
+          .onConflict((oc) => oc.doNothing())
+          .returning('id')
+      )
+      // create n*m rows for n models & m discussions
+      .with('Metadata', (qc) =>
+        qc
+          .selectFrom('Insert')
+          .fullJoin(sql<{model: string}>`UNNEST(ARRAY[${sql.join(models)}])`.as('model'), (join) =>
+            join.onTrue()
+          )
+          .select(['id', 'model'])
+      )
+      .insertInto('EmbeddingsJobQueue')
+      .columns(['jobType', 'priority', 'jobData'])
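+      // builds one row per (metadata id × model) pair from the Metadata CTE; jobData is
+      // JSON so the queue can hold heterogeneous payloads ({embeddingsMetadataId, model}
+      // here, matching the EmbedJob type in EmbeddingsJobQueueStream.ts)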
+      .expression(({selectFrom}) =>
+        selectFrom('Metadata').select(({lit, fn, ref}) => [
+          sql.lit('embed').as('jobType'),
+          lit(priority).as('priority'),
+          fn('json_build_object', [
+            sql.lit('embeddingsMetadataId'),
+            ref('Metadata.id'),
+            sql.lit('model'),
+            ref('Metadata.model')
+          ]).as('jobData')
+        ])
+      )
+      .execute()
+  )
+}
+
+export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ({
+  startAt,
+  endAt,
+  meetingId
+}: EmbedderOptions) => {
+  // load up the metadata table with all discussion topics that are a part of meetings ended within the given date range
+  const pg = getKysely()
+  if (meetingId) {
+    const discussions = await pg
+      .selectFrom('Discussion')
+      .select(['id', 'teamId', 'createdAt'])
+      .where('meetingId', '=', meetingId)
+      .execute()
+    await insertDiscussionsIntoMetadata(discussions, EMBEDDER_JOB_PRIORITY.MEETING)
+    return
+  }
+  // PG only accepts 65K parameters (inserted columns * number of rows + query params). Make the batches as big as possible
+  const PG_MAX_PARAMS = 65535
+  const QUERY_PARAMS = 10
+  const METADATA_COLS_PER_ROW = 4
+  const BATCH_SIZE = Math.floor((PG_MAX_PARAMS - QUERY_PARAMS) / METADATA_COLS_PER_ROW)
+  const pgStartAt = startAt || new Date(0)
+  const pgEndAt = (endAt || new Date('4000-01-01')).getTime() / 1000
+
+  let curEndAt = pgEndAt
+  let curEndId = ''
+  for (let i = 0; i < 1e6; i++) {
+    // preserve microsecond resolution to keep timestamps equal
+    // so we can use the ID as a tiebreaker when count(createdAt) > BATCH_SIZE
+    const pgTime = sql`to_timestamp(${curEndAt})`
+    const lessThanTimeOrId: ExpressionOrFactory<DB, 'Discussion', SqlBool> = curEndId
+      ? ({eb}) =>
+          eb('createdAt', '<', pgTime).or(eb('createdAt', '=', pgTime).and('id', '>', curEndId))
+      : ({eb}) => eb('createdAt', '<=', pgTime)
+    const discussions = await pg
+      .selectFrom('Discussion')
+      .select([
+        'id',
+        'teamId',
+        'createdAt',
+        'meetingId',
+        sql<number>`extract(epoch from "createdAt")`.as('createdAtEpoch')
+      ])
+      .where('createdAt', '>', pgStartAt)
+      .where(lessThanTimeOrId)
+      .where('discussionTopicType', '=', 'reflectionGroup')
+      .orderBy('createdAt', 'desc')
+      .orderBy('id')
+      .limit(BATCH_SIZE)
+      .execute()
+    const earliestDiscussionInBatch = discussions.at(-1)
+    if (!earliestDiscussionInBatch) break
+    const {createdAtEpoch, id} = earliestDiscussionInBatch
+    curEndId = curEndAt === createdAtEpoch ?
id : '' + curEndAt = createdAtEpoch + const validDiscussions = await validateDiscussions(discussions) + await insertDiscussionsIntoMetadata(validDiscussions, EMBEDDER_JOB_PRIORITY.TOPIC_HISTORY) + const jsTime = new Date(createdAtEpoch * 1000) + Logger.log( + `Inserted ${validDiscussions.length}/${discussions.length} discussions in metadata ending at ${jsTime}` + ) + } +} diff --git a/packages/embedder/ai_models/AbstractEmbeddingsModel.ts b/packages/embedder/ai_models/AbstractEmbeddingsModel.ts new file mode 100644 index 00000000000..9fd5831ea1f --- /dev/null +++ b/packages/embedder/ai_models/AbstractEmbeddingsModel.ts @@ -0,0 +1,126 @@ +import {sql} from 'kysely' +import getKysely from 'parabol-server/postgres/getKysely' +import {DB} from 'parabol-server/postgres/pg' +import {Logger} from '../../server/utils/Logger' +import {EMBEDDER_JOB_PRIORITY} from '../EMBEDDER_JOB_PRIORITY' +import {ISO6391} from '../iso6393To1' +import {AbstractModel, ModelConfig} from './AbstractModel' + +export interface EmbeddingModelParams { + embeddingDimensions: number + maxInputTokens: number + tableSuffix: string + languages: ISO6391[] +} +export type EmbeddingsTable = Extract +export interface EmbeddingModelConfig extends ModelConfig { + tableSuffix: string +} + +export abstract class AbstractEmbeddingsModel extends AbstractModel { + readonly embeddingDimensions: number + readonly maxInputTokens: number + readonly tableName: string + readonly languages: ISO6391[] + constructor(config: EmbeddingModelConfig) { + super(config) + const modelParams = this.constructModelParams(config) + this.embeddingDimensions = modelParams.embeddingDimensions + this.languages = modelParams.languages + this.maxInputTokens = modelParams.maxInputTokens + this.tableName = `Embeddings_${modelParams.tableSuffix}` + } + protected abstract constructModelParams(config: EmbeddingModelConfig): EmbeddingModelParams + abstract getEmbedding(content: string, retries?: number): Promise + + abstract getTokens(content: string): Promise + splitText(content: string) { + // it's actually 4 / 3, but don't want to chance a failed split + const TOKENS_PER_WORD = 5 / 3 + const WORD_LIMIT = Math.floor(this.maxInputTokens / TOKENS_PER_WORD) + const chunks: string[] = [] + const delimiters = ['\n\n', '\n', '.', ' '] + const countWords = (text: string) => text.trim().split(/\s+/).length + const splitOnDelimiter = (text: string, delimiter: string) => { + const sections = text.split(delimiter) + for (let i = 0; i < sections.length; i++) { + const section = sections[i]! + const sectionWordCount = countWords(section) + if (sectionWordCount < WORD_LIMIT) { + // try to merge this section with the last one + const previousSection = chunks.at(-1) + if (previousSection) { + const combinedChunks = `${previousSection}${delimiter}${section}` + const mergedWordCount = countWords(combinedChunks) + if (mergedWordCount < WORD_LIMIT) { + chunks[chunks.length - 1] = combinedChunks + continue + } + } + chunks.push(section) + } else { + const nextDelimiter = delimiters[delimiters.indexOf(delimiter) + 1]! + splitOnDelimiter(section, nextDelimiter) + } + } + } + splitOnDelimiter(content.trim(), delimiters[0]!) 
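+    // NB: chunk sizes are estimated from word counts via TOKENS_PER_WORD rather than
+    // a real tokenizer pass, so each chunk should land safely under maxInputTokens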
+ return chunks + } + + async createEmbeddingsForModel() { + Logger.log(`Queueing EmbeddingsMetadata into EmbeddingsJobQueue for ${this.tableName}`) + const pg = getKysely() + await pg + .insertInto('EmbeddingsJobQueue') + .columns(['jobData', 'priority']) + .expression(({selectFrom}) => + selectFrom('EmbeddingsMetadata') + .select(({fn, lit}) => [ + fn('json_build_object', [ + sql.lit('model'), + sql.lit(this.tableName), + sql.lit('embeddingsMetadataId'), + 'id' + ]).as('jobData'), + lit(EMBEDDER_JOB_PRIORITY.NEW_MODEL).as('priority') + ]) + .where('language', 'in', this.languages) + ) + .onConflict((oc) => oc.doNothing()) + .execute() + } + async createTable() { + const pg = getKysely() + const hasTable = + ( + await sql`SELECT 1 FROM ${sql.id('pg_catalog', 'pg_tables')} WHERE ${sql.id( + 'tablename' + )} = ${this.tableName}`.execute(pg) + ).rows.length > 0 + if (hasTable) return undefined + const vectorDimensions = this.embeddingDimensions + Logger.log(`ModelManager: creating ${this.tableName} with ${vectorDimensions} dimensions`) + await sql` + DO $$ + BEGIN + CREATE TABLE IF NOT EXISTS ${sql.id(this.tableName)} ( + "id" INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + "embedText" TEXT, + "embedding" vector(${sql.raw(vectorDimensions.toString())}), + "embeddingsMetadataId" INTEGER UNIQUE NOT NULL, + "chunkNumber" SMALLINT, + UNIQUE("embeddingsMetadataId", "chunkNumber"), + FOREIGN KEY ("embeddingsMetadataId") + REFERENCES "EmbeddingsMetadata"("id") + ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS "idx_${sql.raw(this.tableName)}_embedding_vector_cosign_ops" + ON ${sql.id(this.tableName)} + USING hnsw ("embedding" vector_cosine_ops); + END + $$; + `.execute(pg) + await this.createEmbeddingsForModel() + } +} diff --git a/packages/embedder/ai_models/AbstractGenerationModel.ts b/packages/embedder/ai_models/AbstractGenerationModel.ts new file mode 100644 index 00000000000..c57d86c243a --- /dev/null +++ b/packages/embedder/ai_models/AbstractGenerationModel.ts @@ -0,0 +1,26 @@ +import {AbstractModel, ModelConfig} from './AbstractModel' + +export interface GenerationOptions { + maxNewTokens?: number + seed?: number + stop?: string + temperature?: number + topK?: number + topP?: number +} +export interface GenerationModelParams { + maxInputTokens: number +} +export interface GenerationModelConfig extends ModelConfig {} + +export abstract class AbstractGenerationModel extends AbstractModel { + readonly maxInputTokens: number + constructor(config: GenerationModelConfig) { + super(config) + const modelParams = this.constructModelParams(config) + this.maxInputTokens = modelParams.maxInputTokens + } + + protected abstract constructModelParams(config: GenerationModelConfig): GenerationModelParams + abstract summarize(content: string, options: GenerationOptions): Promise +} diff --git a/packages/embedder/ai_models/AbstractModel.ts b/packages/embedder/ai_models/AbstractModel.ts index b57d220cd35..3b114558539 100644 --- a/packages/embedder/ai_models/AbstractModel.ts +++ b/packages/embedder/ai_models/AbstractModel.ts @@ -3,71 +3,18 @@ export interface ModelConfig { url: string } -export interface EmbeddingModelConfig extends ModelConfig { - tableSuffix: string -} - -export interface GenerationModelConfig extends ModelConfig {} - export abstract class AbstractModel { - public readonly url?: string + public readonly url: string constructor(config: ModelConfig) { this.url = this.normalizeUrl(config.url) } // removes a trailing slash from the inputUrl - private normalizeUrl(inputUrl: string | 
undefined) { - if (!inputUrl) return undefined + private normalizeUrl(inputUrl: string) { const regex = /[/]+$/ return inputUrl.replace(regex, '') } } -export interface EmbeddingModelParams { - embeddingDimensions: number - maxInputTokens: number - tableSuffix: string -} - -export abstract class AbstractEmbeddingsModel extends AbstractModel { - readonly embeddingDimensions: number - readonly maxInputTokens: number - readonly tableName: string - constructor(config: EmbeddingModelConfig) { - super(config) - const modelParams = this.constructModelParams(config) - this.embeddingDimensions = modelParams.embeddingDimensions - this.maxInputTokens = modelParams.maxInputTokens - this.tableName = `Embeddings_${modelParams.tableSuffix}` - } - protected abstract constructModelParams(config: EmbeddingModelConfig): EmbeddingModelParams - abstract getEmbedding(content: string): Promise -} - -export interface GenerationModelParams { - maxInputTokens: number -} - -export interface GenerationOptions { - maxNewTokens?: number - seed?: number - stop?: string - temperature?: number - topK?: number - topP?: number -} - -export abstract class AbstractGenerationModel extends AbstractModel { - readonly maxInputTokens: number - constructor(config: GenerationModelConfig) { - super(config) - const modelParams = this.constructModelParams(config) - this.maxInputTokens = modelParams.maxInputTokens - } - - protected abstract constructModelParams(config: GenerationModelConfig): GenerationModelParams - abstract summarize(content: string, options: GenerationOptions): Promise -} - export default AbstractModel diff --git a/packages/embedder/ai_models/ModelManager.ts b/packages/embedder/ai_models/ModelManager.ts index bf6888378c8..55bf3feeadc 100644 --- a/packages/embedder/ai_models/ModelManager.ts +++ b/packages/embedder/ai_models/ModelManager.ts @@ -1,12 +1,6 @@ -import {Kysely, sql} from 'kysely' - -import { - AbstractEmbeddingsModel, - AbstractGenerationModel, - EmbeddingModelConfig, - GenerationModelConfig, - ModelConfig -} from './AbstractModel' +import {AbstractEmbeddingsModel, EmbeddingModelConfig} from './AbstractEmbeddingsModel' +import {AbstractGenerationModel, GenerationModelConfig} from './AbstractGenerationModel' +import {ModelConfig} from './AbstractModel' import OpenAIGeneration from './OpenAIGeneration' import TextEmbeddingsInference from './TextEmbeddingsInference' import TextGenerationInference from './TextGenerationInference' @@ -16,8 +10,8 @@ interface ModelManagerConfig { generationModels: GenerationModelConfig[] } -export type EmbeddingsModelType = 'text-embeddings-inference' -export type GenerationModelType = 'openai' | 'text-generation-inference' +type EmbeddingsModelType = 'text-embeddings-inference' +type GenerationModelType = 'openai' | 'text-generation-inference' export class ModelManager { embeddingModels: AbstractEmbeddingsModel[] @@ -93,39 +87,8 @@ export class ModelManager { }) } - async maybeCreateTables(pg: Kysely) { - const maybePromises = this.embeddingModels.map(async (embeddingsModel) => { - const tableName = embeddingsModel.tableName - const hasTable = - ( - await sql`SELECT 1 FROM ${sql.id('pg_catalog', 'pg_tables')} WHERE ${sql.id( - 'tablename' - )} = ${tableName}`.execute(pg) - ).rows.length > 0 - if (hasTable) return undefined - const vectorDimensions = embeddingsModel.embeddingDimensions - console.log(`ModelManager: creating ${tableName} with ${vectorDimensions} dimensions`) - const query = sql` - DO $$ - BEGIN - CREATE TABLE IF NOT EXISTS ${sql.id(tableName)} ( - "id" INT 
GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - "embedText" TEXT, - "embedding" vector(${sql.raw(vectorDimensions.toString())}), - "embeddingsMetadataId" INTEGER NOT NULL, - FOREIGN KEY ("embeddingsMetadataId") - REFERENCES "EmbeddingsMetadata"("id") - ON DELETE CASCADE - ); - CREATE INDEX IF NOT EXISTS "idx_${sql.raw(tableName)}_embedding_vector_cosign_ops" - ON ${sql.id(tableName)} - USING hnsw ("embedding" vector_cosine_ops); - END $$; - - ` - return query.execute(pg) - }) - Promise.all(maybePromises) + async maybeCreateTables() { + return Promise.all(this.embeddingModels.map((model) => model.createTable())) } } diff --git a/packages/embedder/ai_models/OpenAIGeneration.ts b/packages/embedder/ai_models/OpenAIGeneration.ts index 9818012edb4..697160513ae 100644 --- a/packages/embedder/ai_models/OpenAIGeneration.ts +++ b/packages/embedder/ai_models/OpenAIGeneration.ts @@ -4,7 +4,7 @@ import { GenerationModelConfig, GenerationModelParams, GenerationOptions -} from './AbstractModel' +} from './AbstractGenerationModel' export type ModelId = 'gpt-3.5-turbo-0125' | 'gpt-4-turbo-preview' diff --git a/packages/embedder/ai_models/TextEmbeddingsInference.ts b/packages/embedder/ai_models/TextEmbeddingsInference.ts index 549fadcd6fd..c30c59e8e8d 100644 --- a/packages/embedder/ai_models/TextEmbeddingsInference.ts +++ b/packages/embedder/ai_models/TextEmbeddingsInference.ts @@ -1,20 +1,25 @@ -import {AbstractEmbeddingsModel, EmbeddingModelConfig, EmbeddingModelParams} from './AbstractModel' -import fetchWithRetry from './helpers/fetchWithRetry' - -const MAX_REQUEST_TIME_S = 3 * 60 - +import createClient from 'openapi-fetch' +import sleep from 'parabol-client/utils/sleep' +import type {paths} from '../textEmbeddingsnterface' +import { + AbstractEmbeddingsModel, + EmbeddingModelConfig, + EmbeddingModelParams +} from './AbstractEmbeddingsModel' export type ModelId = 'BAAI/bge-large-en-v1.5' | 'llmrails/ember-v1' const modelIdDefinitions: Record = { 'BAAI/bge-large-en-v1.5': { embeddingDimensions: 1024, maxInputTokens: 512, - tableSuffix: 'bge_l_en_1p5' + tableSuffix: 'bge_l_en_1p5', + languages: ['en'] }, 'llmrails/ember-v1': { embeddingDimensions: 1024, maxInputTokens: 512, - tableSuffix: 'ember_1' + tableSuffix: 'ember_1', + languages: ['en'] } } @@ -23,34 +28,60 @@ function isValidModelId(object: any): object is ModelId { } export class TextEmbeddingsInference extends AbstractEmbeddingsModel { + client: ReturnType> constructor(config: EmbeddingModelConfig) { super(config) + this.client = createClient({baseUrl: this.url}) } - public async getEmbedding(content: string) { - const fetchOptions = { - body: JSON.stringify({inputs: content}), - deadline: new Date(new Date().getTime() + MAX_REQUEST_TIME_S * 1000), - headers: { - Accept: 'application/json', - 'Content-Type': 'application/json; charset=utf-8' - }, - method: 'POST' + async getTokens(content: string) { + try { + const {data, error} = await this.client.POST('/tokenize', { + body: {inputs: content, add_special_tokens: true}, + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json; charset=utf-8' + } + }) + if (error) return new Error(error.error) + return data[0]!.map(({id}) => id) + } catch (e) { + return e instanceof Error ? 
e : new Error(e as string) } + } + async decodeTokens(inputIds: number[]) { + try { + const {data, error} = await this.client.POST('/decode', { + body: {ids: inputIds}, + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json; charset=utf-8' + } + }) + if (error) return new Error(error.error) + return data + } catch (e) { + return e instanceof Error ? e : new Error(e as string) + } + } + public async getEmbedding(content: string, retries = 5): Promise { try { - const res = await fetchWithRetry(`${this.url}/embed`, fetchOptions) - const listOfVectors = (await res.json()) as Array - if (!listOfVectors) - throw new Error('TextEmbeddingsInference.getEmbeddings(): listOfVectors is undefined') - if (listOfVectors.length !== 1 || !listOfVectors[0]) - throw new Error( - `TextEmbeddingsInference.getEmbeddings(): listOfVectors list length !== 1 (length: ${listOfVectors.length})` - ) - return listOfVectors[0] + const {data, error, response} = await this.client.POST('/embed', { + body: {inputs: content}, + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json; charset=utf-8' + } + }) + if (error) { + if (response.status !== 429 || retries < 1) return new Error(error.error) + await sleep(2000) + return this.getEmbedding(content, retries - 1) + } + return data[0]! } catch (e) { - console.log(`TextEmbeddingsInference.getEmbeddings() timeout: `, e) - throw e + return e instanceof Error ? e : new Error(e as string) } } diff --git a/packages/embedder/ai_models/TextGenerationInference.ts b/packages/embedder/ai_models/TextGenerationInference.ts index bcf1daa6303..8fa4ab7cd7b 100644 --- a/packages/embedder/ai_models/TextGenerationInference.ts +++ b/packages/embedder/ai_models/TextGenerationInference.ts @@ -3,7 +3,7 @@ import { GenerationModelConfig, GenerationModelParams, GenerationOptions -} from './AbstractModel' +} from './AbstractGenerationModel' import fetchWithRetry from './helpers/fetchWithRetry' const MAX_REQUEST_TIME_S = 3 * 60 @@ -51,7 +51,6 @@ export class TextGenerationInference extends AbstractGenerationModel { } try { - // console.log(`TextGenerationInference.summarize(): summarizing from ${this.url}/generate`) const res = await fetchWithRetry(`${this.url}/generate`, fetchOptions) const json = await res.json() if (!json || !json.generated_text) diff --git a/packages/embedder/custom.d.ts b/packages/embedder/custom.d.ts new file mode 100644 index 00000000000..6640974c6a9 --- /dev/null +++ b/packages/embedder/custom.d.ts @@ -0,0 +1,11 @@ +import type {DB} from '../server/postgres/pg' + +export type EmbeddingObjectType = DB['EmbeddingsMetadata']['objectType'] + +export interface MessageToEmbedder { + objectTypes: EmbeddingObjectType[] + startAt?: Date + endAt?: Date + meetingId?: string +} +export type EmbedderOptions = Omit diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts index dccd9492a02..628d0d8decd 100644 --- a/packages/embedder/embedder.ts +++ b/packages/embedder/embedder.ts @@ -1,42 +1,18 @@ import tracer from 'dd-trace' -import {Insertable} from 'kysely' -import Redlock, {RedlockAbortSignal} from 'redlock' - +import EmbedderChannelId from 'parabol-client/shared/gqlIds/EmbedderChannelId' import 'parabol-server/initSentry' -import getKysely from 'parabol-server/postgres/getKysely' -import {DB} from 'parabol-server/postgres/pg' -import getModelManager, {ModelManager} from './ai_models/ModelManager' -import {countWords} from './indexing/countWords' -import {createEmbeddingTextFrom} from './indexing/createEmbeddingTextFrom' 
-import { - completeJobTxn, - insertNewJobs, - selectJobQueueItemById, - selectMetaToQueue, - selectMetadataByJobQueueId, - updateJobState -} from './indexing/embeddingsTablesOps' -import {getRedisClient} from './indexing/getRedisClient' -import {getRootDataLoader} from './indexing/getRootDataLoader' -import {orgIdsWithFeatureFlag} from './indexing/orgIdsWithFeatureFlag' -import {refreshRetroDiscussionTopicsMeta} from './indexing/retrospectiveDiscussionTopic' - -/* - * TODO List - * - [ ] implement a clean-up function that re-queues items that haven't transitioned - * to a completed state, or that failed - */ - -export type DBInsert = { - [K in keyof DB]: Insertable -} - -const POLLING_PERIOD_SEC = 60 // How often do we try to grab the lock and re-index? -const Q_MAX_LENGTH = 100 // How many EmbeddingIndex items do we batch in redis? -const WORD_COUNT_TO_TOKEN_RATIO = 3.0 / 2 // We multiple the word count by this to estimate token count - -const {AI_EMBEDDER_ENABLED} = process.env -const {SERVER_ID} = process.env +import {Logger} from 'parabol-server/utils/Logger' +import RedisInstance from 'parabol-server/utils/RedisInstance' +import {Tuple} from '../client/types/generics' +import RedisStream from '../gql-executor/RedisStream' +import {EmbeddingsJobQueueStream} from './EmbeddingsJobQueueStream' +import {addEmbeddingsMetadata} from './addEmbeddingsMetadata' +import getModelManager from './ai_models/ModelManager' +import {MessageToEmbedder} from './custom' +import {establishPrimaryEmbedder} from './establishPrimaryEmbedder' +import {importHistoricalMetadata} from './importHistoricalMetadata' +import {mergeAsyncIterators} from './mergeAsyncIterators' +import {resetStalledJobs} from './resetStalledJobs' tracer.init({ service: `embedder`, @@ -46,207 +22,89 @@ tracer.init({ }) tracer.use('pg') -const refreshMetadata = async () => { - const dataLoader = getRootDataLoader() - await refreshRetroDiscussionTopicsMeta(dataLoader) - // In the future, other sorts of objects to index could be added here... +const parseEmbedderMessage = (message: string): MessageToEmbedder => { + const {startAt, endAt, ...input} = JSON.parse(message) + return { + ...input, + startAt: startAt ? new Date(startAt) : undefined, + endAt: endAt ? new Date(endAt) : undefined + } } -const maybeQueueMetadataItems = async (modelManager: ModelManager) => { - const redisClient = getRedisClient() - const queueLength = await redisClient.zcard('embedder:queue') - if (queueLength >= Q_MAX_LENGTH) return - const itemCountToQueue = Q_MAX_LENGTH - queueLength - const modelTables = modelManager.embeddingModels.map((m) => m.tableName) - const orgIds = await orgIdsWithFeatureFlag() - - // For each configured embedding model, select rows from EmbeddingsMetadata - // that haven't been calculated nor exist in the EmbeddingsJobQueue yet - // - // Notes: - // * `em.models @> ARRAY[v.model]` is an indexed query - // * I don't love all overrides, I wish there was a better way - // see: https://github.com/kysely-org/kysely/issues/872 - - const batchToQueue = await selectMetaToQueue(modelTables, orgIds, itemCountToQueue) - if (!batchToQueue.length) { - console.log(`embedder: no new items to queue`) +const run = async () => { + const SERVER_ID = process.env.SERVER_ID + if (!SERVER_ID) throw new Error('env.SERVER_ID is required') + const embedderChannel = EmbedderChannelId.join(SERVER_ID) + const NUM_WORKERS = parseInt(process.env.AI_EMBEDDER_WORKERS!) + if (!(NUM_WORKERS > 0)) { + Logger.log('env.AI_EMBEDDER_WORKERS is < 0. 
Embedder will not run.') return } - const ejqHash: { - [key: string]: { - refUpdatedAt: Date - } - } = {} - const makeKey = (item: {objectType: string; refId: string}) => `${item.objectType}:${item.refId}` - - const ejqValues = batchToQueue.map((item) => { - ejqHash[makeKey(item)] = { - refUpdatedAt: item.refUpdatedAt - } - return { - objectType: item.objectType, - refId: item.refId as string, - model: item.model, - state: 'queued' as const - } - }) as any[] - - const ejqRows = await insertNewJobs(ejqValues) - - ejqRows.forEach((item: any) => { - const {refUpdatedAt} = ejqHash[makeKey(item)]! - const score = new Date(refUpdatedAt).getTime() - redisClient.zadd('embedder:queue', score, item.id) - }) - - console.log(`embedder: queued ${batchToQueue.length} items`) -} - -const dequeueAndEmbedUntilEmpty = async (modelManager: ModelManager) => { - const dataLoader = getRootDataLoader() - const redisClient = getRedisClient() - while (true) { - const maybeRedisQItem = await redisClient.zpopmax('embedder:queue', 1) - if (maybeRedisQItem.length < 2) return // Q is empty, all done! - - const [id, _] = maybeRedisQItem - if (!id) { - console.log(`embedder: de-queued undefined item from embedder:queue`) - continue - } - const jobQueueId = parseInt(id, 10) - const jobQueueItem = (await selectJobQueueItemById(jobQueueId)) as any - if (!jobQueueItem) { - console.log(`embedder: unable to fetch EmbeddingsJobQueue.id = ${id}`) - continue - } - - const metadata = await selectMetadataByJobQueueId(jobQueueId) - if (!metadata) { - await updateJobState(jobQueueId, 'failed', { - stateMessage: `unable to fetch metadata by EmbeddingsJobQueue.id = ${id}` - }) - continue - } - - let fullText = metadata?.fullText as string - try { - if (!fullText) { - fullText = await createEmbeddingTextFrom(jobQueueItem, dataLoader) - } - } catch (e) { - await updateJobState(jobQueueId, 'failed', { - stateMessage: `unable to create embedding text: ${e}` - }) - continue - } + const redis = new RedisInstance(`embedder_${SERVER_ID}`) + const primaryLock = await establishPrimaryEmbedder(redis) + const modelManager = getModelManager() + let streams: AsyncIterableIterator | undefined + const kill = () => { + primaryLock?.release() + streams?.return?.() + process.exit() + } + process.on('SIGTERM', kill) + process.on('SIGINT', kill) + if (primaryLock) { + // only 1 worker needs to perform these on startup + await modelManager.maybeCreateTables() + await importHistoricalMetadata() + resetStalledJobs() + } - const wordCount = countWords(fullText) + const onMessage = async (_channel: string, message: string) => { + const parsedMessage = parseEmbedderMessage(message) + await addEmbeddingsMetadata(parsedMessage) + } - const embeddingModel = modelManager.embeddingModelsMapByTable[jobQueueItem.model] - if (!embeddingModel) { - await updateJobState(jobQueueId, 'failed', { - stateMessage: `embedding model ${jobQueueItem.model} not available` - }) - continue - } - const itemKey = `${jobQueueItem.objectType}:${jobQueueItem.refId}` - const modelTable = embeddingModel.tableName + // subscribe to consumer group + try { + await redis.xgroup( + 'CREATE', + 'embedMetadataStream', + 'embedMetadataConsumerGroup', + '$', + 'MKSTREAM' + ) + } catch (e) { + // stream already exists + } - let embedText = fullText - const maxInputTokens = embeddingModel.maxInputTokens - // we're using word count as an appoximation of tokens - if (wordCount * WORD_COUNT_TO_TOKEN_RATIO > maxInputTokens) { - try { - const generator = modelManager.generationModels[0] // use 1st generator 
- if (!generator) throw new Error(`Generator unavailable`) - console.log(`embedder: ...summarizing ${itemKey} for ${modelTable}`) - embedText = await generator.summarize(fullText, {maxNewTokens: maxInputTokens}) - } catch (e) { - await updateJobState(jobQueueId, 'failed', { - stateMessage: `unable to summarize long embed text: ${e}` - }) + const messageStream = new RedisStream( + 'embedMetadataStream', + 'embedMetadataConsumerGroup', + embedderChannel + ) + + // Assume 3 workers for type safety, but it doesn't really matter at runtime + const jobQueueStreams = Array.from( + {length: NUM_WORKERS}, + () => new EmbeddingsJobQueueStream() + ) as Tuple + + Logger.log(`\n⚡⚡⚡️️ Server ID: ${SERVER_ID}. Embedder is ready ⚡⚡⚡️️️`) + + streams = mergeAsyncIterators([messageStream, ...jobQueueStreams]) + for await (const [idx, message] of streams) { + switch (idx) { + case 0: + onMessage('', message) + continue + default: + Logger.log(`Worker ${idx} finished job ${message.id}`) continue - } - } - // console.log(`embedText: ${embedText}`) - - let embeddingVector: number[] - try { - embeddingVector = await embeddingModel.getEmbedding(embedText) - } catch (e) { - await updateJobState(jobQueueId, 'failed', { - stateMessage: `unable to get embeddings: ${e}` - }) - continue } - - // complete job, do the following atomically - // (1) update EmbeddingsMetadata to reflect model completion - // (2) upsert model table row with embedding - // (3) delete EmbeddingsJobQueue row - await completeJobTxn(modelTable, jobQueueId, metadata, fullText, embedText, embeddingVector) - console.log(`embedder: completed ${itemKey} -> ${modelTable}`) } -} - -const tick = async (modelManager: ModelManager) => { - console.log(`embedder: tick`) - const redisClient = getRedisClient() - const redlock = new Redlock([redisClient], { - driftFactor: 0.01, - retryCount: 10, - retryDelay: 250, - retryJitter: 50, - automaticExtensionThreshold: 500 - }) - - await redlock - .using(['embedder:lock'], 10000, async (signal: RedlockAbortSignal) => { - console.log(`embedder: acquired index queue lock`) - // N.B. one of the many benefits of using redlock is the using() interface - // will automatically extend the lock if these operations exceed the - // original redis timeout time - await refreshMetadata() - await maybeQueueMetadataItems(modelManager) - - if (signal.aborted) { - // Not certain which conditions this would happen, it would - // happen after operations took place, so nothing much to do here. - console.log('embedder: lock was lost!') - } - }) - .catch((err: string) => { - // Handle errors (including lock acquisition errors) - console.error('embedder: an error occurred ', err) - }) - console.log('embedder: index queue lock released') - - // get the highest priority item and embed it - await dequeueAndEmbedUntilEmpty(modelManager) - - setTimeout(() => tick(modelManager), POLLING_PERIOD_SEC * 1000) -} - -function parseEnvBoolean(envVarValue: string | undefined): boolean { - return envVarValue === 'true' -} -const run = async () => { - console.log(`embedder: run()`) - const embedderEnabled = parseEnvBoolean(AI_EMBEDDER_ENABLED) - const modelManager = getModelManager() - if (embedderEnabled && modelManager) { - const pg = getKysely() - await modelManager.maybeCreateTables(pg) - console.log(`\n⚡⚡⚡️️ Server ID: ${SERVER_ID}. Embedder is ready ⚡⚡⚡️️️`) - tick(modelManager) - } else { - console.log(`embedder: no valid configuration (check AI_EMBEDDER_ENABLED in .env)`) - // exit - } + // On graceful shutdown + Logger.log('Streaming Complete. 
Goodbye!') } run() diff --git a/packages/embedder/establishPrimaryEmbedder.ts b/packages/embedder/establishPrimaryEmbedder.ts new file mode 100644 index 00000000000..72f349d655f --- /dev/null +++ b/packages/embedder/establishPrimaryEmbedder.ts @@ -0,0 +1,17 @@ +import ms from 'ms' +import RedisInstance from 'parabol-server/utils/RedisInstance' +import Redlock from 'redlock' + +export const establishPrimaryEmbedder = async (redis: RedisInstance) => { + const redlock = new Redlock([redis], {retryCount: 0}) + const MAX_TIME_BETWEEN_WORKER_STARTUPS = ms('5s') + try { + const primaryWorkerLock = await redlock.acquire( + [`embedder_isPrimary_${process.env.npm_package_version}`], + MAX_TIME_BETWEEN_WORKER_STARTUPS + ) + return primaryWorkerLock + } catch { + return undefined + } +} diff --git a/packages/embedder/importHistoricalMetadata.ts b/packages/embedder/importHistoricalMetadata.ts new file mode 100644 index 00000000000..0b805f18888 --- /dev/null +++ b/packages/embedder/importHistoricalMetadata.ts @@ -0,0 +1,16 @@ +import {EmbeddingObjectType} from './custom' +import {importHistoricalRetrospectiveDiscussionTopic} from './importHistoricalRetrospectiveDiscussionTopic' + +export const importHistoricalMetadata = async () => { + const OBJECT_TYPES: EmbeddingObjectType[] = ['retrospectiveDiscussionTopic'] + return Promise.all( + OBJECT_TYPES.map(async (objectType) => { + switch (objectType) { + case 'retrospectiveDiscussionTopic': + return importHistoricalRetrospectiveDiscussionTopic() + default: + throw new Error(`Invalid object type: ${objectType}`) + } + }) + ) +} diff --git a/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts b/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts new file mode 100644 index 00000000000..2469327a18a --- /dev/null +++ b/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts @@ -0,0 +1,37 @@ +import getKysely from 'parabol-server/postgres/getKysely' +import {Logger} from 'parabol-server/utils/Logger' +import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic' + +// Check to see if the oldest discussion topic exists in the metadata table +// If not, get the date of the oldest discussion topic in the metadata table and import all items before that date +export const importHistoricalRetrospectiveDiscussionTopic = async () => { + const pg = getKysely() + const isEarliestMetadataImported = await pg + .selectFrom('EmbeddingsMetadata') + .select('id') + .where(({eb, selectFrom}) => + eb( + 'EmbeddingsMetadata.refId', + '=', + selectFrom('Discussion') + .select('Discussion.id') + .where('discussionTopicType', '=', 'reflectionGroup') + .orderBy(['createdAt', 'id']) + .limit(1) + ) + ) + .limit(1) + .executeTakeFirst() + + if (isEarliestMetadataImported) return + const earliestImportedDiscussion = await pg + .selectFrom('EmbeddingsMetadata') + .select(['id', 'refUpdatedAt', 'refId']) + .where('objectType', '=', 'retrospectiveDiscussionTopic') + .orderBy('refUpdatedAt') + .limit(1) + .executeTakeFirst() + const endAt = earliestImportedDiscussion?.refUpdatedAt ?? 
undefined
+  Logger.log(`Importing discussion history up to ${endAt || 'now'}`)
+  return addEmbeddingsMetadataForRetrospectiveDiscussionTopic({endAt})
+}
diff --git a/packages/embedder/indexing/countWords.ts b/packages/embedder/indexing/countWords.ts
deleted file mode 100644
index 75dae3effa2..00000000000
--- a/packages/embedder/indexing/countWords.ts
+++ /dev/null
@@ -1,17 +0,0 @@
-export function countWords(text: string) {
-  let count = 0
-  let inWord = false
-
-  for (const char of text) {
-    if (/\w/.test(char)) {
-      if (!inWord) {
-        count++
-        inWord = true
-      }
-    } else {
-      inWord = false
-    }
-  }
-
-  return count
-}
diff --git a/packages/embedder/indexing/createEmbeddingTextFrom.ts b/packages/embedder/indexing/createEmbeddingTextFrom.ts
index ce76c7fc380..9fb3cceda80 100644
--- a/packages/embedder/indexing/createEmbeddingTextFrom.ts
+++ b/packages/embedder/indexing/createEmbeddingTextFrom.ts
@@ -1,15 +1,17 @@
 import {Selectable} from 'kysely'
-import {DataLoaderWorker} from 'parabol-server/graphql/graphql'
 import {DB} from 'parabol-server/postgres/pg'
-import {createText as createTextFromRetrospectiveDiscussionTopic} from './retrospectiveDiscussionTopic'
+import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
+import {createTextFromRetrospectiveDiscussionTopic} from './retrospectiveDiscussionTopic'
 
 export const createEmbeddingTextFrom = async (
-  item: Selectable<DB['EmbeddingsMetadata']>,
-  dataLoader: DataLoaderWorker
-): Promise<string> => {
-  switch ((item as any).objectType) {
+  embeddingsMetadata: Selectable<DB['EmbeddingsMetadata']>,
+  dataLoader: RootDataLoader
+) => {
+  switch (embeddingsMetadata.objectType) {
     case 'retrospectiveDiscussionTopic':
-      return createTextFromRetrospectiveDiscussionTopic(item, dataLoader)
+      return createTextFromRetrospectiveDiscussionTopic(embeddingsMetadata.refId, dataLoader)
+    default:
+      throw new Error(`Unexpected objectType: ${embeddingsMetadata.objectType}`)
   }
 }
diff --git a/packages/embedder/indexing/embeddingsTablesOps.ts b/packages/embedder/indexing/embeddingsTablesOps.ts
deleted file mode 100644
index 4ce843248a9..00000000000
--- a/packages/embedder/indexing/embeddingsTablesOps.ts
+++ /dev/null
@@ -1,201 +0,0 @@
-import {Insertable, RawBuilder, Selectable, Updateable, sql} from 'kysely'
-import getKysely from 'parabol-server/postgres/getKysely'
-import {DB} from 'parabol-server/postgres/pg'
-import {DBInsert} from '../embedder'
-import numberVectorToString from './numberVectorToString'
-
-function unnestedArray<T>(maybeArray: T[] | T): RawBuilder<T> {
-  let a: T[] = Array.isArray(maybeArray) ?
maybeArray : [maybeArray] - return sql`unnest(ARRAY[${sql.join(a)}]::varchar[])` -} - -export const selectJobQueueItemById = async ( - id: number -): Promise | undefined> => { - const pg = getKysely() - return pg.selectFrom('EmbeddingsJobQueue').selectAll().where('id', '=', id).executeTakeFirst() -} -export const selectMetadataByJobQueueId = async ( - id: number -): Promise | undefined> => { - const pg = getKysely() - return pg - .selectFrom('EmbeddingsMetadata as em') - .selectAll() - .leftJoin('EmbeddingsJobQueue as ejq', (join) => - join - .onRef('em.objectType', '=', 'ejq.objectType' as any) - .onRef('em.refId', '=', 'ejq.refId' as any) - ) - .where('ejq.id', '=', id) - .executeTakeFirstOrThrow() -} - -// For each configured embedding model, select rows from EmbeddingsMetadata -// that haven't been calculated nor exist in the EmbeddingsJobQueue yet -// -// Notes: -// * `em.models @> ARRAY[v.model]` is an indexed query -// * I don't love all overrides, I wish there was a better way -// see: https://github.com/kysely-org/kysely/issues/872 -export async function selectMetaToQueue( - configuredModels: string[], - orgIds: any[], - itemCountToQueue: number -) { - const pg = getKysely() - const maybeMetaToQueue = (await pg - .selectFrom('EmbeddingsMetadata as em') - .selectAll('em') - .leftJoinLateral(unnestedArray(configuredModels).as('model'), (join) => join.onTrue()) - .leftJoin('Team as t', 'em.teamId', 't.id') - .select('model' as any) - .where(({eb, not, or, and, exists, selectFrom}) => - and([ - or([ - not( - eb('em.models' as any, '@>', sql`ARRAY[${sql.ref('model')}]::varchar[]` as any) as any - ), - eb('em.models' as any, 'is', null) - ]), - not( - exists( - selectFrom('EmbeddingsJobQueue as ejq') - .select('ejq.id') - .whereRef('em.objectType', '=', 'ejq.objectType' as any) - .whereRef('em.refId', '=', 'ejq.refId' as any) - .whereRef('ejq.model' as any, '=', 'model' as any) - ) - ), - eb('t.orgId', 'in', orgIds) - ]) - ) - .limit(itemCountToQueue) - .execute()) as unknown as Selectable[] - - type MetadataToQueue = Selectable< - Omit & { - refId: NonNullable - } & {model: string} - > - - return maybeMetaToQueue.filter( - (item) => item.refId !== null && item.refId !== undefined - ) as MetadataToQueue[] -} - -export const updateJobState = async ( - id: number, - state: Updateable['state'], - jobQueueFields: Updateable = {} -) => { - const pg = getKysely() - const jobQueueColumns: Updateable = { - ...jobQueueFields, - state - } - if (state === 'failed') console.log(`embedder: failed job ${id}, ${jobQueueFields.stateMessage}`) - return pg - .updateTable('EmbeddingsJobQueue') - .set(jobQueueColumns) - .where('id', '=', id) - .executeTakeFirstOrThrow() -} - -export function insertNewJobs(ejqValues: Insertable[]) { - const pg = getKysely() - return pg - .insertInto('EmbeddingsJobQueue') - .values(ejqValues) - .returning(['id', 'objectType', 'refId'] as any) - .execute() -} - -// complete job, do the following atomically -// (1) update EmbeddingsMetadata to reflect model completion -// (2) upsert model table row with embedding -// (3) delete EmbeddingsJobQueue row -export function completeJobTxn( - modelTable: string, - jobQueueId: number, - metadata: Updateable, - fullText: string, - embedText: string, - embeddingVector: number[] -) { - const pg = getKysely() - return pg.transaction().execute(async (trx) => { - // get fields to update correct metadata row - const jobQueueItem = (await trx - .selectFrom('EmbeddingsJobQueue') - .select(['objectType', 'refId', 'model'] as any) - .where('id', 
'=', jobQueueId) - .executeTakeFirstOrThrow()) as any - - // (1) update metadata row - const metadataColumnsToUpdate: { - models: RawBuilder - fullText?: string | null | undefined - } = { - // update models as a set - models: sql`( -SELECT array_agg(DISTINCT value) -FROM ( - SELECT unnest(COALESCE("models", '{}')) AS value - UNION - SELECT unnest(ARRAY[${modelTable}]::VARCHAR[]) AS value -) AS combined_values -)` - } - - if (metadata?.fullText !== fullText) { - metadataColumnsToUpdate.fullText = fullText - } - - const updatedMetadata = await trx - .updateTable('EmbeddingsMetadata') - .set(metadataColumnsToUpdate) - .where('objectType', '=', jobQueueItem.objectType) - .where('refId', '=', jobQueueItem.refId) - .returning(['id']) - .executeTakeFirstOrThrow() - - // (2) upsert into model table - await trx - .insertInto(modelTable as any) - .values({ - embedText: fullText !== embedText ? embedText : null, - embedding: numberVectorToString(embeddingVector), - embeddingsMetadataId: updatedMetadata.id - }) - .onConflict((oc) => - oc.column('id').doUpdateSet((eb) => ({ - embedText: eb.ref('excluded.embedText'), - embeddingsMetadataId: eb.ref('excluded.embeddingsMetadataId') - })) - ) - .executeTakeFirstOrThrow() - - // (3) delete completed job queue item - return await trx - .deleteFrom('EmbeddingsJobQueue') - .where('id', '=', jobQueueId) - .executeTakeFirstOrThrow() - }) -} -export async function upsertEmbeddingsMetaRows( - embeddingsMetaRows: DBInsert['EmbeddingsMetadata'][] -) { - const pg = getKysely() - return pg - .insertInto('EmbeddingsMetadata') - .values(embeddingsMetaRows) - .onConflict((oc) => - oc.columns(['objectType', 'refId']).doUpdateSet((eb) => ({ - objectType: eb.ref('excluded.objectType'), - refId: eb.ref('excluded.refId'), - refUpdatedAt: eb.ref('excluded.refUpdatedAt') - })) - ) - .execute() -} diff --git a/packages/embedder/indexing/failJob.ts b/packages/embedder/indexing/failJob.ts new file mode 100644 index 00000000000..17d293b49a9 --- /dev/null +++ b/packages/embedder/indexing/failJob.ts @@ -0,0 +1,17 @@ +import getKysely from 'parabol-server/postgres/getKysely' +import {Logger} from 'parabol-server/utils/Logger' + +export const failJob = async (jobId: number, stateMessage: string, retryAfter?: Date | null) => { + const pg = getKysely() + Logger.log(`embedder: failed job ${jobId}, ${stateMessage}`) + await pg + .updateTable('EmbeddingsJobQueue') + .set((eb) => ({ + state: 'failed', + stateMessage, + retryCount: eb('retryCount', '+', 1), + retryAfter: retryAfter || null + })) + .where('id', '=', jobId) + .executeTakeFirstOrThrow() +} diff --git a/packages/embedder/indexing/getRedisClient.ts b/packages/embedder/indexing/getRedisClient.ts deleted file mode 100644 index 7aaf65be33c..00000000000 --- a/packages/embedder/indexing/getRedisClient.ts +++ /dev/null @@ -1,11 +0,0 @@ -import RedisInstance from 'parabol-server/utils/RedisInstance' - -const {SERVER_ID} = process.env - -let redisClient: RedisInstance -export const getRedisClient = () => { - if (!redisClient) { - redisClient = new RedisInstance(`embedder-${SERVER_ID}`) - } - return redisClient -} diff --git a/packages/embedder/indexing/getRootDataLoader.ts b/packages/embedder/indexing/getRootDataLoader.ts deleted file mode 100644 index 304c0c01058..00000000000 --- a/packages/embedder/indexing/getRootDataLoader.ts +++ /dev/null @@ -1,10 +0,0 @@ -import getDataLoader from 'parabol-server/graphql/getDataLoader' -import {DataLoaderWorker} from 'parabol-server/graphql/graphql' - -let rootDataLoader: DataLoaderWorker -export 
const getRootDataLoader = () => {
-  if (!rootDataLoader) {
-    rootDataLoader = getDataLoader() as DataLoaderWorker
-  }
-  return rootDataLoader
-}
diff --git a/packages/embedder/indexing/retrospectiveDiscussionTopic.ts b/packages/embedder/indexing/retrospectiveDiscussionTopic.ts
index eb28b1bc3b0..4dc0bfaddd5 100644
--- a/packages/embedder/indexing/retrospectiveDiscussionTopic.ts
+++ b/packages/embedder/indexing/retrospectiveDiscussionTopic.ts
@@ -1,25 +1,8 @@
-import prettier from 'prettier'
-
-import getRethink, {RethinkSchema} from 'parabol-server/database/rethinkDriver'
-import {DataLoaderWorker} from 'parabol-server/graphql/graphql'
-import getKysely from 'parabol-server/postgres/getKysely'
-import {DB} from 'parabol-server/postgres/pg'
-
+import {RethinkSchema} from 'parabol-server/database/rethinkDriver'
 import Comment from 'parabol-server/database/types/Comment'
-import DiscussStage from 'parabol-server/database/types/DiscussStage'
-import MeetingRetrospective, {
-  isMeetingRetrospective
-} from 'parabol-server/database/types/MeetingRetrospective'
-
-import {AnyMeeting} from 'parabol-server/postgres/types/Meeting'
-import {upsertEmbeddingsMetaRows} from './embeddingsTablesOps'
-
-const BATCH_SIZE = 1000
-
-export interface EmbeddingsJobQueueRetrospectiveDiscussionTopic
-  extends Omit {
-  objectType: 'retrospectiveDiscussionTopic'
-}
+import {isMeetingRetrospective} from 'parabol-server/database/types/MeetingRetrospective'
+import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
+import prettier from 'prettier'

 // Here's a generic representation of the text generated here:

@@ -38,109 +21,14 @@ export interface EmbeddingsJobQueueRetrospectiveDiscussionTopic
 const IGNORE_COMMENT_USER_IDS = ['parabolAIUser']

-const pg = getKysely()
-
-export async function refreshRetroDiscussionTopicsMeta(dataLoader: DataLoaderWorker) {
-  const r = await getRethink()
-  const {createdAt: newestMeetingDate} = (await r
-    .table('NewMeeting')
-    .max({index: 'createdAt'})
-    .run()) as unknown as RethinkSchema['NewMeeting']['type']
-  const {createdAt: oldestMeetingDate} = (await r
-    .table('NewMeeting')
-    .min({index: 'createdAt'})
-    .run()) as unknown as RethinkSchema['NewMeeting']['type']
-
-  const {newestMetaDate} = (await pg
-    .selectFrom('EmbeddingsMetadata')
-    .select(pg.fn.max('refUpdatedAt').as('newestMetaDate'))
-    .where('objectType', '=', 'retrospectiveDiscussionTopic')
-    .executeTakeFirst()) ?? {newestMetaDate: null}
-  let startDateTime = newestMetaDate || oldestMeetingDate
-
-  if (startDateTime.getTime() === newestMeetingDate.getTime()) return
-
-  console.log(
-    `refreshRetroDiscussionTopicsMeta(): ` +
-      `will consider adding items from ${startDateTime.toISOString()} to ` +
-      `${newestMeetingDate.toISOString()}`
-  )
-
-  let totalAdded = 0
-  do {
-    // Process history in batches.
-    //
-    // N.B. We add historical meetings to the EmbeddingsMetadata table here.
-    // This query will intentionally miss meetings that haven't been completed
-    // (`summarySentAt` is null). These meetings will need to be added to the
-    // EmbeddingsMetadata table by a hook that runs when the meetings complete.
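-    //
-    // Page through history in createdAt order: each pass advances startDateTime
-    // to the newest createdAt seen, so the loop terminates once it catches up
-    // with newestMeetingDate.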
-    const {maxCreatedAt, completedNewMeetings} = await r
-      .table('NewMeeting')
-      .between(startDateTime, newestMeetingDate, {rightBound: 'closed', index: 'createdAt'})
-      .orderBy({index: 'createdAt'})
-      .limit(BATCH_SIZE)
-      .coerceTo('array')
-      .do((rows: any) => ({
-        maxCreatedAt: r.expr(rows).max('createdAt')('createdAt'), // Then find the max createdAt value
-        completedNewMeetings: r.expr(rows).filter((r: any) =>
-          r('meetingType')
-            .eq('retrospective')
-            .and(
-              r('endedAt').gt(0),
-              r
-                .hasFields('phases')
-                .and(r('phases').count().gt(0))
-                .and(
-                  r('phases')
-                    .filter((phase: any) => phase('phaseType').eq('discuss'))
-                    .filter((phase: any) =>
-                      phase.hasFields('stages').and(phase('stages').count().gt(0))
-                    )
-                    .count()
-                    .gt(0)
-                )
-            )
-        )
-      }))
-      .run()
-    const embeddingsMetaRows = (
-      await Promise.all(
-        completedNewMeetings.map((m: AnyMeeting) =>
-          newRetroDiscussionTopicsFromNewMeeting(m, dataLoader)
-        )
-      )
-    ).flat()
-    if (embeddingsMetaRows.length > 0) {
-      await upsertEmbeddingsMetaRows(embeddingsMetaRows)
-      totalAdded += embeddingsMetaRows.length
-      console.log(
-        `refreshRetroDiscussionTopicsMeta(): synced to ${maxCreatedAt.toISOString()}, added` +
-          ` ${embeddingsMetaRows.length} retrospectiveDiscussionTopics`
-      )
-    }
-
-    // N.B. In the unlikely event that we have >=BATCH_SIZE meetings that end at _exactly_
-    // the same timestamp, this will loop forever.
-    if (
-      startDateTime.getTime() === newestMeetingDate.getTime() &&
-      completedNewMeetings.length < BATCH_SIZE
-    )
-      break
-    startDateTime = maxCreatedAt
-  } while (true)
-
-  console.log(
-    `refreshRetroDiscussionTopicsMeta(): added ${totalAdded} total retrospectiveDiscussionTopics`
-  )
-}
-
-async function getPreferredNameByUserId(userId: string, dataLoader: DataLoaderWorker) {
+async function getPreferredNameByUserId(userId: string, dataLoader: RootDataLoader) {
+  if (!userId) return 'Unknown'
   const user = await dataLoader.get('users').load(userId)
   return !user ? 'Unknown' : user.preferredName
 }

 async function formatThread(
-  dataLoader: DataLoaderWorker,
+  dataLoader: RootDataLoader,
   comments: Comment[],
   parentId: string | null = null,
   depth = 0
@@ -155,9 +43,7 @@ async function formatThread(
   const indent = ' '.repeat(depth + 1)
   const author = comment.isAnonymous
     ? 'Anonymous'
-    : comment.createdBy
-      ? await getPreferredNameByUserId(comment.createdBy, dataLoader)
-      : 'Unknown'
+    : await getPreferredNameByUserId(comment.createdBy, dataLoader)
   const how = depth === 0 ?
'wrote' : 'replied' const content = comment.plaintextContent const formattedPost = `${indent}- ${author} ${how}, "${content}"\n` @@ -172,60 +58,47 @@ async function formatThread( return formattedComments.join('') } -export const createTextFromNewMeetingDiscussionStage = async ( - newMeeting: MeetingRetrospective, - stageId: string, - dataLoader: DataLoaderWorker, +export const createTextFromRetrospectiveDiscussionTopic = async ( + discussionId: string, + dataLoader: RootDataLoader, textForReranking: boolean = false ) => { - if (!newMeeting) throw 'newMeeting is undefined' - if (!isMeetingRetrospective(newMeeting)) throw 'newMeeting is not retrospective' - if (!newMeeting.templateId) throw 'template is undefined' - const template = await dataLoader.get('meetingTemplates').load(newMeeting.templateId) - if (!template) throw 'template is undefined' - const discussPhase = newMeeting.phases.find((phase) => phase.phaseType === 'discuss') - if (!discussPhase) throw 'newMeeting discuss phase is undefined' - if (!discussPhase.stages) throw 'newMeeting discuss phase has no stages' - const discussStage = discussPhase.stages.find((stage) => stage.id === stageId) as DiscussStage - if (!discussStage) throw 'newMeeting discuss stage not found' - const {summary: discussionSummary} = discussStage.discussionId - ? (await dataLoader.get('discussions').load(discussStage.discussionId)) ?? {summary: null} - : {summary: null} - const r = await getRethink() - if (!discussStage.reflectionGroupId) throw 'newMeeting discuss stage has no reflectionGroupId' - const reflectionGroup = await r - .table('RetroReflectionGroup') - .get(discussStage.reflectionGroupId) - .run() - if (!reflectionGroup.id) throw 'newMeeting reflectionGroup has no id' - const reflections = await r - .table('RetroReflection') - .getAll(reflectionGroup.id, {index: 'reflectionGroupId'}) - .run() + const discussion = await dataLoader.get('discussions').load(discussionId) + if (!discussion) throw new Error(`Discussion not found: ${discussionId}`) + const {discussionTopicId: reflectionGroupId, meetingId, summary: discussionSummary} = discussion + const [newMeeting, reflectionGroup, reflections] = await Promise.all([ + dataLoader.get('newMeetings').load(meetingId), + dataLoader.get('retroReflectionGroups').load(reflectionGroupId), + dataLoader.get('retroReflectionsByGroupId').load(reflectionGroupId) + ]) + if (!isMeetingRetrospective(newMeeting)) throw new Error('Meeting is not a retro') + const {templateId} = newMeeting + const promptIds = [...new Set(reflections.map((r) => r.promptId))] + const [template, ...prompts] = await Promise.all([ + dataLoader.get('meetingTemplates').loadNonNull(templateId), + ...promptIds.map((promptId) => dataLoader.get('reflectPrompts').load(promptId)) + ]) + let markdown = '' - if (!textForReranking) + if (!textForReranking) { markdown = - `A topic "${reflectionGroup.title}" was discussed during ` + + `A topic "${reflectionGroup?.title ?? 
''}" was discussed during ` + `the meeting "${newMeeting.name}" that followed the "${template.name}" template.\n` + `\n` - const prompts = await dataLoader.get('reflectPrompts').loadMany(promptIds) + } + for (const prompt of prompts) { - if (!prompt || prompt instanceof Error) continue if (!textForReranking) { markdown += `Participants were prompted with, "${prompt.question}` if (prompt.description) markdown += `: ${prompt.description}` markdown += `".\n` } - if (newMeeting.disableAnonymity) { - for (const reflection of reflections.filter((r) => r.promptId === prompt.id)) { - const author = await getPreferredNameByUserId(reflection.creatorId, dataLoader) - markdown += ` - ${author} wrote, "${reflection.plaintextContent}"\n` - } - } else { - for (const reflection of reflections.filter((r) => r.promptId === prompt.id)) { - markdown += ` - Anonymous wrote, "${reflection.plaintextContent}"\n` - } + for (const reflection of reflections.filter((r) => r.promptId === prompt.id)) { + const author = newMeeting.disableAnonymity + ? await getPreferredNameByUserId(reflection.creatorId, dataLoader) + : 'Anonymous' + markdown += ` - ${author} wrote, "${reflection.plaintextContent}"\n` } markdown += `\n` } @@ -250,7 +123,7 @@ export const createTextFromNewMeetingDiscussionStage = async ( if (discussionSummary) { markdown += `Further discussion was made. ` + ` ${discussionSummary}` } else { - const comments = await dataLoader.get('commentsByDiscussionId').load(stageId) + const comments = await dataLoader.get('commentsByDiscussionId').load(discussionId) const sortedComments = comments .map((comment) => { @@ -290,22 +163,9 @@ export const createTextFromNewMeetingDiscussionStage = async ( return markdown } -export const createText = async (item: any, dataLoader: DataLoaderWorker): Promise => { - if (!item.refId) throw 'refId is undefined' - const [newMeetingId, discussionId] = item.refId.split(':') - if (!newMeetingId) throw new Error('newMeetingId cannot be undefined') - if (!discussionId) throw new Error('discussionId cannot be undefined') - const newMeeting = await dataLoader.get('newMeetings').load(newMeetingId) - return createTextFromNewMeetingDiscussionStage( - newMeeting as MeetingRetrospective, - discussionId, - dataLoader - ) -} - export const newRetroDiscussionTopicsFromNewMeeting = async ( newMeeting: RethinkSchema['NewMeeting']['type'], - dataLoader: DataLoaderWorker + dataLoader: RootDataLoader ) => { const discussPhase = newMeeting.phases.find((phase) => phase.phaseType === 'discuss') const orgId = (await dataLoader.get('teams').load(newMeeting.teamId))?.orgId diff --git a/packages/embedder/iso6393To1.ts b/packages/embedder/iso6393To1.ts new file mode 100644 index 00000000000..f1e470e1232 --- /dev/null +++ b/packages/embedder/iso6393To1.ts @@ -0,0 +1,195 @@ +import {ValueOf} from '../client/types/generics' + +/** + * Map of ISO 639-3 codes to ISO 639-1 codes. 
+ * + * @type {Record} + */ +export const iso6393To1 = { + aar: 'aa', + abk: 'ab', + afr: 'af', + aka: 'ak', + amh: 'am', + ara: 'ar', + arg: 'an', + asm: 'as', + ava: 'av', + ave: 'ae', + aym: 'ay', + aze: 'az', + bak: 'ba', + bam: 'bm', + bel: 'be', + ben: 'bn', + bis: 'bi', + bod: 'bo', + bos: 'bs', + bre: 'br', + bul: 'bg', + cat: 'ca', + ces: 'cs', + cha: 'ch', + che: 'ce', + chu: 'cu', + chv: 'cv', + cor: 'kw', + cos: 'co', + cre: 'cr', + cym: 'cy', + dan: 'da', + deu: 'de', + div: 'dv', + dzo: 'dz', + ell: 'el', + eng: 'en', + epo: 'eo', + est: 'et', + eus: 'eu', + ewe: 'ee', + fao: 'fo', + fas: 'fa', + fij: 'fj', + fin: 'fi', + fra: 'fr', + fry: 'fy', + ful: 'ff', + gla: 'gd', + gle: 'ga', + glg: 'gl', + glv: 'gv', + grn: 'gn', + guj: 'gu', + hat: 'ht', + hau: 'ha', + hbs: 'sh', + heb: 'he', + her: 'hz', + hin: 'hi', + hmo: 'ho', + hrv: 'hr', + hun: 'hu', + hye: 'hy', + ibo: 'ig', + ido: 'io', + iii: 'ii', + iku: 'iu', + ile: 'ie', + ina: 'ia', + ind: 'id', + ipk: 'ik', + isl: 'is', + ita: 'it', + jav: 'jv', + jpn: 'ja', + kal: 'kl', + kan: 'kn', + kas: 'ks', + kat: 'ka', + kau: 'kr', + kaz: 'kk', + khm: 'km', + kik: 'ki', + kin: 'rw', + kir: 'ky', + kom: 'kv', + kon: 'kg', + kor: 'ko', + kua: 'kj', + kur: 'ku', + lao: 'lo', + lat: 'la', + lav: 'lv', + lim: 'li', + lin: 'ln', + lit: 'lt', + ltz: 'lb', + lub: 'lu', + lug: 'lg', + mah: 'mh', + mal: 'ml', + mar: 'mr', + mkd: 'mk', + mlg: 'mg', + mlt: 'mt', + mon: 'mn', + mri: 'mi', + msa: 'ms', + mya: 'my', + nau: 'na', + nav: 'nv', + nbl: 'nr', + nde: 'nd', + ndo: 'ng', + nep: 'ne', + nld: 'nl', + nno: 'nn', + nob: 'nb', + nor: 'no', + nya: 'ny', + oci: 'oc', + oji: 'oj', + ori: 'or', + orm: 'om', + oss: 'os', + pan: 'pa', + pli: 'pi', + pol: 'pl', + por: 'pt', + pus: 'ps', + que: 'qu', + roh: 'rm', + ron: 'ro', + run: 'rn', + rus: 'ru', + sag: 'sg', + san: 'sa', + sin: 'si', + slk: 'sk', + slv: 'sl', + sme: 'se', + smo: 'sm', + sna: 'sn', + snd: 'sd', + som: 'so', + sot: 'st', + spa: 'es', + sqi: 'sq', + srd: 'sc', + srp: 'sr', + ssw: 'ss', + sun: 'su', + swa: 'sw', + swe: 'sv', + tah: 'ty', + tam: 'ta', + tat: 'tt', + tel: 'te', + tgk: 'tg', + tgl: 'tl', + tha: 'th', + tir: 'ti', + ton: 'to', + tsn: 'tn', + tso: 'ts', + tuk: 'tk', + tur: 'tr', + twi: 'tw', + uig: 'ug', + ukr: 'uk', + urd: 'ur', + uzb: 'uz', + ven: 've', + vie: 'vi', + vol: 'vo', + wln: 'wa', + wol: 'wo', + xho: 'xh', + yid: 'yi', + yor: 'yo', + zha: 'za', + zho: 'zh', + zul: 'zu' +} as const + +export type ISO6391 = ValueOf diff --git a/packages/embedder/logMemoryUse.ts b/packages/embedder/logMemoryUse.ts new file mode 100644 index 00000000000..afe3259aee5 --- /dev/null +++ b/packages/embedder/logMemoryUse.ts @@ -0,0 +1,10 @@ +// Not for use in prod, but useful for dev +export const logMemoryUse = () => { + const MB = 2 ** 20 + setInterval(() => { + const memoryUsage = process.memoryUsage() + const {rss} = memoryUsage + const usedMB = Math.floor(rss / MB) + console.log('Memory use:', usedMB, 'MB') + }, 10000) +} diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts new file mode 100644 index 00000000000..e274e0cca6f --- /dev/null +++ b/packages/embedder/mergeAsyncIterators.ts @@ -0,0 +1,96 @@ +import {ParseInt} from '../client/types/generics' + +// can remove PromiseCapability after TS v5.4.2 +type PromiseCapability = { + resolve: (value: T) => void + reject: (reason?: any) => void + promise: Promise +} + +type UnYield = T extends IteratorYieldResult ? 
U : never +type Result> = UnYield>> + +// Promise.race has a memory leak +// To avoid: https://github.com/tc39/proposal-async-iterator-helpers/issues/15#issuecomment-1937011820 +export function mergeAsyncIterators[] | []>( + iterators: T +): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result]}[number]> { + return (async function* () { + type ResultThunk = () => [number, Result] + let count = iterators.length as number + let capability: PromiseCapability | undefined + const queuedResults: ResultThunk[] = [] + const getNext = async (idx: number, iterator: T[number]) => { + try { + const next = await iterator.next() + if (next.done) { + if (--count === 0 && capability !== undefined) { + capability.resolve(null) + } + } else { + resolveResult(() => { + void getNext(idx, iterator) + return [idx, next.value] + }) + } + } catch (error) { + resolveResult(() => { + throw error + }) + } + } + const resolveResult = (resultThunk: ResultThunk) => { + if (capability === undefined) { + queuedResults.push(resultThunk) + } else { + capability.resolve(resultThunk) + } + } + + try { + // Begin all iterators + for (const [idx, iterable] of iterators.entries()) { + void getNext(idx, iterable) + } + + // Delegate to iterables as results complete + while (true) { + while (true) { + const nextQueuedResult = queuedResults.shift() + if (nextQueuedResult === undefined) { + break + } else { + yield nextQueuedResult() + } + } + if (count === 0) { + break + } else { + // Promise.withResolvers() is not yet implemented in node + capability = { + resolve: undefined as any, + reject: undefined as any, + promise: undefined as any + } + capability.promise = new Promise((res, rej) => { + capability!.resolve = res + capability!.reject = rej + }) + const nextResult = await capability.promise + if (nextResult === null) { + break + } else { + capability = undefined + yield nextResult() + } + } + } + } catch (err) { + // Unwind remaining iterators on failure + try { + await Promise.all(iterators.map((iterator) => iterator.return?.())) + } catch {} + throw err + } + })() +} diff --git a/packages/embedder/modules.d.ts b/packages/embedder/modules.d.ts deleted file mode 100644 index 8a2be20ba0c..00000000000 --- a/packages/embedder/modules.d.ts +++ /dev/null @@ -1,2 +0,0 @@ -import '../server/types/modules' -import '../server/types/webpackEnv' diff --git a/packages/embedder/package.json b/packages/embedder/package.json index 47af8737dac..fdf98991ced 100644 --- a/packages/embedder/package.json +++ b/packages/embedder/package.json @@ -18,14 +18,17 @@ "devDependencies": { "@babel/cli": "7.18.6", "@babel/core": "7.18.6", + "@types/franc": "^5.0.3", "@types/node": "^16.11.62", "babel-plugin-inline-import": "^3.0.0", + "openapi-fetch": "^0.9.3", "sucrase": "^3.32.0", "ts-node-dev": "^1.0.0-pre.44", - "typescript": "4.9.5" + "typescript": "^5.3.3" }, "dependencies": { "dd-trace": "^4.2.0", + "franc-min": "^5.0.0", "redlock": "^5.0.0-beta.2" } } diff --git a/packages/embedder/processJob.ts b/packages/embedder/processJob.ts new file mode 100644 index 00000000000..8723b75de6a --- /dev/null +++ b/packages/embedder/processJob.ts @@ -0,0 +1,13 @@ +import RootDataLoader from 'parabol-server/dataloader/RootDataLoader' +import {Job} from './EmbeddingsJobQueueStream' +import {processJobEmbed} from './processJobEmbed' + +export const processJob = async (job: Job, dataLoader: RootDataLoader) => { + const {jobType} = job + switch (jobType) { + case 'embed': + return processJobEmbed(job, dataLoader) + default: + throw new Error(`Invalid job 
type: ${jobType}`) + } +} diff --git a/packages/embedder/processJobEmbed.ts b/packages/embedder/processJobEmbed.ts new file mode 100644 index 00000000000..d1b895cc640 --- /dev/null +++ b/packages/embedder/processJobEmbed.ts @@ -0,0 +1,102 @@ +import franc from 'franc-min' +import ms from 'ms' +import RootDataLoader from 'parabol-server/dataloader/RootDataLoader' +import getKysely from 'parabol-server/postgres/getKysely' +import {EmbedJob} from './EmbeddingsJobQueueStream' +import {EmbeddingsTable} from './ai_models/AbstractEmbeddingsModel' +import getModelManager from './ai_models/ModelManager' +import {createEmbeddingTextFrom} from './indexing/createEmbeddingTextFrom' +import {failJob} from './indexing/failJob' +import numberVectorToString from './indexing/numberVectorToString' +import {iso6393To1} from './iso6393To1' + +export const processJobEmbed = async (job: EmbedJob, dataLoader: RootDataLoader) => { + const pg = getKysely() + const {id: jobId, retryCount, jobData} = job + const {embeddingsMetadataId, model} = jobData + const modelManager = getModelManager() + + const metadata = await pg + .selectFrom('EmbeddingsMetadata') + .selectAll() + .where('id', '=', embeddingsMetadataId) + .executeTakeFirst() + + if (!metadata) { + await failJob(jobId, `unable to fetch metadata by EmbeddingsJobQueue.id = ${jobId}`) + return + } + + let {fullText, language} = metadata + try { + if (!fullText) { + fullText = await createEmbeddingTextFrom(metadata, dataLoader) + language = iso6393To1[franc(fullText) as keyof typeof iso6393To1] + await pg + .updateTable('EmbeddingsMetadata') + .set({fullText, language}) + .where('id', '=', embeddingsMetadataId) + .execute() + } + } catch (e) { + // get the trace since the error message may be unobvious + console.trace(e) + await failJob(jobId, `unable to create embedding text: ${e}`) + return + } + + const embeddingModel = modelManager.embeddingModelsMapByTable[model] + if (!embeddingModel) { + await failJob(jobId, `embedding model ${model} not available`) + return + } + + // Exit successfully, we don't want to fail the job because the language is not supported + if (!embeddingModel.languages.includes(language!)) return true + + const tokens = await embeddingModel.getTokens(fullText) + if (tokens instanceof Error) { + await failJob( + jobId, + `unable to get tokens: ${tokens.message}`, + retryCount < 10 ? new Date(Date.now() + ms('1m')) : null + ) + return + } + const isFullTextTooBig = tokens.length > embeddingModel.maxInputTokens + // Cannot use summarization strategy if generation model has same context length as embedding model + // We must split the text & not tokens because BERT tokenizer is not trained for linebreaks e.g. \n\n + const chunks = isFullTextTooBig ? embeddingModel.splitText(fullText) : [fullText] + await Promise.all( + chunks.map(async (chunk, chunkNumber) => { + const embeddingVector = await embeddingModel.getEmbedding(chunk) + if (embeddingVector instanceof Error) { + await failJob( + jobId, + `unable to get embeddings: ${embeddingVector.message}`, + retryCount < 10 ? new Date(Date.now() + ms('1m')) : null + ) + return + } + await pg + // cast to any because these types won't be available in CI + .insertInto(embeddingModel.tableName as EmbeddingsTable) + .values({ + // TODO is the extra space of a null embedText really worth it?! + embedText: isFullTextTooBig ? chunk : null, + embedding: numberVectorToString(embeddingVector), + embeddingsMetadataId, + chunkNumber: isFullTextTooBig ? 
chunkNumber : null + }) + .onConflict((oc) => + oc.column('embeddingsMetadataId').doUpdateSet((eb) => ({ + embedText: eb.ref('excluded.embedText'), + embedding: eb.ref('excluded.embedding') + })) + ) + .execute() + }) + ) + // Logger.log(`Embedded ${embeddingsMetadataId} -> ${model}`) + return true +} diff --git a/packages/embedder/resetStalledJobs.ts b/packages/embedder/resetStalledJobs.ts new file mode 100644 index 00000000000..592b468faee --- /dev/null +++ b/packages/embedder/resetStalledJobs.ts @@ -0,0 +1,18 @@ +import ms from 'ms' +import getKysely from 'parabol-server/postgres/getKysely' + +export const resetStalledJobs = () => { + setInterval(async () => { + const pg = getKysely() + await pg + .updateTable('EmbeddingsJobQueue') + .set((eb) => ({ + state: 'queued', + startAt: null, + retryCount: eb('retryCount', '+', 1), + stateMessage: 'stalled' + })) + .where('startAt', '<', new Date(Date.now() - ms('5m'))) + .execute() + }, ms('5m')) +} diff --git a/packages/embedder/textEmbeddingsnterface.d.ts b/packages/embedder/textEmbeddingsnterface.d.ts new file mode 100644 index 00000000000..3fdbcac0185 --- /dev/null +++ b/packages/embedder/textEmbeddingsnterface.d.ts @@ -0,0 +1,857 @@ +/** + * This file was auto-generated by openapi-typescript. + * Do not make direct changes to the file. + */ + + +/** OneOf type helpers */ +type Without = { [P in Exclude]?: never }; +type XOR = (T | U) extends object ? (Without & U) | (Without & T) : T | U; +type OneOf = T extends [infer Only] ? Only : T extends [infer A, infer B, ...infer Rest] ? OneOf<[XOR, ...Rest]> : never; + +export interface paths { + "/decode": { + /** + * Decode input ids + * @description Decode input ids + */ + post: operations["decode"]; + }; + "/embed": { + /** + * Get Embeddings. Returns a 424 status code if the model is not an embedding model. + * @description Get Embeddings. Returns a 424 status code if the model is not an embedding model. + */ + post: operations["embed"]; + }; + "/embed_all": { + /** + * Get all Embeddings without Pooling. + * @description Get all Embeddings without Pooling. + * Returns a 424 status code if the model is not an embedding model. + */ + post: operations["embed_all"]; + }; + "/embed_sparse": { + /** + * Get Sparse Embeddings. Returns a 424 status code if the model is not an embedding model with SPLADE pooling. + * @description Get Sparse Embeddings. Returns a 424 status code if the model is not an embedding model with SPLADE pooling. + */ + post: operations["embed_sparse"]; + }; + "/embeddings": { + /** + * OpenAI compatible route. Returns a 424 status code if the model is not an embedding model. + * @description OpenAI compatible route. Returns a 424 status code if the model is not an embedding model. + */ + post: operations["openai_embed"]; + }; + "/health": { + /** + * Health check method + * @description Health check method + */ + get: operations["health"]; + }; + "/info": { + /** + * Text Embeddings Inference endpoint info + * @description Text Embeddings Inference endpoint info + */ + get: operations["get_model_info"]; + }; + "/metrics": { + /** + * Prometheus metrics scrape endpoint + * @description Prometheus metrics scrape endpoint + */ + get: operations["metrics"]; + }; + "/predict": { + /** + * Get Predictions. Returns a 424 status code if the model is not a Sequence Classification model + * @description Get Predictions. Returns a 424 status code if the model is not a Sequence Classification model + */ + post: operations["predict"]; + }; + "/rerank": { + /** + * Get Ranks. 
Returns a 424 status code if the model is not a Sequence Classification model with + * @description Get Ranks. Returns a 424 status code if the model is not a Sequence Classification model with + * a single class. + */ + post: operations["rerank"]; + }; + "/tokenize": { + /** + * Tokenize inputs + * @description Tokenize inputs + */ + post: operations["tokenize"]; + }; + "/vertex": { + /** + * Generate embeddings from a Vertex request + * @description Generate embeddings from a Vertex request + */ + post: operations["vertex_compatibility"]; + }; +} + +export type webhooks = Record; + +export interface components { + schemas: { + ClassifierModel: { + /** + * @example { + * "0": "LABEL" + * } + */ + id2label: { + [key: string]: string; + }; + /** + * @example { + * "LABEL": 0 + * } + */ + label2id: { + [key: string]: number; + }; + }; + DecodeRequest: { + ids: components["schemas"]["InputIds"]; + /** + * @default true + * @example true + */ + skip_special_tokens?: boolean; + }; + /** + * @example [ + * "test" + * ] + */ + DecodeResponse: string[]; + EmbedAllRequest: { + inputs: components["schemas"]["Input"]; + /** + * @default false + * @example false + */ + truncate?: boolean; + }; + /** + * @example [ + * [ + * [ + * 0, + * 1, + * 2 + * ] + * ] + * ] + */ + EmbedAllResponse: number[][][]; + EmbedRequest: { + inputs: components["schemas"]["Input"]; + /** + * @default true + * @example true + */ + normalize?: boolean; + /** + * @default false + * @example false + */ + truncate?: boolean; + }; + /** + * @example [ + * [ + * 0, + * 1, + * 2 + * ] + * ] + */ + EmbedResponse: number[][]; + EmbedSparseRequest: { + inputs: components["schemas"]["Input"]; + /** + * @default false + * @example false + */ + truncate?: boolean; + }; + EmbedSparseResponse: components["schemas"]["SparseValue"][][]; + EmbeddingModel: { + /** @example cls */ + pooling: string; + }; + ErrorResponse: { + error: string; + error_type: components["schemas"]["ErrorType"]; + }; + /** @enum {string} */ + ErrorType: "Unhealthy" | "Backend" | "Overloaded" | "Validation" | "Tokenizer"; + Info: { + /** @example null */ + docker_label?: string | null; + /** + * @default null + * @example null + */ + max_batch_requests?: number | null; + /** @example 2048 */ + max_batch_tokens: number; + /** @example 32 */ + max_client_batch_size: number; + /** + * @description Router Parameters + * @example 128 + */ + max_concurrent_requests: number; + /** @example 512 */ + max_input_length: number; + /** @example float16 */ + model_dtype: string; + /** + * @description Model info + * @example thenlper/gte-base + */ + model_id: string; + /** @example fca14538aa9956a46526bd1d0d11d69e19b5a101 */ + model_sha?: string | null; + model_type: components["schemas"]["ModelType"]; + /** @example null */ + sha?: string | null; + /** @example 4 */ + tokenization_workers: number; + /** + * @description Router Info + * @example 0.5.0 + */ + version: string; + }; + Input: string | string[]; + InputIds: number[] | number[][]; + ModelType: OneOf<[{ + classifier: components["schemas"]["ClassifierModel"]; + }, { + embedding: components["schemas"]["EmbeddingModel"]; + }, { + reranker: components["schemas"]["ClassifierModel"]; + }]>; + OpenAICompatEmbedding: { + /** + * @example [ + * 0, + * 1, + * 2 + * ] + */ + embedding: number[]; + /** @example 0 */ + index: number; + /** @example embedding */ + object: string; + }; + OpenAICompatErrorResponse: { + /** Format: int32 */ + code: number; + error_type: components["schemas"]["ErrorType"]; + message: string; + }; + 
OpenAICompatRequest: { + input: components["schemas"]["Input"]; + /** @example null */ + model?: string | null; + /** @example null */ + user?: string | null; + }; + OpenAICompatResponse: { + data: components["schemas"]["OpenAICompatEmbedding"][]; + /** @example thenlper/gte-base */ + model: string; + /** @example list */ + object: string; + usage: components["schemas"]["OpenAICompatUsage"]; + }; + OpenAICompatUsage: { + /** @example 512 */ + prompt_tokens: number; + /** @example 512 */ + total_tokens: number; + }; + /** + * @description Model input. Can be either a single string, a pair of strings or a batch of mixed single and pairs of strings. + * @example What is Deep Learning? + */ + PredictInput: string | string[] | string[][]; + PredictRequest: { + inputs: components["schemas"]["PredictInput"]; + /** + * @default false + * @example false + */ + raw_scores?: boolean; + /** + * @default false + * @example false + */ + truncate?: boolean; + }; + PredictResponse: components["schemas"]["Prediction"][] | components["schemas"]["Prediction"][][]; + Prediction: { + /** @example admiration */ + label: string; + /** + * Format: float + * @example 0.5 + */ + score: number; + }; + Rank: { + /** @example 0 */ + index: number; + /** + * Format: float + * @example 1.0 + */ + score: number; + /** + * @default null + * @example Deep Learning is ... + */ + text?: string | null; + }; + RerankRequest: { + /** @example What is Deep Learning? */ + query: string; + /** + * @default false + * @example false + */ + raw_scores?: boolean; + /** + * @default false + * @example false + */ + return_text?: boolean; + /** + * @example [ + * "Deep Learning is ..." + * ] + */ + texts: string[]; + /** + * @default false + * @example false + */ + truncate?: boolean; + }; + RerankResponse: components["schemas"]["Rank"][]; + SimpleToken: { + /** + * Format: int32 + * @example 0 + */ + id: number; + /** @example false */ + special: boolean; + /** @example 0 */ + start?: number | null; + /** @example 2 */ + stop?: number | null; + /** @example test */ + text: string; + }; + SparseValue: { + index: number; + /** Format: float */ + value: number; + }; + TokenizeRequest: { + /** + * @default true + * @example true + */ + add_special_tokens?: boolean; + inputs: components["schemas"]["Input"]; + }; + /** + * @example [ + * [ + * { + * "id": 0, + * "special": false, + * "start": 0, + * "stop": 2, + * "text": "test" + * } + * ] + * ] + */ + TokenizeResponse: components["schemas"]["SimpleToken"][][]; + VertexInstance: (components["schemas"]["EmbedRequest"] & { + /** @enum {string} */ + type: "embed"; + }) | (components["schemas"]["EmbedAllRequest"] & { + /** @enum {string} */ + type: "embed_all"; + }) | (components["schemas"]["EmbedSparseRequest"] & { + /** @enum {string} */ + type: "embed_sparse"; + }) | (components["schemas"]["PredictRequest"] & { + /** @enum {string} */ + type: "predict"; + }) | (components["schemas"]["RerankRequest"] & { + /** @enum {string} */ + type: "rerank"; + }) | (components["schemas"]["TokenizeRequest"] & { + /** @enum {string} */ + type: "tokenize"; + }); + VertexRequest: { + instances: components["schemas"]["VertexInstance"][]; + }; + VertexResponse: components["schemas"]["VertexResponseInstance"][]; + VertexResponseInstance: { + result: components["schemas"]["EmbedResponse"]; + /** @enum {string} */ + type: "embed"; + } | { + result: components["schemas"]["EmbedAllResponse"]; + /** @enum {string} */ + type: "embed_all"; + } | { + result: components["schemas"]["EmbedSparseResponse"]; + /** @enum 
{string} */ + type: "embed_sparse"; + } | { + result: components["schemas"]["PredictResponse"]; + /** @enum {string} */ + type: "predict"; + } | { + result: components["schemas"]["RerankResponse"]; + /** @enum {string} */ + type: "rerank"; + } | { + result: components["schemas"]["TokenizeResponse"]; + /** @enum {string} */ + type: "tokenize"; + }; + }; + responses: never; + parameters: never; + requestBodies: never; + headers: never; + pathItems: never; +} + +export type $defs = Record; + +export type external = Record; + +export interface operations { + + /** + * Decode input ids + * @description Decode input ids + */ + decode: { + requestBody: { + content: { + "application/json": components["schemas"]["DecodeRequest"]; + }; + }; + responses: { + /** @description Decoded ids */ + 200: { + content: { + "application/json": components["schemas"]["DecodeResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Get Embeddings. Returns a 424 status code if the model is not an embedding model. + * @description Get Embeddings. Returns a 424 status code if the model is not an embedding model. + */ + embed: { + requestBody: { + content: { + "application/json": components["schemas"]["EmbedRequest"]; + }; + }; + responses: { + /** @description Embeddings */ + 200: { + content: { + "application/json": components["schemas"]["EmbedResponse"]; + }; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Embedding Error */ + 424: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Get all Embeddings without Pooling. + * @description Get all Embeddings without Pooling. + * Returns a 424 status code if the model is not an embedding model. + */ + embed_all: { + requestBody: { + content: { + "application/json": components["schemas"]["EmbedAllRequest"]; + }; + }; + responses: { + /** @description Embeddings */ + 200: { + content: { + "application/json": components["schemas"]["EmbedAllResponse"]; + }; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Embedding Error */ + 424: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Get Sparse Embeddings. Returns a 424 status code if the model is not an embedding model with SPLADE pooling. + * @description Get Sparse Embeddings. Returns a 424 status code if the model is not an embedding model with SPLADE pooling. 
+ */ + embed_sparse: { + requestBody: { + content: { + "application/json": components["schemas"]["EmbedSparseRequest"]; + }; + }; + responses: { + /** @description Embeddings */ + 200: { + content: { + "application/json": components["schemas"]["EmbedSparseResponse"]; + }; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Embedding Error */ + 424: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * OpenAI compatible route. Returns a 424 status code if the model is not an embedding model. + * @description OpenAI compatible route. Returns a 424 status code if the model is not an embedding model. + */ + openai_embed: { + requestBody: { + content: { + "application/json": components["schemas"]["OpenAICompatRequest"]; + }; + }; + responses: { + /** @description Embeddings */ + 200: { + content: { + "application/json": components["schemas"]["OpenAICompatResponse"]; + }; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["OpenAICompatErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["OpenAICompatErrorResponse"]; + }; + }; + /** @description Embedding Error */ + 424: { + content: { + "application/json": components["schemas"]["OpenAICompatErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["OpenAICompatErrorResponse"]; + }; + }; + }; + }; + /** + * Health check method + * @description Health check method + */ + health: { + responses: { + /** @description Everything is working fine */ + 200: { + content: never; + }; + /** @description Text embeddings Inference is down */ + 503: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Text Embeddings Inference endpoint info + * @description Text Embeddings Inference endpoint info + */ + get_model_info: { + responses: { + /** @description Served model info */ + 200: { + content: { + "application/json": components["schemas"]["Info"]; + }; + }; + }; + }; + /** + * Prometheus metrics scrape endpoint + * @description Prometheus metrics scrape endpoint + */ + metrics: { + responses: { + /** @description Prometheus Metrics */ + 200: { + content: { + "text/plain": string; + }; + }; + }; + }; + /** + * Get Predictions. Returns a 424 status code if the model is not a Sequence Classification model + * @description Get Predictions. 
Returns a 424 status code if the model is not a Sequence Classification model + */ + predict: { + requestBody: { + content: { + "application/json": components["schemas"]["PredictRequest"]; + }; + }; + responses: { + /** @description Predictions */ + 200: { + content: { + "application/json": components["schemas"]["PredictResponse"]; + }; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Prediction Error */ + 424: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Get Ranks. Returns a 424 status code if the model is not a Sequence Classification model with + * @description Get Ranks. Returns a 424 status code if the model is not a Sequence Classification model with + * a single class. + */ + rerank: { + requestBody: { + content: { + "application/json": components["schemas"]["RerankRequest"]; + }; + }; + responses: { + /** @description Ranks */ + 200: { + content: { + "application/json": components["schemas"]["RerankResponse"]; + }; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Rerank Error */ + 424: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Tokenize inputs + * @description Tokenize inputs + */ + tokenize: { + requestBody: { + content: { + "application/json": components["schemas"]["TokenizeRequest"]; + }; + }; + responses: { + /** @description Tokenized ids */ + 200: { + content: { + "application/json": components["schemas"]["TokenizeResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; + /** + * Generate embeddings from a Vertex request + * @description Generate embeddings from a Vertex request + */ + vertex_compatibility: { + requestBody: { + content: { + "application/json": components["schemas"]["VertexRequest"]; + }; + }; + responses: { + /** @description Results */ + 200: { + content: never; + }; + /** @description Batch size error */ + 413: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Tokenization error */ + 422: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Error */ + 424: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + /** @description Model is overloaded */ + 429: { + content: { + "application/json": components["schemas"]["ErrorResponse"]; + }; + }; + }; + }; +} diff --git a/packages/embedder/types/modules.d.ts b/packages/embedder/types/modules.d.ts new file mode 100644 index 00000000000..276eb185fa4 --- /dev/null +++ b/packages/embedder/types/modules.d.ts @@ -0,0 +1,4 @@ +declare module 'franc-min' { + import f from 'franc' + export = f +} diff 
--git a/packages/embedder/types/shared.d.ts b/packages/embedder/types/shared.d.ts new file mode 100644 index 00000000000..1d6610890e7 --- /dev/null +++ b/packages/embedder/types/shared.d.ts @@ -0,0 +1,2 @@ +import '../../server/types/modules' +import '../../server/types/webpackEnv' diff --git a/packages/gql-executor/RedisStream.ts b/packages/gql-executor/RedisStream.ts index 173a493321a..22798509396 100644 --- a/packages/gql-executor/RedisStream.ts +++ b/packages/gql-executor/RedisStream.ts @@ -7,13 +7,14 @@ export default class RedisStream implements AsyncIterableIterator { private stream: string private consumerGroup: string // xreadgroup blocks until a response is received, so this needs its own connection - private redis = new RedisInstance('gql_stream') + private redis: RedisInstance private consumer: string constructor(stream: string, consumerGroup: string, consumer: string) { this.stream = stream this.consumerGroup = consumerGroup this.consumer = consumer + this.redis = new RedisInstance(stream) } [Symbol.asyncIterator]() { diff --git a/packages/server/dataloader/rethinkForeignKeyLoaderMakers.ts b/packages/server/dataloader/rethinkForeignKeyLoaderMakers.ts index 7c19ba2d254..70a038b712c 100644 --- a/packages/server/dataloader/rethinkForeignKeyLoaderMakers.ts +++ b/packages/server/dataloader/rethinkForeignKeyLoaderMakers.ts @@ -192,6 +192,19 @@ export const retroReflectionsByMeetingId = new RethinkForeignKeyLoaderMaker( } ) +export const retroReflectionsByGroupId = new RethinkForeignKeyLoaderMaker( + 'retroReflections', + 'reflectionGroupId', + async (reflectionGroupIds) => { + const r = await getRethink() + return r + .table('RetroReflection') + .getAll(r.args(reflectionGroupIds), {index: 'reflectionGroupId'}) + .filter({isActive: true}) + .run() + } +) + export const templateDimensionsByTemplateId = new RethinkForeignKeyLoaderMaker( 'templateDimensions', 'templateId', diff --git a/packages/server/graphql/mutations/helpers/publishToEmbedder.ts b/packages/server/graphql/mutations/helpers/publishToEmbedder.ts new file mode 100644 index 00000000000..c8a735f4ac9 --- /dev/null +++ b/packages/server/graphql/mutations/helpers/publishToEmbedder.ts @@ -0,0 +1,14 @@ +import type {MessageToEmbedder} from 'embedder/custom' +import getRedis from '../../../utils/getRedis' + +export const publishToEmbedder = (message: MessageToEmbedder) => { + return getRedis().xadd( + 'embedMetadataStream', + 'MAXLEN', + '~', + 1000, + '*', + 'msg', + JSON.stringify(message) + ) +} diff --git a/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts b/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts index cd116c65ea8..f5a69be4f22 100644 --- a/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts +++ b/packages/server/graphql/mutations/helpers/safeEndRetrospective.ts @@ -1,35 +1,36 @@ +import {RawDraftContentState} from 'draft-js' import {SubscriptionChannel} from 'parabol-client/types/constEnums' import {DISCUSS, PARABOL_AI_USER_ID} from 'parabol-client/utils/constants' import getMeetingPhase from 'parabol-client/utils/getMeetingPhase' import findStageById from 'parabol-client/utils/meetings/findStageById' -import {RawDraftContentState} from 'draft-js' import {checkTeamsLimit} from '../../../billing/helpers/teamLimitsCheck' import getRethink from '../../../database/rethinkDriver' import {RDatum} from '../../../database/stricterR' import MeetingRetrospective from '../../../database/types/MeetingRetrospective' +import NotificationMentioned from 
'../../../database/types/NotificationMentioned' import TimelineEventRetroComplete from '../../../database/types/TimelineEventRetroComplete' import getKysely from '../../../postgres/getKysely' import removeSuggestedAction from '../../../safeMutations/removeSuggestedAction' +import {Logger} from '../../../utils/Logger' +import RecallAIServerManager from '../../../utils/RecallAIServerManager' import {analytics} from '../../../utils/analytics/analytics' import {getUserId} from '../../../utils/authorization' import getPhase from '../../../utils/getPhase' import publish from '../../../utils/publish' -import publishNotification from '../../public/mutations/helpers/publishNotification' -import RecallAIServerManager from '../../../utils/RecallAIServerManager' import sendToSentry from '../../../utils/sendToSentry' import standardError from '../../../utils/standardError' import {InternalContext} from '../../graphql' -import updateTeamInsights from './updateTeamInsights' +import publishNotification from '../../public/mutations/helpers/publishNotification' import sendNewMeetingSummary from './endMeeting/sendNewMeetingSummary' +import gatherInsights from './gatherInsights' import generateWholeMeetingSentimentScore from './generateWholeMeetingSentimentScore' import generateWholeMeetingSummary from './generateWholeMeetingSummary' import handleCompletedStage from './handleCompletedStage' import {IntegrationNotifier} from './notifications/IntegrationNotifier' +import {publishToEmbedder} from './publishToEmbedder' import removeEmptyTasks from './removeEmptyTasks' import updateQualAIMeetingsCount from './updateQualAIMeetingsCount' -import gatherInsights from './gatherInsights' -import NotificationMentioned from '../../../database/types/NotificationMentioned' -import {Logger} from '../../../utils/Logger' +import updateTeamInsights from './updateTeamInsights' const getTranscription = async (recallBotId?: string | null) => { if (!recallBotId) return @@ -370,6 +371,7 @@ const safeEndRetrospective = async ({ removedTaskIds, timelineEventId } + publishToEmbedder({objectTypes: ['retrospectiveDiscussionTopic'], meetingId}) publish(SubscriptionChannel.TEAM, teamId, 'EndRetrospectiveSuccess', data, subOptions) return data diff --git a/packages/server/postgres/migrations/1703031300000_addEmbeddingTables.ts b/packages/server/postgres/migrations/1703031300000_addEmbeddingTables.ts index e5f6f813877..33718ee75a1 100644 --- a/packages/server/postgres/migrations/1703031300000_addEmbeddingTables.ts +++ b/packages/server/postgres/migrations/1703031300000_addEmbeddingTables.ts @@ -1,7 +1,7 @@ import {Client} from 'pg' -import getPgConfig from '../getPgConfig' import {r} from 'rethinkdb-ts' import connectRethinkDB from '../../database/connectRethinkDB' +import getPgConfig from '../getPgConfig' export async function up() { const client = new Client(getPgConfig()) @@ -78,8 +78,8 @@ export async function down() { EXECUTE 'DROP TABLE IF EXISTS "EmbeddingsJobQueue"'; EXECUTE 'DROP TABLE IF EXISTS "EmbeddingsMetadata"'; - EXECUTE 'DROP TYPE IF EXISTS "EmbeddingsStateEnum"'; - EXECUTE 'DROP TYPE IF EXISTS "EmbeddingsObjectTypeEnum"'; + EXECUTE 'DROP TYPE IF EXISTS "EmbeddingsStateEnum" CASCADE'; + EXECUTE 'DROP TYPE IF EXISTS "EmbeddingsObjectTypeEnum" CASCADE'; END $$; `) await client.end() diff --git a/packages/server/postgres/migrations/1709934935000_embeddingsMetadataId.ts b/packages/server/postgres/migrations/1709934935000_embeddingsMetadataId.ts new file mode 100644 index 00000000000..185b1e0881b --- /dev/null +++ 
b/packages/server/postgres/migrations/1709934935000_embeddingsMetadataId.ts @@ -0,0 +1,62 @@ +import {Client} from 'pg' +import getPgConfig from '../getPgConfig' + +export async function up() { + const client = new Client(getPgConfig()) + await client.connect() + // wipe data to ensure the non-null constraints succeed + await client.query(` + CREATE TYPE "ISO6391Enum" AS ENUM ('aa', 'ab', 'af', 'ak', 'am', 'ar', 'an', 'as', 'av', 'ae', 'ay', 'az', 'ba', 'bm', 'be', 'bn', 'bi', 'bo', 'bs', 'br', 'bg', 'ca', 'cs', 'ch', 'ce', 'cu', 'cv', 'kw', 'co', 'cr', 'cy', 'da', 'de', 'dv', 'dz', 'el', 'en', 'eo', 'et', 'eu', 'ee', 'fo', 'fa', 'fj', 'fi', 'fr', 'fy', 'ff', 'gd', 'ga', 'gl', 'gv', 'gn', 'gu', 'ht', 'ha', 'sh', 'he', 'hz', 'hi', 'ho', 'hr', 'hu', 'hy', 'ig', 'io', 'ii', 'iu', 'ie', 'ia', 'id', 'ik', 'is', 'it', 'jv', 'ja', 'kl', 'kn', 'ks', 'ka', 'kr', 'kk', 'km', 'ki', 'rw', 'ky', 'kv', 'kg', 'ko', 'kj', 'ku', 'lo', 'la', 'lv', 'li', 'ln', 'lt', 'lb', 'lu', 'lg', 'mh', 'ml', 'mr', 'mk', 'mg', 'mt', 'mn', 'mi', 'ms', 'my', 'na', 'nv', 'nr', 'nd', 'ng', 'ne', 'nl', 'nn', 'nb', 'no', 'ny', 'oc', 'oj', 'or', 'om', 'os', 'pa', 'pi', 'pl', 'pt', 'ps', 'qu', 'rm', 'ro', 'rn', 'ru', 'sg', 'sa', 'si', 'sk', 'sl', 'se', 'sm', 'sn', 'sd', 'so', 'st', 'es', 'sq', 'sc', 'sr', 'ss', 'su', 'sw', 'sv', 'ty', 'ta', 'tt', 'te', 'tg', 'tl', 'th', 'ti', 'to', 'tn', 'ts', 'tk', 'tr', 'tw', 'ug', 'uk', 'ur', 'uz', 've', 'vi', 'vo', 'wa', 'wo', 'xh', 'yi', 'yo', 'za', 'zh', 'zu'); + DELETE FROM "EmbeddingsMetadata"; + DELETE FROM "EmbeddingsJobQueue"; + CREATE INDEX IF NOT EXISTS "idx_Discussion_createdAt" ON "Discussion"("createdAt"); + ALTER TYPE "EmbeddingsStateEnum" RENAME VALUE 'embedding' TO 'running'; + ALTER TYPE "EmbeddingsStateEnum" RENAME TO "EmbeddingsJobStateEnum"; + ALTER TABLE "EmbeddingsMetadata" + DROP COLUMN "models", + ADD COLUMN "language" "ISO6391Enum", + ALTER COLUMN "refId" SET NOT NULL; + ALTER TABLE "EmbeddingsJobQueue" + ADD COLUMN "retryAfter" TIMESTAMP WITH TIME ZONE, + ADD COLUMN "retryCount" SMALLINT NOT NULL DEFAULT 0, + ADD COLUMN "startAt" TIMESTAMP WITH TIME ZONE, + ADD COLUMN "priority" SMALLINT NOT NULL DEFAULT 50, + ADD COLUMN "jobData" JSONB NOT NULL DEFAULT '{}', + ADD COLUMN "jobType" VARCHAR(255) NOT NULL, + DROP CONSTRAINT IF EXISTS "EmbeddingsJobQueue_objectType_refId_model_key", + DROP COLUMN "refId", + DROP COLUMN "objectType", + DROP COLUMN "model"; + CREATE INDEX IF NOT EXISTS "idx_EmbeddingsJobQueue_priority" ON "EmbeddingsJobQueue"("priority"); + `) + await client.end() +} + +export async function down() { + const client = new Client(getPgConfig()) + await client.connect() + await client.query(` + DROP TYPE IF EXISTS "ISO6391Enum" CASCADE; + DROP INDEX IF EXISTS "idx_Discussion_createdAt"; + ALTER TYPE "EmbeddingsJobStateEnum" RENAME VALUE 'running' TO 'embedding'; + ALTER TYPE "EmbeddingsJobStateEnum" RENAME TO "EmbeddingsStateEnum"; + ALTER TABLE "EmbeddingsMetadata" + ADD COLUMN "models" VARCHAR(255)[], + DROP COLUMN IF EXISTS "language", + ALTER COLUMN "refId" DROP NOT NULL; + ALTER TABLE "EmbeddingsJobQueue" + ALTER COLUMN "state" TYPE "EmbeddingsStateEnum" USING "state"::text::"EmbeddingsStateEnum", + ADD CONSTRAINT "EmbeddingsJobQueue_objectType_refId_model_key" UNIQUE("objectType", "refId", "model"), + ADD COLUMN "refId" VARCHAR(100), + ADD COLUMN "model" VARCHAR(255), + ADD COLUMN "objectType" "EmbeddingsObjectTypeEnum", + DROP COLUMN "retryAfter", + DROP COLUMN "retryCount", + DROP COLUMN "startAt", + DROP COLUMN "jobData", + DROP COLUMN "priority", + 
DROP COLUMN "jobType"; + DROP INDEX IF EXISTS "idx_EmbeddingsJobQueue_priority"; + `) + await client.end() +} diff --git a/release-please-config.json b/release-please-config.json index 29f302183a1..b8959848b5a 100644 --- a/release-please-config.json +++ b/release-please-config.json @@ -60,6 +60,11 @@ "path": "packages/integration-tests/package.json", "jsonpath": "$.version" }, + { + "type": "json", + "path": "packages/embedder/package.json", + "jsonpath": "$.version" + }, { "type": "json", "path": "packages/server/package.json", diff --git a/yarn.lock b/yarn.lock index 817e730db40..e6fb56f52bf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7577,6 +7577,11 @@ dependencies: "@types/jsdom" "*" +"@types/franc@^5.0.3": + version "5.0.3" + resolved "https://registry.yarnpkg.com/@types/franc/-/franc-5.0.3.tgz#7263cef3ab3512ac95a78c328fcc51c51396b49f" + integrity sha512-YX6o2vVkeiUvOF12bUmnSGf8sezOoBnCWjHHZGeh2lt3tqAutbJ9OL3cDRiZoiAYaZR638nuOc0Ji9bzdad2XA== + "@types/glob@*": version "8.1.0" resolved "https://registry.yarnpkg.com/@types/glob/-/glob-8.1.0.tgz#b63e70155391b0584dce44e7ea25190bbc38f2fc" @@ -10109,6 +10114,11 @@ codemirror@^5.65.3: resolved "https://registry.yarnpkg.com/codemirror/-/codemirror-5.65.3.tgz#2d029930d5a293bc5fb96ceea64654803c0d4ac7" integrity sha512-kCC0iwGZOVZXHEKW3NDTObvM7pTIyowjty4BUqeREROc/3I6bWbgZDA3fGDwlA+rbgRjvnRnfqs9SfXynel1AQ== +collapse-white-space@^1.0.0: + version "1.0.6" + resolved "https://registry.yarnpkg.com/collapse-white-space/-/collapse-white-space-1.0.6.tgz#e63629c0016665792060dbbeb79c42239d2c5287" + integrity sha512-jEovNnrhMuqyCcjfEJA56v0Xq8SkIoPKDyaHahwo3POf4qcSXqMYuwNcOTzp74vTsR9Tn08z4MxWqAhcekogkQ== + collect-v8-coverage@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/collect-v8-coverage/-/collect-v8-coverage-1.0.1.tgz#cc2c8e94fc18bbdffe64d6534570c8a673b27f59" @@ -12401,6 +12411,13 @@ framesync@6.0.1: dependencies: tslib "^2.1.0" +franc-min@^5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/franc-min/-/franc-min-5.0.0.tgz#5625d0570a18564dcbbfa8330254d23549294d9a" + integrity sha512-xy7Iq7uNflbvNU+bkyYWtP+BOHWZle7kT9GM84gEV14b7/7sgq7M7Flf6v1XRflHAuHoshBMveWA6Q+kEXYeHQ== + dependencies: + trigram-utils "^1.0.0" + fresh@0.5.2: version "0.5.2" resolved "https://registry.yarnpkg.com/fresh/-/fresh-0.5.2.tgz#3d8cadd90d976569fa835ab1f8e4b23a105605a7" @@ -15026,10 +15043,10 @@ kysely-codegen@^0.11.0: micromatch "^4.0.5" minimist "^1.2.8" -kysely@^0.27.2: - version "0.27.2" - resolved "https://registry.yarnpkg.com/kysely/-/kysely-0.27.2.tgz#b289ce5e561064ec613a17149b7155783d2b36de" - integrity sha512-DmRvEfiR/NLpgsTbSxma2ldekhsdcd65+MNiKXyd/qj7w7X5e3cLkXxcj+MypsRDjPhHQ/CD5u3Eq1sBYzX0bw== +kysely@^0.27.3: + version "0.27.3" + resolved "https://registry.yarnpkg.com/kysely/-/kysely-0.27.3.tgz#6cc6c757040500b43c4ac596cdbb12be400ee276" + integrity sha512-lG03Ru+XyOJFsjH3OMY6R/9U38IjDPfnOfDgO3ynhbDr+Dz8fak+X6L62vqu3iybQnj+lG84OttBuU9KY3L9kA== launch-editor@^2.6.0: version "2.6.0" @@ -16139,6 +16156,11 @@ mz@^2.7.0: object-assign "^4.0.1" thenify-all "^1.0.0" +n-gram@^1.0.0: + version "1.1.2" + resolved "https://registry.yarnpkg.com/n-gram/-/n-gram-1.1.2.tgz#69c609a5c83bb32f82774c9e297f8494c7326798" + integrity sha512-mBTpWKp0NHdujHmxrskPg2jc108mjyMmVxHN1rZGK/ogTLi9O0debDIXlQPqotNELdNmVGtL4jr7SCig+4OWvQ== + nan@^2.17.0: version "2.18.0" resolved "https://registry.yarnpkg.com/nan/-/nan-2.18.0.tgz#26a6faae7ffbeb293a39660e88a76b82e30b7554" @@ -16876,11 +16898,23 @@ openai@^4.24.1: node-fetch "^2.6.7" web-streams-polyfill "^3.2.1" 
+openapi-fetch@^0.9.3: + version "0.9.3" + resolved "https://registry.yarnpkg.com/openapi-fetch/-/openapi-fetch-0.9.3.tgz#37c1dbde7faec885eaa40f351cab1c231b794761" + integrity sha512-tC1NDn71vJHeCzu+lYdrnIpgRt4GxR0B4eSwXNb15ypWpZcpaEOwHFkoz8FcfG5Fvqkz2P0Fl9zQF1JJwBjuvA== + dependencies: + openapi-typescript-helpers "^0.0.7" + openapi-types@^12.1.0: version "12.1.1" resolved "https://registry.yarnpkg.com/openapi-types/-/openapi-types-12.1.1.tgz#0aface4e05ba60efbf51153ed6af23988796617d" integrity sha512-m/DJaEqOUDSU8KoI74E6A3TokccuDOJ81ewZ6kLFwUT1KEIE0GDWvErtnJJDU4sySx8JKF5kk2GzHUuK6f+VHA== +openapi-typescript-helpers@^0.0.7: + version "0.0.7" + resolved "https://registry.yarnpkg.com/openapi-typescript-helpers/-/openapi-typescript-helpers-0.0.7.tgz#1d0ead67c35864d189c2cb2d0556854ccbb16c38" + integrity sha512-7nwlAtdA1fULipibFRBWE/rnF114q6ejRYzNvhdA/x+qTWAZhXGLc/368dlwMlyJDvCQMCnADjpzb5BS5ZmNSA== + opener@^1.5.2: version "1.5.2" resolved "https://registry.yarnpkg.com/opener/-/opener-1.5.2.tgz#5d37e1f35077b9dcac4301372271afdeb2a13598" @@ -20704,6 +20738,15 @@ treeverse@^2.0.0: resolved "https://registry.yarnpkg.com/treeverse/-/treeverse-2.0.0.tgz#036dcef04bc3fd79a9b79a68d4da03e882d8a9ca" integrity sha512-N5gJCkLu1aXccpOTtqV6ddSEi6ZmGkh3hjmbu1IjcavJK4qyOVQmi0myQKM7z5jVGmD68SJoliaVrMmVObhj6A== +trigram-utils@^1.0.0: + version "1.0.3" + resolved "https://registry.yarnpkg.com/trigram-utils/-/trigram-utils-1.0.3.tgz#535da37a414dae249c4b023512cf2b3dc65c8ea4" + integrity sha512-UAhS1Ll21FtClVIzIN0I/SmGnJ+D08BOxX7Dl1penV8raC0ksf2dJkhNI6kU1Mj3uT86Bul12iMvxXquXSYSng== + dependencies: + collapse-white-space "^1.0.0" + n-gram "^1.0.0" + trim "0.0.1" + trim-newlines@^3.0.0: version "3.0.1" resolved "https://registry.yarnpkg.com/trim-newlines/-/trim-newlines-3.0.1.tgz#260a5d962d8b752425b32f3a7db0dcacd176c144" @@ -20714,6 +20757,11 @@ trim-newlines@^4.0.2: resolved "https://registry.yarnpkg.com/trim-newlines/-/trim-newlines-4.0.2.tgz#d6aaaf6a0df1b4b536d183879a6b939489808c7c" integrity sha512-GJtWyq9InR/2HRiLZgpIKv+ufIKrVrvjQWEj7PxAXNc5dwbNJkqhAUoAGgzRmULAnoOM5EIpveYd3J2VeSAIew== +trim@0.0.1: + version "0.0.1" + resolved "https://registry.yarnpkg.com/trim/-/trim-0.0.1.tgz#5858547f6b290757ee95cccc666fb50084c460dd" + integrity sha512-YzQV+TZg4AxpKxaTHK3c3D+kRDCGVEE7LemdlQZoQXn0iennk10RsIoY6ikzAqJTc9Xjl9C1/waHom/J86ziAQ== + ts-algebra@^1.2.0: version "1.2.0" resolved "https://registry.yarnpkg.com/ts-algebra/-/ts-algebra-1.2.0.tgz#f91c481207a770f0d14d055c376cbee040afdfc9" @@ -20997,10 +21045,10 @@ typedarray@^0.0.6: resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777" integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c= -typescript@4.9.5, "typescript@^3 || ^4", typescript@^4.2.4, typescript@^5.3.3: - version "5.3.3" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.3.3.tgz#b3ce6ba258e72e6305ba66f5c9b452aaee3ffe37" - integrity sha512-pXWcraxM0uxAS+tN0AG/BF2TyqmHO014Z070UsJ+pFvYuRSq8KH8DmWpnbXe0pEPDHXZV3FcAbJkijJ5oNEnWw== +"typescript@^3 || ^4", typescript@^4.2.4, typescript@^5.3.3: + version "5.4.2" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.4.2.tgz#0ae9cebcfae970718474fe0da2c090cad6577372" + integrity sha512-+2/g0Fds1ERlP6JsakQQDXjZdZMM+rqpamFZJEKh4kwTIn3iDkgKtby0CeNd5ATNZ4Ry1ax15TMx0W2V+miizQ== uWebSockets.js@uNetworking/uWebSockets.js#v20.34.0: version "20.34.0"
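Two usage sketches may help tie the new pieces together. First, the generated `textEmbeddingsnterface.d.ts` pairs with the new `openapi-fetch` dependency to give the embedder a typed client for a Text Embeddings Inference (TEI) server. The sketch below is illustrative only: the `embedOne` helper and the hard-coded base URL are assumptions, not code from this change; only the `paths` type and the `/embed` route come from the generated schema.

```ts
import createClient from 'openapi-fetch'
import type {paths} from './textEmbeddingsnterface'

// Typed client for the TEI OpenAPI schema; every route, request body, and
// response shape is checked against the generated `paths` interface
const tei = createClient<paths>({baseUrl: 'http://localhost:3040'})

// Hypothetical helper: embed a single string and return its vector
export const embedOne = async (text: string) => {
  // POST /embed returns number[][] (one vector per input) or an ErrorResponse
  const {data, error} = await tei.POST('/embed', {
    body: {inputs: text, normalize: true, truncate: false}
  })
  if (error) throw new Error(`TEI /embed failed: ${error.error}`)
  return data[0]
}
```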
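Second, `mergeAsyncIterators` is the subtlest addition: it fans in several async iterators through a single promise capability rather than the ever-growing promise chain a naive `Promise.race` loop leaks. A hypothetical consumer follows; both generator sources are stand-ins, not code from this diff.

```ts
import {mergeAsyncIterators} from './mergeAsyncIterators'

// Stand-in sources; in the embedder these would be e.g. a Redis stream of
// metadata messages and the EmbeddingsJobQueue stream
async function* messages() {
  yield 'embed retrospectiveDiscussionTopic'
}
async function* jobs() {
  yield {id: 1, jobType: 'embed' as const}
}

const run = async () => {
  // Each result is a [sourceIndex, value] tuple, so one loop can dispatch on
  // which iterator produced the item
  for await (const [idx, value] of mergeAsyncIterators([messages(), jobs()])) {
    console.log(idx, value) // [0, 'embed ...'] or [1, {id: 1, jobType: 'embed'}]
  }
}
run().catch(console.error)
```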