Skip to content

Commit

Permalink
Merge pull request #8 from lpgauth/dev
Browse files Browse the repository at this point in the history
0.2.2
  • Loading branch information
lpgauth committed Apr 7, 2015
2 parents 48e0481 + 9572b9a commit c77e606
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 20 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

{deps, [
{edown, ".*", {git, "https://github.com/esl/edown.git", "HEAD"}},
{lz4, ".*", {git, "https://github.com/lpgauth/erlang-lz4.git", {branch, "marina"}}},
{lz4, ".*", {git, "https://github.com/szktty/erlang-lz4.git", "0572b0ea2a"}},
{timing, ".*", {git, "https://github.com/lpgauth/timing.git", {branch, "master"}}}
]}.

Expand Down
2 changes: 1 addition & 1 deletion src/marina.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, marina, [
{description, "cassandra client"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{applications, [
kernel,
Expand Down
14 changes: 7 additions & 7 deletions src/marina.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,43 @@

%% public
-spec async_execute(statement_id(), consistency(), [flag()], pid()) ->
{ok, erlang:ref()} | {error, backlog_full}.
{ok, reference()} | {error, backlog_full}.

async_execute(StatementId, ConsistencyLevel, Flags, Pid) ->
async_execute(StatementId, [], ConsistencyLevel, Flags, Pid).

-spec async_execute(statement_id(), [value()], consistency(), [flag()], pid()) ->
{ok, erlang:ref()} | {error, backlog_full}.
{ok, reference()} | {error, backlog_full}.

async_execute(StatementId, Values, ConsistencyLevel, Flags, Pid) ->
async_call({execute, StatementId, Values, ConsistencyLevel, Flags}, Pid).

-spec async_prepare(query(), pid()) ->
{ok, erlang:ref()} | {error, backlog_full}.
{ok, reference()} | {error, backlog_full}.

async_prepare(Query, Pid) ->
async_call({prepare, Query}, Pid).

-spec async_query(query(), consistency(), [flag()], pid()) ->
{ok, erlang:ref()} | {error, backlog_full}.
{ok, reference()} | {error, backlog_full}.

async_query(Query, ConsistencyLevel, Flags, Pid) ->
async_query(Query, [], ConsistencyLevel, Flags, Pid).

-spec async_query(query(), [value()], consistency(), [flag()], pid()) ->
{ok, erlang:ref()} | {error, backlog_full}.
{ok, reference()} | {error, backlog_full}.

async_query(Query, Values, ConsistencyLevel, Flags, Pid) ->
async_call({query, Query, Values, ConsistencyLevel, Flags}, Pid).

-spec async_reusable_query(query(), consistency(), [flag()], pid(), timeout()) ->
{ok, erlang:ref()} | {error, term()}.
{ok, reference()} | {error, term()}.

async_reusable_query(Query, ConsistencyLevel, Flags, Pid, Timeout) ->
async_reusable_query(Query, [], ConsistencyLevel, Flags, Pid, Timeout).

-spec async_reusable_query(query(), [value()], consistency(), [flag()], pid(), timeout()) ->
{ok, erlang:ref()} | {error, term()}.
{ok, reference()} | {error, term()}.

async_reusable_query(Query, Values, ConsistencyLevel, Flags, Pid, Timeout) ->
case marina_cache:get(Query) of
Expand Down
4 changes: 2 additions & 2 deletions src/marina_frame.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pending_size(_) ->
decode_body(0, Body) ->
Body;
decode_body(1, Body) ->
{ok, Body2} = lz4:unpack(Body),
{ok, Body2} = marina_utils:unpack(Body),
Body2.

decode(<<1:1, ?PROTO_VERSION:7/unsigned-integer, Flags:8/unsigned-integer,
Expand All @@ -70,5 +70,5 @@ decode(Rest, Acc) ->
encode_body(0, Body) ->
Body;
encode_body(1, Body) ->
{ok, Body2} = lz4:pack(iolist_to_binary(Body)),
{ok, Body2} = marina_utils:pack(Body),
Body2.
11 changes: 5 additions & 6 deletions src/marina_request.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ prepare(Stream, FrameFlags, Query) ->
stream = Stream,
opcode = ?OP_PREPARE,
flags = frame_flags(FrameFlags),
body = [<<(marina_types:encode_long_string(Query))/binary>>]
body = [marina_types:encode_long_string(Query)]
}).

-spec query(stream(), [frame_flag()], query(), [value()], consistency(), [flag()]) -> iolist().
Expand All @@ -41,9 +41,9 @@ query(Stream, FrameFlags, Query, Values, ConsistencyLevel, Flags) ->
stream = Stream,
opcode = ?OP_QUERY,
flags = frame_flags(FrameFlags),
body = [<<(marina_types:encode_long_string(Query))/binary,
(marina_types:encode_short(ConsistencyLevel))/binary, Flags2,
Values2/binary>>]
body = [marina_types:encode_long_string(Query),
marina_types:encode_short(ConsistencyLevel), Flags2,
Values2]
}).

-spec startup([frame_flag()]) -> iolist().
Expand Down Expand Up @@ -77,8 +77,7 @@ flags_and_values(Flags, Values) ->
Flags2 = flags([{values, true} | Flags]),
ValuesCount = length(Values),
EncodedValues = [marina_types:encode_bytes(Value) || Value <- Values],
Values2 = <<(marina_types:encode_short(ValuesCount))/binary,
(iolist_to_binary(EncodedValues))/binary>>,
Values2 = [marina_types:encode_short(ValuesCount), EncodedValues],
{Flags2, Values2}.

frame_flags([]) ->
Expand Down
2 changes: 1 addition & 1 deletion src/marina_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ init([]) ->
marina_queue:init(),

{ok, {{one_for_one, 5, 10},
marina_utils:childs_specs()
marina_utils:child_specs()
}}.
33 changes: 31 additions & 2 deletions src/marina_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,61 @@
-include("marina.hrl").

-export([
childs_specs/0,
child_specs/0,
child_name/1,
info_msg/2,
pack/1,
timeout/2,
unpack/1,
warning_msg/2

]).

%% public
childs_specs() ->
-spec child_specs() -> [supervisor:child_spec()].

child_specs() ->
PoolSize = application:get_env(?APP, pool_size, ?DEFAULT_POOL_SIZE),
[child_spec(N) || N <- lists:seq(1, PoolSize)].

-spec child_name(integer()) -> atom().

child_name(N) ->
list_to_atom(?SERVER_BASE_NAME ++ integer_to_list(N)).

-spec info_msg(string(), [term()]) -> ok.

info_msg(Format, Data) ->
error_logger:info_msg(Format, Data).

-spec pack(binary() | iolist()) -> {ok, binary()} | {error, term()}.

pack(Iolist) when is_list(Iolist) ->
pack(iolist_to_binary(Iolist));
pack(Binary) ->
case lz4:compress(Binary, []) of
{ok, Compressed} ->
{ok, <<(size(Binary)):32/unsigned-integer, Compressed/binary>>};
Error ->
Error
end.

-spec timeout(erlang:timestamp(), non_neg_integer()) -> non_neg_integer().

timeout(Timestamp, Timeout) ->
Diff = timer:now_diff(os:timestamp(), Timestamp) div 1000,
case Timeout - Diff of
X when X < 0 -> 0;
X -> X
end.

-spec unpack(binary()) -> {ok, binary()} | {error, term()}.

unpack(<<Size:32/unsigned-integer, Binary/binary>>) ->
lz4:uncompress(Binary, Size).

-spec warning_msg(string(), [term()]) -> ok.

warning_msg(Format, Data) ->
error_logger:warning_msg(Format, Data).

Expand Down

0 comments on commit c77e606

Please sign in to comment.