Skip to content

Commit

Permalink
chore: Comment aesthetics
Browse files Browse the repository at this point in the history
  • Loading branch information
joeirimpan committed Jul 3, 2024
1 parent 0f15bfa commit e5c601e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (f *TestFilter) IsAllowed(msg []byte) bool {
* Copy this plugin code to a directory. `mkdir testfilter && cp sample.go testfilter`
* Build the plugin. `CGO_ENABLED=1 go build -a -ldflags="-s -w" -buildmode=plugin -o testfilter.filter sample.go`
* Change the config.toml to add the filter provider config.
* Run kaf-relay with the filter pluing. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`
* Run kaf-relay with the filter plugin. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`

## Metrics

Expand Down
4 changes: 2 additions & 2 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ source_topic2 = "target_topic2"
[source_pool]
# Kafka client config common to all upstream sources ([[sources]]).
initial_offset = "start"
# static memmbership to pin the member for the consumer group for respawn / reconnect and fence other members from connecting using the same id.
# Static memmbership to pin the member for the consumer group for respawn / reconnect and fence other members from connecting using the same id.
instance_id = "client_instance_id"
# consumer group id
# Consumer group id.
group_id = "consumer_group"

# Frequency at which source servers are polled for health/lag.
Expand Down
11 changes: 6 additions & 5 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,13 @@ func (re *Relay) Start(globalCtx context.Context) error {
re.signalCh <- struct{}{}

wg.Add(1)
// Relay teardown.
go func() {
defer wg.Done()
// wait till main ctx is cancelled
// Wait till main ctx is cancelled.
<-globalCtx.Done()

// stop consumer group
// Stop consumer group.
re.source.Close()
}()

Expand All @@ -121,10 +122,10 @@ func (re *Relay) Start(globalCtx context.Context) error {
re.log.Error("error starting consumer worker", "err", err)
}

// close the producer inlet channle
// Close the producer inlet channel.
close(re.target.inletCh)

// close producer
// Close producer.
re.target.Close()

wg.Wait()
Expand Down Expand Up @@ -248,7 +249,7 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
return nil
}

// Repurpose &kgo.Record and forward it to producer to reduce allocs
// Repurpose &kgo.Record and forward it to producer to reduce allocs.
rec.Headers = nil
rec.Timestamp = time.Time{}
rec.Topic = t.TargetTopic
Expand Down

0 comments on commit e5c601e

Please sign in to comment.