diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index a3e625eaf7..dc3ee79eb2 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -1146,6 +1146,19 @@ {default, 24} ]}. +%% @doc Periodically log replrtq queue stats (queue size, reap/delete +%% attempts and aborts) at this interval (seconds) +{mapping, "queue_manager_log_frequency", "riak_kv.queue_manager_log_frequency", [ + {datatype, integer}, + {default, 30} +]}. + +%% @doc Suppress logging of queue stats when all items are 0 +{mapping, "queue_manager_log_suppress_zero_stats", "riak_kv.queue_manager_log_suppress_zero_stats", [ + {datatype, flag}, + {default, off} +]}. + %% @doc The maximume number of workers to be for any given peer may be %% configured - if not configured will default to the number of sinkworkers {mapping, "replrtq_sinkpeerlimit", "riak_kv.replrtq_sinkpeerlimit", [ @@ -1171,4 +1184,4 @@ {datatype, {flag, enabled, disabled}}, {default, disabled}, {commented, enabled} -]}. \ No newline at end of file +]}. diff --git a/src/riak_kv_eraser.erl b/src/riak_kv_eraser.erl index 5977b50166..85b558a96d 100644 --- a/src/riak_kv_eraser.erl +++ b/src/riak_kv_eraser.erl @@ -292,12 +292,19 @@ handle_async_message(timeout, State) -> pqueue_length = {RedoQL, DeleteQL - BatchSize}}} end; handle_async_message(log_queue, State) -> - lager:info("Eraser job ~w has queue lengths ~w " ++ - "delete_attempts=~w delete_aborts=~w ", - [State#state.job_id, - State#state.pqueue_length, - State#state.delete_attempts, - State#state.delete_aborts]), + case app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false) of + true when State#state.pqueue_length =:= {0,0}, + State#state.delete_attempts =:= 0, + State#state.delete_aborts =:= 0 -> + nop; + _ -> + lager:info("Reaper Job ~w has queue lengths ~w " + "reap_attempts=~w reap_aborts=~w", + [State#state.job_id, + State#state.pqueue_length, + State#state.delete_attempts, + State#state.delete_aborts]) + end, erlang:send_after(?LOG_TICK, self(), log_queue), {noreply, State#state{delete_attempts = 0, delete_aborts = 0}}. diff --git a/src/riak_kv_reaper.erl b/src/riak_kv_reaper.erl index ac598685ba..5a501ef25d 100644 --- a/src/riak_kv_reaper.erl +++ b/src/riak_kv_reaper.erl @@ -275,12 +275,19 @@ handle_async_message(timeout, State) -> pqueue_length = {RedoQL, ReapQL - BatchSize}}} end; handle_async_message(log_queue, State) -> - lager:info("Reaper Job ~w has queue lengths ~w " ++ - " reap_attempts=~w reap_aborts=~w ", - [State#state.job_id, - State#state.pqueue_length, - State#state.reap_attempts, - State#state.reap_aborts]), + case app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false) of + true when State#state.pqueue_length =:= {0,0}, + State#state.reap_attempts =:= 0, + State#state.reap_aborts =:= 0 -> + nop; + _ -> + lager:info("Reaper Job ~w has queue lengths ~w " + "reap_attempts=~w reap_aborts=~w", + [State#state.job_id, + State#state.pqueue_length, + State#state.reap_attempts, + State#state.reap_aborts]) + end, erlang:send_after(?LOG_TICK, self(), log_queue), {noreply, State#state{reap_attempts = 0, reap_aborts = 0}}. diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index dbb5772363..6522c7438b 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -393,10 +393,14 @@ handle_cast({requeue_work, WorkItem}, State) -> handle_info(timeout, State) -> prompt_work(), - erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats), + LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, + ?LOG_TIMER_SECONDS), + erlang:send_after(LogFreq * 1000, self(), log_stats), {noreply, State}; handle_info(log_stats, State) -> - erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats), + LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, + ?LOG_TIMER_SECONDS), + erlang:send_after(LogFreq * 1000, self(), log_stats), SinkWork0 = case State#state.enabled of true -> @@ -782,7 +786,8 @@ log_mapfun({QueueName, Iteration, SinkWork}) -> {replmod_time, RT}, {modified_time, MTS, MTM, MTH, MTD, MTL}} = SinkWork#sink_work.queue_stats, - lager:info("Queue=~w success_count=~w error_count=~w" ++ + lager:info([{queue_name, QueueName}], + "Queue=~w success_count=~w error_count=~w" ++ " mean_fetchtime_ms=~s" ++ " mean_pushtime_ms=~s" ++ " mean_repltime_ms=~s" ++ @@ -796,7 +801,8 @@ log_mapfun({QueueName, Iteration, SinkWork}) -> end, PeerDelays = lists:foldl(FoldPeerInfoFun, "", SinkWork#sink_work.peer_list), - lager:info("Queue=~w has peer delays of~s", [QueueName, PeerDelays]), + lager:info([{queue_name, QueueName}], + "Queue=~w has peer delays of~s", [QueueName, PeerDelays]), {QueueName, Iteration, SinkWork#sink_work{queue_stats = ?ZERO_STATS}}. diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index e4443f868e..8986a1f11d 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -288,14 +288,14 @@ init([]) -> QC = lists:map(MaptoQC, QFM), QL = app_helper:get_env(riak_kv, replrtq_srcqueuelimit, ?QUEUE_LIMIT), OL = app_helper:get_env(riak_kv, replrtq_srcobjectlimit, ?OBJECT_LIMIT), - LogFreq = app_helper:get_env(riak_kv, replrtq_logfrequency, ?LOG_TIMER_SECONDS * 1000), + LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS), erlang:send_after(LogFreq, self(), log_queue), {ok, #state{queue_filtermap = QFM, queue_map = QM, queue_countmap = QC, queue_limit = QL, object_limit = OL, - log_frequency_in_ms = LogFreq}}. + log_frequency_in_ms = LogFreq * 1000}}. handle_call({rtq_ttaaefs, QueueName, ReplEntries}, _From, State) -> {ApproachingLimit, QueueMap, QueueCountMap} = @@ -408,10 +408,15 @@ handle_cast({rtq_coordput, Bucket, ReplEntry}, State) -> queue_countmap = QueueCountMap}}. handle_info(log_queue, State) -> + Suppress = app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false), LogFun = - fun({QueueName, {P1Q, P2Q, P3Q}}) -> - lager:info("QueueName=~w has queue sizes p1=~w p2=~w p3=~w", - [QueueName, P1Q, P2Q, P3Q]) + fun({_QueueName, {0, 0, 0}}) when Suppress == true -> + ok; + ({QueueName, {P1Q, P2Q, P3Q}}) -> + lager:info( + [{queue_name, QueueName}, {p1q, P1Q}, {p2q, P2Q}, {p3q, P3Q}], + "QueueName=~w has queue sizes p1=~w p2=~w p3=~w", + [QueueName, P1Q, P2Q, P3Q]) end, lists:foreach(LogFun, State#state.queue_countmap), erlang:send_after(State#state.log_frequency_in_ms, self(), log_queue),