Skip to content

Commit

Permalink
improve linux and windows
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Aug 5, 2023
1 parent 31dd563 commit 2341902
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 70 deletions.
21 changes: 0 additions & 21 deletions .travis.yml

This file was deleted.

16 changes: 10 additions & 6 deletions epoll_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (
"syscall"
)

var _ Poller = (*Epoll)(nil)

// Epoll is a epoll based poller.
type Epoll struct {
fd int
ts syscall.Timespec
fd int
ts syscall.Timespec

mu *sync.RWMutex
changes []syscall.Kevent_t
conns map[int]net.Conn
mu *sync.RWMutex
connbuf []net.Conn
events []syscall.Kevent_t
}
Expand Down Expand Up @@ -133,18 +137,18 @@ retry:
return nil, err
}

connections := make([]net.Conn, 0, n)
conns := make([]net.Conn, 0, n)
e.mu.RLock()
for i := 0; i < n; i++ {
conn := e.conns[int(events[i].Ident)]
if (events[i].Flags & syscall.EV_EOF) == syscall.EV_EOF {
conn.Close()
}
connections = append(connections, conn)
conns = append(conns, conn)
}
e.mu.RUnlock()

return connections, nil
return conns, nil
}

// WaitWithBuffer waits for events and returns the connections.
Expand Down
65 changes: 43 additions & 22 deletions epoll_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,56 @@ import (
"golang.org/x/sys/unix"
)

var _ Poller = (*Epoll)(nil)

// Epoll is a epoll based poller.
type Epoll struct {
fd int
connections map[int]net.Conn
lock *sync.RWMutex
connbuf []net.Conn
events []unix.EpollEvent
fd int

lock *sync.RWMutex
conns map[int]net.Conn
connbuf []net.Conn
events []unix.EpollEvent
}

// NewPoller creates a new epoll poller.
func NewPoller() (*Epoll, error) {
return NewPollerWithBuffer(128)
}

