From 378c7627dc2b5d12b7d7632dac53473cef46316e Mon Sep 17 00:00:00 2001 From: Greg Rychlewski Date: Mon, 24 Jan 2022 11:18:34 -0500 Subject: [PATCH 1/4] Revert "Revert "Allow a given cache name to be reprepared (#144)"" This reverts commit 989111ef64a5f70eed7aeed26a85da43b14f39bb. --- lib/myxql.ex | 14 ++++++------ lib/myxql/connection.ex | 47 +++++++++++++++++++++++++++++++---------- test/myxql_test.exs | 38 ++++++++++++++++++++++++++------- 3 files changed, 72 insertions(+), 27 deletions(-) diff --git a/lib/myxql.ex b/lib/myxql.ex index 07f7048..9d796d3 100644 --- a/lib/myxql.ex +++ b/lib/myxql.ex @@ -213,13 +213,12 @@ defmodule MyXQL do ## Options - * `:query_type` - use `:binary` for binary protocol (prepared statements), `:binary_then_text` to attempt + * `:query_type` - Use `:binary` for binary protocol (prepared statements), `:binary_then_text` to attempt executing a binary query and if that fails fallback to executing a text query, and `:text` for text protocol - (default: `:binary`) + (default: `:binary`). - * `:cache_statement` - caches the query with the given name. Opposite to the `name` option - given to `prepare/4`, if the cache statement name is reused with a different, the previous - query is automatically closed + * `:cache_statement` - Caches the query with the given name. If the cache statement + name is reused with a different statement, the previous query is automatically closed. Options are passed to `DBConnection.execute/4` for text protocol, and `DBConnection.prepare_execute/4` for binary protocol. See their documentation for all available @@ -361,9 +360,8 @@ defmodule MyXQL do Prepares a query that returns a single result to be later executed. To execute the query, call `execute/4`. To close the query, call `close/3`. - If a name is given, the name must be unique per query, as the name is cached - but the statement isn't. If a new statement is given to an old name, the old - statement will be the one effectively used. + If a name is given, the name must be unique per query, as the name is cached. + If a new statement uses an old name, the old statement will be closed. ## Options diff --git a/lib/myxql/connection.ex b/lib/myxql/connection.ex index 86bd590..52aa573 100644 --- a/lib/myxql/connection.ex +++ b/lib/myxql/connection.ex @@ -479,8 +479,8 @@ defmodule MyXQL.Connection do defp maybe_reprepare(%{ref: ref} = query, %{last_ref: ref} = state), do: {:ok, query, state} defp maybe_reprepare(query, state) do - if cached_query = queries_get(state, query) do - {:ok, cached_query, state} + if query_member?(state, query) do + {:ok, query, state} else prepare(query, state) end @@ -523,14 +523,13 @@ defmodule MyXQL.Connection do defp queries_put(state, %{cache: :reference} = query) do %{ - num_params: num_params, statement_id: statement_id, ref: ref, name: name } = query try do - :ets.insert(state.queries, {name, {num_params, statement_id, ref}}) + :ets.insert(state.queries, {name, statement_id, ref}) rescue ArgumentError -> :ok @@ -549,7 +548,7 @@ defmodule MyXQL.Connection do } = query try do - :ets.insert(state.queries, {name, {statement, num_params, statement_id, ref}}) + :ets.insert(state.queries, {name, statement_id, ref, statement, num_params}) rescue ArgumentError -> :ok @@ -574,30 +573,56 @@ defmodule MyXQL.Connection do defp queries_get(%{queries: nil}, _), do: nil defp queries_get(_state, %{name: ""}), do: nil - defp queries_get(state, %{cache: :reference, name: name} = query) do + defp queries_get(state, %{cache: :reference, name: name}) do try do :ets.lookup_element(state.queries, name, 2) rescue ArgumentError -> nil else - {num_params, statement_id, ref} -> - %{query | num_params: num_params, statement_id: statement_id, ref: ref} + statement_id -> + Client.com_stmt_close(state.client, statement_id) + :ets.delete(state.queries, name) + nil end end defp queries_get(state, %{cache: :statement, name: name, statement: statement} = query) do try do - :ets.lookup_element(state.queries, name, 2) + :ets.lookup(state.queries, name) rescue ArgumentError -> nil else - {^statement, num_params, statement_id, ref} -> + # :statement query already prepared + [{_name, statement_id, ref, ^statement, num_params}] -> %{query | num_params: num_params, statement_id: statement_id, ref: ref} - {_statement, _num_params, statement_id, _ref} -> + [{_name, statement_id, _ref, _statement, _num_params}] -> + Client.com_stmt_close(state.client, statement_id) + :ets.delete(state.queries, name) + nil + + # :reference query already prepared + [{_name, statement_id, _ref}] -> Client.com_stmt_close(state.client, statement_id) :ets.delete(state.queries, name) nil + + [] -> + nil + end + end + + defp query_member?(%{queries: nil}, _), do: false + defp query_member?(_, %{name: ""}), do: false + + defp query_member?(%{queries: queries}, %{name: name, ref: ref}) do + try do + :ets.lookup_element(queries, name, 3) + rescue + ArgumentError -> false + else + ^ref -> true + _ -> false end end end diff --git a/test/myxql_test.exs b/test/myxql_test.exs index 84d0e66..1351fe8 100644 --- a/test/myxql_test.exs +++ b/test/myxql_test.exs @@ -243,18 +243,19 @@ defmodule MyXQLTest do end test "prepare and then execute with name", c do - {:ok, query} = MyXQL.prepare(c.conn, "foo", "SELECT ? * ?") + {:ok, query1} = MyXQL.prepare(c.conn, "foo", "SELECT ? * ?") - assert query.num_params == 2 - assert {:ok, _, result} = MyXQL.execute(c.conn, query, [2, 3]) + assert query1.num_params == 2 + assert {:ok, _, result} = MyXQL.execute(c.conn, query1, [2, 3]) assert result.rows == [[6]] - # If we prepare it again, it won't make a difference if the name is the same - {:ok, query} = MyXQL.prepare(c.conn, "foo", "SELECT ? + ?") + # If we prepare it again with the same name it will use the new statement + {:ok, query2} = MyXQL.prepare(c.conn, "foo", "SELECT ? + ?") - assert query.num_params == 2 - assert {:ok, _, result} = MyXQL.execute(c.conn, query, [2, 3]) - assert result.rows == [[6]] + assert query2.num_params == 2 + assert {:ok, _, result} = MyXQL.execute(c.conn, query2, [2, 3]) + assert result.rows == [[5]] + assert query1.ref != query2.ref end test "query is re-prepared if executed after being closed", c do @@ -297,6 +298,16 @@ defmodule MyXQLTest do assert query1.statement_id != query2.statement_id end + test "prepare and then query with the same name", c do + {:ok, _query} = MyXQL.prepare(c.conn, "foo", "SELECT 42") + {:ok, %{rows: [[24]]}} = MyXQL.query(c.conn, "SELECT 24", [], cache_statement: "foo") + end + + test "query and then prepare_execute with the same name", c do + {:ok, %{rows: [[24]]}} = MyXQL.query(c.conn, "SELECT 24", [], cache_statement: "foo") + {:ok, _query, %{rows: [[42]]}} = MyXQL.prepare_execute(c.conn, "foo", "SELECT 42") + end + # This test is just describing existing behaviour, we may want to change it in the future. test "prepared query is not re-prepared after schema change", c do MyXQL.query!(c.conn, "CREATE TABLE test_prepared_schema_change (x integer)") @@ -735,6 +746,17 @@ defmodule MyXQLTest do end end + test "switching between single and multiple result prepared statement", c do + assert %MyXQL.Queries{} = + multi_query = MyXQL.prepare_many!(c.conn, "test_name", "CALL multi_procedure()") + + assert [%MyXQL.Result{rows: [[1]]}, %MyXQL.Result{rows: [[2]]}] = + MyXQL.execute_many!(c.conn, multi_query) + + assert %MyXQL.Query{} = single_query = MyXQL.prepare!(c.conn, "test_name", "SELECT 42;") + assert %MyXQL.Result{rows: [[42]]} = MyXQL.execute!(c.conn, single_query) + end + test "close a multiple result prepared statement", c do assert %MyXQL.Queries{} = query = MyXQL.prepare_many!(c.conn, "", "CALL multi_procedure()") assert :ok == MyXQL.close(c.conn, query) From 46bdf098c9ee55d2787ea1a30653ab5826925ff3 Mon Sep 17 00:00:00 2001 From: Greg Rychlewski Date: Mon, 24 Jan 2022 12:06:46 -0500 Subject: [PATCH 2/4] Revert "Revert "Reuse prepared statements in `prepare: :unnamed` (#146)"" This reverts commit 919eabec4945e54dd388dd792e41e9043644330b. --- lib/myxql/connection.ex | 37 +++++++++++++++++++++++--------- test/myxql/sync_test.exs | 46 +++++++++++++++++++++++++++++++++++++++- test/myxql_test.exs | 18 +++++++++++++--- 3 files changed, 87 insertions(+), 14 deletions(-) diff --git a/lib/myxql/connection.ex b/lib/myxql/connection.ex index 52aa573..b18c5e4 100644 --- a/lib/myxql/connection.ex +++ b/lib/myxql/connection.ex @@ -13,7 +13,7 @@ defmodule MyXQL.Connection do prepare: :named, queries: nil, transaction_status: :idle, - last_ref: nil + last_query: nil ] @impl true @@ -29,7 +29,7 @@ defmodule MyXQL.Connection do prepare: prepare, disconnect_on_error_codes: Keyword.fetch!(opts, :disconnect_on_error_codes), ping_timeout: ping_timeout, - queries: queries_new() + queries: queries_new(prepare) } {:ok, state} @@ -64,7 +64,8 @@ defmodule MyXQL.Connection do query = rename_query(state, query) if cached_query = queries_get(state, query) do - {:ok, cached_query, %{state | last_ref: cached_query.ref}} + %{ref: ref, statement_id: statement_id} = cached_query + {:ok, cached_query, %{state | last_query: {ref, statement_id}}} else case prepare(query, state) do {:ok, _, _} = ok -> @@ -469,14 +470,21 @@ defmodule MyXQL.Connection do ref = make_ref() query = %{query | num_params: num_params, statement_id: statement_id, ref: ref} queries_put(state, query) - {:ok, query, %{state | last_ref: ref}} + {:ok, query, %{state | last_query: {ref, statement_id}}} result -> result(result, query, state) end end - defp maybe_reprepare(%{ref: ref} = query, %{last_ref: ref} = state), do: {:ok, query, state} + defp maybe_reprepare(%{ref: ref} = query, %{last_query: {ref, _statement_id}} = state) do + {:ok, query, state} + end + + defp maybe_reprepare(query, %{queries: nil, last_query: {_ref, statement_id}} = state) do + Client.com_stmt_close(state.client, statement_id) + prepare(query, state) + end defp maybe_reprepare(query, state) do if query_member?(state, query) do @@ -490,12 +498,16 @@ defmodule MyXQL.Connection do %{state | cursors: Map.delete(state.cursors, cursor.ref)} end - # Close unnamed queries after executing them + # When prepare is :named, close unnamed queries after executing them. + # When prepare is :unnamed, queries will be closed the next time a + # query is prepared or a different query is executed. This allows us + # to re-execute the same unnamed query without preparing it again. + defp maybe_close(_query, %{queries: nil} = state), do: {:ok, state} defp maybe_close(%{name: ""} = query, state), do: close(query, state) defp maybe_close(_query, state), do: {:ok, state} - defp close(%{ref: ref} = query, %{last_ref: ref} = state) do - close(query, %{state | last_ref: nil}) + defp close(%{ref: ref} = query, %{last_query: {ref, _statement_id}} = state) do + close(query, %{state | last_query: nil}) end defp close(query, state) do @@ -516,7 +528,8 @@ defmodule MyXQL.Connection do ## Cache query handling - defp queries_new(), do: :ets.new(__MODULE__, [:set, :public]) + defp queries_new(:unnamed), do: nil + defp queries_new(_), do: :ets.new(__MODULE__, [:set, :public]) defp queries_put(%{queries: nil}, _), do: :ok defp queries_put(_state, %{name: ""}), do: :ok @@ -570,7 +583,11 @@ defmodule MyXQL.Connection do end end - defp queries_get(%{queries: nil}, _), do: nil + defp queries_get(%{queries: nil, last_query: {_ref, statement_id}} = state, _query) do + Client.com_stmt_close(state.client, statement_id) + nil + end + defp queries_get(_state, %{name: ""}), do: nil defp queries_get(state, %{cache: :reference, name: name}) do diff --git a/test/myxql/sync_test.exs b/test/myxql/sync_test.exs index e17e46a..57da216 100644 --- a/test/myxql/sync_test.exs +++ b/test/myxql/sync_test.exs @@ -8,8 +8,36 @@ defmodule MyXQL.SyncTest do {:ok, conn} = MyXQL.start_link(@opts ++ [prepare: :unnamed]) assert prepared_stmt_count() == 0 + # Multiple queries do not increase statement count MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42") + assert prepared_stmt_count() == 1 + + MyXQL.query!(conn, "SELECT 43", [], cache_statement: "43") + assert prepared_stmt_count() == 1 + + # Multiple preparations don't increase statement count + {:ok, _query} = MyXQL.prepare(conn, "1", "SELECT 1") + assert prepared_stmt_count() == 1 + + {:ok, _query} = MyXQL.prepare(conn, "2", "SELECT 2") + assert prepared_stmt_count() == 1 + end + + test "do not leak statements with prepare: :named" do + {:ok, conn} = MyXQL.start_link(@opts) assert prepared_stmt_count() == 0 + + MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42") + assert prepared_stmt_count() == 1 + + MyXQL.query!(conn, "SELECT 1337", [], cache_statement: "1337") + assert prepared_stmt_count() == 2 + + MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42") + assert prepared_stmt_count() == 2 + + MyXQL.query!(conn, "SELECT 43", []) + assert prepared_stmt_count() == 2 end test "do not leak statements with rebound :cache_statement" do @@ -23,15 +51,31 @@ defmodule MyXQL.SyncTest do assert prepared_stmt_count() == 1 end - test "do not leak statements with insert and failed insert" do + test "do not leak statements with insert and failed insert with prepare: :named" do {:ok, conn} = MyXQL.start_link(@opts) assert prepared_stmt_count() == 0 + {:ok, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (1)") assert prepared_stmt_count() == 0 + {:error, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (1)") assert prepared_stmt_count() == 0 end + test "do not leak statements with insert and failed insert with prepare: :unnamed" do + {:ok, conn} = MyXQL.start_link(@opts ++ [prepare: :unnamed]) + assert prepared_stmt_count() == 0 + + {:ok, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (2)") + assert prepared_stmt_count() == 1 + + {:error, _} = MyXQL.query(conn, "INSERT INTO uniques(a) VALUES (2)") + assert prepared_stmt_count() == 1 + + MyXQL.query!(conn, "SELECT 123", [], cache_statement: "123") + assert prepared_stmt_count() == 1 + end + test "do not leak statements on multiple executions of the same name in prepare_execute" do {:ok, conn} = MyXQL.start_link(@opts) {:ok, _, _} = MyXQL.prepare_execute(conn, "foo", "SELECT 42") diff --git a/test/myxql_test.exs b/test/myxql_test.exs index 1351fe8..bdcb910 100644 --- a/test/myxql_test.exs +++ b/test/myxql_test.exs @@ -205,14 +205,26 @@ defmodule MyXQLTest do assert query == query3 end - test ":unnamed" do + test ":unnamed re-executes last query without preparing" do {:ok, pid} = MyXQL.start_link(@opts ++ [prepare: :unnamed]) {:ok, query} = MyXQL.prepare(pid, "1", "SELECT 1") {:ok, query2, _} = MyXQL.execute(pid, query, []) assert query == query2 {:ok, query3, _} = MyXQL.execute(pid, query, []) - assert query2.ref != query3.ref - assert query2.statement_id != query3.statement_id + assert query2 == query3 + end + + test ":unnamed re-prepares if last query is not the same" do + {:ok, pid} = MyXQL.start_link(@opts ++ [prepare: :unnamed]) + + {:ok, query1} = MyXQL.prepare(pid, "1", "SELECT 1") + {:ok, query2} = MyXQL.prepare(pid, "2", "SELECT 2") + + {:ok, query1b, _} = MyXQL.execute(pid, query1, []) + {:ok, query2b, _} = MyXQL.execute(pid, query2, []) + + assert query1 != query1b + assert query2 != query2b end test ":force_named" do From de038d54406a5a01c1589d8e1fffcd5851c06a03 Mon Sep 17 00:00:00 2001 From: Greg Rychlewski Date: Mon, 24 Jan 2022 12:09:32 -0500 Subject: [PATCH 3/4] fix cache for multi-node setup --- lib/myxql/connection.ex | 54 +++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/lib/myxql/connection.ex b/lib/myxql/connection.ex index b18c5e4..2661a63 100644 --- a/lib/myxql/connection.ex +++ b/lib/myxql/connection.ex @@ -63,7 +63,7 @@ defmodule MyXQL.Connection do def handle_prepare(query, opts, state) do query = rename_query(state, query) - if cached_query = queries_get(state, query) do + if cached_query = queries_get(:prepare, state, query) do %{ref: ref, statement_id: statement_id} = cached_query {:ok, cached_query, %{state | last_query: {ref, statement_id}}} else @@ -487,8 +487,8 @@ defmodule MyXQL.Connection do end defp maybe_reprepare(query, state) do - if query_member?(state, query) do - {:ok, query, state} + if cached_query = queries_get(:execute, state, query) do + {:ok, cached_query, state} else prepare(query, state) end @@ -536,13 +536,14 @@ defmodule MyXQL.Connection do defp queries_put(state, %{cache: :reference} = query) do %{ + num_params: num_params, statement_id: statement_id, ref: ref, name: name } = query try do - :ets.insert(state.queries, {name, statement_id, ref}) + :ets.insert(state.queries, {name, statement_id, ref, num_params}) rescue ArgumentError -> :ok @@ -583,14 +584,14 @@ defmodule MyXQL.Connection do end end - defp queries_get(%{queries: nil, last_query: {_ref, statement_id}} = state, _query) do + defp queries_get(_, %{queries: nil, last_query: {_ref, statement_id}} = state, _query) do Client.com_stmt_close(state.client, statement_id) nil end - defp queries_get(_state, %{name: ""}), do: nil + defp queries_get(_, _state, %{name: ""}), do: nil - defp queries_get(state, %{cache: :reference, name: name}) do + defp queries_get(:prepare, state, %{cache: :reference, name: name}) do try do :ets.lookup_element(state.queries, name, 2) rescue @@ -603,7 +604,28 @@ defmodule MyXQL.Connection do end end - defp queries_get(state, %{cache: :statement, name: name, statement: statement} = query) do + defp queries_get(:execute, state, %{cache: :reference, name: name} = query) do + try do + :ets.lookup(state.queries, name) + rescue + ArgumentError -> nil + else + # :reference query already prepared + [{_name, statement_id, ref, num_params}] -> + %{query | num_params: num_params, statement_id: statement_id, ref: ref} + + # :statement query already prepared + [{_name, statement_id, _ref, _statement, _num_params}] -> + Client.com_stmt_close(state.client, statement_id) + :ets.delete(state.queries, name) + nil + + [] -> + nil + end + end + + defp queries_get(_, state, %{cache: :statement, name: name, statement: statement} = query) do try do :ets.lookup(state.queries, name) rescue @@ -619,7 +641,7 @@ defmodule MyXQL.Connection do nil # :reference query already prepared - [{_name, statement_id, _ref}] -> + [{_name, statement_id, _ref, _num_params}] -> Client.com_stmt_close(state.client, statement_id) :ets.delete(state.queries, name) nil @@ -628,18 +650,4 @@ defmodule MyXQL.Connection do nil end end - - defp query_member?(%{queries: nil}, _), do: false - defp query_member?(_, %{name: ""}), do: false - - defp query_member?(%{queries: queries}, %{name: name, ref: ref}) do - try do - :ets.lookup_element(queries, name, 3) - rescue - ArgumentError -> false - else - ^ref -> true - _ -> false - end - end end From 372a1baef0f2afc85580ec4c5af7b88aebfbc7f5 Mon Sep 17 00:00:00 2001 From: Greg Rychlewski Date: Tue, 25 Jan 2022 01:54:39 -0500 Subject: [PATCH 4/4] leak test --- test/myxql/sync_test.exs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/myxql/sync_test.exs b/test/myxql/sync_test.exs index 57da216..7ddc181 100644 --- a/test/myxql/sync_test.exs +++ b/test/myxql/sync_test.exs @@ -134,6 +134,20 @@ defmodule MyXQL.SyncTest do assert prepared_stmt_count() == 1 end + test "do not leak when re-preparing a query and then executing the old query" do + {:ok, conn} = MyXQL.start_link(@opts) + assert prepared_stmt_count() == 0 + + {:ok, query} = MyXQL.prepare(conn, "foo", "SELECT 42") + assert prepared_stmt_count() == 1 + + {:ok, _} = MyXQL.prepare(conn, "foo", "SELECT 43") + assert prepared_stmt_count() == 1 + + {:ok, _, _} = MyXQL.execute(conn, query) + assert prepared_stmt_count() == 1 + end + defp prepared_stmt_count() do [%{"Value" => count}] = TestHelper.mysql!("show global status like 'Prepared_stmt_count'") String.to_integer(count)