Skip to content

Commit

Permalink
ci: try to fix flaky test
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Sep 8, 2023
1 parent b901cb6 commit e459635
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions api/job_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -49,14 +50,19 @@ func TestJobEventsSubscribe(t *testing.T) {
t.Run(name, func(t *testing.T) {
clientID := "TestJobEventsSubscribe"

var jobID atomic.Pointer[string]

var wg sync.WaitGroup
ch, _ := events.AddSubscriber(context.Background(), events.FilterParams{ClientIDs: []string{clientID}})
wg.Add(1)
go func() {
defer wg.Done()

// wait for job created event
<-ch
ev := <-ch
payload := ev.Args[0].(events.JobEvent)
assert.Equal(t, events.ActionCreate, payload.Action)
jobID.Store(&payload.Job.ID)

// wait for event created by our status.Update below
<-ch
Expand All @@ -66,28 +72,29 @@ func TestJobEventsSubscribe(t *testing.T) {
events.ShutdownSubscribers()
}()

var jobID string
job, err := job.CreateJob(context.Background(), db,
&model.JobRequest{ClientID: clientID, Workflow: wf.Name})
_, err := job.CreateJob(context.Background(), db, &model.JobRequest{ClientID: clientID, Workflow: wf.Name})
require.NoError(t, err)
jobID = job.ID
require.NotEmpty(t, jobID)

wg.Add(1)
go func() {
defer wg.Done()
// wait for subscriber which is created by our GET request below and our test goroutine above
for events.SubscriberCount() != 2 {
time.Sleep(30 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
}
// update job
_, err = status.Update(context.Background(), db, jobID, &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
_, err = status.Update(context.Background(), db, *jobID.Load(), &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
}()

// wait for job id
for jobID.Load() == nil {
time.Sleep(20 * time.Millisecond)
}

result := apitest.New().
Handler(handler).
Get("/api/wfx/v1/jobs/events").Query("ids", jobID).
Get("/api/wfx/v1/jobs/events").Query("ids", *jobID.Load()).
Expect(t).
Status(http.StatusOK).
Header("Content-Type", "text/event-stream").
Expand Down Expand Up @@ -116,7 +123,7 @@ func TestJobEventsSubscribe(t *testing.T) {
assert.Equal(t, events.ActionUpdateStatus, ev.Action)
assert.Equal(t, "INSTALLING", ev.Job.Status.State)
assert.Equal(t, wf.Name, ev.Job.Workflow.Name)
assert.Equal(t, job.ClientID, ev.Job.ClientID)
assert.Equal(t, clientID, ev.Job.ClientID)
assert.Equal(t, "id: 1", lines[1])

wg.Wait()
Expand Down

0 comments on commit e459635

Please sign in to comment.