Skip to content

Commit

Permalink
Merge pull request #3291 from Zak-Kent/pdb-4659-acceptance-tests-for-…
Browse files Browse the repository at this point in the history
…cmd-broadcast

(PDB-4659) Add  acceptance tests for cmd broadcast
  • Loading branch information
austb authored Aug 5, 2020
2 parents da82665 + f22bdbf commit e3abb42
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 11 deletions.
16 changes: 8 additions & 8 deletions src/puppetlabs/puppetdb/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
payload :- {s/Any s/Any}
timeout]
(let [body (json/generate-string payload)
url-params (str
(format "?command=%s&version=%s&certname=%s"
(str/replace command #" " "_") version certname)
(when-let [producer_timestamp (-> payload :producer_timestamp str)]
(format "&producer-timestamp=%s" producer_timestamp)))
url (str (utils/base-url->str base-url)
url-params
(when timeout (format "&secondsToWaitForCompletion=%s" timeout)))]
url-params (utils/cmd-url-params {:command command
:version version
:certname certname
:producer-timestamp (-> payload
:producer_timestamp
str)
:timeout timeout})
url (str (utils/base-url->str base-url) url-params)]
(http-client/post url {:body body
:throw-exceptions false
:content-type :json
Expand Down
10 changes: 7 additions & 3 deletions src/puppetlabs/puppetdb/command.clj
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@
(update-counter! :invalidated command version dec!))

(defn broadcast-cmd
[{:keys [certname command version id] :as cmd}
[{:keys [certname command version id callback] :as cmd}
write-dbs pool response-chan stats shutdown-for-ex]
(let [n-dbs (count write-dbs)
statuses (repeatedly n-dbs #(atom nil))
Expand All @@ -586,7 +586,8 @@
not-exception? #(not (instance? Throwable %))
attempt-exec (fn [db status]
(try
(attempt-exec-command cmd db status response-chan stats
;; handle callback below to avoid races during cmd broadcast
(attempt-exec-command (assoc cmd :callback identity) db status response-chan stats
shutdown-for-ex)
nil
(catch Throwable ex
Expand All @@ -612,13 +613,16 @@

(let [results @results]
(if (some not-exception? results) ;; success
true
(do
(callback {:command cmd :result results})
true)
(let [msg (trs "[{0}] [{1}] Unable to broadcast command for {2}"
id command certname)
ex (ex-info msg {:kind ::retry})]
(doseq [result results
:when (instance? Throwable result)]
(.addSuppressed ex result))
(callback {:command cmd :exception ex})
(throw ex))))))

(defn process-cmd
Expand Down
10 changes: 10 additions & 0 deletions src/puppetlabs/puppetdb/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@
:prefix "/metrics"
:version (or version :v1)})

(defn cmd-url-params
[{:keys [command version certname producer-timestamp timeout]}]
(str
(format "?command=%s&version=%s&certname=%s"
(str/replace command #" " "_") version certname)
(when producer-timestamp
(format "&producer-timestamp=%s" producer-timestamp))
(when timeout
(format "&secondsToWaitForCompletion=%s" timeout))))

(defn assoc-if-exists
"Assoc only if the key is already present"
[m & ks]
Expand Down
194 changes: 194 additions & 0 deletions test/puppetlabs/puppetdb/command_broadcast_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
(ns puppetlabs.puppetdb.command-broadcast-test
(:require
[clojure.test :refer :all]
[puppetlabs.puppetdb.integration.fixtures :as int]
[puppetlabs.puppetdb.utils :as utils]
[puppetlabs.puppetdb.testutils.cli :refer [example-report
example-facts
example-catalog
example-certname]]
[puppetlabs.puppetdb.command.constants :as cmd-consts]
[puppetlabs.puppetdb.testutils.services :as svc-utils]
[puppetlabs.puppetdb.utils.metrics :as mutils]
[puppetlabs.puppetdb.command :as cmd]
[clojure.string :as str])
(:import (java.sql SQLException)))

(def commands
{:replace-facts {:data (assoc example-facts :values {:foo "the foo"})
:cname (:replace-facts cmd-consts/command-names)
:ver cmd-consts/latest-facts-version}
:store-report {:data example-report
:cname (:store-report cmd-consts/command-names)
:ver cmd-consts/latest-report-version}
:replace-catalog {:data example-catalog
:cname (:replace-catalog cmd-consts/command-names)
:ver cmd-consts/latest-catalog-version}})

(defn post-blocking-ssl-cmd [pdb cmd]
(let [{:keys [data cname ver]} (cmd commands)
cmd-url (-> pdb int/server-info :base-url
(assoc :prefix "/pdb/cmd" :version :v1))
url-params (utils/cmd-url-params {:command cname
:version ver
:certname example-certname
:timeout 5 ; seconds
:producer-timestamp (-> data
:producer_timestamp
str)})
url (str (utils/base-url->str cmd-url) url-params)]
(svc-utils/post-ssl-or-throw url data)))

(defn populate-db
([pdb]
(populate-db pdb false))
([pdb assert-errors?]
(doseq [cmd (keys commands)]
(if assert-errors?
(is (thrown? Exception (post-blocking-ssl-cmd pdb cmd)))
(post-blocking-ssl-cmd pdb cmd)))))

(deftest cmd-broadcast
(with-open [pg1 (int/setup-postgres)
pg2 (int/setup-postgres)
pdb1 (int/run-puppetdb pg1 {:database {"gc-interval" 0}
:database-pg1 (int/server-info pg1)
:database-pg2 (int/server-info pg2)})]
;; post a few commands to trigger broadcast to both pgs
(populate-db pdb1)

(testing "Expected data is present in pdb1's read-database pg1"
(let [expected [{:certname "foo.local"}]]
(is (= expected (int/pql-query pdb1 "reports [certname] {}")))
(is (= expected (int/pql-query pdb1 "facts [certname] {}")))
(is (= expected (int/pql-query pdb1 "catalogs [certname] {}")))))

(let [port (-> pdb1 int/server-info :base-url :port)
list-metrics (fn [metric-type]
;; return metrics in :name=pg1.<metric-name> format
(-> (str "https://localhost:" port "/metrics/v2/list")
svc-utils/get-ssl
:body
:value
metric-type
keys))]

(testing "Expected number of storage metrics are created for both PGs"
(let [storage-metrics (list-metrics :puppetlabs.puppetdb.storage)
;; filter out non-broadcast metrics created in the registry during other tests
broadcast-metrics (filter #(re-find (re-pattern (str "pg1" "|" "pg2")) (str %)) storage-metrics)
expected-count 21 ;; 21 per pg metrics registered in storage.clj
[pg-1 pg-2] (vals (group-by #(subs (str %) 1 9) broadcast-metrics))]
(is (= expected-count (count pg-1)))
(is (= (count pg-1) (count pg-2)))))

(testing "Expected number of admin metrics are created for both PGs"
(let [admin-metrics (list-metrics :puppetlabs.puppetdb.admin)
;; filter out non-broadcast metrics created in the registry during other tests
broadcast-metrics (filter #(re-find (re-pattern (str "pg1" "|" "pg2")) (str %)) admin-metrics)
expected-count 13 ;; 13 per pg admin metrics registered in services.clj
[pg-1 pg-2] (vals (group-by #(subs (str %) 1 9) broadcast-metrics))]
(is (= expected-count (count pg-1)))
(is (= (count pg-1) (count pg-2)))))

(testing "Storage metrics updated for both PGs"
(let [report-min-time (fn [pg]
(-> (str "https://localhost:"
port
"/metrics/v2/read/"
"puppetlabs.puppetdb.storage"
(str ":name=" pg ".store-report-time"))
svc-utils/get-ssl
:body
:value
:Min))]
;; spot check that the store report timer Min val updated for both pgs
(is (not= 0.0 (report-min-time "pg1")))
(is (not= 0.0 (report-min-time "pg2"))))))

(with-open [pdb2 (int/run-puppetdb pg2 {:database {"gc-interval" 0}})]
(testing "Expected data is present in pdb2's read-database pg2"
(let [expected [{:certname "foo.local"}]]
(is (= expected (int/pql-query pdb2 "reports [certname] {}")))
(is (= expected (int/pql-query pdb2 "facts [certname] {}")))
(is (= expected (int/pql-query pdb2 "catalogs [certname] {}"))))))))

(deftest cmd-broadcast-with-one-pg-down
(with-open [pg1 (int/setup-postgres)
pg2 (int/setup-postgres)
pdb1 (int/run-puppetdb pg1 {:database {"gc-interval" 0}
:database-pg1 (int/server-info pg1)
:database-pg2 (int/server-info pg2)})]
(let [shared-globals (-> pdb1
int/server-info
:app
.app_context
deref
:service-contexts
:PuppetDBServer
:shared-globals)
db-names (:scf-write-db-names shared-globals)
dbs (:scf-write-dbs shared-globals)
db2 (nth dbs (.indexOf db-names "pg2"))
dlo (:path (:dlo shared-globals))
orig-exec-cmds cmd/exec-command]

;; break exec-command for all db2 submissions
(with-redefs [cmd/exec-command (fn [{:keys [command version] :as cmd} db conn-status start]
(when (= db2 db)
(throw (SQLException. "BOOM")))
(orig-exec-cmds cmd db conn-status start))]

;; post a few commands to trigger broadcast, submission will fail to one backend
(populate-db pdb1)

(testing "Expected data is present in pdb1's read-database pg1"
(let [expected [{:certname "foo.local"}]]
(is (= expected (int/pql-query pdb1 "reports [certname] {}")))
(is (= expected (int/pql-query pdb1 "facts [certname] {}")))
(is (= expected (int/pql-query pdb1 "catalogs [certname] {}")))))

(testing "dlo should be clean because command was succesfully delivered to one pg"
;; file-seq returns an entry for the dlo dir
(is (= 1 (-> dlo str clojure.java.io/file file-seq count))))))))

(deftest cmd-broadcast-with-all-pgs-down
(with-open [pg1 (int/setup-postgres)
pg2 (int/setup-postgres)
pdb1 (int/run-puppetdb pg1 {:database {"gc-interval" 0}
:database-pg1 (int/server-info pg1)
:database-pg2 (int/server-info pg2)})]
(let [shared-globals (-> pdb1
int/server-info
:app
.app_context
deref
:service-contexts
:PuppetDBServer
:shared-globals)
dlo (:path (:dlo shared-globals))
;; the dlo file-seq includes one entry for the dir and two per failed command
expected-dlo-count (inc (* 2 (count (keys commands))))]

;; break exec-command for all submissions to all pgs
(with-redefs [cmd/exec-command (fn [& args] (throw (SQLException. "BOOM")))
;; don't allow retries to speed cmd ending up in dlo
cmd/maximum-allowable-retries 0]

;; post a few commands all of which should fail. Assertion done in populate-db
(populate-db pdb1 true)

;; account for race between cmds being discared to dlo vs. checking for the discards
(let [dlo-files (loop [retries 0]
(let [dlo-files (-> dlo str clojure.java.io/file file-seq)]
(cond
(= expected-dlo-count (count dlo-files)) dlo-files
(> retries 10) (throw (ex-info "dlo in unexpected state"
{:dlo-files dlo-files}))
:else
(do
(Thread/sleep 100)
(recur (inc retries))))))]

(testing "dlo should contain two entries per failed command"
(is (= expected-dlo-count (count dlo-files)))))))))
14 changes: 14 additions & 0 deletions test/puppetlabs/puppetdb/testutils/services.clj
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,20 @@
:ssl-cert default-cert
:ssl-key default-ssl-key}))

(pls/defn-validated post-ssl-or-throw
[url-str :- String
body]
"Same as `post-ssl` except will throw if an error status is returned."
(let [resp (post-ssl url-str body)]
(if (>= (:status resp) 400)
(throw
(ex-info (format "Failed request to '%s' with status '%s'"
url-str (:status resp))
{:url url-str
:response resp
:status (:status resp)}))
resp)))

(defn certname-query
"Returns a function that will query the given endpoint (`suffix`)
for the provided `certname`"
Expand Down

0 comments on commit e3abb42

Please sign in to comment.