-
Notifications
You must be signed in to change notification settings - Fork 12
/
barkTTS_server.py
149 lines (126 loc) · 5.1 KB
/
barkTTS_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# Add this location block to the NGINX server config to proxy /send-string
# requests to this server:
#   location ~ /send-string {
#       proxy_pass http://127.0.0.1:8080$request_uri;
#       proxy_set_header Host $host;
#       proxy_set_header X-Real-IP $remote_addr;
#   }
import datetime
import os
from http.server import HTTPServer, BaseHTTPRequestHandler

import nltk
import numpy as np
from bark import SAMPLE_RATE, generate_audio, preload_models
from scipy.io.wavfile import write as write_wav
# nltk.download('punkt')
# Load the Bark models at import time, before the HTTP server is created.
# Going by the option names, this requests the "small" variant of the text,
# coarse and fine models and keeps the codec off the GPU (confirm against the
# Bark documentation before changing any of them).
preload_models(
    codec_use_gpu=False,
    fine_use_small=True,
    coarse_use_small=True,
    text_use_small=True,
)
# Set up sample rate (importing instead atm)
# SAMPLE_RATE = 22050
# Value passed as `history_prompt` to generate_audio() for every chunk
# (presumably one of Bark's speaker presets -- confirm against the Bark docs).
# NOTE(review): the original author marked this setting as "buggy".
HISTORY_PROMPT = "en_speaker_3"
class RequestHandler(BaseHTTPRequestHandler):
    """Serve the Bark text-to-speech demo over HTTP.

    Routes:
      * ``POST`` (any path): the body is UTF-8 text. It is split into
        chunks, each chunk is synthesised with ``generate_audio``, the
        pieces are concatenated, written to ``AUDIO_FILE`` and returned as
        ``audio/wav``.
      * ``GET /``: returns ``index.html`` from the working directory.
      * ``GET /audio/bark_audio.wav``: returns the most recent WAV file.
      * ``DELETE <path>``: removes a file located below ``AUDIO_DIR``.
      * ``OPTIONS``: answers CORS preflight requests.

    Every handler emits exactly one status line and one header block; a
    second ``send_response`` after ``end_headers`` would be written into the
    response body and corrupt the payload.
    """

    # Directory and file holding the most recent synthesis result.
    AUDIO_DIR = './audio'
    AUDIO_FILE = './audio/bark_audio.wav'
    # URL path under which AUDIO_FILE is served by do_GET.
    AUDIO_ROUTE = '/audio/bark_audio.wav'
    # Size budget for one prompt handed to generate_audio().
    CHUNK_LIMIT = 250

    # ------------------------------------------------------------------ #
    # Response helpers
    # ------------------------------------------------------------------ #
    def _send_cors_headers(self):
        """Emit the cross-origin headers shared by all CORS-enabled routes."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        # One comma-separated header instead of two headers of the same name.
        self.send_header('Access-Control-Allow-Headers',
                         'Content-Type, X-Requested-With')

    def _send_payload(self, status, content_type, payload, cors=False):
        """Send a complete response: status, headers and ``payload`` bytes."""
        self.send_response(status)
        if cors:
            self._send_cors_headers()
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_file(self, path, content_type, cors=False):
        """Send the file at ``path``, or a 404 when it does not exist.

        The file is read *before* any header is written so that a missing
        file can still be reported with a proper status code.
        """
        try:
            with open(path, 'rb') as f:
                payload = f.read()
        except FileNotFoundError:
            self.send_error(404, 'File Not Found')
            return
        self._send_payload(200, content_type, payload, cors=cors)

    # ------------------------------------------------------------------ #
    # Request helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _split_into_chunks(text, limit=250):
        """Group the sentences of ``text`` into prompts of at most ``limit``.

        Sentences are never split; a sentence that alone exceeds ``limit``
        becomes its own chunk. Returns an empty list when ``text`` contains
        no sentences, and never returns an empty chunk.

        NOTE(review): the size of a sentence is ``len(nltk.Text(sentence))``.
        ``nltk.Text`` is given a plain string here, so this looks like a
        character count rather than a word-token count -- confirm before
        changing ``limit``.
        """
        chunks = []
        current = []
        used = 0
        for sentence in nltk.sent_tokenize(text):
            size = len(nltk.Text(sentence))
            if current and used + size > limit:
                chunks.append(' '.join(current))
                current = []
                used = 0
            current.append(sentence)
            used += size
        if current:
            chunks.append(' '.join(current))
        return chunks

    def _resolve_audio_path(self, request_path):
        """Map ``request_path`` to a real path strictly inside ``AUDIO_DIR``.

        Returns the resolved path, or ``None`` when the request points
        outside the audio directory (e.g. via ``..``) or cannot be resolved.
        """
        audio_root = os.path.realpath(self.AUDIO_DIR)
        try:
            target = os.path.realpath('.' + request_path)
        except ValueError:
            # e.g. an embedded NUL byte in the requested path
            return None
        if target.startswith(audio_root + os.sep):
            return target
        return None

    # ------------------------------------------------------------------ #
    # HTTP verbs
    # ------------------------------------------------------------------ #
    def do_POST(self):
        """Synthesise the posted text and reply with the combined WAV."""
        # Get the user input from the request body
        try:
            content_length = int(self.headers['Content-Length'])
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self._send_payload(411, 'text/plain',
                               b'Content-Length required', cors=True)
            return
        try:
            user_input = self.rfile.read(content_length).decode('utf-8')
        except UnicodeDecodeError:
            self._send_payload(400, 'text/plain',
                               b'Request body must be UTF-8 text', cors=True)
            return

        # Tokenize to split the string into chunks for processing
        chunks = self._split_into_chunks(user_input, self.CHUNK_LIMIT)
        if not chunks:
            self._send_payload(400, 'text/plain',
                               b'No text to synthesise', cors=True)
            return

        # Generate audio for each prompt and combine the pieces
        audio_arrays = [
            generate_audio(prompt, history_prompt=HISTORY_PROMPT)
            for prompt in chunks
        ]
        combined_audio = np.concatenate(audio_arrays)

        # Write the combined audio to the file that do_GET also serves
        os.makedirs(self.AUDIO_DIR, exist_ok=True)
        write_wav(self.AUDIO_FILE, SAMPLE_RATE, combined_audio)

        # Send the audio back to the client in a single response
        self._send_file(self.AUDIO_FILE, 'audio/wav', cors=True)

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(200, "ok")
        self._send_cors_headers()
        self.end_headers()

    def do_GET(self):
        """Serve the index page or the most recent audio file."""
        if self.path == '/':
            self._send_file('index.html', 'text/html')
        elif self.path == self.AUDIO_ROUTE:
            self._send_file(self.AUDIO_FILE, 'audio/wav')
        else:
            self.send_error(404, 'File Not Found')

    def do_DELETE(self):
        """Delete a previously generated file below ``AUDIO_DIR``.

        Replies 200 ``File deleted``, 404 ``File not found``, or 403 when the
        requested path is not inside the audio directory. The request path is
        untrusted, so it is never passed to ``os.remove`` unchecked.
        """
        target = self._resolve_audio_path(self.path)
        if target is None:
            self._send_payload(403, 'text/plain', b'Forbidden', cors=True)
            return
        if not os.path.isfile(target):
            self._send_payload(404, 'text/plain', b'File not found', cors=True)
            return
        try:
            os.remove(target)
        except FileNotFoundError:
            # Removed by someone else between the check and the delete.
            self._send_payload(404, 'text/plain', b'File not found', cors=True)
            return
        self._send_payload(200, 'text/plain', b'File deleted', cors=True)
# Start the HTTP server on localhost:8080 and serve until the process is
# stopped. Using the server as a context manager closes the listening socket
# on the way out, including when serve_forever() is interrupted.
with HTTPServer(('localhost', 8080), RequestHandler) as httpd:
    httpd.serve_forever()