Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Commit

Permalink
Add current-for-tokens (#838)
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Schorfheide authored and dposada committed Jul 11, 2019
1 parent 7307cde commit c6b2af1
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 8 deletions.
20 changes: 20 additions & 0 deletions waiter/bin/run-using-marathon.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Usage: run-using-marathon.sh [PORT]
#
# Examples:
# run-using-marathon.sh 9091
# run-using-marathon.sh
#
# Runs Waiter, configured to use a local marathon

export WAITER_PORT=${1:-9091}

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

export WAITER_MARATHON=http://localhost:8080
export WAITER_ZOOKEEPER_CONNECT_STRING=localhost:2181
export GRAPHITE_SERVER_PORT=5555

echo "Starting waiter..."
cd ${DIR}/..
WAITER_AUTH_RUN_AS_USER=$(id -un) WAITER_LOG_FILE_PREFIX=${WAITER_PORT}- lein do clean, compile, run config-minimesos.edn
65 changes: 65 additions & 0 deletions waiter/integration/waiter/token_request_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,71 @@
(finally
(delete-token-and-assert waiter-url token))))))

(deftest ^:parallel ^:integration-fast test-current-for-tokens
(testing-using-waiter-url
(let [service-name (rand-name)
token (create-token-name waiter-url service-name)
service-description (assoc (kitchen-request-headers :prefix "")
:name service-name)]
(try
(testing "creating initial token"
(let [response (post-token waiter-url (assoc service-description :token token))]
(assert-response-status response 200)))

(let [initial-service-id (retrieve-service-id waiter-url {:x-waiter-token token})]
(testing "current service reports as current for token"
(let [initial-service-details (service-settings waiter-url initial-service-id)]
(is (= [token] (:current-for-tokens initial-service-details)))))

(testing "updating token"
(let [new-service-description (assoc service-description :metadata {"foo" "bar"})
response (post-token waiter-url (assoc new-service-description :token token))]
(assert-response-status response 200)))

(testing "old service is no longer current for token"
(let [initial-service-details' (service-settings waiter-url initial-service-id)
new-service-id (retrieve-service-id waiter-url {:x-waiter-token token})
new-service-details (service-settings waiter-url new-service-id)]
(is (nil? (:current-for-tokens initial-service-details')))
(is (= [token] (:current-for-tokens new-service-details))))))
(finally
(delete-token-and-assert waiter-url token))))))

(deftest ^:parallel ^:integration-fast test-current-for-tokens-multiple-source-tokens
(testing-using-waiter-url
(let [service-name (rand-name)
token-name-a (create-token-name waiter-url (str service-name "-A"))
token-name-b (create-token-name waiter-url (str service-name "-B"))
first-service-description (assoc (kitchen-request-headers :prefix "")
:name service-name)
combined-token-header (str token-name-a "," token-name-b)]
(try
(let [response (post-token waiter-url (assoc first-service-description :token token-name-a))]
(assert-response-status response 200))
(let [response (post-token waiter-url (assoc first-service-description :token token-name-b))]
(assert-response-status response 200))

(let [service-id-a (retrieve-service-id waiter-url {:x-waiter-token combined-token-header})
new-service-description (update first-service-description :cpus #(+ % 0.1))]
(let [response (post-token waiter-url (assoc new-service-description :token token-name-a))]
(assert-response-status response 200))

(let [service-id-b (retrieve-service-id waiter-url {:x-waiter-token combined-token-header})]
(let [response (post-token waiter-url (assoc new-service-description :token token-name-b))]
(assert-response-status response 200))

(let [service-id-c (retrieve-service-id waiter-url {:x-waiter-token combined-token-header})
service-a-details (service-settings waiter-url service-id-a)
service-b-details (service-settings waiter-url service-id-b)
service-c-details (service-settings waiter-url service-id-c)]
(is (nil? (:current-for-tokens service-a-details)))
(is (nil? (:current-for-tokens service-b-details)))
(is (= [token-name-a token-name-b]
(:current-for-tokens service-c-details))))))
(finally
(delete-token-and-assert waiter-url token-name-a)
(delete-token-and-assert waiter-url token-name-b))))))

(deftest ^:parallel ^:integration-slow ^:resource-heavy test-service-fallback-support
(testing-using-waiter-url
(let [service-name (rand-name)
Expand Down
5 changes: 3 additions & 2 deletions waiter/src/waiter/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,8 @@
:service-handler-fn (pc/fnk [[:curator kv-store]
[:daemons router-state-maintainer]
[:routines allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-sync-fn
router-metrics-helpers service-id->service-description-fn service-id->source-tokens-entries-fn]
router-metrics-helpers service-id->service-description-fn service-id->source-tokens-entries-fn
token->token-hash]
[:scheduler scheduler]
[:state router-id scheduler-interactions-thread-pool]
wrap-secure-request-fn]
Expand All @@ -1283,7 +1284,7 @@
generate-log-url-fn make-inter-router-requests-sync-fn
service-id->service-description-fn service-id->source-tokens-entries-fn
query-state-fn service-id->metrics-fn scheduler-interactions-thread-pool
request)))))
token->token-hash request)))))
:service-id-handler-fn (pc/fnk [[:curator kv-store]
[:routines store-service-description-fn]
wrap-descriptor-fn wrap-secure-request-fn]
Expand Down
25 changes: 21 additions & 4 deletions waiter/src/waiter/handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,24 @@
:failed-instances (get service-id->failed-instances service-id)
:killed-instances (get service-id->killed-instances service-id)}))

