diff --git a/packages/adapter-postgres/migrations/20240318103238_remote_schema.sql b/packages/adapter-postgres/migrations/20240318103238_remote_schema.sql index 53bba89e45..2867a12aea 100644 --- a/packages/adapter-postgres/migrations/20240318103238_remote_schema.sql +++ b/packages/adapter-postgres/migrations/20240318103238_remote_schema.sql @@ -507,6 +507,63 @@ CREATE TABLE IF NOT EXISTS "public"."rooms" ( "createdAt" timestamp with time zone DEFAULT ("now"() AT TIME ZONE 'utc'::"text") NOT NULL ); +CREATE OR REPLACE FUNCTION "public"."search_knowledge"( + "query_embedding" "extensions"."vector", + "query_agent_id" "uuid", + "match_threshold" double precision, + "match_count" integer, + "search_text" text +) RETURNS TABLE ( + "id" "uuid", + "agentId" "uuid", + "content" "jsonb", + "embedding" "extensions"."vector", + "createdAt" timestamp with time zone, + "similarity" double precision +) LANGUAGE "plpgsql" AS $$ +BEGIN + RETURN QUERY + WITH vector_matches AS ( + SELECT kb.id, + 1 - (kb.embedding <=> query_embedding) as vector_score + FROM knowledge kb + WHERE ((kb."agentId" IS NULL AND kb."isShared" = true) OR kb."agentId" = query_agent_id) + AND kb.embedding IS NOT NULL + ), + keyword_matches AS ( + SELECT kb.id, + CASE + WHEN kb.content->>'text' ILIKE '%' || search_text || '%' THEN 3.0 + ELSE 1.0 + END * + CASE + WHEN kb.content->'metadata'->>'isChunk' = 'true' THEN 1.5 + WHEN kb.content->'metadata'->>'isMain' = 'true' THEN 1.2 + ELSE 1.0 + END as keyword_score + FROM knowledge kb + WHERE ((kb."agentId" IS NULL AND kb."isShared" = true) OR kb."agentId" = query_agent_id) + ) + SELECT + k.id, + k."agentId", + k.content, + k.embedding, + k."createdAt", + (v.vector_score * kw.keyword_score) as similarity + FROM knowledge k + JOIN vector_matches v ON k.id = v.id + LEFT JOIN keyword_matches kw ON k.id = kw.id + WHERE ((k."agentId" IS NULL AND k."isShared" = true) OR k."agentId" = query_agent_id) + AND ( + v.vector_score >= match_threshold + OR (kw.keyword_score > 1.0 AND v.vector_score >= 0.3) + ) + ORDER BY (v.vector_score * kw.keyword_score) DESC + LIMIT 
match_count; +END; +$$; + ALTER TABLE "public"."rooms" OWNER TO "postgres"; ALTER TABLE ONLY "public"."relationships" @@ -564,6 +621,9 @@ ALTER TABLE ONLY "public"."relationships" ALTER TABLE ONLY "public"."relationships" ADD CONSTRAINT "relationships_userId_fkey" FOREIGN KEY ("userId") REFERENCES "public"."accounts"("id"); +ALTER TABLE ONLY "public"."knowledge" + ADD CONSTRAINT "knowledge_agentId_fkey" FOREIGN KEY ("agentId") REFERENCES "public"."accounts"("id") ON DELETE CASCADE; + CREATE POLICY "Can select and update all data" ON "public"."accounts" USING (("auth"."uid"() = "id")) WITH CHECK (("auth"."uid"() = "id")); CREATE POLICY "Enable delete for users based on userId" ON "public"."goals" FOR DELETE TO "authenticated" USING (("auth"."uid"() = "userId")); @@ -600,6 +660,18 @@ CREATE POLICY "Enable update for users of own id" ON "public"."rooms" FOR UPDATE CREATE POLICY "Enable users to delete their own relationships/friendships" ON "public"."relationships" FOR DELETE TO "authenticated" USING ((("auth"."uid"() = "userA") OR ("auth"."uid"() = "userB"))); +CREATE POLICY "Enable read access for all users" ON "public"."knowledge" + FOR SELECT USING (true); + +CREATE POLICY "Enable insert for authenticated users only" ON "public"."knowledge" + FOR INSERT TO "authenticated" WITH CHECK (true); + +CREATE POLICY "Enable update for authenticated users" ON "public"."knowledge" + FOR UPDATE TO "authenticated" USING (true) WITH CHECK (true); + +CREATE POLICY "Enable delete for users based on agentId" ON "public"."knowledge" + FOR DELETE TO "authenticated" USING (("auth"."uid"() = "agentId")); + ALTER TABLE "public"."accounts" ENABLE ROW LEVEL SECURITY; ALTER TABLE "public"."goals" ENABLE ROW LEVEL SECURITY; @@ -614,6 +686,8 @@ ALTER TABLE "public"."relationships" ENABLE ROW LEVEL SECURITY; ALTER TABLE "public"."rooms" ENABLE ROW LEVEL SECURITY; +ALTER TABLE "public"."knowledge" ENABLE ROW LEVEL SECURITY; + CREATE POLICY "select_own_account" ON "public"."accounts" FOR 
SELECT USING (("auth"."uid"() = "id")); GRANT USAGE ON SCHEMA "public" TO "postgres"; @@ -703,6 +777,10 @@ GRANT ALL ON TABLE "public"."secrets" TO "service_role"; GRANT ALL ON TABLE "public"."secrets" TO "supabase_admin"; GRANT ALL ON TABLE "public"."secrets" TO "supabase_auth_admin"; +GRANT ALL ON TABLE "public"."knowledge" TO "authenticated"; +GRANT ALL ON TABLE "public"."knowledge" TO "service_role"; +GRANT ALL ON TABLE "public"."knowledge" TO "supabase_admin"; +GRANT ALL ON TABLE "public"."knowledge" TO "supabase_auth_admin"; GRANT ALL ON FUNCTION "public"."get_participant_userState"("roomId" "uuid", "userId" "uuid") TO "authenticated"; GRANT ALL ON FUNCTION "public"."get_participant_userState"("roomId" "uuid", "userId" "uuid") TO "service_role"; @@ -710,7 +788,7 @@ GRANT ALL ON FUNCTION "public"."get_participant_userState"("roomId" "uuid", "use GRANT ALL ON FUNCTION "public"."get_participant_userState"("roomId" "uuid", "userId" "uuid") TO "supabase_auth_admin"; GRANT ALL ON FUNCTION "public"."set_participant_userState"("roomId" "uuid", "userId" "uuid", "state" "text") TO "authenticated"; -GRANT ALL ON FUNCTION "public"."set_participant_userState"("roomId" "uuid", "userId" "uuid", "state" "text") TO "service_role"; +GRANT ALL ON FUNCTION "public"."set_participant_userState"("roomId" "uuid", "userId" "uuid", "state" "text") TO "service_role"; GRANT ALL ON FUNCTION "public"."set_participant_userState"("roomId" "uuid", "userId" "uuid", "state" "text") TO "supabase_admin"; GRANT ALL ON FUNCTION "public"."set_participant_userState"("roomId" "uuid", "userId" "uuid", "state" "text") TO "supabase_auth_admin"; @@ -733,4 +811,9 @@ ALTER DEFAULT PRIVILEGES FOR ROLE "postgres" IN SCHEMA "public" GRANT ALL ON TAB ALTER DEFAULT PRIVILEGES FOR ROLE "postgres" IN SCHEMA "public" GRANT ALL ON TABLES TO "supabase_admin"; ALTER DEFAULT PRIVILEGES FOR ROLE "postgres" IN SCHEMA "public" GRANT ALL ON TABLES TO "supabase_auth_admin"; +GRANT ALL ON FUNCTION 
"public"."search_knowledge"("query_embedding" "extensions"."vector", "query_agent_id" "uuid", "match_threshold" double precision, "match_count" integer, "search_text" text) TO "authenticated"; +GRANT ALL ON FUNCTION "public"."search_knowledge"("query_embedding" "extensions"."vector", "query_agent_id" "uuid", "match_threshold" double precision, "match_count" integer, "search_text" text) TO "service_role"; +GRANT ALL ON FUNCTION "public"."search_knowledge"("query_embedding" "extensions"."vector", "query_agent_id" "uuid", "match_threshold" double precision, "match_count" integer, "search_text" text) TO "supabase_admin"; +GRANT ALL ON FUNCTION "public"."search_knowledge"("query_embedding" "extensions"."vector", "query_agent_id" "uuid", "match_threshold" double precision, "match_count" integer, "search_text" text) TO "supabase_auth_admin"; + RESET ALL; \ No newline at end of file diff --git a/packages/adapter-postgres/schema.sql b/packages/adapter-postgres/schema.sql index 4a0f7c6f1d..b8327e54df 100644 --- a/packages/adapter-postgres/schema.sql +++ b/packages/adapter-postgres/schema.sql @@ -9,6 +9,7 @@ -- DROP TABLE IF EXISTS memories CASCADE; -- DROP TABLE IF EXISTS rooms CASCADE; -- DROP TABLE IF EXISTS accounts CASCADE; +-- DROP TABLE IF EXISTS knowledge CASCADE; CREATE EXTENSION IF NOT EXISTS vector; @@ -24,9 +25,6 @@ BEGIN -- Then check for Ollama ELSIF current_setting('app.use_ollama_embedding', TRUE) = 'true' THEN RETURN 1024; -- Ollama mxbai-embed-large dimension - -- Then check for GAIANET - ELSIF current_setting('app.use_gaianet_embedding', TRUE) = 'true' THEN - RETURN 768; -- Gaianet nomic-embed dimension ELSE RETURN 384; -- BGE/Other embedding dimension END IF; @@ -130,11 +128,38 @@ CREATE TABLE IF NOT EXISTS cache ( PRIMARY KEY ("key", "agentId") ); +DO $$ +DECLARE + vector_dim INTEGER; +BEGIN + vector_dim := get_embedding_dimension(); + + EXECUTE format(' + CREATE TABLE IF NOT EXISTS knowledge ( + "id" UUID PRIMARY KEY, + "agentId" UUID REFERENCES 
accounts("id"), + "content" JSONB NOT NULL, + "embedding" vector(%s), + "createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + "isMain" BOOLEAN DEFAULT FALSE, + "originalId" UUID REFERENCES knowledge("id"), + "chunkIndex" INTEGER, + "isShared" BOOLEAN DEFAULT FALSE, + CHECK((isShared = true AND "agentId" IS NULL) OR (isShared = false AND "agentId" IS NOT NULL)) + )', vector_dim); +END $$; + -- Indexes CREATE INDEX IF NOT EXISTS idx_memories_embedding ON memories USING hnsw ("embedding" vector_cosine_ops); CREATE INDEX IF NOT EXISTS idx_memories_type_room ON memories("type", "roomId"); CREATE INDEX IF NOT EXISTS idx_participants_user ON participants("userId"); CREATE INDEX IF NOT EXISTS idx_participants_room ON participants("roomId"); CREATE INDEX IF NOT EXISTS idx_relationships_users ON relationships("userA", "userB"); - -COMMIT; +CREATE INDEX IF NOT EXISTS idx_knowledge_agent ON knowledge("agentId"); +CREATE INDEX IF NOT EXISTS idx_knowledge_agent_main ON knowledge("agentId", "isMain"); +CREATE INDEX IF NOT EXISTS idx_knowledge_original ON knowledge("originalId"); +CREATE INDEX IF NOT EXISTS idx_knowledge_created ON knowledge("agentId", "createdAt"); +CREATE INDEX IF NOT EXISTS idx_knowledge_shared ON knowledge("isShared"); +CREATE INDEX IF NOT EXISTS idx_knowledge_embedding ON knowledge USING ivfflat (embedding vector_cosine_ops); + +COMMIT; \ No newline at end of file diff --git a/packages/adapter-postgres/src/index.ts b/packages/adapter-postgres/src/index.ts index 5e4cf936ea..9d1e549b66 100644 --- a/packages/adapter-postgres/src/index.ts +++ b/packages/adapter-postgres/src/index.ts @@ -24,6 +24,7 @@ import { getEmbeddingConfig, DatabaseAdapter, EmbeddingProvider, + RAGKnowledgeItem } from "@elizaos/core"; import fs from "fs"; import { fileURLToPath } from "url"; @@ -1457,6 +1458,182 @@ export class PostgresDatabaseAdapter } }, "deleteCache"); } + + async getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + }): Promise { + 
return this.withDatabase(async () => { + let sql = `SELECT * FROM knowledge WHERE ("agentId" = $1 OR "isShared" = true)`; + const queryParams: any[] = [params.agentId]; + let paramCount = 1; + + if (params.id) { + paramCount++; + sql += ` AND id = $${paramCount}`; + queryParams.push(params.id); + } + + if (params.limit) { + paramCount++; + sql += ` LIMIT $${paramCount}`; + queryParams.push(params.limit); + } + + const { rows } = await this.pool.query(sql, queryParams); + + return rows.map(row => ({ + id: row.id, + agentId: row.agentId, + content: typeof row.content === 'string' ? JSON.parse(row.content) : row.content, + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: row.createdAt.getTime() + })); + }, "getKnowledge"); + } + + async searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise { + return this.withDatabase(async () => { + const cacheKey = `embedding_${params.agentId}_${params.searchText}`; + const cachedResult = await this.getCache({ + key: cacheKey, + agentId: params.agentId + }); + + if (cachedResult) { + return JSON.parse(cachedResult); + } + + const vectorStr = `[${Array.from(params.embedding).join(",")}]`; + + const sql = ` + WITH vector_scores AS ( + SELECT id, + 1 - (embedding <=> $1::vector) as vector_score + FROM knowledge + WHERE (("agentId" IS NULL AND "isShared" = true) OR "agentId" = $2) + AND embedding IS NOT NULL + ), + keyword_matches AS ( + SELECT id, + CASE + WHEN content->>'text' ILIKE $3 THEN 3.0 + ELSE 1.0 + END * + CASE + WHEN (content->'metadata'->>'isChunk')::boolean = true THEN 1.5 + WHEN (content->'metadata'->>'isMain')::boolean = true THEN 1.2 + ELSE 1.0 + END as keyword_score + FROM knowledge + WHERE ("agentId" IS NULL AND "isShared" = true) OR "agentId" = $2 + ) + SELECT k.*, + v.vector_score, + kw.keyword_score, + (v.vector_score * kw.keyword_score) as combined_score + FROM knowledge k + JOIN 
vector_scores v ON k.id = v.id + LEFT JOIN keyword_matches kw ON k.id = kw.id + WHERE (("agentId" IS NULL AND "isShared" = true) OR k."agentId" = $2) + AND ( + v.vector_score >= $4 + OR (kw.keyword_score > 1.0 AND v.vector_score >= 0.3) + ) + ORDER BY combined_score DESC + LIMIT $5 + `; + + const { rows } = await this.pool.query(sql, [ + vectorStr, + params.agentId, + `%${params.searchText || ''}%`, + params.match_threshold, + params.match_count + ]); + + const results = rows.map(row => ({ + id: row.id, + agentId: row.agentId, + content: typeof row.content === 'string' ? JSON.parse(row.content) : row.content, + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: row.createdAt.getTime(), + similarity: row.combined_score + })); + + await this.setCache({ + key: cacheKey, + agentId: params.agentId, + value: JSON.stringify(results) + }); + + return results; + }, "searchKnowledge"); + } + + async createKnowledge(knowledge: RAGKnowledgeItem): Promise { + return this.withDatabase(async () => { + const client = await this.pool.connect(); + try { + await client.query('BEGIN'); + + const sql = ` + INSERT INTO knowledge ( + id, "agentId", content, embedding, "createdAt", + "isMain", "originalId", "chunkIndex", "isShared" + ) VALUES ($1, $2, $3, $4, to_timestamp($5/1000.0), $6, $7, $8, $9) + ON CONFLICT (id) DO NOTHING + `; + + const metadata = knowledge.content.metadata || {}; + const vectorStr = knowledge.embedding ? + `[${Array.from(knowledge.embedding).join(",")}]` : null; + + await client.query(sql, [ + knowledge.id, + metadata.isShared ? 
null : knowledge.agentId, + knowledge.content, + vectorStr, + knowledge.createdAt || Date.now(), + metadata.isMain || false, + metadata.originalId || null, + metadata.chunkIndex || null, + metadata.isShared || false + ]); + + await client.query('COMMIT'); + } catch (error) { + await client.query('ROLLBACK'); + throw error; + } finally { + client.release(); + } + }, "createKnowledge"); + } + + async removeKnowledge(id: UUID): Promise { + return this.withDatabase(async () => { + await this.pool.query('DELETE FROM knowledge WHERE id = $1', [id]); + }, "removeKnowledge"); + } + + async clearKnowledge(agentId: UUID, shared?: boolean): Promise { + return this.withDatabase(async () => { + const sql = shared ? + 'DELETE FROM knowledge WHERE ("agentId" = $1 OR "isShared" = true)' : + 'DELETE FROM knowledge WHERE "agentId" = $1'; + + await this.pool.query(sql, [agentId]); + }, "clearKnowledge"); + } } export default PostgresDatabaseAdapter; diff --git a/packages/adapter-sqlite/src/index.ts b/packages/adapter-sqlite/src/index.ts index b6627a8c62..e22c036a9f 100644 --- a/packages/adapter-sqlite/src/index.ts +++ b/packages/adapter-sqlite/src/index.ts @@ -1,7 +1,7 @@ export * from "./sqliteTables.ts"; export * from "./sqlite_vec.ts"; -import { DatabaseAdapter, IDatabaseCacheAdapter } from "@elizaos/core"; +import { DatabaseAdapter, elizaLogger, IDatabaseCacheAdapter } from "@elizaos/core"; import { Account, Actor, @@ -11,6 +11,7 @@ import { type Memory, type Relationship, type UUID, + RAGKnowledgeItem } from "@elizaos/core"; import { Database } from "better-sqlite3"; import { v4 } from "uuid"; @@ -707,4 +708,206 @@ export class SqliteDatabaseAdapter return false; } } + + async getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + }): Promise { + let sql = `SELECT * FROM knowledge WHERE (agentId = ? 
OR isShared = 1)`; + const queryParams: any[] = [params.agentId]; + + if (params.id) { + sql += ` AND id = ?`; + queryParams.push(params.id); + } + + if (params.limit) { + sql += ` LIMIT ?`; + queryParams.push(params.limit); + } + + interface KnowledgeRow { + id: UUID; + agentId: UUID; + content: string; + embedding: Buffer | null; + createdAt: string | number; + } + + const rows = this.db.prepare(sql).all(...queryParams) as KnowledgeRow[]; + + return rows.map(row => ({ + id: row.id, + agentId: row.agentId, + content: JSON.parse(row.content), + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: typeof row.createdAt === 'string' ? Date.parse(row.createdAt) : row.createdAt + })); + } + + async searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise { + const cacheKey = `embedding_${params.agentId}_${params.searchText}`; + const cachedResult = await this.getCache({ + key: cacheKey, + agentId: params.agentId + }); + + if (cachedResult) { + return JSON.parse(cachedResult); + } + + interface KnowledgeSearchRow { + id: UUID; + agentId: UUID; + content: string; + embedding: Buffer | null; + createdAt: string | number; + vector_score: number; + keyword_score: number; + combined_score: number; + } + + const sql = ` + WITH vector_scores AS ( + SELECT id, + 1 / (1 + vec_distance_L2(embedding, ?)) as vector_score + FROM knowledge + WHERE ((agentId IS NULL AND isShared = 1) OR agentId = ?) + AND embedding IS NOT NULL + ), + keyword_matches AS ( + SELECT id, + CASE + WHEN lower(json_extract(content, '$.text')) LIKE ? THEN 3.0 + ELSE 1.0 + END * + CASE + WHEN json_extract(content, '$.metadata.isChunk') = 1 THEN 1.5 + WHEN json_extract(content, '$.metadata.isMain') = 1 THEN 1.2 + ELSE 1.0 + END as keyword_score + FROM knowledge + WHERE (agentId IS NULL AND isShared = 1) OR agentId = ? 
+ ) + SELECT k.*, + v.vector_score, + kw.keyword_score, + (v.vector_score * kw.keyword_score) as combined_score + FROM knowledge k + JOIN vector_scores v ON k.id = v.id + LEFT JOIN keyword_matches kw ON k.id = kw.id + WHERE ((k.agentId IS NULL AND k.isShared = 1) OR k.agentId = ?) + AND ( + v.vector_score >= ? -- Using match_threshold parameter + OR (kw.keyword_score > 1.0 AND v.vector_score >= 0.3) + ) + ORDER BY combined_score DESC + LIMIT ? + `; + + const searchParams = [ + params.embedding, + params.agentId, + `%${params.searchText?.toLowerCase() || ''}%`, + params.agentId, + params.agentId, + params.match_threshold, + params.match_count + ]; + + try { + const rows = this.db.prepare(sql).all(...searchParams) as KnowledgeSearchRow[]; + const results = rows.map(row => ({ + id: row.id, + agentId: row.agentId, + content: JSON.parse(row.content), + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: typeof row.createdAt === 'string' ? Date.parse(row.createdAt) : row.createdAt, + similarity: row.combined_score + })); + + await this.setCache({ + key: cacheKey, + agentId: params.agentId, + value: JSON.stringify(results) + }); + + return results; + } catch (error) { + elizaLogger.error('Error in searchKnowledge:', error); + throw error; + } + + } + + async createKnowledge(knowledge: RAGKnowledgeItem): Promise { + try { + this.db.transaction(() => { + const sql = ` + INSERT INTO knowledge ( + id, agentId, content, embedding, createdAt, + isMain, originalId, chunkIndex, isShared + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `; + + const embeddingArray = knowledge.embedding || null; + + const metadata = knowledge.content.metadata || {}; + const isShared = metadata.isShared ? 1 : 0; + + this.db.prepare(sql).run( + knowledge.id, + metadata.isShared ? null : knowledge.agentId, + JSON.stringify(knowledge.content), + embeddingArray, + knowledge.createdAt || Date.now(), + metadata.isMain ? 
1 : 0, + metadata.originalId || null, + metadata.chunkIndex || null, + isShared + ); + + })(); + } catch (error: any) { + const isShared = knowledge.content.metadata?.isShared; + const isPrimaryKeyError = error?.code === 'SQLITE_CONSTRAINT_PRIMARYKEY'; + + if (isShared && isPrimaryKeyError) { + elizaLogger.info(`Shared knowledge ${knowledge.id} already exists, skipping`); + return; + } else if (!isShared && !error.message?.includes('SQLITE_CONSTRAINT_PRIMARYKEY')) { + elizaLogger.error(`Error creating knowledge ${knowledge.id}:`, { + error, + embeddingLength: knowledge.embedding?.length, + content: knowledge.content + }); + throw error; + } + + elizaLogger.debug(`Knowledge ${knowledge.id} already exists, skipping`); + } + } + + async removeKnowledge(id: UUID): Promise { + const sql = `DELETE FROM knowledge WHERE id = ?`; + this.db.prepare(sql).run(id); + } + + async clearKnowledge(agentId: UUID, shared?: boolean): Promise { + const sql = shared ? `DELETE FROM knowledge WHERE (agentId = ? 
OR isShared = 1)` : `DELETE FROM knowledge WHERE agentId = ?`; + try { + this.db.prepare(sql).run(agentId); + } catch (error) { + elizaLogger.error(`Error clearing knowledge for agent ${agentId}:`, error); + throw error; + } + } } diff --git a/packages/adapter-sqlite/src/sqliteTables.ts b/packages/adapter-sqlite/src/sqliteTables.ts index fdd47e5697..87fc26743f 100644 --- a/packages/adapter-sqlite/src/sqliteTables.ts +++ b/packages/adapter-sqlite/src/sqliteTables.ts @@ -92,6 +92,22 @@ CREATE TABLE IF NOT EXISTS "cache" ( PRIMARY KEY ("key", "agentId") ); +-- Table: knowledge +CREATE TABLE IF NOT EXISTS "knowledge" ( + "id" TEXT PRIMARY KEY, + "agentId" TEXT, + "content" TEXT NOT NULL CHECK(json_valid("content")), + "embedding" BLOB, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "isMain" INTEGER DEFAULT 0, + "originalId" TEXT, + "chunkIndex" INTEGER, + "isShared" INTEGER DEFAULT 0, + FOREIGN KEY ("agentId") REFERENCES "accounts"("id"), + FOREIGN KEY ("originalId") REFERENCES "knowledge"("id"), + CHECK((isShared = 1 AND agentId IS NULL) OR (isShared = 0 AND agentId IS NOT NULL)) +); + -- Index: relationships_id_key CREATE UNIQUE INDEX IF NOT EXISTS "relationships_id_key" ON "relationships" ("id"); @@ -101,4 +117,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS "memories_id_key" ON "memories" ("id"); -- Index: participants_id_key CREATE UNIQUE INDEX IF NOT EXISTS "participants_id_key" ON "participants" ("id"); -COMMIT;`; +-- Index: knowledge +CREATE INDEX IF NOT EXISTS "knowledge_agent_key" ON "knowledge" ("agentId"); +CREATE INDEX IF NOT EXISTS "knowledge_agent_main_key" ON "knowledge" ("agentId", "isMain"); +CREATE INDEX IF NOT EXISTS "knowledge_original_key" ON "knowledge" ("originalId"); +CREATE INDEX IF NOT EXISTS "knowledge_content_key" ON "knowledge" + ((json_extract(content, '$.text'))) + WHERE json_extract(content, '$.text') IS NOT NULL; +CREATE INDEX IF NOT EXISTS "knowledge_created_key" ON "knowledge" ("agentId", "createdAt"); +CREATE INDEX IF NOT EXISTS 
"knowledge_shared_key" ON "knowledge" ("isShared"); + +COMMIT;`; \ No newline at end of file diff --git a/packages/adapter-sqljs/src/index.ts b/packages/adapter-sqljs/src/index.ts index 0865845049..db27215e10 100644 --- a/packages/adapter-sqljs/src/index.ts +++ b/packages/adapter-sqljs/src/index.ts @@ -12,11 +12,12 @@ import { type Memory, type Relationship, type UUID, + RAGKnowledgeItem, + elizaLogger } from "@elizaos/core"; import { v4 } from "uuid"; import { sqliteTables } from "./sqliteTables.ts"; import { Database } from "./types.ts"; -import { elizaLogger } from "@elizaos/core"; export class SqlJsDatabaseAdapter extends DatabaseAdapter @@ -803,4 +804,191 @@ export class SqlJsDatabaseAdapter return false; } } + + async getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + }): Promise { + let sql = `SELECT * FROM knowledge WHERE ("agentId" = ? OR "isShared" = 1)`; + const queryParams: any[] = [params.agentId]; + + if (params.id) { + sql += ` AND id = ?`; + queryParams.push(params.id); + } + + if (params.limit) { + sql += ` LIMIT ?`; + queryParams.push(params.limit); + } + + const stmt = this.db.prepare(sql); + stmt.bind(queryParams); + const results: RAGKnowledgeItem[] = []; + + while (stmt.step()) { + const row = stmt.getAsObject() as any; + results.push({ + id: row.id, + agentId: row.agentId, + content: JSON.parse(row.content), + embedding: row.embedding ? 
new Float32Array(row.embedding) : undefined, // Convert Uint8Array back to Float32Array + createdAt: row.createdAt + }); + } + stmt.free(); + return results; + } + + async searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise { + const cacheKey = `embedding_${params.agentId}_${params.searchText}`; + const cachedResult = await this.getCache({ + key: cacheKey, + agentId: params.agentId + }); + + if (cachedResult) { + return JSON.parse(cachedResult); + } + + let sql = ` + WITH vector_scores AS ( + SELECT id, + 1 / (1 + vec_distance_L2(embedding, ?)) as vector_score + FROM knowledge + WHERE (("agentId" IS NULL AND "isShared" = 1) OR "agentId" = ?) + AND embedding IS NOT NULL + ), + keyword_matches AS ( + SELECT id, + CASE + WHEN json_extract(content, '$.text') LIKE ? THEN 3.0 + ELSE 1.0 + END * + CASE + WHEN json_extract(content, '$.metadata.isChunk') = 1 THEN 1.5 + WHEN json_extract(content, '$.metadata.isMain') = 1 THEN 1.2 + ELSE 1.0 + END as keyword_score + FROM knowledge + WHERE ("agentId" IS NULL AND "isShared" = 1) OR "agentId" = ? + ) + SELECT k.*, + v.vector_score, + kw.keyword_score, + (v.vector_score * kw.keyword_score) as combined_score + FROM knowledge k + JOIN vector_scores v ON k.id = v.id + LEFT JOIN keyword_matches kw ON k.id = kw.id + WHERE ((k.agentId IS NULL AND k.isShared = 1) OR k.agentId = ?) + AND ( + v.vector_score >= ? -- Using match_threshold parameter + OR (kw.keyword_score > 1.0 AND v.vector_score >= 0.3) + ) + ORDER BY combined_score DESC + LIMIT ? 
+ `; + + const stmt = this.db.prepare(sql); + stmt.bind([ + new Uint8Array(params.embedding.buffer), + params.agentId, + `%${params.searchText || ''}%`, + params.agentId, + params.agentId, + params.match_threshold, + params.match_count + ]); + + const results: RAGKnowledgeItem[] = []; + while (stmt.step()) { + const row = stmt.getAsObject() as any; + results.push({ + id: row.id, + agentId: row.agentId, + content: JSON.parse(row.content), + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: row.createdAt, + similarity: row.combined_score + }); + } + stmt.free(); + + await this.setCache({ + key: cacheKey, + agentId: params.agentId, + value: JSON.stringify(results) + }); + + return results; + } + + async createKnowledge(knowledge: RAGKnowledgeItem): Promise { + try { + const sql = ` + INSERT INTO knowledge ( + id, "agentId", content, embedding, "createdAt", + "isMain", "originalId", "chunkIndex", "isShared" + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `; + + const stmt = this.db.prepare(sql); + const metadata = knowledge.content.metadata || {}; + + stmt.run([ + knowledge.id, + metadata.isShared ? null : knowledge.agentId, + JSON.stringify(knowledge.content), + knowledge.embedding ? new Uint8Array(knowledge.embedding.buffer) : null, + knowledge.createdAt || Date.now(), + metadata.isMain ? 1 : 0, + metadata.originalId || null, + metadata.chunkIndex || null, + metadata.isShared ? 
1 : 0 + ]); + stmt.free(); + } catch (error: any) { + const isShared = knowledge.content.metadata?.isShared; + const isPrimaryKeyError = error?.code === 'SQLITE_CONSTRAINT_PRIMARYKEY'; + + if (isShared && isPrimaryKeyError) { + elizaLogger.info(`Shared knowledge ${knowledge.id} already exists, skipping`); + return; + } else if (!isShared && !error.message?.includes('SQLITE_CONSTRAINT_PRIMARYKEY')) { + elizaLogger.error(`Error creating knowledge ${knowledge.id}:`, { + error, + embeddingLength: knowledge.embedding?.length, + content: knowledge.content + }); + throw error; + } + + elizaLogger.debug(`Knowledge ${knowledge.id} already exists, skipping`); + } + } + + async removeKnowledge(id: UUID): Promise { + const sql = `DELETE FROM knowledge WHERE id = ?`; + const stmt = this.db.prepare(sql); + stmt.run([id]); + stmt.free(); + } + + async clearKnowledge(agentId: UUID, shared?: boolean): Promise { + const sql = shared ? + `DELETE FROM knowledge WHERE ("agentId" = ? OR "isShared" = 1)` : + `DELETE FROM knowledge WHERE "agentId" = ?`; + + const stmt = this.db.prepare(sql); + stmt.run([agentId]); + stmt.free(); + } } diff --git a/packages/adapter-sqljs/src/sqliteTables.ts b/packages/adapter-sqljs/src/sqliteTables.ts index fdd47e5697..87fc26743f 100644 --- a/packages/adapter-sqljs/src/sqliteTables.ts +++ b/packages/adapter-sqljs/src/sqliteTables.ts @@ -92,6 +92,22 @@ CREATE TABLE IF NOT EXISTS "cache" ( PRIMARY KEY ("key", "agentId") ); +-- Table: knowledge +CREATE TABLE IF NOT EXISTS "knowledge" ( + "id" TEXT PRIMARY KEY, + "agentId" TEXT, + "content" TEXT NOT NULL CHECK(json_valid("content")), + "embedding" BLOB, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "isMain" INTEGER DEFAULT 0, + "originalId" TEXT, + "chunkIndex" INTEGER, + "isShared" INTEGER DEFAULT 0, + FOREIGN KEY ("agentId") REFERENCES "accounts"("id"), + FOREIGN KEY ("originalId") REFERENCES "knowledge"("id"), + CHECK((isShared = 1 AND agentId IS NULL) OR (isShared = 0 AND agentId IS NOT NULL)) +); + 
-- Index: relationships_id_key CREATE UNIQUE INDEX IF NOT EXISTS "relationships_id_key" ON "relationships" ("id"); @@ -101,4 +117,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS "memories_id_key" ON "memories" ("id"); -- Index: participants_id_key CREATE UNIQUE INDEX IF NOT EXISTS "participants_id_key" ON "participants" ("id"); -COMMIT;`; +-- Index: knowledge +CREATE INDEX IF NOT EXISTS "knowledge_agent_key" ON "knowledge" ("agentId"); +CREATE INDEX IF NOT EXISTS "knowledge_agent_main_key" ON "knowledge" ("agentId", "isMain"); +CREATE INDEX IF NOT EXISTS "knowledge_original_key" ON "knowledge" ("originalId"); +CREATE INDEX IF NOT EXISTS "knowledge_content_key" ON "knowledge" + ((json_extract(content, '$.text'))) + WHERE json_extract(content, '$.text') IS NOT NULL; +CREATE INDEX IF NOT EXISTS "knowledge_created_key" ON "knowledge" ("agentId", "createdAt"); +CREATE INDEX IF NOT EXISTS "knowledge_shared_key" ON "knowledge" ("isShared"); + +COMMIT;`; \ No newline at end of file diff --git a/packages/adapter-supabase/schema.sql b/packages/adapter-supabase/schema.sql index 25eb0dcae8..e88e5a4d14 100644 --- a/packages/adapter-supabase/schema.sql +++ b/packages/adapter-supabase/schema.sql @@ -9,6 +9,7 @@ -- DROP TABLE IF EXISTS memories CASCADE; -- DROP TABLE IF EXISTS rooms CASCADE; -- DROP TABLE IF EXISTS accounts CASCADE; +-- DROP TABLE IF EXISTS knowledge CASCADE; CREATE EXTENSION IF NOT EXISTS vector; @@ -61,21 +62,6 @@ CREATE TABLE memories_1024 ( CONSTRAINT fk_agent FOREIGN KEY ("agentId") REFERENCES accounts("id") ON DELETE CASCADE ); -CREATE TABLE memories_768 ( - "id" UUID PRIMARY KEY, - "type" TEXT NOT NULL, - "createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - "content" JSONB NOT NULL, - "embedding" vector(768), -- Gaianet nomic-embed - "userId" UUID REFERENCES accounts("id"), - "agentId" UUID REFERENCES accounts("id"), - "roomId" UUID REFERENCES rooms("id"), - "unique" BOOLEAN DEFAULT true NOT NULL, - CONSTRAINT fk_room FOREIGN KEY ("roomId") REFERENCES rooms("id") 
ON DELETE CASCADE, - CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE, - CONSTRAINT fk_agent FOREIGN KEY ("agentId") REFERENCES accounts("id") ON DELETE CASCADE -); - CREATE TABLE memories_384 ( "id" UUID PRIMARY KEY, "type" TEXT NOT NULL, @@ -150,11 +136,31 @@ CREATE TABLE relationships ( CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE ); +CREATE TABLE cache ( + "key" TEXT NOT NULL, + "agentId" TEXT NOT NULL, + "value" JSONB DEFAULT '{}'::jsonb, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "expiresAt" TIMESTAMP, + PRIMARY KEY ("key", "agentId") +); + +CREATE TABLE knowledge ( + "id" UUID PRIMARY KEY, + "agentId" UUID REFERENCES accounts("id"), + "content" JSONB NOT NULL, + "embedding" vector(1536), + "createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + "isMain" BOOLEAN DEFAULT FALSE, + "originalId" UUID REFERENCES knowledge("id"), + "chunkIndex" INTEGER, + "isShared" BOOLEAN DEFAULT FALSE, + CHECK((isShared = true AND "agentId" IS NULL) OR (isShared = false AND "agentId" IS NOT NULL)) +); + -- Add index for Ollama table CREATE INDEX idx_memories_1024_embedding ON memories_1024 USING hnsw ("embedding" vector_cosine_ops); CREATE INDEX idx_memories_1024_type_room ON memories_1024("type", "roomId"); -CREATE INDEX idx_memories_768_embedding ON memories_768 USING hnsw ("embedding" vector_cosine_ops); -CREATE INDEX idx_memories_768_type_room ON memories_768("type", "roomId"); CREATE INDEX idx_memories_1536_embedding ON memories_1536 USING hnsw ("embedding" vector_cosine_ops); CREATE INDEX idx_memories_384_embedding ON memories_384 USING hnsw ("embedding" vector_cosine_ops); CREATE INDEX idx_memories_1536_type_room ON memories_1536("type", "roomId"); @@ -162,5 +168,11 @@ CREATE INDEX idx_memories_384_type_room ON memories_384("type", "roomId"); CREATE INDEX idx_participants_user ON participants("userId"); CREATE INDEX idx_participants_room ON participants("roomId"); CREATE INDEX 
idx_relationships_users ON relationships("userA", "userB"); +CREATE INDEX idx_knowledge_agent ON knowledge("agentId"); +CREATE INDEX idx_knowledge_agent_main ON knowledge("agentId", "isMain"); +CREATE INDEX idx_knowledge_original ON knowledge("originalId"); +CREATE INDEX idx_knowledge_created ON knowledge("agentId", "createdAt"); +CREATE INDEX idx_knowledge_shared ON knowledge("isShared"); +CREATE INDEX idx_knowledge_embedding ON knowledge USING ivfflat (embedding vector_cosine_ops); COMMIT; diff --git a/packages/adapter-supabase/src/index.ts b/packages/adapter-supabase/src/index.ts index f6f9840f21..9c8d643f61 100644 --- a/packages/adapter-supabase/src/index.ts +++ b/packages/adapter-supabase/src/index.ts @@ -9,6 +9,8 @@ import { type UUID, Participant, Room, + RAGKnowledgeItem, + elizaLogger } from "@elizaos/core"; import { DatabaseAdapter } from "@elizaos/core"; import { v4 as uuid } from "uuid"; @@ -680,4 +682,229 @@ export class SupabaseDatabaseAdapter extends DatabaseAdapter { return data as Relationship[]; } + + async getCache(params: { + key: string; + agentId: UUID; + }): Promise { + const { data, error } = await this.supabase + .from('cache') + .select('value') + .eq('key', params.key) + .eq('agentId', params.agentId) + .single(); + + if (error) { + console.error('Error fetching cache:', error); + return undefined; + } + + return data?.value; + } + + async setCache(params: { + key: string; + agentId: UUID; + value: string; + }): Promise { + const { error } = await this.supabase + .from('cache') + .upsert({ + key: params.key, + agentId: params.agentId, + value: params.value, + createdAt: new Date() + }); + + if (error) { + console.error('Error setting cache:', error); + return false; + } + + return true; + } + + async deleteCache(params: { + key: string; + agentId: UUID; + }): Promise { + try { + const { error } = await this.supabase + .from('cache') + .delete() + .eq('key', params.key) + .eq('agentId', params.agentId); + + if (error) { + 
elizaLogger.error("Error deleting cache", { + error: error.message, + key: params.key, + agentId: params.agentId, + }); + return false; + } + return true; + } catch (error) { + elizaLogger.error( + "Database connection error in deleteCache", + error instanceof Error ? error.message : String(error) + ); + return false; + } + } + + async getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + }): Promise { + let query = this.supabase + .from('knowledge') + .select('*') + .or(`agentId.eq.${params.agentId},isShared.eq.true`); + + if (params.id) { + query = query.eq('id', params.id); + } + + if (params.limit) { + query = query.limit(params.limit); + } + + const { data, error } = await query; + + if (error) { + throw new Error(`Error getting knowledge: ${error.message}`); + } + + return data.map(row => ({ + id: row.id, + agentId: row.agentId, + content: typeof row.content === 'string' ? JSON.parse(row.content) : row.content, + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: new Date(row.createdAt).getTime() + })); + } + + async searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise { + const cacheKey = `embedding_${params.agentId}_${params.searchText}`; + const cachedResult = await this.getCache({ + key: cacheKey, + agentId: params.agentId + }); + + if (cachedResult) { + return JSON.parse(cachedResult); + } + + // Convert Float32Array to array for Postgres vector + const embedding = Array.from(params.embedding); + + const { data, error } = await this.supabase.rpc('search_knowledge', { + query_embedding: embedding, + query_agent_id: params.agentId, + match_threshold: params.match_threshold, + match_count: params.match_count, + search_text: params.searchText || '' + }); + + if (error) { + throw new Error(`Error searching knowledge: ${error.message}`); + } + + const results = data.map(row => ({ + id: 
row.id, + agentId: row.agentId, + content: typeof row.content === 'string' ? JSON.parse(row.content) : row.content, + embedding: row.embedding ? new Float32Array(row.embedding) : undefined, + createdAt: new Date(row.createdAt).getTime(), + similarity: row.similarity + })); + + await this.setCache({ + key: cacheKey, + agentId: params.agentId, + value: JSON.stringify(results) + }); + + return results; + } + + async createKnowledge(knowledge: RAGKnowledgeItem): Promise { + try { + const metadata = knowledge.content.metadata || {}; + + const { error } = await this.supabase + .from('knowledge') + .insert({ + id: knowledge.id, + agentId: metadata.isShared ? null : knowledge.agentId, + content: knowledge.content, + embedding: knowledge.embedding ? Array.from(knowledge.embedding) : null, + createdAt: knowledge.createdAt || new Date(), + isMain: metadata.isMain || false, + originalId: metadata.originalId || null, + chunkIndex: metadata.chunkIndex || null, + isShared: metadata.isShared || false + }); + + if (error) { + if (metadata.isShared && error.code === '23505') { // Unique violation + elizaLogger.info(`Shared knowledge ${knowledge.id} already exists, skipping`); + return; + } + throw error; + } + } catch (error: any) { + elizaLogger.error(`Error creating knowledge ${knowledge.id}:`, { + error, + embeddingLength: knowledge.embedding?.length, + content: knowledge.content + }); + throw error; + } + } + + async removeKnowledge(id: UUID): Promise { + const { error } = await this.supabase + .from('knowledge') + .delete() + .eq('id', id); + + if (error) { + throw new Error(`Error removing knowledge: ${error.message}`); + } + } + + async clearKnowledge(agentId: UUID, shared?: boolean): Promise { + if (shared) { + const { error } = await this.supabase + .from('knowledge') + .delete() + .filter('agentId', 'eq', agentId) + .filter('isShared', 'eq', true); + + if (error) { + elizaLogger.error(`Error clearing shared knowledge for agent ${agentId}:`, error); + throw error; + } + } 
else { + const { error } = await this.supabase + .from('knowledge') + .delete() + .eq('agentId', agentId); + + if (error) { + elizaLogger.error(`Error clearing knowledge for agent ${agentId}:`, error); + throw error; + } + } + } } diff --git a/packages/core/src/database.ts b/packages/core/src/database.ts index 9e8cbfa1b5..310c44c32a 100644 --- a/packages/core/src/database.ts +++ b/packages/core/src/database.ts @@ -6,6 +6,7 @@ import { type Memory, type Relationship, type UUID, + RAGKnowledgeItem, Participant, IDatabaseAdapter, } from "./types.ts"; @@ -380,6 +381,48 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { userId: UUID; }): Promise; + /** + * Retrieves knowledge items based on specified parameters. + * @param params Object containing search parameters + * @returns Promise resolving to array of knowledge items + */ + abstract getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + conversationContext?: string; + }): Promise; + + abstract searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise; + + /** + * Creates a new knowledge item in the database. + * @param knowledge The knowledge item to create + * @returns Promise resolving when creation is complete + */ + abstract createKnowledge(knowledge: RAGKnowledgeItem): Promise; + + /** + * Removes a knowledge item and its associated chunks from the database. + * @param id The ID of the knowledge item to remove + * @returns Promise resolving when removal is complete + */ + abstract removeKnowledge(id: UUID): Promise; + + /** + * Removes an agents full knowledge database and its associated chunks from the database. 
+ * @param agentId The Agent ID of the knowledge items to remove + * @returns Promise resolving when removal is complete + */ + abstract clearKnowledge(agentId: UUID, shared?: boolean): Promise; + /** * Executes an operation with circuit breaker protection. * @param operation A function that returns a Promise to be executed with circuit breaker protection diff --git a/packages/core/src/environment.ts b/packages/core/src/environment.ts index bcc0c87ff7..ed7edf3bf2 100644 --- a/packages/core/src/environment.ts +++ b/packages/core/src/environment.ts @@ -77,7 +77,15 @@ export const CharacterSchema = z.object({ postExamples: z.array(z.string()), topics: z.array(z.string()), adjectives: z.array(z.string()), - knowledge: z.array(z.string()).optional(), + knowledge: z.array( + z.union([ + z.string(), + z.object({ + path: z.string(), + shared: z.boolean().optional() + }) + ]) + ).optional(), clients: z.array(z.nativeEnum(Clients)), plugins: z.union([z.array(z.string()), z.array(PluginSchema)]), settings: z diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c0360768e9..7dbf7f832d 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -23,4 +23,5 @@ export * from "./uuid.ts"; export * from "./environment.ts"; export * from "./cache.ts"; export { default as knowledge } from "./knowledge.ts"; +export * from "./ragknowledge.ts"; export * from "./utils.ts"; diff --git a/packages/core/src/ragknowledge.ts b/packages/core/src/ragknowledge.ts new file mode 100644 index 0000000000..0856cea67a --- /dev/null +++ b/packages/core/src/ragknowledge.ts @@ -0,0 +1,395 @@ +import { embed } from "./embedding.ts"; +import elizaLogger from "./logger.ts"; +import { + IRAGKnowledgeManager, + RAGKnowledgeItem, + UUID, + IAgentRuntime +} from "./types.ts"; +import { splitChunks } from "./generation.ts"; +import { stringToUuid } from "./uuid.ts"; + +/** + * Manage knowledge in the database. 
+ */ +export class RAGKnowledgeManager implements IRAGKnowledgeManager { + /** + * The AgentRuntime instance associated with this manager. + */ + runtime: IAgentRuntime; + + /** + * The name of the database table this manager operates on. + */ + tableName: string; + + /** + * Constructs a new KnowledgeManager instance. + * @param opts Options for the manager. + * @param opts.tableName The name of the table this manager will operate on. + * @param opts.runtime The AgentRuntime instance associated with this manager. + */ + constructor(opts: { tableName: string; runtime: IAgentRuntime }) { + this.runtime = opts.runtime; + this.tableName = opts.tableName; + } + + private readonly defaultRAGMatchThreshold = 0.85; + private readonly defaultRAGMatchCount = 5; + + /** + * Common English stop words to filter out from query analysis + */ + private readonly stopWords = new Set([ + 'a', 'an', 'and', 'are', 'as', 'at', 'be', 'by', 'does', 'for', 'from', 'had', + 'has', 'have', 'he', 'her', 'his', 'how', 'hey', 'i', 'in', 'is', 'it', 'its', + 'of', 'on', 'or', 'that', 'the', 'this', 'to', 'was', 'what', 'when', 'where', + 'which', 'who', 'will', 'with', 'would', 'there', 'their', 'they', 'your', 'you' + ]); + + /** + * Filters out stop words and returns meaningful terms + */ + private getQueryTerms(query: string): string[] { + return query.toLowerCase() + .split(' ') + .filter(term => term.length > 3) // Filter very short words + .filter(term => !this.stopWords.has(term)); // Filter stop words + } + + /** + * Preprocesses text content for better RAG performance. + * @param content The text content to preprocess. + * @returns The preprocessed text. 
+ */ + + private preprocess(content: string): string { + if (!content || typeof content !== "string") { + elizaLogger.warn("Invalid input for preprocessing"); + return ""; + } + + return content + .replace(/```[\s\S]*?```/g, "") + .replace(/`.*?`/g, "") + .replace(/#{1,6}\s*(.*)/g, "$1") + .replace(/!\[(.*?)\]\(.*?\)/g, "$1") + .replace(/\[(.*?)\]\(.*?\)/g, "$1") + .replace(/(https?:\/\/)?(www\.)?([^\s]+\.[^\s]+)/g, "$3") + .replace(/<@[!&]?\d+>/g, "") + .replace(/<[^>]*>/g, "") + .replace(/^\s*[-*_]{3,}\s*$/gm, "") + .replace(/\/\*[\s\S]*?\*\//g, "") + .replace(/\/\/.*/g, "") + .replace(/\s+/g, " ") + .replace(/\n{3,}/g, "\n\n") + .replace(/[^a-zA-Z0-9\s\-_./:?=&]/g, "") + .trim() + .toLowerCase(); + } + + private hasProximityMatch(text: string, terms: string[]): boolean { + const words = text.toLowerCase().split(' '); + const positions = terms.map(term => words.findIndex(w => w.includes(term))) + .filter(pos => pos !== -1); + + if (positions.length < 2) return false; + + // Check if any matches are within 5 words of each other + for (let i = 0; i < positions.length - 1; i++) { + if (Math.abs(positions[i] - positions[i + 1]) <= 5) { + return true; + } + } + return false; + } + + async getKnowledge(params: { + query?: string; + id?: UUID; + conversationContext?: string; + limit?: number; + agentId?: UUID; + }): Promise { + const agentId = params.agentId || this.runtime.agentId; + + // If id is provided, do direct lookup first + if (params.id) { + const directResults = await this.runtime.databaseAdapter.getKnowledge({ + id: params.id, + agentId: agentId + }); + + if (directResults.length > 0) { + return directResults; + } + } + + // If no id or no direct results, perform semantic search + if (params.query) { + try { + const processedQuery = this.preprocess(params.query); + + // Build search text with optional context + let searchText = processedQuery; + if (params.conversationContext) { + const relevantContext = this.preprocess(params.conversationContext); + 
searchText = `${relevantContext} ${processedQuery}`; + } + + const embeddingArray = await embed(this.runtime, searchText); + + const embedding = new Float32Array(embeddingArray); + + // Get results with single query + const results = await this.runtime.databaseAdapter.searchKnowledge({ + agentId: this.runtime.agentId, + embedding: embedding, + match_threshold: this.defaultRAGMatchThreshold, + match_count: (params.limit || this.defaultRAGMatchCount) * 2, + searchText: processedQuery + }); + + // Enhanced reranking with sophisticated scoring + const rerankedResults = results.map(result => { + let score = result.similarity; + + // Check for direct query term matches + const queryTerms = this.getQueryTerms(processedQuery); + + const matchingTerms = queryTerms.filter(term => + result.content.text.toLowerCase().includes(term)); + + if (matchingTerms.length > 0) { + // Much stronger boost for matches + score *= (1 + (matchingTerms.length / queryTerms.length) * 2); // Double the boost + + if (this.hasProximityMatch(result.content.text, matchingTerms)) { + score *= 1.5; // Stronger proximity boost + } + } else { + // More aggressive penalty + if (!params.conversationContext) { + score *= 0.3; // Stronger penalty + } + } + + return { + ...result, + score, + matchedTerms: matchingTerms // Add for debugging + }; + }).sort((a, b) => b.score - a.score); + + // Filter and return results + return rerankedResults + .filter(result => result.score >= this.defaultRAGMatchThreshold) + .slice(0, params.limit || this.defaultRAGMatchCount); + + } catch(error) { + console.log(`[RAG Search Error] ${error}`); + return []; + } + } + + // If neither id nor query provided, return empty array + return []; + } + + async createKnowledge(item: RAGKnowledgeItem): Promise { + if (!item.content.text) { + elizaLogger.warn("Empty content in knowledge item"); + return; + } + + try { + // Process main document + const processedContent = this.preprocess(item.content.text); + const mainEmbeddingArray = 
await embed(this.runtime, processedContent); + + const mainEmbedding = new Float32Array(mainEmbeddingArray); + + // Create main document + await this.runtime.databaseAdapter.createKnowledge({ + id: item.id, + agentId: this.runtime.agentId, + content: { + text: item.content.text, + metadata: { + ...item.content.metadata, + isMain: true + } + }, + embedding: mainEmbedding, + createdAt: Date.now() + }); + + // Generate and store chunks + const chunks = await splitChunks(processedContent, 512, 20); + + for (const [index, chunk] of chunks.entries()) { + const chunkEmbeddingArray = await embed(this.runtime, chunk); + const chunkEmbedding = new Float32Array(chunkEmbeddingArray); + const chunkId = `${item.id}-chunk-${index}` as UUID; + + await this.runtime.databaseAdapter.createKnowledge({ + id: chunkId, + agentId: this.runtime.agentId, + content: { + text: chunk, + metadata: { + ...item.content.metadata, + isChunk: true, + originalId: item.id, + chunkIndex: index + } + }, + embedding: chunkEmbedding, + createdAt: Date.now() + }); + } + } catch (error) { + elizaLogger.error(`Error processing knowledge ${item.id}:`, error); + throw error; + } + } + + async searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array | number[]; + match_threshold?: number; + match_count?: number; + searchText?: string; + }): Promise { + const { + match_threshold = this.defaultRAGMatchThreshold, + match_count = this.defaultRAGMatchCount, + embedding, + searchText + } = params; + + const float32Embedding = Array.isArray(embedding) ? 
new Float32Array(embedding) : embedding; + + return await this.runtime.databaseAdapter.searchKnowledge({ + agentId: params.agentId || this.runtime.agentId, + embedding: float32Embedding, + match_threshold, + match_count, + searchText + }); + } + + async removeKnowledge(id: UUID): Promise { + await this.runtime.databaseAdapter.removeKnowledge(id); + } + + async clearKnowledge(shared?: boolean): Promise { + await this.runtime.databaseAdapter.clearKnowledge(this.runtime.agentId, shared ? shared : false); + } + + async processFile(file: { + path: string; + content: string; + type: 'pdf' | 'md' | 'txt'; + isShared?: boolean + }): Promise { + const timeMarker = (label: string) => { + const time = (Date.now() - startTime) / 1000; + elizaLogger.info(`[Timing] ${label}: ${time.toFixed(2)}s`); + }; + + const startTime = Date.now(); + let content = file.content; + + try { + const fileSizeKB = (new TextEncoder().encode(content)).length / 1024; + elizaLogger.info(`[File Progress] Starting ${file.path} (${fileSizeKB.toFixed(2)} KB)`); + + // Step 1: Preprocessing + const preprocessStart = Date.now(); + const processedContent = this.preprocess(content); + timeMarker('Preprocessing'); + + // Step 2: Main document embedding + const mainEmbeddingArray = await embed(this.runtime, processedContent); + const mainEmbedding = new Float32Array(mainEmbeddingArray); + timeMarker('Main embedding'); + + // Step 3: Create main document + await this.runtime.databaseAdapter.createKnowledge({ + id: stringToUuid(file.path), + agentId: this.runtime.agentId, + content: { + text: content, + metadata: { + source: file.path, + type: file.type, + isShared: file.isShared || false + } + }, + embedding: mainEmbedding, + createdAt: Date.now() + }); + timeMarker('Main document storage'); + + // Step 4: Generate chunks + const chunks = await splitChunks(processedContent, 512, 20); + const totalChunks = chunks.length; + elizaLogger.info(`Generated ${totalChunks} chunks`); + timeMarker('Chunk generation'); + + 
// Step 5: Process chunks with larger batches + const BATCH_SIZE = 10; // Increased batch size + let processedChunks = 0; + + for (let i = 0; i < chunks.length; i += BATCH_SIZE) { + const batchStart = Date.now(); + const batch = chunks.slice(i, Math.min(i + BATCH_SIZE, chunks.length)); + + // Process embeddings in parallel + const embeddings = await Promise.all( + batch.map(chunk => embed(this.runtime, chunk)) + ); + + // Batch database operations + await Promise.all(embeddings.map(async (embeddingArray, index) => { + const chunkId = `${stringToUuid(file.path)}-chunk-${i + index}` as UUID; + const chunkEmbedding = new Float32Array(embeddingArray); + + await this.runtime.databaseAdapter.createKnowledge({ + id: chunkId, + agentId: this.runtime.agentId, + content: { + text: batch[index], + metadata: { + source: file.path, + type: file.type, + isShared: file.isShared || false, + isChunk: true, + originalId: stringToUuid(file.path), + chunkIndex: i + index + } + }, + embedding: chunkEmbedding, + createdAt: Date.now() + }); + })); + + processedChunks += batch.length; + const batchTime = (Date.now() - batchStart) / 1000; + elizaLogger.info(`[Batch Progress] Processed ${processedChunks}/${totalChunks} chunks (${batchTime.toFixed(2)}s for batch)`); + } + + const totalTime = (Date.now() - startTime) / 1000; + elizaLogger.info(`[Complete] Processed ${file.path} in ${totalTime.toFixed(2)}s`); + + } catch (error) { + if (file.isShared && error?.code === 'SQLITE_CONSTRAINT_PRIMARYKEY') { + elizaLogger.info(`Shared knowledge ${file.path} already exists in database, skipping creation`); + return; + } + elizaLogger.error(`Error processing file ${file.path}:`, error); + throw error; + } + } +} \ No newline at end of file diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 7e04fa0b99..36d28ed976 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -17,6 +17,7 @@ import { generateText } from "./generation.ts"; import { 
formatGoalsAsString, getGoals } from "./goals.ts"; import { elizaLogger } from "./index.ts"; import knowledge from "./knowledge.ts"; +import { RAGKnowledgeManager } from "./ragknowledge.ts"; import { MemoryManager } from "./memory.ts"; import { formatActors, formatMessages, getActorDetails } from "./messages.ts"; import { parseJsonArrayFromText } from "./parsing.ts"; @@ -30,8 +31,11 @@ import { IAgentRuntime, ICacheManager, IDatabaseAdapter, + IRAGKnowledgeManager, IMemoryManager, KnowledgeItem, + RAGKnowledgeItem, + Media, ModelClass, ModelProviderName, Plugin, @@ -47,6 +51,8 @@ import { IVerifiableInferenceAdapter, } from "./types.ts"; import { stringToUuid } from "./uuid.ts"; +import { readFile } from 'fs/promises'; +import { join } from 'path'; /** * Represents the runtime environment for an agent, handling message processing, @@ -145,6 +151,8 @@ export class AgentRuntime implements IAgentRuntime { */ knowledgeManager: IMemoryManager; + ragKnowledgeManager: IRAGKnowledgeManager; + services: Map = new Map(); memoryManagers: Map = new Map(); cacheManager: ICacheManager; @@ -298,6 +306,11 @@ export class AgentRuntime implements IAgentRuntime { tableName: "fragments", }); + this.ragKnowledgeManager = new RAGKnowledgeManager({ + runtime: this, + tableName: 'knowledge' + }); + (opts.managers ?? 
[]).forEach((manager: IMemoryManager) => { this.registerMemoryManager(manager); }); @@ -425,7 +438,15 @@ export class AgentRuntime implements IAgentRuntime { this.character.knowledge && this.character.knowledge.length > 0 ) { - await this.processCharacterKnowledge(this.character.knowledge); + if(this.character.settings.ragKnowledge) { + await this.processCharacterRAGKnowledge(this.character.knowledge); + } else { + const stringKnowledge = this.character.knowledge.filter((item): item is string => + typeof item === 'string' + ); + + await this.processCharacterKnowledge(stringKnowledge); + } } } @@ -484,6 +505,137 @@ export class AgentRuntime implements IAgentRuntime { } } + /** + * Processes character knowledge by creating document memories and fragment memories. + * This function takes an array of knowledge items, creates a document knowledge for each item if it doesn't exist, + * then chunks the content into fragments, embeds each fragment, and creates fragment knowledge. + * An array of knowledge items or objects containing id, path, and content. 
+ */ + private async processCharacterRAGKnowledge(items: (string | { path: string; shared?: boolean })[]) { + let hasError = false; + + for (const item of items) { + if (!item) continue; + + try { + // Check if item is marked as shared + let isShared = false; + let contentItem = item; + + // Only treat as shared if explicitly marked + if (typeof item === 'object' && 'path' in item) { + isShared = item.shared === true; + contentItem = item.path; + } else { + contentItem = item; + } + + const knowledgeId = stringToUuid(contentItem); + const fileExtension = contentItem.split('.').pop()?.toLowerCase(); + + // Check if it's a file or direct knowledge + if (fileExtension && ['md', 'txt', 'pdf'].includes(fileExtension)) { + try { + const rootPath = join(process.cwd(), '..'); + const filePath = join(rootPath, 'characters', 'knowledge', contentItem); + elizaLogger.info("Attempting to read file from:", filePath); + + // Get existing knowledge first + const existingKnowledge = await this.ragKnowledgeManager.getKnowledge({ + id: knowledgeId, + agentId: this.agentId + }); + + let content: string; + + content = await readFile(filePath, 'utf8'); + + if (!content) { + hasError = true; + continue; + } + + // If the file exists in DB, check if content has changed + if (existingKnowledge.length > 0) { + const existingContent = existingKnowledge[0].content.text; + if (existingContent === content) { + elizaLogger.info(`File ${contentItem} unchanged, skipping`); + continue; + } else { + // If content changed, remove old knowledge before adding new + await this.ragKnowledgeManager.removeKnowledge(knowledgeId); + // Also remove any associated chunks + await this.ragKnowledgeManager.removeKnowledge(`${knowledgeId}-chunk-*` as UUID); + } + } + + elizaLogger.info( + `Successfully read ${fileExtension.toUpperCase()} file content for`, + this.character.name, + "-", + contentItem + ); + + await this.ragKnowledgeManager.processFile({ + path: contentItem, + content: content, + type: fileExtension 
as 'pdf' | 'md' | 'txt', + isShared: isShared + }); + + } catch (error: any) { + hasError = true; + elizaLogger.error( + `Failed to read knowledge file ${contentItem}. Error details:`, + error?.message || error || 'Unknown error' + ); + continue; // Continue to next item even if this one fails + } + } else { + // Handle direct knowledge string + elizaLogger.info( + "Processing direct knowledge for", + this.character.name, + "-", + contentItem.slice(0, 100) + ); + + const existingKnowledge = await this.ragKnowledgeManager.getKnowledge({ + id: knowledgeId, + agentId: this.agentId + }); + + if (existingKnowledge.length > 0) { + elizaLogger.info(`Direct knowledge ${knowledgeId} already exists, skipping`); + continue; + } + + await this.ragKnowledgeManager.createKnowledge({ + id: knowledgeId, + agentId: this.agentId, + content: { + text: contentItem, + metadata: { + type: 'direct' + } + } + }); + } + } catch (error: any) { + hasError = true; + elizaLogger.error( + `Error processing knowledge item ${item}:`, + error?.message || error || 'Unknown error' + ); + continue; // Continue to next item even if this one fails + } + } + + if (hasError) { + elizaLogger.warn('Some knowledge items failed to process, but continuing with available knowledge'); + } + } + getSetting(key: string) { // check if the key is in the character.settings.secrets object if (this.character.settings?.secrets?.[key]) { @@ -1027,9 +1179,27 @@ Text: ${attachment.text} .join(" "); } - const knowledegeData = await knowledge.get(this, message); + let knowledgeData = []; + let formattedKnowledge = ''; + + if(this.character.settings?.ragKnowledge) { + const recentContext = recentMessagesData + .slice(-3) // Last 3 messages + .map(msg => msg.content.text) + .join(' '); + + knowledgeData = await this.ragKnowledgeManager.getKnowledge({ + query: message.content.text, + conversationContext: recentContext, + limit: 5 + }); + + formattedKnowledge = formatKnowledge(knowledgeData); + } else { + knowledgeData = await 
knowledge.get(this, message); - const formattedKnowledge = formatKnowledge(knowledegeData); + formattedKnowledge = formatKnowledge(knowledgeData); + } const initialState = { agentId: this.agentId, @@ -1046,7 +1216,8 @@ Text: ${attachment.text} ] : "", knowledge: formattedKnowledge, - knowledgeData: knowledegeData, + knowledgeData: knowledgeData, + ragKnowledgeData: knowledgeData, // Recent interactions between the sender and receiver, formatted as messages recentMessageInteractions: formattedMessageInteractions, // Recent interactions between the sender and receiver, formatted as posts diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index e07cb74e67..7b7493dc5a 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -338,6 +338,8 @@ export interface State { knowledge?: string; /** Optional knowledge data */ knowledgeData?: KnowledgeItem[]; + /** Optional knowledge data */ + ragKnowledgeData?: RAGKnowledgeItem[]; /** Additional dynamic properties */ [key: string]: unknown; @@ -755,7 +757,7 @@ export type Character = { adjectives: string[]; /** Optional knowledge base */ - knowledge?: string[]; + knowledge?: (string | { path: string; shared?: boolean })[]; /** Supported client platforms */ clients: Clients[]; @@ -803,6 +805,7 @@ export type Character = { [key: string]: any[]; }; transcription?: TranscriptionProvider; + ragKnowledge?: boolean; }; /** Optional client-specific config */ @@ -1015,6 +1018,26 @@ export interface IDatabaseAdapter { }): Promise; getRelationships(params: { userId: UUID }): Promise; + + getKnowledge(params: { + id?: UUID; + agentId: UUID; + limit?: number; + query?: string; + conversationContext?: string; + }): Promise; + + searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array; + match_threshold: number; + match_count: number; + searchText?: string; + }): Promise; + + createKnowledge(knowledge: RAGKnowledgeItem): Promise; + removeKnowledge(id: UUID): Promise; + clearKnowledge(agentId: 
UUID, shared?: boolean): Promise; } export interface IDatabaseCacheAdapter { @@ -1072,6 +1095,35 @@ export interface IMemoryManager { countMemories(roomId: UUID, unique?: boolean): Promise; } +export interface IRAGKnowledgeManager { + runtime: IAgentRuntime; + tableName: string; + + getKnowledge(params: { + query?: string; + id?: UUID; + limit?: number; + conversationContext?: string; + agentId?: UUID; + }): Promise; + createKnowledge(item: RAGKnowledgeItem): Promise; + removeKnowledge(id: UUID): Promise; + searchKnowledge(params: { + agentId: UUID; + embedding: Float32Array | number[]; + match_threshold?: number; + match_count?: number; + searchText?: string; + }): Promise; + clearKnowledge(shared?: boolean): Promise; + processFile(file: { + path: string; + content: string; + type: 'pdf' | 'md' | 'txt', + isShared: boolean; + }): Promise; +} + export type CacheOptions = { expires?: number; }; @@ -1131,6 +1183,7 @@ export interface IAgentRuntime { descriptionManager: IMemoryManager; documentsManager: IMemoryManager; knowledgeManager: IMemoryManager; + ragKnowledgeManager: IRAGKnowledgeManager; loreManager: IMemoryManager; cacheManager: ICacheManager; @@ -1325,6 +1378,28 @@ export type KnowledgeItem = { content: Content; }; +export interface RAGKnowledgeItem { + id: UUID; + agentId: UUID; + content: { + text: string; + metadata?: { + isMain?: boolean; + isChunk?: boolean; + originalId?: UUID; + chunkIndex?: number; + source?: string; + type?: string; + isShared?: boolean; + [key: string]: unknown; + }; + }; + embedding?: Float32Array; + createdAt?: number; + similarity?: number; + score?: number; +} + export interface ActionResponse { like: boolean; retweet: boolean;