Skip to content

Commit

Permalink
e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 7, 2024
1 parent 4b0ccaf commit e47d03c
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions flow/e2e/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,59 @@ func (s KafkaSuite) TestDefault() {
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s KafkaSuite) TestInitialLoad() {
srcTableName := e2e.AttachSchema(s, "kainitial")

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
val text
);
`, srcTableName))
require.NoError(s.t, err)

flowName := e2e.AddSuffix(s, "kainitial")
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: flowName,
TableNameMapping: map[string]string{srcTableName: flowName},
Destination: s.Peer(),
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig.DoInitialSnapshot = true

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (id, val) VALUES (1, 'testval')
`, srcTableName))
require.NoError(s.t, err)

tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize insert", func() bool {
kafka, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics(flowName),
)
if err != nil {
return false
}
defer kafka.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
fetches := kafka.PollFetches(ctx)
fetches.EachTopic(func(ft kgo.FetchTopic) {
require.Equal(s.t, flowName, ft.Topic)
ft.EachRecord(func(r *kgo.Record) {
require.Contains(s.t, string(r.Value), "\"testval\"")
require.Equal(s.t, byte('{'), r.Value[0])
require.Equal(s.t, byte('}'), r.Value[len(r.Value)-1])
})
})
return true
})
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

0 comments on commit e47d03c

Please sign in to comment.