Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1077 from golemfactory/simple_task_repr
Browse files Browse the repository at this point in the history
RPC update: unicode keys and values for simple task / subtask representation
  • Loading branch information
badb authored May 23, 2017
2 parents 58d6842 + 530c12b commit 7d55762
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
65 changes: 36 additions & 29 deletions golem/task/taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from twisted.internet.defer import inlineCallbacks

from golem.core.common import HandleKeyError, get_timestamp_utc, \
timeout_to_deadline
timeout_to_deadline, to_unicode
from golem.core.hostaddress import get_external_address
from golem.manager.nodestatesnapshot import LocalTaskStateSnapshot
from golem.network.transport.tcpnetwork import SocketAddress
Expand Down Expand Up @@ -512,7 +512,8 @@ def get_subtasks(self, task_id):
"""
if task_id not in self.tasks_states:
return None
return [sub.subtask_id for sub in self.tasks_states[task_id].subtask_states.values()]
return [sub.subtask_id for sub in
self.tasks_states[task_id].subtask_states.values()]

def change_config(self, root_path, use_distributed_resource_management):
self.dir_manager = DirManager(root_path)
Expand All @@ -533,11 +534,13 @@ def get_dict_task(self, task_id):
return self._simple_task_repr(self.tasks_states, self.tasks[task_id])

def get_dict_tasks(self):
return [self._simple_task_repr(self.tasks_states, t) for task_id, t in self.tasks.iteritems()]
return [self._simple_task_repr(self.tasks_states, t)
for task_id, t in self.tasks.iteritems()]

def get_dict_subtasks(self, task_id):
task_state = self.tasks_states[task_id]
return [self._simple_subtask_repr(subtask) for subtask_id, subtask in task_state.subtask_states.iteritems()]
return [self._simple_subtask_repr(subtask) for subtask_id, subtask
in task_state.subtask_states.iteritems()]

def get_dict_subtask(self, subtask_id):
task_id = self.subtask2task_mapping[subtask_id]
Expand All @@ -546,35 +549,39 @@ def get_dict_subtask(self, subtask_id):
return self._simple_subtask_repr(subtask)

@staticmethod
def _simple_task_repr(_states, _task):
if _task:
state = _states.get(_task.header.task_id,)
return dict(
id=_task.header.task_id,
time_remaining=state.remaining_time,
subtasks=_task.get_total_tasks(),
status=state.status,
progress=_task.get_progress()
)
def _simple_task_repr(states, task):
if task:
state = states.get(task.header.task_id)
return {
u'id': to_unicode(task.header.task_id),
u'name': to_unicode(task.task_definition.task_name),
u'type': to_unicode(task.task_definition.task_type),
u'duration': max(task.task_definition.full_task_timeout -
state.remaining_time, 0),
u'time_remaining': state.remaining_time,
u'subtasks': task.get_total_tasks(),
u'status': to_unicode(state.status),
u'progress': task.get_progress()
}

@staticmethod
def _simple_subtask_repr(subtask):
if subtask:
return dict(
subtask_id=subtask.subtask_id,
node_name=subtask.computer.node_name,
node_id=subtask.computer.node_id,
node_performance=subtask.computer.performance,
node_ip_address=subtask.computer.ip_address,
node_port=subtask.computer.port,
status=subtask.subtask_status,
progress=subtask.subtask_progress,
time_started=subtask.time_started,
time_remaining=subtask.subtask_rem_time,
results=subtask.results,
stderr=subtask.stderr,
stdout=subtask.stdout
)
return {
u'subtask_id': to_unicode(subtask.subtask_id),
u'node_name': to_unicode(subtask.computer.node_name),
u'node_id': to_unicode(subtask.computer.node_id),
u'node_performance': subtask.computer.performance,
u'node_ip_address': to_unicode(subtask.computer.ip_address),
u'node_port': subtask.computer.port,
u'status': to_unicode(subtask.subtask_status),
u'progress': subtask.subtask_progress,
u'time_started': subtask.time_started,
u'time_remaining': subtask.subtask_rem_time,
u'results': [to_unicode(r) for r in subtask.results],
u'stderr': to_unicode(subtask.stderr),
u'stdout': to_unicode(subtask.stdout)
}

@handle_subtask_key_error
def set_computation_time(self, subtask_id, computation_time):
Expand Down
4 changes: 4 additions & 0 deletions tests/golem/task/test_taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ def __build_tasks(cls, n):
for i in xrange(0, n):

task = Mock()
task.task_definition.full_task_timeout = 100
task.header.task_id = str(uuid.uuid4())
task.get_total_tasks.return_value = i + 2
task.get_progress.return_value = i * 10
Expand Down Expand Up @@ -636,6 +637,9 @@ def __build_subtasks(n):
subtask.computer = ComputerState()
subtask.computer.node_name = 'node_{}'.format(i)
subtask.computer.node_id = 'deadbeef0{}'.format(i)
subtask.results = []
subtask.stderr = 'error_{}'.format(i)
subtask.stdout = 'output_{}'.format(i)
subtask_id = subtask.subtask_id

subtasks[subtask.subtask_id] = subtask
Expand Down

0 comments on commit 7d55762

Please sign in to comment.