Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Python 3, Tornado 6 #227

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
17 changes: 7 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Estonian demo: http://bark.phon.ioc.ee/dikteeri/

Changelog
---------
* 2020-03-06: Quite big changes. Upgraded to Python 3 (Python 2.7 is not supported any more). Also, migrated to
Tornado 6. In addition, worker.py and client.py now use Tornado's websocket client, so ws4py is not needed any more.
Post-processing should also work fine.
* 2019-06-17: The postprocessing mechanism doesn't work properly with Tornado 5+. Use Tornado 4.5.3 if you need it.
* 2018-04-25: Server should now work with Tornado 5 (thanks to @Gastron). If using Python 2, you might need to install the `futures` package (`pip install futures`).
* 2017-12-27: Somewhat big changes in the way post-processor is invoked. The problem was that in some use cases, the program that is used for
Expand Down Expand Up @@ -69,19 +72,13 @@ all the prerequisites manually, one could use the Dockerfile created by José Ed

### Requirements

#### Python 2.7 with the following packages:
#### Python 3.5.2 or newer with the following packages:

* Tornado 4, see http://www.tornadoweb.org/en/stable/
* ws4py (0.3.0 .. 0.3.2)
* Tornado 6, see http://www.tornadoweb.org/en/stable/
* YAML
* JSON

*NB!*: The server doesn't work quite correctly with ws4py 0.3.5 because of a bug I reported here: https://github.com/Lawouach/WebSocket-for-Python/issues/152.
Use ws4py 0.3.2 instead. To install ws4py 0.3.2 using `pip`, run:

pip install ws4py==0.3.2

In addition, you need Python 2.x bindings for gobject-introspection libraries, provided by the `python-gi`
In addition, you need Python bindings for gobject-introspection libraries, provided by the `python-gi`
package on Debian and Ubuntu.

#### Kaldi
Expand All @@ -98,7 +95,7 @@ English models are based on Voxforge acoustic models and the CMU Sphinx 2013 ge
The language models were heavily pruned so that the resulting FST cascade would be less than the
100 MB GitHub file size limit.

*Update:* the server also supports Kaldi's new "online2" online decoder that uses DNN-based acoustic models with i-vector input. See below on
*Update:* the server also supports Kaldi's "online2" online decoder that uses DNN-based acoustic models with i-vector input. See below on
how to use it. According to experiments on two Estonian online decoding setups, the DNN-based models result in about 20% (or more) relatively fewer
errors than GMM-based models (e.g., WER dropped from 13% to 9%).

Expand Down
131 changes: 77 additions & 54 deletions kaldigstserver/client.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,127 @@
__author__ = 'tanel'

import argparse
from ws4py.client.threadedclient import WebSocketClient
#from ws4py.client.threadedclient import WebSocketClient
import time
import threading
import sys
import urllib
import Queue
import queue
import json
import time
import os
from tornado.ioloop import IOLoop
from tornado import gen
from tornado.websocket import websocket_connect
from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor


