Skip to content

Commit

Permalink
SendReceipt feature (#352)
Browse files Browse the repository at this point in the history
This adds method SendWithReceipt that's used to delay confirmation of
message settlement and retrieval of the final delivery state.
Moved DeliveryState interface and implementing types to
exported_types.go as they're now public surface area.
Implemented fmt.Stringer on StateReceived type.
Slightly changed the value returned by String() on the other delivery
types to use the type's full name
  • Loading branch information
jhendrixMSFT authored Dec 4, 2024
1 parent 907d06c commit 2048490
Show file tree
Hide file tree
Showing 7 changed files with 570 additions and 205 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Release History

## 1.4.0-beta.1 (Unreleased)

### Features Added

* Added `Sender` support for delayed confirmation of message settlement and retrieval of delivery state.
* `Sender.SendWithReceipt` sends a message and returns a `SendReceipt`.
* `SendReceipt.Wait` waits for confirmation of settlement and returns the message's delivery state.
* The `DeliveryState` interface encapsulates concrete delivery outcomes `StateAccepted`, `StateModified`, `StateRejected`, `StateReleased` and
non-terminal delivery state `StateReceived`.

## 1.3.0 (2024-12-03)

### Features Added
Expand Down
33 changes: 33 additions & 0 deletions delivery_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package amqp

import "github.com/Azure/go-amqp/internal/encoding"

// DeliveryState encapsulates the various concrete delivery states.
// Use a type switch to determine the concrete delivery state.
// - *StateAccepted
// - *StateModified
// - *StateReceived
// - *StateRejected
// - *StateReleased
type DeliveryState = encoding.DeliveryState

// StateAccepted indicates that an incoming message has been successfully processed,
// and that the receiver of the message is expecting the sender to transition the
// delivery to the accepted state at the source.
type StateAccepted = encoding.StateAccepted

// StateModifies indicates that a given transfer was not and will not be acted upon,
// and that the message SHOULD be modified in the specified ways at the node.
type StateModified = encoding.StateModified

// StateReceived indicates the furthest point in the payload of the message which the
// target will not need to have resent if the link is resumed.
type StateReceived = encoding.StateReceived

// StateRejected indicates that an incoming message is invalid and therefore unprocessable.
// The rejected outcome when applied to a message will cause the delivery-count to be
// incremented in the header of the rejected message.
type StateRejected = encoding.StateRejected

// StateReleased indicates that a given transfer was not and will not be acted upon.
type StateReleased = encoding.StateReleased
58 changes: 58 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,61 @@ func ExampleConn_Done() {
}
}
}

func ExampleSender_SendWithReceipt() {
ctx := context.TODO()

// create connection
conn, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
})
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
defer conn.Close()

// open a session
session, err := conn.NewSession(ctx, nil)
if err != nil {
log.Fatal("Creating AMQP session:", err)
}

// create a sender
sender, err := session.NewSender(ctx, "/queue-name", nil)
if err != nil {
log.Fatal("Creating sender link:", err)
}

// send message
receipt, err := sender.SendWithReceipt(ctx, amqp.NewMessage([]byte("Hello!")), nil)
if err != nil {
log.Fatal("Sending message:", err)
}

// wait for confirmation of settlement
state, err := receipt.Wait(ctx)
if err != nil {
log.Fatal("Wait on receipt:", err)
}

// determine how the peer settled the message
switch stateType := state.(type) {
case *amqp.StateAccepted:
// message was accepted, no further action is required
case *amqp.StateModified:
// message must be modified and resent before it can be processed.
// the values in stateType provide further context.
case *amqp.StateReceived:
// see the fields in [StateReceived] for information on
// how to handle this delivery state.
case *amqp.StateRejected:
// the peer rejected the message
if stateType.Error != nil {
// the error will provide information about why the
// message was rejected. note that the peer isn't required
// to provide an error.
}
case *amqp.StateReleased:
// message was not and will not be acted upon
}
}
218 changes: 218 additions & 0 deletions internal/encoding/exported_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,221 @@ func (t DescribedType) String() string {
t.Value,
)
}

// DeliveryState encapsulates the various concrete delivery states.
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-delivery-state
// TODO: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#type-declared
type DeliveryState interface {
deliveryState() // marker method
}

