# piker/data/ffi.py
"""ctypes bindings for the ``libtdb_core`` shared library.

Importing this module loads ``libtdb_core.so`` from ``target/debug`` two
directories above this file and registers the C prototypes of the functions
used below.
"""
import ctypes
from ctypes import *
import os
from os import path

import numpy as np
import pandas as pd
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO


def __csv_to_df(raw_data):
    """Parse header-less CSV text into a ``pandas.DataFrame``.

    A ``ts,seq,is_trade,is_bid,price,size`` header is prepended before
    parsing.  The last parsed row is dropped and ``ts`` is multiplied by
    1000 and converted to a 64-bit integer.

    :param raw_data: CSV rows (no header line) as a ``str``.
    :returns: the resulting frame; ``ts`` stays a regular column.
    """
    csv = StringIO("ts,seq,is_trade,is_bid,price,size\n" + raw_data)
    df = pd.read_csv(csv, dtype={
        # ``np.float`` / ``np.bool`` were aliases of the builtins and were
        # removed in NumPy 1.24; these are the equivalent concrete dtypes.
        'ts': np.float64,
        # NOTE(review): ``Update.seq`` is declared as a 32-bit unsigned
        # field, int16 looks too narrow for it -- confirm.
        'seq': np.int16,
        'is_trade': np.bool_,
        'is_bid': np.bool_,
        'price': np.float64,
        'size': np.float32,
    })
    # The original called ``df.set_index("ts")`` here and discarded the
    # result, which is a no-op; it is dropped because the code below needs
    # ``ts`` to remain a column.
    # NOTE(review): presumably the last row is an incomplete trailing
    # record -- confirm why it is dropped.
    df = df.iloc[:-1].copy()
    # Explicit int64 so the result is not truncated on platforms where the
    # default integer is 32 bits wide.
    df['ts'] = (df['ts'] * 1000).astype(np.int64)
    return df


cwd = path.dirname(path.realpath(__file__))
lib_path = path.normpath(path.join(cwd, '../../target/debug/libtdb_core.so'))
lib = CDLL(lib_path)


class Update(Structure):
    """
    ts: u64,
    seq: u32,
    is_trade: bool,
    is_bid: bool,
    price: f32,
    size: f32,
    """
    _fields_ = [
        ("ts", c_uint64),
        ("seq", c_uint32),
        ("is_trade", c_bool),
        ("is_bid", c_bool),
        ("price", c_float),
        ("size", c_float),
    ]

    def __repr__(self):
        return 'Update<{},{},{},{},{},{}>'.format(
            self.ts, self.seq, self.is_trade, self.is_bid,
            self.price, self.size)

    def to_dict(self):
        """Return the fields as a plain ``dict`` keyed by field name."""
        return {name: getattr(self, name) for name, _ in self._fields_}


class Slice(Structure):
    """Pointer/length pair returned by the array-producing library calls."""
    _fields_ = [("ptr", POINTER(Update)), ("len", c_uint64)]


## Type Definitions:
#
# ctypes only honours the attribute spelled ``argtypes``; the original code
# assigned ``argtype`` on most functions, which is silently ignored and
# leaves the calls without any argument checking or conversion.

lib.str_free.argtypes = (c_void_p,)
lib.str_free.restype = None

lib.read_dtf_to_csv.argtypes = (c_char_p,)
lib.read_dtf_to_csv.restype = c_void_p

lib.read_dtf_to_csv_with_limit.argtypes = (c_char_p, c_uint32)
lib.read_dtf_to_csv_with_limit.restype = c_void_p

lib.read_dtf_to_arr.argtypes = (c_char_p,)
lib.read_dtf_to_arr.restype = Slice

lib.read_dtf_to_arr_with_limit.argtypes = (c_char_p, c_uint32)
lib.read_dtf_to_arr_with_limit.restype = Slice

lib.parse_stream.argtypes = (c_char_p, c_uint32)
lib.parse_stream.restype = Slice


def _take_c_string(ptr):
    """Decode the C string at ``ptr`` as UTF-8, then free it via the library.

    The pointer is handed back to ``lib.str_free`` even if decoding fails.
    """
    try:
        return ctypes.cast(ptr, c_char_p).value.decode('utf-8')
    finally:
        lib.str_free(ptr)


def _slice_to_list(ups):
    """Return the ``Update`` elements of a ``Slice`` as a Python list.

    NOTE(review): nothing here hands the slice's memory back to the
    library; no matching free function is visible in this file -- confirm
    ownership.
    """
    return [ups.ptr[i] for i in range(ups.len)]


