From 16a1859449f6fd689318248639d9a33527f3e217 Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Fri, 31 Mar 2017 15:33:52 +0200 Subject: [PATCH] Hyperdrive (#810) Hyperdrive client support --- .travis.yml | 8 + appveyor.yml | 11 + circle.yml | 5 + golem/client.py | 18 +- golem/core/processmonitor.py | 81 ++- golem/network/hyperdrive/__init__.py | 0 golem/network/hyperdrive/client.py | 76 +++ golem/network/hyperdrive/daemon_manager.py | 63 +++ golem/network/ipfs/daemon_manager.py | 4 +- golem/network/p2p/peersession.py | 2 +- golem/resource/base/resourceserver.py | 152 +++--- golem/resource/base/resourcesmanager.py | 498 ++++++++++-------- golem/resource/base/resourcetest.py | 17 +- golem/resource/client.py | 10 +- golem/resource/http/resourcesmanager.py | 4 +- golem/resource/hyperdrive/__init__.py | 0 golem/resource/hyperdrive/resourcesmanager.py | 87 +++ golem/resource/ipfs/resourcesmanager.py | 11 +- golem/resource/resourcesession.py | 2 +- golem/task/result/resultmanager.py | 51 +- golem/task/result/resultpackage.py | 4 +- golem/task/taskcomputer.py | 8 +- golem/task/taskmanager.py | 8 +- golem/task/taskserver.py | 4 +- golem/task/tasksession.py | 18 +- .../ethereum/ethereumtransactionsystem.py | 14 +- gui/startapp.py | 4 +- tests/golem/core/test_processmonitor.py | 94 +++- .../hyperdrive/test_hyperdrive_client.py | 73 +++ .../test_hyperdrive_daemon_manager.py | 73 +++ .../base/test_base_resourcemanager.py | 221 ++++---- .../resource/base/test_base_resourceserver.py | 103 ++-- .../test_hyperdrive_resourcemanager.py | 20 + .../ipfs/test_ipfs_resourcemanager.py | 26 +- tests/golem/resource/test_resourcesession.py | 2 +- tests/golem/task/result/test_resultmanager.py | 6 +- .../test_ethereumtransactionsystem.py | 45 +- tests/gui/test_startapp.py | 25 +- 38 files changed, 1216 insertions(+), 632 deletions(-) create mode 100644 golem/network/hyperdrive/__init__.py create mode 100644 golem/network/hyperdrive/client.py create mode 100644 golem/network/hyperdrive/daemon_manager.py create mode 100644 golem/resource/hyperdrive/__init__.py create mode 100644 golem/resource/hyperdrive/resourcesmanager.py create mode 100644 tests/golem/network/hyperdrive/test_hyperdrive_client.py create mode 100644 tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py create mode 100644 tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py diff --git a/.travis.yml b/.travis.yml index 7e21a4fe3d..ef9e2708f5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,14 @@ before_install: - brew install qt@5.7 openexr - brew install ethereum/ethereum/ethereum ipfs + - rm -rf ~/.ipfs + - ipfs init + - ipfs daemon & + + - git clone https://github.com/mfranciszkiewicz/golem-hyperdrive --depth 1 + - cd golem-hyperdrive && npm install --save && cd .. 
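  # note: hyperg is launched in the background below so that
  # HyperdriveClient (default 127.0.0.1:3292, added later in this patch)
  # can reach it during the test run; the backslashes in the script path
  # are Windows separators, so the forward-slash form used in circle.yml
  # would be the portable spelling on these macOS workers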
+ - node golem-hyperdrive\src\main.js & + - sudo pip install --upgrade pip setuptools packaging pytest mock - pip install https://github.com/golemfactory/golem/wiki/wheels/sip-4.19-cp27-cp27m-macosx_10_12_x86_64.whl - pip install https://github.com/golemfactory/golem/wiki/wheels/PyQt5-5.7.1-cp27-cp27m-macosx_10_12_x86_64.whl diff --git a/appveyor.yml b/appveyor.yml index 233c171680..1b4ce6d76e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -12,6 +12,8 @@ environment: PYTHON_VERSION: "2.7.13" PYTHON_ARCH: "32" + NODEJS_VERSION: "6" + VER_GETH: geth-windows-amd64-1.5.9-a07539fb VER_IPFS: go-ipfs_v0.4.4_windows-386 VER_OPENSSL: openssl-1.0.2k-i386-win32 @@ -56,6 +58,14 @@ install: 7z x %VER_GETH%.zip -y -aoa -oC:\ > NUL ) + # nodejs + - ps: Install-Product node $env:NODEJS_VERSION + + # golem-hyperdrive + - git clone https://github.com/mfranciszkiewicz/golem-hyperdrive --depth 1 + - cd golem-hyperdrive && npm install --save && cd .. + - ps: $HyperdriveProcess = Start-Process node golem-hyperdrive\src\main.js -PassThru + # ipfs - if not exist %DIR_IPFS% ( appveyor DownloadFile https://dist.ipfs.io/go-ipfs/v0.4.4/%VER_IPFS%.zip && @@ -103,3 +113,4 @@ test_script: on_finish: - ps: Stop-Process -Id $IPFSProcess.Id + - ps: Stop-Process -Id $HyperdriveProcess.Id diff --git a/circle.yml b/circle.yml index cf64566973..b91c82a1ba 100644 --- a/circle.yml +++ b/circle.yml @@ -1,4 +1,6 @@ machine: + node: + version: 6.9.5 services: - docker post: @@ -15,11 +17,14 @@ dependencies: - pip install https://github.com/golemfactory/golem/wiki/wheels/sip-4.15-py2-none-any.whl - pip install https://github.com/golemfactory/golem/wiki/wheels/PyQt5-5.2.1-py2-none-any.whl - if [ ! -e /usr/local/bin/ipfs ]; then wget https://dist.ipfs.io/go-ipfs/v0.4.5/go-ipfs_v0.4.5_linux-amd64.tar.gz; tar xvfz go-ipfs_v0.4.5_linux-amd64.tar.gz; sudo mv go-ipfs/ipfs /usr/local/bin/ipfs; /usr/local/bin/ipfs init; fi + - git clone https://github.com/mfranciszkiewicz/golem-hyperdrive --depth 1; cd golem-hyperdrive; npm install --save; cd - - /usr/local/bin/ipfs config --json Bootstrap "[]" - /usr/local/bin/ipfs config --json SupernodeRouting.Servers "[]" - /usr/local/bin/ipfs config --json Addresses.Swarm '["/ip6/::/tcp/4001", "/ip6/::/udp/4002/utp", "/ip4/0.0.0.0/udp/4002/utp"]' - /usr/local/bin/ipfs daemon: background: true + - node golem-hyperdrive/src/main.js: + background: true test: pre: diff --git a/golem/client.py b/golem/client.py index 165c2f3fff..83a7770eb8 100644 --- a/golem/client.py +++ b/golem/client.py @@ -27,6 +27,7 @@ from golem.monitor.model.nodemetadatamodel import NodeMetadataModel from golem.monitor.monitor import SystemMonitor from golem.monitorconfig import MONITOR_CONFIG +from golem.network.hyperdrive.daemon_manager import HyperdriveDaemonManager from golem.network.p2p.node import Node from golem.network.p2p.p2pservice import P2PService from golem.network.p2p.peersession import PeerSessionInfo @@ -37,7 +38,7 @@ from golem.resource.base.resourceserver import BaseResourceServer from golem.resource.client import AsyncRequest, async_run from golem.resource.dirmanager import DirManager, DirectoryType -from golem.resource.swift.resourcemanager import OpenStackSwiftResourceManager +from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager from golem.rpc.mapping.aliases import Task, Network, Environment, UI from golem.rpc.session import Publisher from golem.task.taskbase import resource_types @@ -129,6 +130,7 @@ def __init__(self, datadir=None, transaction_system=False, connect_to_known_host 
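The golem/client.py hunks below thread a HyperdriveDaemonManager through
the client lifecycle: it is created and started in start_network() and
stopped in quit(). Condensed to its essentials, and using only names this
patch introduces, the added flow is roughly:

    # sketch of the lifecycle wiring added in this file
    self.daemon_manager = HyperdriveDaemonManager(self.datadir)
    self.daemon_manager.start()   # spawns 'hyperg --db <datadir>/hyperg' unless one is running

    # and later, in Client.quit():
    if self.daemon_manager:
        self.daemon_manager.stop()  # ProcessMonitor terminates supervised children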
self.use_docker_machine_manager = use_docker_machine_manager self.connect_to_known_hosts = connect_to_known_hosts self.environments_manager = EnvironmentsManager() + self.daemon_manager = None self.rpc_publisher = None @@ -170,9 +172,13 @@ def start_network(self): self.node.collect_network_info(self.config_desc.seed_host, use_ipv6=self.config_desc.use_ipv6) log.debug("Is super node? %s", self.node.is_super_node()) + # self.ipfs_manager = IPFSDaemonManager(connect_to_bootstrap_nodes=self.connect_to_known_hosts) # self.ipfs_manager.store_client_info() + self.daemon_manager = HyperdriveDaemonManager(self.datadir) + self.daemon_manager.start() + self.p2pservice = P2PService(self.node, self.config_desc, self.keys_auth, connect_to_known_hosts=self.connect_to_known_hosts) self.task_server = TaskServer(self.node, self.config_desc, self.keys_auth, self, @@ -181,7 +187,7 @@ def start_network(self): dir_manager = self.task_server.task_computer.dir_manager - self.resource_server = BaseResourceServer(OpenStackSwiftResourceManager(dir_manager), + self.resource_server = BaseResourceServer(HyperdriveResourceManager(dir_manager), dir_manager, self.keys_auth, self) log.info("Starting p2p server ...") @@ -226,8 +232,12 @@ def quit(self): self.do_work_task.stop() if self.task_server: self.task_server.quit() + if self.transaction_system: + self.transaction_system.stop() if self.diag_service: self.diag_service.unregister_all() + if self.daemon_manager: + self.daemon_manager.stop() dispatcher.send(signal='golem.monitor', event='shutdown') if self.db: self.db.close() @@ -542,8 +552,8 @@ def query_task_state(self, task_id): if state: return DictSerializer.dump(state) - def pull_resources(self, task_id, list_files, client_options=None): - self.resource_server.add_files_to_get(list_files, task_id, client_options=client_options) + def pull_resources(self, task_id, resources, client_options=None): + self.resource_server.download_resources(resources, task_id, client_options=client_options) def add_resource_peer(self, node_name, addr, port, key_id, node_info): self.resource_server.add_resource_peer(node_name, addr, port, key_id, node_info) diff --git a/golem/core/processmonitor.py b/golem/core/processmonitor.py index d5edbc11b1..9e131a8a26 100644 --- a/golem/core/processmonitor.py +++ b/golem/core/processmonitor.py @@ -1,42 +1,70 @@ +import atexit import subprocess import time from multiprocessing import Process -from threading import Thread +from threading import Thread, Lock import psutil class ProcessMonitor(Thread): - def __init__(self, *child_processes): + def __init__(self, *child_processes, **params): super(ProcessMonitor, self).__init__(target=self._start) - self.shutdown_callbacks = [self.kill_processes, self.stop] - self.child_processes = child_processes + + self._child_processes = [] + self._callbacks = params.pop('callbacks', []) + self._lock = Lock() + + self.daemon = True self.working = False + atexit.register(self.exit) + self.add_child_processes(*child_processes) + def _start(self): self.working = True while self.working: - for process in self.child_processes: + for i in xrange(len(self._child_processes) - 1, -1, -1): + process = self._child_processes[i] + if not self.is_process_alive(process): - print("Subprocess {} exited with code {}. 
Terminating" - .format(process.pid, self.exit_code(process))) - self.exit() - time.sleep(1) + print "Subprocess {} exited with code {}".format(process.pid, + self.exit_code(process)) + if self.working: + self.run_callbacks(process) + self._child_processes.pop(i) - def stop(self): + time.sleep(0.5) + + def stop(self, *_): self.working = False - def exit(self): - for callback in self.shutdown_callbacks: - callback() + def exit(self, *_): + self.stop() + self.kill_processes() + + def add_child_processes(self, *processes): + assert all([self.is_supported(p) for p in processes]) + self._child_processes.extend(processes) + + def add_callbacks(self, *callbacks): + self._callbacks.extend(callbacks) - def add_shutdown_callback(self, callback): - self.shutdown_callbacks.append(callback) + def remove_callbacks(self, *callbacks): + for handler in callbacks: + idx = self._callbacks.index(handler) + if idx != -1: + self._callbacks.pop(idx) - def kill_processes(self): - for process in self.child_processes: + def run_callbacks(self, process=None): + for callback in self._callbacks: + if self.working: + callback(process) + + def kill_processes(self, *_): + for process in self._child_processes: self.kill_process(process) @classmethod @@ -44,8 +72,25 @@ def kill_process(cls, process): if cls.is_process_alive(process): try: process.terminate() + + if isinstance(process, (psutil.Popen, subprocess.Popen)): + process.communicate() + elif isinstance(process, Process): + process.join() + except Exception as exc: print("Error terminating process {}: {}".format(process, exc)) + else: + print "Subprocess {} terminated".format(cls._pid(process)) + + @staticmethod + def _pid(process): + if process: + return process.pid + + @staticmethod + def is_supported(process): + return isinstance(process, (psutil.Popen, subprocess.Popen, Process)) @staticmethod def exit_code(process): @@ -61,5 +106,5 @@ def is_process_alive(process): process.poll() return process.returncode is None elif isinstance(process, Process): - return process.exitcode is None + return process.is_alive() return False diff --git a/golem/network/hyperdrive/__init__.py b/golem/network/hyperdrive/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/golem/network/hyperdrive/client.py b/golem/network/hyperdrive/client.py new file mode 100644 index 0000000000..641beb826e --- /dev/null +++ b/golem/network/hyperdrive/client.py @@ -0,0 +1,76 @@ +import json + +import requests + +from golem.resource.client import IClient, ClientOptions + + +class HyperdriveClient(IClient): + + CLIENT_ID = 'hyperg' + VERSION = 1.0 + + def __init__(self, port=3292, host='127.0.0.1', timeout=None): + super(HyperdriveClient, self).__init__() + + # destination address + self.host = host + self.port = port + # connection / read timeout + self.timeout = timeout + + # default POST request headers + self._url = 'http://{}:{}/api'.format(self.host, self.port) + self._headers = {'content-type': 'application/json'} + + @classmethod + def build_options(cls, node_id, **kwargs): + return ClientOptions(cls.CLIENT_ID, cls.VERSION) + + def diagnostics(self, *args, **kwargs): + raise NotImplementedError() + + def id(self, client_options=None, *args, **kwargs): + response = self._request(command='id') + return response['id'] + + def add(self, files, client_options=None, **kwargs): + response = self._request( + command='upload', + id=kwargs.get('id'), + files=files + ) + return response['hash'] + + def get_file(self, multihash, client_options=None, **kwargs): + dst_path = 
kwargs.pop('filepath') + response = self._request( + command='download', + hash=multihash, + dest=dst_path + ) + return [(dst_path, multihash, response['files'])] + + def pin_add(self, file_path, multihash): + response = self._request( + command='upload', + files=[file_path], + hash=multihash + ) + return response['hash'] + + def pin_rm(self, multihash): + response = self._request( + command='cancel', + hash=multihash + ) + return response['hash'] + + def _request(self, **data): + response = requests.post(url=self._url, + headers=self._headers, + data=json.dumps(data), + timeout=self.timeout) + + response.raise_for_status() + return json.loads(response.content) diff --git a/golem/network/hyperdrive/daemon_manager.py b/golem/network/hyperdrive/daemon_manager.py new file mode 100644 index 0000000000..f49e81b2d4 --- /dev/null +++ b/golem/network/hyperdrive/daemon_manager.py @@ -0,0 +1,63 @@ +import atexit +import logging +import os +import subprocess +import time + +from requests import ConnectionError + +from golem.core.processmonitor import ProcessMonitor +from golem.network.hyperdrive.client import HyperdriveClient + +logger = logging.getLogger(__name__) + + +class HyperdriveDaemonManager(object): + + _executable = 'hyperg' + + def __init__(self, datadir, **hyperdrive_config): + super(HyperdriveDaemonManager, self).__init__() + + self._config = hyperdrive_config + + # monitor and restart if process dies + self._monitor = ProcessMonitor() + self._monitor.add_callbacks(self._start) + + # hyperdrive data directory + self._dir = os.path.join(datadir, self._executable) + if not os.path.exists(self._dir): + os.makedirs(self._dir) + + atexit.register(self.stop) + + def start(self): + self._monitor.start() + self._start() + + def stop(self): + self._monitor.exit() + + def _command(self): + return [self._executable, '--db', self._dir] + + def _daemon_running(self): + try: + return HyperdriveClient(**self._config).id() + except ConnectionError: + return False + + def _start(self, *_): + # do not supervise already running processes + if self._daemon_running(): + return + + process = subprocess.Popen(self._command()) + while not self._daemon_running(): + time.sleep(0.1) + + if process.poll() is None: + self._monitor.add_child_processes(process) + else: + raise RuntimeError("Cannot start {}".format(self._executable)) diff --git a/golem/network/ipfs/daemon_manager.py b/golem/network/ipfs/daemon_manager.py index 202ea346bf..18ae6c3f53 100644 --- a/golem/network/ipfs/daemon_manager.py +++ b/golem/network/ipfs/daemon_manager.py @@ -1,5 +1,4 @@ import logging -import time import ipaddress @@ -29,7 +28,6 @@ def __init__(self, config=None, connect_to_bootstrap_nodes=True): self.addresses = [] self.meta_addresses = [] self.bootstrap_nodes = set() - self.last_backoff_clear_ts = time.time() if connect_to_bootstrap_nodes: for node in self.config.bootstrap_nodes: @@ -161,7 +159,7 @@ def list_bootstrap_nodes(self, client=None): return [] def _node_action(self, url, method, command, success, error, obj_id=None, async=True): - def closure(): + def closure(*_): self._handle_retries(method, command, url, obj_id=obj_id, raise_exc=True) diff --git a/golem/network/p2p/peersession.py b/golem/network/p2p/peersession.py index 7eed143a3f..331c9410c5 100644 --- a/golem/network/p2p/peersession.py +++ b/golem/network/p2p/peersession.py @@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) -P2P_PROTOCOL_ID = 11 +P2P_PROTOCOL_ID = 12 class PeerSessionInfo(object): diff --git a/golem/resource/base/resourceserver.py 
b/golem/resource/base/resourceserver.py index fa6d1f3aec..93cc346f89 100644 --- a/golem/resource/base/resourceserver.py +++ b/golem/resource/base/resourceserver.py @@ -1,5 +1,6 @@ import copy import logging +from collections import namedtuple from threading import Lock from enum import Enum @@ -16,11 +17,19 @@ class TransferStatus(Enum): failed = 4 -class BaseResourceServer(object): +class PendingResource(object): + + def __init__(self, resource, task_id, client_options, status): + self.resource = resource + self.task_id = task_id + self.client_options = client_options + self.status = status + - lock = Lock() +class BaseResourceServer(object): def __init__(self, resource_manager, dir_manager, keys_auth, client): + self._lock = Lock() self.client = client self.keys_auth = keys_auth @@ -29,10 +38,7 @@ def __init__(self, resource_manager, dir_manager, keys_auth, client): self.resource_manager = resource_manager self.resource_dir = self.dir_manager.res - - self.resources_to_get = [] - self.waiting_resources = {} - self.waiting_tasks_to_compute = {} + self.pending_resources = {} def change_resource_dir(self, config_desc): if self.dir_manager.root_path == config_desc.root_path: @@ -52,7 +58,7 @@ def get_peers(self): self.client.get_resource_peers() def sync_network(self): - self.get_resources() + self._download_resources() def add_task(self, files, task_id, client_options=None): result = self.resource_manager.add_task(files, task_id, @@ -67,100 +73,64 @@ def _add_task_error(error): def remove_task(self, task_id, client_options=None): self.resource_manager.remove_task(task_id, client_options=client_options) - def add_files_to_get(self, files, task_id, client_options=None): - num = 0 - collected = False - - with self.lock: - for filename, multihash in files: - exists = self.resource_manager.storage.get_path_and_hash(filename, task_id, - multihash=multihash) - if not exists: - self._add_resource_to_get(filename, multihash, task_id, client_options) - num += 1 + def download_resources(self, resources, task_id, client_options=None): + with self._lock: + for resource in resources: + self._add_pending_resource(resource, task_id, client_options) - if num > 0: - self.waiting_tasks_to_compute[task_id] = num - else: - collected = True + collected = not self.pending_resources.get(task_id) if collected: self.client.task_resource_collected(task_id, unpack_delta=False) - def _add_resource_to_get(self, filename, multihash, task_id, client_options): - resource = [filename, multihash, task_id, client_options, TransferStatus.idle] - - if filename not in self.waiting_resources: - self.waiting_resources[filename] = [] - - if task_id not in self.waiting_resources[filename]: - self.waiting_resources[filename].append(task_id) - - self.resources_to_get.append(resource) - - def get_resources(self, async=True): - resources = copy.copy(self.resources_to_get) - - for resource in resources: - if resource[-1] in [TransferStatus.idle, TransferStatus.failed]: - resource[-1] = TransferStatus.transferring - self.pull_resource(resource, async=async) - - def pull_resource(self, resource, async=True): - - filename = resource[0] - multihash = resource[1] - task_id = resource[2] - client_options = resource[3] - - logger.debug("Resource server: pull resource: {} ({})" - .format(filename, multihash)) - - self.resource_manager.pull_resource(filename, - multihash, - task_id, - self.resource_downloaded, - self.resource_download_error, - async=async, - client_options=client_options) - - def resource_downloaded(self, filename, multihash, 
task_id, *args): - if not filename or not multihash: - self.resource_download_error(Exception("Invalid resource: {} ({})" - .format(filename, multihash)), - filename, multihash, task_id) - return - - collected = self.remove_resource_to_get(filename, task_id) - if collected: - self.client.task_resource_collected(collected, - unpack_delta=False) - - def resource_download_error(self, exc, filename, multihash, task_id, *args): - for entry in self.resources_to_get: - if task_id == entry[2]: - self.remove_resource_to_get(filename, task_id) - self.client.task_resource_failure(task_id, exc) - - def remove_resource_to_get(self, filename, task_id): - collected = None - - with self.lock: - for waiting_task_id in self.waiting_resources.get(filename, []): - self.waiting_tasks_to_compute[waiting_task_id] -= 1 + def _add_pending_resource(self, resource, task_id, client_options): + if task_id not in self.pending_resources: + self.pending_resources[task_id] = [] - if self.waiting_tasks_to_compute[waiting_task_id] <= 0: - collected = waiting_task_id - del self.waiting_tasks_to_compute[waiting_task_id] + self.pending_resources[task_id].append(PendingResource( + resource, task_id, client_options, TransferStatus.idle + )) - self.waiting_resources.pop(filename, None) + def _remove_pending_resource(self, resource, task_id): + with self._lock: + pending_resources = self.pending_resources.get(task_id, []) - for i, entry in enumerate(self.resources_to_get): - if task_id == entry[2] and filename == entry[0]: - del self.resources_to_get[i] + for i, pending_resource in enumerate(pending_resources): + if pending_resource.resource == resource: + pending_resources.pop(i) break - return collected + if not pending_resources: + self.pending_resources.pop(task_id, None) + return task_id + + def _download_resources(self, async=True): + pending = dict(self.pending_resources) + + for task_id, entries in pending.iteritems(): + for entry in list(entries): + if entry.status in [TransferStatus.idle, TransferStatus.failed]: + entry.status = TransferStatus.transferring + self.resource_manager.pull_resource(entry.resource, entry.task_id, + client_options=entry.client_options, + success=self._download_success, + error=self._download_error, + async=async) + + def _download_success(self, resource, task_id): + if resource: + + collected = self._remove_pending_resource(resource, task_id) + if collected: + self.client.task_resource_collected(collected, + unpack_delta=False) + else: + logger.error("Empty resource downloaded for task {}" + .format(task_id)) + + def _download_error(self, error, resource, task_id): + self._remove_pending_resource(resource, task_id) + self.client.task_resource_failure(task_id, error) def get_key_id(self): return self.keys_auth.get_key_id() diff --git a/golem/resource/base/resourcesmanager.py b/golem/resource/base/resourcesmanager.py index 62558a2d3d..d4bd10a760 100644 --- a/golem/resource/base/resourcesmanager.py +++ b/golem/resource/base/resourcesmanager.py @@ -46,78 +46,154 @@ def dir_files(directory): return result +class Resource(object): + + def __init__(self, resource_hash, task_id=None, path=None): + self.hash = resource_hash + self.task_id = task_id + self.path = path + + def __eq__(self, other): + return other and \ + self.task_id == other.task_id and \ + self.hash == other.hash and \ + self.path == other.path + + def __str__(self): + return '({}, task: {})'.format( + self.hash, self.task_id) + + def __unicode__(self): + return unicode(self.__str__()) + + def __repr__(self): + return str(self) + + 
@property + def exists(self): + return self.path and os.path.exists(self.path) + + def contains_file(self, name): + raise NotImplementedError() + + +class FileResource(Resource): + + def __init__(self, file_name, resource_hash, task_id=None, path=None): + super(FileResource, self).__init__(resource_hash, task_id=task_id, path=path) + self.file_name = norm_path(file_name) + + def __eq__(self, other): + return super(FileResource, self).__eq__(other) and \ + self.file_name == other.file_name + + def __str__(self): + return '{} ({}, task: {})'.format( + self.file_name, self.hash, self.task_id) + + def contains_file(self, name): + return os.path.basename(self.file_name) == name + + +class ResourceBundle(Resource): + + def __init__(self, files, bundle_hash, task_id=None, path=None): + super(ResourceBundle, self).__init__(bundle_hash, task_id=task_id, path=path) + self._files = None + self._files_split = None + self.files = files + + def __eq__(self, other): + return super(ResourceBundle, self).__eq__(other) and \ + self.files == other.files + + def __str__(self): + return '{} (bundle: {}, task: {})'.format( + self.files, self.hash, self.task_id) + + @property + def files(self): + return self._files + + @files.setter + def files(self, value): + if value: + self._files_split = [split_path(v) for v in value] + else: + self._files_split = [] + self._files = value + + @property + def files_split(self): + return self._files_split[:] + + def contains_file(self, name): + if self._files: + return any([os.path.basename(f) == name + for f in self._files]) + return False + + class ResourceCache(object): def __init__(self): self._lock = Lock() - # hash to file/dir path - self._hash_to_path = dict() - # file/dir path to hash - self._path_to_hash = dict() - # category to relative file path - self._cat_to_res = dict() - # category to common path for resources - self._cat_to_prefix = dict() - - def set_path(self, resource_hash, path): - self._hash_to_path[resource_hash] = path - self._path_to_hash[path] = resource_hash - - def get_path(self, resource_hash, default=None): - return self._hash_to_path.get(resource_hash, default) - - def get_hash(self, path, default=None): - return self._path_to_hash.get(path, default) - - def remove_path(self, resource_hash): - path = self._hash_to_path.pop(resource_hash, None) - self._path_to_hash.pop(path, None) - return path - - def add_resource(self, category, resource): + # hash to resource + self._hash_to_res = dict() + # path to resource + self._path_to_res = dict() + # task to resources + self._task_to_res = dict() + # task to resource common prefix + self._task_to_prefix = dict() + + def add_resource(self, resource): + task_id = resource.task_id + with self._lock: - resource_list = self._cat_to_res.get(category) + resource_list = self._task_to_res.get(task_id) if not resource_list: - self._cat_to_res[category] = resource_list = list() + self._task_to_res[task_id] = resource_list = list() resource_list.append(resource) - def has_resource(self, category, resource): - return resource in self.get_resources(category) - - def has_file(self, entry, category): - return entry in self._cat_to_res.get(category, []) + self._hash_to_res[resource.hash] = resource + self._path_to_res[resource.path] = resource - def set_resources(self, category, resources): - with self._lock: - self._cat_to_res[category] = resources + def get_by_hash(self, resource_hash, default=None): + return self._hash_to_res.get(resource_hash, default) - def get_resources(self, category, default=None): - return 
self._cat_to_res.get(category, default or []) + def get_by_path(self, resource_path, default=None): + return self._path_to_res.get(resource_path, default) - def remove_resources(self, category): - with self._lock: - return self._cat_to_res.pop(category, []) + def has_resource(self, resource): + if resource.task_id and resource.task_id not in self._task_to_res: + return False + if resource.hash and resource.hash not in self._hash_to_res: + return False + return resource.path in self._path_to_res - def set_prefix(self, category, prefix): - self._cat_to_prefix[category] = prefix + def get_resources(self, task_id, default=None): + return self._task_to_res.get(task_id, default or []) - def get_prefix(self, category, default=''): - return self._cat_to_prefix.get(category, default) + def set_prefix(self, task_id, prefix): + self._task_to_prefix[task_id] = norm_path(prefix) - def remove_prefix(self, category): - return self._cat_to_prefix.pop(category, None) + def get_prefix(self, task_id, default=''): + return self._task_to_prefix.get(task_id, default) - def remove(self, category): - resources = self.remove_resources(category) + def remove(self, task_id): + resources = self._task_to_res.pop(task_id, []) for r in resources: - self.remove_path(r[1]) - self.remove_prefix(category) + self._hash_to_res.pop(r.hash, None) + self._path_to_res.pop(r.path, None) + self._task_to_prefix.pop(task_id, None) + return resources def clear(self): - self._hash_to_path = dict() - self._path_to_hash = dict() - self._cat_to_res = dict() - self._cat_to_prefix = dict() + self._hash_to_res = dict() + self._path_to_res = dict() + self._task_to_res = dict() + self._task_to_prefix = dict() class ResourceStorage(object): @@ -130,63 +206,36 @@ def __init__(self, dir_manager, resource_dir_method): def list_dir(self, dir_name): return self.dir_manager.list_dir_names(dir_name) - def get_dir(self, category): - return norm_path(self.resource_dir_method(category)) + def get_dir(self, task_id): + return norm_path(self.resource_dir_method(task_id)) - def get_path(self, relative_file_path, category): - resource_dir = self.get_dir(category) + def get_path(self, relative_file_path, task_id): + resource_dir = self.get_dir(task_id) return os.path.join(resource_dir, norm_path(relative_file_path)) - def get_path_and_hash(self, path, category, - multihash=None, absolute_path=False): - - if absolute_path: - res_path = norm_path(path) - else: - res_path = self.get_path(path, category) - - res_path = to_unicode(res_path) - - if multihash: - if self.cache.get_path(multihash) == res_path: - return res_path, multihash - else: - multihash = self.cache.get_hash(res_path) - if multihash: - return res_path, multihash - def get_root(self): return self.dir_manager.get_node_dir() def get_resources(self, task_id): return self.cache.get_resources(task_id) - @staticmethod - def split_resources(resources): - return [[split_path(r[0])] + r[1:] for r in resources] - - @staticmethod - def join_resources(resources): - results = [] - - for r in resources: - if not r: - continue - elif isinstance(r[0], basestring): - results.append(r) - elif r[0]: - results.append([os.path.join(*r[0])] + r[1:]) + def has_resource(self, resource): + return self.cache.has_resource(resource) and resource.exists - return results + def relative_path(self, path, task_id): - def relative_path(self, path, category): path = norm_path(path) - common_prefix = self.cache.get_prefix(category) - return_path = path.replace(common_prefix, '', 1) + common_prefix = self.cache.get_prefix(task_id) + 
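        # note: set_prefix() stores prefixes via norm_path(), and `path`
        # was normalised above, so the startswith() check that follows
        # compares like with like; replace(..., 1) then strips just that
        # leading match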
+ if path.startswith(common_prefix): + return_path = path.replace(common_prefix, '', 1) + else: + return_path = path if common_prefix: while return_path and return_path.startswith(os.path.sep): return_path = return_path[1:] + return return_path def copy_dir(self, src_dir): @@ -198,33 +247,26 @@ def copy_dir(self, src_dir): copy_file_tree(src_dir, root_dir) return True - def copy_file(self, src_path, dst_relative_path, category): + def copy(self, src_path, dst_relative_path, task_id): - dst_path = self.get_path(dst_relative_path, category) - src_path = norm_path(src_path) dst_relative_path = norm_path(dst_relative_path) + dst_path = self.get_path(dst_relative_path, task_id) + src_path = norm_path(src_path) make_path_dirs(dst_path) - if os.path.exists(dst_path): + + if os.path.isfile(dst_path): os.remove(dst_path) + elif os.path.isdir(dst_path): + shutil.rmtree(dst_path) - try: + if os.path.isfile(src_path): shutil.copyfile(src_path, dst_path) - except OSError as e: - logger.error("Resource storage: Error copying {} (as {}): {}" - .format(src_path, dst_relative_path, e)) + elif os.path.isdir(src_path): + copy_file_tree(src_path, dst_path) else: - return True - - def copy_cached(self, dst_relative_path, res_hash, category): - - cached_path = self.cache.get_path(res_hash) - if cached_path and os.path.exists(cached_path): - - if self.copy_file(cached_path, dst_relative_path, category): - logger.debug("Resource storage: Resource copied {} ({})" - .format(dst_relative_path, res_hash)) - return True + raise ValueError("Error reading source path: '{}'" + .format(src_path)) def clear_cache(self): self.cache.clear() @@ -240,7 +282,8 @@ def __init__(self, dir_manager, resource_dir_method=None): self.download_queue = deque() self.current_downloads = 0 - self.storage = ResourceStorage(dir_manager, resource_dir_method or dir_manager.get_task_resource_dir) + self.storage = ResourceStorage(dir_manager, resource_dir_method + or dir_manager.get_task_resource_dir) self.index_resources(self.storage.get_root()) if not hasattr(self, 'commands'): @@ -259,6 +302,42 @@ def pin_resource(self, multihash, client=None, client_options=None): def unpin_resource(self, multihash, client=None, client_options=None): pass + def get_resources(self, task_id): + return self.storage.get_resources(task_id) + + def to_wire(self, resources): + + if len(resources) == 1: + resource = next(iter(resources)) + return [[os.path.basename(resource.path), resource.hash]] + else: + relative = self.storage.relative_path + return [[split_path(relative(r.path, r.task_id)), r.hash] + for r in resources] + + def from_wire(self, resources): + results = [] + + for r in resources: + if not r: + continue + elif isinstance(r[0], basestring): + results.append(r) + elif r[0]: + results.append([os.path.join(*r[0])] + r[1:]) + + return results + + def remove_task(self, task_id, + client=None, client_options=None): + + resources = self.storage.cache.remove(task_id) + if resources: + for resource in resources: + self.unpin_resource(resource.hash, + client=client, + client_options=client_options) + def add_task(self, files, task_id, client=None, client_options=None): @@ -270,7 +349,7 @@ def _add_task(self, files, task_id, client=None, client_options=None): if self.storage.cache.get_prefix(task_id): - logger.warn("Resource manager: Not re-adding task {}" + logger.warn("Resource manager: Task {} already exists" .format(task_id)) return @@ -281,104 +360,86 @@ def _add_task(self, files, task_id, else: prefix = common_dir(files) - prefix = norm_path(prefix) 
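        # note: the explicit norm_path() call goes away because
        # ResourceCache.set_prefix() now normalises the prefix itself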
self.storage.cache.set_prefix(task_id, prefix) - self.add_resources(files, task_id, - absolute_path=True, - client=client, - client_options=client_options) + self.add_files(files, task_id, + absolute_path=True, + client=client, + client_options=client_options) - def remove_task(self, task_id, **kwargs): - self.storage.cache.remove(task_id) + def add_files(self, files, task_id, + absolute_path=False, client=None, + client_options=None): - def add_resources(self, resources, task_id, - absolute_path=False, client=None, client_options=None): - client = client or self.new_client() + if files: + client = client or self.new_client() - if resources: - for resource in resources: - self.add_resource(resource, task_id, - absolute_path=absolute_path, - client=client, - client_options=client_options) + for path in files: + self.add_file(path, task_id, + absolute_path=absolute_path, + client=client, + client_options=client_options) - def add_resource(self, resource, task_id, - absolute_path=False, client=None, client_options=None): + def add_file(self, path, task_id, + absolute_path=False, client=None, + client_options=None): client = client or self.new_client() - resource = norm_path(resource) + path = norm_path(path) if absolute_path: - resource_path = resource + file_path = path else: - resource_path = self.storage.get_path(resource, task_id) + file_path = self.storage.get_path(path, task_id) - if not os.path.exists(resource_path): + if not os.path.exists(file_path): raise RuntimeError("File '{}' does not exist" - .format(resource_path)) - - disk_resource = self.storage.get_path_and_hash(resource, task_id) + .format(file_path)) - if disk_resource: - if self.storage.cache.has_file(disk_resource, task_id): - logger.debug("Resource manager: resource '{}' already exists in task {}" - .format(resource, task_id)) - else: - file_path, multihash = disk_resource - self._cache_resource(resource, file_path, multihash, task_id) + local = self.storage.cache.get_by_path(path) + if local and local.exists and local.task_id == task_id: + logger.debug("Resource manager: file '{}' already exists in task {}" + .format(path, task_id)) else: - is_dir = os.path.isdir(resource_path) response = self._handle_retries(client.add, self.commands.add, - resource_path, - recursive=is_dir, + file_path, client_options=client_options) - self._cache_response(resource, response, task_id) + self._cache_response(path, response, task_id) - def copy_resources(self, from_dir): + def copy_files(self, from_dir): self.storage.copy_dir(from_dir) AbstractResourceManager.__init__(self, self.storage.dir_manager, self.storage.resource_dir_method) - def pull_resource(self, filename, multihash, task_id, + def pull_resource(self, entry, task_id, success, error, client=None, client_options=None, async=True, pin=True): - filename = norm_path(filename) + resource = self._wrap_resource(entry, task_id) - if self.storage.get_path_and_hash(filename, task_id, multihash=multihash): - success(filename, multihash, task_id) + if self.storage.has_resource(resource): + success(entry, task_id) return - def success_wrapper(result, *args, **kwargs): + def success_wrapper(response, **_): self.__dec_downloads() - - result_filename = result['Name'] - result_multihash = result['Hash'] - result_path = self.storage.get_path(result_filename, task_id) - - self._clear_retry(self.commands.get, result_multihash) + self._clear_retry(self.commands.get, resource.hash) if pin: - self._cache_resource(result_filename, - result_path, - result_multihash, - task_id) - 
self.pin_resource(result_multihash) + self._cache_resource(resource) + self.pin_resource(resource.hash) logger.debug("Resource manager: {} ({}) downloaded" - .format(result_filename, result_multihash)) + .format(resource.path, resource.hash)) - success(filename, result_multihash, task_id) + success(entry, task_id) self.__process_queue() - def error_wrapper(exc, *args, **kwargs): + def error_wrapper(exception, **_): self.__dec_downloads() - if self._can_retry(exc, self.commands.get, multihash): - self.pull_resource(filename, - multihash, - task_id, + if self._can_retry(exception, self.commands.get, resource.hash): + self.pull_resource(entry, task_id, client_options=client_options, success=success, error=error, @@ -386,36 +447,36 @@ def error_wrapper(exc, *args, **kwargs): pin=pin) else: logger.error("Resource manager: error downloading {} ({}): {}" - .format(filename, multihash, exc)) + .format(resource.path, resource.hash, exception)) - error(exc, filename, multihash, task_id) + error(exception, entry, task_id) self.__process_queue() - cached_path = self.storage.cache.get_path(multihash) - make_path_dirs(self.storage.get_path(filename, task_id)) + make_path_dirs(self.storage.get_path(resource.path, task_id)) + local = self.storage.cache.get_by_hash(resource.hash) - if cached_path and os.path.exists(cached_path): + if local: self.__inc_downloads() try: - self.storage.copy_cached(filename, multihash, task_id) + self.storage.copy(local.path, resource.path, task_id) except Exception as exc: error_wrapper(exc) else: - success_wrapper(dict(Name=filename, Hash=multihash)) + success_wrapper(entry) else: if self.__can_download(): self.__inc_downloads() - self.__pull(filename, multihash, task_id, + self.__pull(resource, task_id, success=success_wrapper, error=error_wrapper, client=client, client_options=client_options, async=async) else: - self.__push_to_queue(filename, multihash, task_id, + self.__push_to_queue(entry, task_id, success, error, client, client_options, async, pin) @@ -423,62 +484,55 @@ def command_failed(self, exc, cmd, obj_id, **kwargs): logger.error("Resource manager: Error executing command '{}': {}" .format(cmd.name, exc)) - def _cache_response(self, resource, response, task_id): + def wrap_file(self, resource): + return resource + + def _wrap_resource(self, resource, task_id=None): + resource_path, resource_hash = resource + path = self.storage.get_path(resource_path, task_id) + return FileResource(resource_path, resource_hash, + task_id=task_id, path=path) + + def _cache_response(self, resource_name, response, task_id): if isinstance(response, list): for entry in response: - self._cache_response(resource, entry, task_id) + self._cache_response(resource_name, entry, task_id) + elif response and 'Hash' in response and 'Name' in response: - if os.path.basename(response.get('Name')) != os.path.basename(resource): + if os.path.basename(response.get('Name')) != os.path.basename(resource_name): raise Exception("Resource manager: Invalid response {}".format(response)) - res_path = to_unicode(resource) + res_path = to_unicode(resource_name) res_hash = to_unicode(response.get('Hash')) + resource = self._wrap_resource((res_path, res_hash), task_id) + self._cache_resource(resource) - self._cache_resource(resource, res_path, res_hash, task_id) - - def _cache_resource(self, resource, res_path, res_hash, task_id): + def _cache_resource(self, resource): """ Caches information on a new resource. 
- :param resource: original resource name - :param res_path: resource path - :param res_hash: resource hash - :param task_id: current task identifier - :return: file's path relative to task resource path + :param resource: resource object """ - if not os.path.exists(res_path): - if os.path.isabs(res_path): - raise Exception("Resource manager: File not found {} ({})" - .format(res_path, res_hash)) - return - - norm_file_path = norm_path(res_path) - norm_resource = norm_path(resource) - - if not norm_file_path.endswith(norm_resource): - raise Exception("Resource manager: Invalid resource path {} ({})" - .format(res_path, res_hash)) - - name = self.storage.relative_path(res_path, task_id) - self.storage.cache.add_resource(task_id, [name, res_hash]) - - if res_hash: - self.storage.cache.set_path(res_hash, res_path) + if os.path.exists(resource.path): + self.storage.cache.add_resource(resource) + logger.debug("Resource manager: Resource cached: {}".format(resource)) else: - logger.warn("Resource manager: No hash provided for {}".format(res_path)) - - logger.debug("Resource manager: Resource registered {} ({})".format(res_path, res_hash)) - return name + if os.path.isabs(resource.path): + raise Exception("Resource manager: File not found {} ({})" + .format(resource.path, resource.hash)) + logger.warn("Resource does not exist: {}" + .format(resource.path)) - def __pull(self, filename, multihash, task_id, + def __pull(self, resource, task_id, success, error, client=None, client_options=None, async=True): client = client or self.new_client() directory = self.storage.get_dir(task_id) + file_name = self.storage.relative_path(resource.path, task_id) kwargs = dict( - multihash=multihash, - filename=filename, + multihash=resource.hash, + filename=file_name, filepath=directory, client_options=client_options ) diff --git a/golem/resource/base/resourcetest.py b/golem/resource/base/resourcetest.py index 40e7c994ab..85b1e05ee6 100644 --- a/golem/resource/base/resourcetest.py +++ b/golem/resource/base/resourcetest.py @@ -79,7 +79,7 @@ def setUp(self): task_server_1.client = self.client_1 task_server_2.client = self.client_2 task_server_1.keys_auth = self.client_1.keys_auth - task_server_1.keys_auth = self.client_2.keys_auth + task_server_2.keys_auth = self.client_2.keys_auth task_server_1.sync_network = task_server_2.sync_network = Mock() task_server_1.start_accepting = task_server_2.start_accepting = Mock() task_server_1.task_computer = task_server_2.task_computer = Mock() @@ -125,9 +125,16 @@ def test(self): assert msg self.task_session_2._react_to_resource_list(msg) - self.resource_server_2.get_resources(async=False) + self.resource_server_2._download_resources(async=False) for r in self.resources_relative: - sha_256_1 = file_sha_256(os.path.join(self.resource_dir_1, r)) - sha_256_2 = file_sha_256(os.path.join(self.resource_dir_2, r)) - assert sha_256_1 == sha_256_2 + location_1 = os.path.join(self.resource_dir_1, r) + location_2 = os.path.join(self.resource_dir_2, r) + + assert os.path.exists(location_1) + assert os.path.exists(location_2) + + sha_256_1 = file_sha_256(location_1) + sha_256_2 = file_sha_256(location_2) + assert sha_256_1 == sha_256_2, '{} != {}'.format( + sha_256_1.encode('hex'), sha_256_2.encode('hex')) diff --git a/golem/resource/client.py b/golem/resource/client.py index 88cd9bf313..e4e56c9b48 100644 --- a/golem/resource/client.py +++ b/golem/resource/client.py @@ -191,7 +191,6 @@ def _handle_retries(self, method, cmd, *args, **kwargs): while not result: try: result = method(*args, 
**kwargs) - break except Exception as exc: self.command_failed(exc, cmd, obj_id) @@ -199,11 +198,10 @@ def _handle_retries(self, method, cmd, *args, **kwargs): self._clear_retry(cmd, obj_id) if raise_exc: raise exc - result = None break - - self._clear_retry(cmd, obj_id) - return result + else: + self._clear_retry(cmd, obj_id) + return result @staticmethod def _async_call(method, success, error, *args, **kwargs): @@ -253,7 +251,7 @@ class TestClient(IClient): _id = "test" def add(self, resource_path, **_): - resource_hash = str(uuid.uuid4()) + resource_hash = 'hash_' + str(uuid.uuid4()) self._resources[resource_hash] = resource_path return dict( diff --git a/golem/resource/http/resourcesmanager.py b/golem/resource/http/resourcesmanager.py index f11fc7ffba..32c3d00786 100644 --- a/golem/resource/http/resourcesmanager.py +++ b/golem/resource/http/resourcesmanager.py @@ -42,7 +42,7 @@ def build_options(cls, node_id, **kwargs): return ClientOptions(cls.CLIENT_ID, cls.VERSION, options) - def add(self, files, recursive=False, **kwargs): + def add(self, files, **kwargs): results = [] if files: @@ -57,7 +57,7 @@ def add(self, files, recursive=False, **kwargs): }) else: for f in files: - results.extend(self.add(f, recursive=False, **kwargs)) + results.extend(self.add(f, **kwargs)) return results diff --git a/golem/resource/hyperdrive/__init__.py b/golem/resource/hyperdrive/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/golem/resource/hyperdrive/resourcesmanager.py b/golem/resource/hyperdrive/resourcesmanager.py new file mode 100644 index 0000000000..cebb0bcab7 --- /dev/null +++ b/golem/resource/hyperdrive/resourcesmanager.py @@ -0,0 +1,87 @@ +import logging +import os +import uuid + +from golem.network.hyperdrive.client import HyperdriveClient +from golem.resource.base.resourcesmanager import AbstractResourceManager, ResourceBundle +from golem.resource.client import ClientHandler, ClientConfig, ClientCommands + +logger = logging.getLogger(__name__) + + +class HyperdriveResourceManager(ClientHandler, AbstractResourceManager): + + def __init__(self, dir_manager, config=None, resource_dir_method=None): + ClientHandler.__init__(self, ClientCommands, config or ClientConfig()) + AbstractResourceManager.__init__(self, dir_manager, resource_dir_method) + + def new_client(self): + return HyperdriveClient(**self.config.client) + + def build_client_options(self, node_id, **kwargs): + return HyperdriveClient.build_options(node_id, **kwargs) + + def to_wire(self, resources): + return [[r.hash, r.files_split] + for r in resources] + + def from_wire(self, resources): + return [[r[0], [os.path.join(*x) for x in r[1]]] + for r in resources] + + def add_files(self, files, task_id, + absolute_path=False, client=None, client_options=None): + + if not files: + logger.warn("Resource manager: trying to add an empty file collection for task {}" + .format(task_id)) + return + + files = {path: self.storage.relative_path(path, task_id) + for path in files} + + return self._add_files(files, task_id, + client=client, + client_options=client_options) + + def add_file(self, path, task_id, + absolute_path=False, client=None, client_options=None): + + files = {path: os.path.basename(path)} + + return self._add_files(files, task_id, + client=client, + client_options=client_options) + + def _add_files(self, files, task_id, + client=None, client_options=None): + + for f in files.iterkeys(): + if not os.path.exists(f): + logger.error("Resource manager: file '{}' does not exist" + .format(f)) + return + + 
client = client or self.new_client() + response = self._handle_retries(client.add, + self.commands.add, + files, + id=task_id, + client_options=client_options, + obj_id=str(uuid.uuid4())) + + self._cache_response(files.values(), response, task_id) + + def wrap_file(self, resource): + resource_path, resource_hash = resource + return resource_hash, [resource_path] + + def _wrap_resource(self, resource, task_id=None): + resource_hash, files = resource + path = self.storage.get_path('', task_id) + return ResourceBundle(files, resource_hash, + task_id=task_id, path=path) + + def _cache_response(self, resources, resource_hash, task_id): + res = self._wrap_resource((resource_hash, resources), task_id) + self._cache_resource(res) diff --git a/golem/resource/ipfs/resourcesmanager.py b/golem/resource/ipfs/resourcesmanager.py index ba43737ac2..ef8acddd01 100644 --- a/golem/resource/ipfs/resourcesmanager.py +++ b/golem/resource/ipfs/resourcesmanager.py @@ -21,8 +21,15 @@ def index_resources(self, dir_name, client=None, client_options=None): task_ids = self.storage.list_dir(dir_name) for task_id in task_ids: - task_root_dir = self.storage.dir_manager.get_task_resource_dir(task_id) - self._add_task(dir_files(task_root_dir), task_id) + # FIXME: review directory structure + if 'benchmark' in task_id: + continue + try: + task_root_dir = self.storage.dir_manager.get_task_resource_dir(task_id) + self._add_task(dir_files(task_root_dir), task_id) + except Exception as e: + logger.warn("Couldn't load task resources ({}): {}" + .format(task_id, e)) def pin_resource(self, multihash, client=None, client_options=None): if not client: diff --git a/golem/resource/resourcesession.py b/golem/resource/resourcesession.py index 5df8ef2bb0..27c89df98f 100644 --- a/golem/resource/resourcesession.py +++ b/golem/resource/resourcesession.py @@ -131,7 +131,7 @@ def full_data_received(self, **kwargs): self.resource_server.add_resource_to_send(self.file_name, self.copies) self.copies = 0 else: - self.resource_server.resource_downloaded(self.file_name, self.address, self.port) + self.resource_server._download_success(self.file_name, self.address, self.port) self.dropped() self.file_name = None diff --git a/golem/task/result/resultmanager.py b/golem/task/result/resultmanager.py index 4f076832d6..277fcfc179 100644 --- a/golem/task/result/resultmanager.py +++ b/golem/task/result/resultmanager.py @@ -1,7 +1,6 @@ import abc import logging import os -import uuid from golem.core.fileencrypt import FileEncryptor from golem.resource.client import async_run @@ -29,8 +28,8 @@ def extract(self, path, output_dir=None, **kwargs): class EncryptedResultPackageManager(TaskResultPackageManager): - min_secret_len = 12 - max_secret_len = 24 + min_secret_len = 16 + max_secret_len = 32 package_class = EncryptingTaskResultPackager def __init__(self, resource_manager): @@ -43,24 +42,25 @@ def gen_secret(self): def pull_package(self, multihash, task_id, subtask_id, key_or_secret, success, error, async=True, client_options=None, output_dir=None): - filename = str(uuid.uuid4()) - path = self.resource_manager.storage.get_path(filename, task_id) - output_dir = os.path.join(output_dir or os.path.dirname(path), subtask_id) + file_name = task_id + "." 
+ subtask_id + file_path = self.resource_manager.storage.get_path(file_name, task_id) + output_dir = os.path.join(output_dir or os.path.dirname(file_path), subtask_id) + + if os.path.exists(file_path): + os.remove(file_path) def package_downloaded(*args, **kwargs): - request = AsyncRequest(self.extract, - path, + request = AsyncRequest(self.extract, file_path, output_dir=output_dir, key_or_secret=key_or_secret) async_run(request, package_extracted, error) def package_extracted(extracted_pkg, *args, **kwargs): success(extracted_pkg, multihash, task_id, subtask_id) - os.remove(path) + os.remove(file_path) - self.resource_manager.pull_resource(filename, - multihash, - task_id, + resource = self.resource_manager.wrap_file((file_name, multihash)) + self.resource_manager.pull_resource(resource, task_id, client_options=client_options, success=package_downloaded, error=error, @@ -72,26 +72,25 @@ def create(self, node, task_result, client_options=None, key_or_secret=None): raise ValueError("Empty key / secret") task_id = task_result.task_id - out_name = task_id + "." + task_result.subtask_id - out_path = self.resource_manager.storage.get_path(out_name, task_id) + file_name = task_id + "." + task_result.subtask_id + file_path = self.resource_manager.storage.get_path(file_name, task_id) - if os.path.exists(out_path): - os.remove(out_path) + if os.path.exists(file_path): + os.remove(file_path) packager = self.package_class(key_or_secret) - package = packager.create(out_path, - node=node, - task_result=task_result) + path = packager.create(file_path, + node=node, + task_result=task_result) - self.resource_manager.add_resource(package, task_id, client_options=client_options) - files = self.resource_manager.storage.get_resources(task_id) + self.resource_manager.add_file(path, task_id, + client_options=client_options) - for file_obj in files: - name = file_obj if isinstance(file_obj, basestring) else file_obj[0] - if os.path.basename(name) == out_name: - return file_obj + for resource in self.resource_manager.get_resources(task_id): + if resource.contains_file(file_name): + return file_path, resource.hash - if os.path.exists(package): + if os.path.exists(path): raise EnvironmentError("Error creating package: 'add' command failed") raise Exception("Error creating package: file not found") diff --git a/golem/task/result/resultpackage.py b/golem/task/result/resultpackage.py index b24e6eb189..cf5ce20a64 100644 --- a/golem/task/result/resultpackage.py +++ b/golem/task/result/resultpackage.py @@ -146,8 +146,8 @@ def create(self, output_path, node=None, task_result=None, **kwargs): disk_files, cbor_files = self.__collect_files(task_result, - disk_files=disk_files, - cbor_files=cbor_files) + disk_files=disk_files, + cbor_files=cbor_files) descriptor = TaskResultDescriptor(node, task_result) cbor_files.append((self.descriptor_file_name, descriptor)) diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py index eec8f90d61..dde94a929b 100644 --- a/golem/task/taskcomputer.py +++ b/golem/task/taskcomputer.py @@ -359,8 +359,8 @@ def wait(self, wait=True, ttl=None): else: self.waiting_ttl = ttl - def reset(self, counting_task=False): - self.counting_task = counting_task + def reset(self, computing_task=False): + self.counting_task = computing_task self.use_waiting_ttl = False self.task_requested = False self.waiting_for_task = None @@ -391,12 +391,12 @@ def __request_resource(self, task_id, resource_header, return_address, return_po def __compute_task(self, subtask_id, docker_images, src_code, extra_data, 
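In the hunk below, reset() runs only after the subtask's task_id has been
looked up, so the flag it sets can carry the task identifier rather than a
bare boolean. A condensed before/after view, with names as in this patch:

    # before: reset ran first and recorded only a boolean
    self.reset(counting_task=True)

    # after: reset receives the task id via the renamed parameter
    task_id = self.assigned_subtasks[subtask_id].task_id
    self.reset(computing_task=task_id)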
short_desc, task_timeout): - self.reset(counting_task=True) - task_id = self.assigned_subtasks[subtask_id].task_id working_dir = self.assigned_subtasks[subtask_id].working_directory unique_str = str(uuid.uuid4()) + self.reset(computing_task=task_id) + with self.dir_lock: resource_dir = self.resource_manager.get_resource_dir(task_id) temp_dir = os.path.join(self.resource_manager.get_temporary_dir(task_id), unique_str) diff --git a/golem/task/taskmanager.py b/golem/task/taskmanager.py index ab474a8532..1e2ba6af99 100644 --- a/golem/task/taskmanager.py +++ b/golem/task/taskmanager.py @@ -13,7 +13,7 @@ from golem.network.transport.tcpnetwork import SocketAddress from golem.resource.client import AsyncRequest, async_run from golem.resource.dirmanager import DirManager -from golem.resource.swift.resourcemanager import OpenStackSwiftResourceManager +from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager from golem.task.result.resultmanager import EncryptedResultPackageManager from golem.task.taskbase import ComputeTaskDef, TaskEventListener from golem.task.taskkeeper import CompTaskKeeper, compute_subtask_value @@ -63,8 +63,10 @@ def __init__(self, node_name, node, keys_auth, listen_address="", listen_port=0, self.root_path = root_path self.dir_manager = DirManager(self.get_task_manager_root()) - resource_manager = OpenStackSwiftResourceManager(self.dir_manager, - resource_dir_method=self.dir_manager.get_task_temporary_dir) + # resource_manager = OpenStackSwiftResourceManager(self.dir_manager, + # resource_dir_method=self.dir_manager.get_task_temporary_dir) + resource_manager = HyperdriveResourceManager(self.dir_manager, + resource_dir_method=self.dir_manager.get_task_temporary_dir) self.task_result_manager = EncryptedResultPackageManager(resource_manager) self.activeStatus = [TaskStatus.computing, TaskStatus.starting, TaskStatus.waiting] diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index 5cd0e65cef..d42d7185d0 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -125,8 +125,8 @@ def request_resource(self, subtask_id, resource_header, address, port, key_id, t logger.error("Cannot map subtask_id {} to session".format(subtask_id)) return subtask_id - def pull_resources(self, task_id, list_files, client_options=None): - self.client.pull_resources(task_id, list_files, client_options=client_options) + def pull_resources(self, task_id, resources, client_options=None): + self.client.pull_resources(task_id, resources, client_options=client_options) def send_results(self, subtask_id, task_id, result, computing_time, owner_address, owner_port, owner_key_id, owner, node_name): diff --git a/golem/task/tasksession.py b/golem/task/tasksession.py index 513329adf6..b45daee0eb 100644 --- a/golem/task/tasksession.py +++ b/golem/task/tasksession.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) -TASK_PROTOCOL_ID = 12 +TASK_PROTOCOL_ID = 13 def drop_after_attr_error(*args, **kwargs): @@ -496,7 +496,7 @@ def _react_to_delta_parts(self, msg): def _react_to_resource_list(self, msg): resource_manager = self.task_server.client.resource_server.resource_manager - resources = resource_manager.storage.join_resources(msg.resources) + resources = resource_manager.from_wire(msg.resources) client_options = msg.options self.task_computer.wait_for_resources(self.task_id, resources) @@ -660,8 +660,8 @@ def __send_resource_parts_list(self, msg): def __send_resource_list(self, msg): resource_manager = self.task_server.client.resource_server.resource_manager 
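        # note: serialisation now lives on the resource manager rather than
        # on storage; to_wire()/from_wire() define the payload carried by
        # MessageResourceList, which for the hyperdrive manager is roughly:
        #
        #   [[bundle_hash, [['dir', 'file.txt'], ...]], ...]  # to_wire()
        #   [[bundle_hash, ['dir/file.txt', ...]], ...]       # from_wire()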
         client_options = resource_manager.build_client_options(self.task_server.get_key_id())
-        res = resource_manager.storage.get_resources(msg.task_id)
-        res = resource_manager.storage.split_resources(res)
+        res = resource_manager.get_resources(msg.task_id)
+        res = resource_manager.to_wire(res)
         self.send(MessageResourceList(res, options=client_options))
 
     def __send_resource_format(self, use_distributed_resource):
@@ -687,11 +687,13 @@ def __send_result_hash(self, res):
         subtask_id = res.subtask_id
         secret = task_result_manager.gen_secret()
 
-        def success(output):
-            logger.debug("Task session: sending task result hash: {}".format(output))
+        def success(result):
+            result_path, result_hash = result
+            logger.debug("Task session: sending task result hash: {} ({})"
+                         .format(result_path, result_hash))
 
-            file_name, multihash = output
-            self.send(MessageTaskResultHash(subtask_id, multihash, secret, options=client_options))
+            self.send(MessageTaskResultHash(subtask_id, result_hash,
+                                            secret, options=client_options))
 
         def error(exc):
             logger.error("Couldn't create a task result package for subtask {}: {}"
diff --git a/golem/transactions/ethereum/ethereumtransactionsystem.py b/golem/transactions/ethereum/ethereumtransactionsystem.py
index 832fb7931a..f5c8d34884 100644
--- a/golem/transactions/ethereum/ethereumtransactionsystem.py
+++ b/golem/transactions/ethereum/ethereumtransactionsystem.py
@@ -27,14 +27,20 @@ def __init__(self, datadir, node_priv_key):
         self.__node_address = keys.privtoaddr(node_priv_key)
         log.info("Node Ethereum address: " + self.get_payment_address())
 
-        datadir = path.join(datadir, "ethereum")
-        eth_node = Client()
-        self.__proc = PaymentProcessor(eth_node, node_priv_key, faucet=True)
+        self.__eth_node = Client()
+        self.__proc = PaymentProcessor(self.__eth_node, node_priv_key, faucet=True)
         self.__proc.start()
-        self.__monitor = PaymentMonitor(eth_node, self.__node_address)
+        self.__monitor = PaymentMonitor(self.__eth_node, self.__node_address)
         self.__monitor.start()
         # TODO: We can keep address in PaymentMonitor only
 
+    def stop(self):
+        if self.__proc.running:
+            self.__proc.stop()
+        if self.__monitor.running:
+            self.__monitor.stop()
+        self.__eth_node.node.stop()
+
     def add_payment_info(self, *args, **kwargs):
         payment = super(EthereumTransactionSystem, self).add_payment_info(*args, **kwargs)
         self.__proc.add(payment)
diff --git a/gui/startapp.py b/gui/startapp.py
index 036217107a..0a811328c8 100644
--- a/gui/startapp.py
+++ b/gui/startapp.py
@@ -16,7 +16,7 @@
 apps_manager.load_apps()
 
 
-def stop_reactor():
+def stop_reactor(*_):
     from twisted.internet import reactor
     if reactor.running:
         reactor.stop()
@@ -102,7 +102,7 @@ def session_ready(*_):
 
     gui_process = start_gui(router.address)
     process_monitor = ProcessMonitor(gui_process)
-    process_monitor.add_shutdown_callback(stop_reactor)
+    process_monitor.add_callbacks(stop_reactor)
     process_monitor.start()
 
     router.start(reactor, router_ready, start_error)
diff --git a/tests/golem/core/test_processmonitor.py b/tests/golem/core/test_processmonitor.py
index 22e09f7bef..c7f081231b 100644
--- a/tests/golem/core/test_processmonitor.py
+++ b/tests/golem/core/test_processmonitor.py
@@ -1,7 +1,9 @@
+import os
 import subprocess
 import time
 from multiprocessing import Process
+
 import psutil
 from mock import Mock, patch
 
@@ -45,6 +47,10 @@ def wait_for_processes(timeout=10, *processes):
         time.sleep(0.5)
 
 
+def sleep_1sec():
+    time.sleep(1)
+
+
 def run_exit():
     return
 
@@ -52,7 +58,6 @@ def run_exit():
 
 class TestProcessMonitor(LogTestCase):
     def
test_monitor(self): - mp = MockProcess() p1 = Process(target=run_exit) p2 = Process(target=mp.run) @@ -61,6 +67,7 @@ def test_monitor(self): p2.start() pm = ProcessMonitor(p1, p2) + pm.add_callbacks(pm.kill_processes, pm.exit) pm.start() wait_for_processes(10, p1, p2) @@ -78,6 +85,7 @@ def test_monitor_2(self): p2.start() pm = ProcessMonitor(p1, p2) + pm.add_callbacks(pm.kill_processes, pm.exit) pm.start() wait_for_processes(10, p1, p2) @@ -103,17 +111,86 @@ def test_exit(self): def callback(): logger.warning("Shutting down...") - pm.add_shutdown_callback(callback=callback) + pm.add_callbacks(callback) pm.start() - - with self.assertLogs(logger, level="WARNING"): - pm.exit() + pm.exit() wait_for_processes(10, p1, p2) self.assertFalse(pm.is_process_alive(p1)) self.assertFalse(pm.is_process_alive(p2)) + def test_add_remove_callbacks(self): + pm = ProcessMonitor() + + pm.add_callbacks(pm.exit) + pm.remove_callbacks(pm.exit) + + assert not pm._callbacks + + def test_add_child_process(self): + mp1, mp2 = MockProcess(), MockProcess(timeout=1) + + p1 = Process(target=mp1.run) + p2 = Process(target=mp2.run) + + pm = ProcessMonitor(p1) + pm.add_child_processes(p2) + + assert len(pm._child_processes) == 2 + + def test_lifecycle_popen(self): + + process = subprocess.Popen(['python', '-c', 'import time; time.sleep(1)']) + assert ProcessMonitor.is_process_alive(process) + assert ProcessMonitor._pid(process) + assert ProcessMonitor.is_supported(process) + + process.communicate() + assert not ProcessMonitor.is_process_alive(process) + assert ProcessMonitor.exit_code(process) is not None + + def test_lifecycle_multiprocessing(self): + + process = Process(target=sleep_1sec) + assert not ProcessMonitor.is_process_alive(process) + assert ProcessMonitor.is_supported(process) + + process.start() + assert ProcessMonitor.is_process_alive(process) + process.join() + + assert not ProcessMonitor.is_process_alive(process) + assert ProcessMonitor.exit_code(process) is not None + + def test_lifecycle_none(self): + + process = None + + assert not ProcessMonitor.is_process_alive(process) + assert not ProcessMonitor.is_supported(process) + assert not ProcessMonitor._pid(process) + assert ProcessMonitor.exit_code(process) is None + + def test_kill_process_popen(self): + + process = subprocess.Popen(['python', '-c', 'import time; time.sleep(1)']) + assert ProcessMonitor.is_process_alive(process) + ProcessMonitor.kill_process(process) + assert not ProcessMonitor.is_process_alive(process) + + def test_kill_process_multiprocessing(self): + + process = Process(target=sleep_1sec) + process.start() + + assert ProcessMonitor.is_process_alive(process) + ProcessMonitor.kill_process(process) + assert not ProcessMonitor.is_process_alive(process) + + process = Process(target=sleep_1sec) + ProcessMonitor.kill_process(process) + def test_exit_code(self): process_psutil = psutil.Popen.__new__(psutil.Popen, None) @@ -123,7 +200,7 @@ def test_exit_code(self): process_psutil.poll = Mock() process_subprocess.poll = Mock() process_multiprocessing._popen = Mock() - process_multiprocessing._parent_pid = Mock() + process_multiprocessing._parent_pid = os.getpid() process_multiprocessing._name = "test" process_multiprocessing._daemonic = False @@ -132,9 +209,8 @@ def test_exit_code(self): assert ProcessMonitor.is_process_alive(process_psutil) assert ProcessMonitor.is_process_alive(process_subprocess) - with patch('multiprocessing.Process.exitcode') as exitcode: - exitcode.__get__ = Mock(return_value=None) - assert 
ProcessMonitor.is_process_alive(process_multiprocessing) + with patch('multiprocessing.Process.is_alive', side_effect=lambda: False): + assert not ProcessMonitor.is_process_alive(process_multiprocessing) assert ProcessMonitor.exit_code(None) is None assert ProcessMonitor.exit_code(process_psutil) is None diff --git a/tests/golem/network/hyperdrive/test_hyperdrive_client.py b/tests/golem/network/hyperdrive/test_hyperdrive_client.py new file mode 100644 index 0000000000..53a270b468 --- /dev/null +++ b/tests/golem/network/hyperdrive/test_hyperdrive_client.py @@ -0,0 +1,73 @@ +import unittest +import uuid + +import mock + +from golem.network.hyperdrive.client import HyperdriveClient + + +class TestHyperdriveClient(unittest.TestCase): + + def setUp(self): + self.response = { + 'files': [ + 'file1', 'file2' + ], + 'hash': str(uuid.uuid4()) + } + + def test_build_options(self): + node_id = str(uuid.uuid4()) + options = HyperdriveClient.build_options(node_id) + assert options.client_id == HyperdriveClient.CLIENT_ID + assert options.version == HyperdriveClient.VERSION + assert not options.options + + def test_diagnostics(self): + client = HyperdriveClient() + + with self.assertRaises(NotImplementedError): + client.diagnostics() + + def test_add(self): + client = HyperdriveClient() + + with mock.patch.object(HyperdriveClient, '_request', + return_value=self.response): + assert client.add(self.response['files']) == self.response['hash'] + + def test_get_file(self): + client = HyperdriveClient() + multihash = str(uuid.uuid4()) + filepath = str(uuid.uuid4()) + + with mock.patch.object(HyperdriveClient, '_request', + return_value=self.response): + + with self.assertRaises(KeyError): + client.get_file(multihash) + + assert client.get_file(multihash, + client_options=None, + filepath=filepath) == \ + [(filepath, multihash, self.response['files'])] + + def test_pin_add(self): + client = HyperdriveClient() + multihash = str(uuid.uuid4()) + filepath = str(uuid.uuid4()) + + with mock.patch.object(HyperdriveClient, '_request', + return_value=self.response): + + assert client.pin_add(filepath, multihash) == self.response['hash'] + + def test_pin_rm(self): + client = HyperdriveClient() + multihash = str(uuid.uuid4()) + with mock.patch.object(HyperdriveClient, '_request', + return_value=self.response): + + assert client.pin_rm(multihash) == self.response['hash'] + + diff --git a/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py b/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py new file mode 100644 index 0000000000..f316e8e441 --- /dev/null +++ b/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py @@ -0,0 +1,73 @@ +import os + +from mock import patch, Mock +from requests import ConnectionError + +from golem.network.hyperdrive.daemon_manager import HyperdriveDaemonManager +from golem.testutils import TempDirFixture + + +class TestHyperdriveDaemonManager(TempDirFixture): + + @patch('golem.core.processmonitor.ProcessMonitor.start') + @patch('golem.core.processmonitor.ProcessMonitor.add_callbacks') + @patch('golem.core.processmonitor.ProcessMonitor.add_child_processes') + @patch('atexit.register') + def test_start(self, register, *_): + + dest_dir = os.path.join(self.path, HyperdriveDaemonManager._executable) + if not os.path.exists(dest_dir): + os.makedirs(dest_dir) + + with patch('os.makedirs') as makedirs: + daemon_manager = HyperdriveDaemonManager(self.path) + assert not makedirs.called + + daemon_manager._monitor.add_callbacks.assert_called_with(daemon_manager._start) 
+ register.assert_called_with(daemon_manager.stop) + + process = Mock() + + def _running(val=True): + daemon_manager._daemon_running = lambda *_: val + + _running(False) + process.poll.return_value = True + daemon_manager._monitor.add_child_processes.called = False + + with patch('time.sleep', side_effect=lambda *_: _running(True)), \ + patch('subprocess.Popen', return_value=process): + + with self.assertRaises(RuntimeError): + daemon_manager.start() + assert daemon_manager._monitor.start.called + assert not daemon_manager._monitor.add_child_processes.called + + _running(False) + process.poll.return_value = None + daemon_manager._monitor.add_child_processes.called = False + + with patch('time.sleep', side_effect=lambda *_: _running(True)), \ + patch('subprocess.Popen', return_value=process): + + daemon_manager.start() + assert daemon_manager._monitor.start.called + daemon_manager._monitor.add_child_processes.assert_called_with(process) + + def test_daemon_running(self): + + with patch('os.makedirs') as makedirs: + daemon_manager = HyperdriveDaemonManager(self.path) + assert makedirs.called + + def raise_exc(): + raise ConnectionError() + + with patch('golem.network.hyperdrive.client.HyperdriveClient.id', + side_effect=raise_exc): + assert not daemon_manager._daemon_running() + + with patch('golem.network.hyperdrive.client.HyperdriveClient.id', + side_effect=lambda *_: '0xdeadbeef'): + assert daemon_manager._daemon_running() + diff --git a/tests/golem/resource/base/test_base_resourcemanager.py b/tests/golem/resource/base/test_base_resourcemanager.py index 90c18a75ef..940645f5e5 100644 --- a/tests/golem/resource/base/test_base_resourcemanager.py +++ b/tests/golem/resource/base/test_base_resourcemanager.py @@ -4,7 +4,7 @@ from mock import patch -from golem.resource.base.resourcesmanager import ResourceCache, ResourceStorage, TestResourceManager +from golem.resource.base.resourcesmanager import ResourceCache, ResourceStorage, TestResourceManager, FileResource from golem.resource.dirmanager import DirManager from golem.tools.testdirfixture import TestDirFixture @@ -49,82 +49,71 @@ class TestResourceCache(unittest.TestCase): def setUp(self): self.cache = ResourceCache() - self.resource_path = u'\0!/abstract/resource/path\0!' + self.resource_path = unicode(os.path.join('abstract', 'prefix', 'path')) self.resource_name = u'\0!abstract_name\0!' self.resource_hash = str(uuid.uuid4()) - self.prefix = u'\0!/abstract/prefix\0!' - self.category = u'\0!abstract\0!' - - def test_path(self): - self.cache.set_path(self.resource_hash, self.resource_path) - - assert self.cache.get_path(self.resource_hash) == self.resource_path - assert self.cache.get_hash(self.resource_path) == self.resource_hash - assert self.cache.get_path(str(uuid.uuid4())) is None - assert self.cache.get_hash(str(uuid.uuid4())) is None - assert self.cache.get_path(str(uuid.uuid4()), 'default_value') == 'default_value' - - assert self.cache.remove_path(self.resource_hash) == self.resource_path - assert self.cache.remove_path(self.resource_hash) is None - assert self.cache.get_path(self.resource_hash) is None + self.prefix = unicode(os.path.join('abstract', 'prefix')) + self.task_id = u'\0!abstract\0!' 
def test_prefix(self): - self.cache.set_prefix(self.category, self.prefix) + self.cache.set_prefix(self.task_id, self.prefix) + resource = FileResource(self.resource_name, self.resource_hash, + task_id=self.task_id, path=self.resource_path) - assert self.cache.get_prefix(self.category) == self.prefix + assert self.cache.get_prefix(resource.task_id) == self.prefix assert self.cache.get_prefix(str(uuid.uuid4())) == '' assert self.cache.get_prefix(str(uuid.uuid4()), 'default_value') == 'default_value' - assert self.cache.remove_prefix(self.category) == self.prefix - assert self.cache.remove_prefix(self.category) is None - assert self.cache.get_prefix(self.category) == '' + self.cache.add_resource(resource) + self.cache.remove(resource.task_id) + assert self.cache.get_prefix(resource.task_id) == '' def test_resources(self): - new_category = 'new_category' - resource = ['name', str(uuid.uuid4())] - new_resource = ['new_name', str(uuid.uuid4())] - - self.cache.add_resource(self.category, resource) - self.cache.add_resource(new_category, resource) - - assert self.cache.get_resources(self.category) == [resource] - assert self.cache.get_resources(new_category) == [resource] + resource = FileResource(self.resource_name, self.resource_hash, + task_id=self.task_id, path=self.resource_path) + new_task_id = str(uuid.uuid4()) + new_resource = FileResource('new_name', str(uuid.uuid4()), new_task_id) + tmp_task_id = str(uuid.uuid4()) + tmp_resource = FileResource('tmp_name', str(uuid.uuid4()), tmp_task_id) + + self.cache.add_resource(resource) + self.cache.add_resource(new_resource) + + assert self.cache.get_resources(self.task_id) == [resource] + assert self.cache.get_resources(new_task_id) == [new_resource] assert self.cache.get_resources('unknown') == [] - assert self.cache.has_resource(self.category, resource) - assert self.cache.has_resource(new_category, resource) - assert not self.cache.has_resource('unknown', resource) - - self.cache.set_resources(self.category, [new_resource]) + assert self.cache.has_resource(resource) + assert self.cache.has_resource(new_resource) + assert not self.cache.has_resource(tmp_resource) - assert self.cache.get_resources(self.category) == [new_resource] - assert self.cache.get_resources(new_category) == [resource] + self.cache.add_resource(tmp_resource) - assert self.cache.remove_resources(self.category) - assert self.cache.remove_resources('unknown') == [] - assert self.cache.get_resources(new_category) == [resource] + assert self.cache.get_resources(self.task_id) == [resource] + assert self.cache.get_resources(tmp_task_id) == [tmp_resource] - def test_file_exists(self): - resource = ['name', str(uuid.uuid4())] - - self.cache.add_resource(self.category, resource) - assert self.cache.has_file(resource, self.category) - assert not self.cache.has_file(['invalid_name', str(uuid.uuid4())], self.category) - assert not self.cache.has_file(['invalid_name', resource[1]], self.category) + assert self.cache.remove(self.task_id) + assert self.cache.remove('unknown') == [] + assert self.cache.get_resources(new_task_id) == [new_resource] def test_remove(self): self._add_all() - self.cache.remove(self.category) + self.cache.remove(self.task_id) assert self._all_default_empty() new_hash = str(uuid.uuid4()) new_path = '/other/path' + new_task = str(uuid.uuid4()) + new_resource = FileResource(new_path, new_hash, + task_id=new_task, path=new_path) self._add_all() - self.cache.set_path(new_hash, new_path) - self.cache.remove(self.category) + self.cache.add_resource(new_resource) + 
self.cache.remove(self.task_id) assert self._all_default_empty() - assert self.cache.get_path(new_hash) == new_path + assert self.cache.has_resource(new_resource) + assert self.cache.get_by_path(new_path) == new_resource + assert self.cache.get_by_hash(new_hash) == new_resource def test_clear(self): self._add_all() @@ -132,15 +121,16 @@ def test_clear(self): assert self._all_default_empty() def _add_all(self): - self.cache.set_path(self.resource_hash, self.resource_path) - self.cache.set_prefix(self.category, self.prefix) - self.cache.add_resource(self.category, [self.resource_name, self.resource_hash]) + resource = FileResource(self.resource_path, self.resource_hash, + task_id=self.task_id, path=self.resource_path) + self.cache.add_resource(resource) + self.cache.set_prefix(self.task_id, self.prefix) def _all_default_empty(self): - return self.cache.get_path(self.resource_hash) is None and \ - self.cache.get_hash(self.resource_path) is None and \ - self.cache.get_prefix(self.category) == '' and \ - self.cache.get_resources(self.category) == [] + return self.cache.get_by_path(self.resource_hash) is None and \ + self.cache.get_by_hash(self.resource_path) is None and \ + self.cache.get_prefix(self.task_id) == '' and \ + self.cache.get_resources(self.task_id) == [] class TestResourceStorage(_Common.ResourceSetUp): @@ -189,39 +179,7 @@ def test_relative_path(self): src_path = os.path.join(task_dir, 'dir', 'file') assert self.storage.relative_path(src_path, self.task_id) == os.path.join('dir', 'file') - def test_get_path_and_hash(self): - - resource = self.test_dir_file - resource_hash = str(uuid.uuid4()) - self.storage.cache.set_path(resource_hash, resource) - - assert self.storage.get_path_and_hash(resource, self.task_id) - assert not self.storage.get_path_and_hash(resource + '_2', self.task_id) - assert self.storage.get_path_and_hash(resource, self.task_id, multihash=resource_hash) - assert not self.storage.get_path_and_hash(resource, self.task_id, multihash=resource_hash + '_2') - - def test_split_join_resources(self): - - resources = [[r, str(uuid.uuid4())] for r in self.joined_resources] - resources_split = self.storage.split_resources(resources) - resources_joined = self.storage.join_resources(resources_split) - - assert len(resources) == len(self.target_resources) - assert all([r[0] in self.split_resources for r in resources_split]) - assert all([r[0] in self.joined_resources for r in resources_joined]) - - resources = [ - ['resource', '1'], - [None, '2'], - None, - [['split', 'path'], '4'] - ] - assert self.storage.join_resources(resources) == [ - ['resource', '1'], - [os.path.join('split', 'path'), '4'] - ] - - def test_copy_file(self): + def test_copy(self): task_dir = self.storage.get_dir(self.task_id) self.storage.cache.set_prefix(self.task_id, task_dir) @@ -233,29 +191,9 @@ def test_copy_file(self): dst_path = self.storage.get_path(relative_path, new_category) assert file_path != dst_path - self.storage.copy_file(file_path, relative_path, new_category) + self.storage.copy(file_path, relative_path, new_category) assert os.path.exists(dst_path) - def test_copy_cached(self): - resource_hash = str(uuid.uuid4()) - self.storage.cache.set_path(resource_hash, self.test_dir_file) - - assert self.storage.copy_cached( - os.path.join('other', 'path'), - resource_hash, - self.task_id - ) - assert not self.storage.copy_cached( - os.path.join('other', 'path'), - resource_hash + '_2', - self.task_id - ) - assert self.storage.copy_cached( - os.path.join('other', 'path'), - resource_hash, - 
self.task_id + '_2' - ) - class TestAbstractResourceManager(_Common.ResourceSetUp): @@ -263,50 +201,51 @@ def setUp(self): _Common.ResourceSetUp.setUp(self) self.resource_manager = TestResourceManager(self.dir_manager) - def test_copy_resources(self): + def test_copy_files(self): old_resource_dir = self.resource_manager.storage.get_root() prev_content = os.listdir(old_resource_dir) self.dir_manager.node_name = "another" + self.node_name - self.resource_manager.copy_resources(old_resource_dir) + self.resource_manager.copy_files(old_resource_dir) assert os.listdir(self.resource_manager.storage.get_root()) == prev_content - def test_add_resource(self): + def test_add_file(self): self.resource_manager.storage.clear_cache() - self.resource_manager.add_resource(self.test_dir_file, self.task_id) + self.resource_manager.add_file(self.test_dir_file, self.task_id) resources = self.resource_manager.storage.get_resources(self.task_id) assert len(resources) == 1 with self.assertRaises(RuntimeError): - self.resource_manager.add_resource('/.!&^%', self.task_id) + self.resource_manager.add_files(['/.!&^%'], self.task_id) + resources = self.resource_manager.storage.get_resources(self.task_id) assert len(resources) == 1 - def test_add_resources(self): + def test_add_files(self): self.resource_manager.storage.clear_cache() - self.resource_manager.add_resources(self.target_resources, self.task_id, - absolute_path=True) + self.resource_manager.add_files(self.target_resources, self.task_id, + absolute_path=True) storage = self.resource_manager.storage resources = storage.get_resources(self.task_id) assert resources - assert all([r[0] in self.target_resources for r in resources]) + assert all([r.file_name in self.target_resources for r in resources]) for resource in resources: - assert storage.get_path_and_hash(resource[0], self.task_id) is not None - assert storage.get_path_and_hash(resource[0], self.task_id, multihash=resource[1]) is not None - assert storage.get_path_and_hash(str(uuid.uuid4()), self.task_id) is None + assert storage.cache.get_by_path(resource.file_name) is not None + assert storage.cache.get_by_path(str(uuid.uuid4())) is None storage.clear_cache() - self.resource_manager.add_resources([self.test_dir_file], self.task_id) + self.resource_manager.add_files([self.test_dir_file], self.task_id) assert len(storage.get_resources(self.task_id)) == 1 with self.assertRaises(RuntimeError): - self.resource_manager.add_resources(['/.!&^%'], self.task_id) + self.resource_manager.add_files(['/.!&^%'], self.task_id) + assert len(storage.get_resources(self.task_id)) == 1 def test_add_task(self): @@ -329,11 +268,6 @@ def test_add_task(self): self.resource_manager._add_task(resource_paths, new_task) assert len(storage.get_resources(new_task)) == len(resources) - task_files = storage.cache.get_resources(self.task_id) - assert storage.cache.has_resource(self.task_id, task_files[0]) - assert not storage.cache.has_resource(self.task_id, (u'File path', u'Multihash')) - assert not storage.cache.has_resource(str(uuid.uuid4()), task_files[0]) - def test_remove_task(self): self.resource_manager.storage.clear_cache() @@ -350,3 +284,26 @@ def test_command_failed(self): self.resource_manager.commands.id, str(uuid.uuid4())) assert logger.error.called + + def test_to_from_wire(self): + + entries = [FileResource(r, str(uuid.uuid4()), task_id="task", path=r) + for r in self.joined_resources] + + resources_split = self.resource_manager.to_wire(entries) + resources_joined = self.resource_manager.from_wire(resources_split) + + 
assert len(entries) == len(self.target_resources) + assert all([r[0] in self.split_resources for r in resources_split]) + assert all([r[0] in self.joined_resources for r in resources_joined]) + + entries = [ + ['resource', '1'], + [None, '2'], + None, + [['split', 'path'], '4'] + ] + assert self.resource_manager.from_wire(entries) == [ + ['resource', '1'], + [os.path.join('split', 'path'), '4'] + ] diff --git a/tests/golem/resource/base/test_base_resourceserver.py b/tests/golem/resource/base/test_base_resourceserver.py index 4366335290..c135f2095b 100644 --- a/tests/golem/resource/base/test_base_resourceserver.py +++ b/tests/golem/resource/base/test_base_resourceserver.py @@ -3,8 +3,7 @@ import time import uuid -from mock import patch - +from golem.core.fileshelper import common_dir from golem.core.keysauth import EllipticalKeysAuth from golem.resource.base.resourceserver import BaseResourceServer from golem.resource.base.resourcesmanager import TestResourceManager @@ -80,19 +79,6 @@ def testStartAccepting(self): self.dir_manager, keys_auth, client) rs.start_accepting() - @patch('golem.network.ipfs.client.IPFSClient', autospec=True) - def testGetResources(self): - rs = self.testAddTask() - rm = rs.resource_manager - - rm.add_files_to_get([ - (u'filename', u'multihash'), - (u'filename_2', u'multihash_2'), - (u'filename_2', u'') - ], 'xyz') - - rs.sync_network() - def testGetDistributedResourceRoot(self): keys_auth = EllipticalKeysAuth(self.path) client = MockClient() @@ -167,8 +153,8 @@ def testChangeResourceDir(self): rs = BaseResourceServer(TestResourceManager(self.dir_manager), self.dir_manager, keys_auth, client) - rm.add_resources(self._resources(), self.task_id, - absolute_path=True) + rm.add_files(self._resources(), self.task_id, + absolute_path=True) resources = rm.storage.get_resources(self.task_id) @@ -196,8 +182,8 @@ def testRemoveTask(self): self.dir_manager, keys_auth, client) rm = rs.resource_manager - rm.add_resources(self._resources(), self.task_id, - absolute_path=True) + rm.add_files(self._resources(), self.task_id, + absolute_path=True) assert rm.storage.get_resources(self.task_id) @@ -209,46 +195,47 @@ def testRemoveTask(self): def testGetResources(self): keys_auth = EllipticalKeysAuth(self.path) client = MockClient() + rs = BaseResourceServer(TestResourceManager(self.dir_manager), self.dir_manager, keys_auth, client) - rs.resource_manager.storage.clear_cache() - rs.resource_manager.add_resources(self.target_resources, self.task_id) - resources = rs.resource_manager.storage.get_resources(self.task_id) - resources_len = len(resources) - filenames = [] - for entry in resources: - filenames.append(entry[0]) - common_path = os.path.commonprefix(filenames) + rm = rs.resource_manager + rm.storage.clear_cache() + rm.add_files(self.target_resources, self.task_id) - relative_resources = [] - for resource in resources: - relative_resources.append((resource[0].replace(common_path, '', 1), - resource[1])) + common_path = common_dir(self.target_resources) + resources = rm.storage.get_resources(self.task_id) + assert len(resources) == len(self.target_resources) - rs.add_files_to_get(resources, self.task_id) - assert len(rs.waiting_resources) == 0 + assert len(rs.pending_resources) == 0 + rs.download_resources(resources, self.task_id) + assert len(rs.pending_resources[self.task_id]) == len(resources) rs_aux = BaseResourceServer(TestResourceManager(self.dir_manager), self.dir_manager_aux, keys_auth, client) - rs_aux.add_files_to_get(relative_resources, self.task_id) - assert 
len(rs_aux.waiting_resources) == resources_len + relative_resources = [] + for resource in resources: + relative_resources.append((resource.path.replace(common_path, '', 1), + resource.hash)) - rs_aux.get_resources(async=False) - rm_aux = rs_aux.resource_manager + task_id_2 = str(uuid.uuid4()) + + assert len(rs_aux.pending_resources) == 0 + rs_aux.download_resources(relative_resources, task_id_2) + assert len(rs_aux.pending_resources[task_id_2]) == len(resources) + + rs_aux._download_resources(async=False) for entry in relative_resources: - new_path = rm_aux.get_resource_path(entry[0], self.task_id) - assert os.path.exists(new_path) + assert os.path.exists(entry[0]) assert client.downloaded def testVerifySig(self): keys_auth = EllipticalKeysAuth(self.path) - client = MockClient() rs = BaseResourceServer(TestResourceManager(self.dir_manager), - self.dir_manager, keys_auth, client) + self.dir_manager, keys_auth, MockClient()) test_str = "A test string to sign" sig = rs.sign(test_str) @@ -256,36 +243,30 @@ def testVerifySig(self): def testAddFilesToGet(self): keys_auth = EllipticalKeysAuth(self.path) - client = MockClient() rs = BaseResourceServer(TestResourceManager(self.dir_manager), - self.dir_manager, keys_auth, client) + self.dir_manager, keys_auth, MockClient()) test_files = [ ['file1.txt', '1'], [os.path.join('tmp', 'file2.bin'), '2'] ] - assert not rs.resources_to_get - - rs.add_files_to_get(test_files, self.task_id) - - assert len(rs.resources_to_get) == len(test_files) + assert not rs.pending_resources + rs.download_resources(test_files, self.task_id) + assert len(rs.pending_resources[self.task_id]) == len(test_files) return rs, test_files - def testResourceDownloaded(self): + def testDownloadSuccess(self): rs, file_names = self.testAddFilesToGet() + resources = list(rs.pending_resources[self.task_id]) + for entry in resources: + rs._download_success(entry.resource, self.task_id) + assert not rs.pending_resources - for i, entry in enumerate(file_names): - rs.resource_downloaded(entry[0], str(i), self.task_id) - - assert not rs.resources_to_get - - def testResourceDownloadError(self): + def testDownloadError(self): rs, file_names = self.testAddFilesToGet() - - for entry in file_names: - rs.resource_download_error(Exception("Error " + entry[0]), - entry[0], entry[1], self.task_id) - - assert len(rs.resources_to_get) == 0 + resources = list(rs.pending_resources[self.task_id]) + for entry in resources: + rs._download_error(Exception(), entry.resource, self.task_id) + assert not rs.pending_resources diff --git a/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py b/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py new file mode 100644 index 0000000000..fcc66ee904 --- /dev/null +++ b/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py @@ -0,0 +1,20 @@ +from unittest import skipIf + +from requests import ConnectionError + +from golem.network.hyperdrive.client import HyperdriveClient +from golem.resource.base.resourcetest import AddGetResources +from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager + + +def running(): + try: + return HyperdriveClient().id() + except ConnectionError: + return False + + +@skipIf(not running(), "Hyperdrive daemon isn't running") +class TestHyperdriveResources(AddGetResources): + __test__ = True + _resource_manager_class = HyperdriveResourceManager diff --git a/tests/golem/resource/ipfs/test_ipfs_resourcemanager.py b/tests/golem/resource/ipfs/test_ipfs_resourcemanager.py index 
34cdd9e05c..1306cf2796 100644 --- a/tests/golem/resource/ipfs/test_ipfs_resourcemanager.py +++ b/tests/golem/resource/ipfs/test_ipfs_resourcemanager.py @@ -52,30 +52,34 @@ def test_pin(self): rm = IPFSResourceManager(self.dir_manager) rm.storage.clear_cache() - rm.add_resources(self.target_resources, self.task_id) + rm.add_files(self.target_resources, self.task_id) resources = rm.storage.get_resources(self.task_id) + assert resources - result = rm.pin_resource(resources[0][1]) - self.assertTrue(result) + result = rm.pin_resource(resources[0].hash) + assert result def test_unpin(self): rm = IPFSResourceManager(self.dir_manager) rm.storage.clear_cache() - rm.add_resources(self.target_resources, self.task_id) + rm.add_files(self.target_resources, self.task_id) resources = rm.storage.get_resources(self.task_id) + assert resources - rm.pin_resource(resources[0][1]) - rm.unpin_resource(resources[0][1]) + rm.pin_resource(resources[0].hash) + rm.unpin_resource(resources[0].hash) def test_pull(self): rm = IPFSResourceManager(self.dir_manager) rm.storage.clear_cache() - rm.add_resources(self.target_resources, self.task_id) + rm.add_files(self.target_resources, self.task_id) rls = rm.storage.get_resources(self.task_id) + assert rls + rl = rls[0] - multihash = rl[1] + multihash = rl.hash # working, downloaded status = [True, False] @@ -95,8 +99,7 @@ def wait(): time.sleep(0.25) self.assertTrue(status[1]) - rm.pull_resource('other_resource', - multihash, + rm.pull_resource(('other_resource', multihash), self.task_id, success, error, async=async) @@ -105,8 +108,7 @@ def wait(): status[0] = True status[1] = False - rm.pull_resource('other_resource', - multihash, + rm.pull_resource(('other_resource', multihash), self.task_id, success, error, async=async) diff --git a/tests/golem/resource/test_resourcesession.py b/tests/golem/resource/test_resourcesession.py index 6b346a56f8..0d09a21b81 100644 --- a/tests/golem/resource/test_resourcesession.py +++ b/tests/golem/resource/test_resourcesession.py @@ -111,7 +111,7 @@ def test_full_data_received(self): self.instance.full_data_received() - self.instance.resource_server.resource_downloaded.assert_called_once_with( + self.instance.resource_server._download_success.assert_called_once_with( file_name, self.instance.address, self.instance.port) diff --git a/tests/golem/task/result/test_resultmanager.py b/tests/golem/task/result/test_resultmanager.py index c5bbb239d4..a354adfa99 100644 --- a/tests/golem/task/result/test_resultmanager.py +++ b/tests/golem/task/result/test_resultmanager.py @@ -61,7 +61,7 @@ def create(result_manager, node_name, task_id): with open(out_dir_file, 'w') as f: f.write("Dir file contents") - rm.add_resources(files, task_id) + rm.add_files(files, task_id) secret = result_manager.gen_secret() mock_node = MockNode(node_name) @@ -120,6 +120,7 @@ def testPullPackage(self): path, multihash = data assert os.path.exists(path) + assert multihash def success(*args, **kwargs): pass @@ -132,8 +133,7 @@ def error(exc, *args, **kwargs): resource_dir_method=dir_manager.get_task_temporary_dir) new_manager = EncryptedResultPackageManager(resource_manager) - new_manager.pull_package(multihash, - self.task_id, self.task_id, + new_manager.pull_package(multihash, self.task_id, self.task_id, secret, success=success, error=error, diff --git a/tests/golem/transactions/ethereum/test_ethereumtransactionsystem.py b/tests/golem/transactions/ethereum/test_ethereumtransactionsystem.py index 53225a3af8..17071eefff 100644 --- 
a/tests/golem/transactions/ethereum/test_ethereumtransactionsystem.py +++ b/tests/golem/transactions/ethereum/test_ethereumtransactionsystem.py @@ -1,9 +1,7 @@ - from ethereum import keys +from mock import patch -from golem.network.p2p.node import Node from golem.tools.testwithdatabase import TestWithDatabase -from golem.transactions.ethereum.ethereumpaymentskeeper import EthAccountInfo from golem.transactions.ethereum.ethereumtransactionsystem import EthereumTransactionSystem PRIV_KEY = '\7' * 32 @@ -28,3 +26,44 @@ def test_wrong_address_in_pay_for_task(self): def test_get_balance(self): e = EthereumTransactionSystem(self.tempdir, PRIV_KEY) assert e.get_balance() == (None, None, None) + + def test_stop(self): + + pkg = 'golem.ethereum.' + + def init(self, *args, **kwargs): + self.rpcport = 65001 + self._NodeProcess__ps = None + self.testnet = True + + with patch(pkg + 'paymentprocessor.PaymentProcessor.start'), \ + patch(pkg + 'paymentprocessor.PaymentProcessor.stop'), \ + patch(pkg + 'paymentmonitor.PaymentMonitor.start'), \ + patch(pkg + 'paymentmonitor.PaymentMonitor.stop'), \ + patch(pkg + 'node.NodeProcess.start'), \ + patch(pkg + 'node.NodeProcess.stop'), \ + patch(pkg + 'node.NodeProcess.__init__', init), \ + patch(pkg + 'node.NodeProcess.system_geth', False, create=True), \ + patch('web3.providers.rpc.HTTPProvider.__init__', init): + + e = EthereumTransactionSystem(self.tempdir, PRIV_KEY) + + assert e._EthereumTransactionSystem__proc.start.called + assert e._EthereumTransactionSystem__monitor.start.called + assert e._EthereumTransactionSystem__eth_node.node.start.called + + e.stop() + + assert not e._EthereumTransactionSystem__proc.stop.called + assert not e._EthereumTransactionSystem__monitor.stop.called + assert e._EthereumTransactionSystem__eth_node.node.stop.called + + e._EthereumTransactionSystem__eth_node.node.stop.called = False + e._EthereumTransactionSystem__proc._loopingCall.running = True + e._EthereumTransactionSystem__monitor._loopingCall.running = True + + e.stop() + + assert e._EthereumTransactionSystem__proc.stop.called + assert e._EthereumTransactionSystem__monitor.stop.called + assert e._EthereumTransactionSystem__eth_node.node.stop.called diff --git a/tests/gui/test_startapp.py b/tests/gui/test_startapp.py index 7f946fbff6..36392bf052 100644 --- a/tests/gui/test_startapp.py +++ b/tests/gui/test_startapp.py @@ -1,21 +1,20 @@ -import os import sys import time from Queue import Queue from threading import Thread +from mock import Mock, patch, ANY +from twisted.internet.defer import Deferred + from golem.client import Client from golem.clientconfigdescriptor import ClientConfigDescriptor -from golem.core.common import get_golem_path from golem.core.simpleserializer import DictSerializer from golem.environments.environment import Environment from golem.rpc.mapping import aliases from golem.rpc.session import WebSocketAddress from golem.tools.ci import ci_patch from golem.tools.testwithreactor import TestDirFixtureWithReactor -from gui.startapp import load_environments, start_client -from mock import Mock, patch, ANY -from twisted.internet.defer import Deferred +from gui.startapp import load_environments, start_client, stop_reactor, start_app def router_start(fail_on_start): @@ -209,6 +208,22 @@ def test_start_gui_failure(self, *_): self._start_gui(session_fails=True, expected_result=u"Session error") + @patch('twisted.internet.reactor') + def test_stop_reactor(self, reactor): + reactor.running = False + stop_reactor() + assert not reactor.stop.called + + 
reactor.running = True + stop_reactor() + assert reactor.stop.called + + @patch('gui.startapp.start_client') + def test_start_app(self, _start_client): + + start_app(datadir=self.tempdir) + _start_client.assert_called_with(True, self.tempdir, False) + def test_start_gui_subprocess(self): from gui.startapp import start_gui as _start_gui
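
Reviewer note, not part of the patch: tests/golem/network/hyperdrive/test_hyperdrive_client.py
above pins down the public surface of the new HyperdriveClient. The sketch below is
illustrative only -- those tests mock the HTTP transport (HyperdriveClient._request), so
exact argument shapes and return values should be confirmed against
golem/network/hyperdrive/client.py, and a golem-hyperdrive daemon
(node golem-hyperdrive/src/main.js, as started in the CI configs) must be running locally:

    from golem.network.hyperdrive.client import HyperdriveClient

    client = HyperdriveClient()

    # Liveness probe: raises requests.ConnectionError when the daemon is
    # down; HyperdriveDaemonManager._daemon_running relies on this call.
    node_id = client.id()

    # Share files and obtain a content hash for the whole set.
    multihash = client.add(['file1', 'file2'])

    # Fetch the set elsewhere. Per the tests, 'filepath' is mandatory
    # (get_file raises KeyError without it) and the call returns a list
    # of (filepath, multihash, files) tuples.
    results = client.get_file(multihash, client_options=None,
                              filepath='downloads')

    # Re-seed a downloaded copy, then stop seeding it.
    client.pin_add('downloads', multihash)
    client.pin_rm(multihash)

    # Peer-specific transport options attached to MessageResourceList.
    options = HyperdriveClient.build_options('peer-key-id')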