Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 481 counter number #482

Open
wants to merge 9 commits into
base: release/2.1.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/source/configurator.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ Node type dependent options for [nodeN] : ::
type="ebuttd-encoder"
├─media_time_zero : ["current" (default) | clock time at media time zero TODO: check format]
├─default_namespace : ["false" (default) | "true"]
└─clock
└─type : ["local" (default) | "auto" | "utc"]
├─clock
│ └─type : ["local" (default) | "auto" | "utc"]
└─override_begin_count : override the counter for the zeroth output document (for filesystem only, beats begin_count)
├─first_doc_datetime : datetime when first document would have been e.g. 1970-01-01T00:00:00.0
└─doc_duration : duration in seconds of each document e.g. 3.84

type="buffer-delay"
└─delay : delay in seconds, default 0
Expand Down Expand Up @@ -135,6 +138,7 @@ Output carriage type dependent options for "carriage": ::
├─rotating_buf : Rotating buffer size. This will keep the last N number of files created in the folder or all if 0, default 0
├─suppress_manifest : Whether to suppress writing of a manifest file (e.g. for EBU-TT-D output). Default False
├─message_filename_pattern : File name pattern for message documents or EBU-TT-D documents. It can contain {sequence_identifier} and {counter} format parameters, default "{sequence_identifier}_msg_{counter}.xml"
├─begin_count : value of zeroth {counter} format value: first output file will use this plus 1 - note that ebuttd-encoder can override this.
└─filename_pattern : File name pattern for EBU-TT-Live documents. It needs to contain {counter} format parameter, which will be populated with the sequence number. Default "{sequence_identifier}_{counter}.xml"

type="websocket"
Expand Down
2 changes: 1 addition & 1 deletion ebu_tt_live/bindings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ def _merge_deconflict_ids(cls, element, dest, ids):
output = []

for item in children:
log.debug('processing child: {} of {}'.format(item.value, element))
#log.debug('processing child: {} of {}'.format(item.value, element))
if isinstance(item, NonElementContent):
copied_stuff = copy.copy(item.value)
output.append(copied_stuff)
Expand Down
19 changes: 12 additions & 7 deletions ebu_tt_live/carriage/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import six
import os
import time
import codecs


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -72,20 +73,21 @@ def __init__(self,
file_name_pattern = CFG_FILENAME_PATTERN,
message_file_name_pattern = CFG_MESSAGE_PATTERN,
circular_buf_size = 0,
suppress_manifest = False):
suppress_manifest = False,
first_msg_counter = 0):
self._dirpath = dirpath
if not os.path.exists(self._dirpath):
os.makedirs(self._dirpath)
self._file_name_pattern = file_name_pattern
self._message_file_name_pattern = message_file_name_pattern
self._counter = 0
self._msg_counter = first_msg_counter
self._circular_buf_size = circular_buf_size
if circular_buf_size > 0 :
self._circular_buf = RotatingFileBuffer(maxlen=circular_buf_size)
self._suppress_manifest = suppress_manifest
# Get a set of default clocks
self._default_clocks = {}
self._msg_counter = 0

def _get_default_clock(self, sequence_identifier, time_base, clock_mode=None):
clock_obj = self._default_clocks.get(sequence_identifier, None)
Expand All @@ -96,6 +98,9 @@ def _get_default_clock(self, sequence_identifier, time_base, clock_mode=None):
self._default_clocks[sequence_identifier] = clock_obj
return clock_obj

def set_message_counter(self, message_counter):
self._msg_counter = message_counter

