diff --git a/.logging.yaml b/.logging.yaml index 46773a2..16c6764 100644 --- a/.logging.yaml +++ b/.logging.yaml @@ -12,6 +12,7 @@ loggers: fxsdk: level: DEBUG handlers: [console] + propagate: no root: level: DEBUG handlers: [console] \ No newline at end of file diff --git a/examples/subscribe_event.py b/examples/subscribe_event.py new file mode 100644 index 0000000..179fb31 --- /dev/null +++ b/examples/subscribe_event.py @@ -0,0 +1,97 @@ +import asyncio +import logging.config +from typing import Dict + +import yaml + +from fxsdk.client.websockets import WebsocketRpcClient, ReconnectingRpcWebsocket +from fxsdk.msg import event + + +class BlockchainEvent(WebsocketRpcClient): + + @classmethod + async def create(cls, endpoint_url: str, loop, callback=None): + """Create a WebsocketManagerBase instance + + :param endpoint_url: node endpoint url + :param loop: asyncio loop + :param callback: async callback function to receive messages + :return: + """ + self = BlockchainEvent() + loop = loop if loop else asyncio.get_event_loop() + callback = callback if callback else self.receive + self._conn = ReconnectingRpcWebsocket(endpoint_url, loop=loop, recv=callback) + return self + + async def receive(self, data: Dict): + self._log.info(f"received data: {data}") + result = data.get('result', {}) + query = result.get('query', '') + value = result.get('data', {}).get('value', {}) + if query == event.EVENT_NEW_BLOCK_HEADER: + self.process_new_block_header(value) + elif query == event.EVENT_NEW_BLOCK_EVENTS: + self.process_new_block_events(value) + elif query == event.EVENT_NEW_BLOCK: + self.process_new_block(value) + elif query == event.EVENT_TX: + self.process_tx(value) + + async def subscribe_new_block_header(self): + await self.subscribe(event.EVENT_NEW_BLOCK_HEADER) + + def process_new_block_header(self, data: Dict): + height = data.get('header', {}).get('height', 0) + block_time = data.get('header', {}).get('time', '') + self._log.info(f"new block header: height={height}, block_time={block_time}") + + async def subscribe_new_block_events(self): + await self.subscribe(event.EVENT_NEW_BLOCK_EVENTS) + + def process_new_block_events(self, data: Dict): + height = data.get('height', 0) + events = data.get('events', []) + num_txs = data.get('num_txs', 0) + self._log.info(f"new block events: height={height}, events={len(events)}, num_txs={num_txs}") + + async def subscribe_new_block(self): + await self.subscribe(event.EVENT_NEW_BLOCK) + + def process_new_block(self, data: Dict): + block = data.get('block', {}) + height = block.get('header', {}).get('height', 0) + block_time = block.get('header', {}).get('time', '') + txs = block.get('data', {}).get('txs', []) + events = data.get('result_finalize_block', {}).get('events', []) + self._log.info(f"new block: height={height}, block_time={block_time}, txs={len(txs)}, events={len(events)}") + + async def subscribe_tx(self): + await self.subscribe(event.EVENT_TX) + + def process_tx(self, data: Dict): + height = data.get('TxResult', {}).get('height', 0) + base64_tx = data.get('TxResult', {}).get('tx', '') + events = data.get('TxResult', {}).get('result', {}).get('events') + self._log.info(f"new tx: height={height}, tx={base64_tx}, events={events}") + + +global_loop = None + + +async def main(): + global global_loop + rpc_url = "http://127.0.0.1:26657" + ws_client = await BlockchainEvent.create(endpoint_url=rpc_url, loop=global_loop) + await ws_client.subscribe_new_block() + + while True: + logging.info("sleeping to keep loop open") + await asyncio.sleep(5) + + +if __name__ == '__main__': + logging.config.dictConfig(yaml.safe_load(open('../.logging.yaml', 'r'))) + global_loop = asyncio.get_event_loop() + global_loop.run_until_complete(main()) diff --git a/examples/bank.py b/examples/transfer.py similarity index 100% rename from examples/bank.py rename to examples/transfer.py diff --git a/fxsdk/client/websockets.py b/fxsdk/client/websockets.py index ce20881..c0c359b 100644 --- a/fxsdk/client/websockets.py +++ b/fxsdk/client/websockets.py @@ -4,7 +4,6 @@ from typing import Callable, Awaitable, Optional, Dict from fxsdk.client.http import RpcRequest -from fxsdk.msg import event from fxsdk.websockets import ReconnectingWebsocket, WebsocketManagerBase from fxsdk.x.cosmos.tx.v1beta1.service_pb2 import BroadcastMode, BROADCAST_MODE_SYNC, BROADCAST_MODE_ASYNC, \ BROADCAST_MODE_BLOCK @@ -47,7 +46,7 @@ async def ping(self): class WebsocketRpcClient(WebsocketManagerBase): @classmethod - async def create(cls, endpoint_url: str, callback: Callable[[Dict], Awaitable[None]] = None, loop=None): + async def create(cls, endpoint_url: str, loop, callback: Callable[[Dict], Awaitable[None]] = None): """Create a WebsocketRpcClient instance :param endpoint_url: node endpoint url @@ -55,10 +54,10 @@ async def create(cls, endpoint_url: str, callback: Callable[[Dict], Awaitable[No :param callback: async callback function to receive messages :return: """ - self = WebsocketRpcClient(endpoint_url) + self = WebsocketRpcClient() loop = loop if loop else asyncio.get_event_loop() callback = callback if callback else self.receive - self._conn = ReconnectingRpcWebsocket(endpoint_url, recv=callback, loop=loop) + self._conn = ReconnectingRpcWebsocket(endpoint_url, loop=loop, recv=callback) return self async def close(self): @@ -213,72 +212,3 @@ async def tx_search(self, query: str, prove: Optional[bool] = None, data['limit'] = str(limit) await self._conn.send_rpc_message('tx_search', data) - - -class BlockchainEvent(WebsocketRpcClient): - - @classmethod - async def create(cls, endpoint_url: str, callback=None, loop=None): - """Create a WebsocketManagerBase instance - - :param endpoint_url: node endpoint url - :param loop: asyncio loop - :param callback: async callback function to receive messages - :return: - """ - self = WebsocketManagerBase(endpoint_url) - loop = loop if loop else asyncio.get_event_loop() - callback = callback if callback else self.receive - self._conn = ReconnectingWebsocket(endpoint_url, recv=callback, loop=loop) - return self - - def receive(self, data: Dict): - self._log.info(f"received data: {data}") - result = data.get('result', {}) - query = result.get('query', '') - value = result.get('data', {}).get('value', {}) - if query == event.EVENT_NEW_BLOCK_HEADER: - self.process_new_block_header(value) - elif query == event.EVENT_NEW_BLOCK_EVENTS: - self.process_new_block_events(value) - elif query == event.EVENT_NEW_BLOCK: - self.process_new_block(value) - elif query == event.EVENT_TX: - self.process_tx(value) - - async def subscribe_new_block_header(self): - await self.subscribe(event.EVENT_NEW_BLOCK_HEADER) - - def process_new_block_header(self, data: Dict): - height = data.get('header', {}).get('height', 0) - block_time = data.get('header', {}).get('time', '') - self._log.info(f"new block header: height={height}, block_time={block_time}") - - async def subscribe_new_block_events(self): - await self.subscribe(event.EVENT_NEW_BLOCK_EVENTS) - - def process_new_block_events(self, data: Dict): - height = data.get('height', 0) - events = data.get('events', []) - num_txs = data.get('num_txs', 0) - self._log.info(f"new block events: height={height}, events={len(events)}, num_txs={num_txs}") - - async def subscribe_new_block(self): - await self.subscribe(event.EVENT_NEW_BLOCK) - - def process_new_block(self, data: Dict): - block = data.get('block', {}) - height = block.get('header', {}).get('height', 0) - block_time = block.get('header', {}).get('time', '') - txs = block.get('data', {}).get('txs', []) - events = data.get('result_finalize_block', {}).get('events', []) - self._log.info(f"new block: height={height}, block_time={block_time}, txs={len(txs)}, events={len(events)}") - - async def subscribe_tx(self): - await self.subscribe(event.EVENT_TX) - - def process_tx(self, data: Dict): - height = data.get('TxResult', {}).get('height', 0) - base64_tx = data.get('TxResult', {}).get('tx', '') - events = data.get('TxResult', {}).get('result', {}).get('events') - self._log.info(f"new tx: height={height}, tx={base64_tx}, events={events}") diff --git a/fxsdk/websockets.py b/fxsdk/websockets.py index 3067325..1ea1ecb 100644 --- a/fxsdk/websockets.py +++ b/fxsdk/websockets.py @@ -15,8 +15,8 @@ class ReconnectingWebsocket: TIMEOUT: int = 10 PROTOCOL_VERSION: str = '1.0.0' - def __init__(self, endpoint_url: str, recv, retry_count: int = 5, loop=None): - self._loop = loop if loop else asyncio.get_event_loop() + def __init__(self, endpoint_url: str, loop, recv, retry_count: int = 5): + self._loop = loop self._log = logging.getLogger(__name__) self._recv = recv self._reconnect_attempts: int = 0 @@ -57,17 +57,14 @@ async def _run(self): else: try: evt_obj = json.loads(evt) - except ValueError: - pass - else: await self._recv(evt_obj) + except ValueError: + self._log.error("invalid json: {}".format(evt)) except websockets.ConnectionClosed as e: self._log.debug('conn closed:{}'.format(e)) - keep_waiting = False await self._reconnect() except Exception as e: self._log.debug('ws exception:{}'.format(e)) - keep_waiting = False await self._reconnect() except Exception as e: self._log.info(f"websocket error: {e}") @@ -122,13 +119,12 @@ async def cancel(self): class WebsocketManagerBase: - def __init__(self, endpoint_url: str): - self.endpoint_url = endpoint_url + def __init__(self): self._conn = None self._log = logging.getLogger(__name__) @classmethod - async def create(cls, endpoint_url: str, callback=None, loop=None): + async def create(cls, endpoint_url: str, loop, callback=None): """Create a WebsocketManagerBase instance :param endpoint_url: node endpoint url @@ -136,14 +132,14 @@ async def create(cls, endpoint_url: str, callback=None, loop=None): :param callback: async callback function to receive messages :return: """ - self = WebsocketManagerBase(endpoint_url) - loop = loop if loop else asyncio.get_event_loop() + self = WebsocketManagerBase() + loop = loop callback = callback if callback else self.receive self._conn = ReconnectingWebsocket(endpoint_url, recv=callback, loop=loop) return self - def receive(self, data: Dict): - self._log.info(f"received data: {data}") + async def receive(self, data: Dict): + self._log.debug(f"received data: {data}") async def close(self): await self._conn.cancel() diff --git a/tests/test_http_query.py b/tests/test_http_query.py index 51d80c5..d29226e 100644 --- a/tests/test_http_query.py +++ b/tests/test_http_query.py @@ -30,7 +30,8 @@ async def callback(msg: Dict): async def test_get_block_result(self): logging.config.dictConfig(yaml.safe_load(open('../.logging.yaml', 'r'))) - ws_client = await WebsocketRpcClient.create(endpoint_url=rpc_url, callback=self.callback) + loop = asyncio.get_event_loop() + ws_client = await WebsocketRpcClient.create(endpoint_url=rpc_url, loop=loop, callback=self.callback) await ws_client.get_status() await ws_client.subscribe(event.EVENT_NEW_BLOCK) await asyncio.sleep(5)