-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
90 lines (74 loc) · 1.83 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main
import (
"encoding/binary"
"fmt"
"log"
"net"
qpack "github.com/cesbit/go-qpack"
)
func handlePkg(pkgCh chan *pkg) {
for {
data := <-pkgCh
packageDataBuf := data.GetData()
unpacked, _ := qpack.Unpack(packageDataBuf, 0)
unboxed, ok := unpacked.(map[interface{}]interface{})
if !ok {
log.Println("ERROR: var not a map")
continue
}
for key, element := range unboxed {
name, okName := key.(string)
if okName {
// converting cstring to string
nameBytes := []byte(name)
name = string(nameBytes[:len(nameBytes)-1])
if series, ok := seriesToWatch[name]; ok {
pointsList, okPointsList := element.([]interface{})
if okPointsList {
if series.IsRealtime {
singleUpdate := make(map[string]int)
singleUpdate[name] = len(pointsList)
sendSeriesUpdate(singleUpdate)
} else {
updateLock.Lock()
seriesCountUpdate[name] += len(pointsList)
updateLock.Unlock()
}
}
} else {
for _, groupConfig := range groupsToWatch {
if found := groupConfig.Regex.MatchString(name); found {
sendNewSeriesFoundForGroup(name, groupConfig.Name)
}
}
}
}
}
}
}
func readFromTcp() {
const headerSize = 8
var gds = func(data []byte) (int, error) {
dataSize := int(binary.LittleEndian.Uint32(data[0:4]))
return dataSize, nil
}
log.Printf("Starting listening on TCP port %s\n", tcpPort)
pkgCh := make(chan *pkg)
listener, err := net.Listen("tcp", fmt.Sprintf(":%s", tcpPort))
if err != nil {
log.Fatal("Listen error: ", err)
}
go handlePkg(pkgCh)
for {
conn, err := listener.Accept()
if err != nil {
log.Println("Accept Error", err)
continue
}
buf := NewBuffer()
buf.SetConn(conn)
buf.SetPkgChan(pkgCh)
go buf.ReadToBuffer(headerSize, gds)
log.Println("Accepted ", conn.RemoteAddr())
}
}