Skip to content

Commit

Permalink
Code cleanup for command checkins (#3376)
Browse files Browse the repository at this point in the history
* Small cleanups

* more cleanups
  • Loading branch information
faec authored Sep 21, 2023
1 parent 7e86d24 commit 135ea2b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 70 deletions.
38 changes: 26 additions & 12 deletions pkg/component/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,31 @@ type commandRuntime struct {
current component.Component
monitor MonitoringManager

ch chan ComponentState
// ch is the reporting channel for the current state. When a policy change
// or client checkin affects the component state, its new value is sent
// here and handled by (componentRuntimeState).runLoop.
ch chan ComponentState

// When the managed process closes, its termination status is sent on procCh
// by the watching goroutine, and handled by (*commandRuntime).Run.
procCh chan procState

// compCh forwards new component metadata from the runtime manager to
// the command runtime. It is written by calls to (*commandRuntime).Update
// and read by the run loop in (*commandRuntime).Run.
compCh chan component.Component

// The most recent mode received on actionCh. The mode will be either
// actionStart (indicating the process should be running, and should be
// created if it is not), or actionStop or actionTeardown (indicating that
// it should terminate).
actionState actionMode

// actionState is changed by sending its new value on actionCh, where it is
// handled by (*commandRuntime).Run.
actionCh chan actionMode
procCh chan procState
compCh chan component.Component

actionState actionMode
proc *process.Info
proc *process.Info

state ComponentState
lastCheckin time.Time
Expand Down Expand Up @@ -204,14 +222,10 @@ func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error {
} else {
// running and should be running
now := time.Now().UTC()
if c.lastCheckin.IsZero() {
// never checked-in
c.missedCheckins++
} else if now.Sub(c.lastCheckin) > checkinPeriod {
// missed check-in during required period
c.missedCheckins++
} else if now.Sub(c.lastCheckin) <= checkinPeriod {
if now.Sub(c.lastCheckin) <= checkinPeriod {
c.missedCheckins = 0
} else {
c.missedCheckins++
}
if c.missedCheckins == 0 {
c.compState(client.UnitStateHealthy)
Expand Down
16 changes: 4 additions & 12 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func (m *Manager) State() []ComponentComponentState {
defer m.currentMx.RUnlock()
states := make([]ComponentComponentState, 0, len(m.current))
for _, crs := range m.current {
crs.latestMx.RLock()
var legacyPID string
if crs.runtime != nil {
if commandRuntime, ok := crs.runtime.(*commandRuntime); ok {
Expand All @@ -282,10 +281,9 @@ func (m *Manager) State() []ComponentComponentState {
}
states = append(states, ComponentComponentState{
Component: crs.getCurrent(),
State: crs.latestState.Copy(),
State: crs.getLatest(),
LegacyPID: legacyPID,
})
crs.latestMx.RUnlock()
}
return states
}
Expand Down Expand Up @@ -484,9 +482,7 @@ func (m *Manager) Subscribe(ctx context.Context, componentID string) *Subscripti
comp, ok := m.current[componentID]
m.currentMx.RUnlock()
if ok {
comp.latestMx.RLock()
latestState := comp.latestState.Copy()
comp.latestMx.RUnlock()
latestState := comp.getLatest()
go func() {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -532,9 +528,7 @@ func (m *Manager) SubscribeAll(ctx context.Context) *SubscriptionAll {
m.currentMx.RLock()
latest := make([]ComponentComponentState, 0, len(m.current))
for _, comp := range m.current {
comp.latestMx.RLock()
latest = append(latest, ComponentComponentState{Component: comp.getCurrent(), State: comp.latestState.Copy()})
comp.latestMx.RUnlock()
latest = append(latest, ComponentComponentState{Component: comp.getCurrent(), State: comp.getLatest()})
}
m.currentMx.RUnlock()
if len(latest) > 0 {
Expand Down Expand Up @@ -759,9 +753,7 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) {

timeoutCh := time.After(timeout)
for {
comp.latestMx.RLock()
latestState := comp.latestState
comp.latestMx.RUnlock()
latestState := comp.getLatest()
if latestState.State == client.UnitStateStopped {
return
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/component/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func (s *componentRuntimeState) setCurrent(current component.Component) {
s.currCompMx.Unlock()
}

func (s *componentRuntimeState) getLatest() ComponentState {
s.latestMx.RLock()
defer s.latestMx.RUnlock()
return s.latestState.Copy()
}

func (s *componentRuntimeState) start() error {
return s.runtime.Start()
}
Expand Down
87 changes: 41 additions & 46 deletions pkg/component/runtime/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ func newComponentState(comp *component.Component) (s ComponentState) {
s.expectedFeaturesIdx = 1
s.expectedComponentIdx = 1

s.syncComponent(comp)
// Merge initial component state.
s.syncExpected(comp)
s.syncUnits(comp)

return s
}

Expand Down Expand Up @@ -131,15 +134,6 @@ func (s *ComponentState) Copy() (c ComponentState) {
return c
}

func (s *ComponentState) syncComponent(comp *component.Component) bool {
changed := s.syncExpected(comp)
s.syncUnits(comp)
if changed {
return true
}
return s.unsettled()
}

func (s *ComponentState) syncExpected(comp *component.Component) bool {
changed := false
touched := make(map[ComponentUnitKey]bool)
Expand All @@ -151,34 +145,34 @@ func (s *ComponentState) syncExpected(comp *component.Component) bool {
}

touched[key] = true
existing, ok := s.expectedUnits[key]
expected, ok := s.expectedUnits[key]
if ok {
if existing.logLevel != unit.LogLevel {
existing.logLevel = unit.LogLevel
if expected.logLevel != unit.LogLevel {
expected.logLevel = unit.LogLevel
changed = true
}
if !gproto.Equal(existing.config, unit.Config) {
existing.config = unit.Config
existing.configStateIdx++
if !gproto.Equal(expected.config, unit.Config) {
expected.config = unit.Config
expected.configStateIdx++
changed = true
}
} else {
existing.state = client.UnitStateHealthy
existing.logLevel = unit.LogLevel
existing.config = unit.Config
existing.configStateIdx = 1
expected.state = client.UnitStateHealthy
expected.logLevel = unit.LogLevel
expected.config = unit.Config
expected.configStateIdx = 1
changed = true
}

if !errors.Is(existing.err, unit.Err) {
existing.err = unit.Err
if existing.err != nil {
existing.state = client.UnitStateFailed
if !errors.Is(expected.err, unit.Err) {
expected.err = unit.Err
if expected.err != nil {
expected.state = client.UnitStateFailed
}
changed = true
}

s.expectedUnits[key] = existing
s.expectedUnits[key] = expected
}

for key, unit := range s.expectedUnits {
Expand Down Expand Up @@ -320,27 +314,28 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool {
}

for key, unit := range s.Units {
_, ok := touched[key]
if !ok {
unit.unitState = client.UnitStateStarting
unit.unitMessage = ""
unit.unitPayload = nil
unit.configStateIdx = 0
if unit.err != nil {
errMsg := unit.err.Error()
if unit.State != client.UnitStateFailed || unit.Message != errMsg || diffPayload(unit.Payload, nil) {
changed = true
unit.State = client.UnitStateFailed
unit.Message = errMsg
unit.Payload = nil
}
} else if unit.State != client.UnitStateStarting && unit.State != client.UnitStateStopped {
if unit.State != client.UnitStateFailed || unit.Message != missingMsg || diffPayload(unit.Payload, nil) {
changed = true
unit.State = client.UnitStateFailed
unit.Message = missingMsg
unit.Payload = nil
}
// Look for units that weren't in the checkin.
if _, ok := touched[key]; ok {
continue
}
unit.unitState = client.UnitStateStarting
unit.unitMessage = ""
unit.unitPayload = nil
unit.configStateIdx = 0
if unit.err != nil {
errMsg := unit.err.Error()
if unit.State != client.UnitStateFailed || unit.Message != errMsg || diffPayload(unit.Payload, nil) {
changed = true
unit.State = client.UnitStateFailed
unit.Message = errMsg
unit.Payload = nil
}
} else if unit.State != client.UnitStateStarting && unit.State != client.UnitStateStopped {
if unit.State != client.UnitStateFailed || unit.Message != missingMsg || diffPayload(unit.Payload, nil) {
changed = true
unit.State = client.UnitStateFailed
unit.Message = missingMsg
unit.Payload = nil
}
}

Expand Down

0 comments on commit 135ea2b

Please sign in to comment.