diff --git a/src/marina.app.src b/src/marina.app.src index 6ffb2f0..56f0889 100644 --- a/src/marina.app.src +++ b/src/marina.app.src @@ -1,6 +1,6 @@ {application, marina, [ {description, "cassandra client"}, - {vsn, "0.2.6"}, + {vsn, "0.2.7"}, {registered, []}, {applications, [ kernel, diff --git a/src/marina.erl b/src/marina.erl index 6be0dfc..f259f6f 100644 --- a/src/marina.erl +++ b/src/marina.erl @@ -83,12 +83,12 @@ execute(StatementId, ConsistencyLevel, Flags, Timeout) -> {ok, term()} | {error, term()}. execute(StatementId, Values, ConsistencyLevel, Flags, Timeout) -> - response(call({execute, StatementId, Values, ConsistencyLevel, Flags}, Timeout)). + call({execute, StatementId, Values, ConsistencyLevel, Flags}, Timeout). -spec prepare(query(), timeout()) -> {ok, term()} | {error, term()}. prepare(Query, Timeout) -> - response(call({prepare, Query}, Timeout)). + call({prepare, Query}, Timeout). -spec query(query(), consistency(), [flag()], timeout()) -> {ok, term()} | {error, term()}. @@ -100,14 +100,18 @@ query(Query, ConsistencyLevel, Flags, Timeout) -> {ok, term()} | {error, term()}. query(Query, Values, ConsistencyLevel, Flags, Timeout) -> - response(call({query, Query, Values, ConsistencyLevel, Flags}, Timeout)). + call({query, Query, Values, ConsistencyLevel, Flags}, Timeout). -spec receive_response(reference(), non_neg_integer()) -> {ok, term()} | {error, term()}. receive_response(Ref, Timeout) -> + Timestamp = os:timestamp(), receive {?APP, Ref, Reply} -> - response(Reply) + response(Reply); + {?APP, _, _} -> + Timeout2 = timeout(Timeout, Timestamp), + receive_response(Ref, Timeout2) after Timeout -> {error, timeout} end. @@ -160,19 +164,14 @@ async_call(Msg, Pid) -> true -> Server ! {call, Ref, Pid, Msg}, {ok, Ref}; - _ -> + false -> {error, backlog_full} end. call(Msg, Timeout) -> case async_call(Msg, self()) of {ok, Ref} -> - receive - {?APP, Ref, Reply} -> - Reply - after Timeout -> - {error, timeout} - end; + receive_response(Ref, Timeout); {error, Reason} -> {error, Reason} end. @@ -181,3 +180,7 @@ random_server() -> PoolSize = application:get_env(?APP, pool_size, ?DEFAULT_POOL_SIZE), Random = erlang:phash2({os:timestamp(), self()}, PoolSize) + 1, marina_utils:child_name(Random). + +timeout(Timeout, Timestamp) -> + Diff = timer:now_diff(os:timestamp(), Timestamp) div 1000, + Timeout - Diff.