diff --git a/deps.edn b/deps.edn index 7304786fe..b8423531b 100644 --- a/deps.edn +++ b/deps.edn @@ -16,6 +16,9 @@ com.xtdb/xtdb-jdbc {:mvn/version "1.21.0"} com.xtdb/xtdb-s3 {:mvn/version "1.21.0"} + mysql/mysql-connector-java {:mvn/version "8.0.29"} + diehard/diehard {:mvn/version "0.11.3"} + ;; Jetty ring/ring-jetty-adapter {:mvn/version "1.9.5"} @@ -88,7 +91,7 @@ :aliases {:dev {:extra-paths ["dev" "test"] - :extra-deps { ;; Convenience libraries made available during development + :extra-deps {;; Convenience libraries made available during development org.clojure/test.check {:mvn/version "1.1.1"} nrepl/nrepl {:mvn/version "0.9.0"} org.clojure/alpha.spec {:git/url "https://github.com/clojure/spec-alpha2.git" diff --git a/dev/config.edn b/dev/config.edn new file mode 100644 index 000000000..db3c9b4f2 --- /dev/null +++ b/dev/config.edn @@ -0,0 +1,35 @@ +{ + ;; Used by bin/site to know where to send HTTP API requests. + :juxt.site.alpha/base-uri "http://localhost:5509" + + :ig/system + {:juxt.site.alpha.db/xt-node + { + :xtdb.http-server/server {:port 5511} + :xtdb.rocksdb/block-cache {:xtdb/module xtdb.rocksdb/->lru-block-cache + :cache-size 1600000000} + :xtdb/tx-log + {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store + :db-dir "db/txes"}} + + :xtdb/document-store + {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store + :db-dir "db/docs"}} + + :xtdb/index-store + {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store + :db-dir "db/idxs"}}} + + :juxt.site.alpha.server/server + {:juxt.site.alpha/xt-node #ig/ref :juxt.site.alpha.db/xt-node + :juxt.site.alpha/port 5509 + + ;; Really, this is the canoncial-uri prefix where /_site exists. + :juxt.site.alpha/base-uri #ref [:juxt.site.alpha/base-uri] + + :juxt.site.alpha/dynamic? #profile {:dev true :prod false}} + + :juxt.site.alpha.nrepl/server + {:juxt.site.alpha/port 5510} +} +} diff --git a/src/juxt/site/alpha/db.clj b/src/juxt/site/alpha/db.clj index d1f410b2e..3e3958283 100644 --- a/src/juxt/site/alpha/db.clj +++ b/src/juxt/site/alpha/db.clj @@ -4,11 +4,56 @@ (:require [xtdb.api :as xt] [integrant.core :as ig] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log] + [diehard.core :as dh]) + (:import + java.time.Duration + software.amazon.awssdk.services.s3.S3AsyncClient + software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient)) + + + +(def s3-configurator + (reify xtdb.s3.S3Configurator + (makeClient [_this] + (.. (S3AsyncClient/builder) + (httpClientBuilder + (.. (NettyNioAsyncHttpClient/builder) + (connectionAcquisitionTimeout (Duration/ofSeconds 600)) + (maxConcurrency (Integer. 100)) + (maxPendingConnectionAcquires (Integer. 10000)))) + (build))))) + +(defn- start-node + [config] + (dh/with-retry + {:retry-if + (fn [_ ex] + (= "incomplete checkpoint restore" + (ex-message ex))) + :max-retries 3 + :on-failed-attempt + (fn [_ ex] + (log/warn ex "Couldn't complete checkpoint restore")) + :on-failure + (fn [_ ex] + (log/error ex "Checkpoint restore failed"))} + (xt/start-node config))) (defmethod ig/init-key ::xt-node [_ xtdb-opts] - (log/info "Starting XT node") - (xt/start-node xtdb-opts)) + (log/info "Starting XT node ...") + + (let [config (update-in xtdb-opts + [:xtdb/index-store :kv-store :checkpointer :store] + assoc :configurator (constantly s3-configurator)) + node (start-node config)] + ;; we need to make sure the tx-ingester has caught up before + ;; declaring the node up + (->> + (xt/submit-tx node [[::xt/put {:xt/id :tx-ingester-synced!}]]) + (xt/await-tx node)) + (log/info "... XT node started!") + node)) (defmethod ig/halt-key! ::xt-node [_ node] (.close node) diff --git a/src/juxt/site/alpha/graphql.clj b/src/juxt/site/alpha/graphql.clj index a79de9298..994975df1 100644 --- a/src/juxt/site/alpha/graphql.clj +++ b/src/juxt/site/alpha/graphql.clj @@ -453,15 +453,15 @@ "validation"])] (->> (keywordize-keys argument-values) (map - (fn [[k v]] - (let [validation-schema (some-> directive - (get k) - (selmer/render (selmer-args opts)) - edn/read-string) - invalid? (when (seq validation-schema) - (not (m/validate validation-schema v)))] - (when invalid? - (me/humanize (m/explain validation-schema v)))))) + (fn [[k v]] + (let [validation-schema (some-> directive + (get k) + (selmer/render (selmer-args opts)) + edn/read-string) + invalid? (when (seq validation-schema) + (not (m/validate validation-schema v)))] + (when invalid? + (me/humanize (m/explain validation-schema v)))))) (remove nil?)))) @@ -565,254 +565,252 @@ (and (get variable-values "historicalDb") (:_siteValidTime object-value)) - (do - (xt/db xt-node (-> object-value - :_siteValidTime - t/inst))) - :else - db) - (catch Exception _ db)) - object-id (:xt/id object-value) - lookup-entity (fn [id] (protected-lookup id subject db xt-node)) - opts - (merge field-resolver-args - {:site-args site-args - :xt-node xt-node - :schema schema - :field field - :field-kind field-kind - :types-by-name types-by-name - :mutation? mutation? - :base-uri base-uri - :type-k type-k - :lookup-entity lookup-entity - ::pass/subject subject - :db db})] - - (cond - ;; The registration of a resolver should be a privileged operation, since it - ;; has the potential to bypass access control. - (get site-args "resolver") - (let [resolver (requiring-resolve (symbol (get site-args "resolver")))] - ;; Resolvers need to do their own access control - (resolver opts)) - - mutation? (perform-mutation! opts) - - (get site-args "history") - (if-let [id (get argument-values "id")] - (let [limit (get argument-values "limit" 10) - offset (get argument-values "offset" 0) - order (case (get site-args "history") - "desc" :desc - "asc" :asc - :desc) - process-history-item - (fn [{::xt/keys [valid-time doc]}] - (assoc doc :_siteValidTime (t/instant valid-time)))] - (with-open [history (xt/open-entity-history db id order {:with-docs? true})] - (->> history - (iterator-seq) - (drop offset) - (take limit) - (map process-history-item)))) - (throw (ex-info "History queries must have an id argument" {}))) - - ;; Direct lookup - useful for query roots - (get site-args "e") - (let [e (get site-args "e")] - (or (protected-lookup e subject db xt-node) - (protected-lookup (get argument-values e) subject db xt-node))) - - (get site-args "q") - (let [object-id (:xt/id object-value) - arg-keys (fn [m] (remove #{"limit" "offset" "orderBy"} (keys m))) - in (cond->> (map symbol (arg-keys argument-values)) - object-id (concat ['object])) - q (assoc - (to-xt-query opts) - :in (if (second in) [in] (vec in))) - - query-args (cond->> (vals argument-values) - object-id (concat [object-id])) - args (if (second query-args) query-args (first query-args)) - results - (try - (if (nil? args) - (xt/q db q) - (xt/q db q args)) - (catch Exception e - (throw (ex-info "Failure when running XTDB query" - {:message (ex-message e) - :query (pr-str q) - :args args} - e)))) - limited-results (limit-results argument-values results) - result-entities (cond->> - (pull-entities db xt-node subject limited-results q) - (get site-args "a") - (map (keyword (get site-args "a"))) - - )] - ;;(log/tracef "GraphQL results is %s" (seq result-entities)) - - (process-xt-results field result-entities)) - - (get site-args "itemForId") - (let [item-key (keyword (get site-args "itemForId")) - query {:find ['e '_siteCreatedAt] - :where [['e type-k (field->type field)] - ['e :_siteCreatedAt '_siteCreatedAt] - ['e item-key (get argument-values "id")]]} - - query (to-xt-query (assoc opts :custom-xt-query query)) - results (xt/q db query) - result-entities (cond->> (pull-entities db xt-node subject results query) - (get site-args "a") - (map (keyword (get site-args "a"))))] - (vec (process-xt-results field result-entities))) - - (get site-args "a") - (let [att (get site-args "a") - val (if (vector? att) - (traverse object-value att subject db xt-node) - (get object-value (keyword att))) - transform-sym (some-> site-args (get "transform") symbol) - transform (when transform-sym (requiring-resolve transform-sym)) - ] - (if (= field-kind 'OBJECT) - (protected-lookup val subject db xt-node) - ;; TODO: check for lists - (cond-> val - transform transform))) - - (get site-args "ref") - (let [list? (list-type? (::g/type-ref field)) - ref (get site-args "ref") - e (or - (and (vector? ref) - (traverse object-value ref subject db xt-node)) - (get object-value ref) - (get object-value (keyword ref))) - lookup-entity #(protected-lookup % subject db xt-node) - type (field->type field)] - (if e - ;; referenced key exists on current entity - (lookup-entity e) - ;; try a query for any other entity which contains - ;; the reference key (reverse join) - (let [reverse-lookup-result - (xt/q db {:find ['e] - :where [['e type-k type] - ['e (keyword ref) (or - (get argument-values ref) - object-id)]]})] - (if list? - (map (comp lookup-entity first) reverse-lookup-result) - (lookup-entity (ffirst reverse-lookup-result)))))) - - (get site-args "each") - (let [att (get site-args "each") - val (if (vector? att) - (traverse object-value att subject db xt-node) - (get object-value (keyword att)))] - (if (-> field ::g/type-ref list-type?) - (map #(protected-lookup % subject db xt-node) val) - (throw (ex-info "Can only used 'each' on a LIST type" {:field-kind field-kind})))) - - (get site-args "siteResolver") - (let [resolver - (case (get site-args "siteResolver") - "allQueryParams" - (requiring-resolve 'juxt.site.alpha.graphql-resolver/query-parameters) - "queryParam" - (requiring-resolve 'juxt.site.alpha.graphql-resolver/query-parameter) - "queryString" - (requiring-resolve 'juxt.site.alpha.graphql-resolver/query-string) - "constant" - (requiring-resolve 'juxt.site.alpha.graphql-resolver/constant) - (throw (ex-info "No such built-in resolver" {:site-resolver (get site-args "siteResolver")})))] - (resolver (assoc field-resolver-args ::site/request-context req))) - - ;; A function whose input is the result of a GraphqL 'sub' query, - ;; propagating the same subject and under the exact same access - ;; control policy. This allows the function to declare its necessary - ;; inputs as a query. - ;; - ;; In addition, a function's results may be memoized, with each result - ;; stored in XTDB which acts as a large persistent memoization - ;; cache. For this reason, the function must be pure. The function - ;; must take a single map which contains the results of the sub-query - ;; and any argument values (which would also be used as variable - ;; values in the GraphQL sub-query which computes the other input - ;; argument). - ;; - ;; Once this feature is working, replace it with a call to a lambda or - ;; similarly sandboxed execution environment. - (get site-args "function") - (throw (ex-info "Feature not yet supported" {})) - - (and (= 1 (count argument-values)) - (= "id" (ffirst argument-values))) - (lookup-entity (get argument-values "id")) - - (and (= 1 (count argument-values)) - (= "ids" (ffirst argument-values))) - (map lookup-entity (get argument-values "ids")) - - ;; Another strategy is to see if the field indexes the - ;; object-value. This strategy allows for delays to be used to prevent - ;; computing field values that aren't resolved. - (and (map? object-value) (contains? object-value field-name)) - (let [f (force (get object-value field-name))] - (if (fn? f) (f argument-values) f)) - - ;; If the key is 'id', we assume it should be translated to xt/id - (= "id" field-name) - (get object-value :xt/id) - - ;; Or simply try to extract the keyword - (and (map? object-value) - (or - ;; schema specifies this field is on the object - (get-in field [::schema/directives-by-name "onObject"]) - (contains? object-value (keyword field-name)))) - (let [result (get object-value (keyword field-name))] - (cond - (-> field ::g/type-ref list-type?) - (limit-results argument-values result) - ;; TODO validate enum (enum? (field->type field) types-by-name) - :else - result)) - - (and (field->type field) - (not (scalar? (field->type field) types-by-name)) - (not (enum? (field->type field) types-by-name))) - (infer-query db - xt-node - subject - field - (to-xt-query opts) - argument-values) - - (get argument-values "id") - (xt/entity db (get argument-values "id")) - - (and (get site-args "aggregate") - (get site-args "type")) - (case (get site-args "aggregate") - "count" (count - (xt/q - db (to-xt-query opts)))) - - (= "_siteValidTime" field-name) - (entity-valid-time object-value db) - - (= "_siteCreatedAt" field-name) - (entity-creation-time object-value db) - - :else - (default-for-type (::g/type-ref field)))))}))) + (do + (xt/db xt-node (-> object-value + :_siteValidTime + t/inst))) + :else + db) + (catch Exception _ db)) + object-id (:xt/id object-value) + lookup-entity (fn [id] (protected-lookup id subject db xt-node)) + opts + (merge field-resolver-args + {:site-args site-args + :xt-node xt-node + :schema schema + :field field + :field-kind field-kind + :types-by-name types-by-name + :mutation? mutation? + :base-uri base-uri + :type-k type-k + :lookup-entity lookup-entity + ::pass/subject subject + :db db})] + + (cond + ;; The registration of a resolver should be a privileged operation, since it + ;; has the potential to bypass access control. + (get site-args "resolver") + (let [resolver (requiring-resolve (symbol (get site-args "resolver")))] + ;; Resolvers need to do their own access control + (resolver opts)) + + mutation? (perform-mutation! opts) + + (get site-args "history") + (if-let [id (get argument-values "id")] + (let [limit (get argument-values "limit" 10) + offset (get argument-values "offset" 0) + order (case (get site-args "history") + "desc" :desc + "asc" :asc + :desc) + process-history-item + (fn [{::xt/keys [valid-time doc]}] + (assoc doc :_siteValidTime (t/instant valid-time)))] + (with-open [history (xt/open-entity-history db id order {:with-docs? true})] + (->> history + (iterator-seq) + (drop offset) + (take limit) + (map process-history-item)))) + (throw (ex-info "History queries must have an id argument" {}))) + + ;; Direct lookup - useful for query roots + (get site-args "e") + (let [e (get site-args "e")] + (or (protected-lookup e subject db xt-node) + (protected-lookup (get argument-values e) subject db xt-node))) + + (get site-args "q") + (let [object-id (:xt/id object-value) + arg-keys (fn [m] (remove #{"limit" "offset" "orderBy"} (keys m))) + in (cond->> (map symbol (arg-keys argument-values)) + object-id (concat ['object])) + q (assoc + (to-xt-query opts) + :in (if (second in) [in] (vec in))) + + query-args (cond->> (vals argument-values) + object-id (concat [object-id])) + args (if (second query-args) query-args (first query-args)) + results + (try + (if (nil? args) + (xt/q db q) + (xt/q db q args)) + + (catch Exception e + (throw (ex-info "Failure when running XTDB query" + {:message (ex-message e) + :query (pr-str q) + :args args} + e)))) + limited-results (limit-results argument-values results) + result-entities (cond->> + (pull-entities db xt-node subject limited-results q) + (get site-args "a") + (map (keyword (get site-args "a"))))] + + ;;(log/tracef "GraphQL results is %s" (seq result-entities)) + + (process-xt-results field result-entities)) + + (get site-args "itemForId") + (let [item-key (keyword (get site-args "itemForId")) + query {:find ['e '_siteCreatedAt] + :where [['e type-k (field->type field)] + ['e :_siteCreatedAt '_siteCreatedAt] + ['e item-key (get argument-values "id")]]} + + query (to-xt-query (assoc opts :custom-xt-query query)) + results (xt/q db query) + result-entities (cond->> (pull-entities db xt-node subject results query) + (get site-args "a") + (map (keyword (get site-args "a"))))] + (vec (process-xt-results field result-entities))) + + (get site-args "a") + (let [att (get site-args "a") + val (if (vector? att) + (traverse object-value att subject db xt-node) + (get object-value (keyword att))) + transform-sym (some-> site-args (get "transform") symbol) + transform (when transform-sym (requiring-resolve transform-sym))] + (if (= field-kind 'OBJECT) + (protected-lookup val subject db xt-node) + ;; TODO: check for lists + (cond-> val + transform transform))) + + (get site-args "ref") + (let [list? (list-type? (::g/type-ref field)) + ref (get site-args "ref") + e (or + (and (vector? ref) + (traverse object-value ref subject db xt-node)) + (get object-value ref) + (get object-value (keyword ref))) + lookup-entity #(protected-lookup % subject db xt-node) + type (field->type field)] + (if e + ;; referenced key exists on current entity + (lookup-entity e) + ;; try a query for any other entity which contains + ;; the reference key (reverse join) + (let [reverse-lookup-result + (xt/q db {:find ['e] + :where [['e type-k type] + ['e (keyword ref) (or + (get argument-values ref) + object-id)]]})] + (if list? + (map (comp lookup-entity first) reverse-lookup-result) + (lookup-entity (ffirst reverse-lookup-result)))))) + + (get site-args "each") + (let [att (get site-args "each") + val (if (vector? att) + (traverse object-value att subject db xt-node) + (get object-value (keyword att)))] + (if (-> field ::g/type-ref list-type?) + (map #(protected-lookup % subject db xt-node) val) + (throw (ex-info "Can only used 'each' on a LIST type" {:field-kind field-kind})))) + + (get site-args "siteResolver") + (let [resolver + (case (get site-args "siteResolver") + "allQueryParams" + (requiring-resolve 'juxt.site.alpha.graphql-resolver/query-parameters) + "queryParam" + (requiring-resolve 'juxt.site.alpha.graphql-resolver/query-parameter) + "queryString" + (requiring-resolve 'juxt.site.alpha.graphql-resolver/query-string) + "constant" + (requiring-resolve 'juxt.site.alpha.graphql-resolver/constant) + (throw (ex-info "No such built-in resolver" {:site-resolver (get site-args "siteResolver")})))] + (resolver (assoc field-resolver-args ::site/request-context req))) + + ;; A function whose input is the result of a GraphqL 'sub' query, + ;; propagating the same subject and under the exact same access + ;; control policy. This allows the function to declare its necessary + ;; inputs as a query. + ;; + ;; In addition, a function's results may be memoized, with each result + ;; stored in XTDB which acts as a large persistent memoization + ;; cache. For this reason, the function must be pure. The function + ;; must take a single map which contains the results of the sub-query + ;; and any argument values (which would also be used as variable + ;; values in the GraphQL sub-query which computes the other input + ;; argument). + ;; + ;; Once this feature is working, replace it with a call to a lambda or + ;; similarly sandboxed execution environment. + (get site-args "function") + (throw (ex-info "Feature not yet supported" {})) + + (and (= 1 (count argument-values)) + (= "id" (ffirst argument-values))) + (lookup-entity (get argument-values "id")) + + (and (= 1 (count argument-values)) + (= "ids" (ffirst argument-values))) + (map lookup-entity (get argument-values "ids")) + + ;; Another strategy is to see if the field indexes the + ;; object-value. This strategy allows for delays to be used to prevent + ;; computing field values that aren't resolved. + (and (map? object-value) (contains? object-value field-name)) + (let [f (force (get object-value field-name))] + (if (fn? f) (f argument-values) f)) + + ;; If the key is 'id', we assume it should be translated to xt/id + (= "id" field-name) + (get object-value :xt/id) + + ;; Or simply try to extract the keyword + (and (map? object-value) + (or + ;; schema specifies this field is on the object + (get-in field [::schema/directives-by-name "onObject"]) + (contains? object-value (keyword field-name)))) + (let [result (get object-value (keyword field-name))] + (cond + (-> field ::g/type-ref list-type?) + (limit-results argument-values result) + ;; TODO validate enum (enum? (field->type field) types-by-name) + :else + result)) + + (and (field->type field) + (not (scalar? (field->type field) types-by-name)) + (not (enum? (field->type field) types-by-name))) + (infer-query db + xt-node + subject + field + (to-xt-query opts)) + + (get argument-values "id") + (xt/entity db (get argument-values "id")) + + (and (get site-args "aggregate") + (get site-args "type")) + (case (get site-args "aggregate") + "count" (count + (xt/q + db (to-xt-query opts)))) + + (= "_siteValidTime" field-name) + (entity-valid-time object-value db) + + (= "_siteCreatedAt" field-name) + (entity-creation-time object-value db) + + :else + (default-for-type (::g/type-ref field)))))}))) (defn common-variables "Return the common 'built-in' variables that are bound always bound." diff --git a/src/juxt/site/alpha/handler.clj b/src/juxt/site/alpha/handler.clj index 5cbd4068f..670253934 100644 --- a/src/juxt/site/alpha/handler.clj +++ b/src/juxt/site/alpha/handler.clj @@ -809,6 +809,10 @@ ;; itself should be ignorant of such policies. Additionally, this is more ;; aligned to OpenAPI's declaration of per-resource errors. +(defn- q + [db query args] + (xt/q db query args)) + (defn error-resource "Locate an error resource. Currently only uses a simple database lookup of an 'ErrorResource' entity matching the status. In future this could use rules to @@ -820,6 +824,7 @@ :where [[er ::site/type "ErrorResource"] [er :ring.response/status status]] :in [status]} status))] + (log/tracef "ErrorResource found for status %d: %s" status res) res)) @@ -1169,13 +1174,36 @@ (assoc-in [:ring.response/headers "retry-after"] "120"))}))) (h req))) +(def cors-headers + "Generic CORS headers" + {"Access-Control-Allow-Origin" "*" + "Access-Control-Allow-Headers" "*" + "Access-Control-Allow-Methods" "GET"}) + +(defn preflight? + "Returns true if the request is a preflight request" + [request] + (= (request :request-method) :options)) + +(defn all-cors + "Allow requests from all origins - also check preflight" + [handler] + (fn [request] + (if (preflight? request) + {:status 200 + :headers cors-headers + :body "preflight complete"} + (let [response (handler request)] + (update-in response [:headers] + merge cors-headers ))))) + (defn make-pipeline "Make a pipeline of Ring middleware. Note, that each Ring middleware designates a processing stage. An interceptor chain (perhaps using Pedestal (pedestal.io) or Sieppari (https://github.com/metosin/sieppari) could be used. This is currently a synchronous chain but async could be supported in the future." [opts] - [ + [all-cors ;; Switch Ring requests/responses to Ring 2 namespaced keywords wrap-ring-1-adapter diff --git a/src/juxt/site/alpha/repl.clj b/src/juxt/site/alpha/repl.clj index e63c9d8c6..8bd9b22c8 100644 --- a/src/juxt/site/alpha/repl.clj +++ b/src/juxt/site/alpha/repl.clj @@ -62,16 +62,16 @@ (defn e [id] (postwalk - (fn [x] (if (and (vector? x) - (#{::http/content ::http/body} (first x)) - (> (count (second x)) 1024)) + (fn [x] (if (and (vector? x) + (#{::http/content ::http/body} (first x)) + (> (count (second x)) 1024)) - [(first x) - (cond - (= ::http/content (first x)) (str (subs (second x) 0 80) "…") - :else (format "(%d bytes)" (count (second x))))] - x)) - (xt/entity (db) id))) + [(first x) + (cond + (= ::http/content (first x)) (str (subs (second x) 0 80) "…") + :else (format "(%d bytes)" (count (second x))))] + x)) + (xt/entity (db) id))) (defn hist [id] (xt/entity-history (db) id :asc {:with-docs? true})) @@ -80,44 +80,44 @@ (defn put! [& ms] (->> - (xt/submit-tx - (xt-node) - (for [m ms] - (let [vt (:xtdb.api/valid-time m)] - [:xtdb.api/put (dissoc m :xtdb.api/valid-time) vt]))) - (xt/await-tx (xt-node)))) + (xt/submit-tx + (xt-node) + (for [m ms] + (let [vt (:xtdb.api/valid-time m)] + [:xtdb.api/put (dissoc m :xtdb.api/valid-time) vt]))) + (xt/await-tx (xt-node)))) (defn grep [re coll] (filter #(re-matches (re-pattern re) %) coll)) (defn rm! [& ids] (->> - (xt/submit-tx - (xt-node) - (for [id ids] - [:xtdb.api/delete id])) - (xt/await-tx (xt-node)))) + (xt/submit-tx + (xt-node) + (for [id ids] + [:xtdb.api/delete id])) + (xt/await-tx (xt-node)))) (defn evict! [& ids] (->> - (xt/submit-tx - (xt-node) - (for [id ids] - [:xtdb.api/evict id])) - (xt/await-tx (xt-node)))) + (xt/submit-tx + (xt-node) + (for [id ids] + [:xtdb.api/evict id])) + (xt/await-tx (xt-node)))) (defn q [query & args] (apply xt/q (db) query args)) (defn t [t] (map - first - (xt/q (db) '{:find [e] :where [[e ::site/type t]] :in [t]} t))) + first + (xt/q (db) '{:find [e] :where [[e ::site/type t]] :in [t]} t))) (defn t* [t] (map - first - (xt/q (db) '{:find [e] :where [[e :type t]] :in [t]} t))) + first + (xt/q (db) '{:find [e] :where [[e :type t]] :in [t]} t))) (defn types [] (->> (q '{:find [t] @@ -156,18 +156,18 @@ (defn now-id [] (.format - (.withZone - (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM-dd-HHmmss") - (java.time.ZoneId/systemDefault)) - (java.time.Instant/now))) + (.withZone + (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM-dd-HHmmss") + (java.time.ZoneId/systemDefault)) + (java.time.Instant/now))) ;; Start import at 00:35 (defn resources-from-stream [in] (let [record (try (edn/read - {:eof :eof :readers edn-readers} - in) + {:eof :eof :readers edn-readers} + in) (catch Exception e (def in in) (prn (.getMessage e))))] @@ -184,50 +184,60 @@ (let [tx-id (xt/submit-tx node tx)] (xt/await-tx node tx-id))) -(defn import-resources - ([] (import-resources "import/resources.edn")) - ([filename] - (let [node (xt-node) - in (java.io.PushbackReader. (io/reader (io/input-stream (io/file filename))))] - (doseq [rec (resources-from-stream in)] - (when (:xt/id rec) - (if (xt/entity (xt/db node) (:xt/id rec)) - (println "Skipping existing resource: " (:xt/id rec)) - (do - (submit-and-wait-tx node [[:xtdb.api/put rec]]) - (println "Imported resource: " (:xt/id rec))))))))) +(defn apply-uri-mappings + [mapping] + (fn [ent] + ;; Create a regex pattern which detects anything as a mapping key + (let [pat (re-pattern (str/join "|" (map #(format "\\Q%s\\E" %) (keys mapping))))] + (postwalk + (fn [s] + (cond-> s + (string? s) + (str/replace pat (fn [x] (get mapping x))))) + ent)))) + +(let [url-mapping {"{{KG_URL_BASE}}" + (or (System/getenv "KG_URL_BASE") "http://localhost:5509")} + set-kg-url-base (apply-uri-mappings url-mapping)] + (defn import-resources + ([] (import-resources "import/resources.edn")) + ([filename] + (let [node (xt-node) + in (java.io.PushbackReader. (io/reader (io/input-stream (io/file filename))))] + (doseq [rec (resources-from-stream in)] + (when (:xt/id rec) + (let [rec (set-kg-url-base rec)] + (if (xt/entity (xt/db node) (:xt/id rec)) + (println "Skipping existing resource: " (:xt/id rec)) + (do + (submit-and-wait-tx node [[:xtdb.api/put rec]]) + (println "Imported resource: " (:xt/id rec))))))))))) + + + (defn validate-resource-line [s] (edn/read-string - {:eof :eof :readers edn-readers} - s)) + {:eof :eof :readers edn-readers} + s)) (defn get-zipped-output-stream [] (let [zos (doto - (-> (str (now-id) ".edn.zip") - io/file - io/output-stream - java.util.zip.ZipOutputStream.) + (-> (str (now-id) ".edn.zip") + io/file + io/output-stream + java.util.zip.ZipOutputStream.) (.putNextEntry (java.util.zip.ZipEntry. "resources.edn")))] (java.io.OutputStreamWriter. zos))) -(defn apply-uri-mappings [mapping] - (fn [ent] - ;; Create a regex pattern which detects anything as a mapping key - (let [pat (re-pattern (str/join "|" (map #(format "\\Q%s\\E" %) (keys mapping))))] - (postwalk - (fn [s] - (cond-> s - (string? s) - (str/replace pat (fn [x] (get mapping x))))) - ent)))) + (comment (export-resources - {:pred (fn [x] (or (= (:juxt.home/type x) "Person"))) - :filename "/home/mal/Sync/persons.edn" - :uri-mapping {"http://localhost:2021" - "https://home.juxt.site"}})) + {:pred (fn [x] (or (= (:juxt.home/type x) "Person"))) + :filename "/home/mal/Sync/persons.edn" + :uri-mapping {"http://localhost:2021" + "https://home.juxt.site"}})) (defn export-resources "Export all resources to a file." @@ -261,9 +271,9 @@ (validate-resource-line line) (catch Exception e (throw - (ex-info - (format "Serialization of entity '%s' will not be readable" (:xt/id ent)) - {:xt/id (:xt/id ent)} e)))) + (ex-info + (format "Serialization of entity '%s' will not be readable" (:xt/id ent)) + {:xt/id (:xt/id ent)} e)))) (.write w line) (.write w (System/lineSeparator)))) (let [n (inc (first (last batch))) @@ -274,7 +284,6 @@ (remove-method print-method (type (byte-array []))) (printf "Dumped %d resources\n" (count resources))))) - (defn cat-type [t] (->> (q '{:find [(pull e [*])] @@ -286,9 +295,9 @@ (defn rules [] (sort-by - str - (map first - (q '{:find [(pull e [*])] :where [[e ::site/type "Rule"]]})))) + str + (map first + (q '{:find [(pull e [*])] :where [[e ::site/type "Rule"]]})))) (defn uuid ([] (str (java.util.UUID/randomUUID))) @@ -299,17 +308,16 @@ (defn req [s] (into - (sorted-map) - (cache/find - cache/requests-cache - (re-pattern (str "/_site/requests/" s))))) + (sorted-map) + (cache/find + cache/requests-cache + (re-pattern (str "/_site/requests/" s))))) (defn recent ([] (recent 5)) ([n] (map (juxt ::site/request-id ::site/date ::site/uri :ring.request/method :ring.response/status) - (cache/recent cache/requests-cache n)) - )) + (cache/recent cache/requests-cache n)))) (defn requests-cache [] cache/requests-cache) @@ -356,8 +364,8 @@ db (xt/db (xt-node))] [;; Awaiting a fix to https://github.com/juxt/xtdb/issues/1480 #_{:complete? (and - (xt/entity db (str base-uri "/_site/tx_fns/put_if_match_wildcard")) - (xt/entity db (str base-uri "/_site/tx_fns/put_if_match_etags"))) + (xt/entity db (str base-uri "/_site/tx_fns/put_if_match_wildcard")) + (xt/entity db (str base-uri "/_site/tx_fns/put_if_match_etags"))) :happy-message "Site transaction functions installed." :sad-message "Site transaction functions not installed. " :fix "Enter (put-site-txfns!) to fix this."} @@ -390,9 +398,9 @@ (if complete? (println "[✔] " (ansi/green happy-message)) (println - "[ ] " - (ansi/red sad-message) - (ansi/yellow fix)))) + "[ ] " + (ansi/red sad-message) + (ansi/yellow fix)))) (println) (if (every? :complete? steps) :ok :incomplete))) @@ -432,11 +440,11 @@ (let [config (config) xt-node (xt-node)] (init/put-superuser! - xt-node - {:username username - :fullname fullname - :password password} - config) + xt-node + {:username username + :fullname fullname + :password password} + config) (status (steps config))))) (defn update-site-graphql @@ -469,12 +477,12 @@ (defn reset-password! [username password] (let [user (str (::site/base-uri (config)) "/_site/users/" username)] (put! - {:xt/id (str user "/password") - ::site/type "Password" - ::http/methods #{:post} - ::pass/user user - ::pass/password-hash (password/encrypt password) - ::pass/classification "RESTRICTED"}))) + {:xt/id (str user "/password") + ::site/type "Password" + ::http/methods #{:post} + ::pass/user user + ::pass/password-hash (password/encrypt password) + ::pass/classification "RESTRICTED"}))) (defn user [username] (e (format "%s/_site/users/%s" (::site/base-uri (config)) username))) @@ -496,3 +504,26 @@ schema (:juxt.grab.alpha/schema (e (format "%s/_site/graphql" (::site/base-uri config)))) document (graphql.document/compile-document (graphql.parser/parse (slurp (io/file "opt/graphql/graphiql-introspection-query.graphql"))) schema)] (graphql/query schema document "IntrospectionQuery" {} {::site/db (db)}))) + +(defn repl-post-handler [{::site/keys [uri db] + ::pass/keys [subject] + :as req}] + (let [body (some-> req ::site/received-representation ::http/body (String.) read-string) + _ (when (nil? body) + (throw + (ex-info + "Invalid body" + {::site/request-context req}))) + + results (try + (binding [*ns* (find-ns 'juxt.site.alpha.repl)] + (eval body)) + (catch Exception e + (throw (ex-info "Syntax error" e))))] + + (-> req + (assoc + :ring.response/status 200 + :ring.response/body + (json/write-value-as-string results)) + (update :ring.response/headers assoc "content-type" "application/json")))) diff --git a/src/juxt/site/alpha/util.clj b/src/juxt/site/alpha/util.clj index 03845a55c..0abf91033 100644 --- a/src/juxt/site/alpha/util.clj +++ b/src/juxt/site/alpha/util.clj @@ -2,7 +2,8 @@ (ns juxt.site.alpha.util (:require - [juxt.clojars-mirrors.nippy.v3v1v1.taoensso.nippy.utils :refer [freezable?]])) + [juxt.clojars-mirrors.nippy.v3v1v1.taoensso.nippy.utils :refer [freezable?]] + [xtdb.api :as xt])) (alias 'site (create-ns 'juxt.site.alpha)) (alias 'http (create-ns 'juxt.http.alpha)) diff --git a/src/juxt/site/alpha/xtdb.clj b/src/juxt/site/alpha/xtdb.clj index 56e67cec0..86a1508d1 100644 --- a/src/juxt/site/alpha/xtdb.clj +++ b/src/juxt/site/alpha/xtdb.clj @@ -1,5 +1,6 @@ ;; Copyright © 2021, JUXT LTD. +(+) (ns juxt.site.alpha.xtdb) (defn inline-clj-pred [f & args]