Skip to content

Commit

Permalink
Remove unused use case, and fixed close ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredoconnell committed Oct 20, 2023
1 parent fdb3e59 commit 9573909
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 22 deletions.
11 changes: 8 additions & 3 deletions atp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,13 @@ func (c *client) handleStepComplete(runID string, receivedSignals chan schema.In
c.logger.Infof("Closing signal channel for finished step")
// Remove from the map to ensure that the client.Close() method doesn't double-close it
c.mutex.Lock()
delete(c.runningSignalReceiveLoops, runID)
// Validate that it exists, since Close() could have been called early.
_, exists := c.runningSignalReceiveLoops[runID]
if exists {
delete(c.runningSignalReceiveLoops, runID)
close(receivedSignals)
}
c.mutex.Unlock()
close(receivedSignals)
}
}

Expand All @@ -214,13 +218,14 @@ func (c *client) Close() error {
return nil
}
c.done = true
c.mutex.Unlock()
// First, close channels that could send signals to the clients
// This ends the loop
for runID, signalChannel := range c.runningSignalReceiveLoops {
c.logger.Infof("Closing signal channel for run ID '%s'", runID)
delete(c.runningSignalReceiveLoops, runID)
close(signalChannel)
}
c.mutex.Unlock()
// Now tell the server we're done.
// Send the client done message
if c.atpVersion > 1 {
Expand Down
20 changes: 1 addition & 19 deletions atp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"go.flow.arcalot.io/pluginsdk/schema"
"io"
"os"
"os/signal"
"sync"
"syscall"
)

// RunATPServer runs an ArcaflowTransportProtocol server with a given schema.
Expand All @@ -36,7 +34,6 @@ func RunATPServer(

type atpServerSession struct {
ctx context.Context
cancel *context.CancelFunc
wg *sync.WaitGroup
stdinCloser io.ReadCloser
cborStdin *cbor.Decoder
Expand Down Expand Up @@ -65,29 +62,14 @@ func initializeATPServerSession(
stdout io.WriteCloser,
pluginSchema *schema.CallableSchema,
) *atpServerSession {
subCtx, cancel := context.WithCancel(ctx)
workDone := make(chan ServerError, 3)
// The ATP protocol uses CBOR.
cborStdin := cbor.NewDecoder(stdin)
cborStdout := cbor.NewEncoder(stdout)
runDoneChannel := make(chan bool, 3) // Buffer to prevent it from hanging if something unexpected happens.

// Cancel the sub context on sigint or sigterm.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
select {
case <-sigs:
// Got sigterm. So cancel context.
cancel()
case <-subCtx.Done():
// Done. No sigterm.
}
}()

return &atpServerSession{
ctx: subCtx,
cancel: &cancel,
ctx: ctx,
cborStdin: cborStdin,
stdinCloser: stdin,
cborStdout: cborStdout,
Expand Down

0 comments on commit 9573909

Please sign in to comment.