Skip to content

Commit

Permalink
Force streaming subscribers to include SYMBOL as a requested field (#149
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alexgolec authored Jan 20, 2021
1 parent 39e9a39 commit 9b57c3b
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 0 deletions.
16 changes: 16 additions & 0 deletions tda/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,8 @@ async def level_one_equity_subs(self, symbols, *, fields=None):
the fields to return in streaming entries. If unset, all
fields will be requested.
'''
if fields and self.LevelOneEquityFields.SYMBOL not in fields:
fields.append(self.LevelOneEquityFields.SYMBOL)
await self._service_op(
symbols, 'QUOTE', 'SUBS', self.LevelOneEquityFields,
fields=fields)
Expand Down Expand Up @@ -978,6 +980,8 @@ async def level_one_option_subs(self, symbols, *, fields=None):
the fields to return in streaming entries. If unset, all
fields will be requested.
'''
if fields and self.LevelOneOptionFields.SYMBOL not in fields:
fields.append(self.LevelOneOptionFields.SYMBOL)
await self._service_op(
symbols, 'OPTION', 'SUBS', self.LevelOneOptionFields,
fields=fields)
Expand Down Expand Up @@ -1135,6 +1139,8 @@ async def level_one_futures_subs(self, symbols, *, fields=None):
the fields to return in streaming entries. If unset, all
fields will be requested.
'''
if fields and self.LevelOneFuturesFields.SYMBOL not in fields:
fields.append(self.LevelOneFuturesFields.SYMBOL)
await self._service_op(
symbols, 'LEVELONE_FUTURES', 'SUBS', self.LevelOneFuturesFields,
fields=fields)
Expand Down Expand Up @@ -1265,6 +1271,8 @@ async def level_one_forex_subs(self, symbols, *, fields=None):
the fields to return in streaming entries. If unset, all
fields will be requested.
'''
if fields and self.LevelOneForexFields.SYMBOL not in fields:
fields.append(self.LevelOneForexFields.SYMBOL)
await self._service_op(
symbols, 'LEVELONE_FOREX', 'SUBS', self.LevelOneForexFields,
fields=fields)
Expand Down Expand Up @@ -1414,6 +1422,8 @@ async def level_one_futures_options_subs(self, symbols, *, fields=None):
representing the fields to return in streaming entries.
If unset, all fields will be requested.
'''
if fields and self.LevelOneFuturesOptionsFields.SYMBOL not in fields:
fields.append(self.LevelOneFuturesOptionsFields.SYMBOL)
await self._service_op(
symbols, 'LEVELONE_FUTURES_OPTIONS', 'SUBS',
self.LevelOneFuturesOptionsFields, fields=fields)
Expand Down Expand Up @@ -1460,6 +1470,8 @@ async def timesale_equity_subs(self, symbols, *, fields=None):
:param symbols: Equity symbols to subscribe to
'''
if fields and self.TimesaleFields.SYMBOL not in fields:
fields.append(self.TimesaleFields.SYMBOL)
await self._service_op(
symbols, 'TIMESALE_EQUITY', 'SUBS',
self.TimesaleFields, fields=fields)
Expand All @@ -1481,6 +1493,8 @@ async def timesale_futures_subs(self, symbols, *, fields=None):
:param symbols: Futures symbols to subscribe to
'''
if fields and self.TimesaleFields.SYMBOL not in fields:
fields.append(self.TimesaleFields.SYMBOL)
await self._service_op(
symbols, 'TIMESALE_FUTURES', 'SUBS',
self.TimesaleFields, fields=fields)
Expand All @@ -1502,6 +1516,8 @@ async def timesale_options_subs(self, symbols, *, fields=None):
:param symbols: Options symbols to subscribe to
'''
if fields and self.TimesaleFields.SYMBOL not in fields:
fields.append(self.TimesaleFields.SYMBOL)
await self._service_op(
symbols, 'TIMESALE_OPTIONS', 'SUBS',
self.TimesaleFields, fields=fields)
Expand Down
231 changes: 231 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,35 @@ async def test_level_one_equity_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_equity_subs_success_some_fields_no_symbol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'QUOTE', 'SUBS'))]

await self.client.level_one_equity_subs(['GOOG', 'MSFT'], fields=[
StreamClient.LevelOneEquityFields.BID_PRICE,
StreamClient.LevelOneEquityFields.ASK_PRICE,
StreamClient.LevelOneEquityFields.QUOTE_TIME,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'QUOTE',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'GOOG,MSFT',
'fields': '0,1,2,11'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_equity_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -1207,6 +1236,36 @@ async def test_level_one_option_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_option_subs_success_some_fields_no_symbol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'OPTION', 'SUBS'))]