// NewPollerWithBuffer creates a new epoll poller with a buffer.
func NewPollerWithBuffer(count int) (*Epoll, error) {
fd, err := unix.EpollCreate1(0)
if err != nil {
return nil, err
}
return &epoll{
fd: fd,
lock: &sync.RWMutex{},
connections: make(map[int]net.Conn),
connbuf: make([]net.Conn, count, count),
events: make([]unix.EpollEvent, count, count),
return &Epoll{
fd: fd,
lock: &sync.RWMutex{},
conns: make(map[int]net.Conn),
connbuf: make([]net.Conn, count, count),
events: make([]unix.EpollEvent, count, count),
}, nil
}

func (e *Epoll) Close() error {
// Close closes the poller. If closeConns is true, it will close all the connections.
func (e *Epoll) Close(closeConns bool) error {
e.lock.Lock()
defer e.lock.Unlock()

e.connections = nil
if closeConns {
for _, conn := range e.conns {
conn.Close()
}
}

e.conns = nil
e.connbuf = e.connbuf[:0]

return unix.Close(e.fd)
}

// Add adds a connection to the poller.
func (e *Epoll) Add(conn net.Conn) error {
conn = newConnImpl(conn)
fd := socketFD(conn)
Expand All @@ -60,10 +76,11 @@ func (e *Epoll) Add(conn net.Conn) error {
if err != nil {
return err
}
e.connections[fd] = conn
e.conns[fd] = conn
return nil
}

// Remove removes a connection from the poller.
func (e *Epoll) Remove(conn net.Conn) error {
fd := socketFD(conn)
err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
Expand All @@ -72,10 +89,12 @@ func (e *Epoll) Remove(conn net.Conn) error {
}
e.lock.Lock()
defer e.lock.Unlock()
delete(e.connections, fd)
delete(e.conns, fd)

return nil
}

// Wait waits for at most count events and returns the connections.
func (e *Epoll) Wait(count int) ([]net.Conn, error) {
events := make([]unix.EpollEvent, count, count)

Expand All @@ -91,7 +110,7 @@ retry:
connections := make([]net.Conn, 0, n)
e.lock.RLock()
for i := 0; i < n; i++ {
conn := e.connections[int(events[i].Fd)]
conn := e.conns[int(events[i].Fd)]
if (events[i].Events & unix.POLLHUP) == unix.POLLHUP {
conn.Close()
}
Expand All @@ -103,6 +122,7 @@ retry:
return connections, nil
}

// WaitWithBuffer waits for at most count events and returns the connections, with a buffered connection slice.
func (e *Epoll) WaitWithBuffer() ([]net.Conn, error) {
retry:
n, err := unix.EpollWait(e.fd, e.events, -1)
Expand All @@ -113,22 +133,23 @@ retry:
return nil, err
}

connections := e.connbuf[:0]
conns := e.connbuf[:0]
e.lock.RLock()
for i := 0; i < n; i++ {
conn := e.connections[int(e.events[i].Fd)]
conn := e.conns[int(e.events[i].Fd)]
if (e.events[i].Events & unix.POLLHUP) == unix.POLLHUP {
conn.Close()
}
connections = append(connections, conn)
conns = append(conns, conn)
}
e.lock.RUnlock()

return connections, nil
return conns, nil
}

func (e *Epoll) WaitChan(count int) <-chan []net.Conn {
ch := make(chan []net.Conn)
// WaitChan returns a channel that you can use to receive connections.
func (e *Epoll) WaitChan(count, chanBuffer int) <-chan []net.Conn {
ch := make(chan []net.Conn, chanBuffer)
go func() {
for {
conns, err := e.Wait(count)
Expand Down
11 changes: 8 additions & 3 deletions epoll_windows.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build windows && cgo
// +build windows,cgo

package epoller
Expand All @@ -6,12 +7,16 @@ import (
"github.com/smallnest/epoller/wepoll"
)

type epoll = wepoll.Epoll
var _ Poller = (*Epoll)(nil)

func NewPoller() (Poller, error) {
type Epoll = wepoll.Epoll

// NewPoller creates a new epoll poller.
func NewPoller() (*Epoll, error) {
return wepoll.NewPoller()
}

func NewPollerWithBuffer(count int) (Poller, error) {
// NewPollerWithBuffer creates a new epoll poller with a buffer.
func NewPollerWithBuffer(count int) (*Epoll, error) {
return wepoll.NewPollerWithBuffer(count)
}
52 changes: 34 additions & 18 deletions wepoll/epoll_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,58 @@ import (
)

type Epoll struct {
fd C.uintptr_t
connections map[int]net.Conn
lock *sync.RWMutex
connbuf []net.Conn
events []C.epoll_event
fd C.uintptr_t

lock *sync.RWMutex
conns map[int]net.Conn
connbuf []net.Conn
events []C.epoll_event
}

// NewPoller creates a new epoll poller.
func NewPoller() (*Epoll, error) {
fd := C.epoll_create1(0)

if fd == 0 {
return nil, errors.New("epoll_create1 error")
}
return &Epoll{
fd: fd,
lock: &sync.RWMutex{},
connections: make(map[int]net.Conn),
connbuf: make([]net.Conn, 128, 128),
events: make([]C.epoll_event, 128, 128),
fd: fd,
lock: &sync.RWMutex{},
conns: make(map[int]net.Conn),
connbuf: make([]net.Conn, 128, 128),
events: make([]C.epoll_event, 128, 128),
}, nil
}

// NewPollerWithBuffer creates a new epoll poller with a buffer.
func NewPollerWithBuffer(count int) (*Epoll, error) {
fd := C.epoll_create1(0)
if fd == 0 {
return nil, errors.New("epoll_create1 error")
}
return &Epoll{
fd: fd,
lock: &sync.RWMutex{},
connections: make(map[int]net.Conn),
connbuf: make([]net.Conn, count, count),
events: make([]C.epoll_event, count, count),
fd: fd,
lock: &sync.RWMutex{},
conns: make(map[int]net.Conn),
connbuf: make([]net.Conn, count, count),
events: make([]C.epoll_event, count, count),
}, nil
}

func (e *Epoll) Close() error {
func (e *Epoll) Close(closeConns bool) error {
e.lock.Lock()
defer e.lock.Unlock()

e.connections = nil
if closeConns {
for _, conn := range e.conns {
conn.Close()
}
}

e.conns = nil
e.connbuf = e.connbuf[:0]

i := C.epoll_close(e.fd)
if i == 0 {
return nil
Expand All @@ -64,6 +75,7 @@ func (e *Epoll) Close() error {
}
}

// Add adds a connection to the poller.
func (e *Epoll) Add(conn net.Conn) error {
// Extract file descriptor associated with the connection
fd := C.SOCKET(socketFDAsUint(conn))
Expand All @@ -79,6 +91,7 @@ func (e *Epoll) Add(conn net.Conn) error {
return nil
}

// Remove removes a connection from the poller.
func (e *Epoll) Remove(conn net.Conn) error {
fd := C.SOCKET(socketFDAsUint(conn))
var ev C.epoll_event
Expand All @@ -92,8 +105,9 @@ func (e *Epoll) Remove(conn net.Conn) error {
return nil
}

// Wait waits for events on the connections the poller is managing.
func (e *Epoll) Wait(count int) ([]net.Conn, error) {
events := make([]C.epoll_event, count, count)
events := make([]C.epoll_event, count)

n := C.epoll_wait(e.fd, &events[0], C.int(count), -1)
if n == -1 {
Expand All @@ -113,6 +127,7 @@ func (e *Epoll) Wait(count int) ([]net.Conn, error) {
return connections, nil
}

// WaitWithBuffer waits for events on the connections the poller is managing.
func (e *Epoll) WaitWithBuffer() ([]net.Conn, error) {
n := C.epoll_wait(e.fd, &e.events[0], 128, -1)
if n == -1 {
Expand All @@ -132,6 +147,7 @@ func (e *Epoll) WaitWithBuffer() ([]net.Conn, error) {
return connections, nil
}

// WaitChan waits for events on the connections the poller is managing.
func (e *Epoll) WaitChan(count int) <-chan []net.Conn {
ch := make(chan []net.Conn)
go func() {
Expand Down

0 comments on commit 2341902

Please sign in to comment.