def read_dtf_to_csv(fname):
    """Return the contents of the DTF file ``fname`` as CSV text."""
    return _take_c_string(lib.read_dtf_to_csv(fname.encode("utf-8")))


def read_dtf_to_csv_with_limit(fname, num):
    """Like :func:`read_dtf_to_csv`, passing ``num`` as the library's limit."""
    return _take_c_string(
        lib.read_dtf_to_csv_with_limit(fname.encode("utf-8"), num))


def read_dtf_from_file(fname):
    """Return the records of the DTF file ``fname`` as ``Update`` objects."""
    return _slice_to_list(lib.read_dtf_to_arr(fname.encode("utf-8")))


def parse_stream(stream):
    """Parse ``stream`` with the library and return ``Update`` objects.

    :param stream: raw buffer; it is passed as ``char *`` together with its
        length, so it must be a ``bytes`` object.
    """
    return _slice_to_list(lib.parse_stream(stream, len(stream)))


async def test_parse_stream():
    # NOTE(review): this imports ``TectonicDB`` from a module named
    # ``tectonic`` and never calls a ``connect()`` method -- presumably it
    # targets an external client package rather than
    # ``piker/data/tectonicdb.py``; confirm.
    from tectonic import TectonicDB
    db = TectonicDB()
    await db.insert(0,0,True,True,0,0,"default")
    await db.insert(1,1,False,False,1,1,"default")
    print(await db.get(2))


def main(
    fname="/home/g/Desktop/tick-data/10102017/bf_neobtc.dtf",
    limit=100000,
):
    """Load up to ``limit`` rows of ``fname`` and print them as a DataFrame.

    Both parameters default to the values that used to be hard-coded.
    """
    data = read_dtf_to_csv_with_limit(fname, limit)
    df = __csv_to_df(data)
    print(df)


if __name__ == '__main__':
    # ``main()`` (with wall-clock timing) used to be runnable from here but
    # was commented out; only the async smoke test is executed.
    import asyncio
    # ``asyncio.run`` creates, runs and closes the loop;
    # ``get_event_loop()`` without a running loop is deprecated.
    asyncio.run(test_parse_stream())


# piker/data/tectonicdb.py
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from typing import Union

import trio
import struct


# Reply header: one bool (success flag) followed by a big-endian unsigned
# 64-bit body length -- 9 bytes in total.
_HEADER = struct.Struct('>?Q')

# Upper bound for a single ``receive_some`` request while reading a body.
_MAX_CHUNK = 65536


def _flag(value):
    """Encode a truthy/falsy value as the ``'t'`` / ``'f'`` command token."""
    return 't' if value else 'f'


