diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 9deb4226..a2c895bc 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -12,13 +12,13 @@ module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :credentials, :identity, :connection_options, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) attr_reader :timeouts, :error_handlers, :capabilities attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, - :payload_codec, :legacy_signals, :no_signals_in_first_task + :payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -84,6 +84,7 @@ def initialize @search_attributes = {} @header_propagators = [] @capabilities = Capabilities.new(self) + @connection_options = {} # Signals previously were incorrectly replayed in order within a workflow task window, rather # than at the beginning. Correcting this changes the determinism of any workflow with signals. @@ -120,7 +121,8 @@ def for_connection host: host, port: port, credentials: credentials, - identity: identity || default_identity + identity: identity || default_identity, + connection_options: connection_options ).freeze end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index b70bcbed..a36fe091 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -12,8 +12,9 @@ def self.generate(configuration) port = configuration.port credentials = configuration.credentials identity = configuration.identity + options = configuration.connection_options - connection_class.new(host, port, identity, credentials) + connection_class.new(host, port, identity, credentials, options) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 06be6a6d..1bec78f2 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -2,6 +2,7 @@ require 'time' require 'google/protobuf/well_known_types' require 'securerandom' +require 'json' require 'gen/temporal/api/filter/v1/message_pb' require 'gen/temporal/api/workflowservice/v1/service_services_pb' require 'gen/temporal/api/operatorservice/v1/service_services_pb' @@ -795,11 +796,45 @@ def pause_schedule(namespace:, schedule_id:, should_pause:, note: nil) attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request def client - @client ||= Temporalio::Api::WorkflowService::V1::WorkflowService::Stub.new( + return @client if @client + + channel_args = {} + + if options[:keepalive_time_ms] + channel_args["grpc.keepalive_time_ms"] = options[:keepalive_time_ms] + end + + if options[:retry_connection] || options[:retry_policy] + channel_args["grpc.enable_retries"] = 1 + + retry_policy = options[:retry_policy] || { + retryableStatusCodes: ["UNAVAILABLE"], + maxAttempts: 3, + initialBackoff: "0.1s", + backoffMultiplier: 2.0, + maxBackoff: "0.3s" + } + + channel_args["grpc.service_config"] = ::JSON.generate( + methodConfig: [ + { + name: [ + { + service: "temporal.api.workflowservice.v1.WorkflowService", + } + ], + retryPolicy: retry_policy + } + ] + ) + end + + @client = Temporalio::Api::WorkflowService::V1::WorkflowService::Stub.new( url, credentials, timeout: CONNECTION_TIMEOUT_SECONDS, - interceptors: [ClientNameVersionInterceptor.new] + interceptors: [ClientNameVersionInterceptor.new], + channel_args: channel_args ) end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index ee3c1fcb..0799c5d0 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -830,4 +830,101 @@ class TestDeserializer end end end + + describe "passing in options" do + before do + allow(subject).to receive(:client).and_call_original + end + + context "when keepalive_time_ms is passed" do + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, keepalive_time_ms: 30_000) } + + it "passes the option to the channel args" do + expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with( + ":", + :this_channel_is_insecure, + timeout: 60, + interceptors: [instance_of(Temporal::Connection::ClientNameVersionInterceptor)], + channel_args: { + "grpc.keepalive_time_ms" => 30_000 + } + ) + subject.send(:client) + end + end + + context "when passing retry_connection" do + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, retry_connection: true) } + + it "passes the option to the channel args" do + expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with( + ":", + :this_channel_is_insecure, + timeout: 60, + interceptors: [instance_of(Temporal::Connection::ClientNameVersionInterceptor)], + channel_args: { + "grpc.enable_retries" => 1, + "grpc.service_config" => { + methodConfig: [ + { + name: [ + { + service: "temporal.api.workflowservice.v1.WorkflowService", + } + ], + retryPolicy: { + retryableStatusCodes: ["UNAVAILABLE"], + maxAttempts: 3, + initialBackoff: "0.1s", + backoffMultiplier: 2.0, + maxBackoff: "0.3s" + } + } + ] + }.to_json + } + ) + subject.send(:client) + end + end + + context "when passing a custom retry policy" do + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, retry_policy: retry_policy) } + + let(:retry_policy) do + { + retryableStatusCodes: ["UNAVAILABLE", "INTERNAL"], + maxAttempts: 1, + initialBackoff: "0.2s", + backoffMultiplier: 1.0, + maxBackoff: "0.5s" + } + end + + it "passes the policy to the channel args" do + expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with( + ":", + :this_channel_is_insecure, + timeout: 60, + interceptors: [instance_of(Temporal::Connection::ClientNameVersionInterceptor)], + channel_args: { + "grpc.enable_retries" => 1, + "grpc.service_config" => { + methodConfig: [ + { + name: [ + { + service: "temporal.api.workflowservice.v1.WorkflowService", + } + ], + retryPolicy: retry_policy + } + ] + }.to_json + } + ) + subject.send(:client) + end + end + end end