diff --git a/.travis.yml b/.travis.yml index bc6d7cd7..7d88a61b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,11 @@ sudo: false language: python +python: + - "3.4" + - "3.5" env: - - PYTHON=2.7 PANDAS=0.18.0 - PYTHON=3.4 PANDAS=0.18.0 - PYTHON=3.5 PANDAS=0.18.0 @@ -14,6 +16,10 @@ env: # - env: PYTHON=2.7 PANDAS=0.11.0 install: + - wget http://interactivebrokers.github.io/downloads/twsapi_macunix.973.02.zip + - unzip twsapi_macunix.973.02.zip + - pip install IBJts/source/pythonclient/ + - pip install -qq flake8 # You may want to periodically update this, although the conda update # conda line below will keep everything up-to-date. We do this diff --git a/examples/buy_and_hold_historic_ib.py b/examples/buy_and_hold_historic_ib.py new file mode 100644 index 00000000..295d5177 --- /dev/null +++ b/examples/buy_and_hold_historic_ib.py @@ -0,0 +1,103 @@ +import datetime + +from qstrader import settings +from qstrader.strategy.base import AbstractStrategy +from qstrader.event import SignalEvent, EventType +from qstrader.compat import queue +from qstrader.trading_session import TradingSession +from qstrader.service.ib import IBService +from qstrader.price_handler.ib_bar import IBBarPriceHandler +from ibapi.contract import Contract + + +class BuyAndHoldStrategy(AbstractStrategy): + """ + A testing strategy that simply purchases (longs) any asset that + matches what was passed in on initialization and + then holds until the completion of a backtest. + """ + def __init__( + self, tickers, events_queue, + base_quantity=100 + ): + self.tickers = tickers + self.invested = dict.fromkeys(tickers) + self.events_queue = events_queue + self.base_quantity = base_quantity + + def calculate_signals(self, event): + if ( + event.type in [EventType.BAR, EventType.TICK] and + event.ticker in self.tickers + ): + if not self.invested[event.ticker]: + signal = SignalEvent( + event.ticker, "BOT", + suggested_quantity=self.base_quantity + ) + self.events_queue.put(signal) + self.invested[event.ticker] = True + + +def run(config, testing, tickers, filename): + # Backtest information + title = ['Buy and Hold Historic IB Example'] + initial_equity = 10000.0 + events_queue = queue.Queue() + + # Set up IBService + ib_service = IBService() + ib_service.connect("127.0.0.1", 4001, 0) # TODO from config + ib_service.start() + + # Set up IB Contract objects for the PriceHandler + # MORE INFO: https://www.interactivebrokers.com/en/?f=%2Fen%2Fgeneral%2Fcontact%2FtipsContractsDatabaseSearch.php%3Fib_entity%3Dllc + symbols = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + contracts = [] + for symbol in symbols: + contract = Contract() + contract.exchange = "SMART" + contract.symbol = symbol + contract.secType = "STK" + contract.currency = "AUD" + contracts.append(contract) + + # Set up the IB PriceHandler. Want 5 day's of minute bars, up to yesterday. + # Look at IB Documentation for possible values. + end_date = datetime.datetime.now() - datetime.timedelta(days=1) + price_handler = IBBarPriceHandler( + ib_service, events_queue, contracts, config, + "historic", end_date, hist_duration="5 D", barsize="1 min" + ) + + # Use the Buy and Hold Strategy + strategy = BuyAndHoldStrategy(tickers, events_queue) + + # Start/End TODO redundant -- only required for default (Yahoo) price handler. + start_date = datetime.datetime(2000, 1, 1) + end_date = datetime.datetime(2014, 1, 1) + + # Set up the backtest + backtest = TradingSession( + config, strategy, tickers, + initial_equity, start_date, end_date, + events_queue, price_handler=price_handler, title=title + ) + results = backtest.start_trading(testing=testing) + + # Disconnect from services + ib_service.stop_event.set() + ib_service.join() + + return results + + +if __name__ == "__main__": + # Configuration data + testing = False + config = settings.from_file( + settings.DEFAULT_CONFIG_FILENAME, testing + ) + tickers = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + filename = None + run(config, testing, tickers, filename) diff --git a/examples/buy_and_hold_live_ib.py b/examples/buy_and_hold_live_ib.py new file mode 100644 index 00000000..0a13a1ce --- /dev/null +++ b/examples/buy_and_hold_live_ib.py @@ -0,0 +1,136 @@ +import datetime + +from qstrader.price_parser import PriceParser +from qstrader import settings +from qstrader.strategy.base import Strategies, AbstractStrategy +from qstrader.event import SignalEvent, EventType +from qstrader.compat import queue +from qstrader.trading_session import TradingSession +from qstrader.service.ib import IBService +from qstrader.price_handler.ib_bar import IBBarPriceHandler +from ibapi.contract import Contract + + +class DisplayStrategy(AbstractStrategy): + """ + A strategy which display ticks / bars + params: + n = 10000 + n_window = 5 + """ + def __init__(self, n=100, n_window=5): + self.n = n + self.n_window = n_window + self.i = 0 + + def calculate_signals(self, event): + if event.type in [EventType.TICK, EventType.BAR]: + # Format the event for human display + if event.type == EventType.BAR: + event.open_price = PriceParser.display(event.open_price) + event.high_price = PriceParser.display(event.high_price) + event.low_price = PriceParser.display(event.low_price) + event.close_price = PriceParser.display(event.close_price) + event.adj_close_price = PriceParser.display(event.adj_close_price) + else: # event.type == EventType.TICK + event.bid = PriceParser.display(event.bid) + event.ask = PriceParser.display(event.ask) + + if self.i % self.n in range(self.n_window): + print("%d %s" % (self.i, event)) + self.i += 1 + + +class BuyAndHoldStrategy(AbstractStrategy): + """ + A testing strategy that simply purchases (longs) any asset that + matches what was passed in on initialization and + then holds until the completion of a backtest. + """ + def __init__( + self, tickers, events_queue, + base_quantity=100 + ): + self.tickers = tickers + self.invested = dict.fromkeys(tickers) + self.events_queue = events_queue + self.base_quantity = base_quantity + + def calculate_signals(self, event): + if ( + event.type in [EventType.BAR, EventType.TICK] and + event.ticker in self.tickers + ): + if not self.invested[event.ticker]: + signal = SignalEvent( + event.ticker, "BOT", + suggested_quantity=self.base_quantity + ) + self.events_queue.put(signal) + self.invested[event.ticker] = True + + +def run(config, testing, tickers, filename): + # Backtest information + title = ['Buy and Hold Live IB Example -- 5 Sec Bars'] + initial_equity = 10000.0 + events_queue = queue.Queue() + + # Set up IBService + ib_service = IBService() + ib_service.connect("127.0.0.1", 4001, 0) # TODO from config + ib_service.start() + + # Set up IB Contract objects for the PriceHandler + # MORE INFO: https://www.interactivebrokers.com/en/?f=%2Fen%2Fgeneral%2Fcontact%2FtipsContractsDatabaseSearch.php%3Fib_entity%3Dllc + symbols = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + contracts = [] + for symbol in symbols: + contract = Contract() + contract.exchange = "SMART" + contract.symbol = symbol + contract.secType = "STK" + contract.currency = "AUD" + contracts.append(contract) + + # Set up the IB PriceHandler. Want 5 day's of minute bars, up to yesterday. + # Look at IB Documentation for possible values. + end_date = datetime.datetime.now() - datetime.timedelta(days=1) + price_handler = IBBarPriceHandler( + ib_service, events_queue, contracts, config, + "live" + ) + + # Use the Buy and Hold Strategy + strategy = Strategies(BuyAndHoldStrategy(tickers, events_queue), DisplayStrategy(n=20)) + + # Start/End TODO redundant -- only required for default (Yahoo) price handler. + start_date = datetime.datetime(2000, 1, 1) + end_date = datetime.datetime(2014, 1, 1) + + # Set up the backtest + session = TradingSession( + config, strategy, tickers, + initial_equity, start_date, end_date, + events_queue, session_type="live", + end_session_time=datetime.datetime.now() + datetime.timedelta(minutes=10), + price_handler=price_handler, title=title + ) + results = session.start_trading(testing=testing) + + # Disconnect from services + ib_service.stop_event.set() + ib_service.join() + + return results + + +if __name__ == "__main__": + # Configuration data + testing = False + config = settings.from_file( + settings.DEFAULT_CONFIG_FILENAME, testing + ) + tickers = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + filename = None + run(config, testing, tickers, filename) diff --git a/qstrader/compliance/example.py b/qstrader/compliance/example.py index e66738fb..f4af9a32 100644 --- a/qstrader/compliance/example.py +++ b/qstrader/compliance/example.py @@ -40,7 +40,7 @@ def __init__(self, config): "commission" ] fname = os.path.expanduser(os.path.join(self.config.OUTPUT_DIR, self.csv_filename)) - with open(fname, 'a') as csvfile: + with open(fname, 'a+') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() @@ -49,7 +49,7 @@ def record_trade(self, fill): Append all details about the FillEvent to the CSV trade log. """ fname = os.path.expanduser(os.path.join(self.config.OUTPUT_DIR, self.csv_filename)) - with open(fname, 'a') as csvfile: + with open(fname, 'a+') as csvfile: writer = csv.writer(csvfile) writer.writerow([ fill.timestamp, fill.ticker, diff --git a/qstrader/ib_portfolio.py b/qstrader/ib_portfolio.py new file mode 100644 index 00000000..60912a01 --- /dev/null +++ b/qstrader/ib_portfolio.py @@ -0,0 +1,46 @@ +import time +from .position import Position +from .portfolio import Portfolio + +class IBPortfolio(Portfolio): + def __init__(self, ib_service, price_handler, cash): + """ + On creation, the IB Portfolio will request all current portfolio details + from the IB API. + + WORK PROCESS: + * Subclass the existing Portfolio implementation, override init() only + * Bootstrap the portfolio by requesting current positions from IB + * Decide where we handle OrderStatus() events (guessing ExecutionHandler) + """ + Portfolio.__init__(self, price_handler, cash) + self.ib_service = ib_service + + # Bootstrap the portfolio by loading data from IB. + self.ib_service.reqAccountUpdates(True, "") + time.sleep(5) # Ugly, but no way to implement future/promise with IB's response? + while not self.ib_service.portfolioUpdatesQueue.empty(): + # Create the position + portfolioUpdate = self.ib_service.portfolioUpdatesQueue.get(False) + contract = portfolioUpdate[0] + contract.exchange = contract.primaryExchange + position = Position( + "BOT" if portfolioUpdate[1] > 0 else "SLD", + contract.symbol, portfolioUpdate[1], 0, 0, 0, 0 + ) + # Override some of the position variables + if portfolioUpdate[1] > 0: + position.buys = portfolioUpdate[1] ## TODO Confirm correct + else: + position.sells = portfolioUpdate[1] ## TODO Confirm correct + position.quantity = portfolioUpdate[1] + position.init_price = portfolioUpdate[4] + position.realised_pnl = portfolioUpdate[6] + position.unrealized_pnl = portfolioUpdate[5] + position.market_value = portfolioUpdate[3] + + # Add the position to the QSTrader portfolio + self.positions[contract.symbol] = position + + # Subscribe the PriceHandler to this position so we get updates on value. + self.price_handler._subscribe_contract(contract) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py new file mode 100644 index 00000000..42c4d3db --- /dev/null +++ b/qstrader/price_handler/ib_bar.py @@ -0,0 +1,149 @@ +import datetime +import pandas as pd +from qstrader.compat import queue +from .base import AbstractBarPriceHandler +from ..event import BarEvent +from ..price_parser import PriceParser + + +class IBBarPriceHandler(AbstractBarPriceHandler): + """ + Designed to feed either live or historic market data bars + from an Interactive Brokers connection. + + Uses the IBService to make requests and collect data once responses have returned. + + `param_contracts` must be a list of IB Contract objects. + + TODO: + * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode + * Work with live market data + * Raise exceptions if the user enters data that + IB won't like (i.e. barsize/duration string formats) + * Decide/discuss approaches to handle IB's simultaneous data feed limit. + * Decide/discuss on support of live market data (ticks, opposed to bars) + """ + def __init__( + self, ib_service, events_queue, param_contracts, settings, mode="historic", + hist_end_date=datetime.datetime.now() - datetime.timedelta(days=3), + hist_duration="5 D", hist_barsize="1 min" + ): + self.ib_service = ib_service + self.barsize_lookup = { + "1 sec": 1, + "5 secs": 5, + "15 secs": 15, + "30 secs": 30, + "1 min": 60, + "2 mins": 120, + "3 mins": 180, + "5 mins": 300, + "15 mins": 900, + "30 mins": 1800, + "1 hour": 3600, + "8 hours": 28800, + "1 day": 86400 + } + self.tickers = {} # Required to be populated for some parent methods. + self.bar_stream = queue.Queue() + self.events_queue = events_queue + self.mode = mode + self.continue_backtest = True + self.hist_end_date = hist_end_date + self.hist_duration = hist_duration + self.qst_barsize = self.barsize_lookup[hist_barsize] + self.ib_barsize = hist_barsize + # IB Only supports `5` as an int, for live barsize. + if self.mode == "live": + self.ib_barsize = 5 + + # The position of a contract in this dict is used as its IB ID. + self.contracts = {} # TODO gross + self.contract_lookup = {} + for contract in param_contracts: # TODO gross param_contracts -- combine above? + self._subscribe_contract(contract) + if self.mode == "historic": + self._wait_for_hist_population() + self._merge_sort_contract_data() + elif self.mode == "live": # Assign a reference to the live bars populated by IB. + self.bar_stream = self.ib_service.realtimeBarQueue + + def _subscribe_contract(self, contract): + """ + Request contract data from IB + """ + # Add ticker symbol, as required by some parent methods + self.tickers[contract.symbol] = {} + if self.mode == "live": + ib_contract_id = len(self.contracts) + end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") + self.ib_service.reqRealTimeBars( + ib_contract_id, contract, self.ib_barsize, "TRADES", True, None + ) + + if self.mode == "historic": + ib_contract_id = len(self.contracts) + end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") + self.ib_service.reqHistoricalData( + ib_contract_id, contract, end_time, self.hist_duration, self.ib_barsize, + "TRADES", True, 2, None) + + # TODO gross + self.contract_lookup[len(self.contracts)] = contract.symbol + self.contracts[contract] = {} + + def _wait_for_hist_population(self): + """ + Blocks until the historical dataset has been populated. + """ + while len(self.ib_service.waitingHistoricalData) != 0: + pass + + def _merge_sort_contract_data(self): + """ + Collects all the equities data from thte IBService, and populates the + member Queue `self.bar_stream`. This queue is used for the `stream_next()` + function. + + Note this is not necessary for live data. + """ + historicalData = [] + while not self.ib_service.historicalDataQueue.empty(): + historicalData.append(self.ib_service.historicalDataQueue.get()) + historicalData = sorted(historicalData, key=lambda x: x[1]) + for bar_tuple in historicalData: + self.bar_stream.put(bar_tuple) + + def _create_event(self, mkt_event): + """ + mkt_event is a tuple created according to the format: + http:////www.interactivebrokers.com/en/software/api/apiguide/java/historicaldata.htm + """ + symbol = self.contract_lookup[mkt_event[0]] + time = pd.Timestamp(int(mkt_event[1]) * 10**9) + barsize = self.qst_barsize + open_price = PriceParser.parse(mkt_event[2]) + high_price = PriceParser.parse(mkt_event[3]) + low_price = PriceParser.parse(mkt_event[4]) + close_price = PriceParser.parse(mkt_event[5]) + adj_close_price = PriceParser.parse(mkt_event[5]) # TODO redundant? + volume = mkt_event[6] + return BarEvent( + symbol, time, barsize, open_price, high_price, + low_price, close_price, volume, adj_close_price + ) + + def stream_next(self): + """ + Create the next BarEvent and place it onto the event queue. + + TODO make more clear if differences between live/historic? + """ + try: + # Create, store and return the bar event. + mkt_event = self.bar_stream.get(False) + bev = self._create_event(mkt_event) + self._store_event(bev) + self.events_queue.put(bev) + except queue.Empty: + self.continue_backtest = False diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py new file mode 100644 index 00000000..d5010ab8 --- /dev/null +++ b/qstrader/service/ib.py @@ -0,0 +1,135 @@ +import queue +import threading +from ibapi import comm +from ibapi.wrapper import EWrapper +from ibapi.client import EClient +from ibapi.common import NO_VALID_ID, MAX_MSG_LEN, TickerId, TagValueList +from ibapi.contract import Contract +from ibapi.errors import BAD_LENGTH + + +class IBService(EWrapper, EClient, threading.Thread): + """ + The IBService is the primary conduit of data from QStrader to Interactive Brokers. + This service provides functions to request data, and allows for + callbacks to be triggered, which populates "data queues" with the response. + + All methods of the EClient are available (i.e. API Requests), as are + the callbacks for EWrapper (i.e. API responses). It also provides a set of Queues + which are populated with the responses from EWrapper. Other components in the + system should use these queues collect the API response data. + + Any module or component that wishes to interact with IB should do so by using + methods offered in this class. This ensures that the logic required to talk with IB + is contained within this class exclusively, with the added benefit that we + can easily create mock instances of the IBService for testing. + + Several calls must be made to the IBService in order for it to run correctly. + These should be called from the user's main `trading-session` script. + + An IBService object must be instantiated, immediately followed by the .connect() + call, immediately followed by the .start() call, which spawns a thread for the run loop. + When the trading session is complete, the service should be stopped gracefully by + calling ibservice.stop_event.set() to break the infinite loop, and ibservice.join() + to wait for the thread to close. + """ + def __init__(self): + EWrapper.__init__(self) + EClient.__init__(self, wrapper=self) + threading.Thread.__init__(self, name='IBService') + self.stop_event = threading.Event() + # Set up data queues. + self.portfolioUpdatesQueue = queue.Queue() + self.realtimeBarQueue = queue.Queue() + self.historicalDataQueue = queue.Queue() + self.waitingHistoricalData = [] + + def error(self, reqId: TickerId, errorCode: int, errorString: str): + super().error(reqId, errorCode, errorString) + print("Error. Id: ", reqId, " Code: ", errorCode, " Msg: ", errorString) + + + """ + Print portfolio position updates. + """ + def updatePortfolio(self, contract: Contract, position: float, marketPrice: float, + marketValue: float, averageCost: float, unrealizedPNL: float, + realizedPNL: float, accountName: str): + super().updatePortfolio(contract, position, marketPrice, marketValue, + averageCost, unrealizedPNL, realizedPNL, accountName) + self.portfolioUpdatesQueue.put((contract, position, marketPrice, + marketValue, averageCost, unrealizedPNL, + realizedPNL, accountName)) + print("UpdatePortfolio.", contract.symbol, "", contract.secType, "@", + contract.exchange, "Position:", position, "MarketPrice:", marketPrice, + "MarketValue:", marketValue, "AverageCost:", averageCost, + "UnrealizedPNL:", unrealizedPNL, "RealizedPNL:", realizedPNL, + "AccountName:", accountName) + + """ + Print account value + """ + def updateAccountValue(self, key: str, val: str, currency: str, accountName: str): + super().updateAccountValue(key, val, currency, accountName) + print("UpdateAccountValue. Key:" ,key, "Value:", val, + "Currency:", currency, "AccountName:", accountName) + + + """ + Append `reqId` to waitingHistoricalData, then call the super method. + """ + def reqHistoricalData(self, reqId: TickerId, contract: Contract, endDateTime: str, + durationStr: str, barSizeSetting: str, whatToShow: str, + useRTH: int, formatDate: int, chartOptions: TagValueList): + self.waitingHistoricalData.append(reqId) + super().reqHistoricalData(reqId, contract, endDateTime, + durationStr, barSizeSetting, whatToShow, + useRTH, formatDate, chartOptions) + + """ + Populate the RealTimeBars queue. + Note that `time` is the start of the bar + """ + def realtimeBar(self, reqId: TickerId, time: int, open: float, high: float, + low: float, close: float, volume: float, + wap: float, count: int): + self.realtimeBarQueue.put((reqId, time, open, high, low, close, + volume, wap, count)) + + """ + Populate the HistoricalData queue. + """ + def historicalData(self, reqId: TickerId, date: str, open: float, high: float, + low: float, close: float, volume: int, barCount: int, + WAP: float, hasGaps: int): + self.historicalDataQueue.put((reqId, date, open, high, low, close, + volume, barCount, WAP, hasGaps)) + + """ + Remove `reqId` from waitingHistoricalData + TODO: Will it work with multiple historical requests for same symbol? + """ + def historicalDataEnd(self, reqId: int, start: str, end: str): + self.waitingHistoricalData.remove(reqId) + + """ + Overridden from the Threading class. Infinite loop which handles + message passing from IB to QSTrader. This loop is run in new thread when + started. + """ + def run(self): + while (self.conn.isConnected() or not self.msg_queue.empty()) and not self.stop_event.is_set(): + try: + text = self.msg_queue.get(block=True, timeout=0.2) + if len(text) > MAX_MSG_LEN: + self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), + "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) + self.disconnect() + break + except queue.Empty: + pass # TODO something more appropriate + else: + fields = comm.read_fields(text) + self.decoder.interpret(fields) + + self.disconnect() diff --git a/qstrader/statistics/performance.py b/qstrader/statistics/performance.py index 771da596..da081bb0 100644 --- a/qstrader/statistics/performance.py +++ b/qstrader/statistics/performance.py @@ -63,7 +63,7 @@ def create_sortino_ratio(returns, periods=252): returns - A pandas Series representing period percentage returns. periods - Daily (252), Hourly (252*6.5), Minutely(252*6.5*60) etc. """ - return np.sqrt(periods) * (np.mean(returns)) / np.std(returns[returns < 0]) + return np.sqrt(periods) * (np.mean(returns)) / np.std(returns.ix[returns < 0]) def create_drawdowns(returns): diff --git a/qstrader/trading_session.py b/qstrader/trading_session.py index 59ba5903..567d536f 100644 --- a/qstrader/trading_session.py +++ b/qstrader/trading_session.py @@ -16,6 +16,9 @@ class TradingSession(object): """ Enscapsulates the settings and components for carrying out either a backtest or live trading session. + + TODO logic leak from here/pricehandler with live & end_session_time, + code smell. I.e. we set end_time differently when live vs historic. """ def __init__( self, config, strategy, tickers, @@ -51,6 +54,7 @@ def __init__( self.session_type = session_type self._config_session() self.cur_time = None + self.end_session_time = end_session_time if self.session_type == "live": if self.end_session_time is None: diff --git a/tests/test_ib_portfolio.py b/tests/test_ib_portfolio.py new file mode 100644 index 00000000..e9b89243 --- /dev/null +++ b/tests/test_ib_portfolio.py @@ -0,0 +1,44 @@ +import unittest +from qstrader.ib_portfolio import IBPortfolio +from qstrader.price_handler.ib_bar import IBBarPriceHandler +from qstrader.service.ib import IBService +from qstrader.compat import queue +import time + +class TestIBPortfolio(unittest.TestCase): + def setUp(self): + """ + Set up IBService. + Set up IBPortfolio. + """ + self.ib_service = IBService() + self.ib_service.connect("127.0.0.1", 4001, 0) + self.ib_service.start() + self.events_queue = queue.Queue() + self.price_handler = IBBarPriceHandler( + self.ib_service, self.events_queue, [], None, mode="live" + ) + self.portfolio = IBPortfolio( + self.ib_service, self.price_handler, 0 + ) + + def test_stream_portfolio_updates(self): + """ + Test that portfolio updates are requested + Test that portfolio updates show in real time. + + Print portfolio value every second for 1 minute + """ + for i in range(0, 20): # Run for about 4 minutes + time.sleep(10) + print("time: %s, value: %s" % (i, self.portfolio.equity)) + + self.assertEqual(1, 2) + + + def tearDown(self): + self.ib_service.stop_event.set() + self.ib_service.join() + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py new file mode 100644 index 00000000..ac671c46 --- /dev/null +++ b/tests/test_ib_price_handler.py @@ -0,0 +1,200 @@ +import unittest +import numpy as np +import pandas as pd +from qstrader.price_parser import PriceParser +from qstrader.price_handler.ib_bar import IBBarPriceHandler +from qstrader.compat import queue +from qstrader import settings +from ibapi.contract import Contract + +""" +TODO + * Code repetition in this file + * Swaps between camelCase and snake_case, + largely because IB uses camelCase. Be more consistent? + * Test that the price handler called IBService methods with correct params. +""" + + +# Starting at 2017-01-01 13:00:00, 1 minute bars. +timestamp = 1483275600 +closes = np.arange(80.00, 91.00, 1) + + +class IBServiceMock(object): + def __init__(self): + self.historicalDataQueue = queue.Queue() + self.realtimeBarQueue = queue.Queue() + self.waitingHistoricalData = [] + self.countHistoricalRequestsMade = 0 + self.countMarketDataRequestsMade = 0 + # Populate some historic data for the mock service. + # CBA mock data + for i in range(0, 10): + self.historicalDataQueue.put((0, timestamp + (i * 60), + closes[i], closes[i] + 1, + closes[i] - 1, closes[i + 1], + 1000000, 100, closes[i], False)) + # BHP mock data + for i in range(0, 10): + self.historicalDataQueue.put((1, timestamp + (i * 60), + closes[i] / 2, closes[i] + 1 / 2, + (closes[i] - 1) / 2, closes[i + 1] / 2, + 1000000, 100, closes[i] / 2, False)) + + # Populate mock realtimeBars + for i in range(0, 10): + # CBA + self.realtimeBarQueue.put((0, timestamp + (i * 60), + closes[i], closes[i] + 1, + closes[i] - 1, closes[i + 1], + 1000000, 100, closes[i], False)) + # BHP + self.realtimeBarQueue.put((1, timestamp + (i * 60), + closes[i] / 2, closes[i] + 1 / 2, + (closes[i] - 1) / 2, closes[i + 1] / 2, + 1000000, 100, closes[i] / 2, False)) + + def reqHistoricalData(self, *arg): + self.countHistoricalRequestsMade += 1 + + def reqRealTimeBars(self, *arg): + self.countMarketDataRequestsMade += 1 + + +class TestPriceHandlerLiveCase(unittest.TestCase): + def setUp(self): + """ + Set up the PriceHandler object with a small + set of market data for a mocked 'live' trading session. + + TODO: + * Test multiple timeframes + * Test successfully cancels market data feeds + * Test handling of maxing out IB's market data streaming connections + * Test that live can understand 'start' of a bar, but historic + might use 'end' of a bar (RE timestamps)?? + """ + self.ib_service = IBServiceMock() + self.config = settings.TEST + events_queue = queue.Queue() + + # Set up an IB Contract for CBA and BHP + cba = Contract() + cba.exchange = "SMART" + cba.symbol = "CBA" + cba.secType = "STK" + cba.currency = "AUD" + + bhp = Contract() + bhp.exchange = "SMART" + bhp.symbol = "BHP" + bhp.secType = "STK" + bhp.currency = "AUD" + + # Create the price handler. + self.price_handler = IBBarPriceHandler( + self.ib_service, events_queue, [cba, bhp], self.config, mode="live" + ) + + def test_stream_all_live_events(self): + """ + Will test that: + * live data is requested and collected from IBService + * live data is streamed out correctly + """ + for i in range(0, 10): + # Test CBA + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["CBA"]["timestamp"], + pd.Timestamp((timestamp + (i * 60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["CBA"]["close"]), + closes[i + 1] # Close is next open + ) + + # Test BHP + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["BHP"]["timestamp"], + pd.Timestamp((timestamp + (i * 60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["BHP"]["close"]), + closes[i + 1] / 2 # Close is next open + ) + + def test_made_market_data_requests(self): + self.assertEqual(self.ib_service.countMarketDataRequestsMade, 2) + + +class TestPriceHandlerHistoricCase(unittest.TestCase): + def setUp(self): + """ + Set up the PriceHandler object with a small + set of initial tickers for a backtest in historic mode. + + TODO: + * Test multiple timeframes + * Test mocked live market data methods + """ + self.ib_service = IBServiceMock() + self.config = settings.TEST + events_queue = queue.Queue() + + # Set up an IB Contract for CBA and BHP + cba = Contract() + cba.exchange = "SMART" + cba.symbol = "CBA" + cba.secType = "STK" + cba.currency = "AUD" + + bhp = Contract() + bhp.exchange = "SMART" + bhp.symbol = "BHP" + bhp.secType = "STK" + bhp.currency = "AUD" + + # Create the price handler. + self.price_handler = IBBarPriceHandler( + self.ib_service, events_queue, [cba, bhp], self.config + ) + + def test_stream_all_historic_events(self): + """ + Will test that: + * historic data is collected from IBService + * historic data is merge sorted correctly + * historic data is streamed out correctly + """ + for i in range(0, 10): + # Test CBA + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["CBA"]["timestamp"], + pd.Timestamp((timestamp + (i * 60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["CBA"]["close"]), + closes[i + 1] # Close is next open + ) + + # Test BHP + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["BHP"]["timestamp"], + pd.Timestamp((timestamp + (i * 60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["BHP"]["close"]), + closes[i + 1] / 2 # Close is next open + ) + + def test_made_historical_requests(self): + self.assertEqual(self.ib_service.countHistoricalRequestsMade, 2) + + +if __name__ == "__main__": + unittest.main()