Skip to content

Commit

Permalink
Test testcontainers kafka module
Browse files Browse the repository at this point in the history
  • Loading branch information
deadlycoconuts committed Aug 7, 2024
1 parent be121bf commit abba2eb
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 56 deletions.
3 changes: 2 additions & 1 deletion treatment-service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.32.0
github.com/testcontainers/testcontainers-go/modules/compose v0.32.0
github.com/testcontainers/testcontainers-go/modules/kafka v0.32.0
go.einride.tech/protobuf-bigquery v0.19.0
go.uber.org/automaxprocs v1.5.1
google.golang.org/api v0.139.0
Expand Down Expand Up @@ -170,7 +171,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down
28 changes: 26 additions & 2 deletions treatment-service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
Expand Down Expand Up @@ -270,6 +272,12 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU5CAUmr9zpesgbU6SWc8/B4mflAE4=
github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
github.com/dvsekhvalnov/jose2go v0.0.0-20170216131308-f21a8cedbbae/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203 h1:XBBHcIb256gUJtLmY22n99HaZTz+r2Z51xUPi01m3wg=
github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203/go.mod h1:E1jcSv8FaEny+OP/5k9UxZVw9YFWGj7eI4KR/iOBqCg=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
Expand Down Expand Up @@ -512,6 +520,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
Expand Down Expand Up @@ -541,6 +551,16 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/invopop/jsonschema v0.4.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
Expand Down Expand Up @@ -732,8 +752,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -778,6 +798,8 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=
Expand Down Expand Up @@ -868,6 +890,8 @@ github.com/testcontainers/testcontainers-go v0.32.0 h1:ug1aK08L3gCHdhknlTTwWjPHP
github.com/testcontainers/testcontainers-go v0.32.0/go.mod h1:CRHrzHLQhlXUsa5gXjTOfqIEJcrK5+xMDmBr/WMI88E=
github.com/testcontainers/testcontainers-go/modules/compose v0.32.0 h1:7Ei3qamJeXZc4jurAwzV2hGGihiabZ01MxIbll1Ee3U=
github.com/testcontainers/testcontainers-go/modules/compose v0.32.0/go.mod h1:A/it0TCrstoVkp0qmU5/LHiF1ZQyO7yqi8KKBD3WchY=
github.com/testcontainers/testcontainers-go/modules/kafka v0.32.0 h1:OeqTsdtPhwVQbq0pP35okVY2Qb6vb/5LaOBqewTWlGk=
github.com/testcontainers/testcontainers-go/modules/kafka v0.32.0/go.mod h1:GCPw9ky5rQpMk81DyNaETjbfZiAwA7j5LLeVPnIvgII=
github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c=
github.com/theupdateframework/notary v0.7.0/go.mod h1:c9DRxcmhHmVLDay4/2fUYdISnHqbFDGRSlXPO0AhYWw=
github.com/tilt-dev/fsnotify v1.4.8-0.20220602155310-fff9c274a375 h1:QB54BJwA6x8QU9nHY3xJSZR2kX9bgpZekRKGkLTmEXA=
Expand Down
130 changes: 77 additions & 53 deletions treatment-service/integration-test/fetch_treatment_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration_test
import (
"context"
"fmt"
probing "github.com/prometheus-community/pro-bing"

Check failure on line 6 in treatment-service/integration-test/fetch_treatment_it_test.go

View workflow job for this annotation

GitHub Actions / lint-go

File is not `goimports`-ed (goimports)
"log"
"math/rand"
"net"
Expand All @@ -20,10 +21,9 @@ import (
"github.com/caraml-dev/xp/treatment-service/server"
mgmtSvcServer "github.com/caraml-dev/xp/treatment-service/testhelper/mockmanagement/server"
mgmtSvc "github.com/caraml-dev/xp/treatment-service/testhelper/mockmanagement/service"
probing "github.com/prometheus-community/pro-bing"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/modules/kafka"
)

const (
Expand Down Expand Up @@ -56,7 +56,7 @@ type TreatmentServiceTestSuite struct {
terminationChannel chan bool
ctx context.Context
emulator testcontainers.Container
kafka compose.ComposeStack
kafka testcontainers.Container
}

func int32Ptr(value int32) *int32 {
Expand Down Expand Up @@ -361,62 +361,85 @@ func (suite *TreatmentServiceTestSuite) SetupSuite() {
suite.managementServiceClient = managementClient
suite.managementServiceServer = managementServer

// Docker compose file copied from official confluentinc repository.
// See: https://github.com/confluentinc/cp-all-in-one/blob/7.0.1-post/cp-all-in-one-kraft/docker-compose.yml
composeFilePaths := []string{"docker-compose/kafka/docker-compose.yaml"}
kafka, err := compose.NewDockerComposeWith(compose.WithStackFiles(composeFilePaths...), compose.StackIdentifier("kafka"))
kafkaContainer, err := kafka.Run(ctx, "confluentinc/cp-kafka:7.0.1")
if err != nil {
panic(err)
}
err = kafka.Up(ctx, compose.Wait(true))
brokers, err := kafkaContainer.Brokers(ctx)
if err != nil {
panic(err)
}
fmt.Println(kafka)
if kafka != nil {
kafkaContainer, err := kafka.ServiceContainer(ctx, "kafka")
if err != nil {
panic(err)
}
kafkaNetworks, err := kafkaContainer.Networks(ctx)
if err != nil {
panic(err)
}
fmt.Println(kafkaNetworks)
kafkaInfo, err := kafkaContainer.Inspect(ctx)
if err != nil {
panic(err)
}
fmt.Println(kafkaInfo.NetworkSettings.Networks)
fmt.Println(kafkaInfo.NetworkSettings.Networks["kafka_default"])
fmt.Println(kafkaInfo.NetworkSettings.Networks["kafka_default"].IPAddress)
fmt.Println(kafkaInfo.NetworkSettings.Ports)
fmt.Println(kafkaInfo.State)
kafkaHost, err := kafkaContainer.Host(ctx)
if err != nil {
panic(err)
}
fmt.Println(kafkaHost)
kafkaPort, err := kafkaContainer.MappedPort(ctx, "9092")
if err != nil {
panic(err)
}
fmt.Println(kafkaPort)
pinger, err := probing.NewPinger(fmt.Sprintf("localhost:%s", kafkaPort.Port()))
if err != nil {
panic(err)
}
pinger.Count = 3
err = pinger.Run() // Blocks until finished.
if err != nil {
panic(err)
}
stats := pinger.Statistics()
fmt.Println(stats) // get send/receive/duplicate/rtt stats

os.Setenv("ASSIGNEDTREATMENTLOGGER_KAFKACONFIG_BROKERS", fmt.Sprintf("localhost:%s", kafkaPort.Port()))
fmt.Println(brokers)
pinger, err := probing.NewPinger(brokers[0])
if err != nil {
panic(err)
}
pinger.Count = 3
err = pinger.Run() // Blocks until finished.
if err != nil {
panic(err)
}
suite.kafka = kafka
stats := pinger.Statistics()
fmt.Println(stats) // get send/receive/duplicate/rtt stats

// Docker compose file copied from official confluentinc repository.
// See: https://github.com/confluentinc/cp-all-in-one/blob/7.0.1-post/cp-all-in-one-kraft/docker-compose.yml
//composeFilePaths := []string{"docker-compose/kafka/docker-compose.yaml"}
//kafka, err := compose.NewDockerComposeWith(compose.WithStackFiles(composeFilePaths...), compose.StackIdentifier("kafka"))
//if err != nil {
// panic(err)
//}
//err = kafka.Up(ctx, compose.Wait(true))
//if err != nil {
// panic(err)
//}
//fmt.Println(kafka)
//if kafka != nil {
// kafkaContainer, err := kafka.ServiceContainer(ctx, "kafka")
// if err != nil {
// panic(err)
// }
// kafkaNetworks, err := kafkaContainer.Networks(ctx)
// if err != nil {
// panic(err)
// }
// fmt.Println(kafkaNetworks)
// kafkaInfo, err := kafkaContainer.Inspect(ctx)
// if err != nil {
// panic(err)
// }
// fmt.Println(kafkaInfo.NetworkSettings.Networks)
// fmt.Println(kafkaInfo.NetworkSettings.Networks["kafka_default"])
// fmt.Println(kafkaInfo.NetworkSettings.Networks["kafka_default"].IPAddress)
// fmt.Println(kafkaInfo.NetworkSettings.Ports)
// fmt.Println(kafkaInfo.State)
// kafkaHost, err := kafkaContainer.Host(ctx)
// if err != nil {
// panic(err)
// }
// fmt.Println(kafkaHost)
// kafkaPort, err := kafkaContainer.MappedPort(ctx, "9092")
// if err != nil {
// panic(err)
// }
// fmt.Println(kafkaPort)
// pinger, err := probing.NewPinger(fmt.Sprintf("localhost:%s", kafkaPort.Port()))
// if err != nil {
// panic(err)
// }
// pinger.Count = 3
// err = pinger.Run() // Blocks until finished.
// if err != nil {
// panic(err)
// }
// stats := pinger.Statistics()
// fmt.Println(stats) // get send/receive/duplicate/rtt stats
//
// os.Setenv("ASSIGNEDTREATMENTLOGGER_KAFKACONFIG_BROKERS", fmt.Sprintf("localhost:%s", kafkaPort.Port()))
//}

os.Setenv("ASSIGNEDTREATMENTLOGGER_KAFKACONFIG_BROKERS", brokers[0])
suite.kafka = kafkaContainer

c, treatmentServer := setupTreatmentService(suite.managementServiceServer.URL)
waitForServerToListen := func() bool {
Expand Down Expand Up @@ -445,7 +468,8 @@ func (suite *TreatmentServiceTestSuite) TearDownSuite() {
suite.treatmentServiceServer.Close()
suite.terminationChannel <- true

_ = suite.kafka.Down(context.Background())
//_ = suite.kafka.Down(context.Background())
_ = suite.kafka.Terminate(context.Background())
_ = suite.emulator.Terminate(context.Background())
}

Expand Down

0 comments on commit abba2eb

Please sign in to comment.