-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtcp_net.go
256 lines (215 loc) · 6.76 KB
/
tcp_net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package treego
import (
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
)
var (
tcp_conn_index = 0
)
type tcp_net struct {
// reference to TreeApi
api *TreeApi
// TCP connections, which are something like a channel
// for entire API connection for TreeScale API
connections map[int]*net.TCPConn
// locking connections for concurrent access
connection_locker *sync.Mutex
// keeping index for writing using round rubin algorithm
connection_index int
}
func newTcpNet(api *TreeApi) *tcp_net {
return &tcp_net{
api: api,
connections: make(map[int]*net.TCPConn),
connection_locker: &sync.Mutex{},
connection_index: 0,
}
}
// Connecting to available endpoint in TreeApi
func (network *tcp_net) connect(channels int) {
// Resolving address of endpoint
addr, err := net.ResolveTCPAddr("tcp", network.api.endpoint)
if err != nil {
go network.api.trigger_local(EVENT_ON_ERROR,
[]byte(fmt.Sprintf("Unable to resolve given endpoint %s from TCP -> %s", network.api.endpoint, err.Error())))
return
}
// getting handshake from TreeApi
handshake_data := network.api.firstHandshake()
for i := 0; i < channels; i++ {
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
go network.api.trigger_local(EVENT_ON_ERROR,
[]byte(fmt.Sprintf("Unable to connect to given endpoint %s from TCP -> %s", network.api.endpoint, err.Error())))
} else {
network.connection_locker.Lock()
network.connections[tcp_conn_index] = conn
go network.handle_connection(tcp_conn_index, conn, handshake_data)
go network.api.trigger_local(EVENT_ON_CHANNEL_CONNECTION, []byte{})
tcp_conn_index++
network.connection_locker.Unlock()
}
}
}
// Writing entire data to one of the existing connection channels
func (network *tcp_net) write(buffer []byte, conn *net.TCPConn) error {
// if we don't have specific connection to write
// just choosing from available channels using round rubin algorithm
if conn == nil {
network.connection_locker.Lock()
if len(network.connections) == 0 {
return errors.New("There is no available channels to write to!")
}
// getting one of the connections
if network.connection_index >= len(network.connections) {
network.connection_index = 0
}
conn = network.connections[network.connection_index]
network.connection_index++
network.connection_locker.Unlock()
}
write_offset := 0
data_len := len(buffer)
for {
n, err := conn.Write(buffer[write_offset:])
if err != nil {
// if we have EOF, then we need to return from write functionality
// connection would be closed from reader
if err == io.EOF {
return errors.New("Connection channel closed!")
}
break
}
if n+write_offset < data_len {
write_offset += n
continue
}
// if we got here, then all data have been written
// so just breaking loop to return
break
}
return nil
}
// handling connection for readable data
func (network *tcp_net) handle_connection(index int, conn *net.TCPConn, handshake []byte) {
err := network.write(handshake, conn)
if err != nil {
// if we got error during handshake write
// just closing connection
network.close_connection(index)
return
}
// keeping 4 bytes for reading first 4 bytes
endian_data := make([]byte, 4)
// after writing handshake, we need to read server handshake also
// first reading API version as a BigEndian 4 bytes
api_version, close_conn := read_endian(conn, endian_data)
if close_conn {
network.close_connection(index)
return
}
// checking if we got valid API version
if api_version > MAX_API_VERSION || api_version == 0 {
go network.api.trigger_local(EVENT_ON_ERROR, []byte(fmt.Sprintf("Invalid API version %d from TCP channel", api_version)))
network.close_connection(index)
return
}
// reading token and value from endpoint
token_value_buf, close_conn := read_message(conn, endian_data)
if close_conn {
network.close_connection(index)
return
}
tv_len := len(token_value_buf)
token := string(token_value_buf[:tv_len-8])
value := binary.BigEndian.Uint64(token_value_buf[tv_len-8:])
// if we got new API value and token, then probably this is a new connection
// so setting up new endpoint values and triggering event
network.api.endpoint_info_locker.Lock()
if network.api.endpoint_value != value || network.api.endpoint_token != token {
network.api.endpoint_value = value
network.api.endpoint_token = token
go network.api.trigger_local(EVENT_ON_CONNECTION, []byte(fmt.Sprintf("%s|%d", token, value)))
}
network.api.endpoint_info_locker.Unlock()
var data []byte
for {
data, close_conn = read_message(conn, endian_data)
if close_conn {
network.close_connection(index)
return
}
go network.api.handle_data(data)
}
}
func (network *tcp_net) close_connection(index int) {
network.connection_locker.Lock()
if conn, ok := network.connections[index]; ok {
conn.Close()
delete(network.connections, index)
go network.api.trigger_local(EVENT_ON_CHANNEL_DISCONNECT,
[]byte(fmt.Sprintf("TCP Channel closed. Left %d channels", len(network.connections))))
// if we don't have channels anymore
// triggering event about connection close
if len(network.connections) == 0 {
go network.api.trigger_local(EVENT_ON_DISCONNECT, []byte(fmt.Sprintf("%s|%d", network.api.endpoint_token, network.api.endpoint_value)))
}
}
network.connection_locker.Unlock()
}
func read_endian(conn *net.TCPConn, endian_data []byte) (uint32, bool) {
// sometimes, we will need indexing 4 bytes because it is possible
// that connection is not sending full 4 bytes at once
endian_index := 0
for {
n, err := conn.Read(endian_data[endian_index:])
if err != nil {
// if we have EOF, then we need to return from write functionality
// connection would be closed from reader
if err == io.EOF {
return 0, true
}
// if we got temporary network error, just sleeping for a few milliseconds
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
time.Sleep(time.Millisecond * 10)
}
}
if n+endian_index < 4 {
endian_index += n
}
break
}
return binary.BigEndian.Uint32(endian_data), false
}
func read_message(conn *net.TCPConn, endian_data []byte) ([]byte, bool) {
length, close_conn := read_endian(conn, endian_data)
if close_conn || length > MAX_READ_DATA_LEN {
return nil, true
}
buffer := make([]byte, length)
offset := 0
for {
n, err := conn.Read(buffer[offset:])
if err != nil {
// if we have EOF, then we need to return from write functionality
// connection would be closed from reader
if err == io.EOF {
return nil, true
}
// if we got temporary network error, just sleeping for a few milliseconds
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
time.Sleep(time.Millisecond * 10)
}
}
if n+offset < int(length) {
offset += n
}
break
}
return buffer, false
}