forked from Haivision/srtgo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpollserver.go
109 lines (97 loc) · 2.43 KB
/
pollserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package srtgo
/*
#cgo LDFLAGS: -lsrt
#include <srt/srt.h>
*/
import "C"
import (
"sync"
"unsafe"
)
var (
// phctx is the package-wide poll server instance; it is assigned by
// pollServerCtxInit and handed out by pollServerCtx.
phctx *pollServer
// once guards the single execution of pollServerCtxInit.
once sync.Once
)
// pollServerCtx returns the shared poll server. The first call runs
// pollServerCtxInit through once; later calls just return phctx.
func pollServerCtx() *pollServer {
once.Do(pollServerCtxInit)
return phctx
}
// pollServerCtxInit creates the SRT epoll container, stores the resulting
// poll server in phctx and starts its event loop in a new goroutine.
//
// It panics if the epoll container cannot be created, so the failure is
// reported here instead of surfacing later as a generic wait error inside
// the background goroutine.
func pollServerCtxInit() {
	eid := C.srt_epoll_create()
	if eid == -1 {
		panic("ERROR CREATING EPOLL")
	}
	// Per the SRT API docs, SRT_EPOLL_ENABLE_EMPTY allows waiting on an epoll
	// container that has no sockets subscribed yet, which is the state the
	// loop starts in.
	C.srt_epoll_set(eid, C.SRT_EPOLL_ENABLE_EMPTY)
	phctx = &pollServer{
		srtEpollDescr: eid,
		pollDescs:     make(map[C.SRTSOCKET]*pollDesc),
	}
	go phctx.run()
}
// pollServer owns one SRT epoll container and the table of poll descriptors
// registered with it.
type pollServer struct {
// srtEpollDescr is the epoll id passed to the srt_epoll_* calls.
srtEpollDescr C.int
// pollDescLock guards pollDescs.
pollDescLock sync.Mutex
// pollDescs maps a socket to the descriptor registered for it in pollOpen.
pollDescs map[C.SRTSOCKET]*pollDesc
}
// pollOpen subscribes pd.fd to the poll server's epoll container for read,
// write and error events (edge-triggered) and records pd so that run can
// find it when events arrive.
//
// It panics if SRT refuses to add the socket.
func (p *pollServer) pollOpen(pd *pollDesc) {
	// Use a uint because with SRT_EPOLL_ET the flag set would overflow an int
	// (SRT should accept a uint instead, or fix its SRT_EPOLL_ET definition).
	events := C.uint(C.SRT_EPOLL_IN | C.SRT_EPOLL_OUT | C.SRT_EPOLL_ERR | C.SRT_EPOLL_ET)

	// Hold the lock across both the epoll registration and the map insert so
	// the poller cannot handle an event for this socket before its
	// descriptor is in the table. The deferred unlock also releases the mutex
	// on the panic path, so a recovering caller does not leave it locked.
	p.pollDescLock.Lock()
	defer p.pollDescLock.Unlock()

	// Go through unsafe.Pointer because *C.uint cannot be converted to
	// *C.int directly.
	ret := C.srt_epoll_add_usock(p.srtEpollDescr, pd.fd, (*C.int)(unsafe.Pointer(&events)))
	if ret == -1 {
		panic("ERROR ADDING FD TO EPOLL")
	}
	p.pollDescs[pd.fd] = pd
}
// pollClose unsubscribes pd.fd from the epoll container and drops its entry
// from the descriptor table. Sockets that are already broken, closing,
// closed or nonexistent are left alone.
//
// It panics if SRT fails to remove a socket that is still live.
func (p *pollServer) pollClose(pd *pollDesc) {
	switch C.srt_getsockstate(pd.fd) {
	case C.SRTS_BROKEN, C.SRTS_CLOSING, C.SRTS_CLOSED, C.SRTS_NONEXIST:
		// Broken/closed sockets get removed internally by the SRT library.
		// NOTE(review): this path returns without deleting pd.fd from
		// pollDescs, so the table entry stays behind — confirm whether that
		// is intended.
		return
	}

	if C.srt_epoll_remove_usock(p.srtEpollDescr, pd.fd) == -1 {
		panic("ERROR REMOVING FD FROM EPOLL")
	}

	p.pollDescLock.Lock()
	delete(p.pollDescs, pd.fd)
	p.pollDescLock.Unlock()
}
// init currently does nothing; the poll server is set up on first use via
// pollServerCtx rather than at package load.
func init() {
}
// run is the poll server's event loop. It blocks in srt_epoll_uwait with no
// timeout, then, under pollDescLock, looks up the descriptor for every
// reported socket and unblocks its readers and/or writers according to the
// event flags. It never returns and panics if the wait call reports an
// error.
func (p *pollServer) run() {
	timeoutMs := C.int64_t(-1) // -1: wait indefinitely
	fds := [128]C.SRT_EPOLL_EVENT{}
	fdlen := C.int(len(fds))
	for {
		res := C.srt_epoll_uwait(p.srtEpollDescr, &fds[0], fdlen, timeoutMs)
		if res == -1 {
			panic("srt_epoll_error")
		}
		if res <= 0 {
			continue // 0 shouldn't happen with an infinite timeout
		}

		// The return value can exceed the buffer size; only the first
		// len(fds) entries were actually filled in.
		n := int(res)
		if n > len(fds) {
			n = len(fds)
		}

		p.pollDescLock.Lock()
		for i := 0; i < n; i++ {
			pd, ok := p.pollDescs[fds[i].fd]
			if !ok || pd == nil {
				// The socket was removed by pollClose after the wait call had
				// already collected its event; there is nobody left to wake.
				continue
			}
			events := fds[i].events
			// NOTE(review): the two booleans passed to unblock look like
			// (isError, ioReady) — confirm against pollDesc.unblock.
			if events&C.SRT_EPOLL_ERR != 0 {
				pd.unblock(ModeRead, true, false)
				pd.unblock(ModeWrite, true, false)
				continue
			}
			if events&C.SRT_EPOLL_IN != 0 {
				pd.unblock(ModeRead, false, true)
			}
			if events&C.SRT_EPOLL_OUT != 0 {
				pd.unblock(ModeWrite, false, true)
			}
		}
		p.pollDescLock.Unlock()
	}
}