diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a95f080 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM python:latest + +WORKDIR /py-orbit-db-http-client +RUN curl -L https://github.com/orbitdb/py-orbit-db-http-client/tarball/develop | tar -xz --strip-components 1 \ + && pip install . \ No newline at end of file diff --git a/orbitdbapi/client.py b/orbitdbapi/client.py index 51d9f24..d6142af 100644 --- a/orbitdbapi/client.py +++ b/orbitdbapi/client.py @@ -1,10 +1,13 @@ import json import logging -import requests -from .db import DB -from hypertemp.contrib import HTTP20Adapter +from pprint import pformat from urllib.parse import quote as urlquote +import httpx + +from .db import DB + + class OrbitDbAPI (): def __init__ (self, **kwargs): self.logger = logging.getLogger(__name__) @@ -12,8 +15,7 @@ def __init__ (self, **kwargs): self.__base_url = self.__config.get('base_url') self.__use_db_cache = self.__config.get('use_db_cache', True) self.__timeout = self.__config.get('timeout', 30) - self.__session = requests.Session() - self.__session.mount(self.__base_url, HTTP20Adapter(timeout=self.__timeout)) + self.__session = httpx.Client() self.logger.debug('Base url: ' + self.__base_url) @property @@ -29,6 +31,7 @@ def use_db_cache(self): return self.__use_db_cache def _do_request(self, *args, **kwargs): + self.logger.log(15, json.dumps([args, kwargs])) kwargs['timeout'] = kwargs.get('timeout', self.__timeout) try: return self.__session.request(*args, **kwargs) @@ -40,28 +43,33 @@ def _call_raw(self, method, endpoint, **kwargs): url = '/'.join([self.__base_url, endpoint]) return self._do_request(method, url, **kwargs) - def _call(self, method, endpoint, body=None): - res = self._call_raw(method, endpoint, json=body) + def _call(self, method, endpoint, **kwargs): + res = self._call_raw(method, endpoint, **kwargs) try: result = res.json() except: self.logger.warning('Json decode error', exc_info=True) - self.logger.debug(res.text) + self.logger.log(15, res.text) raise try: res.raise_for_status() except: self.logger.exception('Server Error') - self.logger.debug(result) + self.logger.error(pformat(result)) raise return result def list_dbs(self): - return self._call('get', 'dbs') + return self._call('GET', 'dbs') - def db(self, dbname, **kwargs): - return DB(self, self.open_db(dbname, **kwargs), **self.__config) + def db(self, dbname, local_options=None, **kwargs): + if local_options is None: local_options = {} + return DB(self, self.open_db(dbname, **kwargs), **{**self.__config, **local_options}) def open_db(self, dbname, **kwargs): endpoint = '/'.join(['db', urlquote(dbname, safe='')]) - return self._call('post', endpoint, kwargs) + return self._call('POST', endpoint, **kwargs) + + def searches(self): + endpoint = '/'.join(['peers', 'searches']) + return self._call('GET', endpoint) diff --git a/orbitdbapi/db.py b/orbitdbapi/db.py index f664817..7bf2dc1 100644 --- a/orbitdbapi/db.py +++ b/orbitdbapi/db.py @@ -1,10 +1,11 @@ import json import logging -from copy import deepcopy -from sseclient import SSEClient from collections.abc import Hashable, Iterable +from copy import deepcopy from urllib.parse import quote as urlquote +from sseclient import SSEClient + class DB (): def __init__(self, client, params, **kwargs): @@ -36,6 +37,10 @@ def cache_remove(self, item): if item in self.__cache: del self.__cache[item] + @property + def cached(self): + return self.__use_cache + @property def index_by(self): return self.__index_by @@ -52,36 +57,45 @@ def params(self): def dbname(self): return self.__dbname + @property + def id(self): + return self.__id + @property def dbtype(self): return self.__type + @property + def capabilities(self): + return deepcopy(self.__params.get('capabilities', [])) + @property def queryable(self): - return 'query' in self.__params.get('capabilities', {}) + return 'query' in self.__params.get('capabilities', []) + @property def putable(self): - return 'put' in self.__params.get('capabilities', {}) + return 'put' in self.__params.get('capabilities', []) @property def removeable(self): - return 'remove' in self.__params.get('capabilities', {}) + return 'remove' in self.__params.get('capabilities', []) @property def iterable(self): - return 'iterator' in self.__params.get('capabilities', {}) + return 'iterator' in self.__params.get('capabilities', []) @property def addable(self): - return 'add' in self.__params.get('capabilities', {}) + return 'add' in self.__params.get('capabilities', []) @property def valuable(self): - return 'value' in self.__params.get('capabilities', {}) + return 'value' in self.__params.get('capabilities', []) @property def incrementable(self): - return 'inc' in self.__params.get('capabilities', {}) + return 'inc' in self.__params.get('capabilities', []) @property def indexed(self): @@ -97,7 +111,7 @@ def write_access(self): def info(self): endpoint = '/'.join(['db', self.__id_safe]) - return self.__client._call('get', endpoint) + return self.__client._call('GET', endpoint) def get(self, item, cache=None, unpack=False): if cache is None: cache = self.__use_cache @@ -106,7 +120,7 @@ def get(self, item, cache=None, unpack=False): result = self.__cache[item] else: endpoint = '/'.join(['db', self.__id_safe, item]) - result = self.__client._call('get', endpoint) + result = self.__client._call('GET', endpoint) if cache: self.__cache[item] = result if isinstance(result, Hashable): return deepcopy(result) if isinstance(result, Iterable): return deepcopy(result) @@ -117,13 +131,13 @@ def get(self, item, cache=None, unpack=False): def get_raw(self, item): endpoint = '/'.join(['db', self.__id_safe, 'raw', str(item)]) - return (self.__client._call('get', endpoint)) + return (self.__client._call('GET', endpoint)) def put(self, item, cache=None): if self.__enforce_caps and not self.putable: - raise CapabilityError('Db {} does not have put capability'.format(self.__dbname)) - if self.indexed and (not hasattr(item, self.__index_by)) and self.__enforce_indexby: - raise MissingIndexError("The provided document doesn't contain field '{}'".format(self.__index_by)) + raise CapabilityError(f'Db {self.__dbname} does not have put capability') + if self.indexed and (not self.__index_by in item) and self.__enforce_indexby: + raise MissingIndexError(f"The provided document {item} doesn't contain field '{self.__index_by}'") if cache is None: cache = self.__use_cache if cache: @@ -134,59 +148,77 @@ def put(self, item, cache=None): if index_val: self.__cache[index_val] = item endpoint = '/'.join(['db', self.__id_safe, 'put']) - entry_hash = self.__client._call('post', endpoint, item).get('hash') + entry_hash = self.__client._call('POST', endpoint, json=item).get('hash') if cache and entry_hash: self.__cache[entry_hash] = item return entry_hash def add(self, item, cache=None): if self.__enforce_caps and not self.addable: - raise CapabilityError('Db {} does not have add capability'.format(self.__dbname)) + raise CapabilityError(f'Db {self.__dbname} does not have add capability') if cache is None: cache = self.__use_cache endpoint = '/'.join(['db', self.__id_safe, 'add']) - entry_hash = self.__client._call('post', endpoint, item).get('hash') + entry_hash = self.__client._call('POST', endpoint, json=item).get('hash') if cache and entry_hash: self.__cache[entry_hash] = item return entry_hash + def inc(self, val): + val = int(val) + endpoint = '/'.join(['db', self.__id_safe, 'inc']) + return self.__client._call('POST', endpoint, json={'val':val}) + + def value(self): + endpoint = '/'.join(['db', self.__id_safe, 'value']) + return self.__client._call('GET', endpoint) + def iterator_raw(self, **kwargs): if self.__enforce_caps and not self.iterable: - raise CapabilityError('Db {} does not have iterator capability'.format(self.__dbname)) + raise CapabilityError(f'Db {self.__dbname} does not have iterator capability') endpoint = '/'.join(['db', self.__id_safe, 'rawiterator']) - return self.__client._call('get', endpoint, kwargs) + return self.__client._call('GET', endpoint, json=kwargs) def iterator(self, **kwargs): if self.__enforce_caps and not self.iterable: - raise CapabilityError('Db {} does not have iterator capability'.format(self.__dbname)) + raise CapabilityError(f'Db {self.__dbname} does not have iterator capability') endpoint = '/'.join(['db', self.__id_safe, 'iterator']) - return self.__client._call('get', endpoint, kwargs) + return self.__client._call('GET', endpoint, json=kwargs) def index(self): endpoint = '/'.join(['db', self.__id_safe, 'index']) - result = self.__client._call('get', endpoint) + result = self.__client._call('GET', endpoint) return result def all(self): endpoint = '/'.join(['db', self.__id_safe, 'all']) - result = self.__client._call('get', endpoint) + result = self.__client._call('GET', endpoint) if isinstance(result, Hashable): self.__cache = result return result def remove(self, item): if self.__enforce_caps and not self.removeable: - raise CapabilityError('Db {} does not have remove capability'.format(self.__dbname)) + raise CapabilityError(f'Db {self.__dbname} does not have remove capability') item = str(item) endpoint = '/'.join(['db', self.__id_safe, item]) - return self.__client._call('delete', endpoint) + return self.__client._call('DELETE', endpoint) def unload(self): endpoint = '/'.join(['db', self.__id_safe]) - return self.__client._call('delete', endpoint) + return self.__client._call('DELETE', endpoint) def events(self, eventname): endpoint = '/'.join(['db', self.__id_safe, 'events', urlquote(eventname, safe='')]) - #return SSEClient('{}/{}'.format(self.__client.base_url, endpoint), session=self.__client.session) - req = self.__client._call_raw('get', endpoint, stream=True) - return SSEClient(req).events() + res = self.__client._call_raw('GET', endpoint, stream=True) + res.raise_for_status() + return SSEClient(res.stream()).events() + + def findPeers(self, **kwargs): + endpoint = '/'.join(['peers','searches','db', self.__id_safe]) + return self.__client._call('POST', endpoint, json=kwargs) + + def getPeers(self): + endpoint = '/'.join(['db', self.__id_safe, 'peers']) + return self.__client._call('GET', endpoint) + class CapabilityError(Exception): pass diff --git a/orbitdbapi/version.py b/orbitdbapi/version.py index 5c22e4a..9cf1927 100644 --- a/orbitdbapi/version.py +++ b/orbitdbapi/version.py @@ -1,2 +1,2 @@ -version = '0.3.0-dev0' +version = '0.4.0-dev0' version_info = tuple([int(d) for d in version.split("-")[0].split(".")]) \ No newline at end of file diff --git a/setup.py b/setup.py index d98cbe5..7ab9e9f 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,7 @@ url='https://github.com/phillmac/py-orbit-db-http-client', packages=find_packages(), install_requires=[ - 'requests >= 2.11', - 'hypertemp == 0.8.0', + 'httpx >= 0.7.5', 'sseclient==0.0.24' ], classifiers=[ diff --git a/tests/test.py b/tests/test.py new file mode 100644 index 0000000..034366e --- /dev/null +++ b/tests/test.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python +import json +import logging +import os +import random +import string +import sys +import unittest +from time import sleep + +from orbitdbapi.client import OrbitDbAPI + +base_url=os.environ.get('ORBIT_DB_HTTP_API_URL') + +def randString(k=5, lowercase=False, both=False): + if both: + return ''.join(random.choices(string.ascii_letters + string.digits, k=k)) + if lowercase: + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=k)) + return ''.join(random.choices(string.ascii_uppercase + string.digits, k=k)) + +class CapabilitiesTestCase(unittest.TestCase): + def setUp(self): + client = OrbitDbAPI(base_url=base_url) + self.kevalue_test = client.db('keyvalue_test', json={'create':True, 'type': 'keyvalue'}) + self.feed_test = client.db('feed_test', json={'create':True, 'type': 'feed'}) + self.event_test = client.db('event_test', json={'create':True, 'type': 'eventlog'}) + self.docstore_test = client.db('docstore_test', json={'create':True, 'type': 'docstore'}) + self.counter_test = client.db('counter_test', json={'create':True, 'type': 'counter'}) + + def runTest(self): + self.assertEqual(set(['get', 'put', 'remove']), set(self.kevalue_test.capabilities)) + self.assertEqual(set(['add', 'get', 'iterator', 'remove']), set(self.feed_test.capabilities)) + self.assertEqual(set(['add', 'get', 'iterator']), set(self.event_test.capabilities)) + self.assertEqual(set(['get', 'put', 'query', 'remove']), set(self.docstore_test.capabilities)) + self.assertEqual(set(['inc', 'value']), set(self.counter_test.capabilities)) + + def tearDown(self): + self.kevalue_test.unload() + self.feed_test.unload() + self.event_test.unload() + self.docstore_test.unload() + self.counter_test.unload() + +class CounterIncrementTestCase(unittest.TestCase): + def setUp(self): + client = OrbitDbAPI(base_url=base_url) + self.counter_test = client.db('counter_test', json={'create':True, 'type': 'counter'}) + + def runTest(self): + localVal = self.counter_test.value() + self.assertEqual(localVal, self.counter_test.value()) + for _c in range(1,100): + incVal = random.randrange(1,100) + localVal += incVal + self.counter_test.inc(incVal) + self.assertEqual(localVal, self.counter_test.value()) + + def tearDown(self): + self.counter_test.unload() + + +class KVStoreGetPutTestCase(unittest.TestCase): + def setUp(self): + client = OrbitDbAPI(base_url=base_url, use_db_cache=False) + self.kevalue_test = client.db('keyvalue_test', json={'create':True, 'type': 'keyvalue'}) + + def runTest(self): + self.assertFalse(self.kevalue_test.cached) + localKV = {} + for _c in range(1,100): + k = randString() + v = randString(k=100, both=True) + localKV[k] = v + self.kevalue_test.put({'key':k, 'value':v}) + self.assertEqual(localKV.get(k), self.kevalue_test.get(k)) + self.assertDictContainsSubset(localKV, self.kevalue_test.all()) + + def tearDown(self): + self.kevalue_test.unload() + +class DocStoreGetPutTestCase(unittest.TestCase): + def setUp(self): + client = OrbitDbAPI(base_url=base_url, use_db_cache=False) + self.docstore_test = client.db('docstore_test', json={'create':True, 'type': 'docstore'}) + + def runTest(self): + self.assertFalse(self.docstore_test.cached) + localDocs = [] + for _c in range(1,100): + k = randString() + v = randString(k=100, both=True) + item = {'_id':k, 'value':v} + localDocs.append(item) + self.docstore_test.put(item) + self.assertDictContainsSubset(item, self.docstore_test.get(k)[0]) + self.assertTrue(all(item in self.docstore_test.all() for item in localDocs)) + + def tearDown(self): + self.docstore_test.unload() + + +class SearchesTestCase(unittest.TestCase): + def setUp(self): + self.client = OrbitDbAPI(base_url=base_url) + self.kevalue_test = self.client.db('keyvalue_test', json={'create':True, 'type': 'keyvalue'}) + + + def runTest(self): + self.kevalue_test.findPeers() + searches = self.client.searches() + self.assertGreater(len(searches), 0) + self.assertGreater(len([s for s in searches if s.get('searchID') == self.kevalue_test.id]), 0) + + def tearDown(self): + self.kevalue_test.unload() + +class SearchPeersTestCase(unittest.TestCase): + def setUp(self): + self.client = OrbitDbAPI(base_url=base_url) + self.kevalue_test = self.client.db('zdpuAuSAkDDRm9KTciShAcph2epSZsNmfPeLQmxw6b5mdLmq5/keyvalue_test') + + def runTest(self): + self.kevalue_test.findPeers(useCustomFindProvs=True) + dbPeers = [] + count = 0 + while len(dbPeers) < 1: + sleep(5) + dbPeers = self.kevalue_test.getPeers() + if count > 60: break + self.assertGreater(len(dbPeers), 0) + + +# def tearDown(self): +# self.kevalue_test.unload() + + + +if __name__ == '__main__': + loglvl = int(os.environ.get('LOG_LEVEL',15)) + print(f'Log level: {loglvl}') + logfmt = '%(asctime)s - %(levelname)s - %(message)s' + logging.basicConfig(format=logfmt, stream=sys.stdout, level=loglvl) + unittest.main()