forked from BitcoinExchangeFH/BitcoinExchangeFH
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bitcoinexchangefh.py
193 lines (176 loc) · 8.21 KB
/
bitcoinexchangefh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/bin/python
import argparse
import sys
from befh.exchanges.gateway import ExchangeGateway
from befh.exchanges.bitmex import ExchGwBitmex
from befh.exchanges.btcc import ExchGwBtccSpot, ExchGwBtccFuture
from befh.exchanges.bitfinex import ExchGwBitfinex
from befh.exchanges.okcoin import ExchGwOkCoin
from befh.exchanges.kraken import ExchGwKraken
from befh.exchanges.gdax import ExchGwGdax
from befh.exchanges.bitstamp import ExchGwBitstamp
from befh.exchanges.huobi import ExchGwHuoBi
from befh.exchanges.coincheck import ExchGwCoincheck
from befh.exchanges.gatecoin import ExchGwGatecoin
from befh.exchanges.quoine import ExchGwQuoine
from befh.exchanges.poloniex import ExchGwPoloniex
from befh.exchanges.bittrex import ExchGwBittrex
from befh.exchanges.yunbi import ExchGwYunbi
from befh.exchanges.liqui import ExchGwLiqui
from befh.exchanges.binance import ExchGwBinance
from befh.exchanges.cryptopia import ExchGwCryptopia
from befh.exchanges.okex import ExchGwOkex
from befh.exchanges.okex_spot import ExchGwOkexSpot
from befh.exchanges.wex import ExchGwWex
from befh.exchanges.bitflyer import ExchGwBitflyer
from befh.exchanges.coinone import ExchGwCoinOne
from befh.exchanges.fcoin import ExchGwFcoin
from befh.exchanges.binance_ws import ExchGwBinanceWs
from befh.exchanges.bitforex import ExchGwBitforex
from befh.clients.kdbplus import KdbPlusClient
from befh.clients.mysql import MysqlClient
from befh.clients.sqlite import SqliteClient
from befh.clients.csv import FileClient
from befh.clients.zmq import ZmqClient
from befh.clients.nanomsg_client import NanomsgClient
from befh.subscription_manager import SubscriptionManager
from befh.util import Logger
def _exit_with_usage(parser, message):
    """Print *message* and the usage text to stdout, then exit with status 1."""
    print(message)
    parser.print_help()
    sys.exit(1)


def _split_host_port(endpoint):
    """Split a ``host:port`` string into ``(host, port)`` with an integer port.

    Anything after a second ``:`` is ignored.

    Raises:
        ValueError: if there is no ``:`` separator or the port is not an
            integer.
    """
    fields = endpoint.split(':')
    if len(fields) < 2:
        raise ValueError("missing ':' between host and port")
    return fields[0], int(fields[1])


def _parse_mysql_dest(dest):
    """Parse ``name:pwd@host:port`` into ``(user, pwd, host, port)``.

    The credentials are everything before the LAST ``@`` and the password is
    everything after the FIRST ``:`` of the credentials, so passwords that
    contain ``@`` or ``:`` are passed through intact.

    Raises:
        ValueError: if a separator is missing or the port is not an integer.
    """
    credential, at_sign, endpoint = dest.rpartition('@')
    if not at_sign:
        raise ValueError("missing '@' between credentials and host")
    user, colon, pwd = credential.partition(':')
    if not colon:
        raise ValueError("missing ':' between name and password")
    host, port = _split_host_port(endpoint)
    return user, pwd, host, port


def _create_db_clients(args, parser):
    """Create and connect one client per destination enabled on the command line.

    Clients are created in a fixed order: SQLite, MySQL, csv, Kdb+, zmq,
    nanomsg. A malformed MySQL or Kdb+ destination prints an error with the
    usage text and exits with status 1.

    Returns:
        The list of connected clients; empty when no destination flag is set.
    """
    db_clients = []

    if args.sqlite:
        db_client = SqliteClient()
        db_client.connect(path=args.sqlitepath)
        db_clients.append(db_client)

    if args.mysql:
        try:
            user, pwd, host, port = _parse_mysql_dest(args.mysqldest)
        except ValueError as err:
            # The destination itself is not echoed because it holds a password.
            _exit_with_usage(
                parser,
                'Error: Invalid MySQL destination (%s). '
                'Expected format is <name:pwd@host:port>.' % (err,))
        db_client = MysqlClient()
        db_client.connect(host=host,
                          port=port,
                          user=user,
                          pwd=pwd,
                          schema=args.mysqlschema)
        db_clients.append(db_client)

    if args.csv:
        # Without an explicit path FileClient falls back to its own default.
        if args.csvpath != '':
            db_client = FileClient(dir=args.csvpath)
        else:
            db_client = FileClient()
        db_clients.append(db_client)

    if args.kdb:
        try:
            host, port = _split_host_port(args.kdbdest)
        except ValueError as err:
            _exit_with_usage(
                parser,
                'Error: Invalid Kdb+ destination (%s). '
                'Expected format is <host:port>.' % (err,))
        db_client = KdbPlusClient()
        db_client.connect(host=host, port=port)
        db_clients.append(db_client)

    if args.zmq:
        db_client = ZmqClient()
        db_client.connect(addr=args.zmqdest)
        db_clients.append(db_client)

    if args.nanomsg:
        db_client = NanomsgClient()
        db_client.connect(addr=args.nanomsgdest)
        db_clients.append(db_client)

    return db_clients


