getting started on websocket transport
Moishe committed Mar 21, 2024
1 parent 2c5628a commit 35a2b3a
Showing 10 changed files with 502 additions and 7 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/lint.yaml
@@ -22,11 +22,23 @@ jobs:
     steps:
       - name: Checkout repo
         uses: actions/checkout@v4
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
+      - name: Setup virtual environment
+        run: |
+          python -m venv .venv
+      - name: Install basic Python dependencies
+        run: |
+          source .venv/bin/activate
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
       - name: autopep8
         id: autopep8
-        uses: peter-evans/autopep8@v2
-        with:
-          args: --exit-code -r -d -a -a src/
+        run: |
+          source .venv/bin/activate
+          autopep8 --exit-code -r -d --exclude "*_pb2.py" -a -a src/
       - name: Fail if autopep8 requires changes
         if: steps.autopep8.outputs.exit-code == 2
         run: exit 1
142 changes: 142 additions & 0 deletions examples/foundational/websocket-server/index.html
@@ -0,0 +1,142 @@
<!DOCTYPE html>
<html lang="en">

<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="//cdn.jsdelivr.net/npm/protobufjs/dist/protobuf.min.js"></script>
<title>WebSocket Audio Stream</title>
</head>

<body>
<h1>WebSocket Audio Stream</h1>
<button onclick="startAudio()">Start Audio</button>
<button onclick="stopAudio()">Stop Audio</button>
<script>
let ws;
let audioCtx;
let microphoneStream;
let frame;

proto = protobuf.load("frames.proto", function (err, root) {
if (err) {
throw err;
}
frame = root.lookupType("dailyai_proto.Frame");
});


async function startAudio() {
audioCtx = new (window.AudioContext || window.webkitAudioContext)();

if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
alert('getUserMedia is not supported in your browser.');
return;
}

// Get the microphone stream.
microphoneStream = await navigator.mediaDevices.getUserMedia({ audio: true });

// Create an AudioContext.
const context = new AudioContext();

// Create a ScriptProcessorNode.
const scriptProcessor = context.createScriptProcessor(8192, 1, 1);

// Connect the microphone stream to the ScriptProcessorNode.
const source = context.createMediaStreamSource(microphoneStream);
source.connect(scriptProcessor);

// Connect the ScriptProcessorNode to the destination.
scriptProcessor.connect(context.destination);

scriptProcessor.onaudioprocess = (event) => {
const rawLeftChannelData = event.inputBuffer.getChannelData(0);
const scaledToInt = new Int16Array(rawLeftChannelData.length);
for (var i = 0; i < rawLeftChannelData.length; i++) {
// Convert each sample from the -1.0..1.0 float range to a signed 16-bit integer.
scaledToInt[i] = Math.max(-32768, Math.min(32767, Math.round(rawLeftChannelData[i] * 32767)));
}
//ws.send(scaledToInt);
};


initWebSocket();
}

function stopAudio() {
// Stop the microphone tracks; MediaStream has no disconnect() method.
microphoneStream.getTracks().forEach((track) => track.stop());
ws.close();
ws = undefined;
}

function initWebSocket() {
ws = new WebSocket('ws://localhost:8765');

ws.addEventListener('open', function (event) {
console.log('WebSocket connection established.');
});

var audioChunks = [];
var isPlaying = false;

function playNextChunk() {
if (audioChunks.length === 0) {
isPlaying = false;
return;
}

isPlaying = true;
var audioOutBuffer = audioChunks.shift(); // Get the next audio chunk
var source = audioCtx.createBufferSource();
source.buffer = audioOutBuffer;
source.connect(audioCtx.destination);
source.onended = playNextChunk; // Set the onended event handler
source.start();
}

ws.addEventListener('message', async function (event) {
if (!event.data?.size) {
return false;
}

try {
var parsedFrame = frame.decode(new Uint8Array(await event.data.arrayBuffer()));
if (!parsedFrame?.audio) {
return false;
}
var rawAudio = parsedFrame.audio.audio; // Uint8Array view into the decoded message buffer
var frameCount = rawAudio.length / 2;
var audioOutBuffer = audioCtx.createBuffer(1, frameCount, 16000);
var nowBuffering = audioOutBuffer.getChannelData(0);
// Read little-endian 16-bit samples with a DataView anchored at the payload's
// byteOffset, so header bytes of the decoded message are not misread as audio.
var view = new DataView(rawAudio.buffer, rawAudio.byteOffset, rawAudio.byteLength);
for (var i = 0; i < frameCount; i++) {
nowBuffering[i] = view.getInt16(i * 2, true) / 32768.0;
}
audioChunks.push(audioOutBuffer); // Add the new audio chunk to the array

if (!isPlaying) {
playNextChunk(); // Start playing if not already playing
}
return false;
} catch (error) {
console.error('Error playing audio:', error);
}
});

ws.addEventListener('close', function (event) {
console.log("WebSocket connection closed.", event.code, event.reason);
});

ws.addEventListener('error', function (event) {
console.error('WebSocket error:', event);
});
}

window.onload = function () {
};
</script>
</body>

</html>
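
The page above expects a server at ws://localhost:8765 that exchanges the protobuf Frame messages defined in frames.proto (added later in this commit); sample.py below provides that end through the new WebsocketTransport. Purely as an illustration of the wire format, and not of the transport's actual implementation, a minimal standalone server might look like the sketch below. It assumes a frames_pb2 module generated from frames.proto with protoc, uses the websockets package added to pyproject.toml in this commit, and the 440 Hz test tone is invented for the example.

import asyncio
import math
import struct

import websockets  # the dependency added to pyproject.toml in this commit

import frames_pb2  # assumed: generated with `protoc --python_out=. frames.proto`


async def handler(ws):
    # Send one second of a 440 Hz test tone as 16-bit mono PCM at 16 kHz,
    # wrapped in the AudioFrame/Frame messages the page above decodes.
    samples = [int(32767 * 0.2 * math.sin(2 * math.pi * 440 * i / 16000))
               for i in range(16000)]
    frame = frames_pb2.Frame()
    frame.audio.audio = struct.pack("<%dh" % len(samples), *samples)
    await ws.send(frame.SerializeToString())

    # Log any frames the browser sends back.
    async for message in ws:
        if isinstance(message, bytes):
            incoming = frames_pb2.Frame()
            incoming.ParseFromString(message)
            print("received frame:", incoming.WhichOneof("frame"))


async def main():
    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # run until interrupted


asyncio.run(main())
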
70 changes: 70 additions & 0 deletions examples/foundational/websocket-server/sample.py
@@ -0,0 +1,70 @@
import asyncio
import aiohttp
import logging
import os
import wave

from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import AudioFrame, EndFrame, EndPipeFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.websocket_transport_service import WebsocketTransport
from dailyai.services.whisper_ai_services import WhisperSTTService

logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)


async def main():
    async with aiohttp.ClientSession() as session:
        transport = WebsocketTransport(
            mic_enabled=True, speaker_enabled=True)

        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        )

        class WriteToWav(FrameProcessor):
            async def process_frame(self, frame):
                if isinstance(frame, AudioFrame):
                    with wave.open("output.wav", "wb") as f:
                        f.setnchannels(1)
                        f.setsampwidth(2)
                        f.setframerate(16000)
                        f.writeframes(frame.data)
                yield frame

        class AudioChunker(FrameProcessor):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self._buffer = bytes()

            async def process_frame(self, frame):
                if isinstance(frame, AudioFrame):
                    self._buffer += frame.data
                    # Emit fixed 1600-byte chunks: 800 16-bit mono samples, i.e. 50 ms at 16 kHz.
                    # A while loop (rather than a single if) drains frames larger than one chunk.
                    while len(self._buffer) >= 1600:
                        yield AudioFrame(self._buffer[:1600])
                        self._buffer = self._buffer[1600:]
                elif isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
                    if self._buffer:
                        yield AudioFrame(self._buffer)
                        self._buffer = bytes()
                else:
                    yield frame

        # pipeline = Pipeline([WriteToWav(), WhisperSTTService(), tts])
        pipeline = Pipeline([tts, AudioChunker()])

        @transport.on_connection
        async def queue_frame():
            await pipeline.queue_frames([TextFrame("Hello there!")])

        await asyncio.gather(transport.run(pipeline))
        del tts


if __name__ == "__main__":
    asyncio.run(main())
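
Because AudioChunker.process_frame is an async generator, its chunking behavior can be checked without a transport or pipeline. A minimal sketch, assuming the class has been lifted out of main() to module scope and that EndFrame takes no constructor arguments:

import asyncio

from dailyai.pipeline.frames import AudioFrame, EndFrame


async def demo():
    chunker = AudioChunker()  # assumes the class above was moved to module scope
    sizes = []
    # 4000 bytes of silence (2000 16-bit samples), then an end-of-stream marker.
    for frame in [AudioFrame(bytes(4000)), EndFrame()]:  # EndFrame() with no args is an assumption
        async for out in chunker.process_frame(frame):
            if isinstance(out, AudioFrame):
                sizes.append(len(out.data))
    # Two full 1600-byte chunks plus an 800-byte flush: [1600, 1600, 800]
    print(sizes)

asyncio.run(demo())
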
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -35,7 +35,8 @@ dependencies = [
     "torch",
     "torchaudio",
     "pyaudio",
-    "typing-extensions"
+    "typing-extensions",
+    "websockets"
 ]

 [project.urls]
25 changes: 25 additions & 0 deletions src/dailyai/pipeline/frames.proto
@@ -0,0 +1,25 @@
syntax = "proto3";

package dailyai_proto;

message TextFrame {
string text = 1;
}

message AudioFrame {
bytes audio = 1;
}

message TranscriptionFrame {
string text = 1;
string participant_id = 2;
string timestamp = 3;
}

message Frame {
oneof frame {
TextFrame text = 1;
AudioFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}
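
These messages mirror the frame classes in dailyai.pipeline.frames, which is what lets a transport put pipeline traffic on the wire and lets the browser client above decode it. A rough sketch of that mapping is below; it assumes a frames_pb2 module generated from this file with protoc, relies on AudioFrame carrying its bytes in .data (as used in sample.py), and assumes TextFrame exposes its string as .text.

from dailyai.pipeline.frames import AudioFrame, TextFrame

import frames_pb2  # assumed: generated with `protoc --python_out=. frames.proto`


def frame_to_proto(frame):
    # Wrap a pipeline frame in the oneof Frame message defined above.
    proto = frames_pb2.Frame()
    if isinstance(frame, TextFrame):
        proto.text.text = frame.text  # .text attribute name is an assumption
    elif isinstance(frame, AudioFrame):
        proto.audio.audio = frame.data
    else:
        raise TypeError(f"no proto mapping for {type(frame).__name__}")
    return proto.SerializeToString()


def proto_to_frame(data):
    # Parse wire bytes back into the matching pipeline frame.
    proto = frames_pb2.Frame()
    proto.ParseFromString(data)
    which = proto.WhichOneof("frame")
    if which == "text":
        return TextFrame(proto.text.text)
    if which == "audio":
        return AudioFrame(proto.audio.audio)
    raise ValueError(f"unhandled frame type: {which}")
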