Skip to content

Cache issues #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions lib/myxql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,12 @@ defmodule MyXQL do

## Options

* `:query_type` - use `:binary` for binary protocol (prepared statements), `:binary_then_text` to attempt
* `:query_type` - Use `:binary` for binary protocol (prepared statements), `:binary_then_text` to attempt
executing a binary query and if that fails fallback to executing a text query, and `:text` for text protocol
(default: `:binary`)
(default: `:binary`).

* `:cache_statement` - caches the query with the given name. Opposite to the `name` option
given to `prepare/4`, if the cache statement name is reused with a different, the previous
query is automatically closed
* `:cache_statement` - Caches the query with the given name. If the cache statement
name is reused with a different statement, the previous query is automatically closed.

Options are passed to `DBConnection.execute/4` for text protocol, and
`DBConnection.prepare_execute/4` for binary protocol. See their documentation for all available
Expand Down Expand Up @@ -361,9 +360,8 @@ defmodule MyXQL do
Prepares a query that returns a single result to be later executed.

To execute the query, call `execute/4`. To close the query, call `close/3`.
If a name is given, the name must be unique per query, as the name is cached
but the statement isn't. If a new statement is given to an old name, the old
statement will be the one effectively used.
If a name is given, the name must be unique per query, as the name is cached.
If a new statement uses an old name, the old statement will be closed.

## Options

Expand Down
92 changes: 71 additions & 21 deletions lib/myxql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule MyXQL.Connection do
prepare: :named,
queries: nil,
transaction_status: :idle,
last_ref: nil
last_query: nil
]

@impl true
Expand All @@ -29,7 +29,7 @@ defmodule MyXQL.Connection do
prepare: prepare,
disconnect_on_error_codes: Keyword.fetch!(opts, :disconnect_on_error_codes),
ping_timeout: ping_timeout,
queries: queries_new()
queries: queries_new(prepare)
}

{:ok, state}
Expand Down Expand Up @@ -63,8 +63,9 @@ defmodule MyXQL.Connection do
def handle_prepare(query, opts, state) do
query = rename_query(state, query)

if cached_query = queries_get(state, query) do
{:ok, cached_query, %{state | last_ref: cached_query.ref}}
if cached_query = queries_get(:prepare, state, query) do
%{ref: ref, statement_id: statement_id} = cached_query
{:ok, cached_query, %{state | last_query: {ref, statement_id}}}
else
case prepare(query, state) do
{:ok, _, _} = ok ->
Expand Down Expand Up @@ -469,17 +470,24 @@ defmodule MyXQL.Connection do
ref = make_ref()
query = %{query | num_params: num_params, statement_id: statement_id, ref: ref}
queries_put(state, query)
{:ok, query, %{state | last_ref: ref}}
{:ok, query, %{state | last_query: {ref, statement_id}}}

result ->
result(result, query, state)
end
end

defp maybe_reprepare(%{ref: ref} = query, %{last_ref: ref} = state), do: {:ok, query, state}
defp maybe_reprepare(%{ref: ref} = query, %{last_query: {ref, _statement_id}} = state) do
{:ok, query, state}
end

defp maybe_reprepare(query, %{queries: nil, last_query: {_ref, statement_id}} = state) do
Client.com_stmt_close(state.client, statement_id)
prepare(query, state)
end

defp maybe_reprepare(query, state) do
if cached_query = queries_get(state, query) do
if cached_query = queries_get(:execute, state, query) do
{:ok, cached_query, state}
else
prepare(query, state)
Expand All @@ -490,12 +498,16 @@ defmodule MyXQL.Connection do
%{state | cursors: Map.delete(state.cursors, cursor.ref)}
end

