Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #5018 from golemfactory/mwu/b0.22/comp.task-fill-a…
Browse files Browse the repository at this point in the history
…ll-fields

Added legacy fields to task-api comp.task rpc command
  • Loading branch information
maaktweluit authored Jan 7, 2020
2 parents 6b6ce0e + d46e061 commit 637632b
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 4 deletions.
56 changes: 54 additions & 2 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
TYPE_CHECKING,
)

from ethereum.utils import denoms
from golem_messages import datastructures as msg_datastructures
from golem_task_api.envs import DOCKER_GPU_ENV_ID
from pydispatch import dispatcher
from twisted.internet.defer import (
inlineCallbacks,
Expand All @@ -33,6 +35,7 @@
import golem
from golem import model
from golem.appconfig import TASKARCHIVE_MAINTENANCE_INTERVAL, AppConfig
from golem.apps.default import APPS
from golem.clientconfigdescriptor import ConfigApprover, ClientConfigDescriptor
from golem.core import variables
from golem.core.common import (
Expand Down Expand Up @@ -76,8 +79,8 @@
from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager
from golem.rpc import utils as rpc_utils
from golem.rpc.mapping.rpceventnames import Task, Network, Environment, UI
from golem.task import taskpreset
from golem.task import taskstate
from golem.task import taskpreset, taskstate
from golem.task.helpers import calculate_subtask_payment
from golem.task.taskarchiver import TaskArchiver
from golem.task.taskserver import TaskServer
from golem.task.tasktester import TaskTester
Expand Down Expand Up @@ -897,14 +900,63 @@ def get_task(self, task_id: str) -> Optional[dict]:
if not task:
return None
subtask_ids = rtm.get_requested_task_subtask_ids(task_id)
# time_started
if task.start_time is None:
time_started = get_timestamp_utc()
if task.status == taskstate.TaskStatus.errorCreating:
time_started -= 1
else:
time_started = task.start_time.timestamp()
# proress and time_remaining
finished_subtasks = rtm.count_finished_subtasks(task.task_id)
progress = finished_subtasks / task.max_subtasks
time_remaining = None
if progress > 0.0 and not task.status.is_completed():
elapsed = task.elapsed_seconds
time_remaining = (elapsed / progress) - elapsed
# type
app_name = 'Unknown'
if task.app_id in APPS:
app = APPS[task.app_id]
app_name = app.name
# last_updated
if task.end_time is None:
last_updated = get_timestamp_utc()
else:
last_updated = task.end_time.timestamp()
# compute_on
compute_on = 'cpu'
if task.env_id == DOCKER_GPU_ENV_ID:
compute_on = 'gpu'
# estimated_cost and estimated_fee
subtask_price = calculate_subtask_payment(
task.max_price_per_hour,
task.subtask_timeout
)
estimated_cost = subtask_price * task.max_subtasks
estimated_fee = self.transaction_system.eth_for_batch_payment(
task.max_subtasks
)
task_dict = {
'id': task.task_id,
'time_remaining': time_remaining,
'subtasks_count': task.max_subtasks,
'status': task.status.value,
'progress': progress,
'time_started': time_started,
'last_updated': last_updated,
'name': task.name,
'bid': float(task.max_price_per_hour) / denoms.ether,
'compute_on': compute_on,
'concent_enabled': task.concent_enabled,
'subtask_timeout': str(timedelta(seconds=task.subtask_timeout)),
'timeout': str(timedelta(seconds=task.task_timeout)),
'type': app_name,
'options': {
'output_path': task.output_directory
},
'estimated_cost': estimated_cost,
'estimated_fee': estimated_fee,
}
else:
# OLD taskmanager
Expand Down
2 changes: 1 addition & 1 deletion golem/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def execute_sql(self, sql, params=None, require_commit=True):


class Database:
SCHEMA_VERSION = 45
SCHEMA_VERSION = 46

def __init__(self, # noqa pylint: disable=too-many-arguments
db: peewee.Database,
Expand Down
15 changes: 15 additions & 0 deletions golem/database/schemas/046_rtm_task_end_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# pylint: disable=no-member
# pylint: disable=unused-argument
import peewee as pw

SCHEMA_VERSION = 46


def migrate(migrator, database, fake=False, **kwargs):
migrator.add_fields(
'requestedtask',
end_time=pw.UTCDateTimeField(null=True))


def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_fields('requestedtask', 'end_time')
2 changes: 1 addition & 1 deletion golem/ethereum/paymentprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def add( # pylint: disable=too-many-arguments
value: int,
) -> model.TaskPayment:
log.info(
"Adding payment for %s to %s (%.18f GNTB)",
"Adding payment. subtask_id=%s, receiver=%s, value=(%.18f GNTB)",
subtask_id,
eth_addr,
value / denoms.ether,
Expand Down
1 change: 1 addition & 0 deletions golem/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ class RequestedTask(BaseModel):
task_timeout = IntegerField(null=False) # seconds
subtask_timeout = IntegerField(null=False) # seconds
start_time = UTCDateTimeField(null=True)
end_time = UTCDateTimeField(null=True)

max_price_per_hour = HexIntegerField(null=False)
max_subtasks = IntegerField(null=False)
Expand Down
14 changes: 14 additions & 0 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ async def _task_creation_ctx(
try:
task = RequestedTask.get(task_id=task_id)
task.status = TaskStatus.errorCreating
task.end_time = default_now()
task.save()
if not app_id:
app_id = task.app_id
Expand Down Expand Up @@ -334,6 +335,7 @@ def error_creating(self, task_id: TaskId):
raise RuntimeError(f"Task {task_id} has already been started")

task.status = TaskStatus.errorCreating
task.end_time = default_now()
task.save()
self._notice_task_updated(task, op=TaskOp.ABORTED)

Expand Down Expand Up @@ -529,6 +531,7 @@ async def _verify(
if not await self.has_pending_subtasks(task_id):
if not self._get_pending_subtasks(task_id):
task.status = TaskStatus.finished
task.end_time = default_now()
task.save()

self._move_task_results(
Expand All @@ -554,6 +557,7 @@ async def abort_task(self, task_id: TaskId) -> None:
f"Task not active, can not abort. task_id={task_id}")

task.status = TaskStatus.aborted
task.end_time = default_now()
task.save()

for subtask in self._get_pending_subtasks(task_id):
Expand Down Expand Up @@ -605,6 +609,15 @@ def get_requested_task_ids() -> List[TaskId]:
tasks = RequestedTask.select(RequestedTask.task_id).execute()
return [task.task_id for task in tasks]

@staticmethod
def count_finished_subtasks(task_id: TaskId) -> float:
return RequestedSubtask.select(
fn.Count(RequestedSubtask.subtask_id)
).where(
RequestedSubtask.task_id == task_id,
RequestedSubtask.status == SubtaskStatus.finished,
).scalar()

@staticmethod
def get_requested_task_subtask_ids(task_id: TaskId) -> List[SubtaskId]:
subtasks = RequestedSubtask.select(RequestedSubtask.subtask_id) \
Expand Down Expand Up @@ -850,6 +863,7 @@ def _time_out_task(self, task_id: TaskId) -> None:
logger.info("Task timed out. task_id=%r", task_id)

task.status = TaskStatus.timeout
task.end_time = default_now()
task.save()

for subtask in self._get_pending_subtasks(task_id):
Expand Down

0 comments on commit 637632b

Please sign in to comment.