/*
<type name="received" class="composite" source="list" provides="delivery-state">
<descriptor name="amqp:received:list" code="0x00000000:0x00000023"/>
<field name="section-number" type="uint" mandatory="true"/>
<field name="section-offset" type="ulong" mandatory="true"/>
</type>
*/

// StateReceived indicates the furthest point in the payload of the message which the
// target will not need to have resent if the link is resumed.
type StateReceived struct {
// When sent by the sender this indicates the first section of the message
// (with section-number 0 being the first section) for which data can be resent.
// Data from sections prior to the given section cannot be retransmitted for
// this delivery.
//
// When sent by the receiver this indicates the first section of the message
// for which all data might not yet have been received.
SectionNumber uint32

// When sent by the sender this indicates the first byte of the encoded section
// data of the section given by section-number for which data can be resent
// (with section-offset 0 being the first byte). Bytes from the same section
// prior to the given offset section cannot be retransmitted for this delivery.
//
// When sent by the receiver this indicates the first byte of the given section
// which has not yet been received. Note that if a receiver has received all of
// section number X (which contains N bytes of data), but none of section number
// X + 1, then it can indicate this by sending either Received(section-number=X,
// section-offset=N) or Received(section-number=X+1, section-offset=0). The state
// Received(section-number=0, section-offset=0) indicates that no message data
// at all has been transferred.
SectionOffset uint64
}

func (sr *StateReceived) deliveryState() {}

// Marshal encodes this type into a buffer. It is not intended for public use.
func (sr *StateReceived) Marshal(wr *buffer.Buffer) error {
return MarshalComposite(wr, TypeCodeStateReceived, []MarshalField{
{Value: &sr.SectionNumber, Omit: false},
{Value: &sr.SectionOffset, Omit: false},
})
}

// Unmarshal decodes a buffer into this type. It is not intended for public use.
func (sr *StateReceived) Unmarshal(r *buffer.Buffer) error {
return UnmarshalComposite(r, TypeCodeStateReceived, []UnmarshalField{
{Field: &sr.SectionNumber, HandleNull: func() error { return errors.New("StateReceiver.SectionNumber is required") }},
{Field: &sr.SectionOffset, HandleNull: func() error { return errors.New("StateReceiver.SectionOffset is required") }},
}...)
}

// String implements the [fmt.Stringer] interface.
// Note that the values are for diagnostic purposes and may change over time.
func (sr *StateReceived) String() string {
return fmt.Sprintf("StateReceived{SectionNumber : %d, SectionOffset: %d}", sr.SectionNumber, sr.SectionOffset)
}

/*
<type name="accepted" class="composite" source="list" provides="delivery-state, outcome">
<descriptor name="amqp:accepted:list" code="0x00000000:0x00000024"/>
</type>
*/

// StateAccepted indicates that an incoming message has been successfully processed,
// and that the receiver of the message is expecting the sender to transition the
// delivery to the accepted state at the source.
type StateAccepted struct{}

func (sr *StateAccepted) deliveryState() {}

// Marshal encodes this type into a buffer. It is not intended for public use.
func (sa *StateAccepted) Marshal(wr *buffer.Buffer) error {
return MarshalComposite(wr, TypeCodeStateAccepted, nil)
}

// Unmarshal decodes a buffer into this type. It is not intended for public use.
func (sa *StateAccepted) Unmarshal(r *buffer.Buffer) error {
return UnmarshalComposite(r, TypeCodeStateAccepted)
}

// String implements the [fmt.Stringer] interface.
// Note that the values are for diagnostic purposes and may change over time.
func (sa *StateAccepted) String() string {
return "StateAccepted{}"
}

/*
<type name="rejected" class="composite" source="list" provides="delivery-state, outcome">
<descriptor name="amqp:rejected:list" code="0x00000000:0x00000025"/>
<field name="error" type="error"/>
</type>
*/

// StateRejected indicates that an incoming message is invalid and therefore unprocessable.
// The rejected outcome when applied to a message will cause the delivery-count to be
// incremented in the header of the rejected message.
type StateRejected struct {
Error *Error
}

func (sr *StateRejected) deliveryState() {}

