Skip to content

Commit

Permalink
feat: websocket: support no response. (#355)
Browse files Browse the repository at this point in the history
Resolves #346

https://opentelemetry.io/docs/specs/opamp/#websocket-message-exchange
The specification does not require that we must respond to client messages in WebSocket, and we should support not responding to messages. This will reduce sending empty responses.
  • Loading branch information
tttoad authored Mar 5, 2025
1 parent 0173bf7 commit efd984c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 2 deletions.
8 changes: 8 additions & 0 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ func (s *server) handleWSConnection(reqCtx context.Context, wsConn *websocket.Co
}

response := connectionCallbacks.OnMessage(msgContext, agentConn, &request)
if response == nil { // No send message when 'response' is empty
continue
}

if len(response.InstanceUid) == 0 {
response.InstanceUid = request.InstanceUid
}
Expand Down Expand Up @@ -369,6 +373,10 @@ func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter

response := connectionCallbacks.OnMessage(req.Context(), agentConn, &request)

if response == nil {
response = &protobufs.ServerToAgent{}
}

// Set the InstanceUid if it is not set by the callback.
if len(response.InstanceUid) == 0 {
response.InstanceUid = request.InstanceUid
Expand Down
83 changes: 83 additions & 0 deletions server/serverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -1099,3 +1100,85 @@ func BenchmarkSendToClient(b *testing.B) {
conn.Close()
}
}

func TestServerNotResponse(t *testing.T) {
var (
rcvMsg atomic.Value
srvConn atomic.Value
)
callbacks := types.Callbacks{
OnConnecting: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{Accept: true, ConnectionCallbacks: types.ConnectionCallbacks{
OnMessage: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
srvConn.Store(conn.Connection())
// Remember received message.
rcvMsg.Store(message)
return nil
},
}}
},
}

// Start a Server.
settings := &StartSettings{Settings: Settings{
Callbacks: callbacks,
}}
srv := startServer(t, settings)
defer srv.Stop(context.Background())

// Test HTTP Request
// Send a message to the Server.
sendMsg := protobufs.AgentToServer{
InstanceUid: testInstanceUid,
}
b, err := proto.Marshal(&sendMsg)
require.NoError(t, err)
resp, err := http.Post("http://"+settings.ListenEndpoint+settings.ListenPath, contentTypeProtobuf, bytes.NewReader(b))
require.NoError(t, err)

// Wait until Server receives the message.
eventually(t, func() bool { return rcvMsg.Load() != nil })

// Verify the received message is what was sent.
assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg))

// Read Server's response.
b, err = io.ReadAll(resp.Body)
require.NoError(t, err)

assert.EqualValues(t, http.StatusOK, resp.StatusCode)
assert.EqualValues(t, contentTypeProtobuf, resp.Header.Get(headerContentType))

// Decode the response.
var response protobufs.ServerToAgent
err = proto.Unmarshal(b, &response)
require.NoError(t, err)

// Verify the response.
assert.EqualValues(t, sendMsg.InstanceUid, response.InstanceUid)

// Test WebSocket
// Connect using a WebSocket client.
conn, _, _ := dialClient(settings)
require.NotNil(t, conn)
defer conn.Close()

testInstanceUid2 := []byte{9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 1, 2, 3, 4, 5, 6}
// Send a message to the Server.
sendMsg = protobufs.AgentToServer{
InstanceUid: testInstanceUid2,
}
bytes, err := proto.Marshal(&sendMsg)
require.NoError(t, err)
err = conn.WriteMessage(websocket.BinaryMessage, bytes)
require.NoError(t, err)

// Wait until Server receives the message.
eventually(t, func() bool { return rcvMsg.Load() != nil })
assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg))
require.NoError(t, srvConn.Load().(net.Conn).Close())

// Read Server's response.
_, _, err = conn.ReadMessage()
require.True(t, websocket.IsCloseError(err, websocket.CloseAbnormalClosure))
}
6 changes: 4 additions & 2 deletions server/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ type ConnectionCallbacks struct {
OnConnected func(ctx context.Context, conn Connection)

// OnMessage is called when a message is received from the connection. Can happen
// only after OnConnected(). Must return a ServerToAgent message that will be sent
// as a response to the Agent.
// only after OnConnected().
// When the returned ServerToAgent message is nil, WebSocket will not send a
// message to the Agent, and the HTTP request will respond to an empty message.
// If the return is not nil it will be sent as a response to the Agent.
// For plain HTTP requests once OnMessage returns and the response is sent
// to the Agent the OnConnectionClose message will be called immediately.
OnMessage func(ctx context.Context, conn Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent
Expand Down

0 comments on commit efd984c

Please sign in to comment.