await self.client.level_one_option_subs(
['GOOG_052920C620', 'MSFT_052920C145'], fields=[
StreamClient.LevelOneOptionFields.BID_PRICE,
StreamClient.LevelOneOptionFields.ASK_PRICE,
StreamClient.LevelOneOptionFields.VOLATILITY,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'OPTION',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'GOOG_052920C620,MSFT_052920C145',
'fields': '0,2,3,10'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_option_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -1474,6 +1533,35 @@ async def test_level_one_futures_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_futures_subs_success_some_fields_no_symbol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'LEVELONE_FUTURES', 'SUBS'))]

await self.client.level_one_futures_subs(['/ES', '/CL'], fields=[
StreamClient.LevelOneFuturesFields.BID_PRICE,
StreamClient.LevelOneFuturesFields.ASK_PRICE,
StreamClient.LevelOneFuturesFields.FUTURE_PRICE_FORMAT,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'LEVELONE_FUTURES',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': '/ES,/CL',
'fields': '0,1,2,28'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_futures_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -1738,6 +1826,35 @@ async def test_level_one_forex_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_forex_subs_success_some_fields_no_symbol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'LEVELONE_FOREX', 'SUBS'))]

await self.client.level_one_forex_subs(['EUR/USD', 'EUR/GBP'], fields=[
StreamClient.LevelOneForexFields.HIGH_PRICE,
StreamClient.LevelOneForexFields.LOW_PRICE,
StreamClient.LevelOneForexFields.MARKET_MAKER,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'LEVELONE_FOREX',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'EUR/USD,EUR/GBP',
'fields': '0,10,11,26'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_forex_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -1974,6 +2091,36 @@ async def test_level_one_futures_options_subs_success_some_fields(
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_futures_options_subs_success_some_fields_no_symol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'LEVELONE_FUTURES_OPTIONS', 'SUBS'))]

await self.client.level_one_futures_options_subs(
['NQU20_C6500', 'NQU20_P6500'], fields=[
StreamClient.LevelOneFuturesOptionsFields.BID_SIZE,
StreamClient.LevelOneFuturesOptionsFields.ASK_SIZE,
StreamClient.LevelOneFuturesOptionsFields.FUTURE_PRICE_FORMAT,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'LEVELONE_FUTURES_OPTIONS',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'NQU20_C6500,NQU20_P6500',
'fields': '0,4,5,28'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_level_one_futures_options_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -2244,6 +2391,34 @@ async def test_timesale_equity_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_timesale_equity_subs_success_some_fields_no_symbol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'TIMESALE_EQUITY', 'SUBS'))]

await self.client.timesale_equity_subs(['GOOG', 'MSFT'], fields=[
StreamClient.TimesaleFields.TRADE_TIME,
StreamClient.TimesaleFields.LAST_SIZE,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'TIMESALE_EQUITY',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'GOOG,MSFT',
'fields': '0,1,3'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_timesale_equity_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -2374,6 +2549,34 @@ async def test_timesale_futures_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_timesale_futures_subs_success_some_fields_no_symbol(
self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'TIMESALE_FUTURES', 'SUBS'))]

await self.client.timesale_futures_subs(['/ES', '/CL'], fields=[
StreamClient.TimesaleFields.TRADE_TIME,
StreamClient.TimesaleFields.LAST_SIZE,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'TIMESALE_FUTURES',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': '/ES,/CL',
'fields': '0,1,3'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_timesale_futures_subs_failure(self, ws_connect):
Expand Down Expand Up @@ -2504,6 +2707,34 @@ async def test_timesale_options_subs_success_some_fields(self, ws_connect):
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_timesale_options_subs_success_some_fields_no_symbol(self, ws_connect):
socket = await self.login_and_get_socket(ws_connect)

socket.recv.side_effect = [json.dumps(self.success_response(
1, 'TIMESALE_OPTIONS', 'SUBS'))]

await self.client.timesale_options_subs(
['GOOG_052920C620', 'MSFT_052920C145'], fields=[
StreamClient.TimesaleFields.TRADE_TIME,
StreamClient.TimesaleFields.LAST_SIZE,
])
socket.recv.assert_awaited_once()
request = self.request_from_socket_mock(socket)

self.assertEqual(request, {
'account': '1001',
'service': 'TIMESALE_OPTIONS',
'command': 'SUBS',
'requestid': '1',
'source': 'streamerInfo-appId',
'parameters': {
'keys': 'GOOG_052920C620,MSFT_052920C145',
'fields': '0,1,3'
}
})

@no_duplicates
@asynctest.patch('tda.streaming.websockets.client.connect', new_callable=asynctest.CoroutineMock)
async def test_timesale_options_subs_failure(self, ws_connect):
Expand Down

0 comments on commit 9b57c3b

Please sign in to comment.