Skip to content

Commit

Permalink
Remove runningSignalReceiveLoops
Browse files Browse the repository at this point in the history
  • Loading branch information
webbnh committed Jul 29, 2024
1 parent da8683b commit e52a654
Showing 1 changed file with 1 addition and 20 deletions.
21 changes: 1 addition & 20 deletions atp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func NewClientWithLogger(
cbor.NewEncoder(channel),
make(chan bool, 5), // Buffer to prevent deadlocks
make([]schema.Input, 0),
make(map[string]chan<- schema.Input),
make(map[string]*executionEntry),
make(map[string]chan<- schema.Input),
sync.Mutex{},
Expand Down Expand Up @@ -107,7 +106,6 @@ type client struct {
encoder *cbor.Encoder
doneChannel chan bool
runningSteps []schema.Input
runningSignalReceiveLoops map[string]chan<- schema.Input // Run ID to channel of signals to steps
runningStepResultEntries map[string]*executionEntry // Run ID to results
runningStepEmittedSignalChannels map[string]chan<- schema.Input // Run ID to channel of signals emitted from steps
mutex sync.Mutex
Expand Down Expand Up @@ -186,7 +184,6 @@ func (c *client) Execute(
// Handle signals to the step
if receivedSignals != nil {
c.wg.Add(1)
defer c.handleStepComplete(stepData.RunID)
go func() {
defer c.wg.Done()
c.executeWriteLoop(stepData.RunID, receivedSignals)
Expand All @@ -207,16 +204,6 @@ func (c *client) Execute(
return c.getResult(stepData, cborReader)
}

// handleStepComplete performs cleanup actions for a step when its execution is
// complete. Currently, this consists solely of removing its entry from the
// map of channels used to send it signals.
func (c *client) handleStepComplete(runID string) {
c.logger.Infof("Closing signal channel for finished step")
c.mutex.Lock()
delete(c.runningSignalReceiveLoops, runID)
c.mutex.Unlock()
}

// Close Tells the client that it's done, and can stop listening for more requests.
func (c *client) Close() error {
c.mutex.Lock()
Expand Down Expand Up @@ -298,14 +285,8 @@ func (c *client) executeWriteLoop(
)
return
}
// Add the channel to the client so that it can be kept track of
c.runningSignalReceiveLoops[runID] = receivedSignals
c.mutex.Unlock()
defer func() {
c.mutex.Lock()
delete(c.runningSignalReceiveLoops, runID)
c.mutex.Unlock()
}()

// Looped select that gets signals
for {
signal, ok := <-receivedSignals
Expand Down

0 comments on commit e52a654

Please sign in to comment.