Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
he2ss committed Nov 21, 2023
1 parent a60295b commit 5752d43
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions pkg/acquisition/modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
k.Config.Mode = configuration.TAIL_MODE
}

k.logger.Debugf("Successfully unmarshaled kafka configuration : %+v", k.Config)
k.logger.Debugf("successfully unmarshaled kafka configuration : %+v", k.Config)

return err
}

func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
k.logger = logger

k.logger.Debugf("Start configuring %s source", dataSourceName)
k.logger.Debugf("start configuring %s source", dataSourceName)

err := k.UnmarshalConfig(yamlConfig)
if err != nil {
Expand All @@ -108,7 +108,7 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
return fmt.Errorf("cannot create %s reader", dataSourceName)
}

k.logger.Debugf("Successfully configured %s source", dataSourceName)
k.logger.Debugf("successfully configured %s source", dataSourceName)

return nil
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
// Start processing from latest Offset
k.Reader.SetOffsetAt(context.Background(), time.Now())
for {
k.logger.Tracef("Reading message from topic '%s'", k.Config.Topic)
k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)

Check warning on line 152 in pkg/acquisition/modules/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kafka/kafka.go#L152

Added line #L152 was not covered by tests
m, err := k.Reader.ReadMessage(context.Background())
if err != nil {
if err == io.EOF {
Expand All @@ -165,7 +165,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
Process: true,
Module: k.GetName(),
}
k.logger.Tracef("Line with message read from topic '%s': %+v", k.Config.Topic, l)
k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)

Check warning on line 168 in pkg/acquisition/modules/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kafka/kafka.go#L168

Added line #L168 was not covered by tests
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
var evt types.Event

Expand All @@ -179,7 +179,7 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
}

func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
k.logger.Debugf("Starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)
k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)
t.Go(func() error {
return k.ReadMessage(out)
})
Expand Down

0 comments on commit 5752d43

Please sign in to comment.