# Close unnamed queries after executing them
# When prepare is :named, close unnamed queries after executing them.
# When prepare is :unnamed, queries will be closed the next time a
# query is prepared or a different query is executed. This allows us
# to re-execute the same unnamed query without preparing it again.
defp maybe_close(_query, %{queries: nil} = state), do: {:ok, state}
defp maybe_close(%{name: ""} = query, state), do: close(query, state)
defp maybe_close(_query, state), do: {:ok, state}

defp close(%{ref: ref} = query, %{last_ref: ref} = state) do
close(query, %{state | last_ref: nil})
defp close(%{ref: ref} = query, %{last_query: {ref, _statement_id}} = state) do
close(query, %{state | last_query: nil})
end

defp close(query, state) do
Expand All @@ -516,7 +528,8 @@ defmodule MyXQL.Connection do

## Cache query handling

defp queries_new(), do: :ets.new(__MODULE__, [:set, :public])
defp queries_new(:unnamed), do: nil
defp queries_new(_), do: :ets.new(__MODULE__, [:set, :public])

defp queries_put(%{queries: nil}, _), do: :ok
defp queries_put(_state, %{name: ""}), do: :ok
Expand All @@ -530,7 +543,7 @@ defmodule MyXQL.Connection do
} = query

try do
:ets.insert(state.queries, {name, {num_params, statement_id, ref}})
:ets.insert(state.queries, {name, statement_id, ref, num_params})
rescue
ArgumentError ->
:ok
Expand All @@ -549,7 +562,7 @@ defmodule MyXQL.Connection do
} = query

try do
:ets.insert(state.queries, {name, {statement, num_params, statement_id, ref}})
:ets.insert(state.queries, {name, statement_id, ref, statement, num_params})
rescue
ArgumentError ->
:ok
Expand All @@ -571,33 +584,70 @@ defmodule MyXQL.Connection do
end
end

defp queries_get(%{queries: nil}, _), do: nil
defp queries_get(_state, %{name: ""}), do: nil
defp queries_get(_, %{queries: nil, last_query: {_ref, statement_id}} = state, _query) do
Client.com_stmt_close(state.client, statement_id)
nil
end

defp queries_get(state, %{cache: :reference, name: name} = query) do
defp queries_get(_, _state, %{name: ""}), do: nil

defp queries_get(:prepare, state, %{cache: :reference, name: name}) do
try do
:ets.lookup_element(state.queries, name, 2)
rescue
ArgumentError -> nil
else
{num_params, statement_id, ref} ->
statement_id ->
Client.com_stmt_close(state.client, statement_id)
:ets.delete(state.queries, name)
nil
end
end

defp queries_get(:execute, state, %{cache: :reference, name: name} = query) do
try do
:ets.lookup(state.queries, name)
rescue
ArgumentError -> nil
else
# :reference query already prepared
[{_name, statement_id, ref, num_params}] ->
%{query | num_params: num_params, statement_id: statement_id, ref: ref}

# :statement query already prepared
[{_name, statement_id, _ref, _statement, _num_params}] ->
Client.com_stmt_close(state.client, statement_id)
:ets.delete(state.queries, name)
nil

[] ->
nil
end
end

defp queries_get(state, %{cache: :statement, name: name, statement: statement} = query) do
defp queries_get(_, state, %{cache: :statement, name: name, statement: statement} = query) do
try do
:ets.lookup_element(state.queries, name, 2)
:ets.lookup(state.queries, name)
rescue
ArgumentError -> nil
else
{^statement, num_params, statement_id, ref} ->
# :statement query already prepared
[{_name, statement_id, ref, ^statement, num_params}] ->
%{query | num_params: num_params, statement_id: statement_id, ref: ref}

{_statement, _num_params, statement_id, _ref} ->
[{_name, statement_id, _ref, _statement, _num_params}] ->
Client.com_stmt_close(state.client, statement_id)
:ets.delete(state.queries, name)
nil

# :reference query already prepared
[{_name, statement_id, _ref, _num_params}] ->
Client.com_stmt_close(state.client, statement_id)
:ets.delete(state.queries, name)
nil