(defn- get-current-for-tokens
[source-token-entries token->token-hash]
(reduce (fn [acc source-tokens]
(let [current-for-tokens (map (fn [{:strs [token version]}]
(let [current-version (token->token-hash token)]
(= version current-version)))
source-tokens)]
(if (every? true? current-for-tokens)
(into acc (map (fn [t] (get t "token")) source-tokens))
acc)))
[]
source-token-entries))

(defn- get-service-handler
"Returns details about the service such as the service description, metrics, instances, etc."
[router-id service-id core-service-description kv-store generate-log-url-fn make-inter-router-requests-fn
service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn
request]
token->token-hash request]
(let [global-state (query-state-fn)
service-instance-maps (try
(let [assoc-log-url-to-instances
Expand Down Expand Up @@ -395,6 +408,7 @@
(catch Exception e
(log/error e "Error in retrieving service suspended state for" service-id)))
source-tokens-entries (service-id->source-tokens-entries-fn service-id)
current-for-tokens (get-current-for-tokens source-tokens-entries token->token-hash)
request-params (-> request ru/query-params-request :query-params)
include-effective-parameters? (utils/request-flag request-params "effective-parameters")
last-request-time (get-in (service-id->metrics-fn) [service-id "last-request-time"])
Expand All @@ -419,7 +433,9 @@
(:time service-suspended-state)
(assoc :service-suspended-state service-suspended-state)
(seq source-tokens-entries)
(assoc :source-tokens source-tokens-entries))]
(assoc :source-tokens source-tokens-entries)
(seq current-for-tokens)
(assoc :current-for-tokens current-for-tokens))]
(utils/clj->streaming-json-response result-map)))

(defn service-handler
Expand All @@ -429,7 +445,7 @@
:get returns details about the service such as the service description, metrics, instances, etc."
[router-id service-id scheduler kv-store allowed-to-manage-service?-fn generate-log-url-fn make-inter-router-requests-fn
service-id->service-description-fn service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn
scheduler-interactions-thread-pool request]
scheduler-interactions-thread-pool token->token-hash request]
(try
(when-not service-id
(throw (ex-info "Missing service-id" {:log-level :info :status 400})))
Expand All @@ -441,7 +457,8 @@
scheduler-interactions-thread-pool request)
:get (get-service-handler router-id service-id core-service-description kv-store generate-log-url-fn
make-inter-router-requests-fn service-id->service-description-fn
service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn request))))
service-id->source-tokens-entries-fn query-state-fn service-id->metrics-fn
token->token-hash request))))
(catch Exception ex
(utils/exception->response ex request))))

Expand Down
6 changes: 4 additions & 2 deletions waiter/test/waiter/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@
:make-inter-router-requests-sync-fn nil
:router-metrics-helpers {:service-id->metrics-fn (constantly {})}
:service-id->service-description-fn (constantly {})
:service-id->source-tokens-entries-fn (constantly #{})}
:service-id->source-tokens-entries-fn (constantly #{})
:token->token-hash identity}
:scheduler {:scheduler (reify scheduler/ServiceScheduler
(delete-service [_ _]
(let [result @delete-service-result-atom]
Expand Down Expand Up @@ -511,7 +512,8 @@
:make-inter-router-requests-sync-fn nil
:router-metrics-helpers {:service-id->metrics-fn (constantly service-id->metrics)}
:service-id->service-description-fn (constantly {})
:service-id->source-tokens-entries-fn (constantly #{})}
:service-id->source-tokens-entries-fn (constantly #{})
:token->token-hash identity}
:scheduler {:scheduler (Object.)}
:state {:router-id "router-id"
:scheduler-interactions-thread-pool scheduler-interactions-thread-pool}
Expand Down

0 comments on commit c6b2af1

Please sign in to comment.