diff --git a/atp/client.go b/atp/client.go index b77ea98..0d0aa7d 100644 --- a/atp/client.go +++ b/atp/client.go @@ -201,9 +201,13 @@ func (c *client) handleStepComplete(runID string, receivedSignals chan schema.In c.logger.Infof("Closing signal channel for finished step") // Remove from the map to ensure that the client.Close() method doesn't double-close it c.mutex.Lock() - delete(c.runningSignalReceiveLoops, runID) + // Validate that it exists, since Close() could have been called early. + _, exists := c.runningSignalReceiveLoops[runID] + if exists { + delete(c.runningSignalReceiveLoops, runID) + close(receivedSignals) + } c.mutex.Unlock() - close(receivedSignals) } } @@ -214,13 +218,14 @@ func (c *client) Close() error { return nil } c.done = true - c.mutex.Unlock() // First, close channels that could send signals to the clients // This ends the loop for runID, signalChannel := range c.runningSignalReceiveLoops { c.logger.Infof("Closing signal channel for run ID '%s'", runID) + delete(c.runningSignalReceiveLoops, runID) close(signalChannel) } + c.mutex.Unlock() // Now tell the server we're done. // Send the client done message if c.atpVersion > 1 { diff --git a/atp/server.go b/atp/server.go index 8dd2396..2ac6c3d 100644 --- a/atp/server.go +++ b/atp/server.go @@ -7,9 +7,7 @@ import ( "go.flow.arcalot.io/pluginsdk/schema" "io" "os" - "os/signal" "sync" - "syscall" ) // RunATPServer runs an ArcaflowTransportProtocol server with a given schema. @@ -36,7 +34,6 @@ func RunATPServer( type atpServerSession struct { ctx context.Context - cancel *context.CancelFunc wg *sync.WaitGroup stdinCloser io.ReadCloser cborStdin *cbor.Decoder @@ -65,29 +62,14 @@ func initializeATPServerSession( stdout io.WriteCloser, pluginSchema *schema.CallableSchema, ) *atpServerSession { - subCtx, cancel := context.WithCancel(ctx) workDone := make(chan ServerError, 3) // The ATP protocol uses CBOR. cborStdin := cbor.NewDecoder(stdin) cborStdout := cbor.NewEncoder(stdout) runDoneChannel := make(chan bool, 3) // Buffer to prevent it from hanging if something unexpected happens. - // Cancel the sub context on sigint or sigterm. - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - select { - case <-sigs: - // Got sigterm. So cancel context. - cancel() - case <-subCtx.Done(): - // Done. No sigterm. - } - }() - return &atpServerSession{ - ctx: subCtx, - cancel: &cancel, + ctx: ctx, cborStdin: cborStdin, stdinCloser: stdin, cborStdout: cborStdout,