[] ->
nil
end
end
end
60 changes: 59 additions & 1 deletion test/myxql/sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,36 @@ defmodule MyXQL.SyncTest do
{:ok, conn} = MyXQL.start_link(@opts ++ [prepare: :unnamed])
assert prepared_stmt_count() == 0

# Multiple queries do not increase statement count
MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42")
assert prepared_stmt_count() == 1

MyXQL.query!(conn, "SELECT 43", [], cache_statement: "43")
assert prepared_stmt_count() == 1

# Multiple preparations don't increase statement count
{:ok, _query} = MyXQL.prepare(conn, "1", "SELECT 1")
assert prepared_stmt_count() == 1

{:ok, _query} = MyXQL.prepare(conn, "2", "SELECT 2")
assert prepared_stmt_count() == 1
end

test "do not leak statements with prepare: :named" do
{:ok, conn} = MyXQL.start_link(@opts)
assert prepared_stmt_count() == 0

MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42")
assert prepared_stmt_count() == 1

MyXQL.query!(conn, "SELECT 1337", [], cache_statement: "1337")
assert prepared_stmt_count() == 2

MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42")
assert prepared_stmt_count() == 2

MyXQL.query!(conn, "SELECT 43", [])
assert prepared_stmt_count() == 2
end

test "do not leak statements with rebound :cache_statement" do
Expand All @@ -23,15 +51,31 @@ defmodule MyXQL.SyncTest do
assert prepared_stmt_count() == 1
end

test "do not leak statements with insert and failed insert" do
test "do not leak statements with insert and failed insert with prepare: :named" do
{:ok, conn} = MyXQL.start_link(@opts)
assert prepared_stmt_count() == 0