// Marshal encodes this type into a buffer. It is not intended for public use.
func (sr *StateRejected) Marshal(wr *buffer.Buffer) error {
return MarshalComposite(wr, TypeCodeStateRejected, []MarshalField{
{Value: sr.Error, Omit: sr.Error == nil},
})
}

// Unmarshal decodes a buffer into this type. It is not intended for public use.
func (sr *StateRejected) Unmarshal(r *buffer.Buffer) error {
return UnmarshalComposite(r, TypeCodeStateRejected,
UnmarshalField{Field: &sr.Error},
)
}

// String implements the [fmt.Stringer] interface.
// Note that the values are for diagnostic purposes and may change over time.
func (sr *StateRejected) String() string {
return fmt.Sprintf("StateRejected{Error: %v}", sr.Error)
}

/*
<type name="released" class="composite" source="list" provides="delivery-state, outcome">
<descriptor name="amqp:released:list" code="0x00000000:0x00000026"/>
</type>
*/

// StateReleased indicates that a given transfer was not and will not be acted upon.
type StateReleased struct{}

func (sr *StateReleased) deliveryState() {}

// Marshal encodes this type into a buffer. It is not intended for public use.
func (sr *StateReleased) Marshal(wr *buffer.Buffer) error {
return MarshalComposite(wr, TypeCodeStateReleased, nil)
}

// Unmarshal decodes a buffer into this type. It is not intended for public use.
func (sr *StateReleased) Unmarshal(r *buffer.Buffer) error {
return UnmarshalComposite(r, TypeCodeStateReleased)
}

// String implements the [fmt.Stringer] interface.
// Note that the values are for diagnostic purposes and may change over time.
func (sr *StateReleased) String() string {
return "StateReleased{}"
}

/*
<type name="modified" class="composite" source="list" provides="delivery-state, outcome">
<descriptor name="amqp:modified:list" code="0x00000000:0x00000027"/>
<field name="delivery-failed" type="boolean"/>
<field name="undeliverable-here" type="boolean"/>
<field name="message-annotations" type="fields"/>
</type>
*/

// StateModifies indicates that a given transfer was not and will not be acted upon,
// and that the message SHOULD be modified in the specified ways at the node.
type StateModified struct {
// count the transfer as an unsuccessful delivery attempt
//
// If the delivery-failed flag is set, any messages modified
// MUST have their delivery-count incremented.
DeliveryFailed bool

// prevent redelivery
//
// If the undeliverable-here is set, then any messages released MUST NOT
// be redelivered to the modifying link endpoint.
UndeliverableHere bool

// message attributes
// Map containing attributes to combine with the existing message-annotations
// held in the message's header section. Where the existing message-annotations
// of the message contain an entry with the same key as an entry in this field,
// the value in this field associated with that key replaces the one in the
// existing headers; where the existing message-annotations has no such value,
// the value in this map is added.
MessageAnnotations Annotations
}

func (sr *StateModified) deliveryState() {}

// Marshal encodes this type into a buffer. It is not intended for public use.
func (sm *StateModified) Marshal(wr *buffer.Buffer) error {
return MarshalComposite(wr, TypeCodeStateModified, []MarshalField{
{Value: &sm.DeliveryFailed, Omit: !sm.DeliveryFailed},
{Value: &sm.UndeliverableHere, Omit: !sm.UndeliverableHere},
{Value: sm.MessageAnnotations, Omit: sm.MessageAnnotations == nil},
})
}

// Unmarshal decodes a buffer into this type. It is not intended for public use.
func (sm *StateModified) Unmarshal(r *buffer.Buffer) error {
return UnmarshalComposite(r, TypeCodeStateModified, []UnmarshalField{
{Field: &sm.DeliveryFailed},
{Field: &sm.UndeliverableHere},
{Field: &sm.MessageAnnotations},
}...)
}

// String implements the [fmt.Stringer] interface.
// Note that the values are for diagnostic purposes and may change over time.
func (sm *StateModified) String() string {
return fmt.Sprintf("StateModified{DeliveryFailed: %t, UndeliverableHere: %t, MessageAnnotations: %v}", sm.DeliveryFailed, sm.UndeliverableHere, sm.MessageAnnotations)
}
Loading

0 comments on commit 2048490

Please sign in to comment.