class TectonicDB:
    """Line-oriented client speaking to a server over a trio TCP stream.

    Every command is sent as one newline-terminated line and answered by a
    9-byte header (success flag + body length) followed by the body.
    Command methods return ``(success, text)`` unless stated otherwise.
    Call :meth:`connect` before issuing commands and :meth:`destroy` when
    done.
    """

    def __init__(self, host="localhost", port=9001):
        self.host = host
        self.port = port
        # Initialised here so the attributes exist before ``connect()`` /
        # ``subscribe()`` are ever called.
        self.stream = None
        self.subscribed = False

    async def connect(self):
        """Open the TCP stream to ``host:port``."""
        self.stream = await trio.open_tcp_stream(
            self.host, self.port
        )

    async def cmd(self, cmd: Union[str, bytes]):
        """Send one command line and return the ``(success, text)`` reply.

        :param cmd: command without the trailing newline; ``str`` is
            encoded with the default codec, ``bytes`` is sent as-is.
        :raises RuntimeError: if :meth:`connect` has not been called.
        :raises ConnectionError: if the peer closes the stream mid-reply.
        """
        if self.stream is None:
            raise RuntimeError('not connected: call `await connect()` first')

        payload = cmd.encode() if isinstance(cmd, str) else cmd
        await self.stream.send_all(payload + b'\n')

        return await self._recv_text()

    async def _recv_exactly(self, nbytes):
        """Read exactly ``nbytes`` bytes from the stream.

        ``receive_some`` may return fewer bytes than requested, so keep
        asking until the buffer is full.  An empty chunk means the peer
        closed the connection; without this check the loop would spin
        forever.
        """
        buf = bytearray()
        while len(buf) < nbytes:
            chunk = await self.stream.receive_some(
                max_bytes=min(nbytes - len(buf), _MAX_CHUNK)
            )
            if not chunk:
                raise ConnectionError(
                    'connection closed after {} of {} expected bytes'
                    .format(len(buf), nbytes)
                )
            buf += chunk
        return bytes(buf)

    async def _recv_text(self):
        """Read one reply and return ``(success, body)``.

        The body is decoded as UTF-8; an empty body yields ``""``.
        """
        header = await self._recv_exactly(_HEADER.size)
        success, bytes_to_read = _HEADER.unpack(header)
        if bytes_to_read == 0:
            return success, ""

        body = await self._recv_exactly(bytes_to_read)
        return success, body.decode('utf-8')

    async def destroy(self):
        """Close the stream, if one is open.

        This is a coroutine because closing a trio stream must be awaited;
        the previous synchronous version created the ``aclose()`` coroutine
        without awaiting it, so the stream was never actually closed.
        """
        stream, self.stream = self.stream, None
        if stream is not None:
            await stream.aclose()

    async def info(self):
        return await self.cmd("INFO")

    async def countall(self):
        return await self.cmd("COUNT ALL")

    async def countall_in_mem(self):
        return await self.cmd("COUNT ALL IN MEM")

    async def ping(self):
        return await self.cmd("PING")

    async def help(self):
        return await self.cmd("HELP")

    async def insert(self, ts, seq, is_trade, is_bid, price, size, dbname):
        """Send an ``INSERT ... INTO dbname`` command for one record."""
        return await self.cmd(
            "INSERT {}, {}, {} ,{}, {}, {}; INTO {}".format(
                ts, seq, _flag(is_trade), _flag(is_bid), price, size, dbname
            )
        )

    async def add(self, ts, seq, is_trade, is_bid, price, size):
        """Send an ``ADD`` command for one record."""
        return await self.cmd(
            "ADD {}, {}, {} ,{}, {}, {};".format(
                ts, seq, _flag(is_trade), _flag(is_bid), price, size
            )
        )

    async def getall(self):
        """Send ``GET ALL``; returns ``(False, None)`` on failure like ``get``."""
        success, ret = await self.cmd("GET ALL")
        if not success:
            return False, None
        # NOTE(review): ``ret`` is the decoded reply text (a ``str``), so
        # iterating it yields single characters, which have no
        # ``to_dict()``; any non-empty successful reply raises
        # AttributeError here.  Presumably the reply was meant to be parsed
        # into record objects first -- confirm the intended parser.
        return success, [row.to_dict() for row in ret]

    async def get(self, n):
        """Send ``GET n``; returns ``(False, None)`` on failure."""
        success, ret = await self.cmd("GET {}".format(n))
        if not success:
            return False, None
        # NOTE(review): same ``str`` / ``to_dict()`` mismatch as in
        # ``getall`` -- confirm the intended parser.
        return success, [row.to_dict() for row in ret]

    async def clear(self):
        return await self.cmd("CLEAR")

    async def clearall(self):
        return await self.cmd("CLEAR ALL")

    async def flush(self):
        return await self.cmd("FLUSH")

    async def flushall(self):
        return await self.cmd("FLUSH ALL")

    async def create(self, dbname):
        return await self.cmd("CREATE {}".format(dbname))

    async def use(self, dbname):
        return await self.cmd("USE {}".format(dbname))

    async def unsubscribe(self):
        """Send ``UNSUBSCRIBE`` and clear the ``subscribed`` flag."""
        await self.cmd("UNSUBSCRIBE")
        self.subscribed = False

    async def subscribe(self, dbname):
        """Send ``SUBSCRIBE dbname``; set ``subscribed`` when it succeeds."""
        res = await self.cmd("SUBSCRIBE {}".format(dbname))
        if res[0]:
            self.subscribed = True
        return res

    async def poll(self):
        """Send an empty command line and return the reply."""
        return await self.cmd("")

    async def range(self, dbname, start, finish):
        """Select ``dbname`` and return the CSV text for ``start``..``finish``.

        Only the reply body is returned; the success flag is discarded.
        """
        # The original called ``self.use(dbname)`` without ``await``, so the
        # USE command was never sent.
        await self.use(dbname)
        _, data = await self.cmd(
            "GET ALL FROM {} TO {} AS CSV".format(start, finish)
        )
        return data