{:ok, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (1)")
assert prepared_stmt_count() == 0

{:error, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (1)")
assert prepared_stmt_count() == 0
end

test "do not leak statements with insert and failed insert with prepare: :unnamed" do
{:ok, conn} = MyXQL.start_link(@opts ++ [prepare: :unnamed])
assert prepared_stmt_count() == 0

{:ok, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (2)")
assert prepared_stmt_count() == 1

{:error, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (2)")
assert prepared_stmt_count() == 1

MyXQL.query!(conn, "SELECT 123", [], cache_statement: "123")
assert prepared_stmt_count() == 1
end

test "do not leak statements on multiple executions of the same name in prepare_execute" do
{:ok, conn} = MyXQL.start_link(@opts)
{:ok, _, _} = MyXQL.prepare_execute(conn, "foo", "SELECT 42")
Expand Down Expand Up @@ -90,6 +134,20 @@ defmodule MyXQL.SyncTest do
assert prepared_stmt_count() == 1
end

test "do not leak when re-preparing a query and then executing the old query" do
{:ok, conn} = MyXQL.start_link(@opts)
assert prepared_stmt_count() == 0

{:ok, query} = MyXQL.prepare(conn, "foo", "SELECT 42")
assert prepared_stmt_count() == 1

{:ok, _} = MyXQL.prepare(conn, "foo", "SELECT 43")
assert prepared_stmt_count() == 1

{:ok, _, _} = MyXQL.execute(conn, query)
assert prepared_stmt_count() == 1
end

defp prepared_stmt_count() do
[%{"Value" => count}] = TestHelper.mysql!("show global status like 'Prepared_stmt_count'")
String.to_integer(count)
Expand Down
56 changes: 45 additions & 11 deletions test/myxql_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,26 @@ defmodule MyXQLTest do
assert query == query3
end

test ":unnamed" do
test ":unnamed re-executes last query without preparing" do
{:ok, pid} = MyXQL.start_link(@opts ++ [prepare: :unnamed])
{:ok, query} = MyXQL.prepare(pid, "1", "SELECT 1")
{:ok, query2, _} = MyXQL.execute(pid, query, [])
assert query == query2
{:ok, query3, _} = MyXQL.execute(pid, query, [])
assert query2.ref != query3.ref
assert query2.statement_id != query3.statement_id
assert query2 == query3
end

test ":unnamed re-prepares if last query is not the same" do
{:ok, pid} = MyXQL.start_link(@opts ++ [prepare: :unnamed])

{:ok, query1} = MyXQL.prepare(pid, "1", "SELECT 1")
{:ok, query2} = MyXQL.prepare(pid, "2", "SELECT 2")

{:ok, query1b, _} = MyXQL.execute(pid, query1, [])
{:ok, query2b, _} = MyXQL.execute(pid, query2, [])

assert query1 != query1b
assert query2 != query2b
end

test ":force_named" do
Expand Down Expand Up @@ -243,18 +255,19 @@ defmodule MyXQLTest do
end

test "prepare and then execute with name", c do
{:ok, query} = MyXQL.prepare(c.conn, "foo", "SELECT ? * ?")
{:ok, query1} = MyXQL.prepare(c.conn, "foo", "SELECT ? * ?")

assert query.num_params == 2
assert {:ok, _, result} = MyXQL.execute(c.conn, query, [2, 3])
assert query1.num_params == 2
assert {:ok, _, result} = MyXQL.execute(c.conn, query1, [2, 3])
assert result.rows == [[6]]

# If we prepare it again, it won't make a difference if the name is the same
{:ok, query} = MyXQL.prepare(c.conn, "foo", "SELECT ? + ?")
# If we prepare it again with the same name it will use the new statement
{:ok, query2} = MyXQL.prepare(c.conn, "foo", "SELECT ? + ?")

assert query.num_params == 2
assert {:ok, _, result} = MyXQL.execute(c.conn, query, [2, 3])
assert result.rows == [[6]]
assert query2.num_params == 2
assert {:ok, _, result} = MyXQL.execute(c.conn, query2, [2, 3])
assert result.rows == [[5]]
assert query1.ref != query2.ref
end

test "query is re-prepared if executed after being closed", c do
Expand Down Expand Up @@ -297,6 +310,16 @@ defmodule MyXQLTest do
assert query1.statement_id != query2.statement_id
end

test "prepare and then query with the same name", c do
{:ok, _query} = MyXQL.prepare(c.conn, "foo", "SELECT 42")
{:ok, %{rows: [[24]]}} = MyXQL.query(c.conn, "SELECT 24", [], cache_statement: "foo")
end

test "query and then prepare_execute with the same name", c do
{:ok, %{rows: [[24]]}} = MyXQL.query(c.conn, "SELECT 24", [], cache_statement: "foo")
{:ok, _query, %{rows: [[42]]}} = MyXQL.prepare_execute(c.conn, "foo", "SELECT 42")
end

# This test is just describing existing behaviour, we may want to change it in the future.
test "prepared query is not re-prepared after schema change", c do
MyXQL.query!(c.conn, "CREATE TABLE test_prepared_schema_change (x integer)")
Expand Down Expand Up @@ -735,6 +758,17 @@ defmodule MyXQLTest do
end
end

test "switching between single and multiple result prepared statement", c do
assert %MyXQL.Queries{} =
multi_query = MyXQL.prepare_many!(c.conn, "test_name", "CALL multi_procedure()")

assert [%MyXQL.Result{rows: [[1]]}, %MyXQL.Result{rows: [[2]]}] =
MyXQL.execute_many!(c.conn, multi_query)

assert %MyXQL.Query{} = single_query = MyXQL.prepare!(c.conn, "test_name", "SELECT 42;")
assert %MyXQL.Result{rows: [[42]]} = MyXQL.execute!(c.conn, single_query)
end

test "close a multiple result prepared statement", c do
assert %MyXQL.Queries{} = query = MyXQL.prepare_many!(c.conn, "", "CALL multi_procedure()")
assert :ok == MyXQL.close(c.conn, query)
Expand Down