diff --git a/golem/client.py b/golem/client.py index f2424a7112..ca53f4e55e 100644 --- a/golem/client.py +++ b/golem/client.py @@ -21,7 +21,9 @@ TYPE_CHECKING, ) +from ethereum.utils import denoms from golem_messages import datastructures as msg_datastructures +from golem_task_api.envs import DOCKER_GPU_ENV_ID from pydispatch import dispatcher from twisted.internet.defer import ( inlineCallbacks, @@ -33,6 +35,7 @@ import golem from golem import model from golem.appconfig import TASKARCHIVE_MAINTENANCE_INTERVAL, AppConfig +from golem.apps.default import APPS from golem.clientconfigdescriptor import ConfigApprover, ClientConfigDescriptor from golem.core import variables from golem.core.common import ( @@ -76,8 +79,8 @@ from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager from golem.rpc import utils as rpc_utils from golem.rpc.mapping.rpceventnames import Task, Network, Environment, UI -from golem.task import taskpreset -from golem.task import taskstate +from golem.task import taskpreset, taskstate +from golem.task.helpers import calculate_subtask_payment from golem.task.taskarchiver import TaskArchiver from golem.task.taskserver import TaskServer from golem.task.tasktester import TaskTester @@ -897,14 +900,63 @@ def get_task(self, task_id: str) -> Optional[dict]: if not task: return None subtask_ids = rtm.get_requested_task_subtask_ids(task_id) + # time_started if task.start_time is None: time_started = get_timestamp_utc() + if task.status == taskstate.TaskStatus.errorCreating: + time_started -= 1 else: time_started = task.start_time.timestamp() + # proress and time_remaining + finished_subtasks = rtm.count_finished_subtasks(task.task_id) + progress = finished_subtasks / task.max_subtasks + time_remaining = None + if progress > 0.0 and not task.status.is_completed(): + elapsed = task.elapsed_seconds + time_remaining = (elapsed / progress) - elapsed + # type + app_name = 'Unknown' + if task.app_id in APPS: + app = APPS[task.app_id] + app_name = app.name + # last_updated + if task.end_time is None: + last_updated = get_timestamp_utc() + else: + last_updated = task.end_time.timestamp() + # compute_on + compute_on = 'cpu' + if task.env_id == DOCKER_GPU_ENV_ID: + compute_on = 'gpu' + # estimated_cost and estimated_fee + subtask_price = calculate_subtask_payment( + task.max_price_per_hour, + task.subtask_timeout + ) + estimated_cost = subtask_price * task.max_subtasks + estimated_fee = self.transaction_system.eth_for_batch_payment( + task.max_subtasks + ) task_dict = { 'id': task.task_id, + 'time_remaining': time_remaining, + 'subtasks_count': task.max_subtasks, 'status': task.status.value, + 'progress': progress, 'time_started': time_started, + 'last_updated': last_updated, + 'name': task.name, + 'bid': float(task.max_price_per_hour) / denoms.ether, + 'compute_on': compute_on, + 'concent_enabled': task.concent_enabled, + 'subtask_timeout': str(timedelta(seconds=task.subtask_timeout)), + 'timeout': str(timedelta(seconds=task.task_timeout)), + 'type': app_name, + 'options': { + 'output_path': task.output_directory + }, + 'estimated_cost': estimated_cost, + 'estimated_fee': estimated_fee, } else: # OLD taskmanager diff --git a/golem/database/database.py b/golem/database/database.py index ebea38da39..04c5c3794e 100644 --- a/golem/database/database.py +++ b/golem/database/database.py @@ -61,7 +61,7 @@ def execute_sql(self, sql, params=None, require_commit=True): class Database: - SCHEMA_VERSION = 45 + SCHEMA_VERSION = 46 def __init__(self, # noqa pylint: disable=too-many-arguments db: peewee.Database, diff --git a/golem/database/schemas/046_rtm_task_end_time.py b/golem/database/schemas/046_rtm_task_end_time.py new file mode 100644 index 0000000000..9d48342c42 --- /dev/null +++ b/golem/database/schemas/046_rtm_task_end_time.py @@ -0,0 +1,15 @@ +# pylint: disable=no-member +# pylint: disable=unused-argument +import peewee as pw + +SCHEMA_VERSION = 46 + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.add_fields( + 'requestedtask', + end_time=pw.UTCDateTimeField(null=True)) + + +def rollback(migrator, database, fake=False, **kwargs): + migrator.remove_fields('requestedtask', 'end_time') diff --git a/golem/ethereum/paymentprocessor.py b/golem/ethereum/paymentprocessor.py index 173d60cd1a..8acc31aaec 100644 --- a/golem/ethereum/paymentprocessor.py +++ b/golem/ethereum/paymentprocessor.py @@ -157,7 +157,7 @@ def add( # pylint: disable=too-many-arguments value: int, ) -> model.TaskPayment: log.info( - "Adding payment for %s to %s (%.18f GNTB)", + "Adding payment. subtask_id=%s, receiver=%s, value=(%.18f GNTB)", subtask_id, eth_addr, value / denoms.ether, diff --git a/golem/model.py b/golem/model.py index 2baba0735f..237191e47c 100644 --- a/golem/model.py +++ b/golem/model.py @@ -735,6 +735,7 @@ class RequestedTask(BaseModel): task_timeout = IntegerField(null=False) # seconds subtask_timeout = IntegerField(null=False) # seconds start_time = UTCDateTimeField(null=True) + end_time = UTCDateTimeField(null=True) max_price_per_hour = HexIntegerField(null=False) max_subtasks = IntegerField(null=False) diff --git a/golem/task/requestedtaskmanager.py b/golem/task/requestedtaskmanager.py index 1c381889fb..caa4e4eabd 100644 --- a/golem/task/requestedtaskmanager.py +++ b/golem/task/requestedtaskmanager.py @@ -295,6 +295,7 @@ async def _task_creation_ctx( try: task = RequestedTask.get(task_id=task_id) task.status = TaskStatus.errorCreating + task.end_time = default_now() task.save() if not app_id: app_id = task.app_id @@ -334,6 +335,7 @@ def error_creating(self, task_id: TaskId): raise RuntimeError(f"Task {task_id} has already been started") task.status = TaskStatus.errorCreating + task.end_time = default_now() task.save() self._notice_task_updated(task, op=TaskOp.ABORTED) @@ -529,6 +531,7 @@ async def _verify( if not await self.has_pending_subtasks(task_id): if not self._get_pending_subtasks(task_id): task.status = TaskStatus.finished + task.end_time = default_now() task.save() self._move_task_results( @@ -554,6 +557,7 @@ async def abort_task(self, task_id: TaskId) -> None: f"Task not active, can not abort. task_id={task_id}") task.status = TaskStatus.aborted + task.end_time = default_now() task.save() for subtask in self._get_pending_subtasks(task_id): @@ -605,6 +609,15 @@ def get_requested_task_ids() -> List[TaskId]: tasks = RequestedTask.select(RequestedTask.task_id).execute() return [task.task_id for task in tasks] + @staticmethod + def count_finished_subtasks(task_id: TaskId) -> float: + return RequestedSubtask.select( + fn.Count(RequestedSubtask.subtask_id) + ).where( + RequestedSubtask.task_id == task_id, + RequestedSubtask.status == SubtaskStatus.finished, + ).scalar() + @staticmethod def get_requested_task_subtask_ids(task_id: TaskId) -> List[SubtaskId]: subtasks = RequestedSubtask.select(RequestedSubtask.subtask_id) \ @@ -850,6 +863,7 @@ def _time_out_task(self, task_id: TaskId) -> None: logger.info("Task timed out. task_id=%r", task_id) task.status = TaskStatus.timeout + task.end_time = default_now() task.save() for subtask in self._get_pending_subtasks(task_id):