Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Hyperdrive (#810)
Browse files Browse the repository at this point in the history
Hyperdrive client support
  • Loading branch information
mfranciszkiewicz authored Mar 31, 2017
1 parent 4aee05d commit 16a1859
Show file tree
Hide file tree
Showing 38 changed files with 1,216 additions and 632 deletions.
8 changes: 8 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ before_install:
- brew install [email protected] openexr
- brew install ethereum/ethereum/ethereum ipfs

- rm -rf ~/.ipfs
- ipfs init
- ipfs daemon &

- git clone https://github.com/mfranciszkiewicz/golem-hyperdrive --depth 1
- cd golem-hyperdrive && npm install --save && cd ..
- node golem-hyperdrive\src\main.js &

- sudo pip install --upgrade pip setuptools packaging pytest mock
- pip install https://github.com/golemfactory/golem/wiki/wheels/sip-4.19-cp27-cp27m-macosx_10_12_x86_64.whl
- pip install https://github.com/golemfactory/golem/wiki/wheels/PyQt5-5.7.1-cp27-cp27m-macosx_10_12_x86_64.whl
Expand Down
11 changes: 11 additions & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ environment:
PYTHON_VERSION: "2.7.13"
PYTHON_ARCH: "32"

NODEJS_VERSION: "6"

VER_GETH: geth-windows-amd64-1.5.9-a07539fb
VER_IPFS: go-ipfs_v0.4.4_windows-386
VER_OPENSSL: openssl-1.0.2k-i386-win32
Expand Down Expand Up @@ -56,6 +58,14 @@ install:
7z x %VER_GETH%.zip -y -aoa -oC:\ > NUL
)

# nodejs
- ps: Install-Product node $env:NODEJS_VERSION

# golem-hyperdrive
- git clone https://github.com/mfranciszkiewicz/golem-hyperdrive --depth 1
- cd golem-hyperdrive && npm install --save && cd ..
- ps: $HyperdriveProcess = Start-Process node golem-hyperdrive\src\main.js -PassThru

