Skip to content

Commit

Permalink
first stab at a basic kafka e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 20, 2024
1 parent 2ecfa27 commit d3e6b79
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ jobs:
env:
PGPASSWORD: postgres

- name: start redpanda
uses: redpanda-data/[email protected]
with:
version: "latest"

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

Expand Down
130 changes: 130 additions & 0 deletions flow/e2e/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package e2e_kafka

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

type KafkaSuite struct {
t *testing.T
conn *connpostgres.PostgresConnector
suffix string
}

func (s KafkaSuite) T() *testing.T {
return s.t
}

func (s KafkaSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
}

func (s KafkaSuite) Conn() *pgx.Conn {
return s.Connector().Conn()
}

func (s KafkaSuite) Suffix() string {
return s.suffix
}

func (s KafkaSuite) Peer() *protos.Peer {
return &protos.Peer{
Name: e2e.AddSuffix(s, "kasimple"),
Type: protos.DBType_KAFKA,
Config: &protos.Peer_KafkaConfig{
KafkaConfig: &protos.KafkaConfig{
Servers: []string{"localhost:9092"},
DisableTls: true,
},
},
}
}

func (s KafkaSuite) DestinationTable(table string) string {
return table
}

func (s KafkaSuite) Teardown() {
e2e.TearDownPostgres(s)
}

func SetupSuite(t *testing.T) KafkaSuite {

Check failure on line 65 in flow/e2e/kafka/kafka.go

View workflow job for this annotation

GitHub Actions / lint

test helper function should start from t.Helper() (thelper)
suffix := "ka_" + strings.ToLower(shared.RandomString(8))
conn, err := e2e.SetupPostgres(t, suffix)
require.NoError(t, err, "failed to setup postgres")

return KafkaSuite{
t: t,
conn: conn,
suffix: suffix,
}
}

func Test_Kafka(t *testing.T) {
e2eshared.RunSuite(t, SetupSuite)
}

func (s KafkaSuite) TestSimple() {
srcTableName := e2e.AttachSchema(s, "kasimple")

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
val text
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: srcTableName,
TableNameMapping: map[string]string{srcTableName: "katest"},
Destination: s.Peer(),
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
flowConnConfig.Script = `function onRecord(r) return r.row.val end`

tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`

Check failure on line 104 in flow/e2e/kafka/kafka.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
INSERT INTO %s (id, val) VALUES (1, 'testval')
`, srcTableName))

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize insert", func() bool {
kafka, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("katest"),
)
if err != nil {
return false
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
fetches := kafka.PollFetches(ctx)
fetches.EachTopic(func(ft kgo.FetchTopic) {
require.Equal(s.T(), "katest", ft.Topic)
ft.EachRecord(func(r *kgo.Record) {
require.Equal(s.T(), "testval", string(r.Value))
})
})
return true
})
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

0 comments on commit d3e6b79

Please sign in to comment.