From 82ca27bc2537d3587867cca25fb207439fabdf00 Mon Sep 17 00:00:00 2001 From: Paula Gearon Date: Sat, 9 Oct 2021 20:01:43 -0400 Subject: [PATCH 1/5] Initial commit for recording datom changes --- src/asami/common_index.cljc | 21 +++++++++++++++++++++ src/asami/core.cljc | 33 +++++++++++++++++++++------------ src/asami/durable/graph.cljc | 12 ++++++++---- src/asami/durable/store.cljc | 12 +++++++----- src/asami/graph.cljc | 5 ++++- src/asami/index.cljc | 6 +++--- src/asami/memory.cljc | 12 +++++++----- src/asami/multi_graph.cljc | 6 +++--- src/asami/storage.cljc | 8 ++++++-- 9 files changed, 80 insertions(+), 35 deletions(-) diff --git a/src/asami/common_index.cljc b/src/asami/common_index.cljc index 6dca2ed..362c27b 100644 --- a/src/asami/common_index.cljc +++ b/src/asami/common_index.cljc @@ -10,6 +10,27 @@ and multigraph implementations." :cljs [schema.core :as s :include-macros true])) #?(:clj (:import [clojure.lang ITransientCollection]))) + +(defn graph-transact + "Common graph transaction operation" + [graph tx-id assertions retractions generated-data] + (let [[a r] @generated-data + asserts (transient a) + retracts (transient r) + new-graph (as-> graph gr + (reduce (fn [acc [s p o :as triple]] + (let [ad (graph-delete acc s p o)] + (when-not (identical? ad acc) (conj! retracts triple)) + ad)) + gr retractions) + (reduce (fn [acc [s p o :as triple]] + (let [aa (graph-add acc s p o tx-id)] + (when-not (identical? aa acc) (conj! asserts triple)) + aa)) + gr assertions))] + (vreset! generated-data [(persistent! asserts) (persistent! retracts)]) + new-graph)) + (defprotocol NestedIndex (lowest-level-fn [this] "Returns a function for handling the lowest index level retrieval") (lowest-level-sets-fn [this] "Returns a function retrieving all lowest level values as sets") diff --git a/src/asami/core.cljc b/src/asami/core.cljc index 195f6a1..b91b605 100644 --- a/src/asami/core.cljc +++ b/src/asami/core.cljc @@ -188,30 +188,39 @@ (fn [] (let [tx-id (storage/next-tx connection) as-datom (fn [assert? [e a v]] (->Datom e a v tx-id assert?)) + ;; function to convert assertions and retractions into a seq of datoms + build-datoms (fn [assertions retractions] + (concat + (map (partial as-datom false) retractions) + (map (partial as-datom true) assertions))) current-db (storage/db connection) ;; single maps should not be passed in, but if they are then wrap them seq-wrapper (fn [x] (if (map? x) [x] x)) - ;; a volatile to capture data for the user - generated-data (volatile! [tx-triples nil {}]) + ;; volatiles to capture data for the user + ;; This is to avoid passing parameters to functions that users may want to call directly + ;; and especially to avoid the difficulty of asking users to of return multiple structures + vtempids (volatile! {}) ;; volatile to capture the tempid map from built-triples + generated-data (volatile! [[] []]) ;; volatile to capture the asserted and retracted data in a transaction [db-before db-after] (if tx-triples ;; simple assertion of triples - (storage/transact-data connection (seq-wrapper tx-triples) nil) + (storage/transact-data connection generated-data (seq-wrapper tx-triples) nil) ;; a seq of statements and/or entities - ;; this generates triples and retractions inside a transaction - ;; capture this data to return to the user + ;; convert these to assertions/retractions and send to transaction + ;; also, capture tempids that are generated during conversion (storage/transact-data connection + generated-data (fn [graph] ;; building triples returns a tuple of assertions, retractions, tempids - (vreset! generated-data - (entities/build-triples graph (seq-wrapper (or tx-data tx-info))))))) + (let [[_ _ tempids :as result] + (entities/build-triples graph (seq-wrapper (or tx-data tx-info)))] + (vreset! vtempids tempids) + result)))) ;; pull out the info captured during the transaction - [triples retracts tempids] (deref generated-data)] + [triples retracts] (deref generated-data)] {:db-before db-before :db-after db-after - :tx-data (concat - (map (partial as-datom false) retracts) - (map (partial as-datom true) triples)) - :tempids tempids})))] + :tx-data (build-datoms triples retracts) + :tempids @vtempids})))] #?(:clj (CompletableFuture/supplyAsync (reify Supplier (get [_] (op))) (or executor clojure.lang.Agent/soloExecutor)) :cljs (let [d (delay (op))] diff --git a/src/asami/durable/graph.cljc b/src/asami/durable/graph.cljc index 83b128d..1380411 100644 --- a/src/asami/durable/graph.cljc +++ b/src/asami/durable/graph.cljc @@ -70,7 +70,9 @@ ;; The statement already existed. The pools SHOULD be identical, but check in case they're not (if (identical? pool new-pool) this - (assoc this :pool new-pool))))) + (do + (log/warn "A statement existed that used an element not found in the data pool") + (assoc this :pool new-pool)))))) (graph-delete [this subj pred obj] @@ -91,9 +93,11 @@ (graph-transact [this tx-id assertions retractions] - (as-> this graph - (reduce (fn [acc [s p o]] (graph/graph-delete acc s p o)) graph retractions) - (reduce (fn [acc [s p o]] (graph/graph-add acc s p o tx-id)) graph assertions))) + (common-index/graph-transact this tx-id assertions retractions (volatile! [[] [] {}]))) + + (graph-transact + [this tx-id assertions retractions generated-data] + (common-index/graph-transact this tx-id assertions retractions generated-data)) (graph-diff [this other] diff --git a/src/asami/durable/store.cljc b/src/asami/durable/store.cljc index 818a516..b56f50f 100644 --- a/src/asami/durable/store.cljc +++ b/src/asami/durable/store.cljc @@ -1,7 +1,7 @@ (ns ^{:doc "The implements the Block storage version of a Graph/Database/Connection" :author "Paula Gearon"} asami.durable.store - (:require [asami.storage :as storage :refer [ConnectionType DatabaseType]] + (:require [asami.storage :as storage :refer [ConnectionType DatabaseType UpdateData]] [asami.graph :as graph] [asami.internal :as i :refer [now instant? long-time]] [asami.durable.common :as common @@ -225,15 +225,17 @@ "Removes a series of tuples from the latest graph, and asserts new tuples into the graph. Updates the connection to the new graph." ([conn :- ConnectionType + updates! :- UpdateData asserts :- [Triple] ;; triples to insert retracts :- [Triple]] ;; triples to remove - (transact-update* conn (fn [graph tx-id] (graph/graph-transact graph tx-id asserts retracts)))) + (transact-update* conn (fn [graph tx-id] (graph/graph-transact graph tx-id asserts retracts updates!)))) ([conn :- ConnectionType + updates! :- UpdateData generator-fn] (transact-update* conn (fn [graph tx-id] (let [[asserts retracts] (generator-fn graph)] - (graph/graph-transact graph tx-id asserts retracts)))))) + (graph/graph-transact graph tx-id asserts retracts updates!)))))) (defrecord DurableConnection [name tx-manager grapha nodea lock] @@ -244,8 +246,8 @@ (delete-database [this] (delete-database* this)) (release [this] (release* this)) (transact-update [this update-fn] (transact-update* this update-fn)) - (transact-data [this asserts retracts] (transact-data* this asserts retracts)) - (transact-data [this generator-fn] (transact-data* this generator-fn)) + (transact-data [this updates! asserts retracts] (transact-data* this updates! asserts retracts)) + (transact-data [this updates! generator-fn] (transact-data* this updates! generator-fn)) common/Lockable (lock! [this] #?(:clj (.lock ^Lock lock))) (unlock! [this] #?(:clj (.unlock ^Lock lock)))) diff --git a/src/asami/graph.cljc b/src/asami/graph.cljc index 0dfa587..ccbc43c 100644 --- a/src/asami/graph.cljc +++ b/src/asami/graph.cljc @@ -12,7 +12,10 @@ (new-graph [this] "Creates an empty graph of the same type") (graph-add [this subj pred obj] [this subj pred obj tx] "Adds triples to the graph") (graph-delete [this subj pred obj] "Removes triples from the graph") - (graph-transact [this tx-id assertions retractions] "Bulk operation to add and remove multiple statements in a single operation") + (graph-transact + [this tx-id assertions retractions] + [this tx-id assertions retractions generated] + "Bulk operation to add and remove multiple statements in a single operation") (graph-diff [this other] "Returns all subjects that have changed in this graph, compared to other") (resolve-triple [this subj pred obj] "Resolves patterns from the graph, and returns unbound columns only") (count-triple [this subj pred obj] "Resolves patterns from the graph, and returns the size of the resolution")) diff --git a/src/asami/index.cljc b/src/asami/index.cljc index 8c9665d..a652405 100644 --- a/src/asami/index.cljc +++ b/src/asami/index.cljc @@ -100,9 +100,9 @@ (log/trace "statement did not exist") this))) (graph-transact [this tx-id assertions retractions] - (as-> this graph - (reduce (fn [acc [s p o]] (graph-delete acc s p o)) graph retractions) - (reduce (fn [acc [s p o]] (graph-add acc s p o tx-id)) graph assertions))) + (common/graph-transact this tx-id assertions retractions (volatile! [[] [] {}]))) + (graph-transact [this tx-id assertions retractions generated-data] + (common/graph-transact this tx-id assertions retractions generated-data)) (graph-diff [this other] (when-not (= (type this) (type other)) (throw (ex-info "Unable to compare diffs between graphs of different types" {:this this :other other}))) diff --git a/src/asami/memory.cljc b/src/asami/memory.cljc index e77608a..f10f7ec 100644 --- a/src/asami/memory.cljc +++ b/src/asami/memory.cljc @@ -1,7 +1,7 @@ (ns ^{:doc "A storage implementation over in-memory indexing." :author "Paula Gearon"} asami.memory - (:require [asami.storage :as storage :refer [ConnectionType DatabaseType]] + (:require [asami.storage :as storage :refer [ConnectionType DatabaseType UpdateData]] [asami.internal :refer [now instant?]] [asami.index :as mem] [asami.multi-graph :as multi] @@ -66,8 +66,8 @@ (delete-database [this] true) ;; no-op for memory databases (release [this]) ;; no-op for memory databases (transact-update [this update-fn] (transact-update* this update-fn)) - (transact-data [this asserts retracts] (transact-data* this asserts retracts)) - (transact-data [this generator-fn] (transact-data* this generator-fn))) + (transact-data [this updates! asserts retracts] (transact-data* this updates! asserts retracts)) + (transact-data [this updates! generator-fn] (transact-data* this updates! generator-fn))) (def empty-graph mem/empty-graph) @@ -174,15 +174,17 @@ "Removes a series of tuples from the latest graph, and asserts new tuples into the graph. Updates the connection to the new graph." ([conn :- ConnectionType + updates! :- UpdateData asserts :- [Triple] ;; triples to insert retracts :- [Triple]] ;; triples to remove - (transact-update* conn (fn [graph tx-id] (gr/graph-transact graph tx-id asserts retracts)))) + (transact-update* conn (fn [graph tx-id] (gr/graph-transact graph tx-id asserts retracts updates!)))) ([conn :- ConnectionType + updates! :- UpdateData generator-fn] (transact-update* conn (fn [graph tx-id] (let [[asserts retracts] (generator-fn graph)] - (gr/graph-transact graph tx-id asserts retracts)))))) + (gr/graph-transact graph tx-id asserts retracts updates!)))))) (s/defn entity* :- (s/maybe {s/Any s/Any}) diff --git a/src/asami/multi_graph.cljc b/src/asami/multi_graph.cljc index c2ca810..19e86dd 100644 --- a/src/asami/multi_graph.cljc +++ b/src/asami/multi_graph.cljc @@ -132,9 +132,9 @@ allow rules to successfully use this graph type." (log/trace "statement did not exist") this))) (graph-transact [this tx-id assertions retractions] - (as-> this graph - (reduce (fn [acc [s p o]] (graph-delete acc s p o)) graph retractions) - (reduce (fn [acc [s p o]] (graph-add acc s p o tx-id)) graph assertions))) + (common/graph-transact this tx-id assertions retractions (volatile! [[] [] {}]))) + (graph-transact [this tx-id assertions retractions generated-data] + (common/graph-transact this tx-id assertions retractions generated-data)) (graph-diff [this other] (when-not (= (type this) (type other)) (throw (ex-info "Unable to compare diffs between graphs of different types" {:this this :other other}))) diff --git a/src/asami/storage.cljc b/src/asami/storage.cljc index dc756f2..66784e2 100644 --- a/src/asami/storage.cljc +++ b/src/asami/storage.cljc @@ -14,8 +14,8 @@ (transact-update [this update-fn] "Updates a graph in the database with the provided function. Function args are connection and transaction-id") (transact-data - [this asserts retracts] - [this generator-fn] "Updates the database with provided data")) + [this updates! asserts retracts] + [this updates! generator-fn] "Updates the database with provided data")) (defprotocol Database (as-of [this t] "Retrieves a database as of a given moment, inclusive") @@ -26,6 +26,10 @@ (graph [this] "Returns the internal graph for the database") (entity [this id] [this id nested?] "Returns an entity for an identifier")) +(def UpdateData (s/pred #(and (instance? #?(:clj clojure.lang.Volatile :cljs Volatile) %) + (vector? (deref %)) + (= 2 (count (deref %)))))) + (def DatabaseType (s/pred #(satisfies? Database %))) (def ConnectionType (s/pred #(satisfies? Connection %))) From e529a30d304862651c4cb8ee3cad6e5ad01c262f Mon Sep 17 00:00:00 2001 From: Paula Gearon Date: Sun, 10 Oct 2021 15:58:48 -0400 Subject: [PATCH 2/5] Fixed test issues with new insertion semantics --- test/asami/api_test.cljc | 5 +++-- test/asami/durable/api_test.cljc | 5 +++-- test/asami/durable/store_test.cljc | 17 +++++++++-------- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/test/asami/api_test.cljc b/test/asami/api_test.cljc index 94cc0c9..a033ed0 100644 --- a/test/asami/api_test.cljc +++ b/test/asami/api_test.cljc @@ -164,7 +164,7 @@ (is (= 2 (count (q '[:find ?e ?a ?v :where [?e ?a ?v]] (:db-after r))))) (let [r2 @(transact c {:tx-data [[:db/retract :mem/node-1 :property "value"] [:db/retract :mem/node-1 :property "missing"]]})] - (is (= 2 (count (:tx-data r2)))) + (is (= 1 (count (:tx-data r2)))) (is (= [[:mem/node-2 :property "other"]] (q '[:find ?e ?a ?v :where [?e ?a ?v]] (:db-after r2))))))) @@ -250,7 +250,8 @@ {:keys [tempids tx-data] :as r} @(transact c [data]) one (tempids -1) d (db c)] - (is (= 20 (count tx-data))) + ;; nil is contained twice, so 19 statements, rather than the 20 inserted + (is (= 19 (count tx-data))) (is (= 2 (count (filter #(and (= :tg/first (nth % 1)) (= :tg/nil (nth % 2))) tx-data)))) (is (= {:name "Home" :address nil diff --git a/test/asami/durable/api_test.cljc b/test/asami/durable/api_test.cljc index ab65af0..70e7728 100644 --- a/test/asami/durable/api_test.cljc +++ b/test/asami/durable/api_test.cljc @@ -173,7 +173,7 @@ (is (= 2 (count (q '[:find ?e ?a ?v :where [?e ?a ?v]] (:db-after r))))) (let [r2 @(transact c {:tx-data [[:db/retract :mem/node-1 :property "value"] [:db/retract :mem/node-1 :property "missing"]]})] - (is (= 2 (count (:tx-data r2)))) + (is (= 1 (count (:tx-data r2)))) (is (= [[:mem/node-2 :property "other"]] (q '[:find ?e ?a ?v :where [?e ?a ?v]] (:db-after r2)))))) (delete-database "asami:local://testr") @@ -232,7 +232,8 @@ {:keys [tempids tx-data] :as r} @(transact c [data]) one (tempids -1) d (db c)] - (is (= 20 (count tx-data))) + ;; nil is contained twice, so 19 statements, rather than the 20 inserted + (is (= 19 (count tx-data))) (is (= 2 (count (filter #(and (= :tg/first (nth % 1)) (= :tg/nil (nth % 2))) tx-data)))) (is (= {:name "Home" :address nil diff --git a/test/asami/durable/store_test.cljc b/test/asami/durable/store_test.cljc index 4e16718..043099f 100644 --- a/test/asami/durable/store_test.cljc +++ b/test/asami/durable/store_test.cljc @@ -9,6 +9,7 @@ #?(:clj [clojure.java.io :as io]) [clojure.test #?(:clj :refer :cljs :refer-macros) [deftest is]])) +(defn updates [] (volatile! [[] []])) (deftest test-create (let [db-name "empty-db"] @@ -31,7 +32,7 @@ (deftest test-simple-data (let [dbname "simple-db" conn (create-database dbname) - [_ db1] (transact-data conn demo-data nil) + [_ db1] (transact-data conn (updates) demo-data nil) db2 (db conn) r1 (fn [[e a v]] (set (resolve-triple (graph db1) e a v))) r2 (fn [[e a v]] (set (resolve-triple (graph db2) e a v)))] @@ -108,7 +109,7 @@ (deftest test-phased-data (let [dbname "ph-testdata-db" conn (create-database dbname) - [_ db1] (transact-data conn demo-data nil) + [_ db1] (transact-data conn (updates) demo-data nil) resolve (fn [[e a v]] (set (resolve-triple (graph db1) e a v))) t1 (:t db1)] (is (= #{[:name "Persephone Konstantopoulos"] @@ -134,7 +135,7 @@ (is (empty? (resolve '[:b :age 24]))) (is (= (set demo-data) (resolve '[?e ?a ?v]))) - (let [_ (transact-data conn update-data remove-data) + (let [_ (transact-data conn (updates) update-data remove-data) db2 (db conn) ts2 (:timestamp db2) r2 (fn [[e a v]] (set (resolve-triple (graph db2) e a v))) @@ -188,7 +189,7 @@ (is (= (set demo-data) (r1 '[?e ?a ?v]))) - (let [_ (transact-data conn update-data2 remove-data2) + (let [_ (transact-data conn (updates) update-data2 remove-data2) db3 (db conn) r3 (fn [[e a v]] (set (resolve-triple (graph db3) e a v))) ts2-time-- (instant (dec ts2)) @@ -286,11 +287,11 @@ (deftest test-saved-data (let [dbname "saved-testdata-db" conn (create-database dbname) - _ (transact-data conn demo-data nil) + _ (transact-data conn (updates) demo-data nil) _ (sleep 10) - [_ db2] (transact-data conn update-data remove-data) + [_ db2] (transact-data conn (updates) update-data remove-data) ts2 (:timestamp db2) - _ (transact-data conn update-data2 remove-data2)] + _ (transact-data conn (updates) update-data2 remove-data2)] (common/close @(:grapha conn)) (common/close (:tx-manager conn)) @@ -407,7 +408,7 @@ (deftest test-entity-data (let [dbname "entity-db" conn (create-database dbname) - [_ db1] (transact-data conn entity-data nil)] + [_ db1] (transact-data conn (updates) entity-data nil)] (is (= {:name "Persephone Konstantopoulos" :age 23 :friends [{:name "Anastasia Christodoulopoulos" :age 23} From aec11ef7cb6ee94adc61e9a571a6e945dc28d8fb Mon Sep 17 00:00:00 2001 From: Paula Gearon Date: Sun, 10 Oct 2021 17:26:24 -0400 Subject: [PATCH 3/5] Moved datom creation into graph updates --- src/asami/common_index.cljc | 13 ++++--- src/asami/core.cljc | 70 ++++++++++++++++++++++++++++++------- 2 files changed, 66 insertions(+), 17 deletions(-) diff --git a/src/asami/common_index.cljc b/src/asami/common_index.cljc index 75b9202..5a48f38 100644 --- a/src/asami/common_index.cljc +++ b/src/asami/common_index.cljc @@ -2,7 +2,8 @@ and multigraph implementations." :author "Paula Gearon"} asami.common-index - (:require [asami.graph :refer [Graph graph-add graph-delete graph-diff resolve-triple count-triple broad-node-type?]] + (:require [asami.datom :as datom :refer [->Datom]] + [asami.graph :refer [Graph graph-add graph-delete graph-diff resolve-triple count-triple broad-node-type?]] [asami.internal :as internal] [zuko.schema :as st] [clojure.set :as set] @@ -18,14 +19,16 @@ and multigraph implementations." asserts (transient a) retracts (transient r) new-graph (as-> graph gr - (reduce (fn [acc [s p o :as triple]] + (reduce (fn [acc [s p o]] (let [ad (graph-delete acc s p o)] - (when-not (identical? ad acc) (conj! retracts triple)) + (when-not (identical? ad acc) + (conj! retracts (->Datom s p o tx-id false))) ad)) gr retractions) - (reduce (fn [acc [s p o :as triple]] + (reduce (fn [acc [s p o]] (let [aa (graph-add acc s p o tx-id)] - (when-not (identical? aa acc) (conj! asserts triple)) + (when-not (identical? aa acc) + (conj! asserts (->Datom s p o tx-id true))) aa)) gr assertions))] (vreset! generated-data [(persistent! asserts) (persistent! retracts)]) diff --git a/src/asami/core.cljc b/src/asami/core.cljc index b91b605..0e169cc 100644 --- a/src/asami/core.cljc +++ b/src/asami/core.cljc @@ -5,7 +5,6 @@ [asami.memory :as memory] #?(:clj [asami.durable.store :as durable]) ;; TODO: make this available to CLJS when ready [asami.query :as query] - [asami.datom :as datom :refer [->Datom]] [asami.graph :as gr] [asami.entities :as entities] [asami.entities.general :refer [GraphType]] @@ -148,13 +147,13 @@ (s/optional-key :update-fn) (s/pred fn?)} [s/Any])) -(s/defn transact +(s/defn transact-async ;; returns a deref'able object that derefs to: ;; {:db-before DatabaseType ;; :db-after DatabaseType ;; :tx-data [datom/DatomType] ;; :tempids {s/Any s/Any}} - "Updates a database. This is currently synchronous, but returns a future or delay for compatibility with Datomic. + "Updates a database. connection: The connection to the database to be updated. tx-info: This is either a seq of items to be transacted, or a map. If this is a map, then a :tx-data value will contain the same type of seq that tx-info may have. @@ -186,14 +185,7 @@ {:db-before db-before :db-after db-after})) (fn [] - (let [tx-id (storage/next-tx connection) - as-datom (fn [assert? [e a v]] (->Datom e a v tx-id assert?)) - ;; function to convert assertions and retractions into a seq of datoms - build-datoms (fn [assertions retractions] - (concat - (map (partial as-datom false) retractions) - (map (partial as-datom true) assertions))) - current-db (storage/db connection) + (let [current-db (storage/db connection) ;; single maps should not be passed in, but if they are then wrap them seq-wrapper (fn [x] (if (map? x) [x] x)) ;; volatiles to capture data for the user @@ -219,7 +211,7 @@ [triples retracts] (deref generated-data)] {:db-before db-before :db-after db-after - :tx-data (build-datoms triples retracts) + :tx-data (concat retracts triples) :tempids @vtempids})))] #?(:clj (CompletableFuture/supplyAsync (reify Supplier (get [_] (op))) (or executor clojure.lang.Agent/soloExecutor)) @@ -227,6 +219,60 @@ (force d) d)))) +;; set a generous default transaction timeout of 100 seconds +#?(:clj (def ^:const default-tx-timeout 100000)) + +#?(:clj + (defn get-timeout + "Retrieves the timeout value to use in ms" + [] + (or (System/getProperty "asami.txTimeoutMsec") + (System/getProperty "datomic.txTimeoutMsec") + default-tx-timeout))) + +#?(:clj + (s/defn transact + "This returns a completed future with the data from a transaction. + See the documentation for transact-async for full details on arguments. + If the transaction times out, the call to transact will throw an ExceptionInfo exception. + The default is 100 seconds + + The result derefs to a map of: + :db-before database value before the transaction + :db-after database value after the transaction + :tx-data a sequence of the transacted datom operations + :tempids a map of temporary id values and the db identifiers that were allocated for them}" + ;; returns a deref'able object that derefs to: + ;; {:db-before DatabaseType + ;; :db-after DatabaseType + ;; :tx-data [datom/DatomType] + ;; :tempids {s/Any s/Any}} + [connection :- ConnectionType + tx-info :- TransactData] + (let [transact-future (transact-async connection tx-info) + timeout (get-timeout)] + (when (= ::timeout (deref transact-future timeout ::timeout)) + (throw (ex-info "Transaction timeout" {:timeout timeout}))) + transact-future)) + + :cljs + (s/defn transact + "This is a thin wrapper around the transact-async function. + TODO: convert this to a promise-based approach for the async implementation + See the documentation for transact-async for full details on arguments. + returns a deref'able object that derefs to a map of: + :db-before database value before the transaction + :db-after database value after the transaction + :tx-data a sequence of the transacted datom operations + :tempids a map of temporary id values and the db identifiers that were allocated for them}" + ;; {:db-before DatabaseType + ;; :db-after DatabaseType + ;; :tx-data [datom/DatomType] + ;; :tempids {s/Any s/Any}} + [connection :- ConnectionType + tx-info :- TransactData] + (transact-async connection tx-info))) + (defn- graphs-of "Converts Database objects to the graph that they wrap. Other arguments are returned unmodified." [inputs] From 92364f4e6e830e68a6e1ea1a017bf744ad6dfac0 Mon Sep 17 00:00:00 2001 From: Paula Gearon Date: Sun, 10 Oct 2021 17:26:54 -0400 Subject: [PATCH 4/5] Clearing out connections that have been deleted --- src/asami/memory.cljc | 13 ++++++++++--- test/asami/api_test.cljc | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/asami/memory.cljc b/src/asami/memory.cljc index f10f7ec..0157199 100644 --- a/src/asami/memory.cljc +++ b/src/asami/memory.cljc @@ -37,7 +37,8 @@ (< 0 c) (recur low mid))))))) -(declare as-of* as-of-t* as-of-time* since* since-t* graph* entity* next-tx* db* transact-update* transact-data*) +(declare as-of* as-of-t* as-of-time* since* since-t* graph* entity* + next-tx* db* delete-database* transact-update* transact-data*) ;; graph is the wrapped graph ;; history is a seq of Databases, excluding this one @@ -63,7 +64,7 @@ (get-name [this] name) (next-tx [this] (next-tx* this)) (db [this] (db* this)) - (delete-database [this] true) ;; no-op for memory databases + (delete-database [this] (delete-database* this)) (release [this]) ;; no-op for memory databases (transact-update [this update-fn] (transact-update* this update-fn)) (transact-data [this updates! asserts retracts] (transact-data* this updates! asserts retracts)) @@ -73,7 +74,6 @@ (def empty-graph mem/empty-graph) (def empty-multi-graph multi/empty-multi-graph) - (s/defn new-connection :- ConnectionType "Creates a memory Connection object" [name :- s/Str @@ -90,6 +90,13 @@ [connection :- ConnectionType] (:db @(:state connection))) +(s/defn delete-database* :- s/Bool + "Reverts the state of a connection to an empty database, resetting the initialization time." + [{:keys [state] :as connection} :- ConnectionType] + (let [db (->MemoryDatabase (-> state deref :history first :graph) [] (now) 0)] + (reset! state {:db db :history [db]}) + true)) + (s/defn as-database :- DatabaseType "Creates a Database around an existing Graph. graph: The graph to build a database around. " diff --git a/test/asami/api_test.cljc b/test/asami/api_test.cljc index a033ed0..1931d4c 100644 --- a/test/asami/api_test.cljc +++ b/test/asami/api_test.cljc @@ -706,6 +706,24 @@ (delete-database db-io) (delete-database db-new)))) +(deftest test-database-delete + (testing "Are deleted memory databases cleared" + (let [db-name "asami:mem://test-del" + conn (connect db-name) + {d :db-after} @(transact conn {:tx-data io-entities}) + r (set (q '[:find ?e ?a ?v :where [?e ?a ?v]] d))] + (is (= 13 (count r))) + + (delete-database db-name) + + (let [d2 (db conn) + r2 (set (q '[:find ?e ?a ?v :where [?e ?a ?v]] d2)) + r3 (set (q '[:find ?e ?a ?v :where [?e ?a ?v]] conn)) + r-old (set (q '[:find ?e ?a ?v :where [?e ?a ?v]] d))] + (is (empty? r2)) + (is (empty? r3)) + (is (= 13 (count r-old))))))) + (deftest test-update-unowned (testing "Doing an update on an attribute that references a top level entity" (let [c (connect "asami:mem://testupdate") From 8946abfecbb63ee6822e7eb8ef8c53432e86c1ac Mon Sep 17 00:00:00 2001 From: Paula Gearon Date: Sun, 10 Oct 2021 18:12:54 -0400 Subject: [PATCH 5/5] Now reattaching deleted connections if they are reused --- src/asami/core.cljc | 17 ++++++++++++++++- src/asami/durable/store.cljc | 4 ++++ src/asami/memory.cljc | 15 ++++++++++++--- src/asami/storage.cljc | 1 + test/asami/api_test.cljc | 13 ++++++++++--- 5 files changed, 43 insertions(+), 7 deletions(-) diff --git a/src/asami/core.cljc b/src/asami/core.cljc index 0e169cc..9f49bac 100644 --- a/src/asami/core.cljc +++ b/src/asami/core.cljc @@ -61,7 +61,7 @@ database was created, false if it already exists." [uri :- s/Str] (boolean - (if-not (@connections uri) + (when-not (@connections uri) (swap! connections assoc uri (connection-for uri))))) (s/defn connect :- ConnectionType @@ -124,6 +124,17 @@ (swap! connections assoc uri c) c))) +(defn check-attachment + "Checks if a connection is attached to the connections map. + If not, then connect. Returns the connection if previously connected, + false if it needed to be reconnected." + [connection] + (let [url (storage/get-url connection)] + (or (@connections url) + (do + (swap! connections assoc url connection) + false)))) + (def db storage/db) (def as-of storage/as-of) (def as-of-t storage/as-of-t) @@ -179,6 +190,10 @@ :tempids mapping of the temporary IDs in entities to the allocated nodes" [{:keys [name state] :as connection} :- ConnectionType {:keys [tx-data tx-triples executor update-fn] :as tx-info} :- TransactData] + + ;; Detached databases need to be reattached when transacted into + (check-attachment connection) + (let [op (if update-fn (fn [] (let [[db-before db-after] (storage/transact-update connection update-fn)] diff --git a/src/asami/durable/store.cljc b/src/asami/durable/store.cljc index b56f50f..73185fd 100644 --- a/src/asami/durable/store.cljc +++ b/src/asami/durable/store.cljc @@ -237,10 +237,14 @@ (let [[asserts retracts] (generator-fn graph)] (graph/graph-transact graph tx-id asserts retracts updates!)))))) +(s/defn get-url* :- s/Str + [{:keys [name]} :- ConnectionType] + (str "asami:local://" name)) (defrecord DurableConnection [name tx-manager grapha nodea lock] storage/Connection (get-name [this] name) + (get-url [this] (get-url* this)) (next-tx [this] (common/tx-count tx-manager)) (db [this] (db* this)) (delete-database [this] (delete-database* this)) diff --git a/src/asami/memory.cljc b/src/asami/memory.cljc index 0157199..5d7a235 100644 --- a/src/asami/memory.cljc +++ b/src/asami/memory.cljc @@ -5,10 +5,9 @@ [asami.internal :refer [now instant?]] [asami.index :as mem] [asami.multi-graph :as multi] - [asami.graph :as gr] + [asami.graph :as gr :refer [GraphType]] [asami.query :as query] [zuko.schema :refer [Triple]] - [asami.entities.general :as entity :refer [GraphType]] [asami.entities.reader :as reader] [schema.core :as s :include-macros true])) @@ -38,7 +37,7 @@ (declare as-of* as-of-t* as-of-time* since* since-t* graph* entity* - next-tx* db* delete-database* transact-update* transact-data*) + get-url* next-tx* db* delete-database* transact-update* transact-data*) ;; graph is the wrapped graph ;; history is a seq of Databases, excluding this one @@ -62,6 +61,7 @@ (defrecord MemoryConnection [name state] storage/Connection (get-name [this] name) + (get-url [this] (get-url* this)) (next-tx [this] (next-tx* this)) (db [this] (db* this)) (delete-database [this] (delete-database* this)) @@ -81,6 +81,15 @@ (let [db (->MemoryDatabase gr [] (now) 0)] (->MemoryConnection name (atom {:db db :history [db]})))) +(s/defn get-url* :- s/Str + [{:keys [name state]} :- ConnectionType] + (let [first-graph (-> state deref :history first :graph) + gtype (condp = first-graph + empty-graph "mem" + empty-multi-graph "multi" + (throw (ex-info (str "Unknown graph type:" (type first-graph)) {:graph first-graph})))] + (str "asami:" gtype "://" name))) + (s/defn next-tx* :- s/Num [connection :- ConnectionType] (count (:history @(:state connection)))) diff --git a/src/asami/storage.cljc b/src/asami/storage.cljc index 66784e2..6e521fc 100644 --- a/src/asami/storage.cljc +++ b/src/asami/storage.cljc @@ -6,6 +6,7 @@ (defprotocol Connection (get-name [this] "Retrieves the name of the database") + (get-url [this] "Retrieves the url of the database. Based on the name.") (next-tx [this] "Returns the next transaction ID that this connection will use") (get-lock [this] "Returns a lock that ensures that this Connection can only be updated by a single thread at a time") (db [this] "Retrieves the latest database from this connection") diff --git a/test/asami/api_test.cljc b/test/asami/api_test.cljc index 1931d4c..808dff7 100644 --- a/test/asami/api_test.cljc +++ b/test/asami/api_test.cljc @@ -1,7 +1,7 @@ (ns asami.api-test "Tests the public query functionality" - (:require [asami.core :refer [q show-plan create-database connect db transact - entity as-of since import-data export-data delete-database]] + (:require [asami.core :as a :refer [q show-plan create-database connect db transact + entity as-of since import-data export-data delete-database]] [asami.index :as i] [asami.graph :as graph] [asami.multi-graph :as m] @@ -722,7 +722,14 @@ r-old (set (q '[:find ?e ?a ?v :where [?e ?a ?v]] d))] (is (empty? r2)) (is (empty? r3)) - (is (= 13 (count r-old))))))) + (is (= 13 (count r-old)))) + + (is (nil? (get @a/connections db-name))) + (let [{dx :db-after} @(transact conn {:tx-triples [[:mem/node-1 :property "value"] + [:mem/node-2 :property "other"]]}) + rx (q '[:find ?e ?a ?v :where [?e ?a ?v]] dx)] + (is (identical? conn (get @a/connections db-name))) + (is (= 2 (count rx))))))) (deftest test-update-unowned (testing "Doing an update on an attribute that references a top level entity"