From d3e6b79e6f35b5185f85948327218a9d7815d05e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Wed, 20 Mar 2024 19:17:14 +0000
Subject: [PATCH] first stab at a basic kafka e2e test

---
 .github/workflows/flow.yml |   5 ++
 flow/e2e/kafka/kafka.go    | 159 +++++++++++++++++++++++++++++++++++++
 2 files changed, 164 insertions(+)
 create mode 100644 flow/e2e/kafka/kafka.go

diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index 38bc636f51..54919f0bde 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -86,6 +86,11 @@ jobs:
         env:
           PGPASSWORD: postgres
 
+      - name: start redpanda
+        uses: redpanda-data/github-action@v0.1.4
+        with:
+          version: "latest"
+
       - name: Install Temporal CLI
         uses: temporalio/setup-temporal@v0
 
diff --git a/flow/e2e/kafka/kafka.go b/flow/e2e/kafka/kafka.go
new file mode 100644
index 0000000000..85bd93fe1e
--- /dev/null
+++ b/flow/e2e/kafka/kafka.go
@@ -0,0 +1,159 @@
+// Package e2e_kafka contains an end-to-end test of a CDC flow from a Postgres
+// source table to a Kafka peer.
+package e2e_kafka
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kgo"
+
+	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
+	"github.com/PeerDB-io/peer-flow/e2e"
+	"github.com/PeerDB-io/peer-flow/e2eshared"
+	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/shared"
+	peerflow "github.com/PeerDB-io/peer-flow/workflows"
+)
+
+// KafkaSuite carries the per-test state: the testing handle, the source
+// Postgres connector and the random suffix used to name test objects.
+type KafkaSuite struct {
+	t      *testing.T
+	conn   *connpostgres.PostgresConnector
+	suffix string
+}
+
+// T returns the testing handle the suite was created with.
+func (s KafkaSuite) T() *testing.T {
+	return s.t
+}
+
+// Connector returns the source Postgres connector.
+func (s KafkaSuite) Connector() *connpostgres.PostgresConnector {
+	return s.conn
+}
+
+// Conn returns the pgx connection held by the source connector.
+func (s KafkaSuite) Conn() *pgx.Conn {
+	return s.Connector().Conn()
+}
+
+// Suffix returns the random suffix generated for this suite instance.
+func (s KafkaSuite) Suffix() string {
+	return s.suffix
+}
+
+// Peer describes the Kafka destination peer: one broker at localhost:9092
+// with TLS disabled.
+func (s KafkaSuite) Peer() *protos.Peer {
+	return &protos.Peer{
+		Name: e2e.AddSuffix(s, "kasimple"),
+		Type: protos.DBType_KAFKA,
+		Config: &protos.Peer_KafkaConfig{
+			KafkaConfig: &protos.KafkaConfig{
+				Servers:    []string{"localhost:9092"},
+				DisableTls: true,
+			},
+		},
+	}
+}
+
+// DestinationTable returns table unchanged.
+func (s KafkaSuite) DestinationTable(table string) string {
+	return table
+}
+
+// Teardown hands the suite to e2e.TearDownPostgres.
+func (s KafkaSuite) Teardown() {
+	e2e.TearDownPostgres(s)
+}
+
+// SetupSuite generates a random "ka_"-prefixed suffix, sets up Postgres with
+// it and fails the test immediately if that setup returns an error.
+func SetupSuite(t *testing.T) KafkaSuite {
+	suffix := "ka_" + strings.ToLower(shared.RandomString(8))
+	conn, err := e2e.SetupPostgres(t, suffix)
+	require.NoError(t, err, "failed to setup postgres")
+
+	return KafkaSuite{
+		t:      t,
+		conn:   conn,
+		suffix: suffix,
+	}
+}
+
+// Test_Kafka runs every suite test through e2eshared.RunSuite.
+//
+// NOTE(review): go test only compiles test functions from files whose name
+// ends in _test.go; confirm this file's name lets the suite actually run.
+func Test_Kafka(t *testing.T) {
+	e2eshared.RunSuite(t, SetupSuite)
+}
+
+// TestSimple inserts one row into a fresh source table and waits until its
+// val column shows up as a record on the "katest" topic.
+func (s KafkaSuite) TestSimple() {
+	srcTableName := e2e.AttachSchema(s, "kasimple")
+
+	_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id SERIAL PRIMARY KEY,
+			val text
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      srcTableName,
+		TableNameMapping: map[string]string{srcTableName: "katest"},
+		Destination:      s.Peer(),
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
+	flowConnConfig.Script = `function onRecord(r) return r.row.val end`
+
+	tc := e2e.NewTemporalClient(s.t)
+	env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+	e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
+
+	_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
+		INSERT INTO %s (id, val) VALUES (1, 'testval')
+	`, srcTableName))
+	// A failed insert would otherwise only surface as a wait timeout below.
+	require.NoError(s.t, err)
+
+	e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize insert", func() bool {
+		kafka, err := kgo.NewClient(
+			kgo.SeedBrokers("localhost:9092"),
+			kgo.ConsumeTopics("katest"),
+		)
+		if err != nil {
+			return false
+		}
+		// A new client is built on every poll, so release it each time.
+		defer kafka.Close()
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+		defer cancel()
+		fetches := kafka.PollFetches(ctx)
+
+		// Report success only once a record has really been consumed; an
+		// empty fetch must keep the wait going instead of passing vacuously.
+		sawRecord := false
+		fetches.EachTopic(func(ft kgo.FetchTopic) {
+			require.Equal(s.T(), "katest", ft.Topic)
+			ft.EachRecord(func(r *kgo.Record) {
+				require.Equal(s.T(), "testval", string(r.Value))
+				sawRecord = true
+			})
+		})
+		return sawRecord
+	})
+	env.Cancel()
+	e2e.RequireEnvCanceled(s.t, env)
+}