From 91edfee306709abfbaa26be7d4367024b221070f Mon Sep 17 00:00:00 2001 From: slkzgm Date: Sun, 29 Dec 2024 03:29:45 +0100 Subject: [PATCH 1/5] feat: Twitter spaces integration --- .env.example | 1 + characters/c3po.character.json | 36 +- packages/client-twitter/package.json | 2 +- packages/client-twitter/src/environment.ts | 125 +++-- packages/client-twitter/src/index.ts | 39 +- packages/client-twitter/src/spaces.ts | 512 +++++++++++++++++++++ 6 files changed, 659 insertions(+), 56 deletions(-) create mode 100644 packages/client-twitter/src/spaces.ts diff --git a/.env.example b/.env.example index f54f552f6af..7b904d2ba8b 100644 --- a/.env.example +++ b/.env.example @@ -67,6 +67,7 @@ TWITTER_POLL_INTERVAL=120 # How often (in seconds) the bot should check fo TWITTER_SEARCH_ENABLE=FALSE # Enable timeline search, WARNING this greatly increases your chance of getting banned TWITTER_TARGET_USERS= # Comma separated list of Twitter user names to interact with TWITTER_RETRY_LIMIT= # Maximum retry attempts for Twitter login +TWITTER_SPACES_ENABLE=false # Enable or disable Twitter Spaces logic X_SERVER_URL= XAI_API_KEY= diff --git a/characters/c3po.character.json b/characters/c3po.character.json index dbc1abcb943..9b4d2ea49d0 100644 --- a/characters/c3po.character.json +++ b/characters/c3po.character.json @@ -94,5 +94,37 @@ "Protocol-minded", "Formal", "Loyal" - ] -} \ No newline at end of file + ], + "twitterSpaces": { + "maxSpeakers": 2, + + "topics": [ + "Blockchain Trends", + "AI Innovations", + "Quantum Computing" + ], + + "typicalDurationMinutes": 45, + + "idleKickTimeoutMs": 300000, + + "minIntervalBetweenSpacesMinutes": 1, + + "businessHoursOnly": false, + + "randomChance": 1, + + "enableIdleMonitor": true, + + "enableSttTts": true, + + "enableRecording": false, + + "voiceId": "21m00Tcm4TlvDq8ikWAM", + "sttLanguage": "en", + "gptModel": "gpt-3.5-turbo", + "systemPrompt": "You are a helpful AI co-host assistant.", + + "speakerMaxDurationMs": 240000 + } +} diff --git a/packages/client-twitter/package.json b/packages/client-twitter/package.json index 08f2c818688..5a255a78ed4 100644 --- a/packages/client-twitter/package.json +++ b/packages/client-twitter/package.json @@ -6,7 +6,7 @@ "types": "dist/index.d.ts", "dependencies": { "@elizaos/core": "workspace:*", - "agent-twitter-client": "0.0.17", + "agent-twitter-client": "0.0.18", "glob": "11.0.0", "zod": "3.23.8" }, diff --git a/packages/client-twitter/src/environment.ts b/packages/client-twitter/src/environment.ts index 8ff2fb454ed..f73a5676781 100644 --- a/packages/client-twitter/src/environment.ts +++ b/packages/client-twitter/src/environment.ts @@ -1,12 +1,20 @@ import { parseBooleanFromText, IAgentRuntime } from "@elizaos/core"; -import { z } from "zod"; +import { z, ZodError } from "zod"; + export const DEFAULT_MAX_TWEET_LENGTH = 280; const twitterUsernameSchema = z.string() .min(1) .max(15) - .regex(/^[A-Za-z][A-Za-z0-9_]*[A-Za-z0-9]$|^[A-Za-z]$/, 'Invalid Twitter username format'); + .regex( + /^[A-Za-z][A-Za-z0-9_]*[A-Za-z0-9]$|^[A-Za-z]$/, + "Invalid Twitter username format" + ); +/** + * This schema defines all required/optional environment settings, + * including new fields like TWITTER_SPACES_ENABLE. + */ export const twitterEnvSchema = z.object({ TWITTER_DRY_RUN: z.boolean(), TWITTER_USERNAME: z.string().min(1, "Twitter username is required"), @@ -51,25 +59,23 @@ export const twitterEnvSchema = z.object({ ENABLE_ACTION_PROCESSING: z.boolean(), ACTION_INTERVAL: z.number().int(), POST_IMMEDIATELY: z.boolean(), + TWITTER_SPACES_ENABLE: z.boolean().default(false), }); export type TwitterConfig = z.infer; -function parseTargetUsers(targetUsersStr?:string | null): string[] { +/** + * Helper to parse a comma-separated list of Twitter usernames + * (already present in your code). + */ +function parseTargetUsers(targetUsersStr?: string | null): string[] { if (!targetUsersStr?.trim()) { return []; } - return targetUsersStr - .split(',') - .map(user => user.trim()) - .filter(Boolean); // Remove empty usernames - /* - .filter(user => { - // Twitter username validation (basic example) - return user && /^[A-Za-z0-9_]{1,15}$/.test(user); - }); - */ + .split(",") + .map((user) => user.trim()) + .filter(Boolean); } function safeParseInt(value: string | undefined | null, defaultValue: number): number { @@ -78,14 +84,11 @@ function safeParseInt(value: string | undefined | null, defaultValue: number): n return isNaN(parsed) ? defaultValue : Math.max(1, parsed); } -// This also is organized to serve as a point of documentation for the client -// most of the inputs from the framework (env/character) - -// we also do a lot of typing/parsing here -// so we can do it once and only once per character -export async function validateTwitterConfig( - runtime: IAgentRuntime -): Promise { +/** + * Validates or constructs a TwitterConfig object using zod, + * taking values from the IAgentRuntime or process.env as needed. + */ +export async function validateTwitterConfig(runtime: IAgentRuntime): Promise { try { const twitterConfig = { TWITTER_DRY_RUN: @@ -93,79 +96,103 @@ export async function validateTwitterConfig( runtime.getSetting("TWITTER_DRY_RUN") || process.env.TWITTER_DRY_RUN ) ?? false, // parseBooleanFromText return null if "", map "" to false + TWITTER_USERNAME: - runtime.getSetting ("TWITTER_USERNAME") || + runtime.getSetting("TWITTER_USERNAME") || process.env.TWITTER_USERNAME, + TWITTER_PASSWORD: runtime.getSetting("TWITTER_PASSWORD") || process.env.TWITTER_PASSWORD, + TWITTER_EMAIL: runtime.getSetting("TWITTER_EMAIL") || process.env.TWITTER_EMAIL, - MAX_TWEET_LENGTH: // number as string? + + MAX_TWEET_LENGTH: safeParseInt( runtime.getSetting("MAX_TWEET_LENGTH") || - process.env.MAX_TWEET_LENGTH - , DEFAULT_MAX_TWEET_LENGTH), - TWITTER_SEARCH_ENABLE: // bool + process.env.MAX_TWEET_LENGTH, + DEFAULT_MAX_TWEET_LENGTH + ), + + TWITTER_SEARCH_ENABLE: parseBooleanFromText( runtime.getSetting("TWITTER_SEARCH_ENABLE") || process.env.TWITTER_SEARCH_ENABLE ) ?? false, - TWITTER_2FA_SECRET: // string passthru + + TWITTER_2FA_SECRET: runtime.getSetting("TWITTER_2FA_SECRET") || process.env.TWITTER_2FA_SECRET || "", - TWITTER_RETRY_LIMIT: // int + + TWITTER_RETRY_LIMIT: safeParseInt( runtime.getSetting("TWITTER_RETRY_LIMIT") || - process.env.TWITTER_RETRY_LIMIT - , 5), - TWITTER_POLL_INTERVAL: // int in seconds + process.env.TWITTER_RETRY_LIMIT, + 5 + ), + + TWITTER_POLL_INTERVAL: safeParseInt( runtime.getSetting("TWITTER_POLL_INTERVAL") || - process.env.TWITTER_POLL_INTERVAL - , 120), // 2m - TWITTER_TARGET_USERS: // comma separated string + process.env.TWITTER_POLL_INTERVAL, + 120 + ), + + TWITTER_TARGET_USERS: parseTargetUsers( runtime.getSetting("TWITTER_TARGET_USERS") || process.env.TWITTER_TARGET_USERS ), - POST_INTERVAL_MIN: // int in minutes + + POST_INTERVAL_MIN: safeParseInt( runtime.getSetting("POST_INTERVAL_MIN") || - process.env.POST_INTERVAL_MIN - , 90), // 1.5 hours - POST_INTERVAL_MAX: // int in minutes + process.env.POST_INTERVAL_MIN, + 90 + ), + + POST_INTERVAL_MAX: safeParseInt( runtime.getSetting("POST_INTERVAL_MAX") || - process.env.POST_INTERVAL_MAX - , 180), // 3 hours - ENABLE_ACTION_PROCESSING: // bool + process.env.POST_INTERVAL_MAX, + 180 + ), + + ENABLE_ACTION_PROCESSING: parseBooleanFromText( runtime.getSetting("ENABLE_ACTION_PROCESSING") || process.env.ENABLE_ACTION_PROCESSING ) ?? false, - ACTION_INTERVAL: // int in minutes (min 1m) + + ACTION_INTERVAL: safeParseInt( runtime.getSetting("ACTION_INTERVAL") || - process.env.ACTION_INTERVAL - , 5), // 5 minutes - POST_IMMEDIATELY: // bool + process.env.ACTION_INTERVAL, + 5 + ), + + POST_IMMEDIATELY: parseBooleanFromText( runtime.getSetting("POST_IMMEDIATELY") || process.env.POST_IMMEDIATELY ) ?? false, + + TWITTER_SPACES_ENABLE: + parseBooleanFromText( + runtime.getSetting("TWITTER_SPACES_ENABLE") || + process.env.TWITTER_SPACES_ENABLE + ) ?? false, }; return twitterEnvSchema.parse(twitterConfig); } catch (error) { - if (error instanceof z.ZodError) { + if (error instanceof ZodError) { const errorMessages = error.errors .map((err) => `${err.path.join(".")}: ${err.message}`) .join("\n"); - throw new Error( - `Twitter configuration validation failed:\n${errorMessages}` - ); + throw new Error(`Twitter configuration validation failed:\n${errorMessages}`); } throw error; } diff --git a/packages/client-twitter/src/index.ts b/packages/client-twitter/src/index.ts index 0da22e7d6e3..39ee853e828 100644 --- a/packages/client-twitter/src/index.ts +++ b/packages/client-twitter/src/index.ts @@ -4,18 +4,32 @@ import { validateTwitterConfig, TwitterConfig } from "./environment.ts"; import { TwitterInteractionClient } from "./interactions.ts"; import { TwitterPostClient } from "./post.ts"; import { TwitterSearchClient } from "./search.ts"; +import { TwitterSpaceClient } from "./spaces.ts"; +/** + * A manager that orchestrates all specialized Twitter logic: + * - client: base operations (login, timeline caching, etc.) + * - post: autonomous posting logic + * - search: searching tweets / replying logic + * - interaction: handling mentions, replies + * - space: launching and managing Twitter Spaces (optional) + */ class TwitterManager { client: ClientBase; post: TwitterPostClient; search: TwitterSearchClient; interaction: TwitterInteractionClient; - constructor(runtime: IAgentRuntime, twitterConfig:TwitterConfig) { + space?: TwitterSpaceClient; + + constructor(runtime: IAgentRuntime, twitterConfig: TwitterConfig) { + // Pass twitterConfig to the base client this.client = new ClientBase(runtime, twitterConfig); + + // Posting logic this.post = new TwitterPostClient(this.client, runtime); + // Optional search logic (enabled if TWITTER_SEARCH_ENABLE is true) if (twitterConfig.TWITTER_SEARCH_ENABLE) { - // this searches topics from character file elizaLogger.warn("Twitter/X client running in a mode that:"); elizaLogger.warn("1. violates consent of random users"); elizaLogger.warn("2. burns your rate limit"); @@ -24,29 +38,46 @@ class TwitterManager { this.search = new TwitterSearchClient(this.client, runtime); } + // Mentions and interactions this.interaction = new TwitterInteractionClient(this.client, runtime); + + // Optional Spaces logic (enabled if TWITTER_SPACES_ENABLE is true) + if (twitterConfig.TWITTER_SPACES_ENABLE) { + this.space = new TwitterSpaceClient(this.client, runtime); + } } } export const TwitterClientInterface: Client = { async start(runtime: IAgentRuntime) { - const twitterConfig:TwitterConfig = await validateTwitterConfig(runtime); + const twitterConfig: TwitterConfig = await validateTwitterConfig(runtime); elizaLogger.log("Twitter client started"); const manager = new TwitterManager(runtime, twitterConfig); + // Initialize login/session await manager.client.init(); + // Start the posting loop await manager.post.start(); - if (manager.search) + // Start the search logic if it exists + if (manager.search) { await manager.search.start(); + } + // Start interactions (mentions, replies) await manager.interaction.start(); + // If Spaces are enabled, start the periodic check + if (manager.space) { + manager.space.startPeriodicSpaceCheck(); + } + return manager; }, + async stop(_runtime: IAgentRuntime) { elizaLogger.warn("Twitter client does not support stopping yet"); }, diff --git a/packages/client-twitter/src/spaces.ts b/packages/client-twitter/src/spaces.ts new file mode 100644 index 00000000000..a67430191c7 --- /dev/null +++ b/packages/client-twitter/src/spaces.ts @@ -0,0 +1,512 @@ +import { + elizaLogger, + IAgentRuntime, + composeContext, + generateText, + ModelClass, +} from "@elizaos/core"; +import { ClientBase } from "./base"; +import { + Scraper, + Space, + SpaceConfig, + RecordToDiskPlugin, + SttTtsPlugin, + IdleMonitorPlugin, + SpeakerRequest, +} from "agent-twitter-client"; + +interface SpaceDecisionOptions { + maxSpeakers?: number; + topics?: string[]; + typicalDurationMinutes?: number; + idleKickTimeoutMs?: number; + minIntervalBetweenSpacesMinutes?: number; + businessHoursOnly?: boolean; + randomChance?: number; + enableIdleMonitor?: boolean; + enableSttTts?: boolean; + enableRecording?: boolean; + voiceId?: string; + sttLanguage?: string; + gptModel?: string; + systemPrompt?: string; + speakerMaxDurationMs?: number; +} + +interface CurrentSpeakerState { + userId: string; + sessionUUID: string; + username: string; + startTime: number; +} + +/** + * Generate short filler text via GPT + */ +async function generateFiller(runtime: IAgentRuntime, fillerType: string): Promise { + try { + const context = composeContext({ + state: { fillerType }, + template: ` +# INSTRUCTIONS: +You are generating a short filler message for a Twitter Space. The filler type is "{{fillerType}}". +Keep it brief, friendly, and relevant. No more than two sentences. +Only return the text, no additional formatting. + +--- +`, + }); + const output = await generateText({ + runtime, + context, + modelClass: ModelClass.SMALL, + }); + return output.trim(); + } catch (err) { + elizaLogger.error("[generateFiller] Error generating filler:", err); + return ""; + } +} + +/** + * Speak a filler message if STT/TTS plugin is available. Sleep a bit after TTS to avoid cutoff. + */ +async function speakFiller( + runtime: IAgentRuntime, + sttTtsPlugin: SttTtsPlugin | undefined, + fillerType: string, + sleepAfterMs = 3000 +): Promise { + if (!sttTtsPlugin) return; + const text = await generateFiller(runtime, fillerType); + if (!text) return; + + elizaLogger.log(`[Space] Filler (${fillerType}) => ${text}`); + await sttTtsPlugin.speakText(text); + + if (sleepAfterMs > 0) { + await new Promise((res) => setTimeout(res, sleepAfterMs)); + } +} + +/** + * Generate topic suggestions via GPT if no topics are configured + */ +async function generateTopicsIfEmpty(runtime: IAgentRuntime): Promise { + try { + const context = composeContext({ + state: {}, + template: ` +# INSTRUCTIONS: +Please generate 5 short topic ideas for a Twitter Space about technology or random interesting subjects. +Return them as a comma-separated list, no additional formatting or numbering. + +Example: +"AI Advances, Futuristic Gadgets, Space Exploration, Quantum Computing, Digital Ethics" +--- +`, + }); + const response = await generateText({ + runtime, + context, + modelClass: ModelClass.SMALL, + }); + const topics = response + .split(",") + .map((t) => t.trim()) + .filter(Boolean); + return topics.length ? topics : ["Random Tech Chat", "AI Thoughts"]; + } catch (err) { + elizaLogger.error("[generateTopicsIfEmpty] GPT error =>", err); + return ["Random Tech Chat", "AI Thoughts"]; + } +} + +/** + * Main class: manage a Twitter Space with N speakers max, speaker queue, filler messages, etc. + */ +export class TwitterSpaceClient { + private client: ClientBase; + private scraper: Scraper; + private isSpaceRunning = false; + private currentSpace?: Space; + private spaceId?: string; + private startedAt?: number; + private checkInterval?: NodeJS.Timeout; + private lastSpaceEndedAt?: number; + private sttTtsPlugin?: SttTtsPlugin; + + /** + * We now store an array of active speakers, not just 1 + */ + private activeSpeakers: CurrentSpeakerState[] = []; + private speakerQueue: SpeakerRequest[] = []; + + private decisionOptions: SpaceDecisionOptions; + + constructor(client: ClientBase, runtime: IAgentRuntime) { + this.client = client; + this.scraper = client.twitterClient; + + const charSpaces = runtime.character.twitterSpaces || {}; + this.decisionOptions = { + maxSpeakers: charSpaces.maxSpeakers ?? 1, + topics: charSpaces.topics ?? [], + typicalDurationMinutes: charSpaces.typicalDurationMinutes ?? 30, + idleKickTimeoutMs: charSpaces.idleKickTimeoutMs ?? 5 * 60_000, + minIntervalBetweenSpacesMinutes: charSpaces.minIntervalBetweenSpacesMinutes ?? 60, + businessHoursOnly: charSpaces.businessHoursOnly ?? false, + randomChance: charSpaces.randomChance ?? 0.3, + enableIdleMonitor: charSpaces.enableIdleMonitor !== false, + enableSttTts: charSpaces.enableSttTts !== false, + enableRecording: charSpaces.enableRecording !== false, + voiceId: charSpaces.voiceId || runtime.character.settings.voice.model || 'Xb7hH8MSUJpSbSDYk0k2', + sttLanguage: charSpaces.sttLanguage || "en", + gptModel: charSpaces.gptModel, + systemPrompt: charSpaces.systemPrompt, + speakerMaxDurationMs: charSpaces.speakerMaxDurationMs ?? 4 * 60_000, + }; + } + + /** + * Periodic check to launch or manage space + */ + public async startPeriodicSpaceCheck() { + elizaLogger.log("[Space] Starting periodic check routine..."); + + // For instance: + const intervalMsWhenIdle = 5 * 60_000; // 5 minutes if no Space is running + const intervalMsWhenRunning = 5_000; // 5 seconds if a Space IS running + + const routine = async () => { + try { + if (!this.isSpaceRunning) { + // Space not running => check if we should launch + const launch = await this.shouldLaunchSpace(); + if (launch) { + const config = await this.generateSpaceConfig(); + await this.startSpace(config); + } + // Plan next iteration with a slower pace + this.checkInterval = setTimeout(routine, this.isSpaceRunning ? intervalMsWhenRunning : intervalMsWhenIdle); + } else { + // Space is running => manage it more frequently + await this.manageCurrentSpace(); + // Plan next iteration with a faster pace + this.checkInterval = setTimeout(routine, intervalMsWhenRunning); + } + } catch (error) { + elizaLogger.error("[Space] Error in routine =>", error); + // In case of error, still schedule next iteration + this.checkInterval = setTimeout(routine, intervalMsWhenIdle); + } + }; + + routine(); + } + + stopPeriodicCheck() { + if (this.checkInterval) { + clearTimeout(this.checkInterval); + this.checkInterval = undefined; + } + } + + private async shouldLaunchSpace(): Promise { + // Random chance + const r = Math.random(); + if (r > (this.decisionOptions.randomChance ?? 0.3)) { + elizaLogger.log("[Space] Random check => skip launching"); + return false; + } + // Business hours + if (this.decisionOptions.businessHoursOnly) { + const hour = new Date().getUTCHours(); + if (hour < 9 || hour >= 17) { + elizaLogger.log("[Space] Out of business hours => skip"); + return false; + } + } + // Interval + const now = Date.now(); + if (this.lastSpaceEndedAt) { + const minIntervalMs = + (this.decisionOptions.minIntervalBetweenSpacesMinutes ?? 60) * 60_000; + if (now - this.lastSpaceEndedAt < minIntervalMs) { + elizaLogger.log("[Space] Too soon since last space => skip"); + return false; + } + } + + elizaLogger.log("[Space] Deciding to launch a new Space..."); + return true; + } + + private async generateSpaceConfig(): Promise { + if ( + !this.decisionOptions.topics || + this.decisionOptions.topics.length === 0 + ) { + const newTopics = await generateTopicsIfEmpty(this.client.runtime); + this.decisionOptions.topics = newTopics; + } + + let chosenTopic = "Random Tech Chat"; + if ( + this.decisionOptions.topics && + this.decisionOptions.topics.length > 0 + ) { + chosenTopic = + this.decisionOptions.topics[ + Math.floor(Math.random() * this.decisionOptions.topics.length) + ]; + } + + return { + mode: "INTERACTIVE", + title: chosenTopic, + description: `Discussion about ${chosenTopic}`, + languages: ["en"], + }; + } + + public async startSpace(config: SpaceConfig) { + elizaLogger.log("[Space] Starting a new Twitter Space..."); + + try { + this.currentSpace = new Space(this.scraper); + this.isSpaceRunning = false; + this.spaceId = undefined; + this.startedAt = Date.now(); + + // Reset states + this.activeSpeakers = []; + this.speakerQueue = []; + + // Retrieve keys + const openAiKey = process.env.OPENAI_API_KEY || ""; + const elevenLabsKey = process.env.ELEVENLABS_XI_API_KEY || ""; + + // Plugins + if (this.decisionOptions.enableRecording) { + elizaLogger.log("[Space] Using RecordToDiskPlugin"); + this.currentSpace.use(new RecordToDiskPlugin()); + } + + if (this.decisionOptions.enableSttTts) { + elizaLogger.log("[Space] Using SttTtsPlugin"); + const sttTts = new SttTtsPlugin(); + this.sttTtsPlugin = sttTts; + this.currentSpace.use(sttTts, { + openAiApiKey: openAiKey, + elevenLabsApiKey: elevenLabsKey, + voiceId: this.decisionOptions.voiceId, + gptModel: this.decisionOptions.gptModel, + systemPrompt: this.decisionOptions.systemPrompt, + sttLanguage: this.decisionOptions.sttLanguage, + }); + } + + if (this.decisionOptions.enableIdleMonitor) { + elizaLogger.log("[Space] Using IdleMonitorPlugin"); + this.currentSpace.use( + new IdleMonitorPlugin( + this.decisionOptions.idleKickTimeoutMs ?? 60_000, + 10_000 + ) + ); + } + + const broadcastInfo = await this.currentSpace.initialize(config); + this.spaceId = broadcastInfo.room_id; + this.isSpaceRunning = true; + await this.scraper.sendTweet(broadcastInfo.share_url.replace('broadcasts', 'spaces')); + + const spaceUrl = broadcastInfo.share_url.replace("broadcasts", "spaces"); + elizaLogger.log(`[Space] Space started => ${spaceUrl}`); + + // Greet + await speakFiller(this.client.runtime, this.sttTtsPlugin, "WELCOME"); + + // Events + this.currentSpace.on("occupancyUpdate", (update) => { + elizaLogger.log(`[Space] Occupancy => ${update.occupancy} participant(s).`); + }); + + this.currentSpace.on("speakerRequest", async (req: SpeakerRequest) => { + elizaLogger.log(`[Space] Speaker request from @${req.username} (${req.userId}).`); + await this.handleSpeakerRequest(req); + }); + + this.currentSpace.on("idleTimeout", async (info) => { + elizaLogger.log(`[Space] idleTimeout => no audio for ${info.idleMs} ms.`); + await speakFiller(this.client.runtime, this.sttTtsPlugin, "IDLE_ENDING"); + await this.stopSpace(); + }); + + process.on("SIGINT", async () => { + elizaLogger.log("[Space] SIGINT => stopping space"); + await speakFiller(this.client.runtime, this.sttTtsPlugin, "CLOSING"); + await this.stopSpace(); + process.exit(0); + }); + } catch (error) { + elizaLogger.error("[Space] Error launching Space =>", error); + this.isSpaceRunning = false; + throw error; + } + } + + /** + * Periodic management: check durations, remove extras, maybe accept new from queue + */ + private async manageCurrentSpace() { + if (!this.spaceId || !this.currentSpace) return; + try { + const audioSpace = await this.scraper.getAudioSpaceById(this.spaceId); + const { participants } = audioSpace; + const numSpeakers = participants.speakers?.length || 0; + const totalListeners = participants.listeners?.length || 0; + + // 1) Remove any speaker who exceeded speakerMaxDurationMs + const maxDur = this.decisionOptions.speakerMaxDurationMs ?? 240_000; + const now = Date.now(); + + for (let i = this.activeSpeakers.length - 1; i >= 0; i--) { + const speaker = this.activeSpeakers[i]; + const elapsed = now - speaker.startTime; + if (elapsed > maxDur) { + elizaLogger.log( + `[Space] Speaker @${speaker.username} exceeded max duration => removing` + ); + await this.removeSpeaker(speaker.userId); + this.activeSpeakers.splice(i, 1); + + // Possibly speak a short "SPEAKER_LEFT" filler + await speakFiller(this.client.runtime, this.sttTtsPlugin, "SPEAKER_LEFT"); + } + } + + // 2) If we have capacity for new speakers from the queue, accept them + await this.acceptSpeakersFromQueueIfNeeded(); + + // 3) If somehow more than maxSpeakers are active, remove the extras + if (numSpeakers > (this.decisionOptions.maxSpeakers ?? 1)) { + elizaLogger.log("[Space] More than maxSpeakers => removing extras..."); + await this.kickExtraSpeakers(participants.speakers); + } + + // 4) Possibly stop the space if empty or time exceeded + const elapsedMinutes = (now - (this.startedAt || 0)) / 60000; + if ( + elapsedMinutes > (this.decisionOptions.typicalDurationMinutes ?? 30) || + (numSpeakers === 0 && totalListeners === 0 && elapsedMinutes > 5) + ) { + elizaLogger.log("[Space] Condition met => stopping the Space..."); + await speakFiller(this.client.runtime, this.sttTtsPlugin, "CLOSING", 4000); + await this.stopSpace(); + } + } catch (error) { + elizaLogger.error("[Space] Error in manageCurrentSpace =>", error); + } + } + + /** + * If we have available slots, accept new speakers from the queue + */ + private async acceptSpeakersFromQueueIfNeeded() { + // while queue not empty and activeSpeakers < maxSpeakers, accept next + const ms = this.decisionOptions.maxSpeakers ?? 1; + while (this.speakerQueue.length > 0 && this.activeSpeakers.length < ms) { + const nextReq = this.speakerQueue.shift(); + if (nextReq) { + await speakFiller(this.client.runtime, this.sttTtsPlugin, "PRE_ACCEPT"); + await this.acceptSpeaker(nextReq); + } + } + } + + private async handleSpeakerRequest(req: SpeakerRequest) { + if (!this.spaceId || !this.currentSpace) return; + + const audioSpace = await this.scraper.getAudioSpaceById(this.spaceId); + const janusSpeakers = audioSpace?.participants?.speakers || []; + + // If we haven't reached maxSpeakers, accept immediately + if (janusSpeakers.length < (this.decisionOptions.maxSpeakers ?? 1)) { + elizaLogger.log(`[Space] Accepting speaker @${req.username} now`); + await speakFiller(this.client.runtime, this.sttTtsPlugin, "PRE_ACCEPT"); + await this.acceptSpeaker(req); + } else { + elizaLogger.log(`[Space] Adding speaker @${req.username} to the queue`); + this.speakerQueue.push(req); + } + } + + private async acceptSpeaker(req: SpeakerRequest) { + if (!this.currentSpace) return; + try { + await this.currentSpace.approveSpeaker(req.userId, req.sessionUUID); + this.activeSpeakers.push({ + userId: req.userId, + sessionUUID: req.sessionUUID, + username: req.username, + startTime: Date.now(), + }); + elizaLogger.log(`[Space] Speaker @${req.username} is now live`); + } catch (err) { + elizaLogger.error(`[Space] Error approving speaker @${req.username}:`, err); + } + } + + private async removeSpeaker(userId: string) { + if (!this.currentSpace) return; + try { + await this.currentSpace.removeSpeaker(userId); + elizaLogger.log(`[Space] Removed speaker userId=${userId}`); + } catch (error) { + elizaLogger.error(`[Space] Error removing speaker userId=${userId} =>`, error); + } + } + + /** + * If more than maxSpeakers are found, remove extras + * Also update activeSpeakers array + */ + private async kickExtraSpeakers(speakers: any[]) { + if (!this.currentSpace) return; + const ms = this.decisionOptions.maxSpeakers ?? 1; + + // sort by who joined first if needed, or just slice + const extras = speakers.slice(ms); + for (const sp of extras) { + elizaLogger.log(`[Space] Removing extra speaker => userId=${sp.user_id}`); + await this.removeSpeaker(sp.user_id); + + // remove from activeSpeakers array + const idx = this.activeSpeakers.findIndex((s) => s.userId === sp.user_id); + if (idx !== -1) { + this.activeSpeakers.splice(idx, 1); + } + } + } + + public async stopSpace() { + if (!this.currentSpace || !this.isSpaceRunning) return; + try { + elizaLogger.log("[Space] Stopping the current Space..."); + await this.currentSpace.stop(); + } catch (err) { + elizaLogger.error("[Space] Error stopping Space =>", err); + } finally { + this.isSpaceRunning = false; + this.spaceId = undefined; + this.currentSpace = undefined; + this.startedAt = undefined; + this.lastSpaceEndedAt = Date.now(); + this.activeSpeakers = []; + this.speakerQueue = []; + } + } +} From baaec2dde8aa429ea38d27119b8af1555ba908c0 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Sun, 29 Dec 2024 03:32:26 +0100 Subject: [PATCH 2/5] Adding pnpm-lock.yaml --- pnpm-lock.yaml | 87 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5a4683f236c..482cabba4c8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -755,8 +755,8 @@ importers: specifier: workspace:* version: link:../core agent-twitter-client: - specifier: 0.0.17 - version: 0.0.17 + specifier: 0.0.18 + version: 0.0.18(bufferutil@4.0.8)(utf-8-validate@5.0.10) glob: specifier: 11.0.0 version: 11.0.0 @@ -6644,6 +6644,34 @@ packages: '@remusao/trie@1.5.0': resolution: {integrity: sha512-UX+3utJKgwCsg6sUozjxd38gNMVRXrY4TNX9VvCdSrlZBS1nZjRPi98ON3QjRAdf6KCguJFyQARRsulTeqQiPg==} + '@roamhq/wrtc-darwin-arm64@0.8.0': + resolution: {integrity: sha512-OtV2KWO7zOG3L8TF3KCt9aucynVCD/ww2xeXXgg+FLkya3ca0uzehN8EQJ3BL4tkInksbFJ2ssyu9cehfJ3ZuA==} + cpu: [arm64] + os: [darwin] + + '@roamhq/wrtc-darwin-x64@0.8.0': + resolution: {integrity: sha512-VY7Vzt/SDDDCpW//h8GW9bOZrOr8gWXPZVD9473ypl4jyBIoO57yyLbHzd1G0vBUkS6szsHlQCz1WwpI30YL+g==} + cpu: [x64] + os: [darwin] + + '@roamhq/wrtc-linux-arm64@0.8.1': + resolution: {integrity: sha512-FBJLLazlWkGQUXaokC/rTbrUQbb0CNFYry52fZGstufrGLTWu+g4HcwXdVvxh1tnVtVMvkQGk+mlOL52sCxw0A==} + cpu: [arm64] + os: [linux] + + '@roamhq/wrtc-linux-x64@0.8.1': + resolution: {integrity: sha512-I9oWG7b4uvWO1IOR/aF34n+ID6TKVuSs0jd19h5KdhfRtw7FFh9xxuwN9rONPxLVa6fS0q+MCZgAf8Scz89L8Q==} + cpu: [x64] + os: [linux] + + '@roamhq/wrtc-win32-x64@0.8.0': + resolution: {integrity: sha512-R2fxl41BLWPiP4eaTHGLzbbVvRjx1mV/OsgINCvawO7Hwz5Zx9I45+Fhrw3hd4n5amIeSG9VIF7Kz8eeTFXTGQ==} + cpu: [x64] + os: [win32] + + '@roamhq/wrtc@0.8.0': + resolution: {integrity: sha512-C0V/nqc4/2xzORI5qa4mIeN/8UO3ywN1kInrJ9u6GljFx0D18JMUJEqe8yYHa61RrEeoWN3PKdW++k8TocSx/A==} + '@rollup/plugin-alias@5.1.1': resolution: {integrity: sha512-PR9zDb+rOzkRb2VD+EuKB7UC41vU5DIwZ5qqCpk0KJudcWAyi8rvYOhS7+L5aZCspw1stTViLgN5v6FF1p5cgQ==} engines: {node: '>=14.0.0'} @@ -8656,6 +8684,9 @@ packages: agent-twitter-client@0.0.17: resolution: {integrity: sha512-IxLtNyy+fHmh5uHcaybcfXYkvPMP2h7y79sV2N6JpoAY40GKcy60iey6lsL7NO506MnnYDaqlG1JHMjqbfrOxA==} + agent-twitter-client@0.0.18: + resolution: {integrity: sha512-HncH5mlFcGYLEl5wNEkwtdolcmdxqEMIsqO4kTqiTp5P19O25Zr4P6LNJZz1UTjPRyXDxj+BLmmk/Ou7O0QzEg==} + agentkeepalive@4.5.0: resolution: {integrity: sha512-5GG/5IbQQpC9FpkRGsSvZI5QYeSCzlJHdpBQntCsuTOxhKD8lqKhrleg2Yi7yvMIf82Ycmmqln9U8V9qwEiJew==} engines: {node: '>= 8.0.0'} @@ -10880,6 +10911,11 @@ packages: domelementtype@2.3.0: resolution: {integrity: sha512-OLETBj6w0OsagBwdXnPdN0cnMfF9opN69co+7ZrbfPGrdpPVNBUj02spi6B1N7wChLQiPn4CSH/zJvXw56gmHw==} + domexception@4.0.0: + resolution: {integrity: sha512-A2is4PLG+eeSfoTMA95/s4pvAoSo2mKtiM5jlHkAVewmiO8ISFTFKZjH7UAM1Atli/OT/7JHOrJRJiMKUZKYBw==} + engines: {node: '>=12'} + deprecated: Use your platform's native DOMException instead + domhandler@4.3.1: resolution: {integrity: sha512-GrwoxYN+uWlzO8uhUXRl0P+kHE4GtVPfYzVLcUxPL7KNdHKj66vvlhiweIHqYYXWlw+T8iLMp42Lm67ghw4WMQ==} engines: {node: '>= 4'} @@ -26722,6 +26758,30 @@ snapshots: '@remusao/trie@1.5.0': {} + '@roamhq/wrtc-darwin-arm64@0.8.0': + optional: true + + '@roamhq/wrtc-darwin-x64@0.8.0': + optional: true + + '@roamhq/wrtc-linux-arm64@0.8.1': + optional: true + + '@roamhq/wrtc-linux-x64@0.8.1': + optional: true + + '@roamhq/wrtc-win32-x64@0.8.0': + optional: true + + '@roamhq/wrtc@0.8.0': + optionalDependencies: + '@roamhq/wrtc-darwin-arm64': 0.8.0 + '@roamhq/wrtc-darwin-x64': 0.8.0 + '@roamhq/wrtc-linux-arm64': 0.8.1 + '@roamhq/wrtc-linux-x64': 0.8.1 + '@roamhq/wrtc-win32-x64': 0.8.0 + domexception: 4.0.0 + '@rollup/plugin-alias@5.1.1(rollup@3.29.5)': optionalDependencies: rollup: 3.29.5 @@ -29574,6 +29634,24 @@ snapshots: twitter-api-v2: 1.18.2 undici: 7.2.0 + agent-twitter-client@0.0.18(bufferutil@4.0.8)(utf-8-validate@5.0.10): + dependencies: + '@roamhq/wrtc': 0.8.0 + '@sinclair/typebox': 0.32.35 + headers-polyfill: 3.3.0 + json-stable-stringify: 1.2.1 + node-fetch: 3.3.2 + otpauth: 9.3.6 + set-cookie-parser: 2.7.1 + tough-cookie: 4.1.4 + tslib: 2.8.1 + twitter-api-v2: 1.18.2 + undici: 7.2.0 + ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10) + transitivePeerDependencies: + - bufferutil + - utf-8-validate + agentkeepalive@4.5.0: dependencies: humanize-ms: 1.2.1 @@ -32276,6 +32354,11 @@ snapshots: domelementtype@2.3.0: {} + domexception@4.0.0: + dependencies: + webidl-conversions: 7.0.0 + optional: true + domhandler@4.3.1: dependencies: domelementtype: 2.3.0 From eeeb31cba0e04df208ac1351c37ebf8b4b819501 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Mon, 30 Dec 2024 00:03:53 +0100 Subject: [PATCH 3/5] Adding back comments and documentation --- packages/client-twitter/src/environment.ts | 33 +++++++++++++--------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/packages/client-twitter/src/environment.ts b/packages/client-twitter/src/environment.ts index f73a5676781..81bce96c13e 100644 --- a/packages/client-twitter/src/environment.ts +++ b/packages/client-twitter/src/environment.ts @@ -88,6 +88,11 @@ function safeParseInt(value: string | undefined | null, defaultValue: number): n * Validates or constructs a TwitterConfig object using zod, * taking values from the IAgentRuntime or process.env as needed. */ +// This also is organized to serve as a point of documentation for the client +// most of the inputs from the framework (env/character) + +// we also do a lot of typing/parsing here +// so we can do it once and only once per character export async function validateTwitterConfig(runtime: IAgentRuntime): Promise { try { const twitterConfig = { @@ -109,7 +114,7 @@ export async function validateTwitterConfig(runtime: IAgentRuntime): Promise Date: Mon, 30 Dec 2024 19:36:26 +0100 Subject: [PATCH 4/5] feat: add optional TRANSCRIPTION_PROVIDER setting with fallback logic, moving twitter spaces plugins to Eliza repo --- .env.example | 3 + packages/client-twitter/src/index.ts | 6 +- .../src/plugins/SttTtsSpacesPlugin.ts | 532 ++++++++++++++++++ packages/client-twitter/src/spaces.ts | 9 +- .../plugin-node/src/services/transcription.ts | 83 ++- 5 files changed, 621 insertions(+), 12 deletions(-) create mode 100644 packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts diff --git a/.env.example b/.env.example index 7b904d2ba8b..c266dc74f60 100644 --- a/.env.example +++ b/.env.example @@ -43,6 +43,9 @@ LIVEPEER_IMAGE_MODEL= # Default: ByteDance/SDXL-Lightning # Speech Synthesis ELEVENLABS_XI_API_KEY= # API key from elevenlabs +# Transcription Provider +TRANSCRIPTION_PROVIDER= # Default: local (possible values: openai, deepgram, local) + # Direct Client Setting EXPRESS_MAX_PAYLOAD= # Default: 100kb diff --git a/packages/client-twitter/src/index.ts b/packages/client-twitter/src/index.ts index 39ee853e828..6da648636ec 100644 --- a/packages/client-twitter/src/index.ts +++ b/packages/client-twitter/src/index.ts @@ -1,4 +1,8 @@ -import { Client, elizaLogger, IAgentRuntime } from "@elizaos/core"; +import { + Client, + elizaLogger, + IAgentRuntime, +} from "@elizaos/core"; import { ClientBase } from "./base.ts"; import { validateTwitterConfig, TwitterConfig } from "./environment.ts"; import { TwitterInteractionClient } from "./interactions.ts"; diff --git a/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts b/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts new file mode 100644 index 00000000000..3ecd5c95059 --- /dev/null +++ b/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts @@ -0,0 +1,532 @@ +// src/plugins/SttTtsPlugin.ts + +import fs from 'fs'; +import path from 'path'; +import { spawn } from 'child_process'; +import { ITranscriptionService } from '@elizaos/core'; +import { Space, JanusClient, AudioDataWithUser } from 'agent-twitter-client'; + +interface PluginConfig { + openAiApiKey?: string; // for STT & ChatGPT + elevenLabsApiKey?: string; // for TTS + sttLanguage?: string; // e.g. "en" for Whisper + gptModel?: string; // e.g. "gpt-3.5-turbo" + silenceThreshold?: number; // amplitude threshold for ignoring silence + voiceId?: string; // specify which ElevenLabs voice to use + elevenLabsModel?: string; // e.g. "eleven_monolingual_v1" + systemPrompt?: string; // ex. "You are a helpful AI assistant" + chatContext?: Array<{ + role: 'system' | 'user' | 'assistant'; + content: string; + }>; + transcriptionService: ITranscriptionService; +} + +/** + * MVP plugin for speech-to-text (OpenAI) + conversation + TTS (ElevenLabs) + * Approach: + * - Collect each speaker's unmuted PCM in a memory buffer (only if above silence threshold) + * - On speaker mute -> flush STT -> GPT -> TTS -> push to Janus + */ +export class SttTtsPlugin implements Plugin { + private space?: Space; + private janus?: JanusClient; + + private openAiApiKey?: string; + private elevenLabsApiKey?: string; + + private sttLanguage = 'en'; + private gptModel = 'gpt-3.5-turbo'; + private voiceId = '21m00Tcm4TlvDq8ikWAM'; + private elevenLabsModel = 'eleven_monolingual_v1'; + private systemPrompt = 'You are a helpful AI assistant.'; + private chatContext: Array<{ + role: 'system' | 'user' | 'assistant'; + content: string; + }> = []; + + private transcriptionService: ITranscriptionService; + + /** + * userId => arrayOfChunks (PCM Int16) + */ + private pcmBuffers = new Map(); + + /** + * Track mute states: userId => boolean (true=unmuted) + */ + private speakerUnmuted = new Map(); + + /** + * For ignoring near-silence frames (if amplitude < threshold) + */ + private silenceThreshold = 50; + + // TTS queue for sequentially speaking + private ttsQueue: string[] = []; + private isSpeaking = false; + + onAttach(space: Space) { + console.log('[SttTtsPlugin] onAttach => space was attached'); + } + + init(params: { space: Space; pluginConfig?: Record }): void { + console.log( + '[SttTtsPlugin] init => Space fully ready. Subscribing to events.', + ); + + this.space = params.space; + this.janus = (this.space as any)?.janusClient as JanusClient | undefined; + + const config = params.pluginConfig as PluginConfig; + this.openAiApiKey = config?.openAiApiKey; + this.elevenLabsApiKey = config?.elevenLabsApiKey; + this.transcriptionService = config.transcriptionService; + if (config?.sttLanguage) this.sttLanguage = config.sttLanguage; + if (config?.gptModel) this.gptModel = config.gptModel; + if (typeof config?.silenceThreshold === 'number') { + this.silenceThreshold = config.silenceThreshold; + } + if (config?.voiceId) { + this.voiceId = config.voiceId; + } + if (config?.elevenLabsModel) { + this.elevenLabsModel = config.elevenLabsModel; + } + if (config?.systemPrompt) { + this.systemPrompt = config.systemPrompt; + } + if (config?.chatContext) { + this.chatContext = config.chatContext; + } + console.log('[SttTtsPlugin] Plugin config =>', config); + + // Listen for mute events + this.space.on( + 'muteStateChanged', + (evt: { userId: string; muted: boolean }) => { + console.log('[SttTtsPlugin] Speaker muteStateChanged =>', evt); + if (evt.muted) { + this.handleMute(evt.userId).catch((err) => + console.error('[SttTtsPlugin] handleMute error =>', err), + ); + } else { + this.speakerUnmuted.set(evt.userId, true); + if (!this.pcmBuffers.has(evt.userId)) { + this.pcmBuffers.set(evt.userId, []); + } + } + }, + ); + } + + /** + * Called whenever we receive PCM from a speaker + */ + onAudioData(data: AudioDataWithUser): void { + if (!this.speakerUnmuted.get(data.userId)) return; + + let maxVal = 0; + for (let i = 0; i < data.samples.length; i++) { + const val = Math.abs(data.samples[i]); + if (val > maxVal) maxVal = val; + } + if (maxVal < this.silenceThreshold) { + return; + } + + let arr = this.pcmBuffers.get(data.userId); + if (!arr) { + arr = []; + this.pcmBuffers.set(data.userId, arr); + } + arr.push(data.samples); + } + + // /src/sttTtsPlugin.ts + private async convertPcmToWavInMemory( + pcmData: Int16Array, + sampleRate: number + ): Promise { + // number of channels + const numChannels = 1; + // byte rate = (sampleRate * numChannels * bitsPerSample/8) + const byteRate = sampleRate * numChannels * 2; + const blockAlign = numChannels * 2; + // data chunk size = pcmData.length * (bitsPerSample/8) + const dataSize = pcmData.length * 2; + + // WAV header is 44 bytes + const buffer = new ArrayBuffer(44 + dataSize); + const view = new DataView(buffer); + + // RIFF chunk descriptor + this.writeString(view, 0, 'RIFF'); + view.setUint32(4, 36 + dataSize, true); // file size - 8 + this.writeString(view, 8, 'WAVE'); + + // fmt sub-chunk + this.writeString(view, 12, 'fmt '); + view.setUint32(16, 16, true); // Subchunk1Size (16 for PCM) + view.setUint16(20, 1, true); // AudioFormat (1 = PCM) + view.setUint16(22, numChannels, true); // NumChannels + view.setUint32(24, sampleRate, true); // SampleRate + view.setUint32(28, byteRate, true); // ByteRate + view.setUint16(32, blockAlign, true); // BlockAlign + view.setUint16(34, 16, true); // BitsPerSample (16) + + // data sub-chunk + this.writeString(view, 36, 'data'); + view.setUint32(40, dataSize, true); + + // Write PCM samples + let offset = 44; + for (let i = 0; i < pcmData.length; i++, offset += 2) { + view.setInt16(offset, pcmData[i], true); + } + + return buffer; + } + + private writeString(view: DataView, offset: number, text: string) { + for (let i = 0; i < text.length; i++) { + view.setUint8(offset + i, text.charCodeAt(i)); + } + } + + /** + * On speaker mute => flush STT => GPT => TTS => push to Janus + */ + private async handleMute(userId: string): Promise { + this.speakerUnmuted.set(userId, false); + const chunks = this.pcmBuffers.get(userId) || []; + this.pcmBuffers.set(userId, []); + + if (!chunks.length) { + console.log('[SttTtsPlugin] No audio chunks for user =>', userId); + return; + } + console.log( + `[SttTtsPlugin] Flushing STT buffer for user=${userId}, chunks=${chunks.length}`, + ); + + const totalLen = chunks.reduce((acc, c) => acc + c.length, 0); + const merged = new Int16Array(totalLen); + let offset = 0; + for (const c of chunks) { + merged.set(c, offset); + offset += c.length; + } + + // Convert PCM to WAV for STT + const wavBuffer = await this.convertPcmToWavInMemory(merged, 48000); + + // Whisper STT + const sttText = await this.transcriptionService.transcribe(wavBuffer); + + if (!sttText.trim()) { + console.log('[SttTtsPlugin] No speech recognized for user =>', userId); + return; + } + console.log(`[SttTtsPlugin] STT => user=${userId}, text="${sttText}"`); + + // GPT answer + const replyText = await this.askChatGPT(sttText); + console.log(`[SttTtsPlugin] GPT => user=${userId}, reply="${replyText}"`); + + // Use the standard speak method with queue + await this.speakText(replyText); + } + + /** + * Public method to queue a TTS request + */ + public async speakText(text: string): Promise { + this.ttsQueue.push(text); + if (!this.isSpeaking) { + this.isSpeaking = true; + this.processTtsQueue().catch((err) => { + console.error('[SttTtsPlugin] processTtsQueue error =>', err); + }); + } + } + + /** + * Process TTS requests one by one + */ + private async processTtsQueue(): Promise { + while (this.ttsQueue.length > 0) { + const text = this.ttsQueue.shift(); + if (!text) continue; + + try { + const ttsAudio = await this.elevenLabsTts(text); + const pcm = await this.convertMp3ToPcm(ttsAudio, 48000); + await this.streamToJanus(pcm, 48000); + } catch (err) { + console.error('[SttTtsPlugin] TTS streaming error =>', err); + } + } + this.isSpeaking = false; + } + + private convertPcmToWav( + samples: Int16Array, + sampleRate: number, + ): Promise { + return new Promise((resolve, reject) => { + const tmpPath = path.resolve('/tmp', `stt-${Date.now()}.wav`); + const ff = spawn('ffmpeg', [ + '-f', + 's16le', + '-ar', + sampleRate.toString(), + '-ac', + '1', + '-i', + 'pipe:0', + '-y', + tmpPath, + ]); + ff.stdin.write(Buffer.from(samples.buffer)); + ff.stdin.end(); + ff.on('close', (code) => { + if (code === 0) resolve(tmpPath); + else reject(new Error(`ffmpeg error code=${code}`)); + }); + }); + } + + /** + * OpenAI Whisper STT + */ + private async transcribeWithOpenAI(wavPath: string, language: string) { + if (!this.openAiApiKey) { + throw new Error('[SttTtsPlugin] No OpenAI API key available'); + } + + try { + console.log('[SttTtsPlugin] Transcribe =>', wavPath); + + // Read file into buffer + const fileBuffer = fs.readFileSync(wavPath); + console.log( + '[SttTtsPlugin] File read, size:', + fileBuffer.length, + 'bytes', + ); + + // Create blob from buffer + const blob = new Blob([fileBuffer], { type: 'audio/wav' }); + + // Create FormData + const formData = new FormData(); + formData.append('file', blob, path.basename(wavPath)); + formData.append('model', 'whisper-1'); + formData.append('language', language); + formData.append('temperature', '0'); + + // Call OpenAI API + const response = await fetch( + 'https://api.openai.com/v1/audio/transcriptions', + { + method: 'POST', + headers: { + Authorization: `Bearer ${this.openAiApiKey}`, + }, + body: formData, + }, + ); + if (!response.ok) { + const errorText = await response.text(); + console.error('[SttTtsPlugin] OpenAI API Error:', errorText); + throw new Error(`OpenAI API error: ${response.status} ${errorText}`); + } + const data = (await response.json()) as { text: string }; + return data.text?.trim() || ''; + } catch (err) { + console.error('[SttTtsPlugin] OpenAI STT Error =>', err); + throw new Error('OpenAI STT failed'); + } + } + + /** + * Simple ChatGPT call + */ + private async askChatGPT(userText: string): Promise { + if (!this.openAiApiKey) { + throw new Error('[SttTtsPlugin] No OpenAI API key for ChatGPT'); + } + const url = 'https://api.openai.com/v1/chat/completions'; + const messages = [ + { role: 'system', content: this.systemPrompt }, + ...this.chatContext, + { role: 'user', content: userText }, + ]; + + const resp = await fetch(url, { + method: 'POST', + headers: { + Authorization: `Bearer ${this.openAiApiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + model: this.gptModel, + messages, + }), + }); + + if (!resp.ok) { + const errText = await resp.text(); + throw new Error( + `[SttTtsPlugin] ChatGPT error => ${resp.status} ${errText}`, + ); + } + + const json = await resp.json(); + const reply = json.choices?.[0]?.message?.content || ''; + this.chatContext.push({ role: 'user', content: userText }); + this.chatContext.push({ role: 'assistant', content: reply }); + return reply.trim(); + } + + /** + * ElevenLabs TTS => returns MP3 Buffer + */ + private async elevenLabsTts(text: string): Promise { + if (!this.elevenLabsApiKey) { + throw new Error('[SttTtsPlugin] No ElevenLabs API key'); + } + const url = `https://api.elevenlabs.io/v1/text-to-speech/${this.voiceId}`; + const resp = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'xi-api-key': this.elevenLabsApiKey, + }, + body: JSON.stringify({ + text, + model_id: this.elevenLabsModel, + voice_settings: { stability: 0.4, similarity_boost: 0.8 }, + }), + }); + if (!resp.ok) { + const errText = await resp.text(); + throw new Error( + `[SttTtsPlugin] ElevenLabs TTS error => ${resp.status} ${errText}`, + ); + } + const arrayBuf = await resp.arrayBuffer(); + return Buffer.from(arrayBuf); + } + + /** + * Convert MP3 => PCM via ffmpeg + */ + private convertMp3ToPcm( + mp3Buf: Buffer, + outRate: number, + ): Promise { + return new Promise((resolve, reject) => { + const ff = spawn('ffmpeg', [ + '-i', + 'pipe:0', + '-f', + 's16le', + '-ar', + outRate.toString(), + '-ac', + '1', + 'pipe:1', + ]); + let raw = Buffer.alloc(0); + + ff.stdout.on('data', (chunk: Buffer) => { + raw = Buffer.concat([raw, chunk]); + }); + ff.stderr.on('data', () => { + // ignoring ffmpeg logs + }); + ff.on('close', (code) => { + if (code !== 0) { + reject(new Error(`ffmpeg error code=${code}`)); + return; + } + const samples = new Int16Array( + raw.buffer, + raw.byteOffset, + raw.byteLength / 2, + ); + resolve(samples); + }); + + ff.stdin.write(mp3Buf); + ff.stdin.end(); + }); + } + + /** + * Push PCM back to Janus in small frames + * We'll do 10ms @48k => 960 samples per frame + */ + private async streamToJanus( + samples: Int16Array, + sampleRate: number, + ): Promise { + // TODO: Check if better than 480 fixed + const FRAME_SIZE = Math.floor(sampleRate * 0.01); // 10ms frames => 480 @48kHz + + for ( + let offset = 0; + offset + FRAME_SIZE <= samples.length; + offset += FRAME_SIZE + ) { + const frame = new Int16Array(FRAME_SIZE); + frame.set(samples.subarray(offset, offset + FRAME_SIZE)); + this.janus?.pushLocalAudio(frame, sampleRate, 1); + + // Short pause so we don't overload + await new Promise((r) => setTimeout(r, 10)); + } + } + + public setSystemPrompt(prompt: string) { + this.systemPrompt = prompt; + console.log('[SttTtsPlugin] setSystemPrompt =>', prompt); + } + + /** + * Change the GPT model at runtime (e.g. "gpt-4", "gpt-3.5-turbo", etc.). + */ + public setGptModel(model: string) { + this.gptModel = model; + console.log('[SttTtsPlugin] setGptModel =>', model); + } + + /** + * Add a message (system, user or assistant) to the chat context. + * E.g. to store conversation history or inject a persona. + */ + public addMessage(role: 'system' | 'user' | 'assistant', content: string) { + this.chatContext.push({ role, content }); + console.log( + `[SttTtsPlugin] addMessage => role=${role}, content=${content}`, + ); + } + + /** + * Clear the chat context if needed. + */ + public clearChatContext() { + this.chatContext = []; + console.log('[SttTtsPlugin] clearChatContext => done'); + } + + cleanup(): void { + console.log('[SttTtsPlugin] cleanup => releasing resources'); + this.pcmBuffers.clear(); + this.speakerUnmuted.clear(); + this.ttsQueue = []; + this.isSpeaking = false; + } +} diff --git a/packages/client-twitter/src/spaces.ts b/packages/client-twitter/src/spaces.ts index a67430191c7..6076b80e8fd 100644 --- a/packages/client-twitter/src/spaces.ts +++ b/packages/client-twitter/src/spaces.ts @@ -4,6 +4,8 @@ import { composeContext, generateText, ModelClass, + ServiceType, + ITranscriptionService, } from "@elizaos/core"; import { ClientBase } from "./base"; import { @@ -11,10 +13,12 @@ import { Space, SpaceConfig, RecordToDiskPlugin, - SttTtsPlugin, IdleMonitorPlugin, SpeakerRequest, } from "agent-twitter-client"; +import { + SttTtsPlugin +} from './plugins/SttTtsSpacesPlugin.ts'; interface SpaceDecisionOptions { maxSpeakers?: number; @@ -305,6 +309,9 @@ export class TwitterSpaceClient { gptModel: this.decisionOptions.gptModel, systemPrompt: this.decisionOptions.systemPrompt, sttLanguage: this.decisionOptions.sttLanguage, + transcriptionService: this.client.runtime.getService( + ServiceType.TRANSCRIPTION, + ) }); } diff --git a/packages/plugin-node/src/services/transcription.ts b/packages/plugin-node/src/services/transcription.ts index 5b734061524..7ffb441ff1d 100644 --- a/packages/plugin-node/src/services/transcription.ts +++ b/packages/plugin-node/src/services/transcription.ts @@ -34,14 +34,35 @@ export class TranscriptionService private isCudaAvailable: boolean = false; private openai: OpenAI | null = null; private deepgram?: DeepgramClient; + private preferredProvider?: string; // "deepgram", "openai", "local" private queue: { audioBuffer: ArrayBuffer; resolve: Function }[] = []; private processing: boolean = false; async initialize(_runtime: IAgentRuntime): Promise { this.runtime = _runtime; + + /** + * We set preferredProvider only if TRANSCRIPTION_PROVIDER is defined. + * The old logic remains in place (Deepgram > OpenAI > Local) for those + * who haven't configured TRANSCRIPTION_PROVIDER yet. + * This way, existing users relying on Deepgram without updating .env + * won't have their workflow broken. + */ + const provider = this.runtime.getSetting("TRANSCRIPTION_PROVIDER"); + if (provider) { + this.preferredProvider = provider; // "deepgram", "openai", "local" ... + } + const deepgramKey = this.runtime.getSetting("DEEPGRAM_API_KEY"); this.deepgram = deepgramKey ? createClient(deepgramKey) : null; + + const openaiKey = this.runtime.getSetting("OPENAI_API_KEY"); + this.openai = openaiKey + ? new OpenAI({ + apiKey: openaiKey, + }) + : null; } constructor() { @@ -92,7 +113,7 @@ export class TranscriptionService } else if (platform === "win32") { const cudaPath = path.join( settings.CUDA_PATH || - "C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v11.0", + "C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v11.0", "bin", "nvcc.exe" ); @@ -192,21 +213,24 @@ export class TranscriptionService } private async processQueue(): Promise { - if (this.processing || this.queue.length === 0) { - return; - } - + // Exit if already processing or if the queue is empty + if (this.processing || this.queue.length === 0) return; this.processing = true; while (this.queue.length > 0) { const { audioBuffer, resolve } = this.queue.shift()!; let result: string | null = null; - if (this.deepgram) { - result = await this.transcribeWithDeepgram(audioBuffer); - } else if (this.openai) { - result = await this.transcribeWithOpenAI(audioBuffer); + + /** + * If TRANSCRIPTION_PROVIDER is set, we use the new approach. + * Otherwise, we preserve the original fallback logic (Deepgram > OpenAI > Local). + * This ensures we don't break existing configurations where Deepgram is expected + * but TRANSCRIPTION_PROVIDER isn't set in the .env. + */ + if (this.preferredProvider) { + result = await this.transcribeUsingPreferredOrFallback(audioBuffer); } else { - result = await this.transcribeLocally(audioBuffer); + result = await this.transcribeUsingDefaultLogic(audioBuffer); } resolve(result); @@ -215,6 +239,45 @@ export class TranscriptionService this.processing = false; } + /** + * New approach (preferred provider + fallback). + * This can still handle a missing provider setting gracefully. + */ + private async transcribeUsingPreferredOrFallback(audioBuffer: ArrayBuffer): Promise { + let result: string | null = null; + + switch (this.preferredProvider) { + case "deepgram": + if (this.deepgram) { + result = await this.transcribeWithDeepgram(audioBuffer); + if (result) return result; + } + // fallback to openai + case "openai": + if (this.openai) { + result = await this.transcribeWithOpenAI(audioBuffer); + if (result) return result; + } + // fallback to local + case "local": + default: + return await this.transcribeLocally(audioBuffer); + } + } + + /** + * Original logic: Deepgram -> OpenAI -> Local + * We keep it untouched for backward compatibility. + */ + private async transcribeUsingDefaultLogic(audioBuffer: ArrayBuffer): Promise { + if (this.deepgram) { + return await this.transcribeWithDeepgram(audioBuffer); + } else if (this.openai) { + return await this.transcribeWithOpenAI(audioBuffer); + } + return await this.transcribeLocally(audioBuffer); + } + private async transcribeWithDeepgram( audioBuffer: ArrayBuffer ): Promise { From daa0e1332d6a80a55c42328dbfa6f611934dd0f1 Mon Sep 17 00:00:00 2001 From: slkzgm Date: Mon, 30 Dec 2024 20:13:17 +0100 Subject: [PATCH 5/5] Cleanup on packages/client-twitter/src/plugins/SttTtsPlugin.ts --- .../src/plugins/SttTtsSpacesPlugin.ts | 86 +------------------ pnpm-lock.yaml | 10 +-- 2 files changed, 6 insertions(+), 90 deletions(-) diff --git a/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts b/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts index 3ecd5c95059..8343dac5b17 100644 --- a/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts +++ b/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts @@ -1,7 +1,5 @@ // src/plugins/SttTtsPlugin.ts -import fs from 'fs'; -import path from 'path'; import { spawn } from 'child_process'; import { ITranscriptionService } from '@elizaos/core'; import { Space, JanusClient, AudioDataWithUser } from 'agent-twitter-client'; @@ -35,7 +33,6 @@ export class SttTtsPlugin implements Plugin { private openAiApiKey?: string; private elevenLabsApiKey?: string; - private sttLanguage = 'en'; private gptModel = 'gpt-3.5-turbo'; private voiceId = '21m00Tcm4TlvDq8ikWAM'; private elevenLabsModel = 'eleven_monolingual_v1'; @@ -82,7 +79,6 @@ export class SttTtsPlugin implements Plugin { this.openAiApiKey = config?.openAiApiKey; this.elevenLabsApiKey = config?.elevenLabsApiKey; this.transcriptionService = config.transcriptionService; - if (config?.sttLanguage) this.sttLanguage = config.sttLanguage; if (config?.gptModel) this.gptModel = config.gptModel; if (typeof config?.silenceThreshold === 'number') { this.silenceThreshold = config.silenceThreshold; @@ -224,7 +220,7 @@ export class SttTtsPlugin implements Plugin { // Whisper STT const sttText = await this.transcriptionService.transcribe(wavBuffer); - if (!sttText.trim()) { + if (!sttText || !sttText.trim()) { console.log('[SttTtsPlugin] No speech recognized for user =>', userId); return; } @@ -270,86 +266,6 @@ export class SttTtsPlugin implements Plugin { this.isSpeaking = false; } - private convertPcmToWav( - samples: Int16Array, - sampleRate: number, - ): Promise { - return new Promise((resolve, reject) => { - const tmpPath = path.resolve('/tmp', `stt-${Date.now()}.wav`); - const ff = spawn('ffmpeg', [ - '-f', - 's16le', - '-ar', - sampleRate.toString(), - '-ac', - '1', - '-i', - 'pipe:0', - '-y', - tmpPath, - ]); - ff.stdin.write(Buffer.from(samples.buffer)); - ff.stdin.end(); - ff.on('close', (code) => { - if (code === 0) resolve(tmpPath); - else reject(new Error(`ffmpeg error code=${code}`)); - }); - }); - } - - /** - * OpenAI Whisper STT - */ - private async transcribeWithOpenAI(wavPath: string, language: string) { - if (!this.openAiApiKey) { - throw new Error('[SttTtsPlugin] No OpenAI API key available'); - } - - try { - console.log('[SttTtsPlugin] Transcribe =>', wavPath); - - // Read file into buffer - const fileBuffer = fs.readFileSync(wavPath); - console.log( - '[SttTtsPlugin] File read, size:', - fileBuffer.length, - 'bytes', - ); - - // Create blob from buffer - const blob = new Blob([fileBuffer], { type: 'audio/wav' }); - - // Create FormData - const formData = new FormData(); - formData.append('file', blob, path.basename(wavPath)); - formData.append('model', 'whisper-1'); - formData.append('language', language); - formData.append('temperature', '0'); - - // Call OpenAI API - const response = await fetch( - 'https://api.openai.com/v1/audio/transcriptions', - { - method: 'POST', - headers: { - Authorization: `Bearer ${this.openAiApiKey}`, - }, - body: formData, - }, - ); - if (!response.ok) { - const errorText = await response.text(); - console.error('[SttTtsPlugin] OpenAI API Error:', errorText); - throw new Error(`OpenAI API error: ${response.status} ${errorText}`); - } - const data = (await response.json()) as { text: string }; - return data.text?.trim() || ''; - } catch (err) { - console.error('[SttTtsPlugin] OpenAI STT Error =>', err); - throw new Error('OpenAI STT failed'); - } - } - /** * Simple ChatGPT call */ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 482cabba4c8..f7311a2c675 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -18491,8 +18491,8 @@ packages: resolution: {integrity: sha512-U8uCCl2x9TK3WANvmBavymRzxbfFYG+tAu+fgx3zxQy3qdagQqBLwJVrdyO1TBfUXvfKveMKJZhpvUYoOjM+4g==} engines: {node: '>=18.17'} - undici@7.2.0: - resolution: {integrity: sha512-klt+0S55GBViA9nsq48/NSCo4YX5mjydjypxD7UmHh/brMu8h/Mhd/F7qAeoH2NOO8SDTk6kjnTFc4WpzmfYpQ==} + undici@7.1.1: + resolution: {integrity: sha512-WZkQ6eH9f5ZT93gaIffsbUaDpBwjbpvmMbfaEhOnbdUneurTESeRxwPGwjI28mRFESH3W3e8Togijh37ptOQqA==} engines: {node: '>=20.18.1'} unenv@1.10.0: @@ -29632,7 +29632,7 @@ snapshots: tough-cookie: 4.1.4 tslib: 2.8.1 twitter-api-v2: 1.18.2 - undici: 7.2.0 + undici: 7.1.1 agent-twitter-client@0.0.18(bufferutil@4.0.8)(utf-8-validate@5.0.10): dependencies: @@ -29646,7 +29646,7 @@ snapshots: tough-cookie: 4.1.4 tslib: 2.8.1 twitter-api-v2: 1.18.2 - undici: 7.2.0 + undici: 7.1.1 ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10) transitivePeerDependencies: - bufferutil @@ -42332,7 +42332,7 @@ snapshots: undici@6.19.8: {} - undici@7.2.0: {} + undici@7.1.1: {} unenv@1.10.0: dependencies: