From e78bb6d2e801a81cfdc2f6adcf4a3826e10afbb7 Mon Sep 17 00:00:00 2001 From: xina Date: Fri, 18 Oct 2019 06:59:41 +0000 Subject: [PATCH 01/10] py3 with ws4py==0.3.2 and tornado==6.0.3 --- kaldigstserver/client.py | 10 +++++----- kaldigstserver/decoder.py | 4 ++-- kaldigstserver/decoder2.py | 14 +++++++------- kaldigstserver/master_server.py | 4 ++-- kaldigstserver/worker.py | 25 +++++++++++++++---------- 5 files changed, 31 insertions(+), 26 deletions(-) diff --git a/kaldigstserver/client.py b/kaldigstserver/client.py index 9a8e8fe..6fd2fbc 100644 --- a/kaldigstserver/client.py +++ b/kaldigstserver/client.py @@ -6,7 +6,7 @@ import threading import sys import urllib -import Queue +import queue import json import time import os @@ -35,7 +35,7 @@ def __init__(self, audiofile, url, protocols=None, extensions=None, heartbeat_fr self.final_hyps = [] self.audiofile = audiofile self.byterate = byterate - self.final_hyp_queue = Queue.Queue() + self.final_hyp_queue = queue.Queue() self.save_adaptation_state_filename = save_adaptation_state_filename self.send_adaptation_state_filename = send_adaptation_state_filename @@ -55,7 +55,7 @@ def send_data_to_ws(): e = sys.exc_info()[0] print >> sys.stderr, "Failed to send adaptation state: ", e with self.audiofile as audiostream: - for block in iter(lambda: audiostream.read(self.byterate/4), ""): + for block in iter(lambda: audiostream.read(int(self.byterate/4)), ""): self.send_data(block) print >> sys.stderr, "Audio sent, now sending EOS" self.send("EOS") @@ -117,11 +117,11 @@ def main(): - ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.urlencode([("content-type", content_type)])), byterate=args.rate, + ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.parse.urlencode([("content-type", content_type)])), byterate=args.rate, save_adaptation_state_filename=args.save_adaptation_state, send_adaptation_state_filename=args.send_adaptation_state) ws.connect() result = ws.get_full_hyp() - print result + print(result) if 
__name__ == "__main__": main() diff --git a/kaldigstserver/decoder.py b/kaldigstserver/decoder.py index 811b650..9503c19 100644 --- a/kaldigstserver/decoder.py +++ b/kaldigstserver/decoder.py @@ -11,7 +11,7 @@ GObject.threads_init() Gst.init(None) import logging -import thread +import _thread import os logger = logging.getLogger(__name__) @@ -235,4 +235,4 @@ def cancel(self): #logger.debug("Sending EOS to pipeline") #self.pipeline.send_event(Gst.Event.new_eos()) #self.pipeline.set_state(Gst.State.READY) - logger.info("%s: Cancelled pipeline" % self.request_id) \ No newline at end of file + logger.info("%s: Cancelled pipeline" % self.request_id) diff --git a/kaldigstserver/decoder2.py b/kaldigstserver/decoder2.py index befc152..5e6cbf0 100644 --- a/kaldigstserver/decoder2.py +++ b/kaldigstserver/decoder2.py @@ -11,7 +11,7 @@ GObject.threads_init() Gst.init(None) import logging -import thread +import _thread import os from collections import OrderedDict @@ -83,7 +83,7 @@ def create_pipeline(self, conf): if "model" in decoder_config: decoder_config["model"] = decoder_config.pop("model") - for (key, val) in decoder_config.iteritems(): + for (key, val) in decoder_config.items(): if key != "use-threaded-decoder": logger.info("Setting decoder property: %s = %s" % (key, val)) self.asr.set_property(key, val) @@ -139,17 +139,17 @@ def _connect_decoder(self, element, pad): def _on_partial_result(self, asr, hyp): - logger.info("%s: Got partial result: %s" % (self.request_id, hyp.decode('utf8'))) + logger.info("%s: Got partial result: %s" % (self.request_id, hyp)) if self.result_handler: - self.result_handler(hyp.decode('utf8'), False) + self.result_handler(hyp, False) def _on_final_result(self, asr, hyp): - logger.info("%s: Got final result: %s" % (self.request_id, hyp.decode('utf8'))) + logger.info("%s: Got final result: %s" % (self.request_id, hyp)) if self.result_handler: - self.result_handler(hyp.decode('utf8'), True) + self.result_handler(hyp, True) def 
_on_full_final_result(self, asr, result_json): - logger.info("%s: Got full final result: %s" % (self.request_id, result_json.decode('utf8'))) + logger.info("%s: Got full final result: %s" % (self.request_id, result_json)) if self.full_result_handler: self.full_result_handler(result_json) diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index 338b37f..3e2055f 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -15,7 +15,7 @@ import time import threading import functools -from Queue import Queue +from queue import Queue import tornado.ioloop import tornado.options @@ -313,7 +313,7 @@ def on_connection_close(self): def on_message(self, message): assert self.worker is not None logging.info("%s: Forwarding client message (%s) of length %d to worker" % (self.id, type(message), len(message))) - if isinstance(message, unicode): + if isinstance(message, str): self.worker.write_message(message, binary=False) else: self.worker.write_message(message, binary=True) diff --git a/kaldigstserver/worker.py b/kaldigstserver/worker.py index 1f06b70..9576bd4 100644 --- a/kaldigstserver/worker.py +++ b/kaldigstserver/worker.py @@ -3,7 +3,7 @@ import logging import logging.config import time -import thread +import _thread import threading import os import argparse @@ -18,6 +18,8 @@ import base64 import time +import asyncio +from tornado.platform.asyncio import AnyThreadEventLoopPolicy import tornado.gen import tornado.process import tornado.ioloop @@ -68,6 +70,7 @@ def __init__(self, uri, decoder_pipeline, post_processor, full_post_processor=No self.timeout_decoder = 5 self.num_segments = 0 self.last_partial_result = "" + asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) self.post_processor_lock = tornado.locks.Lock() self.processing_condition = tornado.locks.Condition() self.num_processing_threads = 0 @@ -79,6 +82,7 @@ def opened(self): self.last_partial_result = "" def guard_timeout(self): + 
asyncio.set_event_loop(asyncio.new_event_loop()) global SILENCE_TIMEOUT while self.state in [self.STATE_EOS_RECEIVED, self.STATE_CONNECTED, self.STATE_INITIALIZED, self.STATE_PROCESSING]: if time.time() - self.last_decoder_message > SILENCE_TIMEOUT: @@ -97,13 +101,13 @@ def guard_timeout(self): def received_message(self, m): logger.debug("%s: Got message from server of type %s" % (self.request_id, str(type(m)))) if self.state == self.__class__.STATE_CONNECTED: - props = json.loads(str(m)) + props = json.loads(m.data.decode("utf-8")) content_type = props['content_type'] self.request_id = props['id'] self.num_segments = 0 self.decoder_pipeline.init_request(self.request_id, content_type) self.last_decoder_message = time.time() - thread.start_new_thread(self.guard_timeout, ()) + _thread.start_new_thread(self.guard_timeout, ()) logger.info("%s: Started timeout guard" % self.request_id) logger.info("%s: Initialized request" % self.request_id) self.state = self.STATE_INITIALIZED @@ -177,6 +181,7 @@ def _increment_num_processing(self, delta): def _on_result(self, result, final): try: self._increment_num_processing(1) + if final: # final results are handled by _on_full_result() return @@ -203,16 +208,16 @@ def _on_result(self, result, final): def _on_full_result(self, full_result_json): try: self._increment_num_processing(1) - + self.last_decoder_message = time.time() full_result = json.loads(full_result_json) full_result['segment'] = self.num_segments full_result['id'] = self.request_id if full_result.get("status", -1) == common.STATUS_SUCCESS: - logger.debug(u"%s: Before postprocessing: %s" % (self.request_id, repr(full_result).decode("unicode-escape"))) + logger.debug(u"%s: Before postprocessing: %s" % (self.request_id, repr(full_result))) full_result = yield self.post_process_full(full_result) logger.info("%s: Postprocessing done." 
% self.request_id) - logger.debug(u"%s: After postprocessing: %s" % (self.request_id, repr(full_result).decode("unicode-escape"))) + logger.debug(u"%s: After postprocessing: %s" % (self.request_id, repr(full_result))) try: self.send(json.dumps(full_result)) @@ -236,7 +241,7 @@ def _on_full_result(self, full_result_json): def _on_word(self, word): try: self._increment_num_processing(1) - + self.last_decoder_message = time.time() if word != "<#s>": if len(self.partial_transcript) > 0: @@ -293,7 +298,7 @@ def send_adaptation_state(self): adaptation_state = self.decoder_pipeline.get_adaptation_state() event = dict(status=common.STATUS_SUCCESS, adaptation_state=dict(id=self.request_id, - value=base64.b64encode(zlib.compress(adaptation_state)), + value=base64.b64encode(zlib.compress(adaptation_state.encode())), type="string+gzip+base64", time=time.strftime("%Y-%m-%dT%H:%M:%S"))) try: @@ -416,8 +421,8 @@ def main(): decoder_pipeline = DecoderPipeline(conf) loop = GObject.MainLoop() - thread.start_new_thread(loop.run, ()) - thread.start_new_thread(main_loop, (args.uri, decoder_pipeline, post_processor, full_post_processor)) + _thread.start_new_thread(loop.run, ()) + _thread.start_new_thread(main_loop, (args.uri, decoder_pipeline, post_processor, full_post_processor)) tornado.ioloop.IOLoop.current().start() From c0913dc42420058cfc5db0db821763acfa7f718c Mon Sep 17 00:00:00 2001 From: xina Date: Mon, 21 Oct 2019 09:42:18 +0000 Subject: [PATCH 02/10] update --- kaldigstserver/master_server.py | 1 - kaldigstserver/worker.py | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index 3e2055f..bb33349 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -148,7 +148,6 @@ def get_final_hyp(self): logging.info("%s: Waiting for final result..." 
% self.id) return self.final_result_queue.get(block=True) - @tornado.web.asynchronous @tornado.gen.coroutine def end_request(self, *args, **kwargs): logging.info("%s: Handling the end of chunked recognize request" % self.id) diff --git a/kaldigstserver/worker.py b/kaldigstserver/worker.py index 9576bd4..f5e3271 100644 --- a/kaldigstserver/worker.py +++ b/kaldigstserver/worker.py @@ -190,9 +190,10 @@ def _on_result(self, result, final): return self.last_partial_result = result logger.info("%s: Postprocessing (final=%s) result.." % (self.request_id, final)) - processed_transcripts = yield self.post_process([result], blocking=False) + #processed_transcripts = yield self.post_process([result], blocking=False) + processed_transcripts = result if processed_transcripts: - logger.info("%s: Postprocessing done." % self.request_id) + logger.info("%s: Postprocessing done. [result: %s]" % (self.request_id, processed_transcripts[0])) event = dict(status=common.STATUS_SUCCESS, segment=self.num_segments, result=dict(hypotheses=[dict(transcript=processed_transcripts[0])], final=final)) @@ -321,7 +322,7 @@ def post_process(self, texts, blocking=False): with (yield self.post_processor_lock.acquire(timeout)): result = [] for text in texts: - self.post_processor.stdin.write("%s\n" % text.encode("utf-8")) + self.post_processor.stdin.write("%s\n" % text) self.post_processor.stdin.flush() logging.debug("%s: Starting postprocessing: %s" % (self.request_id, text)) text = yield self.post_processor.stdout.read_until('\n') From 698dca29a2b6e31bf77719b80b8696cec5bb1414 Mon Sep 17 00:00:00 2001 From: xina Date: Tue, 22 Oct 2019 08:19:46 +0000 Subject: [PATCH 03/10] update --- kaldigstserver/client.py | 26 +++++++++++--------------- kaldigstserver/master_server.py | 2 +- kaldigstserver/worker.py | 9 ++++----- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/kaldigstserver/client.py b/kaldigstserver/client.py index 6fd2fbc..712bcc7 100644 --- a/kaldigstserver/client.py +++ 
b/kaldigstserver/client.py @@ -44,20 +44,20 @@ def send_data(self, data): self.send(data, binary=True) def opened(self): - #print "Socket opened!" + #print("Socket opened!") def send_data_to_ws(): if self.send_adaptation_state_filename is not None: - print >> sys.stderr, "Sending adaptation state from %s" % self.send_adaptation_state_filename + print("Sending adaptation state from " + self.send_adaptation_state_filename) try: adaptation_state_props = json.load(open(self.send_adaptation_state_filename, "r")) self.send(json.dumps(dict(adaptation_state=adaptation_state_props))) except: e = sys.exc_info()[0] - print >> sys.stderr, "Failed to send adaptation state: ", e + print("Failed to send adaptation state: " + e) with self.audiofile as audiostream: for block in iter(lambda: audiostream.read(int(self.byterate/4)), ""): self.send_data(block) - print >> sys.stderr, "Audio sent, now sending EOS" + print("Audio sent, now sending EOS") self.send("EOS") t = threading.Thread(target=send_data_to_ws) @@ -66,37 +66,33 @@ def send_data_to_ws(): def received_message(self, m): response = json.loads(str(m)) - #print >> sys.stderr, "RESPONSE:", response - #print >> sys.stderr, "JSON was:", m if response['status'] == 0: if 'result' in response: - trans = response['result']['hypotheses'][0]['transcript'].encode('utf-8') + trans = response['result']['hypotheses'][0]['transcript'] if response['result']['final']: - #print >> sys.stderr, trans, self.final_hyps.append(trans) - print >> sys.stderr, '\r%s' % trans.replace("\n", "\\n") + print(trans.replace("\n", "\\n"), end="\r") else: print_trans = trans.replace("\n", "\\n") if len(print_trans) > 80: print_trans = "... 
%s" % print_trans[-76:] - print >> sys.stderr, '\r%s' % print_trans, + print(print_trans, end="\r") if 'adaptation_state' in response: if self.save_adaptation_state_filename: - print >> sys.stderr, "Saving adaptation state to %s" % self.save_adaptation_state_filename + print("Saving adaptation state to " + self.save_adaptation_state_filename) with open(self.save_adaptation_state_filename, "w") as f: f.write(json.dumps(response['adaptation_state'])) else: - print >> sys.stderr, "Received error from server (status %d)" % response['status'] + print("Received error from server (status %d)" % response['status']) if 'message' in response: - print >> sys.stderr, "Error message:", response['message'] + print("Error message:" + response['message']) def get_full_hyp(self, timeout=60): return self.final_hyp_queue.get(timeout) def closed(self, code, reason=None): - #print "Websocket closed() called" - #print >> sys.stderr + #print("Websocket closed() called") self.final_hyp_queue.put(" ".join(self.final_hyps)) diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index bb33349..ae26fdb 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -271,7 +271,7 @@ def send_event(self, event): if len(event_str) > 100: event_str = event_str[:97] + "..." 
logging.info("%s: Sending event %s to client" % (self.id, event_str)) - self.write_message(json.dumps(event)) + self.write_message(json.dumps(event).replace('False', 'false').replace('\'', '\"')) def open(self): self.id = str(uuid.uuid4()) diff --git a/kaldigstserver/worker.py b/kaldigstserver/worker.py index f5e3271..00e82a2 100644 --- a/kaldigstserver/worker.py +++ b/kaldigstserver/worker.py @@ -127,7 +127,7 @@ def received_message(self, m): if 'adaptation_state' in props: as_props = props['adaptation_state'] if as_props.get('type', "") == "string+gzip+base64": - adaptation_state = zlib.decompress(base64.b64decode(as_props.get('value', ''))) + adaptation_state = zlib.decompress(base64.b64decode(as_props.get('value', ''))).decode("utf-8") logger.info("%s: Setting adaptation state to user-provided value" % (self.request_id)) self.decoder_pipeline.set_adaptation_state(adaptation_state) else: @@ -190,8 +190,7 @@ def _on_result(self, result, final): return self.last_partial_result = result logger.info("%s: Postprocessing (final=%s) result.." % (self.request_id, final)) - #processed_transcripts = yield self.post_process([result], blocking=False) - processed_transcripts = result + processed_transcripts = yield self.post_process([result], blocking=False) if processed_transcripts: logger.info("%s: Postprocessing done. 
[result: %s]" % (self.request_id, processed_transcripts[0])) event = dict(status=common.STATUS_SUCCESS, @@ -299,7 +298,7 @@ def send_adaptation_state(self): adaptation_state = self.decoder_pipeline.get_adaptation_state() event = dict(status=common.STATUS_SUCCESS, adaptation_state=dict(id=self.request_id, - value=base64.b64encode(zlib.compress(adaptation_state.encode())), + value=base64.b64encode(zlib.compress(adaptation_state.encode())).decode("utf-8"), type="string+gzip+base64", time=time.strftime("%Y-%m-%dT%H:%M:%S"))) try: @@ -319,7 +318,7 @@ def post_process(self, texts, blocking=False): else: timeout=0.0 try: - with (yield self.post_processor_lock.acquire(timeout)): + with (yield self.post_processor_lock.acquire()): result = [] for text in texts: self.post_processor.stdin.write("%s\n" % text) From 2da84a458db4ac2f57021d77bf7b345555d5f99d Mon Sep 17 00:00:00 2001 From: Xingyu Na Date: Fri, 18 Oct 2019 06:59:41 +0000 Subject: [PATCH 04/10] migrate to py3 --- kaldigstserver/client.py | 36 +++++++++++++++------------------ kaldigstserver/decoder.py | 4 ++-- kaldigstserver/decoder2.py | 14 ++++++------- kaldigstserver/master_server.py | 7 +++---- kaldigstserver/worker.py | 31 ++++++++++++++++------------ 5 files changed, 46 insertions(+), 46 deletions(-) diff --git a/kaldigstserver/client.py b/kaldigstserver/client.py index 9a8e8fe..712bcc7 100644 --- a/kaldigstserver/client.py +++ b/kaldigstserver/client.py @@ -6,7 +6,7 @@ import threading import sys import urllib -import Queue +import queue import json import time import os @@ -35,7 +35,7 @@ def __init__(self, audiofile, url, protocols=None, extensions=None, heartbeat_fr self.final_hyps = [] self.audiofile = audiofile self.byterate = byterate - self.final_hyp_queue = Queue.Queue() + self.final_hyp_queue = queue.Queue() self.save_adaptation_state_filename = save_adaptation_state_filename self.send_adaptation_state_filename = send_adaptation_state_filename @@ -44,20 +44,20 @@ def send_data(self, data): 
self.send(data, binary=True) def opened(self): - #print "Socket opened!" + #print("Socket opened!") def send_data_to_ws(): if self.send_adaptation_state_filename is not None: - print >> sys.stderr, "Sending adaptation state from %s" % self.send_adaptation_state_filename + print("Sending adaptation state from " + self.send_adaptation_state_filename) try: adaptation_state_props = json.load(open(self.send_adaptation_state_filename, "r")) self.send(json.dumps(dict(adaptation_state=adaptation_state_props))) except: e = sys.exc_info()[0] - print >> sys.stderr, "Failed to send adaptation state: ", e + print("Failed to send adaptation state: " + e) with self.audiofile as audiostream: - for block in iter(lambda: audiostream.read(self.byterate/4), ""): + for block in iter(lambda: audiostream.read(int(self.byterate/4)), ""): self.send_data(block) - print >> sys.stderr, "Audio sent, now sending EOS" + print("Audio sent, now sending EOS") self.send("EOS") t = threading.Thread(target=send_data_to_ws) @@ -66,37 +66,33 @@ def send_data_to_ws(): def received_message(self, m): response = json.loads(str(m)) - #print >> sys.stderr, "RESPONSE:", response - #print >> sys.stderr, "JSON was:", m if response['status'] == 0: if 'result' in response: - trans = response['result']['hypotheses'][0]['transcript'].encode('utf-8') + trans = response['result']['hypotheses'][0]['transcript'] if response['result']['final']: - #print >> sys.stderr, trans, self.final_hyps.append(trans) - print >> sys.stderr, '\r%s' % trans.replace("\n", "\\n") + print(trans.replace("\n", "\\n"), end="\r") else: print_trans = trans.replace("\n", "\\n") if len(print_trans) > 80: print_trans = "... 
%s" % print_trans[-76:] - print >> sys.stderr, '\r%s' % print_trans, + print(print_trans, end="\r") if 'adaptation_state' in response: if self.save_adaptation_state_filename: - print >> sys.stderr, "Saving adaptation state to %s" % self.save_adaptation_state_filename + print("Saving adaptation state to " + self.save_adaptation_state_filename) with open(self.save_adaptation_state_filename, "w") as f: f.write(json.dumps(response['adaptation_state'])) else: - print >> sys.stderr, "Received error from server (status %d)" % response['status'] + print("Received error from server (status %d)" % response['status']) if 'message' in response: - print >> sys.stderr, "Error message:", response['message'] + print("Error message:" + response['message']) def get_full_hyp(self, timeout=60): return self.final_hyp_queue.get(timeout) def closed(self, code, reason=None): - #print "Websocket closed() called" - #print >> sys.stderr + #print("Websocket closed() called") self.final_hyp_queue.put(" ".join(self.final_hyps)) @@ -117,11 +113,11 @@ def main(): - ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.urlencode([("content-type", content_type)])), byterate=args.rate, + ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.parse.urlencode([("content-type", content_type)])), byterate=args.rate, save_adaptation_state_filename=args.save_adaptation_state, send_adaptation_state_filename=args.send_adaptation_state) ws.connect() result = ws.get_full_hyp() - print result + print(result) if __name__ == "__main__": main() diff --git a/kaldigstserver/decoder.py b/kaldigstserver/decoder.py index 811b650..9503c19 100644 --- a/kaldigstserver/decoder.py +++ b/kaldigstserver/decoder.py @@ -11,7 +11,7 @@ GObject.threads_init() Gst.init(None) import logging -import thread +import _thread import os logger = logging.getLogger(__name__) @@ -235,4 +235,4 @@ def cancel(self): #logger.debug("Sending EOS to pipeline") #self.pipeline.send_event(Gst.Event.new_eos()) 
#self.pipeline.set_state(Gst.State.READY) - logger.info("%s: Cancelled pipeline" % self.request_id) \ No newline at end of file + logger.info("%s: Cancelled pipeline" % self.request_id) diff --git a/kaldigstserver/decoder2.py b/kaldigstserver/decoder2.py index befc152..5e6cbf0 100644 --- a/kaldigstserver/decoder2.py +++ b/kaldigstserver/decoder2.py @@ -11,7 +11,7 @@ GObject.threads_init() Gst.init(None) import logging -import thread +import _thread import os from collections import OrderedDict @@ -83,7 +83,7 @@ def create_pipeline(self, conf): if "model" in decoder_config: decoder_config["model"] = decoder_config.pop("model") - for (key, val) in decoder_config.iteritems(): + for (key, val) in decoder_config.items(): if key != "use-threaded-decoder": logger.info("Setting decoder property: %s = %s" % (key, val)) self.asr.set_property(key, val) @@ -139,17 +139,17 @@ def _connect_decoder(self, element, pad): def _on_partial_result(self, asr, hyp): - logger.info("%s: Got partial result: %s" % (self.request_id, hyp.decode('utf8'))) + logger.info("%s: Got partial result: %s" % (self.request_id, hyp)) if self.result_handler: - self.result_handler(hyp.decode('utf8'), False) + self.result_handler(hyp, False) def _on_final_result(self, asr, hyp): - logger.info("%s: Got final result: %s" % (self.request_id, hyp.decode('utf8'))) + logger.info("%s: Got final result: %s" % (self.request_id, hyp)) if self.result_handler: - self.result_handler(hyp.decode('utf8'), True) + self.result_handler(hyp, True) def _on_full_final_result(self, asr, result_json): - logger.info("%s: Got full final result: %s" % (self.request_id, result_json.decode('utf8'))) + logger.info("%s: Got full final result: %s" % (self.request_id, result_json)) if self.full_result_handler: self.full_result_handler(result_json) diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index 338b37f..ae26fdb 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -15,7 
+15,7 @@ import time import threading import functools -from Queue import Queue +from queue import Queue import tornado.ioloop import tornado.options @@ -148,7 +148,6 @@ def get_final_hyp(self): logging.info("%s: Waiting for final result..." % self.id) return self.final_result_queue.get(block=True) - @tornado.web.asynchronous @tornado.gen.coroutine def end_request(self, *args, **kwargs): logging.info("%s: Handling the end of chunked recognize request" % self.id) @@ -272,7 +271,7 @@ def send_event(self, event): if len(event_str) > 100: event_str = event_str[:97] + "..." logging.info("%s: Sending event %s to client" % (self.id, event_str)) - self.write_message(json.dumps(event)) + self.write_message(json.dumps(event).replace('False', 'false').replace('\'', '\"')) def open(self): self.id = str(uuid.uuid4()) @@ -313,7 +312,7 @@ def on_connection_close(self): def on_message(self, message): assert self.worker is not None logging.info("%s: Forwarding client message (%s) of length %d to worker" % (self.id, type(message), len(message))) - if isinstance(message, unicode): + if isinstance(message, str): self.worker.write_message(message, binary=False) else: self.worker.write_message(message, binary=True) diff --git a/kaldigstserver/worker.py b/kaldigstserver/worker.py index 1f06b70..495c795 100644 --- a/kaldigstserver/worker.py +++ b/kaldigstserver/worker.py @@ -3,7 +3,7 @@ import logging import logging.config import time -import thread +import _thread import threading import os import argparse @@ -18,6 +18,8 @@ import base64 import time +import asyncio +from tornado.platform.asyncio import AnyThreadEventLoopPolicy import tornado.gen import tornado.process import tornado.ioloop @@ -68,6 +70,7 @@ def __init__(self, uri, decoder_pipeline, post_processor, full_post_processor=No self.timeout_decoder = 5 self.num_segments = 0 self.last_partial_result = "" + asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) self.post_processor_lock = tornado.locks.Lock() 
self.processing_condition = tornado.locks.Condition() self.num_processing_threads = 0 @@ -79,6 +82,7 @@ def opened(self): self.last_partial_result = "" def guard_timeout(self): + asyncio.set_event_loop(asyncio.new_event_loop()) global SILENCE_TIMEOUT while self.state in [self.STATE_EOS_RECEIVED, self.STATE_CONNECTED, self.STATE_INITIALIZED, self.STATE_PROCESSING]: if time.time() - self.last_decoder_message > SILENCE_TIMEOUT: @@ -97,13 +101,13 @@ def guard_timeout(self): def received_message(self, m): logger.debug("%s: Got message from server of type %s" % (self.request_id, str(type(m)))) if self.state == self.__class__.STATE_CONNECTED: - props = json.loads(str(m)) + props = json.loads(m.data.decode("utf-8")) content_type = props['content_type'] self.request_id = props['id'] self.num_segments = 0 self.decoder_pipeline.init_request(self.request_id, content_type) self.last_decoder_message = time.time() - thread.start_new_thread(self.guard_timeout, ()) + _thread.start_new_thread(self.guard_timeout, ()) logger.info("%s: Started timeout guard" % self.request_id) logger.info("%s: Initialized request" % self.request_id) self.state = self.STATE_INITIALIZED @@ -123,7 +127,7 @@ def received_message(self, m): if 'adaptation_state' in props: as_props = props['adaptation_state'] if as_props.get('type', "") == "string+gzip+base64": - adaptation_state = zlib.decompress(base64.b64decode(as_props.get('value', ''))) + adaptation_state = zlib.decompress(base64.b64decode(as_props.get('value', ''))).decode("utf-8") logger.info("%s: Setting adaptation state to user-provided value" % (self.request_id)) self.decoder_pipeline.set_adaptation_state(adaptation_state) else: @@ -177,6 +181,7 @@ def _increment_num_processing(self, delta): def _on_result(self, result, final): try: self._increment_num_processing(1) + if final: # final results are handled by _on_full_result() return @@ -187,7 +192,7 @@ def _on_result(self, result, final): logger.info("%s: Postprocessing (final=%s) result.." 
% (self.request_id, final)) processed_transcripts = yield self.post_process([result], blocking=False) if processed_transcripts: - logger.info("%s: Postprocessing done." % self.request_id) + logger.info("%s: Postprocessing done." % (self.request_id)) event = dict(status=common.STATUS_SUCCESS, segment=self.num_segments, result=dict(hypotheses=[dict(transcript=processed_transcripts[0])], final=final)) @@ -203,16 +208,16 @@ def _on_result(self, result, final): def _on_full_result(self, full_result_json): try: self._increment_num_processing(1) - + self.last_decoder_message = time.time() full_result = json.loads(full_result_json) full_result['segment'] = self.num_segments full_result['id'] = self.request_id if full_result.get("status", -1) == common.STATUS_SUCCESS: - logger.debug(u"%s: Before postprocessing: %s" % (self.request_id, repr(full_result).decode("unicode-escape"))) + logger.debug(u"%s: Before postprocessing: %s" % (self.request_id, repr(full_result))) full_result = yield self.post_process_full(full_result) logger.info("%s: Postprocessing done." 
% self.request_id) - logger.debug(u"%s: After postprocessing: %s" % (self.request_id, repr(full_result).decode("unicode-escape"))) + logger.debug(u"%s: After postprocessing: %s" % (self.request_id, repr(full_result))) try: self.send(json.dumps(full_result)) @@ -236,7 +241,7 @@ def _on_full_result(self, full_result_json): def _on_word(self, word): try: self._increment_num_processing(1) - + self.last_decoder_message = time.time() if word != "<#s>": if len(self.partial_transcript) > 0: @@ -293,7 +298,7 @@ def send_adaptation_state(self): adaptation_state = self.decoder_pipeline.get_adaptation_state() event = dict(status=common.STATUS_SUCCESS, adaptation_state=dict(id=self.request_id, - value=base64.b64encode(zlib.compress(adaptation_state)), + value=base64.b64encode(zlib.compress(adaptation_state.encode())).decode("utf-8"), type="string+gzip+base64", time=time.strftime("%Y-%m-%dT%H:%M:%S"))) try: @@ -316,7 +321,7 @@ def post_process(self, texts, blocking=False): with (yield self.post_processor_lock.acquire(timeout)): result = [] for text in texts: - self.post_processor.stdin.write("%s\n" % text.encode("utf-8")) + self.post_processor.stdin.write("%s\n" % text) self.post_processor.stdin.flush() logging.debug("%s: Starting postprocessing: %s" % (self.request_id, text)) text = yield self.post_processor.stdout.read_until('\n') @@ -416,8 +421,8 @@ def main(): decoder_pipeline = DecoderPipeline(conf) loop = GObject.MainLoop() - thread.start_new_thread(loop.run, ()) - thread.start_new_thread(main_loop, (args.uri, decoder_pipeline, post_processor, full_post_processor)) + _thread.start_new_thread(loop.run, ()) + _thread.start_new_thread(main_loop, (args.uri, decoder_pipeline, post_processor, full_post_processor)) tornado.ioloop.IOLoop.current().start() From b7a124c99ee31d991a85533b0e46cefa83f54832 Mon Sep 17 00:00:00 2001 From: "Tanel.Alumae" Date: Thu, 27 Feb 2020 21:49:47 +0200 Subject: [PATCH 05/10] Migrated worker to use Tornado's websocket client. 
Postprocessing now works with latest Tornado and Python 3 --- kaldigstserver/client.py | 100 ++++++++++++++--------- kaldigstserver/decoder.py | 23 +++--- kaldigstserver/decoder2.py | 30 ++++--- kaldigstserver/master_server.py | 1 + kaldigstserver/worker.py | 135 +++++++++++++++++--------------- 5 files changed, 168 insertions(+), 121 deletions(-) diff --git a/kaldigstserver/client.py b/kaldigstserver/client.py index 712bcc7..2a3f515 100644 --- a/kaldigstserver/client.py +++ b/kaldigstserver/client.py @@ -1,7 +1,7 @@ __author__ = 'tanel' import argparse -from ws4py.client.threadedclient import WebSocketClient +#from ws4py.client.threadedclient import WebSocketClient import time import threading import sys @@ -10,68 +10,91 @@ import json import time import os +from tornado.ioloop import IOLoop +from tornado import gen +from tornado.websocket import websocket_connect +from concurrent.futures import ThreadPoolExecutor +from tornado.concurrent import run_on_executor + def rate_limited(maxPerSecond): - minInterval = 1.0 / float(maxPerSecond) + min_interval = 1.0 / float(maxPerSecond) def decorate(func): - lastTimeCalled = [0.0] + last_time_called = [0.0] def rate_limited_function(*args,**kargs): - elapsed = time.clock() - lastTimeCalled[0] - leftToWait = minInterval - elapsed - if leftToWait>0: - time.sleep(leftToWait) + elapsed = time.clock() - last_time_called[0] + left_to_wait = min_interval - elapsed + if left_to_wait > 0: + yield gen.sleep(left_to_wait) ret = func(*args,**kargs) - lastTimeCalled[0] = time.clock() + last_time_called[0] = time.clock() return ret return rate_limited_function return decorate +executor = ThreadPoolExecutor(max_workers=5) -class MyClient(WebSocketClient): +class MyClient(): - def __init__(self, audiofile, url, protocols=None, extensions=None, heartbeat_freq=None, byterate=32000, + def __init__(self, audiofile, url, byterate=32000, save_adaptation_state_filename=None, send_adaptation_state_filename=None): - super(MyClient, 
self).__init__(url, protocols, extensions, heartbeat_freq) + self.url = url self.final_hyps = [] self.audiofile = audiofile self.byterate = byterate self.final_hyp_queue = queue.Queue() self.save_adaptation_state_filename = save_adaptation_state_filename self.send_adaptation_state_filename = send_adaptation_state_filename - + self.ioloop = IOLoop.instance() + self.run() + self.ioloop.start() + + + @gen.coroutine + def run(self): + self.ws = yield websocket_connect(self.url, on_message_callback=self.received_message) + if self.send_adaptation_state_filename is not None: + print("Sending adaptation state from " + self.send_adaptation_state_filename) + try: + adaptation_state_props = json.load(open(self.send_adaptation_state_filename, "r")) + self.ws.write_message(json.dumps(dict(adaptation_state=adaptation_state_props))) + except: + e = sys.exc_info()[0] + print("Failed to send adaptation state: " + e) + with self.audiofile as audiostream: + while True: + block = yield from self.ioloop.run_in_executor(executor, audiostream.read, int(self.byterate/4)) + if block == b"": + break + yield self.send_data(block) + self.ws.write_message("EOS") + + + @gen.coroutine @rate_limited(4) def send_data(self, data): - self.send(data, binary=True) - - def opened(self): - #print("Socket opened!") - def send_data_to_ws(): - if self.send_adaptation_state_filename is not None: - print("Sending adaptation state from " + self.send_adaptation_state_filename) - try: - adaptation_state_props = json.load(open(self.send_adaptation_state_filename, "r")) - self.send(json.dumps(dict(adaptation_state=adaptation_state_props))) - except: - e = sys.exc_info()[0] - print("Failed to send adaptation state: " + e) - with self.audiofile as audiostream: - for block in iter(lambda: audiostream.read(int(self.byterate/4)), ""): - self.send_data(block) - print("Audio sent, now sending EOS") - self.send("EOS") - - t = threading.Thread(target=send_data_to_ws) - t.start() + self.ws.write_message(data, binary=True) 
def received_message(self, m): + if m is None: + #print("Websocket closed() called") + self.final_hyp_queue.put(" ".join(self.final_hyps)) + self.ioloop.stop() + + return + + #print("Received message ...") + #print(str(m) + "\n") response = json.loads(str(m)) + if response['status'] == 0: + #print(response) if 'result' in response: trans = response['result']['hypotheses'][0]['transcript'] if response['result']['final']: self.final_hyps.append(trans) - print(trans.replace("\n", "\\n"), end="\r") + print(trans.replace("\n", "\\n"), end="\n") else: print_trans = trans.replace("\n", "\\n") if len(print_trans) > 80: @@ -91,9 +114,9 @@ def received_message(self, m): def get_full_hyp(self, timeout=60): return self.final_hyp_queue.get(timeout) - def closed(self, code, reason=None): - #print("Websocket closed() called") - self.final_hyp_queue.put(" ".join(self.final_hyps)) + # def closed(self, code, reason=None): + # print("Websocket closed() called") + # self.final_hyp_queue.put(" ".join(self.final_hyps)) def main(): @@ -115,9 +138,10 @@ def main(): ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.parse.urlencode([("content-type", content_type)])), byterate=args.rate, save_adaptation_state_filename=args.save_adaptation_state, send_adaptation_state_filename=args.send_adaptation_state) - ws.connect() + result = ws.get_full_hyp() print(result) + if __name__ == "__main__": main() diff --git a/kaldigstserver/decoder.py b/kaldigstserver/decoder.py index 9503c19..8f5cfd6 100644 --- a/kaldigstserver/decoder.py +++ b/kaldigstserver/decoder.py @@ -13,6 +13,7 @@ import logging import _thread import os +import sys logger = logging.getLogger(__name__) @@ -51,20 +52,22 @@ def create_pipeline(self, conf): self.fakesink = Gst.ElementFactory.make("fakesink", "fakesink") if not self.asr: - print >> sys.stderr, "ERROR: Couldn't create the onlinegmmdecodefaster element!" 
+ print("ERROR: Couldn't create the onlinegmmdecodefaster element!", file=sys.stderr) gst_plugin_path = os.environ.get("GST_PLUGIN_PATH") if gst_plugin_path: - print >> sys.stderr, \ - "Couldn't find onlinegmmdecodefaster element at %s. " \ - "If it's not the right path, try to set GST_PLUGIN_PATH to the right one, and retry. " \ - "You can also try to run the following command: " \ - "'GST_PLUGIN_PATH=%s gst-inspect-1.0 onlinegmmdecodefaster'." \ - % (gst_plugin_path, gst_plugin_path) + print( + "Couldn't find onlinegmmdecodefaster element at %s. " + "If it's not the right path, try to set GST_PLUGIN_PATH to the right one, and retry. " + "You can also try to run the following command: " + "'GST_PLUGIN_PATH=%s gst-inspect-1.0 onlinegmmdecodefaster'." + % (gst_plugin_path, gst_plugin_path), + file=sys.stderr) else: - print >> sys.stderr, \ + print( "The environment variable GST_PLUGIN_PATH wasn't set or it's empty. " \ - "Try to set GST_PLUGIN_PATH environment variable, and retry." - sys.exit(-1); + "Try to set GST_PLUGIN_PATH environment variable, and retry.", + file=sys.stderr) + sys.exit(-1) for (key, val) in conf.get("decoder", {}).iteritems(): logger.info("Setting decoder property: %s = %s" % (key, val)) diff --git a/kaldigstserver/decoder2.py b/kaldigstserver/decoder2.py index 5e6cbf0..2159a39 100644 --- a/kaldigstserver/decoder2.py +++ b/kaldigstserver/decoder2.py @@ -13,14 +13,16 @@ import logging import _thread import os +import sys from collections import OrderedDict +import tornado.ioloop logger = logging.getLogger(__name__) import pdb class DecoderPipeline2(object): - def __init__(self, conf={}): + def __init__(self, ioloop, conf={}): logger.info("Creating decoder using conf: %s" % conf) self.create_pipeline(conf) self.outdir = conf.get("out-dir", None) @@ -35,6 +37,7 @@ def __init__(self, conf={}): self.eos_handler = None self.error_handler = None self.request_id = "" + self.ioloop = ioloop def create_pipeline(self, conf): @@ -51,20 +54,22 @@ def 
create_pipeline(self, conf): self.fakesink = Gst.ElementFactory.make("fakesink", "fakesink") if not self.asr: - print >> sys.stderr, "ERROR: Couldn't create the kaldinnet2onlinedecoder element!" + print("ERROR: Couldn't create the kaldinnet2onlinedecoder element!", file=sys.stderr) gst_plugin_path = os.environ.get("GST_PLUGIN_PATH") if gst_plugin_path: - print >> sys.stderr, \ + print( "Couldn't find kaldinnet2onlinedecoder element at %s. " \ "If it's not the right path, try to set GST_PLUGIN_PATH to the right one, and retry. " \ "You can also try to run the following command: " \ "'GST_PLUGIN_PATH=%s gst-inspect-1.0 kaldinnet2onlinedecoder'." \ - % (gst_plugin_path, gst_plugin_path) + % (gst_plugin_path, gst_plugin_path), + file=sys.stderr) else: - print >> sys.stderr, \ + print( "The environment variable GST_PLUGIN_PATH wasn't set or it's empty. " \ - "Try to set GST_PLUGIN_PATH environment variable, and retry." - sys.exit(-1); + "Try to set GST_PLUGIN_PATH environment variable, and retry.", + file=sys.stderr) + sys.exit(-1) # This needs to be set first if "use-threaded-decoder" in conf["decoder"]: @@ -141,31 +146,32 @@ def _connect_decoder(self, element, pad): def _on_partial_result(self, asr, hyp): logger.info("%s: Got partial result: %s" % (self.request_id, hyp)) if self.result_handler: - self.result_handler(hyp, False) + self.ioloop.add_callback(self.result_handler, hyp, False) def _on_final_result(self, asr, hyp): logger.info("%s: Got final result: %s" % (self.request_id, hyp)) if self.result_handler: - self.result_handler(hyp, True) + self.ioloop.add_callback(self.result_handler, hyp, True) def _on_full_final_result(self, asr, result_json): logger.info("%s: Got full final result: %s" % (self.request_id, result_json)) if self.full_result_handler: - self.full_result_handler(result_json) + self.ioloop.add_callback(self.full_result_handler, result_json) def _on_error(self, bus, msg): self.error = msg.parse_error() logger.error(self.error) self.finish_request() 
if self.error_handler: - self.error_handler(self.error[0].message) + self.ioloop.add_callback(self.error_handler, self.error[0].message) def _on_eos(self, bus, msg): logger.info('%s: Pipeline received eos signal' % self.request_id) #self.decodebin.unlink(self.audioconvert) self.finish_request() if self.eos_handler: - self.eos_handler[0](self.eos_handler[1]) + self.ioloop.add_callback(self.eos_handler[0], self.eos_handler[1]) + def get_adaptation_state(self): return self.asr.get_property("adaptation-state") diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index ae26fdb..f17b624 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -248,6 +248,7 @@ def on_close(self): logging.info("Worker " + self.__str__() + " leaving") self.application.available_workers.discard(self) if self.client_socket: + logging.info("Closing client connection") self.client_socket.close() self.application.send_status_update() diff --git a/kaldigstserver/worker.py b/kaldigstserver/worker.py index 9d17d07..8c3c01d 100644 --- a/kaldigstserver/worker.py +++ b/kaldigstserver/worker.py @@ -3,8 +3,6 @@ import logging import logging.config import time -import _thread -import threading import os import argparse from subprocess import Popen, PIPE @@ -24,22 +22,29 @@ import tornado.process import tornado.ioloop import tornado.locks -from ws4py.client.threadedclient import WebSocketClient -import ws4py.messaging +import tornado.websocket +#from ws4py.client.threadedclient import WebSocketClient +#import ws4py.messaging from decoder import DecoderPipeline from decoder2 import DecoderPipeline2 import common +from concurrent.futures import ThreadPoolExecutor +from tornado.concurrent import run_on_executor + + logger = logging.getLogger(__name__) +executor = ThreadPoolExecutor(max_workers=5) + CONNECT_TIMEOUT = 5 SILENCE_TIMEOUT = 5 USE_NNET2 = False -class ServerWebsocket(WebSocketClient): +class Worker(): STATE_CREATED = 0 STATE_CONNECTED = 1 
STATE_INITIALIZED = 2 @@ -53,7 +58,6 @@ def __init__(self, uri, decoder_pipeline, post_processor, full_post_processor=No self.decoder_pipeline = decoder_pipeline self.post_processor = post_processor self.full_post_processor = full_post_processor - WebSocketClient.__init__(self, url=uri, heartbeat_freq=10) self.pipeline_initialized = False self.partial_transcript = "" if USE_NNET2: @@ -65,24 +69,31 @@ def __init__(self, uri, decoder_pipeline, post_processor, full_post_processor=No self.decoder_pipeline.set_error_handler(self._on_error) self.decoder_pipeline.set_eos_handler(self._on_eos) self.state = self.STATE_CREATED - self.last_decoder_message = time.time() self.request_id = "" self.timeout_decoder = 5 self.num_segments = 0 self.last_partial_result = "" - asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) self.post_processor_lock = tornado.locks.Lock() self.processing_condition = tornado.locks.Condition() self.num_processing_threads = 0 - - def opened(self): + @tornado.gen.coroutine + def connect_and_run(self): + logger.info("Opening websocket connection to master server") + self.ws = yield tornado.websocket.websocket_connect(self.uri, ping_interval=10) logger.info("Opened websocket connection to server") self.state = self.STATE_CONNECTED self.last_partial_result = "" + self.last_decoder_message = time.time() + while True: + msg = yield self.ws.read_message() + self.received_message(msg) + if msg is None: + self.closed() + break + logger.info("Finished decoding run") def guard_timeout(self): - asyncio.set_event_loop(asyncio.new_event_loop()) global SILENCE_TIMEOUT while self.state in [self.STATE_EOS_RECEIVED, self.STATE_CONNECTED, self.STATE_INITIALIZED, self.STATE_PROCESSING]: if time.time() - self.last_decoder_message > SILENCE_TIMEOUT: @@ -90,10 +101,10 @@ def guard_timeout(self): self.finish_request() event = dict(status=common.STATUS_NO_SPEECH) try: - self.send(json.dumps(event)) + self.ws.write_message(json.dumps(event)) except: logger.warning("%s: 
Failed to send error event to master" % (self.request_id)) - self.close() + self.ws.close() return logger.debug("%s: Checking that decoder hasn't been silent for more than %d seconds" % (self.request_id, SILENCE_TIMEOUT)) time.sleep(1) @@ -101,17 +112,17 @@ def guard_timeout(self): def received_message(self, m): logger.debug("%s: Got message from server of type %s" % (self.request_id, str(type(m)))) if self.state == self.__class__.STATE_CONNECTED: - props = json.loads(m.data.decode("utf-8")) + props = json.loads(m) content_type = props['content_type'] self.request_id = props['id'] self.num_segments = 0 self.decoder_pipeline.init_request(self.request_id, content_type) self.last_decoder_message = time.time() - _thread.start_new_thread(self.guard_timeout, ()) + tornado.ioloop.IOLoop.current().run_in_executor(executor, self.guard_timeout) logger.info("%s: Started timeout guard" % self.request_id) logger.info("%s: Initialized request" % self.request_id) self.state = self.STATE_INITIALIZED - elif m.data == "EOS": + elif m == "EOS": if self.state != self.STATE_CANCELLING and self.state != self.STATE_EOS_RECEIVED and self.state != self.STATE_FINISHED: self.decoder_pipeline.end_request() self.state = self.STATE_EOS_RECEIVED @@ -119,10 +130,10 @@ def received_message(self, m): logger.info("%s: Ignoring EOS, worker already in state %d" % (self.request_id, self.state)) else: if self.state != self.STATE_CANCELLING and self.state != self.STATE_EOS_RECEIVED and self.state != self.STATE_FINISHED: - if isinstance(m, ws4py.messaging.BinaryMessage): - self.decoder_pipeline.process_data(m.data) + if isinstance(m, bytes): + self.decoder_pipeline.process_data(m) self.state = self.STATE_PROCESSING - elif isinstance(m, ws4py.messaging.TextMessage): + elif isinstance(m, str): props = json.loads(str(m)) if 'adaptation_state' in props: as_props = props['adaptation_state'] @@ -167,7 +178,7 @@ def finish_request(self): logger.info("%s: Finished waiting for EOS" % self.request_id) - def 
closed(self, code, reason=None): + def closed(self): logger.debug("%s: Websocket closed() called" % self.request_id) self.finish_request() logger.debug("%s: Websocket closed() finished" % self.request_id) @@ -197,7 +208,7 @@ def _on_result(self, result, final): segment=self.num_segments, result=dict(hypotheses=[dict(transcript=processed_transcripts[0])], final=final)) try: - self.send(json.dumps(event)) + self.ws.write_message(json.dumps(event)) except: e = sys.exc_info()[1] logger.warning("Failed to send event to master: %s" % e) @@ -220,7 +231,7 @@ def _on_full_result(self, full_result_json): logger.debug(u"%s: After postprocessing: %s" % (self.request_id, repr(full_result))) try: - self.send(json.dumps(full_result)) + self.ws.write_message(json.dumps(full_result)) except: e = sys.exc_info()[1] logger.warning("Failed to send event to master: %s" % e) @@ -230,7 +241,7 @@ def _on_full_result(self, full_result_json): else: logger.info("%s: Result status is %d, forwarding the result to the server anyway" % (self.request_id, full_result.get("status", -1))) try: - self.send(json.dumps(full_result)) + self.ws.write_message(json.dumps(full_result)) except: e = sys.exc_info()[1] logger.warning("Failed to send event to master: %s" % e) @@ -255,7 +266,7 @@ def _on_word(self, word): event = dict(status=common.STATUS_SUCCESS, segment=self.num_segments, result=dict(hypotheses=[dict(transcript=processed_transcript)], final=False)) - self.send(json.dumps(event)) + self.ws.write_message(json.dumps(event)) else: logger.info("%s: Postprocessing final result.." 
% self.request_id) processed_transcript = (yield self.post_process(self.partial_transcript, blocking=True)) @@ -263,7 +274,7 @@ def _on_word(self, word): event = dict(status=common.STATUS_SUCCESS, segment=self.num_segments, result=dict(hypotheses=[dict(transcript=processed_transcript)], final=True)) - self.send(json.dumps(event)) + self.ws.write_message(json.dumps(event)) self.partial_transcript = "" self.num_segments += 1 finally: @@ -280,17 +291,17 @@ def _on_eos(self, data=None): self.state = self.STATE_FINISHED self.send_adaptation_state() - self.close() + self.ws.close() def _on_error(self, error): self.state = self.STATE_FINISHED event = dict(status=common.STATUS_NOT_ALLOWED, message=error) try: - self.send(json.dumps(event)) + self.ws.write_message(json.dumps(event)) except: e = sys.exc_info()[1] logger.warning("Failed to send event to master: %s" % e) - self.close() + self.ws.close() def send_adaptation_state(self): if hasattr(self.decoder_pipeline, 'get_adaptation_state'): @@ -302,7 +313,7 @@ def send_adaptation_state(self): type="string+gzip+base64", time=time.strftime("%Y-%m-%dT%H:%M:%S"))) try: - self.send(json.dumps(event)) + self.ws.write_message(json.dumps(event)) except: e = sys.exc_info()[1] logger.warning("Failed to send event to master: " + str(e)) @@ -312,30 +323,33 @@ def send_adaptation_state(self): @tornado.gen.coroutine def post_process(self, texts, blocking=False): if self.post_processor: - logging.debug("%s: Waiting for postprocessor lock" % self.request_id) + logging.debug("%s: Waiting for postprocessor lock with blocking=%d" % (self.request_id, blocking)) if blocking: - timeout=None + timeout = None else: - timeout=0.0 + timeout = 0.1 try: - with (yield self.post_processor_lock.acquire()): + with (yield self.post_processor_lock.acquire(timeout)): result = [] for text in texts: - self.post_processor.stdin.write("%s\n" % text) - self.post_processor.stdin.flush() - logging.debug("%s: Starting postprocessing: %s" % (self.request_id, text)) - 
text = yield self.post_processor.stdout.read_until('\n') - text = text.decode("utf-8") - logging.debug("%s: Postprocessing returned: %s" % (self.request_id, text)) - text = text.strip() - text = text.replace("\\n", "\n") - result.append(text) - raise tornado.gen.Return(result) - except tornado.gen.TimeoutError: - logging.debug("%s: Skipping postprocessing since post-processor already in use" % (self.request_id)) - raise tornado.gen.Return(None) + try: + logging.debug("%s: Starting postprocessing: %s" % (self.request_id, text)) + self.post_processor.stdin.write((text + "\n").encode("utf-8")) + self.post_processor.stdin.flush() + logging.debug("%s: Reading from postpocessor" % (self.request_id)) + text = yield self.post_processor.stdout.read_until(b'\n') + text = text.decode("utf-8").strip() + logging.debug("%s: Postprocessing returned: %s" % (self.request_id, text)) + text = text.replace("\\n", "\n") + result.append(text) + except Exception as ex: + logging.exception("Error when postprocessing") + return result + except tornado.util.TimeoutError: + logging.info("%s: Skipping postprocessing since post-processor already in use" % (self.request_id)) + return None else: - raise tornado.gen.Return(texts) + return texts @tornado.gen.coroutine def post_process_full(self, full_result): @@ -359,20 +373,19 @@ def post_process_full(self, full_result): for (i, hyp) in enumerate(full_result.get("result", {}).get("hypotheses", [])): hyp["original-transcript"] = hyp["transcript"] hyp["transcript"] = processed_transcripts[i] - raise tornado.gen.Return(full_result) + return full_result +@tornado.gen.coroutine def main_loop(uri, decoder_pipeline, post_processor, full_post_processor=None): while True: - ws = ServerWebsocket(uri, decoder_pipeline, post_processor, full_post_processor=full_post_processor) - try: - logger.info("Opening websocket connection to master server") - ws.connect() - ws.run_forever() + worker = Worker(uri, decoder_pipeline, post_processor, 
full_post_processor=full_post_processor) + try: + yield worker.connect_and_run() except Exception: logger.error("Couldn't connect to server, waiting for %d seconds", CONNECT_TIMEOUT) - time.sleep(CONNECT_TIMEOUT) + yield tornado.gen.sleep(CONNECT_TIMEOUT) # fixes a race condition - time.sleep(1) + yield tornado.gen.sleep(1) @@ -403,7 +416,7 @@ def main(): post_processor = None if "post-processor" in conf: STREAM = tornado.process.Subprocess.STREAM - post_processor = tornado.process.Subprocess(conf["post-processor"], shell=True, stdin=PIPE, stdout=STREAM) + post_processor = tornado.process.Subprocess(conf["post-processor"], shell=True, stdin=PIPE, stdout=STREAM, ) full_post_processor = None @@ -416,13 +429,13 @@ def main(): global SILENCE_TIMEOUT SILENCE_TIMEOUT = conf.get("silence-timeout", 5) if USE_NNET2: - decoder_pipeline = DecoderPipeline2(conf) + decoder_pipeline = DecoderPipeline2(tornado.ioloop.IOLoop.current(), conf) else: - decoder_pipeline = DecoderPipeline(conf) + decoder_pipeline = DecoderPipeline(tornado.ioloop.IOLoop.current(), conf) - loop = GObject.MainLoop() - _thread.start_new_thread(loop.run, ()) - _thread.start_new_thread(main_loop, (args.uri, decoder_pipeline, post_processor, full_post_processor)) + gobject_loop = GObject.MainLoop() + tornado.ioloop.IOLoop.current().run_in_executor(executor, gobject_loop.run) + tornado.ioloop.IOLoop.current().spawn_callback(main_loop, args.uri, decoder_pipeline, post_processor, full_post_processor) tornado.ioloop.IOLoop.current().start() From d44b46acfa80d3b166b5c742d27f389d20022052 Mon Sep 17 00:00:00 2001 From: "Tanel.Alumae" Date: Fri, 6 Mar 2020 15:04:40 +0200 Subject: [PATCH 06/10] Made GMM decoder work with Python3 and new Tornado --- kaldigstserver/decoder.py | 14 +++++++------- kaldigstserver/worker.py | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/kaldigstserver/decoder.py b/kaldigstserver/decoder.py index 8f5cfd6..d61b644 100644 --- a/kaldigstserver/decoder.py +++ 
b/kaldigstserver/decoder.py @@ -20,8 +20,9 @@ import pdb class DecoderPipeline(object): - def __init__(self, conf={}): + def __init__(self, ioloop, conf={}): logger.info("Creating decoder using conf: %s" % conf) + self.ioloop = ioloop self.use_cutter = conf.get("use-vad", False) self.create_pipeline(conf) self.outdir = conf.get("out-dir", None) @@ -69,7 +70,7 @@ def create_pipeline(self, conf): file=sys.stderr) sys.exit(-1) - for (key, val) in conf.get("decoder", {}).iteritems(): + for (key, val) in conf.get("decoder", {}).items(): logger.info("Setting decoder property: %s = %s" % (key, val)) self.asr.set_property(key, val) @@ -151,23 +152,22 @@ def _on_element_message(self, bus, message): self.asr.set_property("silent", True) def _on_word(self, asr, word): - logger.info("%s: Got word: %s" % (self.request_id, word.decode('utf8'))) + logger.info("%s: Got word: %s" % (self.request_id, word)) if self.word_handler: - self.word_handler(word) - + self.ioloop.add_callback(self.word_handler, word) def _on_error(self, bus, msg): self.error = msg.parse_error() logger.error(self.error) self.finish_request() if self.error_handler: - self.error_handler(self.error[0].message) + self.ioloop.add_callback(self.error_handler, self.error[0].message) def _on_eos(self, bus, msg): logger.info('%s: Pipeline received eos signal' % self.request_id) self.finish_request() if self.eos_handler: - self.eos_handler[0](self.eos_handler[1]) + self.ioloop.add_callback(self.eos_handler[0], self.eos_handler[1]) def finish_request(self): logger.info('%s: Finishing request' % self.request_id) diff --git a/kaldigstserver/worker.py b/kaldigstserver/worker.py index 8c3c01d..c2f95dd 100644 --- a/kaldigstserver/worker.py +++ b/kaldigstserver/worker.py @@ -259,21 +259,21 @@ def _on_word(self, word): self.partial_transcript += " " self.partial_transcript += word logger.debug("%s: Postprocessing partial result.." 
% self.request_id) - processed_transcript = (yield self.post_process([self.partial_transcript], blocking=False))[0] - if processed_transcript: + processed_transcripts = (yield self.post_process([self.partial_transcript], blocking=False)) + if processed_transcripts: logger.debug("%s: Postprocessing done." % self.request_id) event = dict(status=common.STATUS_SUCCESS, segment=self.num_segments, - result=dict(hypotheses=[dict(transcript=processed_transcript)], final=False)) + result=dict(hypotheses=[dict(transcript=processed_transcripts[0])], final=False)) self.ws.write_message(json.dumps(event)) else: logger.info("%s: Postprocessing final result.." % self.request_id) - processed_transcript = (yield self.post_process(self.partial_transcript, blocking=True)) + processed_transcripts = (yield self.post_process([self.partial_transcript], blocking=True)) logger.info("%s: Postprocessing done." % self.request_id) event = dict(status=common.STATUS_SUCCESS, segment=self.num_segments, - result=dict(hypotheses=[dict(transcript=processed_transcript)], final=True)) + result=dict(hypotheses=[dict(transcript=processed_transcripts[0])], final=True)) self.ws.write_message(json.dumps(event)) self.partial_transcript = "" self.num_segments += 1 From b7cbe6671ea820e2e9e9cbaeef6ef462beec16ee Mon Sep 17 00:00:00 2001 From: "Tanel.Alumae" Date: Fri, 6 Mar 2020 17:26:55 +0200 Subject: [PATCH 07/10] Made HTTP requests work with Tornado 6 --- README.md | 17 +++++++---------- kaldigstserver/master_server.py | 34 ++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index a7733fa..1f43b53 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,9 @@ Estonian demo: http://bark.phon.ioc.ee/dikteeri/ Changelog --------- + * 2020-03-06: Quite big changes. Upgraded to Python 3 (Python 2.7 is not supported any more). Also, migrated to + use Tornado 6.
Also, worker.py and client.py now use Tornado's websocket client and ws4py is not needed any more. + Post-processing should also work fine. * 2019-06-17: The postprocessing mechanism doesn't work properly with Tornado 5+. Use Tornado 4.5.3 if you need it. * 2018-04-25: Server should now work with Tornado 5 (thanks to @Gastron). If using Python 2, you might need to install the `futures` package (`pip install futures`). * 2017-12-27: Somewhat big changes in the way post-processor is invoked. The problem was that in some use cases, the program that is used for @@ -69,19 +72,13 @@ all the prerequisites manually, one could use the Dockerfile created by José Ed ### Requirements -#### Python 2.7 with the following packages: +#### Python 3.5.2 or newer with the following packages: - * Tornado 4, see http://www.tornadoweb.org/en/stable/ - * ws4py (0.3.0 .. 0.3.2) + * Tornado 6, see http://www.tornadoweb.org/en/stable/ * YAML * JSON -*NB!*: The server doesn't work quite correctly with ws4py 0.3.5 because of a bug I reported here: https://github.com/Lawouach/WebSocket-for-Python/issues/152. -Use ws4py 0.3.2 instead. To install ws4py 0.3.2 using `pip`, run: - - pip install ws4py==0.3.2 - -In addition, you need Python 2.x bindings for gobject-introspection libraries, provided by the `python-gi` +In addition, you need Python bindings for gobject-introspection libraries, provided by the `python-gi` package on Debian and Ubuntu. #### Kaldi @@ -98,7 +95,7 @@ English models are based on Voxforge acoustic models and the CMU Sphinx 2013 ge The language models were heavily pruned so that the resulting FST cascade would be less than the 100 MB GitHub file size limit. -*Update:* the server also supports Kaldi's new "online2" online decoder that uses DNN-based acoustic models with i-vector input. See below on +*Update:* the server also supports Kaldi's "online2" online decoder that uses DNN-based acoustic models with i-vector input. See below on how to use it.
According to experiments on two Estonian online decoding setups, the DNN-based models result in about 20% (or more) relatively less errors than GMM-based models (e.g., WER dropped from 13% to 9%). diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index f17b624..0271454 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -15,7 +15,7 @@ import time import threading import functools -from queue import Queue +from tornado.locks import Condition import tornado.ioloop import tornado.options @@ -102,11 +102,10 @@ class HttpChunkedRecognizeHandler(tornado.web.RequestHandler): Provides a HTTP POST/PUT interface supporting chunked transfer requests, similar to that provided by http://github.com/alumae/ruby-pocketsphinx-server. """ - def prepare(self): self.id = str(uuid.uuid4()) self.final_hyp = "" - self.final_result_queue = Queue() + self.worker_done = Condition() self.user_id = self.request.headers.get("device-id", "none") self.content_id = self.request.headers.get("content-id", "none") logging.info("%s: OPEN: user='%s', content='%s'" % (self.id, self.user_id, self.content_id)) @@ -114,7 +113,7 @@ def prepare(self): self.error_status = 0 self.error_message = None #Waiter thread for final hypothesis: - self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + #self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) try: self.worker = self.application.available_workers.pop() self.application.send_status_update() @@ -132,32 +131,35 @@ def prepare(self): self.set_status(503) self.finish("No workers available") + @tornado.gen.coroutine def data_received(self, chunk): assert self.worker is not None logging.debug("%s: Forwarding client message of length %d to worker" % (self.id, len(chunk))) self.worker.write_message(chunk, binary=True) - + + @tornado.gen.coroutine def post(self, *args, **kwargs): - self.end_request(args, kwargs) + yield self.end_request(args, kwargs) + 
@tornado.gen.coroutine def put(self, *args, **kwargs): - self.end_request(args, kwargs) + yield self.end_request(args, kwargs) - @tornado.concurrent.run_on_executor + @tornado.gen.coroutine def get_final_hyp(self): logging.info("%s: Waiting for final result..." % self.id) - return self.final_result_queue.get(block=True) + yield self.final_result_queue.get() @tornado.gen.coroutine def end_request(self, *args, **kwargs): logging.info("%s: Handling the end of chunked recognize request" % self.id) assert self.worker is not None - self.worker.write_message("EOS", binary=True) - logging.info("%s: yielding..." % self.id) - hyp = yield self.get_final_hyp() + self.worker.write_message("EOS", binary=False) + logging.info("%s: Waiting for worker to finish" % self.id) + yield self.worker_done.wait() if self.error_status == 0: - logging.info("%s: Final hyp: %s" % (self.id, hyp)) - response = {"status" : 0, "id": self.id, "hypotheses": [{"utterance" : hyp}]} + logging.info("%s: Final hyp: %s" % (self.id, self.final_hyp)) + response = {"status" : 0, "id": self.id, "hypotheses": [{"utterance" : self.final_hyp}]} self.write(response) else: logging.info("%s: Error (status=%d) processing HTTP request: %s" % (self.id, self.error_status, self.error_message)) @@ -170,6 +172,7 @@ def end_request(self, *args, **kwargs): self.finish() logging.info("Everything done") + @tornado.gen.coroutine def send_event(self, event): event_str = str(event) if len(event_str) > 100: @@ -188,9 +191,10 @@ def send_event(self, event): self.error_status = event["status"] self.error_message = event.get("message", "") + @tornado.gen.coroutine def close(self): logging.info("%s: Receiving 'close' from worker" % (self.id)) - self.final_result_queue.put(self.final_hyp) + self.worker_done.notify() class ReferenceHandler(tornado.web.RequestHandler): From f68cab490be7eb0da2af1475fbc16655f50a60cb Mon Sep 17 00:00:00 2001 From: "Tanel.Alumae" Date: Fri, 6 Mar 2020 17:28:58 +0200 Subject: [PATCH 08/10] Made HTTP requests 
work with Tornado 6 --- kaldigstserver/master_server.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kaldigstserver/master_server.py b/kaldigstserver/master_server.py index 0271454..a50967d 100644 --- a/kaldigstserver/master_server.py +++ b/kaldigstserver/master_server.py @@ -145,11 +145,6 @@ def post(self, *args, **kwargs): def put(self, *args, **kwargs): yield self.end_request(args, kwargs) - @tornado.gen.coroutine - def get_final_hyp(self): - logging.info("%s: Waiting for final result..." % self.id) - yield self.final_result_queue.get() - @tornado.gen.coroutine def end_request(self, *args, **kwargs): logging.info("%s: Handling the end of chunked recognize request" % self.id) From ffa517b1e4561bf2efe14a80414d140e38c87068 Mon Sep 17 00:00:00 2001 From: "Tanel.Alumae" Date: Mon, 15 Jun 2020 10:53:44 +0300 Subject: [PATCH 09/10] Fix stdin processing, due to argparse ignoring the 'rb' FileType under Python3 --- kaldigstserver/client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kaldigstserver/client.py b/kaldigstserver/client.py index 2a3f515..bb7893b 100644 --- a/kaldigstserver/client.py +++ b/kaldigstserver/client.py @@ -61,6 +61,11 @@ def run(self): except: e = sys.exc_info()[0] print("Failed to send adaptation state: " + e) + + # In Python 3, stdin is always opened as text by argparse + if type(self.audiofile).__name__ == 'TextIOWrapper': + self.audiofile = self.audiofile.buffer + with self.audiofile as audiostream: while True: block = yield from self.ioloop.run_in_executor(executor, audiostream.read, int(self.byterate/4)) @@ -127,15 +132,13 @@ def main(): parser.add_argument('--save-adaptation-state', help="Save adaptation state to file") parser.add_argument('--send-adaptation-state', help="Send adaptation state from file") parser.add_argument('--content-type', default='', help="Use the specified content type (empty by default, for raw files the default is audio/x-raw, layout=(string)interleaved, rate=(int), 
format=(string)S16LE, channels=(int)1") - parser.add_argument('audiofile', help="Audio file to be sent to the server", type=argparse.FileType('rb'), default=sys.stdin) + parser.add_argument('audiofile', help="Audio file to be sent to the server", type=argparse.FileType('rb')) args = parser.parse_args() content_type = args.content_type if content_type == '' and args.audiofile.name.endswith(".raw"): content_type = "audio/x-raw, layout=(string)interleaved, rate=(int)%d, format=(string)S16LE, channels=(int)1" %(args.rate/2) - - ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.parse.urlencode([("content-type", content_type)])), byterate=args.rate, save_adaptation_state_filename=args.save_adaptation_state, send_adaptation_state_filename=args.send_adaptation_state) From f79e204d751a5964918001822e4520fa2acfd246 Mon Sep 17 00:00:00 2001 From: "Tanel.Alumae" Date: Tue, 13 Oct 2020 16:50:53 +0300 Subject: [PATCH 10/10] Make compatible to python 3.8 --- kaldigstserver/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kaldigstserver/client.py b/kaldigstserver/client.py index bb7893b..9985d65 100644 --- a/kaldigstserver/client.py +++ b/kaldigstserver/client.py @@ -22,12 +22,12 @@ def rate_limited(maxPerSecond): def decorate(func): last_time_called = [0.0] def rate_limited_function(*args,**kargs): - elapsed = time.clock() - last_time_called[0] + elapsed = time.perf_counter() - last_time_called[0] left_to_wait = min_interval - elapsed if left_to_wait > 0: yield gen.sleep(left_to_wait) ret = func(*args,**kargs) - last_time_called[0] = time.clock() + last_time_called[0] = time.perf_counter() return ret return rate_limited_function return decorate