-
Notifications
You must be signed in to change notification settings - Fork 0
/
btday.go
102 lines (87 loc) · 2.37 KB
/
btday.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package malgova
import (
"log"
"reflect"
"sync"
"time"
"github.com/sivamgr/kstreamdb"
)
// btDayRunner holds the state needed to replay one day's ticks against a set
// of algos: the algo types to spawn, the per-symbol tick managers, the
// instantiated algo runners, and the orders collected from those runners.
type btDayRunner struct {
	// algos lists the algo types; one runner per type is created for a symbol
	// by instantiateAllAlgosForSymbol.
	algos []reflect.Type
	// tickManager maps a trading symbol to the manager that tracks which algo
	// IDs observe that symbol's ticks.
	tickManager map[string]*btTickManager
	// algoRunner maps an algo ID (as returned by the runner's ID method) to
	// its runner.
	algoRunner map[string]*btAlgoRunner
	// flagSymbolAlgoSetup records the symbols whose algos have already been
	// instantiated; run checks for key presence only.
	flagSymbolAlgoSetup map[string]bool
	// orders accumulates the orders popped from every runner in exit until
	// they are drained by popOrders.
	orders []orderEntry
}
// instantiateAllAlgosForSymbol creates one runner per registered algo type for
// symbol, indexes each runner by its ID in bt.algoRunner, and subscribes the
// runner to every symbol in its watch list.
//
// bt.algoRunner and bt.tickManager must already be initialised (see setup).
func (bt *btDayRunner) instantiateAllAlgosForSymbol(symbol string) {
	for _, algoType := range bt.algos {
		runner := newAlgoInstance(algoType, symbol)
		algoID := runner.ID()
		bt.algoRunner[algoID] = runner

		for _, watched := range runner.watch {
			// Key the tick manager by the watched symbol itself. Looking up
			// by the watched symbol but storing under `symbol` would subscribe
			// the algo to the wrong feed whenever the two differ, and could
			// dereference a missing manager for `symbol`.
			mgr, ok := bt.tickManager[watched]
			if !ok {
				mgr = new(btTickManager)
				bt.tickManager[watched] = mgr
			}
			mgr.addObserver(algoID)
		}
	}
}
// algoRunWorker is the goroutine body used to execute a single algo runner
// concurrently. It calls algo.run and marks wg as done when that call
// returns; the deferred Done also fires if run panics.
//
// bt is not used in the body.
func algoRunWorker(wg *sync.WaitGroup, algo *btAlgoRunner, bt *btDayRunner) {
	defer wg.Done()
	algo.run()
}
// setup prepares the runner for a fresh day: it stores the algo types to
// spawn and replaces every lookup table and the order ledger with new, empty
// containers.
func (bt *btDayRunner) setup(algos []reflect.Type) {
	bt.algos = algos

	// Fresh, empty lookup tables.
	bt.tickManager = map[string]*btTickManager{}
	bt.algoRunner = map[string]*btAlgoRunner{}
	bt.flagSymbolAlgoSetup = map[string]bool{}

	// Start with an empty (non-nil) order ledger.
	bt.orders = []orderEntry{}
}
// exit shuts down every algo runner and moves each runner's orders into the
// day runner's own ledger. Runners are visited in map iteration order, so the
// relative order of entries coming from different runners is not fixed.
func (bt *btDayRunner) exit() {
	for _, runner := range bt.algoRunner {
		runner.exit()

		// Drain the runner's orders and merge them into the combined ledger.
		drained := runner.popOrders()
		bt.orders = append(bt.orders, drained...)
	}
}
// popOrders returns the orders accumulated so far and leaves the runner with
// a new, empty (non-nil) ledger.
func (bt *btDayRunner) popOrders() []orderEntry {
	collected := bt.orders
	bt.orders = []orderEntry{}
	return collected
}
// run replays one day's ticks against the algos. It works in three phases:
//
//  1. Walk the ticks in order; the first tradable tick seen for a symbol
//     triggers instantiation of all algos for that symbol, and every tick is
//     queued on each runner observing its symbol.
//  2. Log the total number of queued ticks for the day dt.
//  3. Run every algo runner in its own goroutine and wait for all to finish.
func (bt *btDayRunner) run(dt time.Time, ticks []kstreamdb.TickData) {
	for _, tick := range ticks {
		sym := tick.TradingSymbol

		// Lazily spawn the algos the first time a tradable symbol shows up.
		if tick.IsTradable {
			if _, done := bt.flagSymbolAlgoSetup[sym]; !done {
				bt.flagSymbolAlgoSetup[sym] = true
				bt.instantiateAllAlgosForSymbol(sym)
			}
		}

		// Nothing observes this symbol: skip the tick.
		mgr, watched := bt.tickManager[sym]
		if !watched {
			continue
		}

		// Queue the tick on every runner subscribed to the symbol.
		for _, id := range mgr.observerAlgoIDs {
			bt.algoRunner[id].queue(tick)
		}
	}

	// Report how many ticks are waiting across all runners.
	queued := 0
	for _, runner := range bt.algoRunner {
		queued += len(runner.queueTick)
	}
	day := dt.Format("2006/01/02")
	log.Printf("[%s] %d ticks in Queue", day, queued)

	// Fan out: one goroutine per runner, then block until all have finished.
	var wg sync.WaitGroup
	wg.Add(len(bt.algoRunner))
	for _, runner := range bt.algoRunner {
		go algoRunWorker(&wg, runner, bt)
	}
	wg.Wait()
}