diff --git a/docs/source/configurator.rst b/docs/source/configurator.rst index 93f63c54d..53648283f 100644 --- a/docs/source/configurator.rst +++ b/docs/source/configurator.rst @@ -106,8 +106,11 @@ Node type dependent options for [nodeN] : :: type="ebuttd-encoder" ├─media_time_zero : ["current" (default) | clock time at media time zero TODO: check format] ├─default_namespace : ["false" (default) | "true"] - └─clock - └─type : ["local" (default) | "auto" | "utc"] + ├─clock + │ └─type : ["local" (default) | "auto" | "utc"] + └─override_begin_count : override the counter for the zeroth output document (for filesystem only, beats begin_count) + ├─first_doc_datetime : datetime when first document would have been e.g. 1970-01-01T00:00:00.0 + └─doc_duration : duration in seconds of each document e.g. 3.84 type="buffer-delay" └─delay : delay in seconds, default 0 @@ -135,6 +138,7 @@ Output carriage type dependent options for "carriage": :: ├─rotating_buf : Rotating buffer size. This will keep the last N number of files created in the folder or all if 0, default 0 ├─suppress_manifest : Whether to suppress writing of a manifest file (e.g. for EBU-TT-D output). Default False ├─message_filename_pattern : File name pattern for message documents or EBU-TT-D documents. It can contain {sequence_identifier} and {counter} format parameters, default "{sequence_identifier}_msg_{counter}.xml" + ├─begin_count : value of zeroth {counter} format value: first output file will use this plus 1 - note that ebuttd-encoder can override this. └─filename_pattern : File name pattern for EBU-TT-Live documents. It needs to contain {counter} format parameter, which will be populated with the sequence number. Default "{sequence_identifier}_{counter}.xml" type="websocket" diff --git a/ebu_tt_live/bindings/__init__.py b/ebu_tt_live/bindings/__init__.py index 4b1eb4347..1381af0ee 100644 --- a/ebu_tt_live/bindings/__init__.py +++ b/ebu_tt_live/bindings/__init__.py @@ -939,7 +939,7 @@ def _merge_deconflict_ids(cls, element, dest, ids): output = [] for item in children: - log.debug('processing child: {} of {}'.format(item.value, element)) + #log.debug('processing child: {} of {}'.format(item.value, element)) if isinstance(item, NonElementContent): copied_stuff = copy.copy(item.value) output.append(copied_stuff) diff --git a/ebu_tt_live/carriage/filesystem.py b/ebu_tt_live/carriage/filesystem.py index ef4db4475..27303a7c7 100644 --- a/ebu_tt_live/carriage/filesystem.py +++ b/ebu_tt_live/carriage/filesystem.py @@ -9,6 +9,7 @@ import six import os import time +import codecs log = logging.getLogger(__name__) @@ -72,20 +73,21 @@ def __init__(self, file_name_pattern = CFG_FILENAME_PATTERN, message_file_name_pattern = CFG_MESSAGE_PATTERN, circular_buf_size = 0, - suppress_manifest = False): + suppress_manifest = False, + first_msg_counter = 0): self._dirpath = dirpath if not os.path.exists(self._dirpath): os.makedirs(self._dirpath) self._file_name_pattern = file_name_pattern self._message_file_name_pattern = message_file_name_pattern self._counter = 0 + self._msg_counter = first_msg_counter self._circular_buf_size = circular_buf_size if circular_buf_size > 0 : self._circular_buf = RotatingFileBuffer(maxlen=circular_buf_size) self._suppress_manifest = suppress_manifest # Get a set of default clocks self._default_clocks = {} - self._msg_counter = 0 def _get_default_clock(self, sequence_identifier, time_base, clock_mode=None): clock_obj = self._default_clocks.get(sequence_identifier, None) @@ -96,6 +98,9 @@ def _get_default_clock(self, sequence_identifier, time_base, clock_mode=None): self._default_clocks[sequence_identifier] = clock_obj return clock_obj + def set_message_counter(self, message_counter): + self._msg_counter = message_counter + def check_availability_time( self, sequence_identifier, time_base=None, clock_mode=None, availability_time=None): """ @@ -153,7 +158,7 @@ def emit_data(self, data, sequence_identifier=None, sequence_number=None, # can be selected once at the beginning and dereferenced rather than repeating # if statements. filepath = os.path.join(self._dirpath, filename) - with open(filepath, 'w') as destfile: + with codecs.open(filepath, mode='w', errors='ignore') as destfile: destfile.write(data) destfile.flush() @@ -198,7 +203,7 @@ def emit_data(self, data, sequence_identifier=None, sequence_number=None, new_manifest_line = CFG_MANIFEST_LINE_PATTERN.format( availability_time=timedelta_to_str_manifest(availability_time), filename=filename) - with open(self._manifest_path, 'a') as f: + with codecs.open(self._manifest_path, mode='a', errors='ignore') as f: f.write(new_manifest_line) @@ -236,11 +241,11 @@ def __init__(self, manifest_path, custom_consumer, do_tail): self._manifest_path = manifest_path self._custom_consumer = custom_consumer self._do_tail = do_tail - with open(manifest_path, 'r') as manifest: + with codecs.open(manifest_path, 'r') as manifest: self._manifest_lines_iter = iter(manifest.readlines()) def resume_reading(self): - with open(self._manifest_path, 'r') as manifest_file: + with codecs.open(self._manifest_path, 'r') as manifest_file: while True: manifest_line = manifest_file.readline() if not manifest_line: @@ -256,7 +261,7 @@ def resume_reading(self): availability_time_str, xml_file_name = manifest_line.rstrip().split(',') xml_file_path = os.path.join(self._dirpath, xml_file_name) xml_content = None - with open(xml_file_path, 'r') as xml_file: + with codecs.open(xml_file_path, 'r') as xml_file: xml_content = xml_file.read() data = [availability_time_str, xml_content] self._custom_consumer.on_new_data(data) diff --git a/ebu_tt_live/config/backend.py b/ebu_tt_live/config/backend.py index c8c03b185..465c459e8 100644 --- a/ebu_tt_live/config/backend.py +++ b/ebu_tt_live/config/backend.py @@ -137,8 +137,6 @@ def _ws_create_server_factory(self, listen, producer=None, consumer=None): def _ws_create_client_factories(self, connect, producer=None, consumer=None, proxy=None): factory_args = {} - if proxy: - factory_args.update({'host': proxy.host, 'port': proxy.port}) for dst in connect: client_factory = self._websocket.BroadcastClientFactory( url=dst.geturl(), @@ -147,6 +145,8 @@ def _ws_create_client_factories(self, connect, producer=None, consumer=None, pro **factory_args ) client_factory.protocol = self._websocket.BroadcastClientProtocol + client_factory.proxy = proxy + client_factory.connect() def ws_backend_producer(self, custom_producer, listen=None, connect=None, proxy=None): diff --git a/ebu_tt_live/config/carriage.py b/ebu_tt_live/config/carriage.py index b1f0f1624..a8aa401d6 100644 --- a/ebu_tt_live/config/carriage.py +++ b/ebu_tt_live/config/carriage.py @@ -2,7 +2,6 @@ from ebu_tt_live.carriage.direct import DirectCarriageImpl from ebu_tt_live.carriage.websocket import WebsocketProducerCarriage, WebsocketConsumerCarriage from ebu_tt_live.carriage import filesystem -from ebu_tt_live.utils import HTTPProxyConfig from ebu_tt_live.strings import ERR_CONF_PROXY_CONF_VALUE, ERR_NO_SUCH_COMPONENT from ebu_tt_live.errors import ConfigurationError from ebu_tt_live.strings import CFG_FILENAME_PATTERN, CFG_MESSAGE_PATTERN @@ -76,6 +75,10 @@ class FilesystemOutput(ConfigurableComponent): default=False, doc='Suppress output of a manifest file (default false)' ) + required_config.add_option( + 'begin_count', + default=0, + doc='Value to begin counting at for patterns including {counter}; the first output value will be this plus 1.') def __init__(self, config, local_config): super(FilesystemOutput, self).__init__(config, local_config) @@ -84,7 +87,8 @@ def __init__(self, config, local_config): file_name_pattern=self.config.filename_pattern, message_file_name_pattern=self.config.message_filename_pattern, circular_buf_size=self.config.rotating_buf, - suppress_manifest=self.config.suppress_manifest) + suppress_manifest=self.config.suppress_manifest, + first_msg_counter=self.config.begin_count) @@ -134,10 +138,7 @@ def parse_proxy_address(value): match = proxy_regex.match(value) if match: # Ignoring the protocol part for now as it is only a http proxy - result = HTTPProxyConfig( - host=match.group('host'), - port=int(match.group('port')) - ) + result = {u'host': match.group('host'), u'port': int(match.group('port'))} elif value: # In this case something was provided that isn't a falsy value but the parsing failed. raise ConfigurationError( diff --git a/ebu_tt_live/config/clocks.py b/ebu_tt_live/config/clocks.py index 9fed98e78..dcaead7c6 100644 --- a/ebu_tt_live/config/clocks.py +++ b/ebu_tt_live/config/clocks.py @@ -1,5 +1,7 @@ from .common import ConfigurableComponent, Namespace from ebu_tt_live import clocks +from datetime import datetime, timedelta +import re from ebu_tt_live.errors import ConfigurationError from ebu_tt_live.strings import ERR_NO_SUCH_COMPONENT @@ -42,3 +44,26 @@ def get_clock(clock_type): type_name=clock_type ) ) + +def _int_or_none(value): + try: + return int(value) + except TypeError: + return 0 + +_datetime_groups_regex = re.compile('([0-9][0-9][0-9][0-9])-([0-9][0-9])-([0-9][0-9])T([0-9][0-9]):([0-5][0-9]):([0-5][0-9]|60)(?:\.([0-9]+))?') + +def get_date(date): + years, months, days, hours, minutes, seconds, microseconds = map( + lambda x: _int_or_none(x), + _datetime_groups_regex.match(date).groups() + ) + + return datetime( + year = years, + month = months, + day = days, + hour = hours, + minute = minutes, + second = seconds, + microsecond = microseconds) diff --git a/ebu_tt_live/config/node.py b/ebu_tt_live/config/node.py index b484cc63b..c2c7d47c5 100644 --- a/ebu_tt_live/config/node.py +++ b/ebu_tt_live/config/node.py @@ -1,5 +1,5 @@ from .common import ConfigurableComponent, Namespace, converters, RequiredConfig -from .clocks import get_clock +from .clocks import get_clock, get_date from .carriage import get_producer_carriage, get_consumer_carriage from ebu_tt_live import documents from ebu_tt_live import bindings @@ -10,6 +10,8 @@ from ebu_tt_live.errors import ConfigurationError from ebu_tt_live.strings import ERR_CONF_NO_SUCH_NODE from .adapters import ProducerNodeCarriageAdapter, ConsumerNodeCarriageAdapter +from datetime import datetime, timedelta +from math import floor class NodeBase(ConfigurableComponent): @@ -250,10 +252,26 @@ class EBUTTDEncoder(ProducerMixin, ConsumerMixin, NodeBase): required_config = Namespace() required_config.add_option('id', default='ebuttd-encoder') - required_config.add_option('media_time_zero', default='current') - required_config.add_option('default_namespace', default=False) + required_config.add_option( + 'media_time_zero', + default='current', + doc='The clock equivalent time to use for media time zero, defaults to the current time.') + required_config.add_option( + 'default_namespace', + default=False, + doc='Whether to use a default namespace, default false.') required_config.clock = Namespace() required_config.clock.add_option('type', default='local', from_string_converter=get_clock) + required_config.override_begin_count = Namespace() + required_config.override_begin_count.add_option( + 'first_doc_datetime', + doc='The time when the document numbered 1 was available, format YYYY-mm-DDTHH:MM:SS', + default = datetime.utcnow(), + from_string_converter=get_date) + required_config.override_begin_count.add_option( + 'doc_duration', + default=5.0, + doc='The duration of each document in seconds, default 5') _clock = None @@ -263,10 +281,20 @@ def _create_component(self, config): mtz = self._clock.component.get_time() else: mtz = bindings.ebuttdt.LimitedClockTimingType(str(self.config.media_time_zero)).timedelta + + begin_count = None + + if self.config.override_begin_count: + # override the carriage mech's document count + fdt = self.config.override_begin_count.first_doc_datetime + tn = datetime.utcnow() + begin_count = int(floor((tn - fdt).total_seconds() / self.config.override_begin_count.doc_duration)) + self.component = processing_node.EBUTTDEncoder( node_id=self.config.id, media_time_zero=mtz, - default_ns=self.config.default_namespace + default_ns=self.config.default_namespace, + begin_count=begin_count ) def __init__(self, config, local_config): diff --git a/ebu_tt_live/node/deduplicator.py b/ebu_tt_live/node/deduplicator.py index e58364aeb..ced39e88d 100644 --- a/ebu_tt_live/node/deduplicator.py +++ b/ebu_tt_live/node/deduplicator.py @@ -55,7 +55,7 @@ def remove_duplication(self, document): if document.binding.head.styling is not None: styles = document.binding.head.styling.style - print styles + document.binding.head.styling.style = None self.CollateUniqueVals(styles, old_id_dict, new_id_dict, hash_dict) diff --git a/ebu_tt_live/node/encoder.py b/ebu_tt_live/node/encoder.py index 50917715b..ccd6587e4 100644 --- a/ebu_tt_live/node/encoder.py +++ b/ebu_tt_live/node/encoder.py @@ -4,6 +4,8 @@ from ebu_tt_live.clocks.media import MediaClock from ebu_tt_live.documents.converters import EBUTT3EBUTTDConverter from ebu_tt_live.documents import EBUTTDDocument, EBUTT3Document +#from ebu_tt_live.carriage.filesystem import FilesystemProducerImpl +#from ebu_tt_live.carriage import FilesystemProducerImpl class EBUTTDEncoder(AbstractCombinedNode): @@ -13,9 +15,14 @@ class EBUTTDEncoder(AbstractCombinedNode): _default_ebuttd_doc = None _expects = EBUTT3Document _provides = EBUTTDDocument + # _begin_count is used to override the first output document count number. when + # provided as a constructor value it is stored, and set on the output carriage + # impl once before the first time emit_document is called. Then it is reset + # to None, which is used as the test to see if it needs to be used. + _begin_count = None def __init__(self, node_id, media_time_zero, default_ns=False, producer_carriage=None, - consumer_carriage=None, **kwargs): + consumer_carriage=None, begin_count=None, **kwargs): super(EBUTTDEncoder, self).__init__( producer_carriage=producer_carriage, consumer_carriage=consumer_carriage, @@ -25,6 +32,7 @@ def __init__(self, node_id, media_time_zero, default_ns=False, producer_carriage self._default_ns = default_ns media_clock = MediaClock() media_clock.adjust_time(timedelta(), media_time_zero) + self._begin_count = begin_count self._ebuttd_converter = EBUTT3EBUTTDConverter( media_clock=media_clock ) @@ -41,6 +49,13 @@ def process_document(self, document, **kwargs): converted_doc = EBUTTDDocument.create_from_raw_binding( self._ebuttd_converter.convert_document(document.binding) ) + + # If this is the first time, and there's a begin count override, apply it + if self._begin_count is not None: + # Will fail unless the concrete producer carriage impl is a FilesystemProducerImpl + self.producer_carriage.producer_carriage.set_message_counter(self._begin_count) + self._begin_count = None + # Specify the time_base since the FilesystemProducerImpl can't derive it otherwise. # Hard coded to 'media' because that's all that's permitted in EBU-TT-D. Alternative # would be to extract it from the EBUTTDDocument but since it's the only permitted diff --git a/ebu_tt_live/utils.py b/ebu_tt_live/utils.py index 000598249..9ae3afca2 100644 --- a/ebu_tt_live/utils.py +++ b/ebu_tt_live/utils.py @@ -358,8 +358,6 @@ def __call__(cls, *args, **kwargs): instance = super(AutoRegisteringABCMeta, cls).__call__(*args, **kwargs) return instance -HTTPProxyConfig = collections.namedtuple('HTTPProxyConfig', ['host', 'port']) - # The following section is taken from https://github.com/django/django/blob/master/django/test/utils.py # This is a relatively simple XML comparator implementation based on Python's minidom library. @@ -467,4 +465,4 @@ def first_node(document): want_root = first_node(parseString(want)) got_root = first_node(parseString(got)) - return check_element(want_root, got_root) \ No newline at end of file + return check_element(want_root, got_root) diff --git a/requirements.txt b/requirements.txt index 39e7c76f6..463a1b9f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,6 @@ sphinx-rtd-theme pytest-bdd pytest-cov pytest-mock -pytest-capturelog pytest-twisted coverage pytest-runner