def main():
    """Entry point of the market data feed handler.

    Parses the command line, connects the selected destinations, loads the
    instrument subscriptions and starts every subscribed instrument on the
    gateway whose exchange name matches.

    Prints an error with the usage text and exits with status 1 when no
    destination is selected, a MySQL/Kdb+ destination is malformed, or the
    subscription file yields no instrument.
    """
    parser = argparse.ArgumentParser(description='Bitcoin exchange market data feed handler.')
    parser.add_argument('-instmts', action='store', help='Instrument subscription file.', default='subscriptions.ini')
    parser.add_argument('-exchtime', action='store_true', help='Use exchange timestamp.')
    parser.add_argument('-kdb', action='store_true', help='Use Kdb+ as database.')
    parser.add_argument('-csv', action='store_true', help='Use csv file as database.')
    parser.add_argument('-sqlite', action='store_true', help='Use SQLite database.')
    parser.add_argument('-mysql', action='store_true', help='Use MySQL.')
    parser.add_argument('-zmq', action='store_true', help='Use zmq publisher.')
    parser.add_argument('-nanomsg', action='store_true', help='Use nanomsg publisher.')
    parser.add_argument('-mysqldest', action='store', dest='mysqldest',
                        help='MySQL destination. Formatted as <name:pwd@host:port>',
                        default='')
    parser.add_argument('-mysqlschema', action='store', dest='mysqlschema',
                        help='MySQL schema.',
                        default='')
    parser.add_argument('-kdbdest', action='store', dest='kdbdest',
                        help='Kdb+ destination. Formatted as <host:port>',
                        default='')
    parser.add_argument('-zmqdest', action='store', dest='zmqdest',
                        help='Zmq destination. For example \"tcp://127.0.0.1:3306\"',
                        default='')
    parser.add_argument('-nanomsgdest', action='store', dest='nanomsgdest',
                        help='Nanomq destination. For example \"tcp://127.0.0.1:3306\"',
                        default='')
    parser.add_argument('-sqlitepath', action='store', dest='sqlitepath',
                        help='SQLite database path',
                        default='')
    parser.add_argument('-csvpath', action='store', dest='csvpath',
                        help='Csv file path',
                        default='')
    parser.add_argument('-output', action='store', dest='output',
                        help='Verbose output file path')
    args = parser.parse_args()

    Logger.init_log(args.output)

    db_clients = _create_db_clients(args, parser)
    if not db_clients:
        _exit_with_usage(parser, 'Error: Please define which database is used.')

    # Subscription instruments
    if not args.instmts:
        _exit_with_usage(
            parser,
            'Error: Please define the instrument subscription list. You can refer to subscriptions.ini.')

    # Use exchange timestamp rather than local timestamp
    if args.exchtime:
        ExchangeGateway.is_local_timestamp = False

    # Initialize subscriptions
    subscription_instmts = SubscriptionManager(args.instmts).get_subscriptions()
    if not subscription_instmts:
        _exit_with_usage(
            parser,
            'Error: No instrument is found in the subscription file. ' +
            'Please check the file path and the content of the subscription file.')

    # Initialize snapshot destination
    ExchangeGateway.init_snapshot_table(db_clients)

    Logger.info('[main]', 'Subscription file = %s' % args.instmts)
    log_str = 'Exchange/Instrument/InstrumentCode:\n' + ''.join(
        '%s/%s/%s\n' % (instmt.exchange_name, instmt.instmt_name, instmt.instmt_code)
        for instmt in subscription_instmts)
    Logger.info('[main]', log_str)

    # One gateway per supported exchange, instantiated in this exact order.
    gateway_classes = (
        ExchGwBtccSpot,
        ExchGwBtccFuture,
        ExchGwBitmex,
        ExchGwBitfinex,
        ExchGwOkCoin,
        ExchGwKraken,
        ExchGwGdax,
        ExchGwBitstamp,
        ExchGwBitflyer,
        ExchGwHuoBi,
        ExchGwCoincheck,
        ExchGwCoinOne,
        ExchGwGatecoin,
        ExchGwQuoine,
        ExchGwPoloniex,
        ExchGwBittrex,
        ExchGwYunbi,
        ExchGwLiqui,
        ExchGwBinance,
        ExchGwBinanceWs,
        ExchGwCryptopia,
        ExchGwOkex,
        ExchGwOkexSpot,
        ExchGwWex,
        ExchGwFcoin,
        ExchGwBitforex,
    )
    exch_gws = [gateway_cls(db_clients) for gateway_cls in gateway_classes]

    # Accumulates whatever exch.start() returns; nothing is joined here.
    # NOTE(review): presumably these are non-daemon worker threads that keep
    # the process alive after main() returns - confirm against the gateways.
    threads = []
    for exch in exch_gws:
        for instmt in subscription_instmts:
            if instmt.get_exchange_name() == exch.get_exchange_name():
                Logger.info("[main]", "Starting instrument %s-%s..." %
                            (instmt.get_exchange_name(), instmt.get_instmt_name()))
                threads += exch.start(instmt)
        # NOTE(review): the nesting level of this call is an assumption (once
        # per gateway, after all of its instruments were started) - confirm
        # against the intended behaviour of notify_all_added().
        exch.notify_all_added()
# Run the feed handler only when this file is executed as a script.
if __name__ == '__main__':
    main()