From 1899609257a8fc5c1d3f0efac19ed3cb6c0be441 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Zieniewicz?= <lukzieniewicz@gmail.com>
Date: Thu, 11 Apr 2024 23:08:45 +0200
Subject: [PATCH] Add sync versions of stream and save methods

In order to provide synchronous interface to the library
---
 examples/basic_sync_audio_streaming.py        | 27 +++++++++++++
 examples/basic_sync_generation.py             | 21 ++++++++++
 .../sync_audio_generation_in_async_context.py | 36 +++++++++++++++++
 .../sync_audio_stream_in_async_context.py     | 40 +++++++++++++++++++
 src/edge_tts/communicate.py                   | 38 ++++++++++++++++++
 5 files changed, 162 insertions(+)
 create mode 100644 examples/basic_sync_audio_streaming.py
 create mode 100644 examples/basic_sync_generation.py
 create mode 100644 examples/sync_audio_generation_in_async_context.py
 create mode 100644 examples/sync_audio_stream_in_async_context.py

diff --git a/examples/basic_sync_audio_streaming.py b/examples/basic_sync_audio_streaming.py
new file mode 100644
index 0000000..c906b85
--- /dev/null
+++ b/examples/basic_sync_audio_streaming.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python3
+
+"""
+Basic audio streaming example for sync interface
+
+"""
+
+import edge_tts
+
+TEXT = "Hello World!"
+VOICE = "en-GB-SoniaNeural"
+OUTPUT_FILE = "test.mp3"
+
+
+def main() -> None:
+    """Main function to process audio and metadata synchronously."""
+    communicate = edge_tts.Communicate(TEXT, VOICE)
+    with open(OUTPUT_FILE, "wb") as file:
+        for chunk in communicate.stream_sync():
+            if chunk["type"] == "audio":
+                file.write(chunk["data"])
+            elif chunk["type"] == "WordBoundary":
+                print(f"WordBoundary: {chunk}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/basic_sync_generation.py b/examples/basic_sync_generation.py
new file mode 100644
index 0000000..e6785bd
--- /dev/null
+++ b/examples/basic_sync_generation.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+
+"""
+Basic example of edge_tts usage in synchronous function
+"""
+
+import edge_tts
+
+TEXT = "Hello World!"
+VOICE = "en-GB-SoniaNeural"
+OUTPUT_FILE = "test.mp3"
+
+
+def main() -> None:
+    """Main function"""
+    communicate = edge_tts.Communicate(TEXT, VOICE)
+    communicate.save_sync(OUTPUT_FILE)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/sync_audio_generation_in_async_context.py b/examples/sync_audio_generation_in_async_context.py
new file mode 100644
index 0000000..d159514
--- /dev/null
+++ b/examples/sync_audio_generation_in_async_context.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+
+"""
+This example shows that sync version of save function also works when run from 
+a sync function called itself from an async function.
+The simple implementation of save_sync() with only asyncio.run would fail in this scenario, 
+that's why ThreadPoolExecutor is used in implementation.
+
+"""
+
+import asyncio
+
+import edge_tts
+
+TEXT = "Hello World!"
+VOICE = "en-GB-SoniaNeural"
+OUTPUT_FILE = "test.mp3"
+
+
+def sync_main() -> None:
+    """Main function"""
+    communicate = edge_tts.Communicate(TEXT, VOICE)
+    communicate.save_sync(OUTPUT_FILE)
+
+
+async def amain() -> None:
+    """Main function"""
+    sync_main()
+
+
+if __name__ == "__main__":
+    loop = asyncio.get_event_loop_policy().get_event_loop()
+    try:
+        loop.run_until_complete(amain())
+    finally:
+        loop.close()
diff --git a/examples/sync_audio_stream_in_async_context.py b/examples/sync_audio_stream_in_async_context.py
new file mode 100644
index 0000000..0c64976
--- /dev/null
+++ b/examples/sync_audio_stream_in_async_context.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+
+"""
+This example shows that sync version of string function also works when run from
+a sync function called itself from an async function.
+The simple implementation of stream_sync() with only asyncio.run would fail in this scenario,
+that's why ThreadPoolExecutor is used in implementation.
+
+"""
+
+import asyncio
+
+import edge_tts
+
+TEXT = "Hello World!"
+VOICE = "en-GB-SoniaNeural"
+OUTPUT_FILE = "test.mp3"
+
+
+def main() -> None:
+    """Main function to process audio and metadata synchronously."""
+    communicate = edge_tts.Communicate(TEXT, VOICE)
+    with open(OUTPUT_FILE, "wb") as file:
+        for chunk in communicate.stream_sync():
+            if chunk["type"] == "audio":
+                file.write(chunk["data"])
+            elif chunk["type"] == "WordBoundary":
+                print(f"WordBoundary: {chunk}")
+
+
+async def amain():
+    main()
+
+
+if __name__ == "__main__":
+    loop = asyncio.get_event_loop_policy().get_event_loop()
+    try:
+        loop.run_until_complete(amain())
+    finally:
+        loop.close()
diff --git a/src/edge_tts/communicate.py b/src/edge_tts/communicate.py
index 325b0e0..ffa7c0c 100644
--- a/src/edge_tts/communicate.py
+++ b/src/edge_tts/communicate.py
@@ -2,11 +2,13 @@
 Communicate package.
 """
 
+import asyncio
 import json
 import re
 import ssl
 import time
 import uuid
+import concurrent.futures
 from contextlib import nullcontext
 from io import TextIOWrapper
 from typing import (
@@ -21,6 +23,7 @@
     Union,
 )
 from xml.sax.saxutils import escape
+from queue import Queue
 
 import aiohttp
 import certifi
@@ -498,3 +501,38 @@ async def save(
                 ):
                     json.dump(message, metadata)
                     metadata.write("\n")
+
+    def stream_sync(self) -> Generator[Dict[str, Any], None, None]:
+        """Synchronous interface for async stream method"""
+
+        def fetch_async_items(queue):
+            async def get_items():
+                async for item in self.stream():
+                    queue.put(item)
+                queue.put(None)
+
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            loop.run_until_complete(get_items())
+            loop.close()
+
+        queue = Queue()
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            executor.submit(fetch_async_items, queue)
+
+            while True:
+                item = queue.get()
+                if item is None:
+                    break
+                yield item
+
+    def save_sync(
+        self,
+        audio_fname: Union[str, bytes],
+        metadata_fname: Optional[Union[str, bytes]] = None,
+    ) -> None:
+        """Synchronous interface for async save method."""
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            future = executor.submit(asyncio.run, self.save(audio_fname))
+            future.result()