-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathunbuffered.go
139 lines (116 loc) · 3.55 KB
/
unbuffered.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2020 Ian Gudger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memnet
import (
"context"
"errors"
"net"
"sync"
)
type unbufferedPacketConn struct {
net.Conn
}
var _ net.PacketConn = (*unbufferedPacketConn)(nil)
// ReadFrom implements net.PacketConn.ReadFrom.
func (c unbufferedPacketConn) ReadFrom(b []byte) (int, net.Addr, error) {
n, err := c.Read(b)
return n, c.RemoteAddr(), err
}
// WriteTo implements net.PacketConn.WriteTo.
//
// addr is ignored.
func (c unbufferedPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
return c.Write(b)
}
// NewUnbufferedPair creates a connected pair (socketpair) of unbuffered net.Conns.
//
// The returned values also implement net.PacketConn.
//
// The implementation is inherently message oriented due to the lack of buffering. For each WriteXxx call, there is one opportunity to read the written data. If the buffer provided to ReadXxx is smaller than the buffer provided to the corresponding WriteXxx call, the unreadable data will be silently discarded.
func NewUnbufferedPair() (net.Conn, net.Conn) {
c1, c2 := net.Pipe()
return &unbufferedPacketConn{c1}, &unbufferedPacketConn{c2}
}
var errClosed = errors.New("closed")
// unbufferedAddr is similar to net.pipeAddr.
type unbufferedAddr struct{}
func (unbufferedAddr) Network() string { return "unbuffered" }
func (unbufferedAddr) String() string { return "unbuffered" }
// Unbuffered is an unbuffered net.Listener implemented in memory in userspace.
//
// Lighter weight, but not compatible with all applications. Creates unbuffered net.Conns.
type Unbuffered struct {
acceptQueue chan net.Conn
cancelOnce sync.Once
cancel chan struct{}
}
var _ net.Listener = (*Unbuffered)(nil)
// NewUnbuffered creates a new unbuffered listener.
func NewUnbuffered() *Unbuffered {
return &Unbuffered{acceptQueue: make(chan net.Conn), cancel: make(chan struct{})}
}
// Dial creates a new unbuffered connection with no timeout.
func (u *Unbuffered) Dial() (net.Conn, error) {
return u.DialContext(context.Background())
}
// DialContext creates a new unbuffered connection with a context.
func (u *Unbuffered) DialContext(ctx context.Context) (net.Conn, error) {
// Check for closure/cancellation first.
select {
case <-u.cancel:
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()
default:
}
c1, c2 := NewUnbufferedPair()
select {
case <-u.cancel:
c1.Close()
c2.Close()
return nil, errClosed
case <-ctx.Done():
c1.Close()
c2.Close()
return nil, ctx.Err()
case u.acceptQueue <- c1:
return c2, nil
}
}
// Close implements net.Listener.Close.
func (u *Unbuffered) Close() error {
u.cancelOnce.Do(func() {
close(u.cancel)
})
return nil
}
// Accept implements net.Listener.Accept.
func (u *Unbuffered) Accept() (net.Conn, error) {
// Check for closure first.
select {
case <-u.cancel:
return nil, errClosed
default:
}
select {
case <-u.cancel:
return nil, errClosed
case a := <-u.acceptQueue:
return a, nil
}
}
// Addr implements net.Listener.Addr.
func (u *Unbuffered) Addr() net.Addr {
return unbufferedAddr{}
}