Add Couch Stats Resource Tracker (CSRT) #5491

Open · wants to merge 11 commits into main
1 change: 1 addition & 0 deletions README.rst
@@ -1,6 +1,7 @@
Apache CouchDB README
=====================


+---------+
| |1| |2| |
+---------+
30 changes: 30 additions & 0 deletions rel/overlay/etc/default.ini
@@ -1119,3 +1119,33 @@ url = {{nouveau_url}}
;mem3_shards = true
;nouveau_index_manager = true
;dreyfus_index_manager = true

; Couch Stats Resource Tracker (CSRT)
[csrt]
enabled = true

; CSRT Rexi Server Init P tracking
; Enable these to collect additional metrics on RPC worker spawn rates.
; Mod and Function are separated by double underscores, e.g.
; fabric_rpc__open_doc toggles tracking for fabric_rpc:open_doc workers.
[csrt.init_p]
enabled = false
fabric_rpc__all_docs = true
fabric_rpc__changes = true
fabric_rpc__map_view = true
fabric_rpc__reduce_view = true
fabric_rpc__get_all_security = true
fabric_rpc__open_doc = true
fabric_rpc__update_docs = true
fabric_rpc__open_shard = true

;; CSRT dbname matchers
;; Given a dbname and a positive integer, this will enable an IO matcher
;; against the provided db for any requests that induce IO in quantities
;; greater than the provided threshold on any one of: ioq_calls, rows_read,
;; docs_read, get_kp_node, get_kv_node, or changes_processed.
;;
[csrt_logger.dbnames_io]
;; foo = 100
;; _dbs = 123
;; _users = 234
;; foo/bar = 200
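
;; For reference, a hypothetical runtime equivalent of the commented
;; examples above, issued from a remsh session via the standard config API
;; (whether csrt_logger picks the change up without a restart depends on how
;; it subscribes to config events):
;;
;;   ok = config:set("csrt_logger.dbnames_io", "foo", "1000", _Persist = false).
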
10 changes: 10 additions & 0 deletions src/chttpd/src/chttpd.erl
@@ -339,6 +339,10 @@ handle_request_int(MochiReq) ->
% Save client socket so that it can be monitored for disconnects
chttpd_util:mochiweb_client_req_set(MochiReq),

%% This is probably better in before_request, but having Path is nice
csrt:create_coordinator_context(HttpReq0, Path),
csrt:set_context_handler_fun(?MODULE, ?FUNCTION_NAME),

{HttpReq2, Response} =
case before_request(HttpReq0) of
{ok, HttpReq1} ->
@@ -369,6 +373,7 @@ handle_request_int(MochiReq) ->

before_request(HttpReq) ->
try
csrt:set_context_handler_fun(?MODULE, ?FUNCTION_NAME),
chttpd_stats:init(),
chttpd_plugin:before_request(HttpReq)
catch
@@ -388,6 +393,8 @@ after_request(HttpReq, HttpResp0) ->
HttpResp2 = update_stats(HttpReq, HttpResp1),
chttpd_stats:report(HttpReq, HttpResp2),
maybe_log(HttpReq, HttpResp2),
%% NOTE: do not set_context_handler_fun to preserve the Handler
csrt:destroy_context(),
HttpResp2.

process_request(#httpd{mochi_req = MochiReq} = HttpReq) ->
@@ -400,6 +407,7 @@ process_request(#httpd{mochi_req = MochiReq} = HttpReq) ->
RawUri = MochiReq:get(raw_path),

try
csrt:set_context_handler_fun(?MODULE, ?FUNCTION_NAME),
couch_httpd:validate_host(HttpReq),
check_request_uri_length(RawUri),
check_url_encoding(RawUri),
@@ -425,10 +433,12 @@ handle_req_after_auth(HandlerKey, HttpReq) ->
HandlerKey,
fun chttpd_db:handle_request/1
),
csrt:set_context_handler_fun(HandlerFun),
AuthorizedReq = chttpd_auth:authorize(
possibly_hack(HttpReq),
fun chttpd_auth_request:authorize_request/1
),
csrt:set_context_username(AuthorizedReq),
{AuthorizedReq, HandlerFun(AuthorizedReq)}
catch
ErrorType:Error:Stack ->
2 changes: 2 additions & 0 deletions src/chttpd/src/chttpd_db.erl
@@ -83,6 +83,7 @@

% Database request handlers
handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) ->
csrt:set_context_dbname(DbName),
case {Method, RestParts} of
{'PUT', []} ->
create_db_req(Req, DbName);
@@ -103,6 +104,7 @@ handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req)
do_db_req(Req, fun db_req/2);
{_, [SecondPart | _]} ->
Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2),
csrt:set_context_handler_fun(Handler),
do_db_req(Req, Handler)
end.

1 change: 1 addition & 0 deletions src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1;
url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
107 changes: 107 additions & 0 deletions src/chttpd/src/chttpd_misc.erl
@@ -20,6 +20,7 @@
handle_replicate_req/1,
handle_reload_query_servers_req/1,
handle_task_status_req/1,
handle_resource_status_req/1,
handle_up_req/1,
handle_utils_dir_req/1,
handle_utils_dir_req/2,
@@ -219,6 +220,112 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) ->
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").

handle_resource_status_req(#httpd{method = 'POST'} = Req) ->
ok = chttpd:verify_is_server_admin(Req),
chttpd:validate_ctype(Req, "application/json"),
{Props} = chttpd:json_body_obj(Req),
Action = proplists:get_value(<<"action">>, Props),
Key = proplists:get_value(<<"key">>, Props),
Val = proplists:get_value(<<"val">>, Props),

CountBy = fun csrt:count_by/1,
GroupBy = fun csrt:group_by/2,
SortedBy1 = fun csrt:sorted_by/1,
SortedBy2 = fun csrt:sorted_by/2,
ConvertEle = fun erlang:binary_to_existing_atom/1,
ConvertList = fun(L) -> [ConvertEle(E) || E <- L] end,
ToJson = fun csrt_util:to_json/1,
JsonKeys = fun(PL) -> [[ToJson(K), V] || {K, V} <- PL] end,

Fun =
case {Action, Key, Val} of
{<<"count_by">>, Keys, undefined} when is_list(Keys) ->
Keys1 = [ConvertEle(K) || K <- Keys],
fun() -> CountBy(Keys1) end;
{<<"count_by">>, Key, undefined} ->
Key1 = ConvertEle(Key),
fun() -> CountBy(Key1) end;
{<<"group_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) ->
Keys1 = ConvertList(Keys),
Vals1 = ConvertList(Vals),
fun() -> GroupBy(Keys1, Vals1) end;
{<<"group_by">>, Key, Vals} when is_list(Vals) ->
Key1 = ConvertEle(Key),
Vals1 = ConvertList(Vals),
fun() -> GroupBy(Key1, Vals1) end;
{<<"group_by">>, Keys, Val} when is_list(Keys) ->
Keys1 = ConvertList(Keys),
Val1 = ConvertEle(Val),
fun() -> GroupBy(Keys1, Val1) end;
{<<"group_by">>, Key, Val} ->
Key1 = ConvertEle(Key),
Val1 = ConvertList(Val),
fun() -> GroupBy(Key1, Val1) end;
{<<"sorted_by">>, Key, undefined} ->
Key1 = ConvertEle(Key),
fun() -> JsonKeys(SortedBy1(Key1)) end;
{<<"sorted_by">>, Keys, undefined} when is_list(Keys) ->
Keys1 = [ConvertEle(K) || K <- Keys],
fun() -> JsonKeys(SortedBy1(Keys1)) end;
{<<"sorted_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) ->
Keys1 = ConvertList(Keys),
Vals1 = ConvertList(Vals),
fun() -> JsonKeys(SortedBy2(Keys1, Vals1)) end;
{<<"sorted_by">>, Key, Vals} when is_list(Vals) ->
Key1 = ConvertEle(Key),
Vals1 = ConvertList(Vals),
fun() -> JsonKeys(SortedBy2(Key1, Vals1)) end;
{<<"sorted_by">>, Keys, Val} when is_list(Keys) ->
Keys1 = ConvertList(Keys),
Val1 = ConvertEle(Val),
fun() -> JsonKeys(SortedBy2(Keys1, Val1)) end;
{<<"sorted_by">>, Key, Val} ->
Key1 = ConvertEle(Key),
Val1 = ConvertList(Val),
fun() -> JsonKeys(SortedBy2(Key1, Val1)) end;
_ ->
throw({badrequest, invalid_resource_request})
end,

Fun1 = fun() ->
case Fun() of
Map when is_map(Map) ->
{maps:fold(
fun
%% TODO: Skip 0 value entries?
(_K, 0, A) -> A;
(K, V, A) -> [{ToJson(K), V} | A]
end,
[],
Map
)};
List when is_list(List) ->
List
end
end,

{Resp, _Bad} = rpc:multicall(erlang, apply, [
fun() ->
{node(), Fun1()}
end,
[]
]),
%%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]),
send_json(Req, {Resp});
handle_resource_status_req(#httpd{method = 'GET'} = Req) ->
ok = chttpd:verify_is_server_admin(Req),
{Resp, Bad} = rpc:multicall(erlang, apply, [
fun() ->
{node(), csrt:active()}
end,
[]
]),
%% TODO: incorporate Bad responses
send_json(Req, {Resp});
handle_resource_status_req(Req) ->
ok = chttpd:verify_is_server_admin(Req),
send_method_not_allowed(Req, "GET,HEAD,POST").
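
%% For illustration only (not part of this diff): a hypothetical request
%% against the POST handler above, grouping active contexts by username and
%% aggregating two of the tracked counters. The top-level fields follow the
%% proplist keys read in this clause; the exact key/val names accepted depend
%% on which fields csrt tracks. Assumes the couch test helpers (test_request,
%% jiffy) and an admin-authenticated Url.
%%
%%   Body = jiffy:encode(
%%       {[
%%           {<<"action">>, <<"group_by">>},
%%           {<<"key">>, <<"username">>},
%%           {<<"val">>, [<<"ioq_calls">>, <<"rows_read">>]}
%%       ]}
%%   ),
%%   {ok, 200, _Headers, Resp} = test_request:post(
%%       Url ++ "/_active_resources",
%%       [{"Content-Type", "application/json"}],
%%       Body
%%   ).
%%
%% A plain GET to the same endpoint (handled by the clause above) returns the
%% per-node list of currently active contexts.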

handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = PostBody} = Req) ->
chttpd:validate_ctype(Req, "application/json"),
%% see HACK in chttpd.erl about replication
7 changes: 4 additions & 3 deletions src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl
@@ -24,8 +24,9 @@

setup() ->
Hashed = couch_passwords:hash_admin_password(?PASS),
ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist = false),
ok = config:set("couchdb", "max_document_size", "50"),
ok = config:set("admins", ?USER, ?b2l(Hashed), false),
ok = config:set("couchdb", "max_document_size", "50", false),

TmpDb = ?tempdb(),
Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
Port = mochiweb_socket_server:get(chttpd, port),
@@ -35,7 +36,7 @@ setup() ->

teardown(Url) ->
delete_db(Url),
ok = config:delete("admins", ?USER, _Persist = false),
ok = config:delete("admins", ?USER, false),
ok = config:delete("couchdb", "max_document_size").

create_db(Url) ->
2 changes: 2 additions & 0 deletions src/config/src/config_listener_mon.erl
@@ -13,6 +13,8 @@
-module(config_listener_mon).
-behaviour(gen_server).

-dialyzer({nowarn_function, init/1}).

-export([
subscribe/2,
start_link/2
2 changes: 2 additions & 0 deletions src/couch/include/couch_db.hrl
@@ -53,6 +53,8 @@
-define(INTERACTIVE_EDIT, interactive_edit).
-define(REPLICATED_CHANGES, replicated_changes).

-define(LOG_UNEXPECTED_MSG(Msg),
    couch_log:warning(
        "[~p:~p:~p/~p]{~p[~p]} Unexpected message: ~w",
        [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, self(), element(2, process_info(self(), message_queue_len)), Msg]
    )
).
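
%% Usage sketch (hypothetical, not part of this diff): a gen_server
%% handle_info/2 catch-all that logs module, line, function/arity, pid and
%% message queue length along with the unexpected message.
%%
%%   handle_info(Msg, State) ->
%%       ?LOG_UNEXPECTED_MSG(Msg),
%%       {noreply, State}.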

-type branch() :: {Key::term(), Value::term(), Tree::term()}.
-type path() :: {Start::pos_integer(), branch()}.
-type update_type() :: replicated_changes | interactive_edit.
33 changes: 33 additions & 0 deletions src/couch/priv/stats_descriptions.cfg
@@ -306,6 +306,10 @@
{type, counter},
{desc, <<"number of couch_server LRU operations skipped">>}
]}.
{[couchdb, couch_server, open], [
{type, counter},
{desc, <<"number of couch_server open operations invoked">>}
]}.
{[couchdb, query_server, vdu_rejects], [
{type, counter},
{desc, <<"number of rejections by validate_doc_update function">>}
@@ -418,10 +422,39 @@
{type, counter},
{desc, <<"number of other requests">>}
]}.
{[couchdb, query_server, volume, ddoc_filter], [
{type, counter},
{desc, <<"number of docs filtered by ddoc filters">>}
]}.
{[couchdb, legacy_checksums], [
{type, counter},
{desc, <<"number of legacy checksums found in couch_file instances">>}
]}.
{[couchdb, btree, folds], [
{type, counter},
{desc, <<"number of couch btree kv fold callback invocations">>}
]}.
{[couchdb, btree, get_node, kp_node], [
{type, counter},
{desc, <<"number of couch btree kp_nodes read">>}
]}.
{[couchdb, btree, get_node, kv_node], [
{type, counter},
{desc, <<"number of couch btree kv_nodes read">>}
]}.
{[couchdb, btree, write_node, kp_node], [
{type, counter},
{desc, <<"number of couch btree kp_nodes written">>}
]}.
{[couchdb, btree, write_node, kv_node], [
{type, counter},
{desc, <<"number of couch btree kv_nodes written">>}
]}.
%% CSRT (couch_stats_resource_tracker) stats
{[couchdb, csrt, delta_missing_t0], [
{type, counter},
{desc, <<"number of csrt contexts without a proper startime">>}
]}.
{[pread, exceed_eof], [
{type, counter},
{desc, <<"number of the attempts to read beyond end of db file">>}
3 changes: 3 additions & 0 deletions src/couch/src/couch_btree.erl
@@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->

get_node(#btree{fd = Fd}, NodePos) ->
{ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
%% TODO: wire in csrt tracking
couch_stats:increment_counter([couchdb, btree, get_node, NodeType]),
{NodeType, NodeList}.

write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
@@ -480,6 +482,7 @@ write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
% now write out each chunk and return the KeyPointer pairs for those nodes
ToWrite = [{NodeType, Chunk} || Chunk <- Chunks],
WriteOpts = [{compression, Comp}],
couch_stats:increment_counter([couchdb, btree, write_node, NodeType]),
{ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite, WriteOpts),
{ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}.

2 changes: 2 additions & 0 deletions src/couch/src/couch_db.erl
@@ -298,6 +298,7 @@ open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).

open_doc(Db, Id, Options) ->
%% TODO: wire in csrt tracking
increment_stat(Db, [couchdb, database_reads]),
case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted = true} = Doc} ->
@@ -1987,6 +1988,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
->
case lists:member(sys_db, Options) of
true ->
%% TODO: we shouldn't leak resource usage just because it's a sys_db
ok;
false ->
couch_stats:increment_counter(Stat, Count)
11 changes: 9 additions & 2 deletions src/couch/src/couch_os_process.erl
@@ -246,8 +246,9 @@
bump_time_stat(ddoc_new, USec);
[<<"ddoc">>, _, [<<"validate_doc_update">> | _] | _] ->
bump_time_stat(ddoc_vdu, USec);
[<<"ddoc">>, _, [<<"filters">> | _] | _] ->
bump_time_stat(ddoc_filter, USec);
[<<"ddoc">>, _, [<<"filters">> | _], [Docs | _] | _] ->
bump_time_stat(ddoc_filter, USec),
bump_volume_stat(ddoc_filter, Docs);
[<<"ddoc">> | _] ->
bump_time_stat(ddoc_other, USec);
_ ->
@@ -258,6 +259,12 @@ bump_time_stat(Stat, USec) when is_atom(Stat), is_integer(USec) ->
couch_stats:increment_counter([couchdb, query_server, calls, Stat]),
couch_stats:increment_counter([couchdb, query_server, time, Stat], USec).

bump_volume_stat(ddoc_filter = Stat, Docs) when is_atom(Stat), is_list(Docs) ->
couch_stats:increment_counter([couchdb, query_server, volume, Stat], length(Docs));
bump_volume_stat(_, _) ->
%% TODO: handle other stats?
ok.

log_level("debug") ->
debug;
log_level("info") ->