diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..d85fb79 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,31 @@ +name: Test + +on: + push: + branches: [ master, gocardless ] + pull_request: + branches: [ master, gocardless ] + +concurrency: + # running pipeline per workflow per PR + group: ${{ github.head_ref || github.run_id }}-${{ github.workflow }} + # Running a new pipeline will cancel any running pipelines that belong to the above group + cancel-in-progress: true + +jobs: + test: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + ruby-version: [ '3.1' ] + + steps: + - uses: actions/checkout@v2 + - name: Set up Ruby + uses: ruby/setup-ruby@v1 + with: + ruby-version: ${{ matrix.ruby-version }} + bundler-cache: true + - name: Run tests + run: bundle exec rake test diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml index ce3bcf0..3985b3d 100644 --- a/.rubocop_todo.yml +++ b/.rubocop_todo.yml @@ -1,35 +1,52 @@ ---- # This configuration was generated by # `rubocop --auto-gen-config` -# on 2020-05-12 17:34:19 +0100 using RuboCop version 0.83.0. +# on 2023-02-23 11:08:06 UTC using RuboCop version 0.93.1. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new # versions of RuboCop, may require this file to be generated again. -# Offense count: 3 +# Offense count: 1 +# Configuration parameters: Include. +# Include: **/*.gemspec +Gemspec/RequiredRubyVersion: + Exclude: + - 'fluent-plugin-gcloud-pubsub-custom.gemspec' + +# Offense count: 4 # Configuration parameters: IgnoredMethods. Metrics/AbcSize: - Max: 50 + Max: 43 +# Offense count: 7 +# Configuration parameters: CountComments, CountAsOne, ExcludedMethods. +# ExcludedMethods: refine Metrics/BlockLength: - Enabled: false + Max: 245 +# Offense count: 4 +# Configuration parameters: CountComments, CountAsOne. Metrics/ClassLength: - Enabled: false + Max: 290 + +# Offense count: 1 +# Configuration parameters: IgnoredMethods. +Metrics/CyclomaticComplexity: + Max: 8 +# Offense count: 10 +# Configuration parameters: CountComments, CountAsOne, ExcludedMethods. Metrics/MethodLength: - Max: 30 + Max: 28 # Offense count: 1 -# Configuration parameters: IgnoredPatterns. -# SupportedStyles: snake_case, camelCase -Naming/MethodName: - EnforcedStyle: snake_case +# Configuration parameters: IgnoredMethods. +Metrics/PerceivedComplexity: + Max: 9 # Offense count: 3 # Cop supports --auto-correct. -# Configuration parameters: AutoCorrect, EnforcedStyle. +# Configuration parameters: EnforcedStyle. # SupportedStyles: nested, compact Style/ClassAndModuleChildren: Exclude: @@ -37,7 +54,7 @@ Style/ClassAndModuleChildren: - 'lib/fluent/plugin/out_gcloud_pubsub.rb' - 'test/test_helper.rb' -# Offense count: 6 +# Offense count: 7 Style/Documentation: Exclude: - 'spec/**/*' @@ -53,46 +70,31 @@ Style/GlobalVars: - 'test/test_helper.rb' # Offense count: 1 -# Configuration parameters: MinBodyLength. -Style/GuardClause: - Exclude: - - 'lib/fluent/plugin/gcloud_pubsub/client.rb' - -# Offense count: 2 -# Cop supports --auto-correct. -Style/IfUnlessModifier: - Exclude: - - 'lib/fluent/plugin/gcloud_pubsub/client.rb' - -# Offense count: 1 -Style/MethodMissingSuper: +Style/MissingRespondToMissing: Exclude: - 'test/test_helper.rb' # Offense count: 1 -Style/MissingRespondToMissing: +# Cop supports --auto-correct. +# Configuration parameters: PreferredDelimiters. +Style/PercentLiteralDelimiters: Exclude: - - 'test/test_helper.rb' + - 'test/plugin/test_in_gcloud_pubsub.rb' -# Offense count: 260 +# Offense count: 3 # Cop supports --auto-correct. -# Configuration parameters: EnforcedStyle, ConsistentQuotesInMultiline. -# SupportedStyles: single_quotes, double_quotes -Style/StringLiterals: +# Configuration parameters: EnforcedStyle, MinSize. +# SupportedStyles: percent, brackets +Style/SymbolArray: Exclude: - - 'Gemfile' - - 'Rakefile' - - 'fluent-plugin-gcloud-pubsub-custom.gemspec' - 'lib/fluent/plugin/gcloud_pubsub/client.rb' - 'lib/fluent/plugin/in_gcloud_pubsub.rb' - - 'lib/fluent/plugin/out_gcloud_pubsub.rb' - - 'test/plugin/test_in_gcloud_pubsub.rb' - - 'test/plugin/test_out_gcloud_pubsub.rb' - - 'test/test_helper.rb' -# Offense count: 36 +# Offense count: 4 # Cop supports --auto-correct. -# Configuration parameters: AutoCorrect, AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns. -# URISchemes: http, https -Layout/LineLength: - Max: 120 +# Configuration parameters: EnforcedStyleForMultiline. +# SupportedStylesForMultiline: comma, consistent_comma, no_comma +Style/TrailingCommaInArguments: + Exclude: + - 'lib/fluent/plugin/gcloud_pubsub/client.rb' + - 'lib/fluent/plugin/in_gcloud_pubsub.rb' diff --git a/fluent-plugin-gcloud-pubsub-custom.gemspec b/fluent-plugin-gcloud-pubsub-custom.gemspec index 2e322b2..3ac516d 100644 --- a/fluent-plugin-gcloud-pubsub-custom.gemspec +++ b/fluent-plugin-gcloud-pubsub-custom.gemspec @@ -20,13 +20,13 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "google-cloud-pubsub", "~> 0.30.0" # Use the same version constraint as fluent-plugin-prometheus currently specifies - gem.add_runtime_dependency "prometheus-client", "< 0.10" + gem.add_runtime_dependency "prometheus-client", ">= 2.1.0" gem.add_development_dependency "bundler" gem.add_development_dependency "pry" gem.add_development_dependency "pry-byebug" gem.add_development_dependency "rake" - gem.add_development_dependency "rubocop", "~>0.83" + gem.add_development_dependency "rubocop" gem.add_development_dependency "test-unit" gem.add_development_dependency "test-unit-rr" end diff --git a/lib/fluent/plugin/gcloud_pubsub/client.rb b/lib/fluent/plugin/gcloud_pubsub/client.rb index 9138e55..e70a747 100644 --- a/lib/fluent/plugin/gcloud_pubsub/client.rb +++ b/lib/fluent/plugin/gcloud_pubsub/client.rb @@ -7,6 +7,7 @@ module Fluent module GcloudPubSub class Error < StandardError end + class RetryableError < Error end @@ -42,12 +43,12 @@ def initialize(project, key, autocreate_topic, metric_prefix) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compressed_size_per_original_size_ratio") do ::Prometheus::Client.registry.histogram( :"#{metric_prefix}_messages_compressed_size_per_original_size_ratio", - "Compression ratio achieved on a batch of messages", - {}, + docstring: "Compression ratio achieved on a batch of messages", # We expect compression for even a single message to be typically # above 2x (0.5/50%), so bias the buckets towards the higher end # of the range. - [0, 0.25, 0.5, 0.75, 0.85, 0.9, 0.95, 0.975, 1], + buckets: [0, 0.25, 0.5, 0.75, 0.85, 0.9, 0.95, 0.975, 1], + labels: [:topic, :algorithm] ) end @@ -55,9 +56,9 @@ def initialize(project, key, autocreate_topic, metric_prefix) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compression_duration_seconds") do ::Prometheus::Client.registry.histogram( :"#{metric_prefix}_messages_compression_duration_seconds", - "Time taken to compress a batch of messages", - {}, - [0, 0.0001, 0.0005, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1], + docstring: "Time taken to compress a batch of messages", + buckets: [0, 0.0001, 0.0005, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1], + labels: [:topic, :algorithm] ) end # rubocop:enable Layout/LineLength @@ -109,15 +110,15 @@ def compress_messages_with_zlib(messages, topic_name) end @compression_duration.observe( - { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, duration, + labels: { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, ) compressed_size = compressed_messages.bytesize @compression_ratio.observe( - { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, # If original = 1MiB and compressed = 256KiB; then metric value = 0.75 = 75% when plotted 1 - compressed_size.to_f / original_size, + labels: { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, ) [compressed_messages, { "compression_algorithm": COMPRESSION_ALGORITHM_ZLIB }] diff --git a/lib/fluent/plugin/in_gcloud_pubsub.rb b/lib/fluent/plugin/in_gcloud_pubsub.rb index f079b5a..658507b 100644 --- a/lib/fluent/plugin/in_gcloud_pubsub.rb +++ b/lib/fluent/plugin/in_gcloud_pubsub.rb @@ -127,9 +127,9 @@ def configure(conf) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_pulled", - "Number of Pub/Sub messages pulled by the subscriber on each invocation", - {}, - [0, 1, 10, 50, 100, 250, 500, 1000], + docstring: "Number of Pub/Sub messages pulled by the subscriber on each invocation", + buckets: [0, 1, 10, 50, 100, 250, 500, 1000], + labels: [:subscription] ) end @@ -137,9 +137,9 @@ def configure(conf) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled_bytes") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_pulled_bytes", - "Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation", - {}, - [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], + docstring: "Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation", + buckets: [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], + labels: [:subscription] ) end @@ -147,8 +147,8 @@ def configure(conf) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_pull_errors_total") do ::Prometheus::Client.registry.counter( :"#{@metric_prefix}_pull_errors_total", - "Errors encountered while pulling or processing messages", - {}, + docstring: "Errors encountered while pulling or processing messages", + labels: [:subscription, :retryable], ) end end @@ -233,7 +233,7 @@ def subscribe def _subscribe messages = @subscriber.pull @return_immediately, @max_messages - @messages_pulled.observe(common_labels, messages.size) + @messages_pulled.observe(messages.size, labels: common_labels) if messages.empty? log.debug "no messages are pulled" return @@ -242,17 +242,17 @@ def _subscribe messages_size = messages.sum do |message| message.data.bytesize + message.attributes.sum { |k, v| k.bytesize + v.bytesize } end - @messages_pulled_bytes.observe(common_labels, messages_size) + @messages_pulled_bytes.observe(messages_size, labels: common_labels) process messages @subscriber.acknowledge messages log.debug "#{messages.length} message(s) processed" rescue Fluent::GcloudPubSub::RetryableError => e - @pull_errors.increment(common_labels.merge({ retryable: true })) + @pull_errors.increment(labels: common_labels.merge({ retryable: true })) log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s rescue StandardError => e - @pull_errors.increment(common_labels.merge({ retryable: false })) + @pull_errors.increment(labels: common_labels.merge({ retryable: false })) log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s log.error_backtrace e.backtrace end diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index 7fd62d3..495f8e7 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -67,9 +67,9 @@ def configure(conf) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_published_per_batch", - "Number of records published to Pub/Sub per buffer flush", - {}, - [1, 10, 50, 100, 250, 500, 1000], + docstring: "Number of records published to Pub/Sub per buffer flush", + labels: [:topic], + buckets: [1, 10, 50, 100, 250, 500, 1000], ) end @@ -77,9 +77,9 @@ def configure(conf) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_bytes") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_published_bytes", - "Total size in bytes of the records published to Pub/Sub", - {}, - [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], + docstring: "Total size in bytes of the records published to Pub/Sub", + labels: [:topic], + buckets: [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], ) end @@ -87,11 +87,11 @@ def configure(conf) Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_compression_enabled") do ::Prometheus::Client.registry.gauge( :"#{@metric_prefix}_compression_enabled", - "Whether compression/batching is enabled", - {}, + docstring: "Whether compression/batching is enabled", + labels: [:topic], ) end - @compression_enabled.set(common_labels, @compress_batches ? 1 : 0) + @compression_enabled.set(@compress_batches ? 1 : 0, labels: common_labels) end # rubocop:enable Metrics/MethodLength @@ -154,8 +154,8 @@ def publish(topic, messages) size = messages.map(&:bytesize).inject(:+) log.debug "send message topic:#{topic} length:#{messages.length} size:#{size}" - @messages_published.observe(common_labels, messages.length) - @bytes_published.observe(common_labels, size) + @messages_published.observe(messages.length, labels: common_labels) + @bytes_published.observe(size, labels: common_labels) @publisher.publish(topic, messages, @compress_batches) end diff --git a/test/plugin/test_in_gcloud_pubsub.rb b/test/plugin/test_in_gcloud_pubsub.rb index c4a5737..e04582d 100644 --- a/test/plugin/test_in_gcloud_pubsub.rb +++ b/test/plugin/test_in_gcloud_pubsub.rb @@ -7,13 +7,13 @@ require "fluent/test/driver/input" class GcloudPubSubInputTest < Test::Unit::TestCase - CONFIG = %( + CONFIG = %[ tag test project project-test topic topic-test subscription subscription-test key key-test - ) + ] DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 24_680