forked from gemnasium/logrus-graylog-hook
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gelf_reader.go
138 lines (119 loc) · 3.02 KB
/
gelf_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package graylog
import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync"
)
type Reader struct {
mu sync.Mutex
conn net.Conn
}
func NewReader(addr string) (*Reader, error) {
var err error
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
}
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return nil, fmt.Errorf("ListenUDP: %s", err)
}
r := new(Reader)
r.conn = conn
return r, nil
}
func (r *Reader) Addr() string {
return r.conn.LocalAddr().String()
}
// FIXME: this will discard data if p isn't big enough to hold the
// full message.
func (r *Reader) Read(p []byte) (int, error) {
msg, err := r.ReadMessage()
if err != nil {
return -1, err
}
var data string
if msg.Full == "" {
data = msg.Short
} else {
data = msg.Full
}
return strings.NewReader(data).Read(p)
}
func (r *Reader) ReadMessage() (*Message, error) {
cBuf := make([]byte, ChunkSize)
var (
err error
n, length int
buf bytes.Buffer
cid, ocid []byte
seq, total uint8
cHead []byte
cReader io.Reader
chunks [][]byte
)
for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
if n, err = r.conn.Read(cBuf); err != nil {
return nil, fmt.Errorf("Read: %s", err)
}
cHead, cBuf = cBuf[:2], cBuf[:n]
if bytes.Equal(cHead, magicChunked) {
//fmt.Printf("chunked %v\n", cBuf[:14])
cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
if ocid != nil && !bytes.Equal(cid, ocid) {
return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
} else if ocid == nil {
ocid = cid
chunks = make([][]byte, total)
}
n = len(cBuf) - chunkedHeaderLen
//fmt.Printf("setting chunks[%d]: %d\n", seq, n)
chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
length += n
} else { //not chunked
if total > 0 {
return nil, fmt.Errorf("out-of-band message (not chunked)")
}
break
}
}
//fmt.Printf("\nchunks: %v\n", chunks)
if length > 0 {
if cap(cBuf) < length {
cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
}
cBuf = cBuf[:0]
for i := range chunks {
//fmt.Printf("appending %d %v\n", i, chunks[i])
cBuf = append(cBuf, chunks[i]...)
}
cHead = cBuf[:2]
}
// the data we get from the wire is compressed
if bytes.Equal(cHead, magicGzip) {
cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
} else if cHead[0] == magicZlib[0] &&
(int(cHead[0])*256+int(cHead[1]))%31 == 0 {
// zlib is slightly more complicated, but correct
cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
} else {
return nil, fmt.Errorf("unknown magic: %x %v", cHead, cHead)
}
if err != nil {
return nil, fmt.Errorf("NewReader: %s", err)
}
if _, err = io.Copy(&buf, cReader); err != nil {
return nil, fmt.Errorf("io.Copy: %s", err)
}
msg := new(Message)
if err := json.Unmarshal(buf.Bytes(), &msg); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %s", err)
}
return msg, nil
}