From a4be125151ebe563121a465273b90be299b05e57 Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Fri, 26 Jul 2024 11:17:33 -0400 Subject: [PATCH 1/8] Pick lint --- atp/client.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/atp/client.go b/atp/client.go index a60fdad..3888660 100644 --- a/atp/client.go +++ b/atp/client.go @@ -3,7 +3,7 @@ package atp import ( "fmt" "github.com/fxamacker/cbor/v2" - log "go.arcalot.io/log/v2" + "go.arcalot.io/log/v2" "go.flow.arcalot.io/pluginsdk/schema" "io" "strings" @@ -43,6 +43,9 @@ type Client interface { } // NewClient creates a new ATP client (part of the engine code). +// Currently used only by tests in the Python- and Test-deployers. +// +//goland:noinspection GoUnusedExportedFunction func NewClient( channel ClientChannel, ) Client { @@ -137,8 +140,9 @@ func (c *client) ReadSchema() (*schema.SchemaSchema, error) { err := c.validateVersion(hello.Version) if err != nil { - c.logger.Errorf("Unsupported plugin version. %w", err) - return nil, fmt.Errorf("unsupported plugin version: %w", err) + err = fmt.Errorf("unsupported plugin version: %w", err) + c.logger.Errorf(err.Error()) + return nil, err } c.atpVersion = hello.Version @@ -326,8 +330,13 @@ func (c *client) executeWriteLoop( SignalID: signal.ID, Data: signal.InputData, }}); err != nil { - c.logger.Errorf("Client with steps '%s' failed to write signal (%s) with run id '&s' with error: %w", - c.getRunningStepIDs(), signal.ID, signal.RunID, err) + c.logger.Errorf( + "Client with steps '%s' failed to write signal (%s) with run id %q with error: %v", + c.getRunningStepIDs(), + signal.ID, + signal.RunID, + err, + ) return } c.logger.Debugf("Successfully sent signal with ID '%s' to step with run ID '%s'", signal.ID, signal.RunID) @@ -505,9 +514,9 @@ func (c *client) getResultV1( ) ExecutionResult { var doneMessage WorkDoneMessage if err := cborReader.Decode(&doneMessage); err != nil { - c.logger.Errorf("Failed to read or decode work done message: (%w) for step %s", err, stepData.ID) - return NewErrorExecutionResult( - fmt.Errorf("failed to read or decode work done message (%w) for step %s", err, stepData.ID)) + err = fmt.Errorf("failed to read or decode work done message (%w) for step %s", err, stepData.ID) + c.logger.Errorf(err.Error()) + return NewErrorExecutionResult(err) } return c.processWorkDone(stepData.RunID, doneMessage) } From 070b5a89f3928f70aa9cbe059fe34c52a60c3206 Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Fri, 26 Jul 2024 11:20:13 -0400 Subject: [PATCH 2/8] Make runningStepResultEntries send-only --- atp/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/atp/client.go b/atp/client.go index 3888660..640b701 100644 --- a/atp/client.go +++ b/atp/client.go @@ -75,7 +75,7 @@ func NewClientWithLogger( cbor.NewEncoder(channel), make(chan bool, 5), // Buffer to prevent deadlocks make([]schema.Input, 0), - make(map[string]chan schema.Input), + make(map[string]chan<- schema.Input), make(map[string]*executionEntry), make(map[string]chan<- schema.Input), sync.Mutex{}, @@ -107,7 +107,7 @@ type client struct { encoder *cbor.Encoder doneChannel chan bool runningSteps []schema.Input - runningSignalReceiveLoops map[string]chan schema.Input // Run ID to channel of signals to steps + runningSignalReceiveLoops map[string]chan<- schema.Input // Run ID to channel of signals to steps runningStepResultEntries map[string]*executionEntry // Run ID to results runningStepEmittedSignalChannels map[string]chan<- schema.Input // Run ID to channel of signals emitted from steps mutex sync.Mutex From 145fe90ab335df7006030a29547aa3a577cffc5c Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Fri, 26 Jul 2024 11:29:50 -0400 Subject: [PATCH 3/8] Remove unneeded existence check --- atp/client.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/atp/client.go b/atp/client.go index 640b701..0093d17 100644 --- a/atp/client.go +++ b/atp/client.go @@ -214,10 +214,7 @@ func (c *client) handleStepComplete(runID string, receivedSignals chan schema.In // Remove from the map to ensure that the client.Close() method doesn't double-close it c.mutex.Lock() // Validate that it exists, since Close() could have been called early. - _, exists := c.runningSignalReceiveLoops[runID] - if exists { - delete(c.runningSignalReceiveLoops, runID) - } + delete(c.runningSignalReceiveLoops, runID) c.mutex.Unlock() } } From e29d3b6b5ebd3c6ec633f7009a0d71e2009dbfdf Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Fri, 26 Jul 2024 11:43:40 -0400 Subject: [PATCH 4/8] Hoist optimization from handleStepComplete to caller and remove parameter --- atp/client.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/atp/client.go b/atp/client.go index 0093d17..865ec38 100644 --- a/atp/client.go +++ b/atp/client.go @@ -203,20 +203,20 @@ func (c *client) Execute( } c.logger.Debugf("Step '%s' started, waiting for response...", stepData.ID) - defer c.handleStepComplete(stepData.RunID, receivedSignals) + if receivedSignals != nil { + defer c.handleStepComplete(stepData.RunID) + } return c.getResult(stepData, cborReader) } // handleStepComplete is the deferred function that will handle closing of the received channel. -func (c *client) handleStepComplete(runID string, receivedSignals chan schema.Input) { - if receivedSignals != nil { - c.logger.Infof("Closing signal channel for finished step") - // Remove from the map to ensure that the client.Close() method doesn't double-close it - c.mutex.Lock() - // Validate that it exists, since Close() could have been called early. - delete(c.runningSignalReceiveLoops, runID) - c.mutex.Unlock() - } +func (c *client) handleStepComplete(runID string) { + c.logger.Infof("Closing signal channel for finished step") + // Remove from the map to ensure that the client.Close() method doesn't double-close it + c.mutex.Lock() + // Validate that it exists, since Close() could have been called early. + delete(c.runningSignalReceiveLoops, runID) + c.mutex.Unlock() } // Close Tells the client that it's done, and can stop listening for more requests. From c4cea4d886edb07fb102a599ffa5d0b8ab7017b4 Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Fri, 26 Jul 2024 19:51:57 -0400 Subject: [PATCH 5/8] Tweak comments --- atp/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/atp/client.go b/atp/client.go index 865ec38..a9949e4 100644 --- a/atp/client.go +++ b/atp/client.go @@ -209,12 +209,12 @@ func (c *client) Execute( return c.getResult(stepData, cborReader) } -// handleStepComplete is the deferred function that will handle closing of the received channel. +// handleStepComplete performs cleanup actions for a step when its execution is +// complete. Currently, this consists solely of removing its entry from the +// map of channels used to send it signals. func (c *client) handleStepComplete(runID string) { c.logger.Infof("Closing signal channel for finished step") - // Remove from the map to ensure that the client.Close() method doesn't double-close it c.mutex.Lock() - // Validate that it exists, since Close() could have been called early. delete(c.runningSignalReceiveLoops, runID) c.mutex.Unlock() } From da8683bcafdc967983f5da8cac7f2cab33150435 Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Mon, 29 Jul 2024 10:36:38 -0400 Subject: [PATCH 6/8] Move reference to handleStepComplete() to a better spot --- atp/client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/atp/client.go b/atp/client.go index a9949e4..f4f4174 100644 --- a/atp/client.go +++ b/atp/client.go @@ -186,6 +186,7 @@ func (c *client) Execute( // Handle signals to the step if receivedSignals != nil { c.wg.Add(1) + defer c.handleStepComplete(stepData.RunID) go func() { defer c.wg.Done() c.executeWriteLoop(stepData.RunID, receivedSignals) @@ -203,9 +204,6 @@ func (c *client) Execute( } c.logger.Debugf("Step '%s' started, waiting for response...", stepData.ID) - if receivedSignals != nil { - defer c.handleStepComplete(stepData.RunID) - } return c.getResult(stepData, cborReader) } From e52a6548ff8f5e4736e17c42bd6991cff3c47abb Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Mon, 29 Jul 2024 10:38:41 -0400 Subject: [PATCH 7/8] Remove runningSignalReceiveLoops --- atp/client.go | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/atp/client.go b/atp/client.go index f4f4174..cfffb96 100644 --- a/atp/client.go +++ b/atp/client.go @@ -75,7 +75,6 @@ func NewClientWithLogger( cbor.NewEncoder(channel), make(chan bool, 5), // Buffer to prevent deadlocks make([]schema.Input, 0), - make(map[string]chan<- schema.Input), make(map[string]*executionEntry), make(map[string]chan<- schema.Input), sync.Mutex{}, @@ -107,7 +106,6 @@ type client struct { encoder *cbor.Encoder doneChannel chan bool runningSteps []schema.Input - runningSignalReceiveLoops map[string]chan<- schema.Input // Run ID to channel of signals to steps runningStepResultEntries map[string]*executionEntry // Run ID to results runningStepEmittedSignalChannels map[string]chan<- schema.Input // Run ID to channel of signals emitted from steps mutex sync.Mutex @@ -186,7 +184,6 @@ func (c *client) Execute( // Handle signals to the step if receivedSignals != nil { c.wg.Add(1) - defer c.handleStepComplete(stepData.RunID) go func() { defer c.wg.Done() c.executeWriteLoop(stepData.RunID, receivedSignals) @@ -207,16 +204,6 @@ func (c *client) Execute( return c.getResult(stepData, cborReader) } -// handleStepComplete performs cleanup actions for a step when its execution is -// complete. Currently, this consists solely of removing its entry from the -// map of channels used to send it signals. -func (c *client) handleStepComplete(runID string) { - c.logger.Infof("Closing signal channel for finished step") - c.mutex.Lock() - delete(c.runningSignalReceiveLoops, runID) - c.mutex.Unlock() -} - // Close Tells the client that it's done, and can stop listening for more requests. func (c *client) Close() error { c.mutex.Lock() @@ -298,14 +285,8 @@ func (c *client) executeWriteLoop( ) return } - // Add the channel to the client so that it can be kept track of - c.runningSignalReceiveLoops[runID] = receivedSignals c.mutex.Unlock() - defer func() { - c.mutex.Lock() - delete(c.runningSignalReceiveLoops, runID) - c.mutex.Unlock() - }() + // Looped select that gets signals for { signal, ok := <-receivedSignals From a66bd68ffcc9e485bcfcab5e26ce5f79a748271d Mon Sep 17 00:00:00 2001 From: Webb Scales Date: Mon, 29 Jul 2024 11:52:29 -0400 Subject: [PATCH 8/8] Restrict received-signals chanel type to receive-only --- atp/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/atp/client.go b/atp/client.go index cfffb96..e0c6fba 100644 --- a/atp/client.go +++ b/atp/client.go @@ -36,7 +36,7 @@ type Client interface { // ReadSchema reads the schema from the ATP server. ReadSchema() (*schema.SchemaSchema, error) // Execute executes a step with a given context and returns the resulting output. Assumes you called ReadSchema first. - Execute(input schema.Input, receivedSignals chan schema.Input, emittedSignals chan<- schema.Input) ExecutionResult + Execute(input schema.Input, receivedSignals <-chan schema.Input, emittedSignals chan<- schema.Input) ExecutionResult Close() error Encoder() *cbor.Encoder Decoder() *cbor.Decoder @@ -165,7 +165,7 @@ func (c *client) validateVersion(serverVersion int64) error { func (c *client) Execute( stepData schema.Input, - receivedSignals chan schema.Input, + receivedSignals <-chan schema.Input, emittedSignals chan<- schema.Input, ) ExecutionResult { c.logger.Debugf("Executing plugin step %s/%s...", stepData.RunID, stepData.ID) @@ -271,7 +271,7 @@ func (c *client) getRunningStepIDs() string { // Listen for received signals, and send them over ATP if available. func (c *client) executeWriteLoop( runID string, - receivedSignals chan schema.Input, + receivedSignals <-chan schema.Input, ) { c.mutex.Lock() if c.done {