From 4610fb5561f38d4697ad4e3dd3b73a9cfc9b53d4 Mon Sep 17 00:00:00 2001 From: Alex Golec Date: Wed, 10 Jun 2020 17:42:07 -0400 Subject: [PATCH] Fix streaming account activity (#29) --- docs/streaming.rst | 17 ++++++ tda/streaming.py | 64 ++++++++++++++++++++- tests/streaming_test.py | 122 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 200 insertions(+), 3 deletions(-) diff --git a/docs/streaming.rst b/docs/streaming.rst index 0d6127e..4adadf9 100644 --- a/docs/streaming.rst +++ b/docs/streaming.rst @@ -536,3 +536,20 @@ access this stream. .. autoclass:: tda.streaming::StreamClient.NewsHeadlineFields :members: :undoc-members: + + +++++++++++++++++ +Account Activity +++++++++++++++++ + +This stream allows you to monitor your account activity, including order +execution/cancellation/expiration/etc. ``tda-api`` provide utilities for setting +up and reading the stream, but leaves the task of parsing the `response XML +object `__ +to the user. + +.. automethod:: tda.streaming::StreamClient.account_activity_sub +.. automethod:: tda.streaming::StreamClient.add_account_activity_handler +.. autoclass:: tda.streaming::StreamClient.AccountActivityFields + :members: + :undoc-members: diff --git a/tda/streaming.py b/tda/streaming.py index 8252bac..537fa47 100644 --- a/tda/streaming.py +++ b/tda/streaming.py @@ -91,6 +91,7 @@ def __init__(self, client, *, account_id=None, enforce_enums=True): # Set by the login() function self._account = None + self._stream_key = None self._socket = None self._source = None @@ -152,7 +153,8 @@ async def _receive(self): return ret async def _init_from_principals(self, principals): - # Initialize accounts + # Initialize accounts and streamer keys. + # Assume a 1-to-1 mapping of streamer keys to accounts. accounts = principals['accounts'] num_accounts = len(accounts) assert num_accounts > 0, 'zero accounts found' @@ -165,8 +167,7 @@ async def _init_from_principals(self, principals): raise ValueError( 'multiple accounts found and StreamClient was ' + 'initialized with unspecified account_id') - - for account in accounts: + for idx, account in enumerate(accounts): if int(account['accountId']) == self._account_id: self._account = account @@ -178,6 +179,13 @@ async def _init_from_principals(self, principals): if self._account_id is None: self._account_id = self._account['accountId'] + # Record streamer subscription keys + stream_keys = principals['streamerSubscriptionKeys']['keys'] + if len(stream_keys) > 1: + self.logger.warn('Found {} stream keys, using the first one'.format( + len(stream_keys))) + self._stream_key = stream_keys[0]['key'] + # Initialize socket wss_url = 'wss://{}/ws'.format( principals['streamerInfo']['streamerSocketUrl']) @@ -401,6 +409,56 @@ async def quality_of_service(self, qos_level): await self._send({'requests': [request]}) await self._await_response(request_id, 'ADMIN', 'QOS') + ########################################################################## + # ACCT_ACTIVITY + + class AccountActivityFields(_BaseFieldEnum): + ''' + `Official documentation `__ + + Data fields for equity account activity. Primarily an implementation detail + and not used in client code. Provided here as documentation for key + values stored returned in the stream messages. + ''' + + #: Subscription key. Represented in the stream as the + #: ``key`` field. + SUBSCRIPTION_KEY = 0 + + #: Account # subscribed + ACCOUNT = 1 + + #: Refer to the `message type table in the official documentation + #: `__ + MESSAGE_TYPE = 2 + + #: The core data for the message. Either XML Message data describing + #: the update, ``NULL`` in some cases, or plain text in case of + #: ``ERROR``. + MESSAGE_DATA = 3 + + async def account_activity_sub(self): + ''' + `Official documentation `__ + + Subscribe to account activity for the account id associated with this + streaming client. See :class:`AccountActivityFields` for more info. + ''' + await self._service_op( + [self._stream_key], 'ACCT_ACTIVITY', 'SUBS', + self.AccountActivityFields) + + def add_account_activity_handler(self, handler): + ''' + Adds a handler to the account activity subscription. See + :ref:`registering_handlers` for details. + ''' + self._handlers['ACCT_ACTIVITY'].append(_Handler(handler, + self.AccountActivityFields)) + ########################################################################## # CHART_EQUITY diff --git a/tests/streaming_test.py b/tests/streaming_test.py index 0b2e149..495393e 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -38,6 +38,9 @@ def parsable_as_int(s): account[key] = value + '-' + str(account['accountId']) return account + def stream_key(self, index): + return {'key': 'streamerSubscriptionKeys-keys-key' + str(index)} + def request_from_socket_mock(self, socket): return json.loads( socket.send.call_args_list[0].args[0])['requests'][0] @@ -104,6 +107,9 @@ async def test_login_single_account_success(self, ws_connect): principals = account_principals() principals['accounts'].clear() principals['accounts'].append(self.account(1)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -146,6 +152,11 @@ async def test_login_multiple_accounts_require_account_id(self, ws_connect): principals['accounts'].clear() principals['accounts'].append(self.account(1)) principals['accounts'].append(self.account(2)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(2)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -162,6 +173,11 @@ async def test_login_multiple_accounts_with_account_id(self, ws_connect): principals['accounts'].clear() principals['accounts'].append(self.account(1)) principals['accounts'].append(self.account(2)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(2)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -205,6 +221,11 @@ async def test_login_unrecognized_account_id(self, ws_connect): principals['accounts'].clear() principals['accounts'].append(self.account(1)) principals['accounts'].append(self.account(2)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(2)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -222,6 +243,9 @@ async def test_login_bad_response(self, ws_connect): principals = account_principals() principals['accounts'].clear() principals['accounts'].append(self.account(1)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -242,6 +266,9 @@ async def test_login_unexpected_request_id(self, ws_connect): principals = account_principals() principals['accounts'].clear() principals['accounts'].append(self.account(1)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -262,6 +289,9 @@ async def test_login_unexpected_service(self, ws_connect): principals = account_principals() principals['accounts'].clear() principals['accounts'].append(self.account(1)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -281,6 +311,9 @@ async def test_login_unexpected_command(self, ws_connect): principals = account_principals() principals['accounts'].clear() principals['accounts'].append(self.account(1)) + principals['streamerSubscriptionKeys']['keys'].clear() + principals['streamerSubscriptionKeys']['keys'].append( + self.stream_key(1)) self.http_client.get_user_principals.return_value = MockResponse( principals, True) @@ -333,6 +366,95 @@ async def test_qos_failure(self, ws_connect): await self.client.quality_of_service(StreamClient.QOSLevel.EXPRESS) socket.recv.assert_awaited_once() + ########################################################################## + # ACCT_ACTIVITY + + @no_duplicates + @patch('tda.streaming.websockets.client.connect', new_callable=AsyncMock) + async def test_account_activity_subs_success(self, ws_connect): + socket = await self.login_and_get_socket(ws_connect) + + socket.recv.side_effect = [json.dumps(self.success_response( + 1, 'ACCT_ACTIVITY', 'SUBS'))] + + await self.client.account_activity_sub() + socket.recv.assert_awaited_once() + request = self.request_from_socket_mock(socket) + + self.assertEqual(request, { + 'account': '1001', + 'service': 'ACCT_ACTIVITY', + 'command': 'SUBS', + 'requestid': '1', + 'source': 'streamerInfo-appId', + 'parameters': { + 'keys': 'streamerSubscriptionKeys-keys-key', + 'fields': '0,1,2,3' + } + }) + + @no_duplicates + @patch('tda.streaming.websockets.client.connect', new_callable=AsyncMock) + async def test_account_activity_subs_failure(self, ws_connect): + socket = await self.login_and_get_socket(ws_connect) + + response = self.success_response(1, 'ACCT_ACTIVITY', 'SUBS') + response['response'][0]['content']['code'] = 21 + socket.recv.side_effect = [json.dumps(response)] + + with self.assertRaises(tda.streaming.UnexpectedResponseCode): + await self.client.account_activity_sub() + + @no_duplicates + @patch('tda.streaming.websockets.client.connect', new_callable=AsyncMock) + async def test_account_activity_handler(self, ws_connect): + socket = await self.login_and_get_socket(ws_connect) + + stream_item = { + 'data': [ + { + 'service': 'ACCT_ACTIVITY', + 'timestamp': 1591754497594, + 'command': 'SUBS', + 'content': [ + { + 'seq': 1, + 'key': 'streamerSubscriptionKeys-keys-key', + '1': '1001', + '2': 'OrderEntryRequest', + '3': '' + } + ] + } + ] + } + + socket.recv.side_effect = [ + json.dumps(self.success_response(1, 'ACCT_ACTIVITY', 'SUBS')), + json.dumps(stream_item)] + await self.client.account_activity_sub() + + handler = Mock() + self.client.add_account_activity_handler(handler) + await self.client.handle_message() + + expected_item = { + 'service': 'ACCT_ACTIVITY', + 'timestamp': 1591754497594, + 'command': 'SUBS', + 'content': [ + { + 'seq': 1, + 'key': 'streamerSubscriptionKeys-keys-key', + 'ACCOUNT': '1001', + 'MESSAGE_TYPE': 'OrderEntryRequest', + 'MESSAGE_DATA': '' + } + ] + } + + self.assert_handler_called_once_with(handler, expected_item) + ########################################################################## # CHART_EQUITY