forked from vcabbage/amqp
-
Notifications
You must be signed in to change notification settings - Fork 59
/
Copy pathsession.go
838 lines (720 loc) · 26.9 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
package amqp
import (
"context"
"errors"
"fmt"
"math"
"sync"
"github.com/Azure/go-amqp/internal/bitmap"
"github.com/Azure/go-amqp/internal/debug"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/queue"
)
// Default session options
const (
defaultWindow = 5000
)
// SessionOptions contains the optional settings for configuring an AMQP session.
type SessionOptions struct {
// MaxLinks sets the maximum number of links (Senders/Receivers)
// allowed on the session.
//
// Minimum: 1.
// Default: 4294967295.
MaxLinks uint32
}
// Session is an AMQP session.
//
// A session multiplexes Receivers.
type Session struct {
channel uint16 // session's local channel
remoteChannel uint16 // session's remote channel, owned by conn.connReader
conn *Conn // underlying conn
tx chan frameBodyEnvelope // non-transfer frames to be sent; session must track disposition
txTransfer chan transferEnvelope // transfer frames to be sent; session must track disposition
// frames destined for this session are added to this queue by conn.connReader
rxQ *queue.Holder[frames.FrameBody]
// properties returned by the peer
peerProperties map[string]any
// flow control
incomingWindow uint32
outgoingWindow uint32
needFlowCount uint32
handleMax uint32
// link management
linksMu sync.RWMutex // used to synchronize link handle allocation
linksByKey map[linkKey]*link // mapping of name+role link
outputHandles *bitmap.Bitmap // allocated output handles
abandonedLinksMu sync.Mutex
abandonedLinks []*link
// used for gracefully closing session
close chan struct{} // closed by calling Close(). it signals that the end performative should be sent
closeOnce sync.Once
// part of internal public surface area
done chan struct{} // closed when the session has terminated (mux exited); DO NOT wait on this from within Session.mux() as it will never trigger!
endSent chan struct{} // closed when the end performative has been sent; once this is closed, links MUST NOT send any frames!
doneErr error // contains the mux error state; ONLY written to by the mux and MUST only be read from after done is closed!
closeErr error // contains the error state returned from Close(); ONLY Close() reads/writes this!
}
func newSession(c *Conn, channel uint16, opts *SessionOptions) *Session {
s := &Session{
conn: c,
channel: channel,
tx: make(chan frameBodyEnvelope),
txTransfer: make(chan transferEnvelope),
incomingWindow: defaultWindow,
outgoingWindow: defaultWindow,
handleMax: math.MaxUint32 - 1,
linksMu: sync.RWMutex{},
linksByKey: make(map[linkKey]*link),
close: make(chan struct{}),
done: make(chan struct{}),
endSent: make(chan struct{}),
}
if opts != nil {
if opts.MaxLinks != 0 {
// MaxLinks is the number of total links.
// handleMax is the max handle ID which starts
// at zero. so we decrement by one
s.handleMax = opts.MaxLinks - 1
}
}
// create output handle map after options have been applied
s.outputHandles = bitmap.New(s.handleMax)
s.rxQ = queue.NewHolder(queue.New[frames.FrameBody](int(s.incomingWindow)))
return s
}
// waitForFrame waits for an incoming frame to be queued.
// it returns the next frame from the queue, or an error.
// the error is either from the context or conn.doneErr.
// not meant for consumption outside of session.go.
func (s *Session) waitForFrame(ctx context.Context) (frames.FrameBody, error) {
var q *queue.Queue[frames.FrameBody]
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-s.conn.done:
return nil, s.conn.doneErr
case q = <-s.rxQ.Wait():
// populated queue
}
fr := q.Dequeue()
s.rxQ.Release(q)
return *fr, nil
}
func (s *Session) begin(ctx context.Context) error {
// send Begin to server
begin := &frames.PerformBegin{
NextOutgoingID: 0,
IncomingWindow: s.incomingWindow,
OutgoingWindow: s.outgoingWindow,
HandleMax: s.handleMax,
}
if err := s.txFrameAndWait(ctx, begin); err != nil {
return err
}
// wait for response
fr, err := s.waitForFrame(ctx)
if err != nil {
// if we exit before receiving the ack, our caller will clean up the channel.
// however, it does mean that the peer will now have assigned an outgoing
// channel ID that's not in use.
return err
}
begin, ok := fr.(*frames.PerformBegin)
if !ok {
// this codepath is hard to hit (impossible?). if the response isn't a PerformBegin and we've not
// yet seen the remote channel number, the default clause in conn.connReader will protect us from that.
// if we have seen the remote channel number then it's likely the session.mux for that channel will
// either swallow the frame or blow up in some other way, both causing this call to hang.
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
debug.Log(1, "RX (Session %p): unexpected begin response frame %T", s, fr)
s.conn.deleteSession(s)
if err := s.conn.Close(); err != nil {
return err
}
return &ConnError{inner: fmt.Errorf("unexpected begin response: %#v", fr)}
}
if len(begin.Properties) > 0 {
s.peerProperties = map[string]any{}
for k, v := range begin.Properties {
s.peerProperties[string(k)] = v
}
}
// start Session multiplexor
go s.mux(begin)
return nil
}
// Close closes the session.
// - ctx controls waiting for the peer to acknowledge the session is closed
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. However, the operation will continue to
// execute in the background. Subsequent calls will return a *SessionError
// that contains the context's error message.
func (s *Session) Close(ctx context.Context) error {
var ctxErr error
s.closeOnce.Do(func() {
close(s.close)
// once the mux has received the ack'ing end performative, the mux will
// exit which deletes the session and closes s.done.
select {
case <-s.done:
s.closeErr = s.doneErr
case <-ctx.Done():
// notify the caller that the close timed out/was cancelled.
// the mux will remain running and once the ack is received it will terminate.
ctxErr = ctx.Err()
// record that the close timed out/was cancelled.
// subsequent calls to Close() will return this
debug.Log(1, "TX (Session %p) channel %d: %v", s, s.channel, ctxErr)
s.closeErr = &SessionError{inner: ctxErr}
}
})
if ctxErr != nil {
return ctxErr
}
var sessionErr *SessionError
if errors.As(s.closeErr, &sessionErr) && sessionErr.RemoteErr == nil && sessionErr.inner == nil {
// an empty SessionError means the session was cleanly closed by the caller
return nil
}
return s.closeErr
}
// txFrame sends a frame to the connWriter.
// - ctx is used to provide the write deadline
// - fr is the frame to write to net.Conn
func (s *Session) txFrame(frameCtx *frameContext, fr frames.FrameBody) {
debug.Log(2, "TX (Session %p) mux frame to Conn (%p): %s", s, s.conn, fr)
s.conn.sendFrame(frameEnvelope{
FrameCtx: frameCtx,
Frame: frames.Frame{
Type: frames.TypeAMQP,
Channel: s.channel,
Body: fr,
},
})
}
// txFrameAndWait sends a frame to the connWriter and waits for the write to complete
// - ctx is used to provide the write deadline
// - fr is the frame to write to net.Conn
func (s *Session) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
s.txFrame(&frameCtx, fr)
select {
case <-frameCtx.Done:
return frameCtx.Err
case <-s.conn.done:
return s.conn.doneErr
case <-s.done:
return s.doneErr
}
}
// NewReceiver opens a new receiver link on the session.
// - ctx controls waiting for the peer to create a sending terminus
// - source is the name of the peer's sending terminus
// - opts contains optional values, pass nil to accept the defaults
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. If the Receiver was successfully
// created, it will be cleaned up in future calls to NewReceiver.
func (s *Session) NewReceiver(ctx context.Context, source string, opts *ReceiverOptions) (*Receiver, error) {
return newReceiverForSession(ctx, s, source, opts, receiverTestHooks{})
}
// split out so tests can add hooks
func newReceiverForSession(ctx context.Context, s *Session, source string, opts *ReceiverOptions, hooks receiverTestHooks) (*Receiver, error) {
r, err := newReceiver(source, s, opts)
if err != nil {
return nil, err
}
if err = r.attach(ctx); err != nil {
return nil, err
}
go r.mux(hooks)
return r, nil
}
// NewSender opens a new sender link on the session.
// - ctx controls waiting for the peer to create a receiver terminus
// - target is the name of the peer's receiver terminus
// - opts contains optional values, pass nil to accept the defaults
//
// If the context's deadline expires or is cancelled before the operation
// completes, an error is returned. If the Sender was successfully
// created, it will be cleaned up in future calls to NewSender.
func (s *Session) NewSender(ctx context.Context, target string, opts *SenderOptions) (*Sender, error) {
return newSenderForSession(ctx, s, target, opts, senderTestHooks{})
}
// Properties returns the peer's session properties.
// Returns nil if the peer didn't send any properties.
func (s *Session) Properties() map[string]any {
return s.peerProperties
}
// split out so tests can add hooks
func newSenderForSession(ctx context.Context, s *Session, target string, opts *SenderOptions, hooks senderTestHooks) (*Sender, error) {
l, err := newSender(target, s, opts)
if err != nil {
return nil, err
}
if err = l.attach(ctx); err != nil {
return nil, err
}
go l.mux(hooks)
return l, nil
}
func (s *Session) mux(remoteBegin *frames.PerformBegin) {
defer func() {
if s.doneErr == nil {
s.doneErr = &SessionError{}
} else if connErr := (&ConnError{}); !errors.As(s.doneErr, &connErr) {
// only wrap non-ConnError error types
var amqpErr *Error
if errors.As(s.doneErr, &amqpErr) {
s.doneErr = &SessionError{RemoteErr: amqpErr}
} else {
s.doneErr = &SessionError{inner: s.doneErr}
}
}
// Signal goroutines waiting on the session.
close(s.done)
}()
var (
// maps input (remote) handles to links
linkFromInputHandle = make(map[uint32]*link)
// maps local delivery IDs (sending transfers) to input (remote) handles
inputHandleFromDeliveryID = make(map[uint32]uint32)
// maps remote delivery IDs (receiving transfers) to input (remote) handles
inputHandleFromRemoteDeliveryID = make(map[uint32]uint32)
// maps delivery IDs to output (our) handles. used for multi-frame transfers
deliveryIDFromOutputHandle = make(map[uint32]uint32)
// maps delivery IDs to the settlement state channel
settlementFromDeliveryID = make(map[uint32]chan encoding.DeliveryState)
// tracks the next delivery ID for outgoing transfers
nextDeliveryID uint32
// flow control values
nextOutgoingID uint32
nextIncomingID = remoteBegin.NextOutgoingID
remoteIncomingWindow = remoteBegin.IncomingWindow
remoteOutgoingWindow = remoteBegin.OutgoingWindow
closeInProgress bool // indicates the end performative has been sent
)
closeWithError := func(e1 *Error, e2 error) {
if closeInProgress {
debug.Log(3, "TX (Session %p): close already pending, discarding %v", s, e1)
return
}
closeInProgress = true
s.doneErr = e2
s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{Error: e1})
close(s.endSent)
}
for {
txTransfer := s.txTransfer
// disable txTransfer if flow control windows have been exceeded
if remoteIncomingWindow == 0 || s.outgoingWindow == 0 {
debug.Log(1, "TX (Session %p): disabling txTransfer - window exceeded. remoteIncomingWindow: %d outgoingWindow: %d",
s, remoteIncomingWindow, s.outgoingWindow)
txTransfer = nil
}
tx := s.tx
closed := s.close
if closeInProgress {
// swap out channel so it no longer triggers
closed = nil
// once the end performative is sent, we're not allowed to send any frames
tx = nil
txTransfer = nil
}
// notes on client-side closing session
// when session is closed, we must keep the mux running until the ack'ing end performative
// has been received. during this window, the session is allowed to receive frames but cannot
// send them.
// client-side close happens either by user calling Session.Close() or due to mux initiated
// close due to a violation of some invariant (see sending &Error{} to s.close). in the case
// that both code paths have been triggered, we must be careful to preserve the error that
// triggered the mux initiated close so it can be surfaced to the caller.
select {
// conn has completed, exit
case <-s.conn.done:
s.doneErr = s.conn.doneErr
return
case <-closed:
if closeInProgress {
// a client-side close due to protocol error is in progress
continue
}
// session is being closed by the client
closeInProgress = true
s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{})
close(s.endSent)
// incoming frame
case q := <-s.rxQ.Wait():
fr := *q.Dequeue()
s.rxQ.Release(q)
debug.Log(2, "RX (Session %p): %s", s, fr)
switch body := fr.(type) {
// Disposition frames can reference transfers from more than one
// link. Send this frame to all of them.
case *frames.PerformDisposition:
start := body.First
end := start
if body.Last != nil {
end = *body.Last
}
for deliveryID := start; deliveryID <= end; deliveryID++ {
// find the input (remote) handle for this delivery ID.
// default to the map for local delivery IDs.
handles := inputHandleFromDeliveryID
if body.Role == encoding.RoleSender {
// the disposition frame is meant for a receiver
// so look in the map for remote delivery IDs.
handles = inputHandleFromRemoteDeliveryID
}
inputHandle, ok := handles[deliveryID]
if !ok {
debug.Log(2, "RX (Session %p): role %s: didn't find deliveryID %d in inputHandlesByDeliveryID map", s, body.Role, deliveryID)
continue
}
delete(handles, deliveryID)
if body.Settled && body.Role == encoding.RoleReceiver {
// check if settlement confirmation was requested, if so
// confirm by closing channel (RSM == ModeFirst)
if done, ok := settlementFromDeliveryID[deliveryID]; ok {
delete(settlementFromDeliveryID, deliveryID)
select {
case done <- body.State:
default:
}
close(done)
}
}
// now find the *link for this input (remote) handle
link, ok := linkFromInputHandle[inputHandle]
if !ok {
closeWithError(&Error{
Condition: ErrCondUnattachedHandle,
Description: "received disposition frame referencing a handle that's not in use",
}, fmt.Errorf("received disposition frame with unknown link input handle %d", inputHandle))
continue
}
s.muxFrameToLink(link, fr)
}
continue
case *frames.PerformFlow:
if body.NextIncomingID == nil {
// This is a protocol error:
// "[...] MUST be set if the peer has received
// the begin frame for the session"
closeWithError(&Error{
Condition: ErrCondNotAllowed,
Description: "next-incoming-id not set after session established",
}, errors.New("protocol error: received flow without next-incoming-id after session established"))
continue
}
// "When the endpoint receives a flow frame from its peer,
// it MUST update the next-incoming-id directly from the
// next-outgoing-id of the frame, and it MUST update the
// remote-outgoing-window directly from the outgoing-window
// of the frame."
nextIncomingID = body.NextOutgoingID
remoteOutgoingWindow = body.OutgoingWindow
// "The remote-incoming-window is computed as follows:
//
// next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint)
//
// If the next-incoming-id field of the flow frame is not set, then remote-incoming-window is computed as follows:
//
// initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)"
remoteIncomingWindow = body.IncomingWindow - nextOutgoingID
remoteIncomingWindow += *body.NextIncomingID
debug.Log(3, "RX (Session %p): flow - remoteOutgoingWindow: %d remoteIncomingWindow: %d nextOutgoingID: %d", s, remoteOutgoingWindow, remoteIncomingWindow, nextOutgoingID)
// Send to link if handle is set
if body.Handle != nil {
link, ok := linkFromInputHandle[*body.Handle]
if !ok {
closeWithError(&Error{
Condition: ErrCondUnattachedHandle,
Description: "received flow frame referencing a handle that's not in use",
}, fmt.Errorf("received flow frame with unknown link handle %d", body.Handle))
continue
}
s.muxFrameToLink(link, fr)
continue
}
if body.Echo && !closeInProgress {
niID := nextIncomingID
resp := &frames.PerformFlow{
NextIncomingID: &niID,
IncomingWindow: s.incomingWindow,
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
s.txFrame(&frameContext{Ctx: context.Background()}, resp)
}
case *frames.PerformAttach:
// On Attach response link should be looked up by name, then added
// to the links map with the remote's handle contained in this
// attach frame.
//
// Note body.Role is the remote peer's role, we reverse for the local key.
s.linksMu.RLock()
link, linkOk := s.linksByKey[linkKey{name: body.Name, role: !body.Role}]
s.linksMu.RUnlock()
if !linkOk {
closeWithError(&Error{
Condition: ErrCondNotAllowed,
Description: "received mismatched attach frame",
}, fmt.Errorf("protocol error: received mismatched attach frame %+v", body))
continue
}
// track the input (remote) handle number for this link.
// note that it might be a different value than our output handle.
link.inputHandle = body.Handle
linkFromInputHandle[link.inputHandle] = link
s.muxFrameToLink(link, fr)
debug.Log(1, "RX (Session %p): link %s attached, input handle %d, output handle %d", s, link.key.name, link.inputHandle, link.outputHandle)
case *frames.PerformTransfer:
s.needFlowCount++
// "Upon receiving a transfer, the receiving endpoint will
// increment the next-incoming-id to match the implicit
// transfer-id of the incoming transfer plus one, as well
// as decrementing the remote-outgoing-window, and MAY
// (depending on policy) decrement its incoming-window."
nextIncomingID++
// don't loop to intmax
if remoteOutgoingWindow > 0 {
remoteOutgoingWindow--
}
link, ok := linkFromInputHandle[body.Handle]
if !ok {
closeWithError(&Error{
Condition: ErrCondUnattachedHandle,
Description: "received transfer frame referencing a handle that's not in use",
}, fmt.Errorf("received transfer frame with unknown link handle %d", body.Handle))
continue
}
s.muxFrameToLink(link, fr)
// if this message is received unsettled and link rcv-settle-mode == second, add to handlesByRemoteDeliveryID
if !body.Settled && body.DeliveryID != nil && link.receiverSettleMode != nil && *link.receiverSettleMode == ReceiverSettleModeSecond {
debug.Log(1, "RX (Session %p): adding handle %d to inputHandleFromRemoteDeliveryID. remote delivery ID: %d", s, body.Handle, *body.DeliveryID)
inputHandleFromRemoteDeliveryID[*body.DeliveryID] = body.Handle
}
// Update peer's outgoing window if half has been consumed.
if s.needFlowCount >= s.incomingWindow/2 && !closeInProgress {
debug.Log(3, "RX (Session %p): channel %d: flow - s.needFlowCount(%d) >= s.incomingWindow(%d)/2\n", s, s.channel, s.needFlowCount, s.incomingWindow)
s.needFlowCount = 0
nID := nextIncomingID
flow := &frames.PerformFlow{
NextIncomingID: &nID,
IncomingWindow: s.incomingWindow,
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
s.txFrame(&frameContext{Ctx: context.Background()}, flow)
}
case *frames.PerformDetach:
link, ok := linkFromInputHandle[body.Handle]
if !ok {
closeWithError(&Error{
Condition: ErrCondUnattachedHandle,
Description: "received detach frame referencing a handle that's not in use",
}, fmt.Errorf("received detach frame with unknown link handle %d", body.Handle))
continue
}
s.muxFrameToLink(link, fr)
// we received a detach frame and sent it to the link.
// this was either the response to a client-side initiated
// detach or our peer detached us. either way, now that
// the link has processed the frame it's detached so we
// are safe to clean up its state.
delete(linkFromInputHandle, link.inputHandle)
delete(deliveryIDFromOutputHandle, link.outputHandle)
s.deallocateHandle(link)
case *frames.PerformEnd:
// there are two possibilities:
// - this is the ack to a client-side Close()
// - the peer is ending the session so we must ack
if closeInProgress {
return
}
// peer detached us with an error, save it and send the ack
if body.Error != nil {
s.doneErr = body.Error
}
fr := frames.PerformEnd{}
s.txFrame(&frameContext{Ctx: context.Background()}, &fr)
// per spec, when end is received, we're no longer allowed to receive frames
return
default:
debug.Log(1, "RX (Session %p): unexpected frame: %s\n", s, body)
closeWithError(&Error{
Condition: ErrCondInternalError,
Description: "session received unexpected frame",
}, fmt.Errorf("internal error: unexpected frame %T", body))
}
case env := <-txTransfer:
fr := &env.Frame
// record current delivery ID
var deliveryID uint32
if fr.DeliveryID == &needsDeliveryID {
deliveryID = nextDeliveryID
fr.DeliveryID = &deliveryID
nextDeliveryID++
deliveryIDFromOutputHandle[fr.Handle] = deliveryID
if !fr.Settled {
inputHandleFromDeliveryID[deliveryID] = env.InputHandle
}
} else {
// if fr.DeliveryID is nil it must have been added
// to deliveryIDByHandle already (multi-frame transfer)
deliveryID = deliveryIDFromOutputHandle[fr.Handle]
}
// log after the delivery ID has been assigned
debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)
// frame has been sender-settled, remove from map.
// this should only come into play for multi-frame transfers.
if fr.Settled {
delete(inputHandleFromDeliveryID, deliveryID)
}
s.txFrame(env.FrameCtx, fr)
select {
case <-env.FrameCtx.Done:
if env.FrameCtx.Err != nil {
// transfer wasn't sent, don't update state
continue
}
// transfer was written to the network
case <-s.conn.done:
// the write failed, Conn is going down
continue
}
// if not settled, add done chan to map
if !fr.Settled && fr.Done != nil {
settlementFromDeliveryID[deliveryID] = fr.Done
} else if fr.Done != nil {
// sender-settled, close done now that the transfer has been sent
close(fr.Done)
}
// "Upon sending a transfer, the sending endpoint will increment
// its next-outgoing-id, decrement its remote-incoming-window,
// and MAY (depending on policy) decrement its outgoing-window."
nextOutgoingID++
// don't decrement if we're at 0 or we could loop to int max
if remoteIncomingWindow != 0 {
remoteIncomingWindow--
}
case env := <-tx:
fr := env.FrameBody
debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)
switch fr := env.FrameBody.(type) {
case *frames.PerformDisposition:
if fr.Settled && fr.Role == encoding.RoleSender {
// sender with a peer that's in mode second; sending confirmation of disposition.
// disposition frames can reference a range of delivery IDs, although it's highly
// likely in this case there will only be one.
start := fr.First
end := start
if fr.Last != nil {
end = *fr.Last
}
for deliveryID := start; deliveryID <= end; deliveryID++ {
// send delivery state to the channel and close it to signal
// that the delivery has completed (RSM == ModeSecond)
if done, ok := settlementFromDeliveryID[deliveryID]; ok {
delete(settlementFromDeliveryID, deliveryID)
select {
case done <- fr.State:
default:
}
close(done)
}
}
}
s.txFrame(env.FrameCtx, fr)
case *frames.PerformFlow:
niID := nextIncomingID
fr.NextIncomingID = &niID
fr.IncomingWindow = s.incomingWindow
fr.NextOutgoingID = nextOutgoingID
fr.OutgoingWindow = s.outgoingWindow
s.txFrame(env.FrameCtx, fr)
case *frames.PerformTransfer:
panic("transfer frames must use txTransfer")
default:
s.txFrame(env.FrameCtx, fr)
}
}
}
}
func (s *Session) allocateHandle(ctx context.Context, l *link) error {
s.linksMu.Lock()
defer s.linksMu.Unlock()
// Check if link name already exists, if so then an error should be returned
existing := s.linksByKey[l.key]
if existing != nil {
return fmt.Errorf("link with name '%v' already exists", l.key.name)
}
next, ok := s.outputHandles.Next()
if !ok {
if err := s.Close(ctx); err != nil {
return err
}
// handle numbers are zero-based, report the actual count
return &SessionError{inner: fmt.Errorf("reached session handle max (%d)", s.handleMax+1)}
}
l.outputHandle = next // allocate handle to the link
s.linksByKey[l.key] = l // add to mapping
return nil
}
func (s *Session) deallocateHandle(l *link) {
s.linksMu.Lock()
defer s.linksMu.Unlock()
delete(s.linksByKey, l.key)
s.outputHandles.Remove(l.outputHandle)
}
func (s *Session) abandonLink(l *link) {
s.abandonedLinksMu.Lock()
defer s.abandonedLinksMu.Unlock()
s.abandonedLinks = append(s.abandonedLinks, l)
}
func (s *Session) freeAbandonedLinks(ctx context.Context) error {
s.abandonedLinksMu.Lock()
defer s.abandonedLinksMu.Unlock()
debug.Log(3, "TX (Session %p): cleaning up %d abandoned links", s, len(s.abandonedLinks))
for _, l := range s.abandonedLinks {
dr := &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
}
if err := s.txFrameAndWait(ctx, dr); err != nil {
return err
}
}
s.abandonedLinks = nil
return nil
}
func (s *Session) muxFrameToLink(l *link, fr frames.FrameBody) {
q := l.rxQ.Acquire()
q.Enqueue(fr)
l.rxQ.Release(q)
debug.Log(2, "RX (Session %p): mux frame to link (%p): %s, %s", s, l, l.key.name, fr)
}
// transferEnvelope is used by senders to send transfer frames
type transferEnvelope struct {
FrameCtx *frameContext
// the link's remote handle
InputHandle uint32
Frame frames.PerformTransfer
}
// frameBodyEnvelope is used by senders and receivers to send frames.
type frameBodyEnvelope struct {
FrameCtx *frameContext
FrameBody frames.FrameBody
}
// the address of this var is a sentinel value indicating
// that a transfer frame is in need of a delivery ID
var needsDeliveryID uint32