seems to work...?
Moishe committed Mar 23, 2024
1 parent 35a2b3a commit f77f643
Showing 9 changed files with 156 additions and 79 deletions.
25 changes: 25 additions & 0 deletions examples/foundational/websocket-server/frames.proto
@@ -0,0 +1,25 @@
syntax = "proto3";

package dailyai_proto;

message TextFrame {
string text = 1;
}

message AudioFrame {
bytes audio = 1;
}

message TranscriptionFrame {
string text = 1;
string participant_id = 2;
string timestamp = 3;
}

message Frame {
oneof frame {
TextFrame text = 1;
AudioFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}
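
As a quick orientation, here is a minimal Python sketch of how a Frame from this schema round-trips through protobuf. It assumes the file has been compiled with `protoc --python_out=. frames.proto` to produce a `frames_pb2` module; the module name and the compile step are assumptions, not part of this commit.

import frames_pb2  # assumed output of: protoc --python_out=. frames.proto

# Build a frame carrying text and serialize it for the wire.
out = frames_pb2.Frame()
out.text.text = "Hello there!"
data = out.SerializeToString()

# On the receiving side, parse and dispatch on whichever oneof field is set.
parsed = frames_pb2.Frame.FromString(data)
kind = parsed.WhichOneof("frame")  # "text", "audio", or "transcription"
if kind == "text":
    print(parsed.text.text)
elif kind == "audio":
    print(f"{len(parsed.audio.audio)} bytes of audio")
elif kind == "transcription":
    print(parsed.transcription.text, parsed.transcription.participant_id)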
138 changes: 90 additions & 48 deletions examples/foundational/websocket-server/index.html
@@ -15,8 +15,12 @@ <h1>WebSocket Audio Stream</h1>
<script>
let audioContext;
let microphoneStream;
let scriptProcessor;
let source;
let audioCtx;
let frame;
let echo_interval;
let b2p_interval;

proto = protobuf.load("frames.proto", function (err, root) {
if (err) {
@@ -25,7 +29,6 @@ <h1>WebSocket Audio Stream</h1>
frame = root.lookupType("dailyai_proto.Frame");
});


async function startAudio() {
audioCtx = new (window.AudioContext || window.webkitAudioContext)();

@@ -41,33 +44,107 @@ <h1>WebSocket Audio Stream</h1>
const context = new AudioContext();

// Create a ScriptProcessorNode.
const scriptProcessor = context.createScriptProcessor(8192, 1, 1);
scriptProcessor = context.createScriptProcessor(8192, 1, 1);

// Connect the microphone stream to the ScriptProcessorNode.
const source = context.createMediaStreamSource(microphoneStream);
source = context.createMediaStreamSource(microphoneStream);
source.connect(scriptProcessor);

// Connect the ScriptProcessorNode to the destination.
scriptProcessor.connect(context.destination);

var audio_buffer = []
function buffer_to_proto() {
MIN_AUDIO_SIZE = 6400 // flush once 6400 buffered samples have accumulated (~0.4 s at 16 kHz)
if (audio_buffer.length > MIN_AUDIO_SIZE) {
var audioFrame = frame.create({ audio: { audio: audio_buffer.slice(0, MIN_AUDIO_SIZE) } });
var encodedFrame = new Uint8Array(frame.encode(audioFrame).finish())
console.log("sending audio frame")
ws.send(encodedFrame)
audio_buffer = audio_buffer.slice(MIN_AUDIO_SIZE)
}
}


scriptProcessor.onaudioprocess = (event) => {
const rawLeftChannelData = event.inputBuffer.getChannelData(0);
scaledToInt = []
for (var i = 0; i < rawLeftChannelData.length; i++) {
// Convert each item from -1.0-1.0 to a 16-bit signed integer
scaledToInt[i] = (rawLeftChannelData[i] * 32767 + 32767) % 65535;
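// Stride through the input buffer; skipRatio is intended to downsample the capture rate toward 16 kHz.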
const skipRatio = Math.floor(audioCtx.sampleRate / (16000 * 2));
var scaledToInt = []
var sum = 0
var mx = 0
for (var i = 0; i < rawLeftChannelData.length; i += skipRatio) {
// Convert each sample from the -1.0..1.0 float range to a signed 16-bit integer
var normalized = ((rawLeftChannelData[i] * 32768.0) + 32768) % 65536 - 32768;
// swap the bytes
normalized = ((normalized & 0xff) << 8) | ((normalized >> 8) & 0xff);

scaledToInt.push(normalized);
// running sum and peak amplitude, kept for debugging
sum += normalized
mx = Math.max(mx, Math.abs(normalized))
}
//ws.send(scaledToInt);
audio_buffer = audio_buffer.concat(scaledToInt)
};


initWebSocket();
b2p_interval = setInterval(buffer_to_proto, 1)
}

function stopAudio() {
microphoneStream.disconnect();
ws.close();
ws = undefined;
if (ws) {
ws.close();
scriptProcessor.disconnect();
source.disconnect();
echo_interval = clearInterval(echo_interval)
b2p_interval = clearInterval(b2p_interval)
ws = undefined;

}
}

var audioChunks = [];
var isPlaying = false;

function playNextChunk() {
if (audioChunks.length === 0) {
isPlaying = false;
return;
}

isPlaying = true;
var audioOutBuffer = audioChunks.shift(); // Get the next audio chunk
var source = audioCtx.createBufferSource();
source.buffer = audioOutBuffer;
source.connect(audioCtx.destination);
source.onended = playNextChunk; // Set the onended event handler
source.start();
}

function enqueueAudioFromProto(arrayBuffer) {
var parsedFrame = frame.decode(new Uint8Array(arrayBuffer));
if (!parsedFrame?.audio) {
console.log("no audio in frame")
return false;
}
var frameCount = parsedFrame.audio.audio.length / 2;
var audioOutBuffer = audioCtx.createBuffer(1, frameCount, 16000);
var nowBuffering = audioOutBuffer.getChannelData(0);
var buffer = parsedFrame.audio.audio.buffer;
var view = new Int16Array(buffer);
var sum = 0;
var mx = 0
for (var i = 0; i < frameCount; i++) {
var word = view[i];
//nowBuffering[i] = word;
nowBuffering[i] = ((word + 32768) % 65536 - 32768) / 32768.0;
sum += nowBuffering[i];
mx = Math.max(mx, Math.abs(nowBuffering[i]))
}
audioChunks.push(audioOutBuffer); // Add the new audio chunk to the array

if (!isPlaying) {
playNextChunk(); // Start playing if not already playing
}
}

function initWebSocket() {
@@ -77,48 +154,13 @@
console.log('WebSocket connection established.');
});

var audioChunks = [];
var isPlaying = false;

function playNextChunk() {
if (audioChunks.length === 0) {
isPlaying = false;
return;
}

isPlaying = true;
var audioOutBuffer = audioChunks.shift(); // Get the next audio chunk
var source = audioCtx.createBufferSource();
source.buffer = audioOutBuffer;
source.connect(audioCtx.destination);
source.onended = playNextChunk; // Set the onended event handler
source.start();
}

ws.addEventListener('message', async function (event) {
if (!event.data?.size) {
return false;
}

try {
var parsedFrame = frame.decode(new Uint8Array(await event.data.arrayBuffer()));
if (!parsedFrame?.audio) {
return false;
}
var frameCount = parsedFrame.audio.audio.length / 2;
var audioOutBuffer = audioCtx.createBuffer(1, frameCount, 16000);
var nowBuffering = audioOutBuffer.getChannelData(0);
var buffer = parsedFrame.audio.audio.buffer;
var view = new Int16Array(buffer);
for (var i = 0; i < frameCount; i++) {
var word = view[i];
nowBuffering[i] = ((word + 32768) % 65536 - 32768) / 32768.0;
}
audioChunks.push(audioOutBuffer); // Add the new audio chunk to the array

if (!isPlaying) {
playNextChunk(); // Start playing if not already playing
}
enqueueAudioFromProto(await event.data.arrayBuffer())
return false;
} catch (error) {
console.error('Error playing audio:', error);
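Since the browser page above is the only client in this commit, a small Python smoke test can stand in for it when poking at the server. Everything below is a sketch under assumptions: frames_pb2 is the compiled proto module from above, and ws://localhost:8765 is a placeholder address, not taken from this commit.

# Hypothetical smoke-test client: send half a second of 16 kHz sine tone as a
# protobuf AudioFrame and report what kind of frame comes back.
import asyncio
import math
import struct

import websockets
import frames_pb2  # assumed compiled from frames.proto, as above

SAMPLE_RATE = 16000

def sine_pcm16(freq_hz=440.0, seconds=0.5):
    # Pack float samples in [-1, 1) as little-endian signed 16-bit PCM,
    # the layout sample.py's wave writer expects (1 channel, 2-byte width).
    n = int(SAMPLE_RATE * seconds)
    ints = (int(32767 * math.sin(2 * math.pi * freq_hz * i / SAMPLE_RATE)) for i in range(n))
    return struct.pack(f"<{n}h", *ints)

async def main():
    async with websockets.connect("ws://localhost:8765") as ws:  # placeholder URL
        out = frames_pb2.Frame()
        out.audio.audio = sine_pcm16()
        await ws.send(out.SerializeToString())
        reply = frames_pb2.Frame.FromString(await ws.recv())
        print("received a", reply.WhichOneof("frame"), "frame")

asyncio.run(main())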
55 changes: 36 additions & 19 deletions examples/foundational/websocket-server/sample.py
@@ -5,7 +5,7 @@
import wave

from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import AudioFrame, EndFrame, EndPipeFrame, TextFrame
from dailyai.pipeline.frames import AudioFrame, EndFrame, EndPipeFrame, TextFrame, TranscriptionQueueFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.websocket_transport_service import WebsocketTransport
@@ -19,44 +19,61 @@
async def main():
async with aiohttp.ClientSession() as session:
transport = WebsocketTransport(
mic_enabled=True, speaker_enabled=True)
mic_enabled=True, speaker_enabled=True, duration_minutes=120)

tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

class WriteToWav(FrameProcessor):
class AudioWriter(FrameProcessor):
SIZE = 160000
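# 160000 bytes = 80000 16-bit samples = 5 seconds of 16 kHz mono audio per wav file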

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._buffer = bytes()
self._counter = 0

async def process_frame(self, frame):
if isinstance(frame, AudioFrame):
with wave.open("output.wav", "wb") as f:
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(16000)
f.writeframes(frame.data)
yield frame
self._buffer += frame.data
if len(self._buffer) >= AudioWriter.SIZE:
with wave.open(f"output-{self._counter}.wav", "wb") as f:
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(16000)
f.writeframes(self._buffer)
self._counter += 1
self._buffer = self._buffer[AudioWriter.SIZE:]
yield frame
else:
yield frame

class AudioChunker(FrameProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self):
super().__init__()
self._buffer = bytes()

async def process_frame(self, frame):
if isinstance(frame, AudioFrame):
self._buffer += frame.data
if len(self._buffer) >= 1600:
yield AudioFrame(self._buffer[:1600])
self._buffer = self._buffer[1600:]
elif isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
if self._buffer:
yield AudioFrame(self._buffer)
self._buffer = bytes()
while len(self._buffer) >= 16000:
yield AudioFrame(self._buffer[:16000])
self._buffer = self._buffer[16000:]
else:
yield frame

class WhisperTranscriber(FrameProcessor):
async def process_frame(self, frame):
if isinstance(frame, TranscriptionQueueFrame):
print(f"Transcribed: {frame.text}")
else:
yield frame

# pipeline = Pipeline([WriteToWav(), WhisperSTTService(), tts])
pipeline = Pipeline([tts, AudioChunker()])
pipeline = Pipeline(
[AudioWriter(), WhisperSTTService(), WhisperTranscriber(), tts, AudioChunker()])

@transport.on_connection
async def queue_frame():
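All three processors in this file follow the same contract: process_frame is an async generator that receives one frame and yields zero or more frames downstream. A minimal pass-through logger in that style, for reference (the LogFrames name is ours, not part of the commit):

from dailyai.pipeline.frame_processor import FrameProcessor

class LogFrames(FrameProcessor):
    # Illustrative sketch: log every frame's type and forward it unchanged.
    # Yielding is what keeps frames flowing; note that WhisperTranscriber
    # above deliberately does not yield TranscriptionQueueFrames, so they
    # stop there.
    async def process_frame(self, frame):
        print(f"saw {type(frame).__name__}")
        yield frame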
Binary file added output-0.wav
Binary file added output-1.wav
Binary file added output.wav
3 changes: 3 additions & 0 deletions src/dailyai/pipeline/frames.py
@@ -298,6 +298,9 @@ def FrameFromProto(proto: frame_protos.Frame) -> Frame:
text_frame = TextFrame(text="Hello there!")
print(text_frame)
print(text_frame.to_proto().SerializeToString())
serialized = text_frame.to_proto().SerializeToString()
print(type(serialized))
print(frame_protos.Frame.FromString(serialized))
print(TextFrame.from_proto(text_frame.to_proto()))

transcripton_frame = TranscriptionQueueFrame(
13 changes: 2 additions & 11 deletions src/dailyai/services/websocket_transport_service.py
@@ -55,7 +55,7 @@ async def start_server() -> None:
print("Server started")
await self._stop_server_event.wait()
print("Server stopped")
await self.receive_queue.put(EndFrame)
await self.receive_queue.put(EndFrame())

await asyncio.gather(start_server(), timeout_task(), send_task(), pipeline.run_pipeline())

@@ -73,15 +73,6 @@ async def _websocket_handler(self, websocket: websockets.WebSocketServerProtocol

self._websocket = websocket
async for message in websocket:
"""
generic_frame = frame_protos.Frame.ParseFromString(message)
generic_frame = frame_protos.Frame.FromString(message)
frame = FrameFromProto(generic_frame)
if isinstance(frame, AudioFrame):
with wave.open("output.wav", "wb") as f:
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(16000)
f.writeframes(frame.data)
await self.receive_queue.put(frame)
"""
pass
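
With the block above deleted, the handler now discards every incoming message. A version that actually feeds received frames into the pipeline might look like the following — a sketch reconstructed from the deleted code, not something this commit ships:

# Sketch based on the removed block: decode each websocket message into a
# generic protobuf Frame, convert it to a pipeline frame, and enqueue it.
async def _websocket_handler(self, websocket: websockets.WebSocketServerProtocol):
    self._websocket = websocket
    async for message in websocket:
        generic_frame = frame_protos.Frame.FromString(message)
        frame = FrameFromProto(generic_frame)
        await self.receive_queue.put(frame)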
1 change: 0 additions & 1 deletion src/dailyai/services/whisper_ai_services.py
@@ -52,5 +52,4 @@ async def run_stt(self, audio: BinaryIO) -> str:
res: str = ""
for segment in segments:
res += f"{segment.text} "
print("Transcription: ", segment.text)
return res
