From 17c4a0876447858a2f2ca775d505770bb26d9939 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sat, 31 Aug 2019 15:03:57 +0100 Subject: [PATCH 1/4] Track branch in bankroll-marketdata --- requirements.txt | Bin 838 -> 972 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/requirements.txt b/requirements.txt index e5847ce28a25cdca9a7012ef3f3a067e0bdc2e66..12b29d66fdbd6773eaf10e987949890bb110bcc5 100644 GIT binary patch delta 147 zcmX@cc7}bzgz$8ROokE$ZH5dWS;A1jP|RS(pbvx)@eGDih9m|(hGd3(hFqX%5TRGJFZlnb;b4``?^Lkf^r0>sKd@pK@xo!GBF Lu}5O#oh&8*K3^cy delta 29 jcmX@ZevECxgvlz5YP_}#whRUgdJHB&Y_Qprv6%?~bc6<% From 01c8be57c0cf56cae84f840700028776d76ee19a Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sat, 31 Aug 2019 15:13:33 +0100 Subject: [PATCH 2/4] Split IBDataProvider into its own module --- bankroll/brokers/ibkr/__init__.py | 3 +- bankroll/brokers/ibkr/account.py | 168 +------------------------- bankroll/brokers/ibkr/marketdata.py | 179 ++++++++++++++++++++++++++++ 3 files changed, 186 insertions(+), 164 deletions(-) create mode 100644 bankroll/brokers/ibkr/marketdata.py diff --git a/bankroll/brokers/ibkr/__init__.py b/bankroll/brokers/ibkr/__init__.py index d127d0a..315fa96 100644 --- a/bankroll/brokers/ibkr/__init__.py +++ b/bankroll/brokers/ibkr/__init__.py @@ -1,3 +1,4 @@ -from .account import IBAccount, IBDataProvider, Settings, contract +from .account import IBAccount, Settings +from .marketdata import IBDataProvider, contract __all__ = ["IBAccount", "IBDataProvider", "Settings", "contract"] diff --git a/bankroll/brokers/ibkr/account.py b/bankroll/brokers/ibkr/account.py index d20187b..3c326e8 100644 --- a/bankroll/brokers/ibkr/account.py +++ b/bankroll/brokers/ibkr/account.py @@ -35,10 +35,8 @@ import backoff # type: ignore import ib_insync as IB # type: ignore import pandas as pd # type: ignore -from progress.spinner import Spinner # type: ignore - from bankroll.broker import configuration, parsetools -from bankroll.marketdata import MarketConnectedAccountData, MarketDataProvider +from bankroll.marketdata import MarketConnectedAccountData from bankroll.model import ( AccountBalance, Activity, @@ -58,6 +56,9 @@ Trade, TradeFlags, ) +from progress.spinner import Spinner # type: ignore + +from .marketdata import IBDataProvider @unique @@ -768,165 +769,6 @@ def _downloadBalance(ib: IB.IB, lenient: bool) -> AccountBalance: return AccountBalance(cash=cashByCurrency) -def _stockContract(stock: Stock) -> IB.Contract: - return IB.Stock( - symbol=stock.symbol, - exchange=f"SMART:{stock.exchange}" if stock.exchange else "SMART", - currency=stock.currency.name, - ) - - -def _bondContract(bond: Bond) -> IB.Contract: - return IB.Bond( - symbol=bond.symbol, - exchange=bond.exchange or "SMART", - currency=bond.currency.name, - ) - - -def _optionContract(option: Option, cls: Type[IB.Contract] = IB.Option) -> IB.Contract: - lastTradeDate = option.expiration.strftime("%Y%m%d") - defaultExchange = "" if issubclass(cls, IB.FuturesOption) else "SMART" - - return cls( - localSymbol=option.symbol, - exchange=option.exchange or defaultExchange, - currency=option.currency.name, - lastTradeDateOrContractMonth=lastTradeDate, - right=option.optionType.value, - strike=float(option.strike), - multiplier=str(option.multiplier), - ) - - -def _futuresContract(future: Future) -> IB.Contract: - lastTradeDate = future.expiration.strftime("%Y%m%d") - - return IB.Future( - symbol=future.symbol, - exchange=future.exchange or "", - currency=future.currency.name, - multiplier=str(future.multiplier), - lastTradeDateOrContractMonth=lastTradeDate, - ) - - -def _forexContract(forex: Forex) -> IB.Contract: - return IB.Forex( - pair=forex.symbol, - currency=forex.currency.name, - exchange=forex.exchange or "IDEALPRO", - ) - - -def contract(instrument: Instrument) -> IB.Contract: - if isinstance(instrument, Stock): - return _stockContract(instrument) - elif isinstance(instrument, Bond): - return _bondContract(instrument) - elif isinstance(instrument, FutureOption): - return _optionContract(instrument, cls=IB.FuturesOption) - elif isinstance(instrument, Option): - return _optionContract(instrument) - elif isinstance(instrument, Future): - return _futuresContract(instrument) - elif isinstance(instrument, Forex): - return _forexContract(instrument) - else: - raise ValueError(f"Unexpected type of instrument: {instrument!r}") - - -# https://interactivebrokers.github.io/tws-api/market_data_type.html -class _MarketDataType(IntEnum): - LIVE = 1 - FROZEN = 2 - DELAYED = 3 - DELAYED_FROZEN = 4 - - -class IBDataProvider(MarketDataProvider): - def __init__(self, client: IB.IB): - self._client = client - super().__init__() - - def qualifyContracts( - self, instruments: Iterable[Instrument] - ) -> Dict[Instrument, IB.Contract]: - # IB.Contract is not guaranteed to be hashable, so we orient the table this way, albeit less useful. - # TODO: Check uniqueness of instruments - contractsByInstrument: Dict[Instrument, IB.Contract] = { - i: contract(i) for i in instruments - } - - self._client.qualifyContracts(*contractsByInstrument.values()) - - return contractsByInstrument - - def fetchHistoricalData(self, instrument: Instrument) -> pd.DataFrame: - contractsByInstrument = self.qualifyContracts([instrument]) - data = self._client.reqHistoricalData( - contractsByInstrument[instrument], - endDateTime="", - durationStr="10 Y", - barSizeSetting="1 day", - whatToShow="TRADES", - useRTH=True, - formatDate=1, - ) - return IB.util.df(data) - - def fetchQuotes( - self, - instruments: Iterable[Instrument], - dataType: _MarketDataType = _MarketDataType.DELAYED_FROZEN, - ) -> Iterable[Tuple[Instrument, Quote]]: - self._client.reqMarketDataType(dataType.value) - - contractsByInstrument = self.qualifyContracts(instruments) - - # Note: this blocks until all tickers come back. When we want this to be async, we'll need to use reqMktData(). - # See https://github.com/jspahrsummers/bankroll/issues/13. - tickers = self._client.reqTickers(*contractsByInstrument.values()) - - for ticker in tickers: - instrument = next( - (i for (i, c) in contractsByInstrument.items() if c == ticker.contract) - ) - - bid: Optional[Cash] = None - ask: Optional[Cash] = None - last: Optional[Cash] = None - close: Optional[Cash] = None - - factor = 1 - - # Tickers are quoted in GBX despite all the other data being in GBP. - if instrument.currency == Currency.GBP: - factor = 100 - - if (ticker.bid and math.isfinite(ticker.bid)) and not ticker.bidSize == 0: - bid = Cash( - currency=instrument.currency, quantity=Decimal(ticker.bid) / factor - ) - if (ticker.ask and math.isfinite(ticker.ask)) and not ticker.askSize == 0: - ask = Cash( - currency=instrument.currency, quantity=Decimal(ticker.ask) / factor - ) - if ( - ticker.last and math.isfinite(ticker.last) - ) and not ticker.lastSize == 0: - last = Cash( - currency=instrument.currency, quantity=Decimal(ticker.last) / factor - ) - if ticker.close and math.isfinite(ticker.close): - close = Cash( - currency=instrument.currency, - quantity=Decimal(ticker.close) / factor, - ) - - yield (instrument, Quote(bid=bid, ask=ask, last=last, close=close)) - - class IBAccount(MarketConnectedAccountData): _cachedActivity: Optional[Sequence[Activity]] = None _client: Optional[IB.IB] = None @@ -1042,5 +884,5 @@ def balance(self) -> AccountBalance: return _downloadBalance(self.client, self._lenient) @property - def marketDataProvider(self) -> MarketDataProvider: + def marketDataProvider(self) -> IBDataProvider: return IBDataProvider(client=self.client) diff --git a/bankroll/brokers/ibkr/marketdata.py b/bankroll/brokers/ibkr/marketdata.py new file mode 100644 index 0000000..f133feb --- /dev/null +++ b/bankroll/brokers/ibkr/marketdata.py @@ -0,0 +1,179 @@ +import math +from decimal import Decimal +from enum import IntEnum +from typing import Dict, Iterable, Optional, Tuple, Type + +import ib_insync as IB # type: ignore +import pandas as pd # type: ignore +from bankroll.marketdata import MarketDataProvider +from bankroll.model import ( + Bond, + Cash, + Currency, + Forex, + Future, + FutureOption, + Instrument, + Option, + Quote, + Stock, +) + + +def _stockContract(stock: Stock) -> IB.Contract: + return IB.Stock( + symbol=stock.symbol, + exchange=f"SMART:{stock.exchange}" if stock.exchange else "SMART", + currency=stock.currency.name, + ) + + +def _bondContract(bond: Bond) -> IB.Contract: + return IB.Bond( + symbol=bond.symbol, + exchange=bond.exchange or "SMART", + currency=bond.currency.name, + ) + + +def _optionContract(option: Option, cls: Type[IB.Contract] = IB.Option) -> IB.Contract: + lastTradeDate = option.expiration.strftime("%Y%m%d") + defaultExchange = "" if issubclass(cls, IB.FuturesOption) else "SMART" + + return cls( + localSymbol=option.symbol, + exchange=option.exchange or defaultExchange, + currency=option.currency.name, + lastTradeDateOrContractMonth=lastTradeDate, + right=option.optionType.value, + strike=float(option.strike), + multiplier=str(option.multiplier), + ) + + +def _futuresContract(future: Future) -> IB.Contract: + lastTradeDate = future.expiration.strftime("%Y%m%d") + + return IB.Future( + symbol=future.symbol, + exchange=future.exchange or "", + currency=future.currency.name, + multiplier=str(future.multiplier), + lastTradeDateOrContractMonth=lastTradeDate, + ) + + +def _forexContract(forex: Forex) -> IB.Contract: + return IB.Forex( + pair=forex.symbol, + currency=forex.currency.name, + exchange=forex.exchange or "IDEALPRO", + ) + + +def contract(instrument: Instrument) -> IB.Contract: + if isinstance(instrument, Stock): + return _stockContract(instrument) + elif isinstance(instrument, Bond): + return _bondContract(instrument) + elif isinstance(instrument, FutureOption): + return _optionContract(instrument, cls=IB.FuturesOption) + elif isinstance(instrument, Option): + return _optionContract(instrument) + elif isinstance(instrument, Future): + return _futuresContract(instrument) + elif isinstance(instrument, Forex): + return _forexContract(instrument) + else: + raise ValueError(f"Unexpected type of instrument: {instrument!r}") + + +# https://interactivebrokers.github.io/tws-api/market_data_type.html +class _MarketDataType(IntEnum): + LIVE = 1 + FROZEN = 2 + DELAYED = 3 + DELAYED_FROZEN = 4 + + +class IBDataProvider(MarketDataProvider): + def __init__(self, client: IB.IB): + self._client = client + super().__init__() + + def qualifyContracts( + self, instruments: Iterable[Instrument] + ) -> Dict[Instrument, IB.Contract]: + # IB.Contract is not guaranteed to be hashable, so we orient the table this way, albeit less useful. + # TODO: Check uniqueness of instruments + contractsByInstrument: Dict[Instrument, IB.Contract] = { + i: contract(i) for i in instruments + } + + self._client.qualifyContracts(*contractsByInstrument.values()) + + return contractsByInstrument + + def fetchHistoricalData(self, instrument: Instrument) -> pd.DataFrame: + contractsByInstrument = self.qualifyContracts([instrument]) + data = self._client.reqHistoricalData( + contractsByInstrument[instrument], + endDateTime="", + durationStr="10 Y", + barSizeSetting="1 day", + whatToShow="TRADES", + useRTH=True, + formatDate=1, + ) + return IB.util.df(data) + + def fetchQuotes( + self, + instruments: Iterable[Instrument], + dataType: _MarketDataType = _MarketDataType.DELAYED_FROZEN, + ) -> Iterable[Tuple[Instrument, Quote]]: + self._client.reqMarketDataType(dataType.value) + + contractsByInstrument = self.qualifyContracts(instruments) + + # Note: this blocks until all tickers come back. When we want this to be async, we'll need to use reqMktData(). + # See https://github.com/jspahrsummers/bankroll/issues/13. + tickers = self._client.reqTickers(*contractsByInstrument.values()) + + for ticker in tickers: + instrument = next( + (i for (i, c) in contractsByInstrument.items() if c == ticker.contract) + ) + + bid: Optional[Cash] = None + ask: Optional[Cash] = None + last: Optional[Cash] = None + close: Optional[Cash] = None + + factor = 1 + + # Tickers are quoted in GBX despite all the other data being in GBP. + if instrument.currency == Currency.GBP: + factor = 100 + + if (ticker.bid and math.isfinite(ticker.bid)) and not ticker.bidSize == 0: + bid = Cash( + currency=instrument.currency, quantity=Decimal(ticker.bid) / factor + ) + if (ticker.ask and math.isfinite(ticker.ask)) and not ticker.askSize == 0: + ask = Cash( + currency=instrument.currency, quantity=Decimal(ticker.ask) / factor + ) + if ( + ticker.last and math.isfinite(ticker.last) + ) and not ticker.lastSize == 0: + last = Cash( + currency=instrument.currency, quantity=Decimal(ticker.last) / factor + ) + if ticker.close and math.isfinite(ticker.close): + close = Cash( + currency=instrument.currency, + quantity=Decimal(ticker.close) / factor, + ) + + yield (instrument, Quote(bid=bid, ask=ask, last=last, close=close)) From ebfb937db52fcf7323992a71c06edac57add3519 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sat, 31 Aug 2019 15:36:20 +0100 Subject: [PATCH 3/4] WIP StreamingMarketDataProvider --- bankroll/brokers/ibkr/marketdata.py | 91 +++++++++++++++++------------ 1 file changed, 54 insertions(+), 37 deletions(-) diff --git a/bankroll/brokers/ibkr/marketdata.py b/bankroll/brokers/ibkr/marketdata.py index f133feb..b225fd0 100644 --- a/bankroll/brokers/ibkr/marketdata.py +++ b/bankroll/brokers/ibkr/marketdata.py @@ -5,7 +5,7 @@ import ib_insync as IB # type: ignore import pandas as pd # type: ignore -from bankroll.marketdata import MarketDataProvider +from bankroll.marketdata import StreamingMarketDataProvider from bankroll.model import ( Bond, Cash, @@ -19,6 +19,8 @@ Stock, ) +from rx.core.typing import Observable + def _stockContract(stock: Stock) -> IB.Contract: return IB.Stock( @@ -96,9 +98,17 @@ class _MarketDataType(IntEnum): DELAYED_FROZEN = 4 -class IBDataProvider(MarketDataProvider): - def __init__(self, client: IB.IB): +class IBDataProvider(StreamingMarketDataProvider): + def __init__( + self, + client: IB.IB, + dataType: Optional[_MarketDataType] = _MarketDataType.DELAYED_FROZEN, + ): self._client = client + + if dataType is not None: + self._client.reqMarketDataType(dataType.value) + super().__init__() def qualifyContracts( @@ -127,12 +137,45 @@ def fetchHistoricalData(self, instrument: Instrument) -> pd.DataFrame: ) return IB.util.df(data) + def _quoteFromTicker(self, ticker: IB.Ticker, instrument: Instrument) -> Quote: + bid: Optional[Cash] = None + ask: Optional[Cash] = None + last: Optional[Cash] = None + close: Optional[Cash] = None + + factor = 1 + + # Tickers are quoted in GBX despite all the other data being in GBP. + if instrument.currency == Currency.GBP: + factor = 100 + + if (ticker.bid and math.isfinite(ticker.bid)) and not ticker.bidSize == 0: + bid = Cash( + currency=instrument.currency, quantity=Decimal(ticker.bid) / factor + ) + if (ticker.ask and math.isfinite(ticker.ask)) and not ticker.askSize == 0: + ask = Cash( + currency=instrument.currency, quantity=Decimal(ticker.ask) / factor + ) + if (ticker.last and math.isfinite(ticker.last)) and not ticker.lastSize == 0: + last = Cash( + currency=instrument.currency, quantity=Decimal(ticker.last) / factor + ) + if ticker.close and math.isfinite(ticker.close): + close = Cash( + currency=instrument.currency, quantity=Decimal(ticker.close) / factor + ) + + return Quote(bid=bid, ask=ask, last=last, close=close) + def fetchQuotes( self, instruments: Iterable[Instrument], - dataType: _MarketDataType = _MarketDataType.DELAYED_FROZEN, + # TODO: Remove this (but it will break backwards compatibility). + dataType: Optional[_MarketDataType] = None, ) -> Iterable[Tuple[Instrument, Quote]]: - self._client.reqMarketDataType(dataType.value) + if dataType is not None: + self._client.reqMarketDataType(dataType.value) contractsByInstrument = self.qualifyContracts(instruments) @@ -145,35 +188,9 @@ def fetchQuotes( (i for (i, c) in contractsByInstrument.items() if c == ticker.contract) ) - bid: Optional[Cash] = None - ask: Optional[Cash] = None - last: Optional[Cash] = None - close: Optional[Cash] = None - - factor = 1 - - # Tickers are quoted in GBX despite all the other data being in GBP. - if instrument.currency == Currency.GBP: - factor = 100 - - if (ticker.bid and math.isfinite(ticker.bid)) and not ticker.bidSize == 0: - bid = Cash( - currency=instrument.currency, quantity=Decimal(ticker.bid) / factor - ) - if (ticker.ask and math.isfinite(ticker.ask)) and not ticker.askSize == 0: - ask = Cash( - currency=instrument.currency, quantity=Decimal(ticker.ask) / factor - ) - if ( - ticker.last and math.isfinite(ticker.last) - ) and not ticker.lastSize == 0: - last = Cash( - currency=instrument.currency, quantity=Decimal(ticker.last) / factor - ) - if ticker.close and math.isfinite(ticker.close): - close = Cash( - currency=instrument.currency, - quantity=Decimal(ticker.close) / factor, - ) - - yield (instrument, Quote(bid=bid, ask=ask, last=last, close=close)) + yield (instrument, self._quoteFromTicker(ticker, instrument)) + + def subscribeToQuotes( + self, instruments: Iterable[Instrument] + ) -> Observable[Tuple[Instrument, Quote]]: + pass From ec93ca06afd64ed7058680632b9fc2099bf0ef49 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Sat, 31 Aug 2019 15:36:33 +0100 Subject: [PATCH 4/4] WIP wrapping ib_insync in RxPY --- bankroll/brokers/ibkr/rx_client.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 bankroll/brokers/ibkr/rx_client.py diff --git a/bankroll/brokers/ibkr/rx_client.py b/bankroll/brokers/ibkr/rx_client.py new file mode 100644 index 0000000..a4a82cc --- /dev/null +++ b/bankroll/brokers/ibkr/rx_client.py @@ -0,0 +1,25 @@ +import ib_insync as IB # type: ignore +from rx.core.typing import Disposable, Observable, Observer, Scheduler +from typing import Any, Optional + +import rx +import rx.disposable as disposable + + +def reqMktData( + client: IB.IB, contract: IB.Contract, *args: Any, **kwargs: Any +) -> Observable[IB.Ticker]: + def _create( + observer: Observer[IB.Ticker], scheduler: Optional[Scheduler] + ) -> Disposable: + d = disposable.CompositeDisposable() + + ticker = client.ticker(contract) + if ticker: + ticker = client.reqMktData(contract, *args, **kwargs) + d.add(disposable.Disposable(lambda: client.cancelMktData(contract))) + + return disposable.Disposable() + + return rx.create(_create) +