def check_availability_time(
self, sequence_identifier, time_base=None, clock_mode=None, availability_time=None):
"""
Expand Down Expand Up @@ -153,7 +158,7 @@ def emit_data(self, data, sequence_identifier=None, sequence_number=None,
# can be selected once at the beginning and dereferenced rather than repeating
# if statements.
filepath = os.path.join(self._dirpath, filename)
with open(filepath, 'w') as destfile:
with codecs.open(filepath, mode='w', errors='ignore') as destfile:
destfile.write(data)
destfile.flush()

Expand Down Expand Up @@ -198,7 +203,7 @@ def emit_data(self, data, sequence_identifier=None, sequence_number=None,
new_manifest_line = CFG_MANIFEST_LINE_PATTERN.format(
availability_time=timedelta_to_str_manifest(availability_time),
filename=filename)
with open(self._manifest_path, 'a') as f:
with codecs.open(self._manifest_path, mode='a', errors='ignore') as f:
f.write(new_manifest_line)


Expand Down Expand Up @@ -236,11 +241,11 @@ def __init__(self, manifest_path, custom_consumer, do_tail):
self._manifest_path = manifest_path
self._custom_consumer = custom_consumer
self._do_tail = do_tail
with open(manifest_path, 'r') as manifest:
with codecs.open(manifest_path, 'r') as manifest:
self._manifest_lines_iter = iter(manifest.readlines())

def resume_reading(self):
with open(self._manifest_path, 'r') as manifest_file:
with codecs.open(self._manifest_path, 'r') as manifest_file:
while True:
manifest_line = manifest_file.readline()
if not manifest_line:
Expand All @@ -256,7 +261,7 @@ def resume_reading(self):
availability_time_str, xml_file_name = manifest_line.rstrip().split(',')
xml_file_path = os.path.join(self._dirpath, xml_file_name)
xml_content = None
with open(xml_file_path, 'r') as xml_file:
with codecs.open(xml_file_path, 'r') as xml_file:
xml_content = xml_file.read()
data = [availability_time_str, xml_content]
self._custom_consumer.on_new_data(data)
4 changes: 2 additions & 2 deletions ebu_tt_live/config/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ def _ws_create_server_factory(self, listen, producer=None, consumer=None):

def _ws_create_client_factories(self, connect, producer=None, consumer=None, proxy=None):
factory_args = {}
if proxy:
factory_args.update({'host': proxy.host, 'port': proxy.port})
for dst in connect:
client_factory = self._websocket.BroadcastClientFactory(
url=dst.geturl(),
Expand All @@ -147,6 +145,8 @@ def _ws_create_client_factories(self, connect, producer=None, consumer=None, pro
**factory_args
)
client_factory.protocol = self._websocket.BroadcastClientProtocol
client_factory.proxy = proxy

client_factory.connect()

def ws_backend_producer(self, custom_producer, listen=None, connect=None, proxy=None):
Expand Down
13 changes: 7 additions & 6 deletions ebu_tt_live/config/carriage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from ebu_tt_live.carriage.direct import DirectCarriageImpl
from ebu_tt_live.carriage.websocket import WebsocketProducerCarriage, WebsocketConsumerCarriage
from ebu_tt_live.carriage import filesystem
from ebu_tt_live.utils import HTTPProxyConfig
from ebu_tt_live.strings import ERR_CONF_PROXY_CONF_VALUE, ERR_NO_SUCH_COMPONENT
from ebu_tt_live.errors import ConfigurationError
from ebu_tt_live.strings import CFG_FILENAME_PATTERN, CFG_MESSAGE_PATTERN
Expand Down Expand Up @@ -76,6 +75,10 @@ class FilesystemOutput(ConfigurableComponent):
default=False,
doc='Suppress output of a manifest file (default false)'
)
required_config.add_option(
'begin_count',
default=0,
doc='Value to begin counting at for patterns including {counter}; the first output value will be this plus 1.')

def __init__(self, config, local_config):
super(FilesystemOutput, self).__init__(config, local_config)
Expand All @@ -84,7 +87,8 @@ def __init__(self, config, local_config):
file_name_pattern=self.config.filename_pattern,
message_file_name_pattern=self.config.message_filename_pattern,
circular_buf_size=self.config.rotating_buf,
suppress_manifest=self.config.suppress_manifest)
suppress_manifest=self.config.suppress_manifest,
first_msg_counter=self.config.begin_count)



Expand Down Expand Up @@ -134,10 +138,7 @@ def parse_proxy_address(value):
match = proxy_regex.match(value)
if match:
# Ignoring the protocol part for now as it is only a http proxy
result = HTTPProxyConfig(
host=match.group('host'),
port=int(match.group('port'))
)
result = {u'host': match.group('host'), u'port': int(match.group('port'))}
elif value:
# In this case something was provided that isn't a falsy value but the parsing failed.
raise ConfigurationError(
Expand Down
25 changes: 25 additions & 0 deletions ebu_tt_live/config/clocks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .common import ConfigurableComponent, Namespace
from ebu_tt_live import clocks
from datetime import datetime, timedelta
import re
from ebu_tt_live.errors import ConfigurationError
from ebu_tt_live.strings import ERR_NO_SUCH_COMPONENT

Expand Down Expand Up @@ -42,3 +44,26 @@ def get_clock(clock_type):
type_name=clock_type
)
)

def _int_or_none(value):
try:
return int(value)
except TypeError:
return 0

_datetime_groups_regex = re.compile('([0-9][0-9][0-9][0-9])-([0-9][0-9])-([0-9][0-9])T([0-9][0-9]):([0-5][0-9]):([0-5][0-9]|60)(?:\.([0-9]+))?')

def get_date(date):
years, months, days, hours, minutes, seconds, microseconds = map(
lambda x: _int_or_none(x),
_datetime_groups_regex.match(date).groups()
)

return datetime(
year = years,
month = months,
day = days,
hour = hours,
minute = minutes,
second = seconds,
microsecond = microseconds)
36 changes: 32 additions & 4 deletions ebu_tt_live/config/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .common import ConfigurableComponent, Namespace, converters, RequiredConfig
from .clocks import get_clock
from .clocks import get_clock, get_date
from .carriage import get_producer_carriage, get_consumer_carriage
from ebu_tt_live import documents
from ebu_tt_live import bindings
Expand All @@ -10,6 +10,8 @@
from ebu_tt_live.errors import ConfigurationError
from ebu_tt_live.strings import ERR_CONF_NO_SUCH_NODE
from .adapters import ProducerNodeCarriageAdapter, ConsumerNodeCarriageAdapter
from datetime import datetime, timedelta
from math import floor


class NodeBase(ConfigurableComponent):
Expand Down Expand Up @@ -250,10 +252,26 @@ class EBUTTDEncoder(ProducerMixin, ConsumerMixin, NodeBase):

required_config = Namespace()
required_config.add_option('id', default='ebuttd-encoder')
required_config.add_option('media_time_zero', default='current')
required_config.add_option('default_namespace', default=False)
required_config.add_option(
'media_time_zero',
default='current',
doc='The clock equivalent time to use for media time zero, defaults to the current time.')
required_config.add_option(
'default_namespace',
default=False,
doc='Whether to use a default namespace, default false.')
required_config.clock = Namespace()
required_config.clock.add_option('type', default='local', from_string_converter=get_clock)
required_config.override_begin_count = Namespace()
required_config.override_begin_count.add_option(
'first_doc_datetime',
doc='The time when the document numbered 1 was available, format YYYY-mm-DDTHH:MM:SS',
default = datetime.utcnow(),
from_string_converter=get_date)
required_config.override_begin_count.add_option(
'doc_duration',
default=5.0,
doc='The duration of each document in seconds, default 5')

_clock = None

Expand All @@ -263,10 +281,20 @@ def _create_component(self, config):
mtz = self._clock.component.get_time()
else:
mtz = bindings.ebuttdt.LimitedClockTimingType(str(self.config.media_time_zero)).timedelta

begin_count = None

if self.config.override_begin_count:
# override the carriage mech's document count
fdt = self.config.override_begin_count.first_doc_datetime
tn = datetime.utcnow()
begin_count = int(floor((tn - fdt).total_seconds() / self.config.override_begin_count.doc_duration))

self.component = processing_node.EBUTTDEncoder(
node_id=self.config.id,
media_time_zero=mtz,
default_ns=self.config.default_namespace
default_ns=self.config.default_namespace,
begin_count=begin_count
)

def __init__(self, config, local_config):
Expand Down
2 changes: 1 addition & 1 deletion ebu_tt_live/node/deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def remove_duplication(self, document):

if document.binding.head.styling is not None:
styles = document.binding.head.styling.style
print styles

document.binding.head.styling.style = None

self.CollateUniqueVals(styles, old_id_dict, new_id_dict, hash_dict)
Expand Down
17 changes: 16 additions & 1 deletion ebu_tt_live/node/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from ebu_tt_live.clocks.media import MediaClock
from ebu_tt_live.documents.converters import EBUTT3EBUTTDConverter
from ebu_tt_live.documents import EBUTTDDocument, EBUTT3Document
#from ebu_tt_live.carriage.filesystem import FilesystemProducerImpl
#from ebu_tt_live.carriage import FilesystemProducerImpl


class EBUTTDEncoder(AbstractCombinedNode):
Expand All @@ -13,9 +15,14 @@ class EBUTTDEncoder(AbstractCombinedNode):
_default_ebuttd_doc = None
_expects = EBUTT3Document
_provides = EBUTTDDocument
# _begin_count is used to override the first output document count number. when
# provided as a constructor value it is stored, and set on the output carriage
# impl once before the first time emit_document is called. Then it is reset
# to None, which is used as the test to see if it needs to be used.
_begin_count = None

def __init__(self, node_id, media_time_zero, default_ns=False, producer_carriage=None,
consumer_carriage=None, **kwargs):
consumer_carriage=None, begin_count=None, **kwargs):
super(EBUTTDEncoder, self).__init__(
producer_carriage=producer_carriage,
consumer_carriage=consumer_carriage,
Expand All @@ -25,6 +32,7 @@ def __init__(self, node_id, media_time_zero, default_ns=False, producer_carriage
self._default_ns = default_ns
media_clock = MediaClock()
media_clock.adjust_time(timedelta(), media_time_zero)
self._begin_count = begin_count
self._ebuttd_converter = EBUTT3EBUTTDConverter(
media_clock=media_clock
)
Expand All @@ -41,6 +49,13 @@ def process_document(self, document, **kwargs):
converted_doc = EBUTTDDocument.create_from_raw_binding(
self._ebuttd_converter.convert_document(document.binding)
)

# If this is the first time, and there's a begin count override, apply it
if self._begin_count is not None:
# Will fail unless the concrete producer carriage impl is a FilesystemProducerImpl
self.producer_carriage.producer_carriage.set_message_counter(self._begin_count)
self._begin_count = None

# Specify the time_base since the FilesystemProducerImpl can't derive it otherwise.
# Hard coded to 'media' because that's all that's permitted in EBU-TT-D. Alternative
# would be to extract it from the EBUTTDDocument but since it's the only permitted
Expand Down
4 changes: 1 addition & 3 deletions ebu_tt_live/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ def __call__(cls, *args, **kwargs):
instance = super(AutoRegisteringABCMeta, cls).__call__(*args, **kwargs)
return instance

HTTPProxyConfig = collections.namedtuple('HTTPProxyConfig', ['host', 'port'])


# The following section is taken from https://github.com/django/django/blob/master/django/test/utils.py
# This is a relatively simple XML comparator implementation based on Python's minidom library.
Expand Down Expand Up @@ -467,4 +465,4 @@ def first_node(document):
want_root = first_node(parseString(want))
got_root = first_node(parseString(got))

return check_element(want_root, got_root)
return check_element(want_root, got_root)
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ sphinx-rtd-theme
pytest-bdd
pytest-cov
pytest-mock
pytest-capturelog
pytest-twisted
coverage
pytest-runner
Expand Down