Skip to content

Commit d525579

Browse files
refactor: introduce reliable transport layer (#50)
This is the third commit in the series of incremental refactoring of the current minivpn tree. In this commit, we introduce the reliabletransport package, which is the layer between packetmuxer and control channel. Reliable transport adds a reliability layer on top of UDP (but it's also used in TCP mode). This first implementation of the reliability layer implements only a naive strategy to get the TLS handshake working on an optimal environment (i.e., no packet loss). We ACK any incoming packet, and we assume all packets arrive in order. After merging all the components in the new architecture, we will revisit the reliability layer to follow the OpenVPN design more closely. Reference issue: #47 --------- Co-authored-by: Simone Basso <[email protected]>
1 parent 4d0ca13 commit d525579

File tree

1 file changed

+187
-0
lines changed

1 file changed

+187
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Package reliabletransport implements the reliable transport.
2+
package reliabletransport
3+
4+
import (
5+
"bytes"
6+
7+
"github.com/ooni/minivpn/internal/model"
8+
"github.com/ooni/minivpn/internal/session"
9+
"github.com/ooni/minivpn/internal/workers"
10+
)
11+
12+
// Service is the reliable service. Make sure you initialize
13+
// the channels before invoking [Service.StartWorkers].
14+
type Service struct {
15+
// DataOrControlToMuxer is a shared channel that moves packets down to the muxer
16+
DataOrControlToMuxer *chan *model.Packet
17+
18+
// ControlToReliable moves packets down to us
19+
ControlToReliable chan *model.Packet
20+
21+
// MuxerToReliable moves packets up to us
22+
MuxerToReliable chan *model.Packet
23+
24+
// ReliableToControl moves packets up from us to the control layer above
25+
ReliableToControl *chan *model.Packet
26+
}
27+
28+
// StartWorkers starts the reliable-transport workers. See the [ARCHITECTURE]
29+
// file for more information about the reliable-transport workers.
30+
//
31+
// [ARCHITECTURE]: https://github.com/ooni/minivpn/blob/main/ARCHITECTURE.md
32+
func (s *Service) StartWorkers(
33+
logger model.Logger,
34+
workersManager *workers.Manager,
35+
sessionManager *session.Manager,
36+
) {
37+
ws := &workersState{
38+
logger: logger,
39+
dataOrControlToMuxer: *s.DataOrControlToMuxer,
40+
controlToReliable: s.ControlToReliable,
41+
muxerToReliable: s.MuxerToReliable,
42+
reliableToControl: *s.ReliableToControl,
43+
sessionManager: sessionManager,
44+
workersManager: workersManager,
45+
}
46+
workersManager.StartWorker(ws.moveUpWorker)
47+
workersManager.StartWorker(ws.moveDownWorker)
48+
}
49+
50+
// workersState contains the reliable workers state
51+
type workersState struct {
52+
// logger is the logger to use
53+
logger model.Logger
54+
55+
// dataOrControlToMuxer is the channel where we write packets going down the stack.
56+
dataOrControlToMuxer chan<- *model.Packet
57+
58+
// controlToReliable is the channel from which we read packets going down the stack.
59+
controlToReliable <-chan *model.Packet
60+
61+
// muxerToReliable is the channel from which we read packets going up the stack.
62+
muxerToReliable <-chan *model.Packet
63+
64+
// reliableToControl is the channel where we write packets going up the stack.
65+
reliableToControl chan<- *model.Packet
66+
67+
// sessionManager manages the OpenVPN session.
68+
sessionManager *session.Manager
69+
70+
// workersManager controls the workers lifecycle.
71+
workersManager *workers.Manager
72+
}
73+
74+
// moveUpWorker moves packets up the stack
75+
func (ws *workersState) moveUpWorker() {
76+
defer func() {
77+
ws.workersManager.OnWorkerDone()
78+
ws.workersManager.StartShutdown()
79+
ws.logger.Debug("reliable: moveUpWorker: done")
80+
}()
81+
82+
ws.logger.Debug("reliable: moveUpWorker: started")
83+
84+
// TODO: do we need to have notifications from the control channel
85+
// to reset state or can we do this implicitly?
86+
87+
for {
88+
// POSSIBLY BLOCK reading a packet to move up the stack
89+
// or POSSIBLY BLOCK waiting for notifications
90+
select {
91+
case packet := <-ws.muxerToReliable:
92+
ws.logger.Infof(
93+
"< %s localID=%x remoteID=%x [%d bytes]",
94+
packet.Opcode,
95+
packet.LocalSessionID,
96+
packet.RemoteSessionID,
97+
len(packet.Payload),
98+
)
99+
100+
// drop a packet that is not for our session
101+
if !bytes.Equal(packet.LocalSessionID[:], ws.sessionManager.RemoteSessionID()) {
102+
ws.logger.Warnf(
103+
"reliable: moveUpWorker: packet with invalid RemoteSessionID: expected %x; got %x",
104+
ws.sessionManager.LocalSessionID(),
105+
packet.RemoteSessionID,
106+
)
107+
continue
108+
}
109+
110+
// possibly ACK the incoming packet
111+
if err := ws.maybeACK(packet); err != nil {
112+
ws.logger.Warnf("reliable: moveUpWorker: cannot ACK packet: %s", err.Error())
113+
continue
114+
}
115+
116+
// TODO: here we should track ACKs
117+
118+
// POSSIBLY BLOCK delivering to the upper layer
119+
select {
120+
case ws.reliableToControl <- packet:
121+
case <-ws.workersManager.ShouldShutdown():
122+
return
123+
}
124+
125+
case <-ws.workersManager.ShouldShutdown():
126+
return
127+
}
128+
}
129+
}
130+
131+
// moveDownWorker moves packets down the stack
132+
func (ws *workersState) moveDownWorker() {
133+
defer func() {
134+
ws.workersManager.OnWorkerDone()
135+
ws.workersManager.StartShutdown()
136+
ws.logger.Debug("reliable: moveDownWorker: done")
137+
}()
138+
139+
ws.logger.Debug("reliable: moveDownWorker: started")
140+
141+
// TODO: we should have timer for retransmission
142+
for {
143+
// POSSIBLY BLOCK reading the next packet we should move down the stack
144+
select {
145+
case packet := <-ws.controlToReliable:
146+
// TODO: here we should treat control packets specially
147+
148+
ws.logger.Infof(
149+
"> %s localID=%x remoteID=%x [%d bytes]",
150+
packet.Opcode,
151+
packet.LocalSessionID,
152+
packet.RemoteSessionID,
153+
len(packet.Payload),
154+
)
155+
156+
// POSSIBLY BLOCK delivering this packet to the lower layer
157+
select {
158+
case ws.dataOrControlToMuxer <- packet:
159+
case <-ws.workersManager.ShouldShutdown():
160+
return
161+
}
162+
163+
case <-ws.workersManager.ShouldShutdown():
164+
return
165+
}
166+
}
167+
}
168+
169+
// maybeACK sends an ACK when needed.
170+
func (ws *workersState) maybeACK(packet *model.Packet) error {
171+
// currently we are ACKing every packet
172+
// TODO: implement better ACKing strategy
173+
174+
// this function will fail if we don't know the remote session ID
175+
ACK, err := ws.sessionManager.NewACKForPacket(packet)
176+
if err != nil {
177+
return err
178+
}
179+
180+
// move the packet down
181+
select {
182+
case ws.dataOrControlToMuxer <- ACK:
183+
return nil
184+
case <-ws.workersManager.ShouldShutdown():
185+
return workers.ErrShutdown
186+
}
187+
}

0 commit comments

Comments
 (0)