Skip to content

Commit

Permalink
Fix streaming account activity (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgolec authored Jun 10, 2020
1 parent 850bcca commit 4610fb5
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 3 deletions.
17 changes: 17 additions & 0 deletions docs/streaming.rst
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,20 @@ access this stream.
.. autoclass:: tda.streaming::StreamClient.NewsHeadlineFields
:members:
:undoc-members:


++++++++++++++++
Account Activity
++++++++++++++++

This stream allows you to monitor your account activity, including order
execution/cancellation/expiration/etc. ``tda-api`` provide utilities for setting
up and reading the stream, but leaves the task of parsing the `response XML
object <https://developer.tdameritrade.com/content/streaming-data#_Toc504640581>`__
to the user.

.. automethod:: tda.streaming::StreamClient.account_activity_sub
.. automethod:: tda.streaming::StreamClient.add_account_activity_handler
.. autoclass:: tda.streaming::StreamClient.AccountActivityFields
:members:
:undoc-members:
64 changes: 61 additions & 3 deletions tda/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(self, client, *, account_id=None, enforce_enums=True):

# Set by the login() function
self._account = None
self._stream_key = None
self._socket = None
self._source = None

Expand Down Expand Up @@ -152,7 +153,8 @@ async def _receive(self):
return ret

async def _init_from_principals(self, principals):
# Initialize accounts
# Initialize accounts and streamer keys.
# Assume a 1-to-1 mapping of streamer keys to accounts.
accounts = principals['accounts']
num_accounts = len(accounts)
assert num_accounts > 0, 'zero accounts found'
Expand All @@ -165,8 +167,7 @@ async def _init_from_principals(self, principals):
raise ValueError(
'multiple accounts found and StreamClient was ' +
'initialized with unspecified account_id')

for account in accounts:
for idx, account in enumerate(accounts):
if int(account['accountId']) == self._account_id:
self._account = account

Expand All @@ -178,6 +179,13 @@ async def _init_from_principals(self, principals):
if self._account_id is None:
self._account_id = self._account['accountId']

# Record streamer subscription keys
stream_keys = principals['streamerSubscriptionKeys']['keys']
if len(stream_keys) > 1:
self.logger.warn('Found {} stream keys, using the first one'.format(
len(stream_keys)))
self._stream_key = stream_keys[0]['key']

# Initialize socket
wss_url = 'wss://{}/ws'.format(
principals['streamerInfo']['streamerSocketUrl'])
Expand Down Expand Up @@ -401,6 +409,56 @@ async def quality_of_service(self, qos_level):
await self._send({'requests': [request]})
await self._await_response(request_id, 'ADMIN', 'QOS')

##########################################################################
# ACCT_ACTIVITY

class AccountActivityFields(_BaseFieldEnum):
'''
`Official documentation <https://developer.tdameritrade.com/content/
streaming-data#_Toc504640580>`__
Data fields for equity account activity. Primarily an implementation detail
and not used in client code. Provided here as documentation for key
values stored returned in the stream messages.
'''

#: Subscription key. Represented in the stream as the
#: ``key`` field.
SUBSCRIPTION_KEY = 0

#: Account # subscribed
ACCOUNT = 1

#: Refer to the `message type table in the official documentation
#: <https://developer.tdameritrade.com/content/streaming-data
#: #_Toc504640581>`__
MESSAGE_TYPE = 2

#: The core data for the message. Either XML Message data describing
#: the update, ``NULL`` in some cases, or plain text in case of
#: ``ERROR``.
MESSAGE_DATA = 3

async def account_activity_sub(self):
'''
`Official documentation <https://developer.tdameritrade.com/content/
streaming-data#_Toc504640580>`__
Subscribe to account activity for the account id associated with this
streaming client. See :class:`AccountActivityFields` for more info.
'''
await self._service_op(
[self._stream_key], 'ACCT_ACTIVITY', 'SUBS',
self.AccountActivityFields)

def add_account_activity_handler(self, handler):
'''
Adds a handler to the account activity subscription. See
:ref:`registering_handlers` for details.
'''
self._handlers['ACCT_ACTIVITY'].append(_Handler(handler,
self.AccountActivityFields))

##########################################################################
# CHART_EQUITY

Expand Down
122 changes: 122 additions & 0 deletions tests/streaming_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def parsable_as_int(s):
account[key] = value + '-' + str(account['accountId'])
return account

def stream_key(self, index):
return {'key': 'streamerSubscriptionKeys-keys-key' + str(index)}

def request_from_socket_mock(self, socket):
return json.loads(
socket.send.call_args_list[0].args[0])['requests'][0]
Expand Down Expand Up @@ -104,6 +107,9 @@ async def test_login_single_account_success(self, ws_connect):
principals = account_principals()
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand Down Expand Up @@ -146,6 +152,11 @@ async def test_login_multiple_accounts_require_account_id(self, ws_connect):
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['accounts'].append(self.account(2))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(2))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand All @@ -162,6 +173,11 @@ async def test_login_multiple_accounts_with_account_id(self, ws_connect):
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['accounts'].append(self.account(2))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(2))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand Down Expand Up @@ -205,6 +221,11 @@ async def test_login_unrecognized_account_id(self, ws_connect):
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['accounts'].append(self.account(2))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(2))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand All @@ -222,6 +243,9 @@ async def test_login_bad_response(self, ws_connect):
principals = account_principals()
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand All @@ -242,6 +266,9 @@ async def test_login_unexpected_request_id(self, ws_connect):
principals = account_principals()
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand All @@ -262,6 +289,9 @@ async def test_login_unexpected_service(self, ws_connect):
principals = account_principals()
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand All @@ -281,6 +311,9 @@ async def test_login_unexpected_command(self, ws_connect):
principals = account_principals()
principals['accounts'].clear()
principals['accounts'].append(self.account(1))
principals['streamerSubscriptionKeys']['keys'].clear()
principals['streamerSubscriptionKeys']['keys'].append(
self.stream_key(1))

