-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathblockonomics_co.go
114 lines (102 loc) · 2.65 KB
/
blockonomics_co.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package hammer
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
const blockonomicsURL = "https://www.blockonomics.co/api/balance"
const blockonomicsBatchLimit = 25 // can query upto this many addresses at time
type blockonomicsBalances struct {
Addr string `json:"addr"`
Confirmed float64 `json:"confirmed"`
Unconfirmed float64 `json:"unconfirmed"`
}
type blockonomicsResponse struct {
Response []blockonomicsBalances `json:"response"`
}
// Blockonomics worker
type Blockonomics struct {
W
}
// NewBlockonomics returns an initialized Blockonomics worker
func NewBlockonomics() *Blockonomics {
return &Blockonomics{
W{
name: "blockonomics",
input: make(chan Request),
},
}
}
// Start the blockonomics worker
func (bl *Blockonomics) Start() {
requests := make([]Request, 0, blockonomicsBatchLimit)
for {
// we wait upto 5 seconds to gather as many addresses (upto query limit)
ticker := time.NewTicker(5 * time.Second)
select {
case request := <-bl.input:
requests = append(requests, request)
if len(requests) == blockonomicsBatchLimit {
bl.process(requests)
requests = []Request{}
}
case <-ticker.C:
if len(requests) > 0 {
bl.process(requests)
requests = []Request{}
}
}
}
}
func (bl *Blockonomics) do(addresses []string) (blockonomicsResponse, error) {
addrs := strings.Join(addresses, " ")
req, err := json.Marshal(map[string]string{
"addr": addrs,
})
if err != nil {
return blockonomicsResponse{}, err
}
resp, err := http.Post(blockonomicsURL,
"application/json",
bytes.NewBuffer(req))
if err != nil {
return blockonomicsResponse{}, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return blockonomicsResponse{},
fmt.Errorf("error response from blockonomics, got status code: %q", resp.StatusCode)
}
var result blockonomicsResponse
err = json.NewDecoder(resp.Body).Decode(&result)
return result, err
}
func (bl *Blockonomics) process(requests []Request) {
addresses := make([]string, 0, len(requests))
addrToChan := make(map[string]chan Result)
for _, req := range requests {
addresses = append(addresses, req.Address)
addrToChan[req.Address] = req.Output
}
resp, err := bl.do(addresses)
if err != nil {
fmt.Println(bl.Name()+":", err)
go submitRequests(requests, bl.input) // return input channel for processing
return
}
for _, p := range resp.Response {
h := Result{
Source: bl.Name(),
Address: p.Addr,
BalanceConfirmed: p.Confirmed,
BalanceUnconfirmed: p.Unconfirmed,
BalanceTotal: p.Confirmed + p.Unconfirmed,
}
go func(p blockonomicsBalances) {
addrToChan[p.Addr] <- h
}(p)
}
}