Skip to content

Commit

Permalink
Fix inventory controller tests after adding additional ping (#49663)
Browse files Browse the repository at this point in the history
* Fix inventory controller tests after adding additional ping

* Fix TestTimeReconciliation
Remove unused code
  • Loading branch information
vapopov authored Dec 3, 2024
1 parent 23b99b5 commit 088cc60
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 84 deletions.
4 changes: 2 additions & 2 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const (
instanceHeartbeatOk testEvent = "instance-heartbeat-ok"
instanceHeartbeatErr testEvent = "instance-heartbeat-err"

timeReconciliationOk testEvent = "time-reconciliation-ok"
pongOk testEvent = "pong-ok"

instanceCompareFailed testEvent = "instance-compare-failed"

Expand Down Expand Up @@ -517,7 +517,6 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
handle.CloseWithError(err)
return
}
c.testEvent(timeReconciliationOk)

case now := <-dbKeepAliveDelay.Elapsed():
dbKeepAliveDelay.Advance(now)
Expand Down Expand Up @@ -631,6 +630,7 @@ func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInvent

pending.rspC <- pong
delete(handle.pings, msg.ID)
c.testEvent(pongOk)
}

func (c *Controller) handlePingRequest(handle *upstreamHandle, req pingRequest) error {
Expand Down
145 changes: 87 additions & 58 deletions lib/inventory/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,27 @@ func TestSSHServerBasics(t *testing.T) {

// set up fake in-memory control stream
upstream, downstream := client.InventoryControlStreamPipe(client.ICSPipePeerAddr(peerAddr))
t.Cleanup(func() {
controller.Close()
downstream.Close()
upstream.Close()
})

// launch goroutine to respond to ping requests
go func() {
for {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
return
case <-ctx.Done():
return
}
}
}()

controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Expand Down Expand Up @@ -256,18 +277,6 @@ func TestSSHServerBasics(t *testing.T) {
deny(sshKeepAliveErr, handlerClose),
)

// launch goroutine to respond to a single ping
go func() {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
case <-ctx.Done():
}
}()

// limit time of ping call
pingCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
Expand Down Expand Up @@ -357,6 +366,27 @@ func TestAppServerBasics(t *testing.T) {

// set up fake in-memory control stream
upstream, downstream := client.InventoryControlStreamPipe()
t.Cleanup(func() {
controller.Close()
upstream.Close()
downstream.Close()
})

// launch goroutine to respond to ping requests
go func() {
for {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
return
case <-ctx.Done():
return
}
}
}()

controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Expand Down Expand Up @@ -443,18 +473,6 @@ func TestAppServerBasics(t *testing.T) {
deny(appKeepAliveErr, handlerClose),
)

// launch goroutine to respond to a single ping
go func() {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
case <-ctx.Done():
}
}()

// limit time of ping call
pingCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
Expand Down Expand Up @@ -575,6 +593,27 @@ func TestDatabaseServerBasics(t *testing.T) {

// set up fake in-memory control stream
upstream, downstream := client.InventoryControlStreamPipe()
t.Cleanup(func() {
controller.Close()
upstream.Close()
downstream.Close()
})

// launch goroutine to respond to ping requests
go func() {
for {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
return
case <-ctx.Done():
return
}
}
}()

controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Expand Down Expand Up @@ -662,18 +701,6 @@ func TestDatabaseServerBasics(t *testing.T) {
deny(dbKeepAliveErr, handlerClose),
)

// launch goroutine to respond to a single ping
go func() {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
case <-ctx.Done():
}
}()

// limit time of ping call
pingCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
Expand Down Expand Up @@ -1189,6 +1216,21 @@ func TestKubernetesServerBasics(t *testing.T) {

// set up fake in-memory control stream
upstream, downstream := client.InventoryControlStreamPipe()
// launch goroutine to respond to ping requests
go func() {
for {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
return
case <-ctx.Done():
return
}
}
}()

controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Expand Down Expand Up @@ -1277,18 +1319,6 @@ func TestKubernetesServerBasics(t *testing.T) {
deny(kubeKeepAliveErr, handlerClose),
)

// launch goroutine to respond to a single ping
go func() {
select {
case msg := <-downstream.Recv():
downstream.Send(ctx, proto.UpstreamInventoryPong{
ID: msg.(proto.DownstreamInventoryPing).ID,
})
case <-downstream.Done():
case <-ctx.Done():
}
}()

// limit time of ping call
pingCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
Expand Down Expand Up @@ -1473,12 +1503,6 @@ func TestTimeReconciliation(t *testing.T) {
cancel()
})

controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode},
})

// Launch goroutine to respond to clock request.
go func() {
for {
Expand All @@ -1488,7 +1512,6 @@ func TestTimeReconciliation(t *testing.T) {
ID: msg.(proto.DownstreamInventoryPing).ID,
SystemClock: clock.Now().Add(-time.Minute).UTC(),
})
return
case <-downstream.Done():
return
case <-ctx.Done():
Expand All @@ -1497,19 +1520,25 @@ func TestTimeReconciliation(t *testing.T) {
}
}()

controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode},
})

_, ok := controller.GetControlStream(serverID)
require.True(t, ok)

awaitEvents(t, events,
expect(timeReconciliationOk),
)
awaitEvents(t, events, expect(pongOk))
awaitEvents(t, events,
expect(instanceHeartbeatOk),
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)
auth.mu.Lock()
m := auth.lastInstance.GetLastMeasurement()
auth.mu.Unlock()

require.NotNil(t, m)
require.InDelta(t, time.Minute, m.ControllerSystemClock.Sub(m.SystemClock)-m.RequestDuration/2, float64(time.Second))
}

Expand Down
24 changes: 0 additions & 24 deletions lib/inventory/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,6 @@ type UpstreamHandle interface {

Ping(ctx context.Context, id uint64) (d time.Duration, err error)

// SystemClock makes ping request to fetch the system clock of the node.
SystemClock(ctx context.Context, id uint64) (time.Time, time.Duration, error)

// HasService is a helper for checking if a given service is associated with this
// stream.
HasService(types.SystemRole) bool
Expand Down Expand Up @@ -673,27 +670,6 @@ func (h *upstreamHandle) Ping(ctx context.Context, id uint64) (d time.Duration,
}
}

// SystemClock makes ping request to fetch the system clock of the downstream.
func (h *upstreamHandle) SystemClock(ctx context.Context, id uint64) (time.Time, time.Duration, error) {
rspC := make(chan pingResponse, 1)
select {
case h.pingC <- pingRequest{rspC: rspC, id: id}:
case <-h.Done():
return time.Time{}, 0, trace.Errorf("failed to send downstream ping (stream closed)")
case <-ctx.Done():
return time.Time{}, 0, trace.Errorf("failed to send downstream ping: %v", ctx.Err())
}

select {
case rsp := <-rspC:
return rsp.systemClock, rsp.reqDuration, rsp.err
case <-h.Done():
return time.Time{}, 0, trace.Errorf("failed to recv upstream pong (stream closed)")
case <-ctx.Done():
return time.Time{}, 0, trace.Errorf("failed to recv upstream ping: %v", ctx.Err())
}
}

func (h *upstreamHandle) Hello() proto.UpstreamInventoryHello {
return h.hello
}
Expand Down

0 comments on commit 088cc60

Please sign in to comment.