Skip to content

Commit

Permalink
update logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bachmanity1 committed Sep 3, 2024
1 parent e8f19b2 commit 856ab73
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions e2e/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e
import (
"context"
"fmt"
"github.com/pkg/errors"
"math"
"time"

Expand Down Expand Up @@ -68,11 +69,41 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
return fmt.Errorf("failed to create partitions: %w", err)
}

// after topic configuration is complete number of partitions must be equal to the number of brokers multiplied
// by the partitions per broker
s.partitionCount = len(meta.Brokers) * s.config.TopicManagement.PartitionsPerBroker
return s.updatePartitionCount(ctx)
}

return nil
// The partition count must be updated after topic validation because the validation process may lead to the
// creation of new partitions. This can occur when new brokers are added to the cluster.
func (s *Service) updatePartitionCount(ctx context.Context) error {
for {
timer := time.NewTimer(1 * time.Second)
defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
meta, err := s.getTopicMetadata(ctx)
if err != nil {
return fmt.Errorf("could not get topic metadata while updating partition count: %w", err)
}

typedErr := kerr.TypedErrorForCode(meta.Topics[0].ErrorCode)
if typedErr == nil {
s.partitionCount = len(meta.Topics[0].Partitions)
s.logger.Debug("updatePartitionCount: successfully updated partition count", zap.Int("partition_count", s.partitionCount))
return nil
}
if !errors.Is(typedErr, kerr.UnknownTopicOrPartition) {
return fmt.Errorf("unexpected error while updating partition count: %w", typedErr)
}
s.logger.Warn("updatePartitionCount: received UNKNOWN_TOPIC_OR_PARTITION error, possibly due to timing issue. Retrying...")
// The UNKNOWN_TOPIC_OR_PARTITION error occurs occasionally even though the topic is created
// in the validateManagementTopic function. It appears to be a timing issue where the topic metadata
// is not immediately available after creation. In practice, waiting for a short period and then retrying
// the operation resolves the issue.
}
}
}

func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreatePartitionsRequest) error {
Expand Down

0 comments on commit 856ab73

Please sign in to comment.