diff --git a/waiter/bin/run-using-marathon.sh b/waiter/bin/run-using-marathon.sh new file mode 100755 index 000000000..ec6d17800 --- /dev/null +++ b/waiter/bin/run-using-marathon.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Usage: run-using-marathon.sh [PORT] +# +# Examples: +# run-using-marathon.sh 9091 +# run-using-marathon.sh +# +# Runs Waiter, configured to use a local marathon + +export WAITER_PORT=${1:-9091} + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +export WAITER_MARATHON=http://localhost:8080 +export WAITER_ZOOKEEPER_CONNECT_STRING=localhost:2181 +export GRAPHITE_SERVER_PORT=5555 + +echo "Starting waiter..." +cd ${DIR}/.. +WAITER_AUTH_RUN_AS_USER=$(id -un) WAITER_LOG_FILE_PREFIX=${WAITER_PORT}- lein do clean, compile, run config-minimesos.edn diff --git a/waiter/integration/waiter/token_request_test.clj b/waiter/integration/waiter/token_request_test.clj index 0ddfce29b..20c9be89b 100644 --- a/waiter/integration/waiter/token_request_test.clj +++ b/waiter/integration/waiter/token_request_test.clj @@ -1358,6 +1358,71 @@ (finally (delete-token-and-assert waiter-url token)))))) +(deftest ^:parallel ^:integration-fast test-current-for-tokens + (testing-using-waiter-url + (let [service-name (rand-name) + token (create-token-name waiter-url service-name) + service-description (assoc (kitchen-request-headers :prefix "") + :name service-name)] + (try + (testing "creating initial token" + (let [response (post-token waiter-url (assoc service-description :token token))] + (assert-response-status response 200))) + + (let [initial-service-id (retrieve-service-id waiter-url {:x-waiter-token token})] + (testing "current service reports as current for token" + (let [initial-service-details (service-settings waiter-url initial-service-id)] + (is (= [token] (:current-for-tokens initial-service-details))))) + + (testing "updating token" + (let [new-service-description (assoc service-description :metadata {"foo" "bar"}) + response (post-token waiter-url (assoc new-service-description :token token))] + (assert-response-status response 200))) + + (testing "old service is no longer current for token" + (let [initial-service-details' (service-settings waiter-url initial-service-id) + new-service-id (retrieve-service-id waiter-url {:x-waiter-token token}) + new-service-details (service-settings waiter-url new-service-id)] + (is (nil? (:current-for-tokens initial-service-details'))) + (is (= [token] (:current-for-tokens new-service-details)))))) + (finally + (delete-token-and-assert waiter-url token)))))) + +(deftest ^:parallel ^:integration-fast test-current-for-tokens-multiple-source-tokens + (testing-using-waiter-url + (let [service-name (rand-name) + token-name-a (create-token-name waiter-url (str service-name "-A")) + token-name-b (create-token-name waiter-url (str service-name "-B")) + first-service-description (assoc (kitchen-request-headers :prefix "") + :name service-name) + combined-token-header (str token-name-a "," token-name-b)] + (try + (let [response (post-token waiter-url (assoc first-service-description :token token-name-a))] + (assert-response-status response 200)) + (let [response (post-token waiter-url (assoc first-service-description :token token-name-b))] + (assert-response-status response 200)) + + (let [service-id-a (retrieve-service-id waiter-url {:x-waiter-token combined-token-header}) + new-service-description (update first-service-description :cpus #(+ % 0.1))] + (let [response (post-token waiter-url (assoc new-service-description :token token-name-a))] + (assert-response-status response 200)) + + (let [service-id-b (retrieve-service-id waiter-url {:x-waiter-token combined-token-header})] + (let [response (post-token waiter-url (assoc new-service-description :token token-name-b))] + (assert-response-status response 200)) + + (let [service-id-c (retrieve-service-id waiter-url {:x-waiter-token combined-token-header}) + service-a-details (service-settings waiter-url service-id-a) + service-b-details (service-settings waiter-url service-id-b) + service-c-details (service-settings waiter-url service-id-c)] + (is (nil? (:current-for-tokens service-a-details))) + (is (nil? (:current-for-tokens service-b-details))) + (is (= [token-name-a token-name-b] + (:current-for-tokens service-c-details)))))) + (finally + (delete-token-and-assert waiter-url token-name-a) + (delete-token-and-assert waiter-url token-name-b)))))) + (deftest ^:parallel ^:integration-slow ^:resource-heavy test-service-fallback-support (testing-using-waiter-url (let [service-name (rand-name) diff --git a/waiter/src/waiter/core.clj b/waiter/src/waiter/core.clj index ee3931a83..a92351674 100644 --- a/waiter/src/waiter/core.clj +++ b/waiter/src/waiter/core.clj @@ -1271,7 +1271,8 @@ :service-handler-fn (pc/fnk [[:curator kv-store] [:daemons router-state-maintainer] [:routines allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-sync-fn - router-metrics-helpers service-id->service-description-fn service-id->source-tokens-entries-fn] + router-metrics-helpers service-id->service-description-fn service-id->source-tokens-entries-fn + token->token-hash] [:scheduler scheduler] [:state router-id scheduler-interactions-thread-pool] wrap-secure-request-fn] @@ -1283,7 +1284,7 @@ generate-log-url-fn make-inter-router-requests-sync-fn service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn scheduler-interactions-thread-pool - request))))) + token->token-hash request))))) :service-id-handler-fn (pc/fnk [[:curator kv-store] [:routines store-service-description-fn] wrap-descriptor-fn wrap-secure-request-fn] diff --git a/waiter/src/waiter/handler.clj b/waiter/src/waiter/handler.clj index 4af1ea080..317db67cd 100644 --- a/waiter/src/waiter/handler.clj +++ b/waiter/src/waiter/handler.clj @@ -352,11 +352,24 @@ :failed-instances (get service-id->failed-instances service-id) :killed-instances (get service-id->killed-instances service-id)})) +(defn- get-current-for-tokens + [source-token-entries token->token-hash] + (reduce (fn [acc source-tokens] + (let [current-for-tokens (map (fn [{:strs [token version]}] + (let [current-version (token->token-hash token)] + (= version current-version))) + source-tokens)] + (if (every? true? current-for-tokens) + (into acc (map (fn [t] (get t "token")) source-tokens)) + acc))) + [] + source-token-entries)) + (defn- get-service-handler "Returns details about the service such as the service description, metrics, instances, etc." [router-id service-id core-service-description kv-store generate-log-url-fn make-inter-router-requests-fn service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn - request] + token->token-hash request] (let [global-state (query-state-fn) service-instance-maps (try (let [assoc-log-url-to-instances @@ -395,6 +408,7 @@ (catch Exception e (log/error e "Error in retrieving service suspended state for" service-id))) source-tokens-entries (service-id->source-tokens-entries-fn service-id) + current-for-tokens (get-current-for-tokens source-tokens-entries token->token-hash) request-params (-> request ru/query-params-request :query-params) include-effective-parameters? (utils/request-flag request-params "effective-parameters") last-request-time (get-in (service-id->metrics-fn) [service-id "last-request-time"]) @@ -419,7 +433,9 @@ (:time service-suspended-state) (assoc :service-suspended-state service-suspended-state) (seq source-tokens-entries) - (assoc :source-tokens source-tokens-entries))] + (assoc :source-tokens source-tokens-entries) + (seq current-for-tokens) + (assoc :current-for-tokens current-for-tokens))] (utils/clj->streaming-json-response result-map))) (defn service-handler @@ -429,7 +445,7 @@ :get returns details about the service such as the service description, metrics, instances, etc." [router-id service-id scheduler kv-store allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-fn service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn - scheduler-interactions-thread-pool request] + scheduler-interactions-thread-pool token->token-hash request] (try (when-not service-id (throw (ex-info "Missing service-id" {:log-level :info :status 400}))) @@ -441,7 +457,8 @@ scheduler-interactions-thread-pool request) :get (get-service-handler router-id service-id core-service-description kv-store generate-log-url-fn make-inter-router-requests-fn service-id->service-description-fn - service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn request)))) + service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn + token->token-hash request)))) (catch Exception ex (utils/exception->response ex request)))) diff --git a/waiter/test/waiter/core_test.clj b/waiter/test/waiter/core_test.clj index 663e87710..559928811 100644 --- a/waiter/test/waiter/core_test.clj +++ b/waiter/test/waiter/core_test.clj @@ -417,7 +417,8 @@ :make-inter-router-requests-sync-fn nil :router-metrics-helpers {:service-id->metrics-fn (constantly {})} :service-id->service-description-fn (constantly {}) - :service-id->source-tokens-entries-fn (constantly #{})} + :service-id->source-tokens-entries-fn (constantly #{}) + :token->token-hash identity} :scheduler {:scheduler (reify scheduler/ServiceScheduler (delete-service [_ _] (let [result @delete-service-result-atom] @@ -511,7 +512,8 @@ :make-inter-router-requests-sync-fn nil :router-metrics-helpers {:service-id->metrics-fn (constantly service-id->metrics)} :service-id->service-description-fn (constantly {}) - :service-id->source-tokens-entries-fn (constantly #{})} + :service-id->source-tokens-entries-fn (constantly #{}) + :token->token-hash identity} :scheduler {:scheduler (Object.)} :state {:router-id "router-id" :scheduler-interactions-thread-pool scheduler-interactions-thread-pool}