From 250921263d70f6ab66bbbc163827a86325c13cfc Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Tue, 1 Oct 2024 16:17:37 +0300 Subject: [PATCH] graphman: create GraphQL API to execute commands (#5554) * graphman: define a store for execution data * store: implement graphman store * graphman: extract & refactor deployment info, pause, resume commands * graphman: create graphql server to execute commands * node: run graphman graphql server on startup * graphman: use refactored commands in the cli * graphman: document graphql api usage * graphman: accept a list of deployments on restart command * graphman: make docs clearer * store: rename migration to make it latest --- Cargo.lock | 341 +++++++++++++++++- Cargo.toml | 31 +- core/graphman/Cargo.toml | 14 + core/graphman/src/commands/deployment/info.rs | 81 +++++ core/graphman/src/commands/deployment/mod.rs | 3 + .../graphman/src/commands/deployment/pause.rs | 84 +++++ .../src/commands/deployment/resume.rs | 84 +++++ core/graphman/src/commands/mod.rs | 1 + core/graphman/src/deployment.rs | 146 ++++++++ core/graphman/src/error.rs | 19 + core/graphman/src/execution_tracker.rs | 84 +++++ core/graphman/src/lib.rs | 15 + core/graphman_store/Cargo.toml | 10 + core/graphman_store/src/lib.rs | 127 +++++++ docs/graphman-graphql-api.md | 213 +++++++++++ graph/src/data/subgraph/status.rs | 4 +- graph/src/env/mod.rs | 8 + node/Cargo.toml | 4 + node/src/bin/manager.rs | 91 +++-- node/src/main.rs | 69 +++- node/src/manager/commands/deployment/info.rs | 161 +++++++++ node/src/manager/commands/deployment/mod.rs | 4 + node/src/manager/commands/deployment/pause.rs | 22 ++ .../manager/commands/deployment/restart.rs | 32 ++ .../src/manager/commands/deployment/resume.rs | 22 ++ node/src/manager/commands/info.rs | 29 -- node/src/manager/commands/mod.rs | 2 +- node/src/manager/deployment.rs | 72 +--- node/src/opt.rs | 7 + server/graphman/Cargo.toml | 27 ++ server/graphman/src/auth.rs | 148 ++++++++ server/graphman/src/entities/block_hash.rs | 31 ++ server/graphman/src/entities/block_number.rs | 29 ++ server/graphman/src/entities/block_ptr.rs | 19 + server/graphman/src/entities/command_kind.rs | 8 + .../graphman/src/entities/deployment_info.rs | 44 +++ .../src/entities/deployment_selector.rs | 46 +++ .../src/entities/deployment_status.rs | 37 ++ .../entities/deployment_version_selector.rs | 19 + .../graphman/src/entities/empty_response.rs | 15 + server/graphman/src/entities/execution.rs | 56 +++ server/graphman/src/entities/execution_id.rs | 35 ++ server/graphman/src/entities/mod.rs | 25 ++ .../graphman/src/entities/subgraph_health.rs | 14 + server/graphman/src/error.rs | 10 + server/graphman/src/handlers/graphql.rs | 36 ++ server/graphman/src/handlers/mod.rs | 6 + server/graphman/src/handlers/state.rs | 6 + server/graphman/src/lib.rs | 12 + server/graphman/src/resolvers/context.rs | 27 ++ .../src/resolvers/deployment_mutation.rs | 68 ++++ .../resolvers/deployment_mutation/pause.rs | 18 + .../resolvers/deployment_mutation/restart.rs | 51 +++ .../resolvers/deployment_mutation/resume.rs | 18 + .../src/resolvers/deployment_query.rs | 29 ++ .../src/resolvers/deployment_query/info.rs | 54 +++ .../graphman/src/resolvers/execution_query.rs | 24 ++ server/graphman/src/resolvers/mod.rs | 12 + .../graphman/src/resolvers/mutation_root.rs | 14 + server/graphman/src/resolvers/query_root.rs | 20 + server/graphman/src/schema.rs | 7 + server/graphman/src/server.rs | 148 ++++++++ server/graphman/tests/auth.rs | 66 ++++ server/graphman/tests/deployment_mutation.rs | 268 ++++++++++++++ server/graphman/tests/deployment_query.rs | 238 ++++++++++++ server/graphman/tests/util/client.rs | 34 ++ server/graphman/tests/util/mod.rs | 46 +++ server/graphman/tests/util/server.rs | 45 +++ store/postgres/Cargo.toml | 5 +- .../down.sql | 1 + .../up.sql | 10 + store/postgres/src/graphman/mod.rs | 92 +++++ store/postgres/src/graphman/schema.rs | 11 + store/postgres/src/lib.rs | 2 + store/test-store/src/store.rs | 2 +- 75 files changed, 3558 insertions(+), 155 deletions(-) create mode 100644 core/graphman/Cargo.toml create mode 100644 core/graphman/src/commands/deployment/info.rs create mode 100644 core/graphman/src/commands/deployment/mod.rs create mode 100644 core/graphman/src/commands/deployment/pause.rs create mode 100644 core/graphman/src/commands/deployment/resume.rs create mode 100644 core/graphman/src/commands/mod.rs create mode 100644 core/graphman/src/deployment.rs create mode 100644 core/graphman/src/error.rs create mode 100644 core/graphman/src/execution_tracker.rs create mode 100644 core/graphman/src/lib.rs create mode 100644 core/graphman_store/Cargo.toml create mode 100644 core/graphman_store/src/lib.rs create mode 100644 docs/graphman-graphql-api.md create mode 100644 node/src/manager/commands/deployment/info.rs create mode 100644 node/src/manager/commands/deployment/mod.rs create mode 100644 node/src/manager/commands/deployment/pause.rs create mode 100644 node/src/manager/commands/deployment/restart.rs create mode 100644 node/src/manager/commands/deployment/resume.rs delete mode 100644 node/src/manager/commands/info.rs create mode 100644 server/graphman/Cargo.toml create mode 100644 server/graphman/src/auth.rs create mode 100644 server/graphman/src/entities/block_hash.rs create mode 100644 server/graphman/src/entities/block_number.rs create mode 100644 server/graphman/src/entities/block_ptr.rs create mode 100644 server/graphman/src/entities/command_kind.rs create mode 100644 server/graphman/src/entities/deployment_info.rs create mode 100644 server/graphman/src/entities/deployment_selector.rs create mode 100644 server/graphman/src/entities/deployment_status.rs create mode 100644 server/graphman/src/entities/deployment_version_selector.rs create mode 100644 server/graphman/src/entities/empty_response.rs create mode 100644 server/graphman/src/entities/execution.rs create mode 100644 server/graphman/src/entities/execution_id.rs create mode 100644 server/graphman/src/entities/mod.rs create mode 100644 server/graphman/src/entities/subgraph_health.rs create mode 100644 server/graphman/src/error.rs create mode 100644 server/graphman/src/handlers/graphql.rs create mode 100644 server/graphman/src/handlers/mod.rs create mode 100644 server/graphman/src/handlers/state.rs create mode 100644 server/graphman/src/lib.rs create mode 100644 server/graphman/src/resolvers/context.rs create mode 100644 server/graphman/src/resolvers/deployment_mutation.rs create mode 100644 server/graphman/src/resolvers/deployment_mutation/pause.rs create mode 100644 server/graphman/src/resolvers/deployment_mutation/restart.rs create mode 100644 server/graphman/src/resolvers/deployment_mutation/resume.rs create mode 100644 server/graphman/src/resolvers/deployment_query.rs create mode 100644 server/graphman/src/resolvers/deployment_query/info.rs create mode 100644 server/graphman/src/resolvers/execution_query.rs create mode 100644 server/graphman/src/resolvers/mod.rs create mode 100644 server/graphman/src/resolvers/mutation_root.rs create mode 100644 server/graphman/src/resolvers/query_root.rs create mode 100644 server/graphman/src/schema.rs create mode 100644 server/graphman/src/server.rs create mode 100644 server/graphman/tests/auth.rs create mode 100644 server/graphman/tests/deployment_mutation.rs create mode 100644 server/graphman/tests/deployment_query.rs create mode 100644 server/graphman/tests/util/client.rs create mode 100644 server/graphman/tests/util/mod.rs create mode 100644 server/graphman/tests/util/server.rs create mode 100644 store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/down.sql create mode 100644 store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/up.sql create mode 100644 store/postgres/src/graphman/mod.rs create mode 100644 store/postgres/src/graphman/schema.rs diff --git a/Cargo.lock b/Cargo.lock index 422b5d15f7e..a8aa8881984 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" +[[package]] +name = "ascii_utils" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -173,6 +179,100 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-graphql" +version = "7.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf338d20ba5bab309f55ce8df95d65ee19446f7737f06f4a64593ab2c6b546ad" +dependencies = [ + "async-graphql-derive", + "async-graphql-parser", + "async-graphql-value", + "async-stream", + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "fast_chemail", + "fnv", + "futures-util", + "handlebars", + "http 1.1.0", + "indexmap 2.2.6", + "mime", + "multer", + "num-traits", + "once_cell", + "pin-project-lite", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "static_assertions_next", + "tempfile", + "thiserror", + "uuid", +] + +[[package]] +name = "async-graphql-axum" +version = "7.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28f874ad4bc10519f3fa500e36814452033a5ce9ea681ab0a2e0d3b1f18bae44" +dependencies = [ + "async-graphql", + "async-trait", + "axum 0.7.5", + "bytes", + "futures-util", + "serde_json", + "tokio", + "tokio-stream", + "tokio-util 0.7.11", + "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "async-graphql-derive" +version = "7.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc51fd6b7102acda72bc94e8ae1543844d5688ff394a6cf7c21f2a07fe2d64e4" +dependencies = [ + "Inflector", + "async-graphql-parser", + "darling", + "proc-macro-crate 3.1.0", + "proc-macro2", + "quote", + "strum", + "syn 2.0.69", + "thiserror", +] + +[[package]] +name = "async-graphql-parser" +version = "7.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75361eefd64e39f89bead4cb45fddbaf60ddb0e7b15fb7c852b6088bcd63071f" +dependencies = [ + "async-graphql-value", + "pest", + "serde", + "serde_json", +] + +[[package]] +name = "async-graphql-value" +version = "7.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1f665d2d52b41c4ed1f01c43f3ef27a2fe0af2452ed5c8bc7ac9b1a8719afaa" +dependencies = [ + "bytes", + "indexmap 2.2.6", + "serde", + "serde_json", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -242,7 +342,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -263,6 +363,43 @@ dependencies = [ "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "base64 0.21.7", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper 1.0.1", + "tokio", + "tokio-tungstenite 0.21.0", + "tower 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-layer 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -280,6 +417,27 @@ dependencies = [ "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -491,6 +649,9 @@ name = "bytes" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -1032,6 +1193,7 @@ dependencies = [ "pq-sys", "r2d2", "serde_json", + "uuid", ] [[package]] @@ -1348,6 +1510,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" +[[package]] +name = "fast_chemail" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "495a39d30d624c2caabe6312bfead73e7717692b44e0b32df168c275a2e8e9e4" +dependencies = [ + "ascii_utils", +] + [[package]] name = "fastrand" version = "2.1.0" @@ -1863,6 +2034,7 @@ dependencies = [ name = "graph-node" version = "0.35.0" dependencies = [ + "anyhow", "clap", "diesel", "env_logger", @@ -1882,6 +2054,9 @@ dependencies = [ "graph-server-metrics", "graph-server-websocket", "graph-store-postgres", + "graphman", + "graphman-server", + "itertools 0.13.0", "json-structural-diff", "lazy_static", "prometheus", @@ -1983,7 +2158,7 @@ dependencies = [ "graph", "serde", "serde_derive", - "tokio-tungstenite", + "tokio-tungstenite 0.23.1", "uuid", ] @@ -1995,6 +2170,7 @@ dependencies = [ "anyhow", "async-trait", "blake3 1.5.1", + "chrono", "clap", "derive_more", "diesel", @@ -2005,6 +2181,7 @@ dependencies = [ "fallible-iterator 0.3.0", "git-testament", "graph", + "graphman-store", "graphql-parser", "hex", "itertools 0.13.0", @@ -2017,6 +2194,7 @@ dependencies = [ "pretty_assertions", "rand", "serde", + "serde_json", "stable-hash 0.3.4", "uuid", ] @@ -2056,6 +2234,55 @@ dependencies = [ "syn 2.0.69", ] +[[package]] +name = "graphman" +version = "0.35.0" +dependencies = [ + "anyhow", + "diesel", + "graph", + "graph-store-postgres", + "graphman-store", + "itertools 0.13.0", + "thiserror", + "tokio", +] + +[[package]] +name = "graphman-server" +version = "0.35.0" +dependencies = [ + "anyhow", + "async-graphql", + "async-graphql-axum", + "axum 0.7.5", + "chrono", + "diesel", + "graph", + "graph-store-postgres", + "graphman", + "graphman-store", + "lazy_static", + "reqwest", + "serde", + "serde_json", + "slog", + "test-store", + "thiserror", + "tokio", + "tower-http", +] + +[[package]] +name = "graphman-store" +version = "0.35.0" +dependencies = [ + "anyhow", + "chrono", + "diesel", + "strum", +] + [[package]] name = "graphql-parser" version = "0.4.0" @@ -2117,6 +2344,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "handlebars" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d08485b96a0e6393e9e4d1b8d48cf74ad6c063cd905eb33f42c1ce3f0377539b" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3025,6 +3266,23 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multiaddr" version = "0.17.1" @@ -4429,6 +4687,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_plain" version = "1.0.2" @@ -4782,6 +5050,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "static_assertions_next" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7beae5182595e9a8b683fa98c4317f956c9a2dec3b9716990d20023cc60c766" + [[package]] name = "stringprep" version = "0.1.5" @@ -4799,6 +5073,15 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros", +] + [[package]] name = "strum_macros" version = "0.26.4" @@ -5289,6 +5572,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-tungstenite" version = "0.23.1" @@ -5298,7 +5593,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.23.0", ] [[package]] @@ -5324,6 +5619,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -5391,7 +5687,7 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", "flate2", @@ -5467,6 +5763,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.6.0", + "bytes", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "pin-project-lite", + "tower-layer 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -5508,6 +5820,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -5556,6 +5869,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.23.0" @@ -5738,6 +6070,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 514870b105e..2a11e9b5d49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,8 @@ resolver = "2" members = [ "core", + "core/graphman", + "core/graphman_store", "chain/*", "graphql", "node", @@ -24,25 +26,46 @@ repository = "https://github.com/graphprotocol/graph-node" license = "MIT OR Apache-2.0" [workspace.dependencies] -diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono"] } +anyhow = "1.0" +async-graphql = { version = "7.0.6", features = ["chrono", "uuid"] } +async-graphql-axum = "7.0.6" +axum = "0.7.5" +chrono = "0.4.38" +clap = { version = "4.5.4", features = ["derive", "env"] } +diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } -diesel_derives = "2.1.4" diesel-dynamic-schema = "0.2.1" +diesel_derives = "2.1.4" diesel_migrations = "2.1.0" +graph = { path = "./graph" } +graph-core = { path = "./core" } +graph-store-postgres = { path = "./store/postgres" } +graphman-server = { path = "./server/graphman" } +graphman = { path = "./core/graphman" } +graphman-store = { path = "./core/graphman_store" } +itertools = "0.13.0" +lazy_static = "1.5.0" prost = "0.12.6" prost-types = "0.12.6" +regex = "1.5.4" +reqwest = "0.12.5" serde = { version = "1.0.126", features = ["rc"] } serde_derive = "1.0.125" serde_json = { version = "1.0", features = ["arbitrary_precision"] } serde_regex = "1.1.0" serde_yaml = "0.9.21" +slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] } sqlparser = "0.46.0" +strum = { version = "0.26", features = ["derive"] } syn = { version = "2.0.66", features = ["full"] } +test-store = { path = "./store/test-store" } +thiserror = "1.0.25" +tokio = { version = "1.38.0", features = ["full"] } tonic = { version = "0.11.0", features = ["tls-roots", "gzip"] } tonic-build = { version = "0.11.0", features = ["prost"] } -wasmtime = "15.0.1" +tower-http = { version = "0.5.2", features = ["cors"] } wasmparser = "0.118.1" -clap = { version = "4.5.4", features = ["derive", "env"] } +wasmtime = "15.0.1" # Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed. [profile.test] diff --git a/core/graphman/Cargo.toml b/core/graphman/Cargo.toml new file mode 100644 index 00000000000..001a683f4aa --- /dev/null +++ b/core/graphman/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "graphman" +version.workspace = true +edition.workspace = true + +[dependencies] +anyhow = { workspace = true } +diesel = { workspace = true } +graph = { workspace = true } +graph-store-postgres = { workspace = true } +graphman-store = { workspace = true } +itertools = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } diff --git a/core/graphman/src/commands/deployment/info.rs b/core/graphman/src/commands/deployment/info.rs new file mode 100644 index 00000000000..2d3f58d5dc9 --- /dev/null +++ b/core/graphman/src/commands/deployment/info.rs @@ -0,0 +1,81 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::anyhow; +use graph::blockchain::BlockPtr; +use graph::components::store::BlockNumber; +use graph::components::store::DeploymentId; +use graph::components::store::StatusStore; +use graph::data::subgraph::schema::SubgraphHealth; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::Store; +use itertools::Itertools; + +use crate::deployment::Deployment; +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +#[derive(Clone, Debug)] +pub struct DeploymentStatus { + pub is_paused: Option, + pub is_synced: bool, + pub health: SubgraphHealth, + pub earliest_block_number: BlockNumber, + pub latest_block: Option, + pub chain_head_block: Option, +} + +pub fn load_deployments( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, + version: &DeploymentVersionSelector, +) -> Result, GraphmanError> { + let mut primary_conn = primary_pool.get()?; + + crate::deployment::load_deployments(&mut primary_conn, &deployment, &version) +} + +pub fn load_deployment_statuses( + store: Arc, + deployments: &[Deployment], +) -> Result, GraphmanError> { + use graph::data::subgraph::status::Filter; + + let deployment_ids = deployments + .iter() + .map(|deployment| DeploymentId::new(deployment.id)) + .collect_vec(); + + let deployment_statuses = store + .status(Filter::DeploymentIds(deployment_ids))? + .into_iter() + .map(|status| { + let id = status.id.0; + + let chain = status + .chains + .get(0) + .ok_or_else(|| { + GraphmanError::Store(anyhow!( + "deployment status has no chains on deployment '{id}'" + )) + })? + .to_owned(); + + Ok(( + id, + DeploymentStatus { + is_paused: status.paused, + is_synced: status.synced, + health: status.health, + earliest_block_number: chain.earliest_block_number.to_owned(), + latest_block: chain.latest_block.map(|x| x.to_ptr()), + chain_head_block: chain.chain_head_block.map(|x| x.to_ptr()), + }, + )) + }) + .collect::>()?; + + Ok(deployment_statuses) +} diff --git a/core/graphman/src/commands/deployment/mod.rs b/core/graphman/src/commands/deployment/mod.rs new file mode 100644 index 00000000000..9c695a5f74a --- /dev/null +++ b/core/graphman/src/commands/deployment/mod.rs @@ -0,0 +1,3 @@ +pub mod info; +pub mod pause; +pub mod resume; diff --git a/core/graphman/src/commands/deployment/pause.rs b/core/graphman/src/commands/deployment/pause.rs new file mode 100644 index 00000000000..fb6bcf2cc2a --- /dev/null +++ b/core/graphman/src/commands/deployment/pause.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use graph::components::store::DeploymentLocator; +use graph::components::store::StoreEvent; +use graph_store_postgres::command_support::catalog; +use graph_store_postgres::command_support::catalog::Site; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use thiserror::Error; + +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +pub struct ActiveDeployment { + locator: DeploymentLocator, + site: Site, +} + +#[derive(Debug, Error)] +pub enum PauseDeploymentError { + #[error("deployment '{0}' is already paused")] + AlreadyPaused(String), + + #[error(transparent)] + Common(#[from] GraphmanError), +} + +impl ActiveDeployment { + pub fn locator(&self) -> &DeploymentLocator { + &self.locator + } +} + +pub fn load_active_deployment( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, +) -> Result { + let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + + let locator = crate::deployment::load_deployment( + &mut primary_conn, + deployment, + &DeploymentVersionSelector::All, + )? + .locator(); + + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let site = catalog_conn + .locate_site(locator.clone()) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) + })?; + + let (_, is_paused) = catalog_conn + .assignment_status(&site) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("assignment status not found for '{locator}'")) + })?; + + if is_paused { + return Err(PauseDeploymentError::AlreadyPaused(locator.to_string())); + } + + Ok(ActiveDeployment { locator, site }) +} + +pub fn pause_active_deployment( + primary_pool: ConnectionPool, + notification_sender: Arc, + active_deployment: ActiveDeployment, +) -> Result<(), GraphmanError> { + let primary_conn = primary_pool.get()?; + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let changes = catalog_conn.pause_subgraph(&active_deployment.site)?; + catalog_conn.send_store_event(¬ification_sender, &StoreEvent::new(changes))?; + + Ok(()) +} diff --git a/core/graphman/src/commands/deployment/resume.rs b/core/graphman/src/commands/deployment/resume.rs new file mode 100644 index 00000000000..28be98dc35c --- /dev/null +++ b/core/graphman/src/commands/deployment/resume.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use graph::components::store::DeploymentLocator; +use graph::prelude::StoreEvent; +use graph_store_postgres::command_support::catalog; +use graph_store_postgres::command_support::catalog::Site; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use thiserror::Error; + +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +pub struct PausedDeployment { + locator: DeploymentLocator, + site: Site, +} + +#[derive(Debug, Error)] +pub enum ResumeDeploymentError { + #[error("deployment '{0}' is not paused")] + NotPaused(String), + + #[error(transparent)] + Common(#[from] GraphmanError), +} + +impl PausedDeployment { + pub fn locator(&self) -> &DeploymentLocator { + &self.locator + } +} + +pub fn load_paused_deployment( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, +) -> Result { + let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + + let locator = crate::deployment::load_deployment( + &mut primary_conn, + deployment, + &DeploymentVersionSelector::All, + )? + .locator(); + + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let site = catalog_conn + .locate_site(locator.clone()) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) + })?; + + let (_, is_paused) = catalog_conn + .assignment_status(&site) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("assignment status not found for '{locator}'")) + })?; + + if !is_paused { + return Err(ResumeDeploymentError::NotPaused(locator.to_string())); + } + + Ok(PausedDeployment { locator, site }) +} + +pub fn resume_paused_deployment( + primary_pool: ConnectionPool, + notification_sender: Arc, + paused_deployment: PausedDeployment, +) -> Result<(), GraphmanError> { + let primary_conn = primary_pool.get()?; + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let changes = catalog_conn.resume_subgraph(&paused_deployment.site)?; + catalog_conn.send_store_event(¬ification_sender, &StoreEvent::new(changes))?; + + Ok(()) +} diff --git a/core/graphman/src/commands/mod.rs b/core/graphman/src/commands/mod.rs new file mode 100644 index 00000000000..98629027b58 --- /dev/null +++ b/core/graphman/src/commands/mod.rs @@ -0,0 +1 @@ +pub mod deployment; diff --git a/core/graphman/src/deployment.rs b/core/graphman/src/deployment.rs new file mode 100644 index 00000000000..ed7b21701bf --- /dev/null +++ b/core/graphman/src/deployment.rs @@ -0,0 +1,146 @@ +use anyhow::anyhow; +use diesel::dsl::sql; +use diesel::prelude::*; +use diesel::sql_types::Text; +use graph::components::store::DeploymentId; +use graph::components::store::DeploymentLocator; +use graph::data::subgraph::DeploymentHash; +use graph_store_postgres::command_support::catalog; +use itertools::Itertools; + +use crate::GraphmanError; + +#[derive(Clone, Debug, Queryable)] +pub struct Deployment { + pub id: i32, + pub hash: String, + pub namespace: String, + pub name: String, + pub node_id: Option, + pub shard: String, + pub chain: String, + pub version_status: String, + pub is_active: bool, +} + +#[derive(Clone, Debug)] +pub enum DeploymentSelector { + Name(String), + Subgraph { hash: String, shard: Option }, + Schema(String), + All, +} + +#[derive(Clone, Debug)] +pub enum DeploymentVersionSelector { + Current, + Pending, + Used, + All, +} + +impl Deployment { + pub fn locator(&self) -> DeploymentLocator { + DeploymentLocator::new( + DeploymentId::new(self.id), + DeploymentHash::new(self.hash.clone()).unwrap(), + ) + } +} + +pub(crate) fn load_deployments( + primary_conn: &mut PgConnection, + deployment: &DeploymentSelector, + version: &DeploymentVersionSelector, +) -> Result, GraphmanError> { + use catalog::deployment_schemas as ds; + use catalog::subgraph as sg; + use catalog::subgraph_deployment_assignment as sgda; + use catalog::subgraph_version as sgv; + + let mut query = ds::table + .inner_join(sgv::table.on(sgv::deployment.eq(ds::subgraph))) + .inner_join(sg::table.on(sgv::subgraph.eq(sg::id))) + .left_outer_join(sgda::table.on(sgda::id.eq(ds::id))) + .select(( + ds::id, + sgv::deployment, + ds::name, + sg::name, + sgda::node_id.nullable(), + ds::shard, + ds::network, + sql::( + "( + case + when subgraphs.subgraph.pending_version = subgraphs.subgraph_version.id + then 'pending' + when subgraphs.subgraph.current_version = subgraphs.subgraph_version.id + then 'current' + else + 'unused' + end + ) status", + ), + ds::active, + )) + .into_boxed(); + + match deployment { + DeploymentSelector::Name(name) => { + let pattern = format!("%{}%", name.replace("%", "")); + query = query.filter(sg::name.ilike(pattern)); + } + DeploymentSelector::Subgraph { hash, shard } => { + query = query.filter(ds::subgraph.eq(hash)); + + if let Some(shard) = shard { + query = query.filter(ds::shard.eq(shard)); + } + } + DeploymentSelector::Schema(name) => { + query = query.filter(ds::name.eq(name)); + } + DeploymentSelector::All => { + // No query changes required. + } + }; + + let current_version_filter = sg::current_version.eq(sgv::id.nullable()); + let pending_version_filter = sg::pending_version.eq(sgv::id.nullable()); + + match version { + DeploymentVersionSelector::Current => { + query = query.filter(current_version_filter); + } + DeploymentVersionSelector::Pending => { + query = query.filter(pending_version_filter); + } + DeploymentVersionSelector::Used => { + query = query.filter(current_version_filter.or(pending_version_filter)); + } + DeploymentVersionSelector::All => { + // No query changes required. + } + } + + query.load(primary_conn).map_err(Into::into) +} + +pub(crate) fn load_deployment( + primary_conn: &mut PgConnection, + deployment: &DeploymentSelector, + version: &DeploymentVersionSelector, +) -> Result { + let deployment = load_deployments(primary_conn, deployment, version)? + .into_iter() + .exactly_one() + .map_err(|err| { + let count = err.into_iter().count(); + GraphmanError::Store(anyhow!( + "expected exactly one deployment for '{deployment:?}', found {count}" + )) + })?; + + Ok(deployment) +} diff --git a/core/graphman/src/error.rs b/core/graphman/src/error.rs new file mode 100644 index 00000000000..731b2574f0e --- /dev/null +++ b/core/graphman/src/error.rs @@ -0,0 +1,19 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum GraphmanError { + #[error("store error: {0:#}")] + Store(#[source] anyhow::Error), +} + +impl From for GraphmanError { + fn from(err: graph::components::store::StoreError) -> Self { + Self::Store(err.into()) + } +} + +impl From for GraphmanError { + fn from(err: diesel::result::Error) -> Self { + Self::Store(err.into()) + } +} diff --git a/core/graphman/src/execution_tracker.rs b/core/graphman/src/execution_tracker.rs new file mode 100644 index 00000000000..96471d7c4a0 --- /dev/null +++ b/core/graphman/src/execution_tracker.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use graphman_store::ExecutionId; +use graphman_store::GraphmanStore; +use tokio::sync::Notify; + +/// The execution status is updated at this interval. +const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20); + +/// Used with long-running command executions to maintain their status as active. +pub struct GraphmanExecutionTracker { + id: ExecutionId, + heartbeat_stopper: Arc, + store: Arc, +} + +impl GraphmanExecutionTracker +where + S: GraphmanStore + Send + Sync + 'static, +{ + /// Creates a new execution tracker that spawns a separate background task that keeps + /// the execution active by periodically updating its status. + pub fn new(store: Arc, id: ExecutionId) -> Self { + let heartbeat_stopper = Arc::new(Notify::new()); + + let tracker = Self { + id, + store, + heartbeat_stopper, + }; + + tracker.spawn_heartbeat(); + tracker + } + + fn spawn_heartbeat(&self) { + let id = self.id; + let heartbeat_stopper = self.heartbeat_stopper.clone(); + let store = self.store.clone(); + + graph::spawn(async move { + store.mark_execution_as_running(id).unwrap(); + + let stop_heartbeat = heartbeat_stopper.notified(); + tokio::pin!(stop_heartbeat); + + loop { + tokio::select! { + biased; + + _ = &mut stop_heartbeat => { + break; + }, + + _ = tokio::time::sleep(DEFAULT_HEARTBEAT_INTERVAL) => { + store.mark_execution_as_running(id).unwrap(); + }, + } + } + }); + } + + /// Completes the execution with an error. + pub fn track_failure(self, error_message: String) -> Result<()> { + self.heartbeat_stopper.notify_one(); + + self.store.mark_execution_as_failed(self.id, error_message) + } + + /// Completes the execution with a success. + pub fn track_success(self) -> Result<()> { + self.heartbeat_stopper.notify_one(); + + self.store.mark_execution_as_succeeded(self.id) + } +} + +impl Drop for GraphmanExecutionTracker { + fn drop(&mut self) { + self.heartbeat_stopper.notify_one(); + } +} diff --git a/core/graphman/src/lib.rs b/core/graphman/src/lib.rs new file mode 100644 index 00000000000..71f8e77a848 --- /dev/null +++ b/core/graphman/src/lib.rs @@ -0,0 +1,15 @@ +//! This crate contains graphman commands that can be executed via +//! the GraphQL API as well as via the CLI. +//! +//! Each command is broken into small execution steps to allow different interfaces to perform +//! some additional interface-specific operations between steps. An example of this is printing +//! intermediate information to the user in the CLI, or prompting for additional input. + +mod error; + +pub mod commands; +pub mod deployment; +pub mod execution_tracker; + +pub use self::error::GraphmanError; +pub use self::execution_tracker::GraphmanExecutionTracker; diff --git a/core/graphman_store/Cargo.toml b/core/graphman_store/Cargo.toml new file mode 100644 index 00000000000..59705f944e2 --- /dev/null +++ b/core/graphman_store/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "graphman-store" +version.workspace = true +edition.workspace = true + +[dependencies] +anyhow = { workspace = true } +chrono = { workspace = true } +diesel = { workspace = true } +strum = { workspace = true } diff --git a/core/graphman_store/src/lib.rs b/core/graphman_store/src/lib.rs new file mode 100644 index 00000000000..b44cbca8a91 --- /dev/null +++ b/core/graphman_store/src/lib.rs @@ -0,0 +1,127 @@ +//! This crate allows graphman commands to store data in a persistent storage. +//! +//! Note: The trait is extracted as a separate crate to avoid cyclic dependencies between graphman +//! commands and store implementations. + +use anyhow::Result; +use chrono::DateTime; +use chrono::Utc; +use diesel::deserialize::FromSql; +use diesel::pg::Pg; +use diesel::pg::PgValue; +use diesel::serialize::Output; +use diesel::serialize::ToSql; +use diesel::sql_types::BigSerial; +use diesel::sql_types::Varchar; +use diesel::AsExpression; +use diesel::FromSqlRow; +use diesel::Queryable; +use strum::Display; +use strum::EnumString; +use strum::IntoStaticStr; + +/// Describes all the capabilities that graphman commands need from a persistent storage. +/// +/// The primary use case for this is background execution of commands. +pub trait GraphmanStore { + /// Creates a new pending execution of the specified type. + /// The implementation is expected to manage execution IDs and return unique IDs on each call. + /// + /// Creating a new execution does not mean that a command is actually running or will run. + fn new_execution(&self, kind: CommandKind) -> Result; + + /// Returns all stored execution data. + fn load_execution(&self, id: ExecutionId) -> Result; + + /// When an execution begins to make progress, this method is used to update its status. + /// + /// For long-running commands, it is expected that this method will be called at some interval + /// to show that the execution is still making progress. + /// + /// The implementation is expected to not allow updating the status of completed executions. + fn mark_execution_as_running(&self, id: ExecutionId) -> Result<()>; + + /// This is a finalizing operation and is expected to be called only once, + /// when an execution fails. + /// + /// The implementation is not expected to prevent overriding the final state of an execution. + fn mark_execution_as_failed(&self, id: ExecutionId, error_message: String) -> Result<()>; + + /// This is a finalizing operation and is expected to be called only once, + /// when an execution succeeds. + /// + /// The implementation is not expected to prevent overriding the final state of an execution. + fn mark_execution_as_succeeded(&self, id: ExecutionId) -> Result<()>; +} + +/// Data stored about a command execution. +#[derive(Clone, Debug, Queryable)] +pub struct Execution { + pub id: ExecutionId, + pub kind: CommandKind, + pub status: ExecutionStatus, + pub error_message: Option, + pub created_at: DateTime, + pub updated_at: Option>, + pub completed_at: Option>, +} + +/// A unique ID of a command execution. +#[derive(Clone, Copy, Debug, AsExpression, FromSqlRow)] +#[diesel(sql_type = BigSerial)] +pub struct ExecutionId(pub i64); + +/// Types of commands that can store data about their execution. +#[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Display, IntoStaticStr, EnumString)] +#[diesel(sql_type = Varchar)] +#[strum(serialize_all = "snake_case")] +pub enum CommandKind { + RestartDeployment, +} + +/// All possible states of a command execution. +#[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Display, IntoStaticStr, EnumString)] +#[diesel(sql_type = Varchar)] +#[strum(serialize_all = "snake_case")] +pub enum ExecutionStatus { + Initializing, + Running, + Failed, + Succeeded, +} + +impl FromSql for ExecutionId { + fn from_sql(bytes: PgValue) -> diesel::deserialize::Result { + Ok(ExecutionId(i64::from_sql(bytes)?)) + } +} + +impl ToSql for ExecutionId { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + >::to_sql(&self.0, &mut out.reborrow()) + } +} + +impl FromSql for CommandKind { + fn from_sql(bytes: PgValue) -> diesel::deserialize::Result { + Ok(std::str::from_utf8(bytes.as_bytes())?.parse()?) + } +} + +impl ToSql for CommandKind { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + >::to_sql(self.into(), &mut out.reborrow()) + } +} + +impl FromSql for ExecutionStatus { + fn from_sql(bytes: PgValue) -> diesel::deserialize::Result { + Ok(std::str::from_utf8(bytes.as_bytes())?.parse()?) + } +} + +impl ToSql for ExecutionStatus { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + >::to_sql(self.into(), &mut out.reborrow()) + } +} diff --git a/docs/graphman-graphql-api.md b/docs/graphman-graphql-api.md new file mode 100644 index 00000000000..486bee6090d --- /dev/null +++ b/docs/graphman-graphql-api.md @@ -0,0 +1,213 @@ +# Graphman GraphQL API + +The graphman API provides functionality to manage various aspects of `graph-node` through GraphQL operations. It is only +started when the environment variable `GRAPHMAN_SERVER_AUTH_TOKEN` is set. The token is used to authenticate graphman +GraphQL requests. Even with the token, the server should not be exposed externally as it provides operations that an +attacker can use to severely impede the functioning of an indexer. The server listens on the port `GRAPHMAN_PORT`, port +`8050` by default. + +Environment variables to control the graphman API: + +- `GRAPHMAN_SERVER_AUTH_TOKEN` - The token is used to authenticate graphman GraphQL requests. +- `GRAPHMAN_PORT` - The port for the graphman GraphQL server (Defaults to `8050`) + +## GraphQL playground + +When the graphman GraphQL server is running the GraphQL playground is available at the following +address: http://127.0.0.1:8050 + +**Note:** The port might be different. + +Please make sure to set the authorization header to be able to use the playground: + +```json +{ + "Authorization": "Bearer GRAPHMAN_SERVER_AUTH_TOKEN" +} +``` + +**Note:** There is a headers section at the bottom of the playground page. + +## Supported commands + +The playground is the best place to see the full schema, the latest available queries and mutations, and their +documentation. Below, we will briefly describe some supported commands and example queries. + +At the time of writing, the following graphman commands are available via the GraphQL API: + +### Deployment Info + +Returns the available information about one, multiple, or all deployments. + +**Example query:** + +```text +query { + deployment { + info(deployment: { hash: "Qm..." }) { + status { + isPaused + } + } + } +} +``` + +**Example response:** + +```json +{ + "data": { + "deployment": { + "info": [ + { + "status": { + "isPaused": false + } + } + ] + } + } +} +``` + +### Pause Deployment + +Pauses a deployment that is not already paused. + +**Example query:** + +```text +mutation { + deployment { + pause(deployment: { hash: "Qm..." }) { + success + } + } +} +``` + +**Example response:** + +```json +{ + "data": { + "deployment": { + "pause": { + "success": true + } + } + } +} +``` + +### Resume Deployment + +Resumes a deployment that has been previously paused. + +**Example query:** + +```text +mutation { + deployment { + resume(deployment: { hash: "Qm..." }) { + success + } + } +} +``` + +**Example response:** + +```json +{ + "data": { + "deployment": { + "resume": { + "success": true + } + } + } +} +``` + +### Restart Deployment + +Pauses a deployment and resumes it after a delay. + +**Example query:** + +```text +mutation { + deployment { + restart(deployment: { hash: "Qm..." }) { + id + } + } +} +``` + +**Example response:** + +```json +{ + "data": { + "deployment": { + "restart": { + "id": "UNIQUE_EXECUTION_ID" + } + } + } +} +``` + +This is a long-running command because the default delay before resuming the deployment is 20 seconds. Long-running +commands are executed in the background. For long-running commands, the GraphQL API will return a unique execution ID. + +The ID can be used to query the execution status and the output of the command: + +```text +query { + execution { + info(id: "UNIQUE_EXECUTION_ID") { + status + errorMessage + } + } +} +``` + +**Example response when execution is in-progress:** + +```json +{ + "data": { + "execution": { + "info": { + "status": "RUNNING", + "errorMessage": null + } + } + } +} +``` + +**Example response when execution is completed:** + +```json +{ + "data": { + "execution": { + "info": { + "status": "SUCCEEDED", + "errorMessage": null + } + } + } +} +``` + +## Other commands + +GraphQL support for other graphman commands will be added over time, so please make sure to check the GraphQL playground +for the full schema and the latest available queries and mutations. diff --git a/graph/src/data/subgraph/status.rs b/graph/src/data/subgraph/status.rs index aff4ee82512..e2c14751955 100644 --- a/graph/src/data/subgraph/status.rs +++ b/graph/src/data/subgraph/status.rs @@ -19,7 +19,7 @@ pub enum Filter { } /// Light wrapper around `EthereumBlockPointer` that is compatible with GraphQL values. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct EthereumBlock(BlockPtr); impl EthereumBlock { @@ -55,7 +55,7 @@ impl From for EthereumBlock { /// Indexing status information related to the chain. Right now, we only /// support Ethereum, but once we support more chains, we'll have to turn this into /// an enum -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ChainInfo { /// The network name (e.g. `mainnet`, `ropsten`, `rinkeby`, `kovan` or `goerli`). pub network: String, diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index af53562528a..c6e180ea428 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -221,6 +221,11 @@ pub struct EnvVars { /// How long do we wait for a response from the provider before considering that it is unavailable. /// Default is 30s. pub genesis_validation_timeout: Duration, + + /// Sets the token that is used to authenticate graphman GraphQL queries. + /// + /// If not specified, the graphman server will not start. + pub graphman_server_auth_token: Option, } impl EnvVars { @@ -305,6 +310,7 @@ impl EnvVars { firehose_grpc_max_decode_size_mb: inner.firehose_grpc_max_decode_size_mb, genesis_validation_enabled: inner.genesis_validation_enabled.0, genesis_validation_timeout: Duration::from_secs(inner.genesis_validation_timeout), + graphman_server_auth_token: inner.graphman_server_auth_token, }) } @@ -454,6 +460,8 @@ struct Inner { genesis_validation_enabled: EnvVarBoolean, #[envconfig(from = "GRAPH_NODE_GENESIS_VALIDATION_TIMEOUT_SECONDS", default = "30")] genesis_validation_timeout: u64, + #[envconfig(from = "GRAPHMAN_SERVER_AUTH_TOKEN")] + graphman_server_auth_token: Option, } #[derive(Clone, Debug)] diff --git a/node/Cargo.toml b/node/Cargo.toml index fe4313d96aa..8a98396d6fd 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -13,9 +13,11 @@ name = "graphman" path = "src/bin/manager.rs" [dependencies] +anyhow = { workspace = true } env_logger = "0.11.3" clap.workspace = true git-testament = "0.2" +itertools = { workspace = true } lazy_static = "1.5.0" url = "2.5.2" graph = { path = "../graph" } @@ -33,6 +35,8 @@ graph-server-json-rpc = { path = "../server/json-rpc" } graph-server-websocket = { path = "../server/websocket" } graph-server-metrics = { path = "../server/metrics" } graph-store-postgres = { path = "../store/postgres" } +graphman-server = { workspace = true } +graphman = { workspace = true } serde = { workspace = true } shellexpand = "3.1.0" termcolor = "1.4.1" diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 7efd5fd4e48..88979cd4413 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -34,6 +34,7 @@ use graph_store_postgres::{ connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore, SubscriptionManager, PRIMARY_SHARD, }; +use itertools::Itertools; use lazy_static::lazy_static; use std::str::FromStr; use std::{collections::HashMap, num::ParseIntError, sync::Arc, time::Duration}; @@ -181,10 +182,10 @@ pub enum Command { /// The deployment (see `help info`) deployment: DeploymentSearch, }, - /// Pause and resume a deployment + /// Pause and resume one or multiple deployments Restart { - /// The deployment (see `help info`) - deployment: DeploymentSearch, + /// The deployment(s) (see `help info`) + deployments: Vec, /// Sleep for this many seconds after pausing subgraphs #[clap( long, @@ -1123,28 +1124,23 @@ async fn main() -> anyhow::Result<()> { used, all, } => { - let (primary, store) = if status { - let (store, primary) = ctx.store_and_primary(); - (primary, Some(store)) - } else { - (ctx.primary_pool(), None) + let (store, primary_pool) = ctx.store_and_primary(); + + let ctx = commands::deployment::info::Context { + primary_pool, + store, }; - match deployment { - Some(deployment) => { - commands::info::run(primary, store, deployment, current, pending, used).err(); - } - None => { - if all { - let deployment = DeploymentSearch::All; - commands::info::run(primary, store, deployment, current, pending, used) - .err(); - } else { - bail!("Please specify a deployment or use --all to list all deployments"); - } - } + let args = commands::deployment::info::Args { + deployment: deployment.map(make_deployment_selector), + current, + pending, + status, + used, + all, }; - Ok(()) + + commands::deployment::info::run(ctx, args) } Unused(cmd) => { let store = ctx.subgraph_store(); @@ -1201,25 +1197,35 @@ async fn main() -> anyhow::Result<()> { commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node) } Pause { deployment } => { - let sender = ctx.notification_sender(); - let pool = ctx.primary_pool(); - let locator = &deployment.locate_unique(&pool)?; - commands::assign::pause_or_resume(pool, &sender, locator, true) - } + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); + let deployment = make_deployment_selector(deployment); + commands::deployment::pause::run(primary_pool, notifications_sender, deployment) + } Resume { deployment } => { - let sender = ctx.notification_sender(); - let pool = ctx.primary_pool(); - let locator = &deployment.locate_unique(&pool).unwrap(); + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); + let deployment = make_deployment_selector(deployment); - commands::assign::pause_or_resume(pool, &sender, locator, false) + commands::deployment::resume::run(primary_pool, notifications_sender, deployment) } - Restart { deployment, sleep } => { - let sender = ctx.notification_sender(); - let pool = ctx.primary_pool(); - let locator = &deployment.locate_unique(&pool).unwrap(); + Restart { deployments, sleep } => { + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); - commands::assign::restart(pool, &sender, locator, sleep) + for deployment in deployments.into_iter().unique() { + let deployment = make_deployment_selector(deployment); + + commands::deployment::restart::run( + primary_pool.clone(), + notifications_sender.clone(), + deployment, + sleep, + )?; + } + + Ok(()) } Rewind { force, @@ -1635,3 +1641,16 @@ async fn main() -> anyhow::Result<()> { fn parse_duration_in_secs(s: &str) -> Result { Ok(Duration::from_secs(s.parse()?)) } + +fn make_deployment_selector( + deployment: DeploymentSearch, +) -> graphman::deployment::DeploymentSelector { + use graphman::deployment::DeploymentSelector::*; + + match deployment { + DeploymentSearch::Name { name } => Name(name), + DeploymentSearch::Hash { hash, shard } => Subgraph { hash, shard }, + DeploymentSearch::All => All, + DeploymentSearch::Deployment { namespace } => Schema(namespace), + } +} diff --git a/node/src/main.rs b/node/src/main.rs index 4d0041e02a3..d67c38992ba 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -31,7 +31,11 @@ use graph_server_index_node::IndexNodeServer; use graph_server_json_rpc::JsonRpcServer; use graph_server_metrics::PrometheusMetricsServer; use graph_server_websocket::SubscriptionServer as GraphQLSubscriptionServer; -use graph_store_postgres::register_jobs as register_store_jobs; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::Store; +use graph_store_postgres::{register_jobs as register_store_jobs, NotificationSender}; +use graphman_server::GraphmanServer; +use graphman_server::GraphmanServerConfig; use std::io::{BufRead, BufReader}; use std::path::Path; use std::time::Duration; @@ -251,12 +255,23 @@ async fn main() { ) .await; - let launch_services = |logger: Logger, env_vars: Arc| async move { - let subscription_manager = store_builder.subscription_manager(); - let chain_head_update_listener = store_builder.chain_head_update_listener(); - let primary_pool = store_builder.primary_pool(); + let primary_pool = store_builder.primary_pool(); + let subscription_manager = store_builder.subscription_manager(); + let chain_head_update_listener = store_builder.chain_head_update_listener(); + let network_store = store_builder.network_store(config.chain_ids()); + + let graphman_server_config = make_graphman_server_config( + primary_pool.clone(), + network_store.cheap_clone(), + metrics_registry.cheap_clone(), + &env_vars, + &logger, + &logger_factory, + ); + + start_graphman_server(opt.graphman_port, graphman_server_config).await; - let network_store = store_builder.network_store(config.chain_ids()); + let launch_services = |logger: Logger, env_vars: Arc| async move { let block_store = network_store.block_store(); let validator: Arc = if env_vars.genesis_validation_enabled { @@ -536,3 +551,45 @@ async fn main() { graph::futures03::future::pending::<()>().await; } + +async fn start_graphman_server(port: u16, config: Option>) { + let Some(config) = config else { + return; + }; + + let server = GraphmanServer::new(config) + .unwrap_or_else(|err| panic!("Invalid graphman server configuration: {err:#}")); + + server + .start(port) + .await + .unwrap_or_else(|err| panic!("Failed to start graphman server: {err:#}")); +} + +fn make_graphman_server_config<'a>( + pool: ConnectionPool, + store: Arc, + metrics_registry: Arc, + env_vars: &EnvVars, + logger: &Logger, + logger_factory: &'a LoggerFactory, +) -> Option> { + let Some(auth_token) = &env_vars.graphman_server_auth_token else { + warn!( + logger, + "Missing graphman server auth token; graphman server will not start", + ); + + return None; + }; + + let notification_sender = Arc::new(NotificationSender::new(metrics_registry.clone())); + + Some(GraphmanServerConfig { + pool, + notification_sender, + store, + logger_factory, + auth_token: auth_token.to_owned(), + }) +} diff --git a/node/src/manager/commands/deployment/info.rs b/node/src/manager/commands/deployment/info.rs new file mode 100644 index 00000000000..1b4f646a212 --- /dev/null +++ b/node/src/manager/commands/deployment/info.rs @@ -0,0 +1,161 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::bail; +use anyhow::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::Store; +use graphman::commands::deployment::info::load_deployment_statuses; +use graphman::commands::deployment::info::load_deployments; +use graphman::commands::deployment::info::DeploymentStatus; +use graphman::deployment::Deployment; +use graphman::deployment::DeploymentSelector; +use graphman::deployment::DeploymentVersionSelector; + +use crate::manager::display::List; + +pub struct Context { + pub primary_pool: ConnectionPool, + pub store: Arc, +} + +pub struct Args { + pub deployment: Option, + pub current: bool, + pub pending: bool, + pub status: bool, + pub used: bool, + pub all: bool, +} + +pub fn run(ctx: Context, args: Args) -> Result<()> { + let Context { + primary_pool, + store, + } = ctx; + + let Args { + deployment, + current, + pending, + status, + used, + all, + } = args; + + let deployment = match deployment { + Some(deployment) => deployment, + None if all => DeploymentSelector::All, + None => { + bail!("Please specify a deployment or use --all to list all deployments"); + } + }; + + let version = make_deployment_version_selector(current, pending, used); + let deployments = load_deployments(primary_pool.clone(), &deployment, &version)?; + + if deployments.is_empty() { + println!("No matches"); + return Ok(()); + } + + let statuses = if status { + Some(load_deployment_statuses(store, &deployments)?) + } else { + None + }; + + print_info(deployments, statuses); + + Ok(()) +} + +fn make_deployment_version_selector( + current: bool, + pending: bool, + used: bool, +) -> DeploymentVersionSelector { + use DeploymentVersionSelector::*; + + match (current || used, pending || used) { + (false, false) => All, + (true, false) => Current, + (false, true) => Pending, + (true, true) => Used, + } +} + +fn print_info(deployments: Vec, statuses: Option>) { + let mut headers = vec![ + "Name", + "Status", + "Hash", + "Namespace", + "Shard", + "Active", + "Chain", + "Node ID", + ]; + + if statuses.is_some() { + headers.extend(vec![ + "Paused", + "Synced", + "Health", + "Earliest Block", + "Latest Block", + "Chain Head Block", + ]); + } + + let mut list = List::new(headers); + + const NONE: &str = "---"; + + fn optional(s: Option) -> String { + s.map(|x| x.to_string()).unwrap_or(NONE.to_owned()) + } + + for deployment in deployments { + let mut row = vec![ + deployment.name, + deployment.version_status, + deployment.hash, + deployment.namespace, + deployment.shard, + deployment.is_active.to_string(), + deployment.chain, + optional(deployment.node_id), + ]; + + let status = statuses.as_ref().map(|x| x.get(&deployment.id)); + + match status { + Some(Some(status)) => { + row.extend(vec![ + optional(status.is_paused), + status.is_synced.to_string(), + status.health.as_str().to_string(), + status.earliest_block_number.to_string(), + optional(status.latest_block.as_ref().map(|x| x.number)), + optional(status.chain_head_block.as_ref().map(|x| x.number)), + ]); + } + Some(None) => { + row.extend(vec![ + NONE.to_owned(), + NONE.to_owned(), + NONE.to_owned(), + NONE.to_owned(), + NONE.to_owned(), + NONE.to_owned(), + ]); + } + None => {} + } + + list.append(row); + } + + list.render(); +} diff --git a/node/src/manager/commands/deployment/mod.rs b/node/src/manager/commands/deployment/mod.rs new file mode 100644 index 00000000000..98910d7b4c4 --- /dev/null +++ b/node/src/manager/commands/deployment/mod.rs @@ -0,0 +1,4 @@ +pub mod info; +pub mod pause; +pub mod restart; +pub mod resume; diff --git a/node/src/manager/commands/deployment/pause.rs b/node/src/manager/commands/deployment/pause.rs new file mode 100644 index 00000000000..1cd4808fad8 --- /dev/null +++ b/node/src/manager/commands/deployment/pause.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use anyhow::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::commands::deployment::pause::load_active_deployment; +use graphman::commands::deployment::pause::pause_active_deployment; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, +) -> Result<()> { + let active_deployment = load_active_deployment(primary_pool.clone(), &deployment)?; + + println!("Pausing deployment {} ...", active_deployment.locator()); + + pause_active_deployment(primary_pool, notification_sender, active_deployment)?; + + Ok(()) +} diff --git a/node/src/manager/commands/deployment/restart.rs b/node/src/manager/commands/deployment/restart.rs new file mode 100644 index 00000000000..4febf81b63c --- /dev/null +++ b/node/src/manager/commands/deployment/restart.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; + +use anyhow::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, + delay: Duration, +) -> Result<()> { + super::pause::run( + primary_pool.clone(), + notification_sender.clone(), + deployment.clone(), + )?; + + println!( + "Waiting {}s to make sure pausing was processed ...", + delay.as_secs() + ); + + sleep(delay); + + super::resume::run(primary_pool, notification_sender, deployment.clone())?; + + Ok(()) +} diff --git a/node/src/manager/commands/deployment/resume.rs b/node/src/manager/commands/deployment/resume.rs new file mode 100644 index 00000000000..7e57d60cd48 --- /dev/null +++ b/node/src/manager/commands/deployment/resume.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use anyhow::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::commands::deployment::resume::load_paused_deployment; +use graphman::commands::deployment::resume::resume_paused_deployment; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, +) -> Result<()> { + let paused_deployment = load_paused_deployment(primary_pool.clone(), &deployment)?; + + println!("Resuming deployment {} ...", paused_deployment.locator()); + + resume_paused_deployment(primary_pool, notification_sender, paused_deployment)?; + + Ok(()) +} diff --git a/node/src/manager/commands/info.rs b/node/src/manager/commands/info.rs deleted file mode 100644 index 76781d74d57..00000000000 --- a/node/src/manager/commands/info.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::sync::Arc; - -use graph::{components::store::StatusStore, data::subgraph::status, prelude::anyhow}; -use graph_store_postgres::{connection_pool::ConnectionPool, Store}; - -use crate::manager::deployment::{Deployment, DeploymentSearch}; - -pub fn run( - pool: ConnectionPool, - store: Option>, - search: DeploymentSearch, - current: bool, - pending: bool, - used: bool, -) -> Result<(), anyhow::Error> { - let deployments = search.find(pool, current, pending, used)?; - let ids: Vec<_> = deployments.iter().map(|d| d.locator().id).collect(); - let statuses = match store { - Some(store) => store.status(status::Filter::DeploymentIds(ids))?, - None => vec![], - }; - - if deployments.is_empty() { - println!("No matches"); - } else { - Deployment::print_table(deployments, statuses); - } - Ok(()) -} diff --git a/node/src/manager/commands/mod.rs b/node/src/manager/commands/mod.rs index 14fd7632d59..127966879c9 100644 --- a/node/src/manager/commands/mod.rs +++ b/node/src/manager/commands/mod.rs @@ -6,9 +6,9 @@ pub mod copy; pub mod create; pub mod database; pub mod deploy; +pub mod deployment; pub mod drop; pub mod index; -pub mod info; pub mod listen; pub mod prune; pub mod query; diff --git a/node/src/manager/deployment.rs b/node/src/manager/deployment.rs index e6ae944185a..fc1a3e0e5a7 100644 --- a/node/src/manager/deployment.rs +++ b/node/src/manager/deployment.rs @@ -8,15 +8,12 @@ use diesel::{sql_types::Text, PgConnection}; use graph::components::store::DeploymentId; use graph::{ components::store::DeploymentLocator, - data::subgraph::status, prelude::{anyhow, lazy_static, regex::Regex, DeploymentHash}, }; use graph_store_postgres::command_support::catalog as store_catalog; use graph_store_postgres::connection_pool::ConnectionPool; use graph_store_postgres::unused; -use crate::manager::display::List; - lazy_static! { // `Qm...` optionally follow by `:$shard` static ref HASH_RE: Regex = Regex::new("\\A(?PQm[^:]+)(:(?P[a-z0-9_]+))?\\z").unwrap(); @@ -28,7 +25,7 @@ lazy_static! { /// by subgraph name, IPFS hash, or namespace. Since there can be multiple /// deployments for the same IPFS hash, the search term for a hash can /// optionally specify a shard. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum DeploymentSearch { Name { name: String }, Hash { hash: String, shard: Option }, @@ -205,71 +202,4 @@ impl Deployment { DeploymentHash::new(self.deployment.clone()).unwrap(), ) } - - pub fn print_table(deployments: Vec, statuses: Vec) { - let mut rows = vec![ - "name", - "status", - "id", - "namespace", - "shard", - "active", - "chain", - "node_id", - ]; - if !statuses.is_empty() { - rows.extend(vec![ - "paused", - "synced", - "health", - "earliest block", - "latest block", - "chain head block", - ]); - } - - let mut list = List::new(rows); - - for deployment in deployments { - let status = statuses - .iter() - .find(|status| &status.id.0 == &deployment.id); - - let mut rows = vec![ - deployment.name, - deployment.status, - deployment.deployment, - deployment.namespace, - deployment.shard, - deployment.active.to_string(), - deployment.chain, - deployment.node_id.unwrap_or("---".to_string()), - ]; - if let Some(status) = status { - let chain = &status.chains[0]; - rows.extend(vec![ - status - .paused - .map(|b| b.to_string()) - .unwrap_or("---".to_string()), - status.synced.to_string(), - status.health.as_str().to_string(), - chain.earliest_block_number.to_string(), - chain - .latest_block - .as_ref() - .map(|b| b.number().to_string()) - .unwrap_or("-".to_string()), - chain - .chain_head_block - .as_ref() - .map(|b| b.number().to_string()) - .unwrap_or("-".to_string()), - ]) - } - list.append(rows); - } - - list.render(); - } } diff --git a/node/src/opt.rs b/node/src/opt.rs index c2945959514..2b127e34e29 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -231,6 +231,13 @@ pub struct Opt { help = "Base URL for forking subgraphs" )] pub fork_base: Option, + #[clap( + long, + default_value = "8050", + value_name = "GRAPHMAN_PORT", + help = "Port for the graphman GraphQL server" + )] + pub graphman_port: u16, } impl From for config::Opt { diff --git a/server/graphman/Cargo.toml b/server/graphman/Cargo.toml new file mode 100644 index 00000000000..231ef5e0828 --- /dev/null +++ b/server/graphman/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "graphman-server" +version.workspace = true +edition.workspace = true + +[dependencies] +anyhow = { workspace = true } +async-graphql = { workspace = true } +async-graphql-axum = { workspace = true } +axum = { workspace = true } +chrono = { workspace = true } +graph = { workspace = true } +graph-store-postgres = { workspace = true } +graphman = { workspace = true } +graphman-store = { workspace = true } +serde_json = { workspace = true } +slog = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tower-http = { workspace = true } + +[dev-dependencies] +diesel = { workspace = true } +lazy_static = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +test-store = { workspace = true } diff --git a/server/graphman/src/auth.rs b/server/graphman/src/auth.rs new file mode 100644 index 00000000000..d83dc58856c --- /dev/null +++ b/server/graphman/src/auth.rs @@ -0,0 +1,148 @@ +use anyhow::anyhow; +use axum::http::HeaderMap; +use graph::http::header::AUTHORIZATION; + +use crate::GraphmanServerError; + +/// Contains a valid authentication token and checks HTTP headers for valid tokens. +#[derive(Clone)] +pub struct AuthToken { + token: Vec, +} + +impl AuthToken { + pub fn new(token: impl AsRef) -> Result { + let token = token.as_ref().trim().as_bytes().to_vec(); + + if token.is_empty() { + return Err(GraphmanServerError::InvalidAuthToken(anyhow!( + "auth token can not be empty" + ))); + } + + Ok(Self { token }) + } + + pub fn headers_contain_correct_token(&self, headers: &HeaderMap) -> bool { + let header_token = headers + .get(AUTHORIZATION) + .and_then(|header| header.as_bytes().strip_prefix(b"Bearer ")); + + let Some(header_token) = header_token else { + return false; + }; + + let mut token_is_correct = true; + + // We compare every byte of the tokens to prevent token size leaks and timing attacks. + for i in 0..std::cmp::max(self.token.len(), header_token.len()) { + if self.token.get(i) != header_token.get(i) { + token_is_correct = false; + } + } + + token_is_correct + } +} + +pub fn unauthorized_graphql_message() -> serde_json::Value { + serde_json::json!({ + "errors": [ + { + "message": "You are not authorized to access this resource", + "extensions": { + "code": "UNAUTHORIZED" + } + } + ], + "data": null + }) +} + +#[cfg(test)] +mod tests { + use axum::http::HeaderValue; + + use super::*; + + fn header_value(s: &str) -> HeaderValue { + s.try_into().unwrap() + } + + fn bearer_value(s: &str) -> HeaderValue { + header_value(&format!("Bearer {s}")) + } + + #[test] + fn require_non_empty_tokens() { + assert!(AuthToken::new("").is_err()); + assert!(AuthToken::new(" ").is_err()); + assert!(AuthToken::new("\n\n").is_err()); + assert!(AuthToken::new("\t\t").is_err()); + } + + #[test] + fn check_missing_header() { + let token_a = AuthToken::new("123").unwrap(); + let token_b = AuthToken::new("abc").unwrap(); + + let headers = HeaderMap::new(); + + assert!(!token_a.headers_contain_correct_token(&headers)); + assert!(!token_b.headers_contain_correct_token(&headers)); + } + + #[test] + fn check_empty_header() { + let token_a = AuthToken::new("123").unwrap(); + let token_b = AuthToken::new("abc").unwrap(); + + let mut headers = HeaderMap::new(); + + headers.insert(AUTHORIZATION, header_value("")); + + assert!(!token_a.headers_contain_correct_token(&headers)); + assert!(!token_b.headers_contain_correct_token(&headers)); + + headers.insert(AUTHORIZATION, bearer_value("")); + + assert!(!token_a.headers_contain_correct_token(&headers)); + assert!(!token_b.headers_contain_correct_token(&headers)); + } + + #[test] + fn check_token_prefix() { + let token_a = AuthToken::new("123").unwrap(); + let token_b = AuthToken::new("abc").unwrap(); + + let mut headers = HeaderMap::new(); + + headers.insert(AUTHORIZATION, header_value("12")); + + assert!(!token_a.headers_contain_correct_token(&headers)); + assert!(!token_b.headers_contain_correct_token(&headers)); + + headers.insert(AUTHORIZATION, bearer_value("12")); + + assert!(!token_a.headers_contain_correct_token(&headers)); + assert!(!token_b.headers_contain_correct_token(&headers)); + } + + #[test] + fn validate_tokens() { + let token_a = AuthToken::new("123").unwrap(); + let token_b = AuthToken::new("abc").unwrap(); + + let mut headers = HeaderMap::new(); + + headers.insert(AUTHORIZATION, bearer_value("123")); + + assert!(token_a.headers_contain_correct_token(&headers)); + assert!(!token_b.headers_contain_correct_token(&headers)); + + headers.insert(AUTHORIZATION, bearer_value("abc")); + + assert!(!token_a.headers_contain_correct_token(&headers)); + assert!(token_b.headers_contain_correct_token(&headers)); + } +} diff --git a/server/graphman/src/entities/block_hash.rs b/server/graphman/src/entities/block_hash.rs new file mode 100644 index 00000000000..46ca970beee --- /dev/null +++ b/server/graphman/src/entities/block_hash.rs @@ -0,0 +1,31 @@ +use async_graphql::InputValueError; +use async_graphql::InputValueResult; +use async_graphql::Scalar; +use async_graphql::ScalarType; +use async_graphql::Value; + +/// Represents a block hash in hex form. +#[derive(Clone, Debug)] +pub struct BlockHash(pub String); + +/// Represents a block hash in hex form. +#[Scalar] +impl ScalarType for BlockHash { + fn parse(value: Value) -> InputValueResult { + let Value::String(value) = value else { + return Err(InputValueError::expected_type(value)); + }; + + Ok(BlockHash(value)) + } + + fn to_value(&self) -> Value { + Value::String(self.0.clone()) + } +} + +impl From for BlockHash { + fn from(block_hash: graph::blockchain::BlockHash) -> Self { + Self(block_hash.hash_hex()) + } +} diff --git a/server/graphman/src/entities/block_number.rs b/server/graphman/src/entities/block_number.rs new file mode 100644 index 00000000000..83fe9714265 --- /dev/null +++ b/server/graphman/src/entities/block_number.rs @@ -0,0 +1,29 @@ +use async_graphql::InputValueError; +use async_graphql::InputValueResult; +use async_graphql::Scalar; +use async_graphql::ScalarType; +use async_graphql::Value; + +#[derive(Clone, Debug)] +pub struct BlockNumber(pub i32); + +#[Scalar] +impl ScalarType for BlockNumber { + fn parse(value: Value) -> InputValueResult { + let Value::String(value) = value else { + return Err(InputValueError::expected_type(value)); + }; + + Ok(value.parse().map(BlockNumber)?) + } + + fn to_value(&self) -> Value { + Value::String(self.0.to_string()) + } +} + +impl From for BlockNumber { + fn from(block_number: graph::prelude::BlockNumber) -> Self { + Self(block_number) + } +} diff --git a/server/graphman/src/entities/block_ptr.rs b/server/graphman/src/entities/block_ptr.rs new file mode 100644 index 00000000000..7ae1ed517ba --- /dev/null +++ b/server/graphman/src/entities/block_ptr.rs @@ -0,0 +1,19 @@ +use async_graphql::SimpleObject; + +use crate::entities::BlockHash; +use crate::entities::BlockNumber; + +#[derive(Clone, Debug, SimpleObject)] +pub struct BlockPtr { + pub hash: BlockHash, + pub number: BlockNumber, +} + +impl From for BlockPtr { + fn from(block_ptr: graph::blockchain::BlockPtr) -> Self { + Self { + hash: block_ptr.hash.into(), + number: block_ptr.number.into(), + } + } +} diff --git a/server/graphman/src/entities/command_kind.rs b/server/graphman/src/entities/command_kind.rs new file mode 100644 index 00000000000..9fb324680c6 --- /dev/null +++ b/server/graphman/src/entities/command_kind.rs @@ -0,0 +1,8 @@ +use async_graphql::Enum; + +/// Types of commands that run in the background. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Enum)] +#[graphql(remote = "graphman_store::CommandKind")] +pub enum CommandKind { + RestartDeployment, +} diff --git a/server/graphman/src/entities/deployment_info.rs b/server/graphman/src/entities/deployment_info.rs new file mode 100644 index 00000000000..804e0d9ae9e --- /dev/null +++ b/server/graphman/src/entities/deployment_info.rs @@ -0,0 +1,44 @@ +use async_graphql::SimpleObject; + +use crate::entities::DeploymentStatus; + +#[derive(Clone, Debug, SimpleObject)] +pub struct DeploymentInfo { + pub hash: String, + pub namespace: String, + pub name: String, + pub node_id: Option, + pub shard: String, + pub chain: String, + pub version_status: String, + pub is_active: bool, + pub status: Option, +} + +impl From for DeploymentInfo { + fn from(deployment: graphman::deployment::Deployment) -> Self { + let graphman::deployment::Deployment { + id: _, + hash, + namespace, + name, + node_id, + shard, + chain, + version_status, + is_active, + } = deployment; + + Self { + hash, + namespace, + name, + node_id, + shard, + chain, + version_status, + is_active, + status: None, + } + } +} diff --git a/server/graphman/src/entities/deployment_selector.rs b/server/graphman/src/entities/deployment_selector.rs new file mode 100644 index 00000000000..97d8ec72b23 --- /dev/null +++ b/server/graphman/src/entities/deployment_selector.rs @@ -0,0 +1,46 @@ +use anyhow::anyhow; +use anyhow::Result; +use async_graphql::InputObject; + +/// Available criteria for selecting one or more deployments. +/// No more than one criterion can be selected at a time. +#[derive(Clone, Debug, InputObject)] +pub struct DeploymentSelector { + /// Selects deployments by subgraph name. + /// + /// It is not necessary to enter the full name, a name prefix or suffix may be sufficient. + pub name: Option, + + /// Selects deployments by IPFS hash. The format is `Qm...`. + pub hash: Option, + + /// Since the same IPFS hash can be deployed in multiple shards, + /// it is possible to specify the shard. + /// + /// It only works if the IPFS hash is also provided. + pub shard: Option, + + /// Selects a deployment by its database namespace. The format is `sgdNNN`. + pub schema: Option, +} + +impl TryFrom for graphman::deployment::DeploymentSelector { + type Error = anyhow::Error; + + fn try_from(deployment: DeploymentSelector) -> Result { + let DeploymentSelector { + name, + hash, + shard, + schema, + } = deployment; + + match (name, hash, shard, schema) { + (Some(name), None, None, None) => Ok(Self::Name(name)), + (None, Some(hash), shard, None) => Ok(Self::Subgraph { hash, shard }), + (None, None, None, Some(name)) => Ok(Self::Schema(name)), + (None, None, None, None) => Err(anyhow!("selector can not be empty")), + _ => Err(anyhow!("multiple selectors can not be applied at once")), + } + } +} diff --git a/server/graphman/src/entities/deployment_status.rs b/server/graphman/src/entities/deployment_status.rs new file mode 100644 index 00000000000..ae9df27c82b --- /dev/null +++ b/server/graphman/src/entities/deployment_status.rs @@ -0,0 +1,37 @@ +use async_graphql::SimpleObject; + +use crate::entities::BlockNumber; +use crate::entities::BlockPtr; +use crate::entities::SubgraphHealth; + +#[derive(Clone, Debug, SimpleObject)] +pub struct DeploymentStatus { + pub is_paused: Option, + pub is_synced: bool, + pub health: SubgraphHealth, + pub earliest_block_number: BlockNumber, + pub latest_block: Option, + pub chain_head_block: Option, +} + +impl From for DeploymentStatus { + fn from(status: graphman::commands::deployment::info::DeploymentStatus) -> Self { + let graphman::commands::deployment::info::DeploymentStatus { + is_paused, + is_synced, + health, + earliest_block_number, + latest_block, + chain_head_block, + } = status; + + Self { + is_paused, + is_synced, + health: health.into(), + earliest_block_number: earliest_block_number.into(), + latest_block: latest_block.map(Into::into), + chain_head_block: chain_head_block.map(Into::into), + } + } +} diff --git a/server/graphman/src/entities/deployment_version_selector.rs b/server/graphman/src/entities/deployment_version_selector.rs new file mode 100644 index 00000000000..59e68d8780f --- /dev/null +++ b/server/graphman/src/entities/deployment_version_selector.rs @@ -0,0 +1,19 @@ +use async_graphql::Enum; + +/// Used to filter deployments by version. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Enum)] +pub enum DeploymentVersionSelector { + Current, + Pending, + Used, +} + +impl From for graphman::deployment::DeploymentVersionSelector { + fn from(version: DeploymentVersionSelector) -> Self { + match version { + DeploymentVersionSelector::Current => Self::Current, + DeploymentVersionSelector::Pending => Self::Pending, + DeploymentVersionSelector::Used => Self::Used, + } + } +} diff --git a/server/graphman/src/entities/empty_response.rs b/server/graphman/src/entities/empty_response.rs new file mode 100644 index 00000000000..a66244f899e --- /dev/null +++ b/server/graphman/src/entities/empty_response.rs @@ -0,0 +1,15 @@ +use async_graphql::SimpleObject; + +/// This type is used when an operation has been successful, +/// but there is no output that can be returned. +#[derive(Clone, Debug, SimpleObject)] +pub struct EmptyResponse { + pub success: bool, +} + +impl EmptyResponse { + /// Returns a successful response. + pub fn new() -> Self { + Self { success: true } + } +} diff --git a/server/graphman/src/entities/execution.rs b/server/graphman/src/entities/execution.rs new file mode 100644 index 00000000000..1daae4a7d01 --- /dev/null +++ b/server/graphman/src/entities/execution.rs @@ -0,0 +1,56 @@ +use anyhow::Result; +use async_graphql::Enum; +use async_graphql::SimpleObject; +use chrono::DateTime; +use chrono::Utc; + +use crate::entities::CommandKind; +use crate::entities::ExecutionId; + +/// Data stored about a command execution. +#[derive(Clone, Debug, SimpleObject)] +pub struct Execution { + pub id: ExecutionId, + pub kind: CommandKind, + pub status: ExecutionStatus, + pub error_message: Option, + pub created_at: DateTime, + pub updated_at: Option>, + pub completed_at: Option>, +} + +/// All possible states of a command execution. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Enum)] +#[graphql(remote = "graphman_store::ExecutionStatus")] +pub enum ExecutionStatus { + Initializing, + Running, + Failed, + Succeeded, +} + +impl TryFrom for Execution { + type Error = anyhow::Error; + + fn try_from(execution: graphman_store::Execution) -> Result { + let graphman_store::Execution { + id, + kind, + status, + error_message, + created_at, + updated_at, + completed_at, + } = execution; + + Ok(Self { + id: id.into(), + kind: kind.into(), + status: status.into(), + error_message, + created_at, + updated_at, + completed_at, + }) + } +} diff --git a/server/graphman/src/entities/execution_id.rs b/server/graphman/src/entities/execution_id.rs new file mode 100644 index 00000000000..bfdc350bcab --- /dev/null +++ b/server/graphman/src/entities/execution_id.rs @@ -0,0 +1,35 @@ +use async_graphql::InputValueError; +use async_graphql::InputValueResult; +use async_graphql::Scalar; +use async_graphql::ScalarType; +use async_graphql::Value; + +#[derive(Clone, Debug)] +pub struct ExecutionId(pub i64); + +#[Scalar] +impl ScalarType for ExecutionId { + fn parse(value: Value) -> InputValueResult { + let Value::String(value) = value else { + return Err(InputValueError::expected_type(value)); + }; + + Ok(value.parse().map(ExecutionId)?) + } + + fn to_value(&self) -> Value { + Value::String(self.0.to_string()) + } +} + +impl From for ExecutionId { + fn from(id: graphman_store::ExecutionId) -> Self { + Self(id.0) + } +} + +impl From for graphman_store::ExecutionId { + fn from(id: ExecutionId) -> Self { + Self(id.0) + } +} diff --git a/server/graphman/src/entities/mod.rs b/server/graphman/src/entities/mod.rs new file mode 100644 index 00000000000..8f4b2d8c018 --- /dev/null +++ b/server/graphman/src/entities/mod.rs @@ -0,0 +1,25 @@ +mod block_hash; +mod block_number; +mod block_ptr; +mod command_kind; +mod deployment_info; +mod deployment_selector; +mod deployment_status; +mod deployment_version_selector; +mod empty_response; +mod execution; +mod execution_id; +mod subgraph_health; + +pub use self::block_hash::BlockHash; +pub use self::block_number::BlockNumber; +pub use self::block_ptr::BlockPtr; +pub use self::command_kind::CommandKind; +pub use self::deployment_info::DeploymentInfo; +pub use self::deployment_selector::DeploymentSelector; +pub use self::deployment_status::DeploymentStatus; +pub use self::deployment_version_selector::DeploymentVersionSelector; +pub use self::empty_response::EmptyResponse; +pub use self::execution::Execution; +pub use self::execution_id::ExecutionId; +pub use self::subgraph_health::SubgraphHealth; diff --git a/server/graphman/src/entities/subgraph_health.rs b/server/graphman/src/entities/subgraph_health.rs new file mode 100644 index 00000000000..473423f97f0 --- /dev/null +++ b/server/graphman/src/entities/subgraph_health.rs @@ -0,0 +1,14 @@ +use async_graphql::Enum; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Enum)] +#[graphql(remote = "graph::data::subgraph::schema::SubgraphHealth")] +pub enum SubgraphHealth { + /// Syncing without errors. + Healthy, + + /// Syncing but has errors. + Unhealthy, + + /// No longer syncing due to a fatal error. + Failed, +} diff --git a/server/graphman/src/error.rs b/server/graphman/src/error.rs new file mode 100644 index 00000000000..96dd31d0050 --- /dev/null +++ b/server/graphman/src/error.rs @@ -0,0 +1,10 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum GraphmanServerError { + #[error("invalid auth token: {0:#}")] + InvalidAuthToken(#[source] anyhow::Error), + + #[error("I/O error: {0:#}")] + Io(#[source] anyhow::Error), +} diff --git a/server/graphman/src/handlers/graphql.rs b/server/graphman/src/handlers/graphql.rs new file mode 100644 index 00000000000..4eeb88303cf --- /dev/null +++ b/server/graphman/src/handlers/graphql.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; + +use async_graphql::http::playground_source; +use async_graphql::http::GraphQLPlaygroundConfig; +use async_graphql_axum::GraphQLRequest; +use async_graphql_axum::GraphQLResponse; +use axum::extract::Extension; +use axum::extract::State; +use axum::http::HeaderMap; +use axum::response::Html; +use axum::response::IntoResponse; +use axum::response::Json; +use axum::response::Response; + +use crate::auth::unauthorized_graphql_message; +use crate::handlers::state::AppState; +use crate::schema::GraphmanSchema; + +pub async fn graphql_playground_handler() -> impl IntoResponse { + Html(playground_source(GraphQLPlaygroundConfig::new("/"))) +} + +pub async fn graphql_request_handler( + State(state): State>, + Extension(schema): Extension, + headers: HeaderMap, + req: GraphQLRequest, +) -> Response { + if !state.auth_token.headers_contain_correct_token(&headers) { + return Json(unauthorized_graphql_message()).into_response(); + } + + let resp: GraphQLResponse = schema.execute(req.into_inner()).await.into(); + + resp.into_response() +} diff --git a/server/graphman/src/handlers/mod.rs b/server/graphman/src/handlers/mod.rs new file mode 100644 index 00000000000..57ea7d37ec6 --- /dev/null +++ b/server/graphman/src/handlers/mod.rs @@ -0,0 +1,6 @@ +mod graphql; +mod state; + +pub use self::graphql::graphql_playground_handler; +pub use self::graphql::graphql_request_handler; +pub use self::state::AppState; diff --git a/server/graphman/src/handlers/state.rs b/server/graphman/src/handlers/state.rs new file mode 100644 index 00000000000..b0a0a0e1d21 --- /dev/null +++ b/server/graphman/src/handlers/state.rs @@ -0,0 +1,6 @@ +use crate::auth::AuthToken; + +/// The state that is shared between all request handlers. +pub struct AppState { + pub auth_token: AuthToken, +} diff --git a/server/graphman/src/lib.rs b/server/graphman/src/lib.rs new file mode 100644 index 00000000000..4a0b9df3a11 --- /dev/null +++ b/server/graphman/src/lib.rs @@ -0,0 +1,12 @@ +mod auth; +mod entities; +mod error; +mod handlers; +mod resolvers; +mod schema; +mod server; + +pub use self::error::GraphmanServerError; +pub use self::server::GraphmanServer; +pub use self::server::GraphmanServerConfig; +pub use self::server::GraphmanServerManager; diff --git a/server/graphman/src/resolvers/context.rs b/server/graphman/src/resolvers/context.rs new file mode 100644 index 00000000000..8cc3e819c6d --- /dev/null +++ b/server/graphman/src/resolvers/context.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; + +use async_graphql::Context; +use async_graphql::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graph_store_postgres::Store; + +pub struct GraphmanContext { + pub primary_pool: ConnectionPool, + pub notification_sender: Arc, + pub store: Arc, +} + +impl GraphmanContext { + pub fn new(ctx: &Context<'_>) -> Result { + let primary_pool = ctx.data::()?.to_owned(); + let notification_sender = ctx.data::>()?.to_owned(); + let store = ctx.data::>()?.to_owned(); + + Ok(GraphmanContext { + primary_pool, + notification_sender, + store, + }) + } +} diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs new file mode 100644 index 00000000000..4b6da18b935 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -0,0 +1,68 @@ +use std::sync::Arc; + +use async_graphql::Context; +use async_graphql::Object; +use async_graphql::Result; +use graph_store_postgres::graphman::GraphmanStore; + +use crate::entities::DeploymentSelector; +use crate::entities::EmptyResponse; +use crate::entities::ExecutionId; +use crate::resolvers::context::GraphmanContext; + +mod pause; +mod restart; +mod resume; + +pub struct DeploymentMutation; + +/// Mutations related to one or multiple deployments. +#[Object] +impl DeploymentMutation { + /// Pauses a deployment that is not already paused. + pub async fn pause( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + ) -> Result { + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + + pause::run(&ctx, &deployment)?; + + Ok(EmptyResponse::new()) + } + + /// Resumes a deployment that has been previously paused. + pub async fn resume( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + ) -> Result { + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + + resume::run(&ctx, &deployment)?; + + Ok(EmptyResponse::new()) + } + + /// Pauses a deployment and resumes it after a delay. + pub async fn restart( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + #[graphql( + default = 20, + desc = "The number of seconds to wait before resuming the deployment. + When not specified, it defaults to 20 seconds." + )] + delay_seconds: u64, + ) -> Result { + let store = ctx.data::>()?.to_owned(); + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + + restart::run_in_background(ctx, store, deployment, delay_seconds).await + } +} diff --git a/server/graphman/src/resolvers/deployment_mutation/pause.rs b/server/graphman/src/resolvers/deployment_mutation/pause.rs new file mode 100644 index 00000000000..8ba1f73446b --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/pause.rs @@ -0,0 +1,18 @@ +use async_graphql::Result; +use graphman::commands::deployment::pause::load_active_deployment; +use graphman::commands::deployment::pause::pause_active_deployment; +use graphman::deployment::DeploymentSelector; + +use crate::resolvers::context::GraphmanContext; + +pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector) -> Result<()> { + let active_deployment = load_active_deployment(ctx.primary_pool.clone(), deployment)?; + + pause_active_deployment( + ctx.primary_pool.clone(), + ctx.notification_sender.clone(), + active_deployment, + )?; + + Ok(()) +} diff --git a/server/graphman/src/resolvers/deployment_mutation/restart.rs b/server/graphman/src/resolvers/deployment_mutation/restart.rs new file mode 100644 index 00000000000..aa1241deb14 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/restart.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_graphql::Result; +use graph_store_postgres::graphman::GraphmanStore; +use graphman::deployment::DeploymentSelector; +use graphman::GraphmanExecutionTracker; +use graphman_store::CommandKind; +use graphman_store::GraphmanStore as _; + +use crate::entities::ExecutionId; +use crate::resolvers::context::GraphmanContext; + +pub async fn run_in_background( + ctx: GraphmanContext, + store: Arc, + deployment: DeploymentSelector, + delay_seconds: u64, +) -> Result { + let id = store.new_execution(CommandKind::RestartDeployment)?; + + graph::spawn(async move { + let tracker = GraphmanExecutionTracker::new(store, id); + let result = run(&ctx, &deployment, delay_seconds).await; + + match result { + Ok(()) => { + tracker.track_success().unwrap(); + } + Err(err) => { + tracker.track_failure(format!("{err:#?}")).unwrap(); + } + }; + }); + + Ok(id.into()) +} + +async fn run( + ctx: &GraphmanContext, + deployment: &DeploymentSelector, + delay_seconds: u64, +) -> Result<()> { + super::pause::run(ctx, deployment)?; + + tokio::time::sleep(Duration::from_secs(delay_seconds)).await; + + super::resume::run(ctx, deployment)?; + + Ok(()) +} diff --git a/server/graphman/src/resolvers/deployment_mutation/resume.rs b/server/graphman/src/resolvers/deployment_mutation/resume.rs new file mode 100644 index 00000000000..45fa30d5e7f --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/resume.rs @@ -0,0 +1,18 @@ +use async_graphql::Result; +use graphman::commands::deployment::resume::load_paused_deployment; +use graphman::commands::deployment::resume::resume_paused_deployment; +use graphman::deployment::DeploymentSelector; + +use crate::resolvers::context::GraphmanContext; + +pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector) -> Result<()> { + let paused_deployment = load_paused_deployment(ctx.primary_pool.clone(), deployment)?; + + resume_paused_deployment( + ctx.primary_pool.clone(), + ctx.notification_sender.clone(), + paused_deployment, + )?; + + Ok(()) +} diff --git a/server/graphman/src/resolvers/deployment_query.rs b/server/graphman/src/resolvers/deployment_query.rs new file mode 100644 index 00000000000..09d9d5bb792 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_query.rs @@ -0,0 +1,29 @@ +use async_graphql::Context; +use async_graphql::Object; +use async_graphql::Result; + +use crate::entities::DeploymentInfo; +use crate::entities::DeploymentSelector; +use crate::entities::DeploymentVersionSelector; + +mod info; + +pub struct DeploymentQuery; + +/// Queries related to one or multiple deployments. +#[Object] +impl DeploymentQuery { + /// Returns the available information about one, multiple, or all deployments. + pub async fn info( + &self, + ctx: &Context<'_>, + #[graphql(desc = "A selector for one or multiple deployments. + When not provided, it matches all deployments.")] + deployment: Option, + #[graphql(desc = "Applies version filter to the selected deployments. + When not provided, no additional version filter is applied.")] + version: Option, + ) -> Result> { + info::run(ctx, deployment, version) + } +} diff --git a/server/graphman/src/resolvers/deployment_query/info.rs b/server/graphman/src/resolvers/deployment_query/info.rs new file mode 100644 index 00000000000..b5f8c079b35 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_query/info.rs @@ -0,0 +1,54 @@ +use async_graphql::Context; +use async_graphql::Result; + +use crate::entities::DeploymentInfo; +use crate::entities::DeploymentSelector; +use crate::entities::DeploymentVersionSelector; +use crate::resolvers::context::GraphmanContext; + +pub fn run( + ctx: &Context<'_>, + deployment: Option, + version: Option, +) -> Result> { + let load_status = ctx.look_ahead().field("status").exists(); + let ctx = GraphmanContext::new(ctx)?; + + let deployment = deployment + .map(TryInto::try_into) + .transpose()? + .unwrap_or(graphman::deployment::DeploymentSelector::All); + + let version = version + .map(Into::into) + .unwrap_or(graphman::deployment::DeploymentVersionSelector::All); + + let deployments = graphman::commands::deployment::info::load_deployments( + ctx.primary_pool.clone(), + &deployment, + &version, + )?; + + let statuses = if load_status { + graphman::commands::deployment::info::load_deployment_statuses( + ctx.store.clone(), + &deployments, + )? + } else { + Default::default() + }; + + let resp = deployments + .into_iter() + .map(|deployment| { + let status = statuses.get(&deployment.id).cloned().map(Into::into); + + let mut info: DeploymentInfo = deployment.into(); + info.status = status; + + info + }) + .collect(); + + Ok(resp) +} diff --git a/server/graphman/src/resolvers/execution_query.rs b/server/graphman/src/resolvers/execution_query.rs new file mode 100644 index 00000000000..f0cded8ea97 --- /dev/null +++ b/server/graphman/src/resolvers/execution_query.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; + +use async_graphql::Context; +use async_graphql::Object; +use async_graphql::Result; +use graph_store_postgres::graphman::GraphmanStore; +use graphman_store::GraphmanStore as _; + +use crate::entities::Execution; +use crate::entities::ExecutionId; + +pub struct ExecutionQuery; + +/// Queries related to command executions. +#[Object] +impl ExecutionQuery { + /// Returns all stored command execution data. + pub async fn info(&self, ctx: &Context<'_>, id: ExecutionId) -> Result { + let store = ctx.data::>()?.to_owned(); + let execution = store.load_execution(id.into())?; + + Ok(execution.try_into()?) + } +} diff --git a/server/graphman/src/resolvers/mod.rs b/server/graphman/src/resolvers/mod.rs new file mode 100644 index 00000000000..2f7f225f6f4 --- /dev/null +++ b/server/graphman/src/resolvers/mod.rs @@ -0,0 +1,12 @@ +mod context; +mod deployment_mutation; +mod deployment_query; +mod execution_query; +mod mutation_root; +mod query_root; + +pub use self::deployment_mutation::DeploymentMutation; +pub use self::deployment_query::DeploymentQuery; +pub use self::execution_query::ExecutionQuery; +pub use self::mutation_root::MutationRoot; +pub use self::query_root::QueryRoot; diff --git a/server/graphman/src/resolvers/mutation_root.rs b/server/graphman/src/resolvers/mutation_root.rs new file mode 100644 index 00000000000..566f21ac728 --- /dev/null +++ b/server/graphman/src/resolvers/mutation_root.rs @@ -0,0 +1,14 @@ +use async_graphql::Object; + +use crate::resolvers::DeploymentMutation; + +/// Note: Converted to GraphQL schema as `mutation`. +pub struct MutationRoot; + +#[Object] +impl MutationRoot { + /// Mutations related to one or multiple deployments. + pub async fn deployment(&self) -> DeploymentMutation { + DeploymentMutation {} + } +} diff --git a/server/graphman/src/resolvers/query_root.rs b/server/graphman/src/resolvers/query_root.rs new file mode 100644 index 00000000000..1c105abe40a --- /dev/null +++ b/server/graphman/src/resolvers/query_root.rs @@ -0,0 +1,20 @@ +use async_graphql::Object; + +use crate::resolvers::DeploymentQuery; +use crate::resolvers::ExecutionQuery; + +/// Note: Converted to GraphQL schema as `query`. +pub struct QueryRoot; + +#[Object] +impl QueryRoot { + /// Queries related to one or multiple deployments. + pub async fn deployment(&self) -> DeploymentQuery { + DeploymentQuery {} + } + + /// Queries related to command executions. + pub async fn execution(&self) -> ExecutionQuery { + ExecutionQuery {} + } +} diff --git a/server/graphman/src/schema.rs b/server/graphman/src/schema.rs new file mode 100644 index 00000000000..cbbda2b00e1 --- /dev/null +++ b/server/graphman/src/schema.rs @@ -0,0 +1,7 @@ +use async_graphql::EmptySubscription; +use async_graphql::Schema; + +use crate::resolvers::MutationRoot; +use crate::resolvers::QueryRoot; + +pub type GraphmanSchema = Schema; diff --git a/server/graphman/src/server.rs b/server/graphman/src/server.rs new file mode 100644 index 00000000000..ea71e7c2228 --- /dev/null +++ b/server/graphman/src/server.rs @@ -0,0 +1,148 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_graphql::EmptySubscription; +use async_graphql::Schema; +use axum::extract::Extension; +use axum::http::Method; +use axum::routing::get; +use axum::Router; +use graph::log::factory::LoggerFactory; +use graph::prelude::ComponentLoggerConfig; +use graph::prelude::ElasticComponentLoggerConfig; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::graphman::GraphmanStore; +use graph_store_postgres::NotificationSender; +use graph_store_postgres::Store; +use slog::{info, Logger}; +use tokio::sync::Notify; +use tower_http::cors::{Any, CorsLayer}; + +use crate::auth::AuthToken; +use crate::handlers::graphql_playground_handler; +use crate::handlers::graphql_request_handler; +use crate::handlers::AppState; +use crate::resolvers::MutationRoot; +use crate::resolvers::QueryRoot; +use crate::GraphmanServerError; + +#[derive(Clone)] +pub struct GraphmanServer { + pool: ConnectionPool, + notification_sender: Arc, + store: Arc, + graphman_store: Arc, + logger: Logger, + auth_token: AuthToken, +} + +#[derive(Clone)] +pub struct GraphmanServerConfig<'a> { + pub pool: ConnectionPool, + pub notification_sender: Arc, + pub store: Arc, + pub logger_factory: &'a LoggerFactory, + pub auth_token: String, +} + +pub struct GraphmanServerManager { + notify: Arc, +} + +impl GraphmanServer { + pub fn new(config: GraphmanServerConfig) -> Result { + let GraphmanServerConfig { + pool, + notification_sender, + store, + logger_factory, + auth_token, + } = config; + + let graphman_store = Arc::new(GraphmanStore::new(pool.clone())); + let auth_token = AuthToken::new(auth_token)?; + + let logger = logger_factory.component_logger( + "GraphmanServer", + Some(ComponentLoggerConfig { + elastic: Some(ElasticComponentLoggerConfig { + index: String::from("graphman-server-logs"), + }), + }), + ); + + Ok(Self { + pool, + notification_sender, + store, + graphman_store, + logger, + auth_token, + }) + } + + pub async fn start(self, port: u16) -> Result { + let Self { + pool, + notification_sender, + store, + graphman_store, + logger, + auth_token, + } = self; + + info!( + logger, + "Starting graphman server at: http://localhost:{}", port, + ); + + let app_state = Arc::new(AppState { auth_token }); + + let cors_layer = CorsLayer::new() + .allow_origin(Any) + .allow_methods([Method::GET, Method::OPTIONS, Method::POST]) + .allow_headers(Any); + + let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription) + .data(pool) + .data(notification_sender) + .data(store) + .data(graphman_store) + .finish(); + + let app = Router::new() + .route( + "/", + get(graphql_playground_handler).post(graphql_request_handler), + ) + .with_state(app_state) + .layer(cors_layer) + .layer(Extension(schema)); + + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + + let listener = tokio::net::TcpListener::bind(addr) + .await + .map_err(|err| GraphmanServerError::Io(err.into()))?; + + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + + graph::spawn(async move { + axum::serve(listener, app) + .with_graceful_shutdown(async move { + notify_clone.notified().await; + }) + .await + .unwrap_or_else(|err| panic!("Failed to start graphman server: {err}")); + }); + + Ok(GraphmanServerManager { notify }) + } +} + +impl GraphmanServerManager { + pub fn stop_server(self) { + self.notify.notify_one() + } +} diff --git a/server/graphman/tests/auth.rs b/server/graphman/tests/auth.rs new file mode 100644 index 00000000000..f60670c33dc --- /dev/null +++ b/server/graphman/tests/auth.rs @@ -0,0 +1,66 @@ +pub mod util; + +use serde_json::json; + +use self::util::client::send_graphql_request; +use self::util::client::send_request; +use self::util::client::BASE_URL; +use self::util::client::CLIENT; +use self::util::run_test; +use self::util::server::INVALID_TOKEN; +use self::util::server::VALID_TOKEN; + +#[test] +fn graphql_playground_is_accessible() { + run_test(|| async { + send_request(CLIENT.head(BASE_URL.as_str())).await; + }); +} + +#[test] +fn graphql_requests_are_not_allowed_without_a_valid_token() { + run_test(|| async { + let resp = send_graphql_request( + json!({ + "query": "{ __typename }" + }), + INVALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "errors": [ + { + "message": "You are not authorized to access this resource", + "extensions": { + "code": "UNAUTHORIZED" + } + } + ], + "data": null + }); + + assert_eq!(resp, expected_resp); + }); +} + +#[test] +fn graphql_requests_are_allowed_with_a_valid_token() { + run_test(|| async { + let resp = send_graphql_request( + json!({ + "query": "{ __typename }" + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "__typename": "QueryRoot" + } + }); + + assert_eq!(resp, expected_resp); + }); +} diff --git a/server/graphman/tests/deployment_mutation.rs b/server/graphman/tests/deployment_mutation.rs new file mode 100644 index 00000000000..dbc1f891323 --- /dev/null +++ b/server/graphman/tests/deployment_mutation.rs @@ -0,0 +1,268 @@ +pub mod util; + +use std::time::Duration; + +use graph::prelude::DeploymentHash; +use serde::Deserialize; +use serde_json::json; +use test_store::create_test_subgraph; +use tokio::time::sleep; + +use self::util::client::send_graphql_request; +use self::util::run_test; +use self::util::server::VALID_TOKEN; + +const TEST_SUBGRAPH_SCHEMA: &str = "type User @entity { id: ID!, name: String }"; + +async fn assert_deployment_paused(hash: &str, should_be_paused: bool) { + let query = r#"query DeploymentStatus($hash: String!) { + deployment { + info(deployment: { hash: $hash }) { + status { + isPaused + } + } + } + }"#; + + let resp = send_graphql_request( + json!({ + "query": query, + "variables": { + "hash": hash + } + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "info": [ + { + "status": { + "isPaused": should_be_paused + } + } + ] + } + } + }); + + assert_eq!(resp, expected_resp); +} + +#[test] +fn graphql_can_pause_deployments() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let resp = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + pause(deployment: { hash: "subgraph_2" }) { + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "pause": { + "success": true, + } + } + } + }); + + assert_eq!(resp, expected_resp); + + assert_deployment_paused("subgraph_2", true).await; + assert_deployment_paused("subgraph_1", false).await; + }); +} + +#[test] +fn graphql_can_resume_deployments() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + pause(deployment: { hash: "subgraph_1" }) { + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + assert_deployment_paused("subgraph_1", true).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + resume(deployment: { hash: "subgraph_1" }) { + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + assert_deployment_paused("subgraph_1", false).await; + }); +} + +#[test] +fn graphql_can_restart_deployments() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + restart(deployment: { hash: "subgraph_2" }, delaySeconds: 2) + } + }"# + }), + VALID_TOKEN, + ) + .await; + + assert_deployment_paused("subgraph_2", true).await; + assert_deployment_paused("subgraph_1", false).await; + + sleep(Duration::from_secs(5)).await; + + assert_deployment_paused("subgraph_2", false).await; + assert_deployment_paused("subgraph_1", false).await; + }); +} + +#[test] +fn graphql_allows_tracking_restart_deployment_executions() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let resp = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + restart(deployment: { hash: "subgraph_1" }, delaySeconds: 2) + } + }"# + }), + VALID_TOKEN, + ) + .await; + + #[derive(Deserialize)] + struct Response { + data: Data, + } + + #[derive(Deserialize)] + struct Data { + deployment: Deployment, + } + + #[derive(Deserialize)] + struct Deployment { + restart: String, + } + + let resp: Response = serde_json::from_value(resp).expect("response is valid"); + let execution_id = resp.data.deployment.restart; + + let query = r#"query TrackRestartDeployment($id: String!) { + execution { + info(id: $id) { + id + kind + status + errorMessage + } + } + }"#; + + let resp = send_graphql_request( + json!({ + "query": query, + "variables": { + "id": execution_id + } + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "execution": { + "info": { + "id": execution_id, + "kind": "RESTART_DEPLOYMENT", + "status": "RUNNING", + "errorMessage": null, + } + } + } + }); + + assert_eq!(resp, expected_resp); + + sleep(Duration::from_secs(5)).await; + + let resp = send_graphql_request( + json!({ + "query": query, + "variables": { + "id": execution_id + } + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "execution": { + "info": { + "id": execution_id, + "kind": "RESTART_DEPLOYMENT", + "status": "SUCCEEDED", + "errorMessage": null, + } + } + } + }); + + assert_eq!(resp, expected_resp); + }); +} diff --git a/server/graphman/tests/deployment_query.rs b/server/graphman/tests/deployment_query.rs new file mode 100644 index 00000000000..f39a1e0cd9a --- /dev/null +++ b/server/graphman/tests/deployment_query.rs @@ -0,0 +1,238 @@ +pub mod util; + +use graph::data::subgraph::DeploymentHash; +use serde_json::json; +use test_store::store::create_test_subgraph; +use test_store::store::NETWORK_NAME; +use test_store::store::NODE_ID; + +use self::util::client::send_graphql_request; +use self::util::run_test; +use self::util::server::VALID_TOKEN; + +const TEST_SUBGRAPH_SCHEMA: &str = "type User @entity { id: ID!, name: String }"; + +#[test] +fn graphql_returns_deployment_info() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + let locator = create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let resp = send_graphql_request( + json!({ + "query": r#"{ + deployment { + info { + hash + namespace + name + nodeId + shard + chain + versionStatus + isActive + status { + isPaused + isSynced + health + earliestBlockNumber + latestBlock { + hash + number + } + chainHeadBlock { + hash + number + } + } + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let namespace = format!("sgd{}", locator.id); + + let expected_resp = json!({ + "data": { + "deployment": { + "info": [ + { + "hash": "subgraph_1", + "namespace": namespace, + "name": "subgraph_1", + "nodeId": NODE_ID.to_string(), + "shard": "primary", + "chain": NETWORK_NAME, + "versionStatus": "current", + "isActive": true, + "status": { + "isPaused": false, + "isSynced": false, + "health": "HEALTHY", + "earliestBlockNumber": "0", + "latestBlock": null, + "chainHeadBlock": null + } + } + ] + } + } + }); + + assert_eq!(resp, expected_resp); + }); +} + +#[test] +fn graphql_returns_deployment_info_by_deployment_name() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let resp = send_graphql_request( + json!({ + "query": r#"{ + deployment { + info(deployment: { name: "subgraph_1" }) { + name + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "info": [ + { + "name": "subgraph_1" + } + ] + } + } + }); + + assert_eq!(resp, expected_resp); + }); +} + +#[test] +fn graphql_returns_deployment_info_by_deployment_hash() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let resp = send_graphql_request( + json!({ + "query": r#"{ + deployment { + info(deployment: { hash: "subgraph_2" }) { + hash + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "info": [ + { + "hash": "subgraph_2" + } + ] + } + } + }); + + assert_eq!(resp, expected_resp); + }); +} + +#[test] +fn graphql_returns_deployment_info_by_deployment_namespace() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + let locator = create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let namespace = format!("sgd{}", locator.id); + + let resp = send_graphql_request( + json!({ + "query": r#"query DeploymentInfo($namespace: String!) { + deployment { + info(deployment: { schema: $namespace }) { + namespace + } + } + }"#, + "variables": { + "namespace": namespace + } + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "info": [ + { + "namespace": namespace + } + ] + } + } + }); + + assert_eq!(resp, expected_resp); + }); +} + +#[test] +fn graphql_returns_empty_deployment_info_when_there_are_no_deployments() { + run_test(|| async { + let resp = send_graphql_request( + json!({ + "query": r#"{ + deployment { + info { + name + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "info": [] + } + } + }); + + assert_eq!(resp, expected_resp); + }); +} diff --git a/server/graphman/tests/util/client.rs b/server/graphman/tests/util/client.rs new file mode 100644 index 00000000000..fd0f063d83f --- /dev/null +++ b/server/graphman/tests/util/client.rs @@ -0,0 +1,34 @@ +use graph::http::header::AUTHORIZATION; +use lazy_static::lazy_static; +use reqwest::Client; +use reqwest::RequestBuilder; +use reqwest::Response; +use serde_json::Value; + +use crate::util::server::PORT; + +lazy_static! { + pub static ref CLIENT: Client = Client::new(); + pub static ref BASE_URL: String = format!("http://127.0.0.1:{PORT}"); +} + +pub async fn send_request(req: RequestBuilder) -> Response { + req.send() + .await + .expect("server is accessible") + .error_for_status() + .expect("response status is OK") +} + +pub async fn send_graphql_request(data: Value, token: &str) -> Value { + send_request( + CLIENT + .post(BASE_URL.as_str()) + .json(&data) + .header(AUTHORIZATION, format!("Bearer {token}")), + ) + .await + .json() + .await + .expect("GraphQL response is valid JSON") +} diff --git a/server/graphman/tests/util/mod.rs b/server/graphman/tests/util/mod.rs new file mode 100644 index 00000000000..61201dd708c --- /dev/null +++ b/server/graphman/tests/util/mod.rs @@ -0,0 +1,46 @@ +pub mod client; +pub mod server; + +use std::future::Future; +use std::sync::Mutex; + +use lazy_static::lazy_static; +use test_store::store::remove_subgraphs; +use test_store::store::PRIMARY_POOL; +use tokio::runtime::Builder; +use tokio::runtime::Runtime; + +lazy_static! { + // Used to make sure tests will run sequentially. + static ref SEQ_MUX: Mutex<()> = Mutex::new(()); + + // One runtime helps share the same server between the tests. + static ref RUNTIME: Runtime = Builder::new_current_thread().enable_all().build().unwrap(); +} + +pub fn run_test(test: T) +where + T: FnOnce() -> F, + F: Future, +{ + let _lock = SEQ_MUX.lock().unwrap_or_else(|err| err.into_inner()); + + cleanup_graphman_command_executions_table(); + remove_subgraphs(); + + RUNTIME.block_on(async { + server::start().await; + + test().await; + }); +} + +fn cleanup_graphman_command_executions_table() { + use diesel::prelude::*; + + let mut conn = PRIMARY_POOL.get().unwrap(); + + diesel::sql_query("truncate table public.graphman_command_executions;") + .execute(&mut conn) + .expect("truncate is successful"); +} diff --git a/server/graphman/tests/util/server.rs b/server/graphman/tests/util/server.rs new file mode 100644 index 00000000000..7fe38bd29b2 --- /dev/null +++ b/server/graphman/tests/util/server.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use graph::prelude::LoggerFactory; +use graph_store_postgres::NotificationSender; +use graphman_server::GraphmanServer; +use graphman_server::GraphmanServerConfig; +use lazy_static::lazy_static; +use test_store::LOGGER; +use test_store::METRICS_REGISTRY; +use test_store::PRIMARY_POOL; +use test_store::STORE; +use tokio::sync::OnceCell; + +pub const VALID_TOKEN: &str = "123"; +pub const INVALID_TOKEN: &str = "abc"; + +pub const PORT: u16 = 8050; + +lazy_static! { + static ref SERVER: OnceCell<()> = OnceCell::new(); +} + +pub async fn start() { + SERVER + .get_or_init(|| async { + let logger_factory = LoggerFactory::new(LOGGER.clone(), None, METRICS_REGISTRY.clone()); + let notification_sender = Arc::new(NotificationSender::new(METRICS_REGISTRY.clone())); + + let config = GraphmanServerConfig { + pool: PRIMARY_POOL.clone(), + notification_sender, + store: STORE.clone(), + logger_factory: &logger_factory, + auth_token: VALID_TOKEN.to_string(), + }; + + let server = GraphmanServer::new(config).expect("graphman config is valid"); + + server + .start(PORT) + .await + .expect("graphman server starts successfully"); + }) + .await; +} diff --git a/store/postgres/Cargo.toml b/store/postgres/Cargo.toml index cf7e0969cd2..fa9ea5a20c5 100644 --- a/store/postgres/Cargo.toml +++ b/store/postgres/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] async-trait = "0.1.50" blake3 = "1.5" +chrono = { workspace = true } derive_more = { version = "0.99.18" } diesel = { workspace = true } diesel-dynamic-schema = { workspace = true } @@ -14,6 +15,7 @@ diesel_derives = { workspace = true } diesel_migrations = { workspace = true } fallible-iterator = "0.3.0" graph = { path = "../../graph" } +graphman-store = { workspace = true } Inflector = "0.11.3" lazy_static = "1.5" lru_time_cache = "0.11" @@ -23,6 +25,7 @@ openssl = "0.10.64" postgres-openssl = "0.5.0" rand = "0.8.4" serde = { workspace = true } +serde_json = { workspace = true } uuid = { version = "1.9.1", features = ["v4"] } stable-hash_legacy = { git = "https://github.com/graphprotocol/stable-hash", branch = "old", package = "stable-hash" } anyhow = "1.0.86" @@ -32,5 +35,5 @@ hex = "0.4.3" pretty_assertions = "1.4.0" [dev-dependencies] -clap.workspace = true +clap.workspace = true graphql-parser = "0.4.0" diff --git a/store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/down.sql b/store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/down.sql new file mode 100644 index 00000000000..88eb516c367 --- /dev/null +++ b/store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/down.sql @@ -0,0 +1 @@ +drop table public.graphman_command_executions; diff --git a/store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/up.sql b/store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/up.sql new file mode 100644 index 00000000000..ab9a1b16eb1 --- /dev/null +++ b/store/postgres/migrations/2024-10-01-100427_create_graphman_command_executions_table/up.sql @@ -0,0 +1,10 @@ +create table public.graphman_command_executions +( + id bigserial primary key, + kind varchar not null check (kind in ('restart_deployment')), + status varchar not null check (status in ('initializing', 'running', 'failed', 'succeeded')), + error_message varchar default null, + created_at timestamp with time zone not null, + updated_at timestamp with time zone default null, + completed_at timestamp with time zone default null +); diff --git a/store/postgres/src/graphman/mod.rs b/store/postgres/src/graphman/mod.rs new file mode 100644 index 00000000000..c9aba751f50 --- /dev/null +++ b/store/postgres/src/graphman/mod.rs @@ -0,0 +1,92 @@ +use anyhow::Result; +use chrono::Utc; +use diesel::prelude::*; +use graphman_store::CommandKind; +use graphman_store::Execution; +use graphman_store::ExecutionId; +use graphman_store::ExecutionStatus; + +use crate::connection_pool::ConnectionPool; + +mod schema; + +use self::schema::graphman_command_executions as gce; + +#[derive(Clone)] +pub struct GraphmanStore { + primary_pool: ConnectionPool, +} + +impl GraphmanStore { + pub fn new(primary_pool: ConnectionPool) -> Self { + Self { primary_pool } + } +} + +impl graphman_store::GraphmanStore for GraphmanStore { + fn new_execution(&self, kind: CommandKind) -> Result { + let mut conn = self.primary_pool.get()?; + + let id: i64 = diesel::insert_into(gce::table) + .values(( + gce::kind.eq(kind), + gce::status.eq(ExecutionStatus::Initializing), + gce::created_at.eq(Utc::now()), + )) + .returning(gce::id) + .get_result(&mut conn)?; + + Ok(ExecutionId(id)) + } + + fn load_execution(&self, id: ExecutionId) -> Result { + let mut conn = self.primary_pool.get()?; + let execution = gce::table.find(id).first(&mut conn)?; + + Ok(execution) + } + + fn mark_execution_as_running(&self, id: ExecutionId) -> Result<()> { + let mut conn = self.primary_pool.get()?; + + diesel::update(gce::table) + .set(( + gce::status.eq(ExecutionStatus::Running), + gce::updated_at.eq(Utc::now()), + )) + .filter(gce::id.eq(id)) + .filter(gce::completed_at.is_null()) + .execute(&mut conn)?; + + Ok(()) + } + + fn mark_execution_as_failed(&self, id: ExecutionId, error_message: String) -> Result<()> { + let mut conn = self.primary_pool.get()?; + + diesel::update(gce::table) + .set(( + gce::status.eq(ExecutionStatus::Failed), + gce::error_message.eq(error_message), + gce::completed_at.eq(Utc::now()), + )) + .filter(gce::id.eq(id)) + .execute(&mut conn)?; + + Ok(()) + } + + fn mark_execution_as_succeeded(&self, id: ExecutionId) -> Result<()> { + let mut conn = self.primary_pool.get()?; + + diesel::update(gce::table) + .set(( + gce::status.eq(ExecutionStatus::Succeeded), + gce::completed_at.eq(Utc::now()), + )) + .filter(gce::id.eq(id)) + .execute(&mut conn)?; + + Ok(()) + } +} diff --git a/store/postgres/src/graphman/schema.rs b/store/postgres/src/graphman/schema.rs new file mode 100644 index 00000000000..fc721894a33 --- /dev/null +++ b/store/postgres/src/graphman/schema.rs @@ -0,0 +1,11 @@ +diesel::table! { + public.graphman_command_executions { + id -> BigSerial, + kind -> Varchar, + status -> Varchar, + error_message -> Nullable, + created_at -> Timestamptz, + updated_at -> Nullable, + completed_at -> Nullable, + } +} diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index dc1177c7ba3..409ce182d77 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -38,6 +38,8 @@ mod subgraph_store; pub mod transaction_receipt; mod writable; +pub mod graphman; + #[cfg(debug_assertions)] pub mod layout_for_tests { pub use crate::block_range::*; diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 2921d375286..59a65535cf3 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -65,7 +65,7 @@ lazy_static! { )); static ref STORE_POOL_CONFIG: (Arc, ConnectionPool, Config, Arc) = build_store(); - pub(crate) static ref PRIMARY_POOL: ConnectionPool = STORE_POOL_CONFIG.1.clone(); + pub static ref PRIMARY_POOL: ConnectionPool = STORE_POOL_CONFIG.1.clone(); pub static ref STORE: Arc = STORE_POOL_CONFIG.0.clone(); static ref CONFIG: Config = STORE_POOL_CONFIG.2.clone(); pub static ref SUBSCRIPTION_MANAGER: Arc = STORE_POOL_CONFIG.3.clone();