def rate_limited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
min_interval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
last_time_called = [0.0]
def rate_limited_function(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
elapsed = time.perf_counter() - last_time_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
yield gen.sleep(left_to_wait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
last_time_called[0] = time.perf_counter()
return ret
return rate_limited_function
return decorate

executor = ThreadPoolExecutor(max_workers=5)

class MyClient(WebSocketClient):
class MyClient():

def __init__(self, audiofile, url, protocols=None, extensions=None, heartbeat_freq=None, byterate=32000,
def __init__(self, audiofile, url, byterate=32000,
save_adaptation_state_filename=None, send_adaptation_state_filename=None):
super(MyClient, self).__init__(url, protocols, extensions, heartbeat_freq)
self.url = url
self.final_hyps = []
self.audiofile = audiofile
self.byterate = byterate
self.final_hyp_queue = Queue.Queue()
self.final_hyp_queue = queue.Queue()
self.save_adaptation_state_filename = save_adaptation_state_filename
self.send_adaptation_state_filename = send_adaptation_state_filename

self.ioloop = IOLoop.instance()
self.run()
self.ioloop.start()


@gen.coroutine
def run(self):
self.ws = yield websocket_connect(self.url, on_message_callback=self.received_message)
if self.send_adaptation_state_filename is not None:
print("Sending adaptation state from " + self.send_adaptation_state_filename)
try:
adaptation_state_props = json.load(open(self.send_adaptation_state_filename, "r"))
self.ws.write_message(json.dumps(dict(adaptation_state=adaptation_state_props)))
except:
e = sys.exc_info()[0]
print("Failed to send adaptation state: " + e)

# In Python 3, stdin is always opened as text by argparse
if type(self.audiofile).__name__ == 'TextIOWrapper':
self.audiofile = self.audiofile.buffer

with self.audiofile as audiostream:
while True:
block = yield from self.ioloop.run_in_executor(executor, audiostream.read, int(self.byterate/4))
if block == b"":
break
yield self.send_data(block)
self.ws.write_message("EOS")


@gen.coroutine
@rate_limited(4)
def send_data(self, data):
self.send(data, binary=True)

def opened(self):
#print "Socket opened!"
def send_data_to_ws():
if self.send_adaptation_state_filename is not None:
print >> sys.stderr, "Sending adaptation state from %s" % self.send_adaptation_state_filename
try:
adaptation_state_props = json.load(open(self.send_adaptation_state_filename, "r"))
self.send(json.dumps(dict(adaptation_state=adaptation_state_props)))
except:
e = sys.exc_info()[0]
print >> sys.stderr, "Failed to send adaptation state: ", e
with self.audiofile as audiostream:
for block in iter(lambda: audiostream.read(self.byterate/4), ""):
self.send_data(block)
print >> sys.stderr, "Audio sent, now sending EOS"
self.send("EOS")

t = threading.Thread(target=send_data_to_ws)
t.start()
self.ws.write_message(data, binary=True)


def received_message(self, m):
if m is None:
#print("Websocket closed() called")
self.final_hyp_queue.put(" ".join(self.final_hyps))
self.ioloop.stop()

return

#print("Received message ...")
#print(str(m) + "\n")
response = json.loads(str(m))
#print >> sys.stderr, "RESPONSE:", response
#print >> sys.stderr, "JSON was:", m

if response['status'] == 0:
#print(response)
if 'result' in response:
trans = response['result']['hypotheses'][0]['transcript'].encode('utf-8')
trans = response['result']['hypotheses'][0]['transcript']
if response['result']['final']:
#print >> sys.stderr, trans,
self.final_hyps.append(trans)
print >> sys.stderr, '\r%s' % trans.replace("\n", "\\n")
print(trans.replace("\n", "\\n"), end="\n")
else:
print_trans = trans.replace("\n", "\\n")
if len(print_trans) > 80:
print_trans = "... %s" % print_trans[-76:]
print >> sys.stderr, '\r%s' % print_trans,
print(print_trans, end="\r")
if 'adaptation_state' in response:
if self.save_adaptation_state_filename:
print >> sys.stderr, "Saving adaptation state to %s" % self.save_adaptation_state_filename
print("Saving adaptation state to " + self.save_adaptation_state_filename)
with open(self.save_adaptation_state_filename, "w") as f:
f.write(json.dumps(response['adaptation_state']))
else:
print >> sys.stderr, "Received error from server (status %d)" % response['status']
print("Received error from server (status %d)" % response['status'])
if 'message' in response:
print >> sys.stderr, "Error message:", response['message']
print("Error message:" + response['message'])


def get_full_hyp(self, timeout=60):
return self.final_hyp_queue.get(timeout)

def closed(self, code, reason=None):
#print "Websocket closed() called"
#print >> sys.stderr
self.final_hyp_queue.put(" ".join(self.final_hyps))
# def closed(self, code, reason=None):
# print("Websocket closed() called")
# self.final_hyp_queue.put(" ".join(self.final_hyps))


def main():
Expand All @@ -108,20 +132,19 @@ def main():
parser.add_argument('--save-adaptation-state', help="Save adaptation state to file")
parser.add_argument('--send-adaptation-state', help="Send adaptation state from file")
parser.add_argument('--content-type', default='', help="Use the specified content type (empty by default, for raw files the default is audio/x-raw, layout=(string)interleaved, rate=(int)<rate>, format=(string)S16LE, channels=(int)1")
parser.add_argument('audiofile', help="Audio file to be sent to the server", type=argparse.FileType('rb'), default=sys.stdin)
parser.add_argument('audiofile', help="Audio file to be sent to the server", type=argparse.FileType('rb'))
args = parser.parse_args()

content_type = args.content_type
if content_type == '' and args.audiofile.name.endswith(".raw"):
content_type = "audio/x-raw, layout=(string)interleaved, rate=(int)%d, format=(string)S16LE, channels=(int)1" %(args.rate/2)



ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.urlencode([("content-type", content_type)])), byterate=args.rate,
ws = MyClient(args.audiofile, args.uri + '?%s' % (urllib.parse.urlencode([("content-type", content_type)])), byterate=args.rate,
save_adaptation_state_filename=args.save_adaptation_state, send_adaptation_state_filename=args.send_adaptation_state)
ws.connect()

result = ws.get_full_hyp()
print result
print(result)


if __name__ == "__main__":
main()
Expand Down
41 changes: 22 additions & 19 deletions kaldigstserver/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
GObject.threads_init()
Gst.init(None)
import logging
import thread
import _thread
import os
import sys

logger = logging.getLogger(__name__)

import pdb

class DecoderPipeline(object):
def __init__(self, conf={}):
def __init__(self, ioloop, conf={}):
logger.info("Creating decoder using conf: %s" % conf)
self.ioloop = ioloop
self.use_cutter = conf.get("use-vad", False)
self.create_pipeline(conf)
self.outdir = conf.get("out-dir", None)
Expand Down Expand Up @@ -51,22 +53,24 @@ def create_pipeline(self, conf):
self.fakesink = Gst.ElementFactory.make("fakesink", "fakesink")

if not self.asr:
print >> sys.stderr, "ERROR: Couldn't create the onlinegmmdecodefaster element!"
print("ERROR: Couldn't create the onlinegmmdecodefaster element!", file=sys.stderr)
gst_plugin_path = os.environ.get("GST_PLUGIN_PATH")
if gst_plugin_path:
print >> sys.stderr, \
"Couldn't find onlinegmmdecodefaster element at %s. " \
"If it's not the right path, try to set GST_PLUGIN_PATH to the right one, and retry. " \
"You can also try to run the following command: " \
"'GST_PLUGIN_PATH=%s gst-inspect-1.0 onlinegmmdecodefaster'." \
% (gst_plugin_path, gst_plugin_path)
print(
"Couldn't find onlinegmmdecodefaster element at %s. "
"If it's not the right path, try to set GST_PLUGIN_PATH to the right one, and retry. "
"You can also try to run the following command: "
"'GST_PLUGIN_PATH=%s gst-inspect-1.0 onlinegmmdecodefaster'."
% (gst_plugin_path, gst_plugin_path),
file=sys.stderr)
else:
print >> sys.stderr, \
print(
"The environment variable GST_PLUGIN_PATH wasn't set or it's empty. " \
"Try to set GST_PLUGIN_PATH environment variable, and retry."
sys.exit(-1);
"Try to set GST_PLUGIN_PATH environment variable, and retry.",
file=sys.stderr)
sys.exit(-1)

for (key, val) in conf.get("decoder", {}).iteritems():
for (key, val) in conf.get("decoder", {}).items():
logger.info("Setting decoder property: %s = %s" % (key, val))
self.asr.set_property(key, val)

Expand Down Expand Up @@ -148,23 +152,22 @@ def _on_element_message(self, bus, message):
self.asr.set_property("silent", True)

def _on_word(self, asr, word):
logger.info("%s: Got word: %s" % (self.request_id, word.decode('utf8')))
logger.info("%s: Got word: %s" % (self.request_id, word))
if self.word_handler:
self.word_handler(word)

self.ioloop.add_callback(self.word_handler, word)

def _on_error(self, bus, msg):
self.error = msg.parse_error()
logger.error(self.error)
self.finish_request()
if self.error_handler:
self.error_handler(self.error[0].message)
self.ioloop.add_callback(self.error_handler, self.error[0].message)

def _on_eos(self, bus, msg):
logger.info('%s: Pipeline received eos signal' % self.request_id)
self.finish_request()
if self.eos_handler:
self.eos_handler[0](self.eos_handler[1])
self.ioloop.add_callback(self.eos_handler[0], self.eos_handler[1])

def finish_request(self):
logger.info('%s: Finishing request' % self.request_id)
Expand Down Expand Up @@ -235,4 +238,4 @@ def cancel(self):
#logger.debug("Sending EOS to pipeline")
#self.pipeline.send_event(Gst.Event.new_eos())
#self.pipeline.set_state(Gst.State.READY)
logger.info("%s: Cancelled pipeline" % self.request_id)
logger.info("%s: Cancelled pipeline" % self.request_id)
Loading