self.http_client.get_user_principals.return_value = MockResponse(
principals, True)
Expand Down Expand Up @@ -333,6 +366,95 @@ async def test_qos_failure(self, ws_connect):
await self.client.quality_of_service(StreamClient.QOSLevel.EXPRESS)
socket.recv.assert_awaited_once()

##########################################################################
# ACCT_ACTIVITY

@no_duplicates
@patch('tda.streaming.websockets.client.connect', new_callable=AsyncMock)
async def test_account_activity_subs_success(self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'ACCT_ACTIVITY', 'SUBS'))]

await self.client.account_activity_sub()
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'ACCT_ACTIVITY',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'streamerSubscriptionKeys-keys-key',
'fields': '0,1,2,3'
}
})

@no_duplicates
@patch('tda.streaming.websockets.client.connect', new_callable=AsyncMock)
async def test_account_activity_subs_failure(self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

response = self.success_response(1, 'ACCT_ACTIVITY', 'SUBS')
response['response'][0]['content']['code'] = 21
socket.recv.side_effect = [json.dumps(response)]

with self.assertRaises(tda.streaming.UnexpectedResponseCode):
await self.client.account_activity_sub()

@no_duplicates
@patch('tda.streaming.websockets.client.connect', new_callable=AsyncMock)
async def test_account_activity_handler(self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

stream_item = {
'data': [
{
'service': 'ACCT_ACTIVITY',
'timestamp': 1591754497594,
'command': 'SUBS',
'content': [
{
'seq': 1,
'key': 'streamerSubscriptionKeys-keys-key',
'1': '1001',
'2': 'OrderEntryRequest',
'3': ''
}
]
}
]
}

socket.recv.side_effect = [
json.dumps(self.success_response(1, 'ACCT_ACTIVITY', 'SUBS')),
json.dumps(stream_item)]
await self.client.account_activity_sub()

handler = Mock()
self.client.add_account_activity_handler(handler)
await self.client.handle_message()

expected_item = {
'service': 'ACCT_ACTIVITY',
'timestamp': 1591754497594,
'command': 'SUBS',
'content': [
{
'seq': 1,
'key': 'streamerSubscriptionKeys-keys-key',
'ACCOUNT': '1001',
'MESSAGE_TYPE': 'OrderEntryRequest',
'MESSAGE_DATA': ''
}
]
}

self.assert_handler_called_once_with(handler, expected_item)

##########################################################################
# CHART_EQUITY

Expand Down

0 comments on commit 4610fb5

Please sign in to comment.