Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8293] Pass in Connection to Answerer in WebRTC Server #410

Merged
merged 19 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion rpc/dial_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type dialOptions struct {
// interceptors
unaryInterceptor grpc.UnaryClientInterceptor
streamInterceptor grpc.StreamClientInterceptor

// signalingConn can be used to force the webrtcSignalingAnswerer to use a preexisting connection instead of dialing and managing its own.
signalingConn ClientConn
}

// DialMulticastDNSOptions dictate any special settings to apply while dialing via mDNS.
Expand Down Expand Up @@ -265,7 +268,10 @@ func WithUnaryClientInterceptor(interceptor grpc.UnaryClientInterceptor) DialOpt
func WithStreamClientInterceptor(interceptor grpc.StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
if o.streamInterceptor != nil {
o.streamInterceptor = grpc_middleware.ChainStreamClient(o.streamInterceptor, interceptor)
o.streamInterceptor = grpc_middleware.ChainStreamClient(
o.streamInterceptor,
interceptor,
)
} else {
o.streamInterceptor = interceptor
}
Expand All @@ -280,3 +286,11 @@ func WithForceDirectGRPC() DialOption {
o.disableDirect = false
})
}

// WithSignalingConn provides a preexisting connection to use. This option forces the webrtcSignalingAnswerer to not dial or manage
// a connection.
func WithSignalingConn(signalingConn ClientConn) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.signalingConn = signalingConn
})
}
2 changes: 2 additions & 0 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
if !sOpts.unauthenticated {
answererDialOpts = append(answererDialOpts, WithEntityCredentials(server.internalUUID, server.internalCreds))
}
// this answerer uses an internal signaling server that runs locally as a separate process and so does not get a shared
// connection to App as a dial option
server.webrtcAnswerers = append(server.webrtcAnswerers, newWebRTCSignalingAnswerer(
address,
internalSignalingHosts,
Expand Down
30 changes: 23 additions & 7 deletions rpc/wrtc_signaling_answerer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ type webrtcSignalingAnswerer struct {

// conn is used to share the direct gRPC connection used by the answerer workers. As direct gRPC connections
// reconnect on their own, custom reconnect logic is not needed. However, keepalives are necessary for the connection
// to realize it's been disconnected quickly and start reconnecting.
connMu sync.Mutex
conn ClientConn
// to realize it's been disconnected quickly and start reconnecting. conn can be set to a pre-existing gRPC connection that's used by
// other consumers via a dial option. In this scenario, sharedConn will be true, and the answerer will not attempt to establish a new
// connection to the signaling server. If this option is not set, the answerer will oversee the lifecycle of its own connection by
// continuously dialing in the background until a successful connection emerges and closing said connection when done. In the shared
// connection case, the answerer will not close the connection. connMu sync.Mutex
connMu sync.Mutex
conn ClientConn
sharedConn bool

logger utils.ZapCompatibleLogger
}
Expand All @@ -61,8 +66,12 @@ func newWebRTCSignalingAnswerer(
dialOptsCopy := make([]DialOption, len(dialOpts))
copy(dialOptsCopy, dialOpts)
dialOptsCopy = append(dialOptsCopy, WithWebRTCOptions(DialWebRTCOptions{Disable: true}))
options := &dialOptions{}
for _, opt := range dialOptsCopy {
opt.apply(options)
}
bgWorkers := utils.NewBackgroundStoppableWorkers()
return &webrtcSignalingAnswerer{
ans := &webrtcSignalingAnswerer{
address: address,
hosts: hosts,
server: server,
Expand All @@ -71,6 +80,11 @@ func newWebRTCSignalingAnswerer(
bgWorkers: bgWorkers,
logger: logger,
}
if options.signalingConn != nil {
ans.conn = options.signalingConn
ans.sharedConn = true
}
return ans
}

const (
Expand Down Expand Up @@ -260,9 +274,11 @@ func (ans *webrtcSignalingAnswerer) Stop() {
ans.connMu.Lock()
defer ans.connMu.Unlock()
if ans.conn != nil {
err := ans.conn.Close()
if isNetworkError(err) {
ans.logger.Errorw("error closing signaling connection", "error", err)
if !ans.sharedConn {
err := ans.conn.Close()
if isNetworkError(err) {
ans.logger.Errorw("error closing signaling connection", "error", err)
}
}
ans.conn = nil
}
Expand Down
Loading