# ipfs
- if not exist %DIR_IPFS% (
appveyor DownloadFile https://dist.ipfs.io/go-ipfs/v0.4.4/%VER_IPFS%.zip &&
Expand Down Expand Up @@ -103,3 +113,4 @@ test_script:

on_finish:
- ps: Stop-Process -Id $IPFSProcess.Id
- ps: Stop-Process -Id $HyperdriveProcess.Id
5 changes: 5 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
machine:
node:
version: 6.9.5
services:
- docker
post:
Expand All @@ -15,11 +17,14 @@ dependencies:
- pip install https://github.com/golemfactory/golem/wiki/wheels/sip-4.15-py2-none-any.whl
- pip install https://github.com/golemfactory/golem/wiki/wheels/PyQt5-5.2.1-py2-none-any.whl
- if [ ! -e /usr/local/bin/ipfs ]; then wget https://dist.ipfs.io/go-ipfs/v0.4.5/go-ipfs_v0.4.5_linux-amd64.tar.gz; tar xvfz go-ipfs_v0.4.5_linux-amd64.tar.gz; sudo mv go-ipfs/ipfs /usr/local/bin/ipfs; /usr/local/bin/ipfs init; fi
- git clone https://github.com/mfranciszkiewicz/golem-hyperdrive --depth 1; cd golem-hyperdrive; npm install --save; cd -
- /usr/local/bin/ipfs config --json Bootstrap "[]"
- /usr/local/bin/ipfs config --json SupernodeRouting.Servers "[]"
- /usr/local/bin/ipfs config --json Addresses.Swarm '["/ip6/::/tcp/4001", "/ip6/::/udp/4002/utp", "/ip4/0.0.0.0/udp/4002/utp"]'
- /usr/local/bin/ipfs daemon:
background: true
- node golem-hyperdrive/src/main.js:
background: true

test:
pre:
Expand Down
18 changes: 14 additions & 4 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from golem.monitor.model.nodemetadatamodel import NodeMetadataModel
from golem.monitor.monitor import SystemMonitor
from golem.monitorconfig import MONITOR_CONFIG
from golem.network.hyperdrive.daemon_manager import HyperdriveDaemonManager
from golem.network.p2p.node import Node
from golem.network.p2p.p2pservice import P2PService
from golem.network.p2p.peersession import PeerSessionInfo
Expand All @@ -37,7 +38,7 @@
from golem.resource.base.resourceserver import BaseResourceServer
from golem.resource.client import AsyncRequest, async_run
from golem.resource.dirmanager import DirManager, DirectoryType
from golem.resource.swift.resourcemanager import OpenStackSwiftResourceManager
from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager
from golem.rpc.mapping.aliases import Task, Network, Environment, UI
from golem.rpc.session import Publisher
from golem.task.taskbase import resource_types
Expand Down Expand Up @@ -129,6 +130,7 @@ def __init__(self, datadir=None, transaction_system=False, connect_to_known_host
self.use_docker_machine_manager = use_docker_machine_manager
self.connect_to_known_hosts = connect_to_known_hosts
self.environments_manager = EnvironmentsManager()
self.daemon_manager = None

self.rpc_publisher = None

Expand Down Expand Up @@ -170,9 +172,13 @@ def start_network(self):
self.node.collect_network_info(self.config_desc.seed_host,
use_ipv6=self.config_desc.use_ipv6)
log.debug("Is super node? %s", self.node.is_super_node())

# self.ipfs_manager = IPFSDaemonManager(connect_to_bootstrap_nodes=self.connect_to_known_hosts)
# self.ipfs_manager.store_client_info()

self.daemon_manager = HyperdriveDaemonManager(self.datadir)
self.daemon_manager.start()

self.p2pservice = P2PService(self.node, self.config_desc, self.keys_auth,
connect_to_known_hosts=self.connect_to_known_hosts)
self.task_server = TaskServer(self.node, self.config_desc, self.keys_auth, self,
Expand All @@ -181,7 +187,7 @@ def start_network(self):

dir_manager = self.task_server.task_computer.dir_manager

self.resource_server = BaseResourceServer(OpenStackSwiftResourceManager(dir_manager),
self.resource_server = BaseResourceServer(HyperdriveResourceManager(dir_manager),
dir_manager, self.keys_auth, self)

log.info("Starting p2p server ...")
Expand Down Expand Up @@ -226,8 +232,12 @@ def quit(self):
self.do_work_task.stop()
if self.task_server:
self.task_server.quit()
if self.transaction_system:
self.transaction_system.stop()
if self.diag_service:
self.diag_service.unregister_all()
if self.daemon_manager:
self.daemon_manager.stop()
dispatcher.send(signal='golem.monitor', event='shutdown')
if self.db:
self.db.close()
Expand Down Expand Up @@ -542,8 +552,8 @@ def query_task_state(self, task_id):
if state:
return DictSerializer.dump(state)

def pull_resources(self, task_id, list_files, client_options=None):
self.resource_server.add_files_to_get(list_files, task_id, client_options=client_options)
def pull_resources(self, task_id, resources, client_options=None):
self.resource_server.download_resources(resources, task_id, client_options=client_options)

def add_resource_peer(self, node_name, addr, port, key_id, node_info):
self.resource_server.add_resource_peer(node_name, addr, port, key_id, node_info)
Expand Down
81 changes: 63 additions & 18 deletions golem/core/processmonitor.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,96 @@
import atexit
import subprocess
import time
from multiprocessing import Process
from threading import Thread
from threading import Thread, Lock

import psutil


class ProcessMonitor(Thread):

def __init__(self, *child_processes):
def __init__(self, *child_processes, **params):
super(ProcessMonitor, self).__init__(target=self._start)
self.shutdown_callbacks = [self.kill_processes, self.stop]
self.child_processes = child_processes

self._child_processes = []
self._callbacks = params.pop('callbacks', [])
self._lock = Lock()

self.daemon = True
self.working = False

atexit.register(self.exit)
self.add_child_processes(*child_processes)

def _start(self):
self.working = True

while self.working:
for process in self.child_processes:
for i in xrange(len(self._child_processes) - 1, -1, -1):
process = self._child_processes[i]

if not self.is_process_alive(process):
print("Subprocess {} exited with code {}. Terminating"
.format(process.pid, self.exit_code(process)))
self.exit()
time.sleep(1)
print "Subprocess {} exited with code {}".format(process.pid,
self.exit_code(process))
if self.working:
self.run_callbacks(process)
self._child_processes.pop(i)

def stop(self):
time.sleep(0.5)

def stop(self, *_):
self.working = False

def exit(self):
for callback in self.shutdown_callbacks:
callback()
def exit(self, *_):
self.stop()
self.kill_processes()

def add_child_processes(self, *processes):
assert all([self.is_supported(p) for p in processes])
self._child_processes.extend(processes)

def add_callbacks(self, *callbacks):
self._callbacks.extend(callbacks)

def add_shutdown_callback(self, callback):
self.shutdown_callbacks.append(callback)
def remove_callbacks(self, *callbacks):
for handler in callbacks:
idx = self._callbacks.index(handler)
if idx != -1:
self._callbacks.pop(idx)

def kill_processes(self):
for process in self.child_processes:
def run_callbacks(self, process=None):
for callback in self._callbacks:
if self.working:
callback(process)

def kill_processes(self, *_):
for process in self._child_processes:
self.kill_process(process)

@classmethod
def kill_process(cls, process):
if cls.is_process_alive(process):
try:
process.terminate()

if isinstance(process, (psutil.Popen, subprocess.Popen)):
process.communicate()
elif isinstance(process, Process):
process.join()

except Exception as exc:
print("Error terminating process {}: {}".format(process, exc))
else:
print "Subprocess {} terminated".format(cls._pid(process))

@staticmethod
def _pid(process):
if process:
return process.pid

@staticmethod
def is_supported(process):
return isinstance(process, (psutil.Popen, subprocess.Popen, Process))

@staticmethod
def exit_code(process):
Expand All @@ -61,5 +106,5 @@ def is_process_alive(process):
process.poll()
return process.returncode is None
elif isinstance(process, Process):
return process.exitcode is None
return process.is_alive()
return False
Empty file.
76 changes: 76 additions & 0 deletions golem/network/hyperdrive/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json

import requests

from golem.resource.client import IClient, ClientOptions


class HyperdriveClient(IClient):

CLIENT_ID = 'hyperg'
VERSION = 1.0

def __init__(self, port=3292, host='127.0.0.1', timeout=None):
super(HyperdriveClient, self).__init__()

# destination address
self.host = host
self.port = port
# connection / read timeout
self.timeout = timeout

# default POST request headers
self._url = 'http://{}:{}/api'.format(self.host, self.port)
self._headers = {'content-type': 'application/json'}

@classmethod
def build_options(cls, node_id, **kwargs):
return ClientOptions(cls.CLIENT_ID, cls.VERSION)

def diagnostics(self, *args, **kwargs):
raise NotImplementedError()

def id(self, client_options=None, *args, **kwargs):
response = self._request(command='id')
return response['id']

def add(self, files, client_options=None, **kwargs):
response = self._request(
command='upload',
id=kwargs.get('id'),
files=files
)
return response['hash']

def get_file(self, multihash, client_options=None, **kwargs):
dst_path = kwargs.pop('filepath')
response = self._request(
command='download',
hash=multihash,
dest=dst_path
)
return [(dst_path, multihash, response['files'])]

def pin_add(self, file_path, multihash):
response = self._request(
command='upload',
files=[file_path],
hash=multihash
)
return response['hash']

def pin_rm(self, multihash):
response = self._request(
command='cancel',
hash=multihash
)
return response['hash']

def _request(self, **data):
response = requests.post(url=self._url,
headers=self._headers,
data=json.dumps(data),
timeout=self.timeout)

response.raise_for_status()
return json.loads(response.content)
63 changes: 63 additions & 0 deletions golem/network/hyperdrive/daemon_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import atexit
import logging
import os
import subprocess
import time

from requests import ConnectionError

from golem.core.processmonitor import ProcessMonitor
from golem.network.hyperdrive.client import HyperdriveClient

logger = logging.getLogger(__name__)


class HyperdriveDaemonManager(object):

_executable = 'hyperg'

def __init__(self, datadir, **hyperdrive_config):
super(HyperdriveDaemonManager, self).__init__()

self._config = hyperdrive_config

# monitor and restart if process dies
self._monitor = ProcessMonitor()
self._monitor.add_callbacks(self._start)

# hyperdrive data directory
self._dir = os.path.join(datadir, self._executable)
if not os.path.exists(self._dir):
os.makedirs(self._dir)

atexit.register(self.stop)

def start(self):
self._monitor.start()
self._start()

def stop(self):
self._monitor.exit()

def _command(self):
return [self._executable, '--db', self._dir]

def _daemon_running(self):
try:
return HyperdriveClient(**self._config).id()
except ConnectionError:
return False

def _start(self, *_):
# do not supervise already running processes
if self._daemon_running():
return

process = subprocess.Popen(self._command())
while not self._daemon_running():
time.sleep(0.1)

if process.poll() is None:
self._monitor.add_child_processes(process)
else:
raise RuntimeError("Cannot start {}".format(self._executable))
Loading

0 comments on commit 16a1859

Please sign in to comment.