From 25e6aff308d67975d89d1400e465379670f434fe Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Wed, 24 May 2023 16:29:24 +0400 Subject: [PATCH 01/12] Add PostgresSQL Replication metricset --- metricbeat/docs/fields.asciidoc | 187 ++++++++++++++++++ metricbeat/docs/modules/postgresql.asciidoc | 7 + .../modules/postgresql/replication.asciidoc | 27 +++ metricbeat/docs/modules_list.asciidoc | 3 +- metricbeat/include/list_common.go | 1 + metricbeat/metricbeat.reference.yml | 3 + .../postgresql/_meta/config.reference.yml | 3 + metricbeat/module/postgresql/fields.go | 2 +- .../postgresql/replication/_meta/data.json | 42 ++++ .../replication/_meta/data_shared.json | 42 ++++ .../replication/_meta/docs.asciidoc | 1 + .../postgresql/replication/_meta/fields.yml | 83 ++++++++ .../module/postgresql/replication/data.go | 53 +++++ .../postgresql/replication/replication.go | 72 +++++++ .../replication_integration_test.go | 98 +++++++++ x-pack/metricbeat/metricbeat.reference.yml | 3 + 16 files changed, 625 insertions(+), 2 deletions(-) create mode 100644 metricbeat/docs/modules/postgresql/replication.asciidoc create mode 100644 metricbeat/module/postgresql/replication/_meta/data.json create mode 100644 metricbeat/module/postgresql/replication/_meta/data_shared.json create mode 100644 metricbeat/module/postgresql/replication/_meta/docs.asciidoc create mode 100644 metricbeat/module/postgresql/replication/_meta/fields.yml create mode 100644 metricbeat/module/postgresql/replication/data.go create mode 100644 metricbeat/module/postgresql/replication/replication.go create mode 100644 metricbeat/module/postgresql/replication/replication_integration_test.go diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 17f608f713b6..7c159b3ab26b 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -56304,6 +56304,193 @@ type: date -- +[float] +=== replication + +One row per replication, showing database replication 
statistics. Collected by querying pg_stat_database + + + +*`postgresql.replication.write_lag`*:: ++ +-- +Elapsed time during committed WALs from primary to the replica. + + +type: long + +-- + +*`postgresql.replication.state`*:: ++ +-- +WAL sender state. + + +type: keyword + +-- + +*`postgresql.replication.sync_priority`*:: ++ +-- +Priority of replica server being chosen as synchronous replica. + + +type: long + +-- + +*`postgresql.replication.backend_start`*:: ++ +-- +Start time when replica connected to primary. + + +type: long + +-- + +*`postgresql.replication.replay_lag`*:: ++ +-- +Elapsed time during committed WALs from primary to the replica. Fully committed in replica node + + +type: long + +-- + +*`postgresql.replication.flush_lag`*:: ++ +-- +Elapsed time during committed WALs from primary to the replica. WAL's has already been flushed but not yet applied. + + +type: long + +-- + +*`postgresql.replication.pid`*:: ++ +-- +Process id of walsender process + + +type: long + +-- + +*`postgresql.replication.client.address`*:: ++ +-- +Address of replica/streaming replication + + +type: long + +-- + +*`postgresql.replication.client.port`*:: ++ +-- +Port of replica/streaming replication + + +type: long + +-- + +*`postgresql.replication.client.hostname`*:: ++ +-- +Hostname of replica/streaming replication + + +type: long + +-- + +*`postgresql.replication.replay_lsn`*:: ++ +-- +Last transaction flush on disk at replica. + + +type: long + +-- + +*`postgresql.replication.user.name`*:: ++ +-- +Name of user which is used for Streaming replication + + +type: long + +-- + +*`postgresql.replication.user.id`*:: ++ +-- +ID of user which is used for Streaming replication + + +type: long + +-- + +*`postgresql.replication.sync_state`*:: ++ +-- +Sync State of replica. + + +type: long + +-- + +*`postgresql.replication.flush_lsn`*:: ++ +-- +Last transaction flush on disk at replica. 
+ + +type: long + +-- + +*`postgresql.replication.application_name`*:: ++ +-- +Number of queries canceled due to conflicts with recovery in this database. + + +type: long + +-- + +*`postgresql.replication.write_lsn`*:: ++ +-- +Last transaction written on disk at replica. + + +type: long + +-- + +*`postgresql.replication.sent_lsn`*:: ++ +-- +Last transaction location sent to replica + + +type: long + +-- + [float] === statement diff --git a/metricbeat/docs/modules/postgresql.asciidoc b/metricbeat/docs/modules/postgresql.asciidoc index b09ade4bd127..fcc35873d070 100644 --- a/metricbeat/docs/modules/postgresql.asciidoc +++ b/metricbeat/docs/modules/postgresql.asciidoc @@ -96,6 +96,9 @@ metricbeat.modules: # Stats about every PostgreSQL process - activity + # Stats about every PostgreSQL replication process + - replication + # Stats about every statement executed in the server. It requires the # `pg_stats_statement` library to be configured in the server. #- statement @@ -126,6 +129,8 @@ The following metricsets are available: * <> +* <> + * <> include::postgresql/activity.asciidoc[] @@ -134,6 +139,8 @@ include::postgresql/bgwriter.asciidoc[] include::postgresql/database.asciidoc[] +include::postgresql/replication.asciidoc[] + include::postgresql/statement.asciidoc[] :edit_url!: diff --git a/metricbeat/docs/modules/postgresql/replication.asciidoc b/metricbeat/docs/modules/postgresql/replication.asciidoc new file mode 100644 index 000000000000..ede32d9c0c92 --- /dev/null +++ b/metricbeat/docs/modules/postgresql/replication.asciidoc @@ -0,0 +1,27 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// +:edit_url: https://github.com/elastic/beats/edit/main/metricbeat/module/postgresql/replication/_meta/docs.asciidoc + + +[[metricbeat-metricset-postgresql-replication]] +=== PostgreSQL replication metricset + +include::../../../module/postgresql/replication/_meta/docs.asciidoc[] + +This is a default metricset. 
If the host module is unconfigured, this metricset is enabled by default. + +:edit_url: + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/postgresql/replication/_meta/data.json[] +---- +:edit_url!: \ No newline at end of file diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index 2a77f4d38cd3..9b91d948ef8f 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -247,9 +247,10 @@ This file is generated! See scripts/mage/docs_collector.go .2+| .2+| |<> |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | -.4+| .4+| |<> +.5+| .5+| |<> |<> |<> +|<> |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .3+| .3+| |<> diff --git a/metricbeat/include/list_common.go b/metricbeat/include/list_common.go index 434d2d7fc72c..88f6d0a06106 100644 --- a/metricbeat/include/list_common.go +++ b/metricbeat/include/list_common.go @@ -142,6 +142,7 @@ import ( _ "github.com/elastic/beats/v7/metricbeat/module/postgresql/activity" _ "github.com/elastic/beats/v7/metricbeat/module/postgresql/bgwriter" _ "github.com/elastic/beats/v7/metricbeat/module/postgresql/database" + _ "github.com/elastic/beats/v7/metricbeat/module/postgresql/replication" _ "github.com/elastic/beats/v7/metricbeat/module/postgresql/statement" _ "github.com/elastic/beats/v7/metricbeat/module/prometheus" _ "github.com/elastic/beats/v7/metricbeat/module/prometheus/collector" diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 9034ab4ca9f7..d54cce6ca9d2 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -814,6 +814,9 @@ metricbeat.modules: # Stats about every PostgreSQL process - activity + # Stats about every PostgreSQL replication process + - replication + # Stats about 
every statement executed in the server. It requires the # `pg_stats_statement` library to be configured in the server. #- statement diff --git a/metricbeat/module/postgresql/_meta/config.reference.yml b/metricbeat/module/postgresql/_meta/config.reference.yml index 3b4ed4579d11..8891d1d1cf8a 100644 --- a/metricbeat/module/postgresql/_meta/config.reference.yml +++ b/metricbeat/module/postgresql/_meta/config.reference.yml @@ -10,6 +10,9 @@ # Stats about every PostgreSQL process - activity + # Stats about every PostgreSQL replication process + - replication + # Stats about every statement executed in the server. It requires the # `pg_stats_statement` library to be configured in the server. #- statement diff --git a/metricbeat/module/postgresql/fields.go b/metricbeat/module/postgresql/fields.go index cd679d7edc24..dc9906be153a 100644 --- a/metricbeat/module/postgresql/fields.go +++ b/metricbeat/module/postgresql/fields.go @@ -32,5 +32,5 @@ func init() { // AssetPostgresql returns asset data. // This is the base64 encoded zlib format compressed contents of module/postgresql. 
func AssetPostgresql() string { - return "eJzUWk+P47oNv8+nIN7l7RbZoL3OoUCxr0AX6O7bh92ix4CR6ViILHklOZn00xeU5D+xncwkY8+2Oc0kFvkTKZE/kv4Aezo9QmWc31lyP9QDgJde0SP88jV++e2Pf/7yAJCRE1ZWXhr9CH99AAD4TN5K4UAYpUh4yiC3poRuHTiyB7Ju/QDgCmP9Rhidy90j5KgcPQBYUoSOHmGHDwC5JJW5xyD8A2gsaQCNP/5U8fPW1FX6ZgIaf3o4yoh0nX7r6+nrQuHlQfpT+8OUtisa+fO7JsiMqEvSHiqyyQZQWSPIuRUb4ij1DqTOjS2RZbAZkO3nDfiCQNTWkvZnchtsYHLwBfqewFoUgA6cR0+AOmvWw4+a7GkNH1v/bE9nMsPvjKXabXj1plGy7j127qLmMzRh34wZetyio7WR2dkDjTmV0bvBD1csGqz66be4cWqlgy+kgy2KPekMJB9DreM2vVlfB8b/TiLb0+lo7BD1M+C+YEkzoKtms9bXeDSgMVqHZFpz7ciul/AVCwZldjvKQOpwul+EZcI/N/jgDq1YVUqKcBk3r1PekxTv6cD3LwAjlCTt15hllpy7Dcqnr5DWNYCitDsxFMb52+3xD+N8kNNiaJVHuSuOV5YqY2NUAgRLnCkIfvvyDZQx+7rixfHxDW/pKk6WNNPx/f7xK7A40HW5JRud2DOkdFA7Dpq5sSBMWda68fdR+iLYdiQ02XoFxsKHv4DMAeFfWj6BM2JPSShd8EVavOEd3biXU9X5ICWFxu2cH53cKgqWcoCWAGtvDijqugSFtRYF2VX/y6Oxe7KrkR5ldlKgYpe2h78TMPVrkgQVWlSKVPsFw+N0q4fhCOBopedHkiNam4qCxL4yUodfnUfr62oFR1SWBMkDf3tkwqEzsiFBHlFFYeuRkr8/edJOGu2gxBNY2knnySZ8LvoYs0zyNlC1IT4Y8br/ArLJU5qhv9WzsiQ4FqTjXU5kAI6RB/C1WoFc03rVPDQZCEZi+blIWKa34i1qxyzB6DfYzq/toe3p7e9xGmSgNYvBa2+SOkVSRlHhue2N5UseKZl0oM0QSmJ01HOQQufHsqb3GCRvRIF6N81kXrvJCJ1xBFhR0wUwR5RejuJsxLE1RhHqG6HYmth+IxLVWj6pBKMBQRmxv2Km23R/TEfOHIhDUzLEkEd10fOAqo7hs2PDI6EAf0r+foTvBfX3RE8k6rAXTIR9crXM1HhtYwVORQiajt0lL0sc5va+KJC6f6lGkiXbtffACra1v3SS+dO55oYNDVDAO9wGSvCe8UjX3R8nS6nQMndJ6yZBnAGmJ0GVB6PbFBjEcWEW9sff9JUL5BQ8KRc1kLVmIl3wTnJ0vkJfQF7rRpRSVx3NSz6crZkWnUmHW0XZ0B4td+JLYlHsm9JNkuPfm3Vxn8+RveClG28oPfnhpfjVQcnMj7NuV31+6oXBFC/DolBBjuRydewGUbYzXCNSA99M4wuur1m4W4H03eIxQelCa+BzHNei2GsxbUOHYRn+rGH+jdJDWBcZsIxH71oQew7APcSviLSOXRTBcIg4FlIUQzgjEC1t2UWO9JpuyDePXjovhQPcmtq3yiPFS5SuzfeubXP0uxaRbg/d2vQsGpjjWHN756Jjkm7tREFZrUYR4d664kssJ0wOreS+vnguCzwQbIk0VGRzY8tLx7OP1NKPmpxfAGkreSakXpbk1sFf63JYJUW4uTJ445X7bjwqwNLUOsYkJjIJpIsYXRVKtxj0OXRylOb6sgU3kprOJB+9Y0GWIJcqlUm8Ac88yXCg3a9YcCmVko6E0dmFMmBsCHfS4v/ZDoy/sEbL/8S2wg3G2NZ5Ttate0aZ/fQmHa27stryLvp+uI5tgrjOj2p7mg6KL8C2yWulZgcYzuaFQO28qSrKACEAYHM6gRq2FNgTyPH5KTDrXRgDJepTu42re0xJavlzIS0Jzs
ehEXWVKQ2gbXK+Agt5oIUSTOhNQ1hAegfmqCEoD1wT3mljS1RqyOLggh8L1JkKXjaOAkPoKr9Ga2aYS0Zd072Y91dthEoZgUukpcaBrYbLxZ/bWHI0ax+A6WNDpFwsKhLFOXJ4DFQzKB1TqmY68BpK9bsmsOYYZkuNvG6q1Hzz4SizPrbzKVA7+ZlkVBMob6dSP2/2E5ovfw6E1xVoKWNvmNqKS/25nzwNWgGVlT/dAjjchI3JN0nkArkzCe5VK+MJQjtPe65X6NbClOUoOcwQKXs62lq3Z/UzohoxXAwXZ3itUYpt8HMRMwq+sHipv7VVRuy5AsD5oyyzuaQAWMEI7VVIxRLeDnmxjytE3DxkNlQM8tSQ25gjQKAoODxOtbrRh7kT8xMMjS3QxFQX7QnehZ0arVigUHVGDgrZNY66lwtGgs81s1heYCqyGJoY7uQ8lb+6QKTTf/Hp91ctyrsPnr5UMtwx6+J8FmsCFtwkkIgsmXh76oLB8ASsphpxLyD/vQ1dLQZftSOW/FY7subIt9DXVi9RhpsjX8EovUnfodP3ohsZwOXkRbEUtiT8TmhSO7KLtC8YWyP9TnB1lS3CYQO2JPxOaBkpWgxaEn47NGF0rqRYoJ5vcAjUgjgvZjUxF2k1xuGsJWEOZE8N3pHEZ2gLlZWxaE/r0OiYP4k18lMjRVh69gzA30Z1PowEoSUQptZh/mhph5ZLvPAeyLGITYbzJZz2RlIbOO9ovVtz4rRxzsY1oyuk3r1fhRH6uYIwvjS7DSvYTNkNwJG/3OzujL49+dmMPmyIhUzQa94NLTh2weWzwy65wwXPeDEwkeSCJeycEWYhCc7PFhvJkJGPZcLL4sX/TpU+KtfbeeFr6/WzF0LjGIr/Ci+qTdfxE2+HnkmV+mDSqzTNC6FBbvc6qKhqqB3u4iuhPlyFwLhueB+0G5i+bq7yli8Ydg2sYBWLkbBPTKLf/H3V/psittYwnPqezWPns1eHpfXnNb2enm4cPPwRB6vDdediBSq1QDZtm9XRtO2kw9ZXjcscZ9Yso6+z9QtHL1mcr2W4obNPgzpY/SKmqflqrbux+nMAS6lnhPdZalnW5awA8WlOgPg0O0DCiya8/dx9JtRzonM+y+gwH76vpqpVTFHOo87QZpDRQXZZq9d96CN94QgxQi+pNPa0jo3TGbtOw+uTOrOhhRC7NbEflMZ3z5r4HOeMDbsXAA09rvuAZtJ6OVuN+QKsSeGdcBO5fzu4gynuS+EqI1AteFqD/Fcf1ohywbM6hnnPUY0wlz2pY6R3HtQIdtlzOgZ75zHlMnNJ/7P8V7s/gFzWoCOc0/b8bwAAAP//1pQyXQ==" + return 
"eJzUW12v2zjOvj+/gpibti/SvLu352KBYmYWU6DtnEG76GWgyEwsHFlyJTk53l+/oCR/xF/5stvOuWqTmHpEUuRDin4Lz1g+Qq6t2xu03+QDgBNO4iP88hQ+/PzXh18eABK03IjcCa0e4V8PAAAf0RnBLXAtJXKHCeyMzqB5DiyaAxq7fgCwqTZuw7Xaif0j7Ji0+ABgUCKz+Ah79gCwEygT++iFvwXFMuxAoz9X5vR7o4s8fjIAjf5aOLKAdB2/a6/TXotxJw7ClfUXQ6tNrEh/fyqERPMiQ+UgRxN1ALnRHK1dkSKOQu1BqJ02GSMZpAZG+nMaXIrAC2NQuRO5FTbQO3Apcy2BBU+BWbCOOQSmkup5+FagKdfwa22fbXki039PWPL9hp7eVIusWz87NVH111VhW40Jc2zLLK61SE5+UKlTarXvfDGhUa/V97+FjWMtHVwqLGwZf0aVgCA3VCps0+n1NDD67yCyZyyP2nRRnwH3iWU4A7p8Nm09BdeASmkNkuGVC4tmvYStSDBIvd9jAkJ5774Iy4B9rrDBDauyPJeC+8O4uW/xlqRwTju2vwAMlwKVW7MkMWjtdVDeP0F8rgIUpN2IIdXWXa+PP7R1Xk6NoV48yF1RvDKYaxOiEjAwSJkC4bdPn0Fq/Vzk9HD4+Ya2NImTJM3kvl9+fQISB6rItmiCEVuKFBYKS0Fzpw1wnWWFqux9FC71uu0JjbpegTbw9p8gdsDgP0q8gNX8GaNQHLFFfHhDO7pyL2Xe2CAmhcrslB+t2Er0mrLADAIrnD4wXhQZSFYonqJZtT88avOMZtVbR+q94EySSWvnbwQMfRslQc4MkxJl/QHBo3SruuEI4GiEo59EQ9Q65Sny51wL5b+1jhlX5Cs4MmmQozjQp0ciHCpB4xPkkckgbN1b5PcXh8oKrSxkrASDe2EdmojPBhuzJBG0DSbrEO+VOG0/j2zQSxPmrrWsyBCOKapwliMZgGPgAXSsViDWuF5VPxoMBD2x9LtAWIa34gxTlliCVt9hO69qp22t297jMEhPaxaDV58kWQZShmHBU91rQ4c8UDJhQekulMjosGUgyazryxreo5e84SlT+2Emc+8mA3TC4WGFlUbAHJlwohdnA46t1hKZuhKKKZD01yNRtebjkqAVMJCaP0+o6bq1f40upw9IoSkqosujmuh5YLII4bNhwz2hAP8X7f0IX1Js7wlfkBd+LywS9sGnRSL7z1ZaoFTEQOGxOeRZxrq5vS0KhGofqp5kQXpt/WAF28KNeTL9Naa5YkMdFPCabT0leEN4hG3OjxWZkMwQd4nPDYI4AYwvHHMHWtUp0Iujwszvjz5pL84ZpeBBuUwBGqMH0gXtZMesy5lLYVeoSpSUk4amR96ePDMsOhGWbSUmXX3U3IkOiWH8uSrdBFr6vnou7PMc2fNWuvKE4ovrHopXFjJifpR1m+rzfSsMxnjpH/IVZE8uVce2E2UbxVUiFdDJ1C6l+pqE2xUI1zzcJyhNaPV8juJaEDsV0zZ46JbhZxXzlQkH/rnAgEVwvakgdg7ALcQvDbSOTBTAUIg4poKnXTg9EDVt2QeOdE835LNjTlgnuAW21YWrFw8UL1K6Ot/bus3R7loEut01a9WzqGD2Y831nYuGSdq15SkmhexFhFvrik+hnNA7qCW31wt+mbIDwhZRQY5mp0025p5tpAa/FWjdAkhryTMhdSJDu/b2WmfdKinA3UnNrjxyX7RjElimCxViEhGZCNIGjDb3pVsI+hQ6KUpTfVmD60mNPkmud0zRIOyEjGUSbcART9IUaJ9XJDgTUgqLXKtkpAzoK8KWiv+d9UD4U6OV+G9oK1yhjG2x26Gx65ZSZvfeuEZtrqQwtIu2HaaxDRDX+VFty+GgeAG2za6QcnaA3jdHArV1Os8xAQYeAKnTcqZgi549gej7T8qS1oHRkDFV1tuY3GNMUsv7hTDIKR/7RtQkU+pA2+zoCCxkgRqKV6HTFWE
B4SzoowK/uOea8FppkzEpuywORuyYMpVIb2Vt0TOEpvKrVk00ccmw1nAv5s2kjpiUmrMl0lJlwHqF8eLPbgxanLUPQPSxIlI2FBWR4hwpPHqq6RftU6rqduAeSvWnQjD66O+WKnnNrVL1ydujSNrYTm+B6pufQUY1gPJ6KvXj7n588+UfnvDalBlMyBq6MHysP/eDb4NWgFnuymsA+5Ow0btNFLlA7oyCW9VK/wahvk871yu0a66zrJccZoiUrTXqWrel9ROiGjCMhosTvEZLSTr4sYgJBR1YNtbf2krNn6kCYPNHWWJzcQGgBXpoJyGlS1jb58U2Lh9xdz6zMUkgy4rchhwBnPGUwuNQq5s5f+9E/IT5xhYoJKrLTAmv/U61kiSQyyJBC6loGkfNcEFP8OnKJJYe0Dka5psYtrQOs1fWE+n4v/DrN5Mapd17S4+VDDfcdVE+CzUBCa4SSEAWVbwtm2DQ9YDVUCPuAvLf2tBkMXjXjkjy99qR0Uc6ha4waokyXB/pCAbpVfr2nb6LTqQHt0PH06WwReE3QhPKolmkfUHYKuk3givyZBEO67FF4TdCS1DiYtCi8Ouhca12UvAF6vkKB2eKI+XFpEDiIvWK4XLWINcHNGWFtyfxDG3BLNeGmXLtGx3zJ7FKfmykcINnfQDe9ep86AliBoHrQvn7R4N7ZqjE83MgxzQ0GU4fobTXk1rBeY3r/ZoSpwn3bFQz2lSo/ZuVv0I/XcBfX+r9hhbYDOkNwKIbb3Y3St+WbjaldxtiPhO0mnddDfZNMO47ZJIbTHDGip6JRBMsoecEWeKT4PxssZIMCbpQJlwWL36eKr1XrrcGVuaq2Fsi+0X7yYTMD6zdPSfbSDZ8o3+9l/wuWW6pbiTDVF3YqhiDr+8+2DA5nBuR0UmIY7BRG5ff7N9RtX999wHigJCXPLJoqfgmN0Kb0ynhe5TzFMWF+xa/42pmeIuhX60tKj/hG/vturDTyjk/cXQ9zs8kK5jQtworrCdtgWjBEdKCuWTlT+pX9PfvQvo+RyVBNLtUOsHBTe1kYdOfeE9f3314Zf0FUFUg+86Ch03xpHC+RC3RhdHVse7I/NPJIvF5sR7MixdNw6RyaiT2ViDvmkHZqL7/t84gy0jnw7F/ycHTJ23cXVhGhnVvxfNHFHcbpuqw27lu0D5Qxm7P63gXBq1CQ4i56ZA4Nld+M/OJmvHD5oFpxCFe38j9fLGi5h2+D73yu0H5RDeeYm/IHqXifhqk7U7DlooR9WdxnDOvBvxdytrI6ZbTalVXXapXi8oticdfDXoi7QeIdYVmnO7X44H3kv2T97/C1Bn9y5/K4Wu7gZfBTqQKdai2U73/5eU2b3/xvIDCsn14A8z5ytc3WK94/auZj7xvjOp7vk/U3Fd7rRgW+vMDg6ff/fW09mC4KehkTIxfzp0CTuZdp9Z1+HLlnNFfYY6y+9ypWM6kXKB5Vs+mBNXWg02mmFSu0cd5m0pqujk/4npR43Qs/QmdffirgdW+s6iueAqlminacwAzoWaE91EokRXZrADZy5wA2cvsAJGNqvB6v/uITM2JzrokwcN8+J50Xsi6daUSZhJI8CCarNW6bGwjvXBiMEDPMNOmXIc5iRkvmbvHJw5i+BvDcDkbrn/jtN5ZFZ/inPF+/gKg/kr7NqCJME7MdqV0Ada44I1wI+f8fnA7Q5uXwiUuKhf0Vi//bmcNKBf01T7MW1w1wFzWU/tIb3TUAHZZP+2DvdFNHWb5kvYn+Xeb34NcVqE9nMP6/F8AAAD//43XbRs=" } diff --git a/metricbeat/module/postgresql/replication/_meta/data.json b/metricbeat/module/postgresql/replication/_meta/data.json new file mode 100644 index 000000000000..53927019adb3 --- /dev/null +++ 
b/metricbeat/module/postgresql/replication/_meta/data.json @@ -0,0 +1,42 @@ +{ + "@timestamp": "2023-05-24T12:22:59.233Z", + "event": { + "dataset": "postgresql.replication", + "duration": 115000, + "module": "postgresql" + }, + "metricset": { + "name": "replication", + "period": 10000 + }, + "postgresql": { + "replication": { + "write_lag": "", + "state": "streaming", + "sync_priority": 0, + "backend_start": "2023-05-23T17:01:53.697Z", + "replay_lag": "", + "flush_lag": "", + "pid": 2516780, + "client": { + "address": "192.168.128.2", + "port": 46594, + "hostname": "" + }, + "replay_lsn": "A0/60003558", + "user": { + "name": "replicator", + "id": 16442 + }, + "sync_state": "async", + "flush_lsn": "A0/60003558", + "application_name": "walreceiver", + "write_lsn": "A0/60003558", + "sent_lsn": "A0/60003558" + } + }, + "service": { + "address": "postgres://localhost:5432/postgres?connect_timeout=10", + "type": "postgresql" + } +} diff --git a/metricbeat/module/postgresql/replication/_meta/data_shared.json b/metricbeat/module/postgresql/replication/_meta/data_shared.json new file mode 100644 index 000000000000..ef2902438b0f --- /dev/null +++ b/metricbeat/module/postgresql/replication/_meta/data_shared.json @@ -0,0 +1,42 @@ +{ + "@timestamp": "2023-05-24T12:22:59.233Z", + "event": { + "dataset": "postgresql.replication", + "duration": 115000, + "module": "postgresql" + }, + "metricset": { + "name": "replication", + "period": 10000 + }, + "postgresql": { + "replication": { + "write_lag": "", + "state": "", + "sync_priority": 0, + "backend_start": "", + "replay_lag": "", + "flush_lag": "", + "pid": 0, + "client": { + "address": "", + "port": 0, + "hostname": "" + }, + "replay_lsn": "", + "user": { + "name": "", + "id": 0 + }, + "sync_state": "", + "flush_lsn": "", + "application_name": "", + "write_lsn": "", + "sent_lsn": "" + } + }, + "service": { + "address": "postgres://localhost:5432/postgres?connect_timeout=10", + "type": "postgresql" + } +} diff --git 
a/metricbeat/module/postgresql/replication/_meta/docs.asciidoc b/metricbeat/module/postgresql/replication/_meta/docs.asciidoc new file mode 100644 index 000000000000..b5377bf6548b --- /dev/null +++ b/metricbeat/module/postgresql/replication/_meta/docs.asciidoc @@ -0,0 +1 @@ +This is the `replication` metricset of the PostgreSQL module. diff --git a/metricbeat/module/postgresql/replication/_meta/fields.yml b/metricbeat/module/postgresql/replication/_meta/fields.yml new file mode 100644 index 000000000000..8266d6ee0bdf --- /dev/null +++ b/metricbeat/module/postgresql/replication/_meta/fields.yml @@ -0,0 +1,83 @@ +- name: replication + type: group + description: > + One row per replication, showing database replication statistics. Collected by querying + pg_stat_database + release: ga + fields: + - name: write_lag + type: long + description: > + Elapsed time during committed WALs from primary to the replica. + - name: state + type: keyword + description: > + WAL sender state. + - name: sync_priority + type: long + description: > + Priority of replica server being chosen as synchronous replica. + - name: backend_start + type: long + description: > + Start time when replica connected to primary. + - name: replay_lag + type: long + description: > + Elapsed time during committed WALs from primary to the replica. + Fully committed in replica node + - name: flush_lag + type: long + description: > + Elapsed time during committed WALs from primary to the replica. + WAL's has already been flushed but not yet applied. + - name: pid + type: long + description: > + Process id of walsender process + - name: client.address + type: long + description: > + Address of replica/streaming replication + - name: client.port + type: long + description: > + Port of replica/streaming replication + - name: client.hostname + type: long + description: > + Hostname of replica/streaming replication + - name: replay_lsn + type: long + description: > + Last transaction flush on disk at replica. 
+ - name: user.name + type: long + description: > + Name of user which is used for Streaming replication + - name: user.id + type: long + description: > + ID of user which is used for Streaming replication + - name: sync_state + type: long + description: > + Sync State of replica. + - name: flush_lsn + type: long + description: > + Last transaction flush on disk at replica. + - name: application_name + type: long + description: > + Number of queries canceled due to conflicts with recovery in this + database. + - name: write_lsn + type: long + description: > + Last transaction written on disk at replica. + - name: sent_lsn + type: long + description: > + Last transaction location sent to replica + diff --git a/metricbeat/module/postgresql/replication/data.go b/metricbeat/module/postgresql/replication/data.go new file mode 100644 index 000000000000..30b52691a886 --- /dev/null +++ b/metricbeat/module/postgresql/replication/data.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package replication + +import ( + "time" + + s "github.com/elastic/beats/v7/libbeat/common/schema" + c "github.com/elastic/beats/v7/libbeat/common/schema/mapstrstr" +) + +// Based on https://www.postgresql.org/docs/9.2/static/monitoring-stats.html#PG-STAT-DATABASE-VIEW +var schema = s.Schema{ + "pid": c.Int("pid"), + "user": s.Object{ + "id": c.Int("usesysid", s.Optional), + "name": c.Str("usename"), + }, + "application_name": c.Str("application_name"), + "client": s.Object{ + "address": c.Str("client_addr"), + "hostname": c.Str("client_hostname"), + "port": c.Int("client_port", s.Optional), + }, + "backend_start": c.Time(time.RFC3339Nano, "backend_start"), + "backend_xmin": c.Time(time.RFC3339Nano, "backend_xmin", s.Optional), + "state": c.Str("state"), + "sent_lsn": c.Str("sent_lsn"), + "write_lsn": c.Str("write_lsn"), + "flush_lsn": c.Str("flush_lsn"), + "replay_lsn": c.Str("replay_lsn"), + "write_lag": c.Str("write_lag"), + "flush_lag": c.Str("flush_lag"), + "replay_lag": c.Str("replay_lag"), + "sync_priority": c.Int("sync_priority"), + "sync_state": c.Str("sync_state"), + "reply_time": c.Str("reply_time"), +} diff --git a/metricbeat/module/postgresql/replication/replication.go b/metricbeat/module/postgresql/replication/replication.go new file mode 100644 index 000000000000..48d7f52771a9 --- /dev/null +++ b/metricbeat/module/postgresql/replication/replication.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package replication + +import ( + "context" + "fmt" + + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/postgresql" + + // Register postgresql replication/sql driver + _ "github.com/lib/pq" +) + +// init registers the MetricSet with the central registry. +// The New method will be called after the setup of the module and before starting to fetch data +func init() { + mb.Registry.MustAddMetricSet("postgresql", "replication", New, + mb.WithHostParser(postgresql.ParseURL), + mb.DefaultMetricSet(), + ) +} + +// MetricSet type defines all fields of the MetricSet +type MetricSet struct { + *postgresql.MetricSet +} + +// New create a new instance of the postgresql replication MetricSet. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + ms, err := postgresql.NewMetricSet(base) + if err != nil { + return nil, err + } + return &MetricSet{MetricSet: ms}, nil +} + +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). 
+func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { + ctx := context.Background() + results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_replication") + if err != nil { + return fmt.Errorf("error in QueryStats %w\n", err) + } + + for _, result := range results { + data, _ := schema.Apply(result) + reporter.Event(mb.Event{ + MetricSetFields: data, + }) + } + + return nil +} diff --git a/metricbeat/module/postgresql/replication/replication_integration_test.go b/metricbeat/module/postgresql/replication/replication_integration_test.go new file mode 100644 index 000000000000..5c93f313d6c2 --- /dev/null +++ b/metricbeat/module/postgresql/replication/replication_integration_test.go @@ -0,0 +1,98 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +//go:build integration +// +build integration + +package replication + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/tests/compose" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/metricbeat/module/postgresql" + "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFetch(t *testing.T) { + service := compose.EnsureUp(t, "postgresql") + + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) + events, errs := mbtest.ReportingFetchV2Error(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check event fields + db_oid := event["oid"].(int64) + assert.True(t, db_oid >= 0) + assert.Contains(t, event, "name") + _, ok := event["name"].(string) + assert.True(t, ok) + + rows := event["rows"].(mapstr.M) + assert.Contains(t, rows, "returned") + assert.Contains(t, rows, "fetched") + assert.Contains(t, rows, "inserted") + assert.Contains(t, rows, "updated") + assert.Contains(t, rows, "deleted") +} + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "postgresql") + + getOid := func(event mapstr.M) int { + oid, err := event.GetValue("postgresql.database.oid") + require.NoError(t, err) + + switch oid := oid.(type) { + case int: + return oid + case int64: + return int(oid) + } + t.Log(event) + t.Fatalf("no numeric oid in event: %v (%T)", oid, oid) + return 0 + } + + f := mbtest.NewFetcher(t, getConfig(service.Host())) + f.WriteEventsCond(t, "", func(event mapstr.M) bool { + return getOid(event) != 0 + }) + f.WriteEventsCond(t, "./_meta/data_shared.json", func(event mapstr.M) bool { + return getOid(event) == 0 + }) +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": 
"postgresql", + "metricsets": []string{"database"}, + "hosts": []string{postgresql.GetDSN(host)}, + "username": postgresql.GetEnvUsername(), + "password": postgresql.GetEnvPassword(), + } +} diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 3a275347ff00..c9bb99067b7b 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1242,6 +1242,9 @@ metricbeat.modules: # Stats about every PostgreSQL process - activity + # Stats about every PostgreSQL replication process + - replication + # Stats about every statement executed in the server. It requires the # `pg_stats_statement` library to be configured in the server. #- statement From a3a716b3d41fa2e069904315e42c025b6ecc226a Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Wed, 31 May 2023 04:52:24 +0400 Subject: [PATCH 02/12] Fixes after review --- metricbeat/docs/fields.asciidoc | 52 ++++++++++------- .../modules/postgresql/replication.asciidoc | 2 + metricbeat/docs/modules_list.asciidoc | 2 +- metricbeat/module/postgresql/fields.go | 2 +- .../postgresql/replication/_meta/fields.yml | 57 +++++++++++-------- .../module/postgresql/replication/data.go | 11 ++-- .../postgresql/replication/replication.go | 8 +-- 7 files changed, 76 insertions(+), 58 deletions(-) diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 7c159b3ab26b..84ed093f1fec 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -56307,7 +56307,7 @@ type: date [float] === replication -One row per replication, showing database replication statistics. Collected by querying pg_stat_database +One row per replication, showing database replication statistics. Collected by querying pg_stat_replication. @@ -56324,7 +56324,7 @@ type: long *`postgresql.replication.state`*:: + -- -WAL sender state. 
+Current WAL sender state type: keyword @@ -56334,7 +56334,7 @@ type: keyword *`postgresql.replication.sync_priority`*:: + -- -Priority of replica server being chosen as synchronous replica. +Priority of this standby server for being chosen as the synchronous standby. type: long @@ -56344,7 +56344,17 @@ type: long *`postgresql.replication.backend_start`*:: + -- -Start time when replica connected to primary. +Time when this process was started, i.e., when the client connected to this WAL sender. + + +type: long + +-- + +*`postgresql.replication.backend_xmin`*:: ++ +-- +This standby's xmin horizon reported by hot_standby_feedback. type: long @@ -56354,7 +56364,7 @@ type: long *`postgresql.replication.replay_lag`*:: + -- -Elapsed time during committed WALs from primary to the replica. Fully committed in replica node +Elapsed time during committed WALs from primary to the replica. Fully committed to replica node. type: long @@ -56364,7 +56374,7 @@ type: long *`postgresql.replication.flush_lag`*:: + -- -Elapsed time during committed WALs from primary to the replica. WAL's has already been flushed but not yet applied. +The elapsed time during committed WALs from primary to the replica. WALs have already been flushed but have not yet been applied. type: long @@ -56374,7 +56384,7 @@ type: long *`postgresql.replication.pid`*:: + -- -Process id of walsender process +Process ID of a WAL sender process. type: long @@ -56384,7 +56394,7 @@ type: long *`postgresql.replication.client.address`*:: + -- -Address of replica/streaming replication +IP address of the client connected to this WAL sender. If this field is null, it indicates that the client is connected via a Unix socket on the server machine. type: long @@ -56394,7 +56404,7 @@ type: long *`postgresql.replication.client.port`*:: + -- -Port of replica/streaming replication +TCP port number that the client is using for communication with this WAL sender, or -1 if a Unix socket is used. 
type: long @@ -56404,7 +56414,7 @@ type: long *`postgresql.replication.client.hostname`*:: + -- -Hostname of replica/streaming replication +Host name of the connected client, as reported by a reverse DNS lookup of client_addr. This field will only be non-null for IP connections, and only when log_hostname is enabled. type: long @@ -56414,7 +56424,7 @@ type: long *`postgresql.replication.replay_lsn`*:: + -- -Last transaction flush on disk at replica. +Last transaction log position replayed into the database on this standby server. type: long @@ -56424,17 +56434,17 @@ type: long *`postgresql.replication.user.name`*:: + -- -Name of user which is used for Streaming replication +Name of the user logged into this WAL sender process. -type: long +type: keyword -- *`postgresql.replication.user.id`*:: + -- -ID of user which is used for Streaming replication +OID of the user logged into this WAL sender process. type: long @@ -56444,17 +56454,17 @@ type: long *`postgresql.replication.sync_state`*:: + -- -Sync State of replica. +Synchronous state of this standby server. -type: long +type: keyword -- *`postgresql.replication.flush_lsn`*:: + -- -Last transaction flush on disk at replica. +Last transaction log position flushed to disk by this standby server. type: long @@ -56464,17 +56474,17 @@ type: long *`postgresql.replication.application_name`*:: + -- -Number of queries canceled due to conflicts with recovery in this database. +Name of the application that is connected to this WAL sender. -type: long +type: keyword -- *`postgresql.replication.write_lsn`*:: + -- -Last transaction written on disk at replica. +Last transaction log position written to disk by this standby server. type: long @@ -56484,7 +56494,7 @@ type: long *`postgresql.replication.sent_lsn`*:: + -- -Last transaction location sent to replica +Last transaction log position sent on this connection. 
type: long diff --git a/metricbeat/docs/modules/postgresql/replication.asciidoc b/metricbeat/docs/modules/postgresql/replication.asciidoc index ede32d9c0c92..0cda7f433b14 100644 --- a/metricbeat/docs/modules/postgresql/replication.asciidoc +++ b/metricbeat/docs/modules/postgresql/replication.asciidoc @@ -7,6 +7,8 @@ This file is generated! See scripts/mage/docs_collector.go [[metricbeat-metricset-postgresql-replication]] === PostgreSQL replication metricset +beta[] + include::../../../module/postgresql/replication/_meta/docs.asciidoc[] This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index 9b91d948ef8f..79ddbeb03712 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -250,7 +250,7 @@ This file is generated! See scripts/mage/docs_collector.go .5+| .5+| |<> |<> |<> -|<> +|<> beta[] |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .3+| .3+| |<> diff --git a/metricbeat/module/postgresql/fields.go b/metricbeat/module/postgresql/fields.go index dc9906be153a..86f9f22211a6 100644 --- a/metricbeat/module/postgresql/fields.go +++ b/metricbeat/module/postgresql/fields.go @@ -32,5 +32,5 @@ func init() { // AssetPostgresql returns asset data. // This is the base64 encoded zlib format compressed contents of module/postgresql. 
func AssetPostgresql() string { - return "eJzUW12v2zjOvj+/gpibti/SvLu352KBYmYWU6DtnEG76GWgyEwsHFlyJTk53l+/oCR/xF/5stvOuWqTmHpEUuRDin4Lz1g+Qq6t2xu03+QDgBNO4iP88hQ+/PzXh18eABK03IjcCa0e4V8PAAAf0RnBLXAtJXKHCeyMzqB5DiyaAxq7fgCwqTZuw7Xaif0j7Ji0+ABgUCKz+Ah79gCwEygT++iFvwXFMuxAoz9X5vR7o4s8fjIAjf5aOLKAdB2/a6/TXotxJw7ClfUXQ6tNrEh/fyqERPMiQ+UgRxN1ALnRHK1dkSKOQu1BqJ02GSMZpAZG+nMaXIrAC2NQuRO5FTbQO3Apcy2BBU+BWbCOOQSmkup5+FagKdfwa22fbXki039PWPL9hp7eVIusWz87NVH111VhW40Jc2zLLK61SE5+UKlTarXvfDGhUa/V97+FjWMtHVwqLGwZf0aVgCA3VCps0+n1NDD67yCyZyyP2nRRnwH3iWU4A7p8Nm09BdeASmkNkuGVC4tmvYStSDBIvd9jAkJ5774Iy4B9rrDBDauyPJeC+8O4uW/xlqRwTju2vwAMlwKVW7MkMWjtdVDeP0F8rgIUpN2IIdXWXa+PP7R1Xk6NoV48yF1RvDKYaxOiEjAwSJkC4bdPn0Fq/Vzk9HD4+Ya2NImTJM3kvl9+fQISB6rItmiCEVuKFBYKS0Fzpw1wnWWFqux9FC71uu0JjbpegTbw9p8gdsDgP0q8gNX8GaNQHLFFfHhDO7pyL2Xe2CAmhcrslB+t2Er0mrLADAIrnD4wXhQZSFYonqJZtT88avOMZtVbR+q94EySSWvnbwQMfRslQc4MkxJl/QHBo3SruuEI4GiEo59EQ9Q65Sny51wL5b+1jhlX5Cs4MmmQozjQp0ciHCpB4xPkkckgbN1b5PcXh8oKrSxkrASDe2EdmojPBhuzJBG0DSbrEO+VOG0/j2zQSxPmrrWsyBCOKapwliMZgGPgAXSsViDWuF5VPxoMBD2x9LtAWIa34gxTlliCVt9hO69qp22t297jMEhPaxaDV58kWQZShmHBU91rQ4c8UDJhQekulMjosGUgyazryxreo5e84SlT+2Emc+8mA3TC4WGFlUbAHJlwohdnA46t1hKZuhKKKZD01yNRtebjkqAVMJCaP0+o6bq1f40upw9IoSkqosujmuh5YLII4bNhwz2hAP8X7f0IX1Js7wlfkBd+LywS9sGnRSL7z1ZaoFTEQOGxOeRZxrq5vS0KhGofqp5kQXpt/WAF28KNeTL9Naa5YkMdFPCabT0leEN4hG3OjxWZkMwQd4nPDYI4AYwvHHMHWtUp0Iujwszvjz5pL84ZpeBBuUwBGqMH0gXtZMesy5lLYVeoSpSUk4amR96ePDMsOhGWbSUmXX3U3IkOiWH8uSrdBFr6vnou7PMc2fNWuvKE4ovrHopXFjJifpR1m+rzfSsMxnjpH/IVZE8uVce2E2UbxVUiFdDJ1C6l+pqE2xUI1zzcJyhNaPV8juJaEDsV0zZ46JbhZxXzlQkH/rnAgEVwvakgdg7ALcQvDbSOTBTAUIg4poKnXTg9EDVt2QeOdE835LNjTlgnuAW21YWrFw8UL1K6Ot/bus3R7loEut01a9WzqGD2Y831nYuGSdq15SkmhexFhFvrik+hnNA7qCW31wt+mbIDwhZRQY5mp0025p5tpAa/FWjdAkhryTMhdSJDu/b2WmfdKinA3UnNrjxyX7RjElimCxViEhGZCNIGjDb3pVsI+hQ6KUpTfVmD60mNPkmud0zRIOyEjGUSbcART9IUaJ9XJDgTUgqLXKtkpAzoK8KWiv+d9UD4U6OV+G9oK1yhjG2x26Gx65ZSZvfeuEZtrqQwtIu2HaaxDRDX+VFty+GgeAG2za6QcnaA3jdHArV1Os8xAQYeAKnTcqZgi549gej7T8qS1oHRkDFV1tuY3G
NMUsv7hTDIKR/7RtQkU+pA2+zoCCxkgRqKV6HTFWEB4SzoowK/uOea8FppkzEpuywORuyYMpVIb2Vt0TOEpvKrVk00ccmw1nAv5s2kjpiUmrMl0lJlwHqF8eLPbgxanLUPQPSxIlI2FBWR4hwpPHqq6RftU6rqduAeSvWnQjD66O+WKnnNrVL1ydujSNrYTm+B6pufQUY1gPJ6KvXj7n588+UfnvDalBlMyBq6MHysP/eDb4NWgFnuymsA+5Ow0btNFLlA7oyCW9VK/wahvk871yu0a66zrJccZoiUrTXqWrel9ROiGjCMhosTvEZLSTr4sYgJBR1YNtbf2krNn6kCYPNHWWJzcQGgBXpoJyGlS1jb58U2Lh9xdz6zMUkgy4rchhwBnPGUwuNQq5s5f+9E/IT5xhYoJKrLTAmv/U61kiSQyyJBC6loGkfNcEFP8OnKJJYe0Dka5psYtrQOs1fWE+n4v/DrN5Mapd17S4+VDDfcdVE+CzUBCa4SSEAWVbwtm2DQ9YDVUCPuAvLf2tBkMXjXjkjy99qR0Uc6ha4waokyXB/pCAbpVfr2nb6LTqQHt0PH06WwReE3QhPKolmkfUHYKuk3givyZBEO67FF4TdCS1DiYtCi8Ouhca12UvAF6vkKB2eKI+XFpEDiIvWK4XLWINcHNGWFtyfxDG3BLNeGmXLtGx3zJ7FKfmykcINnfQDe9ep86AliBoHrQvn7R4N7ZqjE83MgxzQ0GU4fobTXk1rBeY3r/ZoSpwn3bFQz2lSo/ZuVv0I/XcBfX+r9hhbYDOkNwKIbb3Y3St+WbjaldxtiPhO0mnddDfZNMO47ZJIbTHDGip6JRBMsoecEWeKT4PxssZIMCbpQJlwWL36eKr1XrrcGVuaq2Fsi+0X7yYTMD6zdPSfbSDZ8o3+9l/wuWW6pbiTDVF3YqhiDr+8+2DA5nBuR0UmIY7BRG5ff7N9RtX999wHigJCXPLJoqfgmN0Kb0ynhe5TzFMWF+xa/42pmeIuhX60tKj/hG/vturDTyjk/cXQ9zs8kK5jQtworrCdtgWjBEdKCuWTlT+pX9PfvQvo+RyVBNLtUOsHBTe1kYdOfeE9f3314Zf0FUFUg+86Ch03xpHC+RC3RhdHVse7I/NPJIvF5sR7MixdNw6RyaiT2ViDvmkHZqL7/t84gy0jnw7F/ycHTJ23cXVhGhnVvxfNHFHcbpuqw27lu0D5Qxm7P63gXBq1CQ4i56ZA4Nld+M/OJmvHD5oFpxCFe38j9fLGi5h2+D73yu0H5RDeeYm/IHqXifhqk7U7DlooR9WdxnDOvBvxdytrI6ZbTalVXXapXi8oticdfDXoi7QeIdYVmnO7X44H3kv2T97/C1Bn9y5/K4Wu7gZfBTqQKdai2U73/5eU2b3/xvIDCsn14A8z5ytc3WK94/auZj7xvjOp7vk/U3Fd7rRgW+vMDg6ff/fW09mC4KehkTIxfzp0CTuZdp9Z1+HLlnNFfYY6y+9ypWM6kXKB5Vs+mBNXWg02mmFSu0cd5m0pqujk/4npR43Qs/QmdffirgdW+s6iueAqlminacwAzoWaE91EokRXZrADZy5wA2cvsAJGNqvB6v/uITM2JzrokwcN8+J50Xsi6daUSZhJI8CCarNW6bGwjvXBiMEDPMNOmXIc5iRkvmbvHJw5i+BvDcDkbrn/jtN5ZFZ/inPF+/gKg/kr7NqCJME7MdqV0Ada44I1wI+f8fnA7Q5uXwiUuKhf0Vi//bmcNKBf01T7MW1w1wFzWU/tIb3TUAHZZP+2DvdFNHWb5kvYn+Xeb34NcVqE9nMP6/F8AAAD//43XbRs=" + return 
"eJzUW02P4zbSvs+vKOQyMy88xrvXPiwQJFnsAJOkg5lFjkaZKllEU6RCUu1Wfv2iSOrDkuz2h9zJ+tRti1UPq8jiU8XSJ3ii5gEq4/zOkvtDvQPw0it6gO8e45dff/vy3TuAjJywsvLS6Af45zsAgJ/JWykcCKMUCU8Z5NaU0I8DR/aZrFu/A3CFsX4jjM7l7gFyVI7eAVhShI4eYIfvAHJJKnMPQfgn0FjSCBp/fFPx89bUVfpmBhp/BjjKiHSdfhvqGepC4eWz9E33w5y2Exr586smyIyoS9IeKrLJBlBZI8i5FRtiL/UOpM6NLZFlsBmQ7ecN+IJA1NaS9gdyW2xgcvAF+oHAWhSADpxHT4A6a8fDHzXZZg0/dP7ZNgcyw++MpdptePSmVbIePHboovYzNuHQjBl63KKjtZHZwQOtOZXRu9EPJywarPr5xzhx6qSDL6SDLYon0hlIXoZax2l6sz4NjP+dRfZEzd7YMepXwP2CJS2ArlrMWo9xaUBrtB7JvObakV3fw1csGJTZ7SgDqcPqPgvLjH8u8MEVWrGqlBRhM25uUz6QFPfpyPdngBFKkvZrzDJLzl0G5fMjpHEtoCjtSgyFcf5ye/zbOB/kdBg65VHuiuOVpcrYGJUAwRKfFAQ//vIVlDFPdcWD4+MbntJJnCxpoeX77YdHYHGg63JLNjpxYEjpoHYcNHNjQZiyrHXr7730RbDtRGiy9QqMhU//AJkDwn+0fAFnxBMloXTEF2nwhmd04VyaqvdBOhRat/P56ORWUbCUA7QEWHvzjKKuS1BYa1GQXQ2/3Bv7RHY10aPMTgpU7NJu8fcC5n5NkqBCi0qR6r5geHzc6nE4Athb6fmR5IjOpqIg8VQZqcOvzqP1dbWCPSpLguQzf7tnwqEzsuGA3KOKwtYTJT+9eNJOGu2gxAYs7aTzZBM+F32MWSZ5Gqi6EB+MeNp/AdnsKs3QX+pZWRLsC9JxLycyAPvIA3hbrUCuab1qH5oNBBOx/FwkLPNT8Ra1Y5Zg9BtM5323aAd6h3OcBxlozd3gdTtJNZGUUVR4aHtjeZNHSiYdaDOGkhgdDRyk0PmprPk5BskbUaDezTOZWycZoTOOACtqOgJmj9LLSZyNOLbGKEJ9IRRbE9tvQqI6yyeVYDQgKCOeTpjpMt0/pCVnnolDUzLEmEf10fMZVR3DZ8+GJ0IB/i/5+wG+FTScE72QqMNcMBH22dEyU9OxrRX4KELQtO83eVni+GwfigKph5tqIlmyXQcPrGBb+2MrmT+9ay6Y0AgFfMBtoAQfGY90/f5xspQKLXOXNG4WxAFgehFUeTC6OwKDOE7Mwvz4m6FygXwEz8pFDWStmTkueCY5Ol+hLyCvdStKqZOO5iGfDsbMi86kw62ibGyPjjvxJrEontrUTZLj39txcZ6vkb3gpQt3KL348aZ476Bk5senbp99fh6EwRQvw6CQQU7kcnbsRlG2N1wrUgPvTOMLzq9ZuFuB9P3gKUHpQ2vgcxzXothTMW1Dz+M0/FXD/I7SQxgXGbCMS+9UEHsNwDXEr4i0jl0UwXCI2BdSFGM4ExAdbdlFjnRLNeSrRy+dl8IBbk3tO+WR4iVK1533ritzDKsWkW6P3drWLFqY01hzeeWiZ5Ju7URBWa0mEeHavOKXmE6YHDrJQ31xXRb4TLAl0lCRzY0tjy3PIVJLf9Tk/B2QdpIXQuplSW4d/LUux1lShJsrgxduuW/GowIsTa1jTGIik0C6iNFVIXWLQZ9DJ0dpzi87cBOpaU3y0tsXZAlyqVKaxBPwzJMMB9qnFQsupVLSkTA6O5IGTA3hGi3+l+3A+AtrtPwzlhUuMMa2znOybj0wyuKrN+no3JXVlmcx9MNpbDPEdXlU22Y+KJ6BbZPXSi0OMKzNI4HaeVNVlAFCAMDmdAI1bCmwJ5DT9VNgNtgwBkrUTTeNk3NMh9T914W0JPg8DoWok0xpBG2T8xa4kwc6KMGE3rSEBaR3YPY
agvLANeGDNrZEpcYsDo74sUCdqeBl4ygwhD7za7Vmhrlk1DVfi/l40kaolBF4j2OpdWCn4Xjy5zaWHC1aB2D62BIpF5OKRHH2HB4D1QxKp5SqvR24hVL9qgms2Ye7pVZef6vUfvNpL7MhtsNboO7mZ5ZRzaC8nEr9dXc/ofjy/4HwugItZewNU1txrD73F98GrYDKyjeXAA47YWPyTRJ5h7MzCR5kK9MbhO4+7bVaoVsLU5aTw2GBSDnQ0eW6A6sfENWI4Wi4OMBrjVJsg78WMaPgDYvH6ltbZcQTZwC4fJRlNpcUACuYoD0JqbiHt8O5OMQVIm4eTjZUDLJpyW08I0CgKDg8zpW60Yd7J+YnGApboImpLtoGPoSZGq1YoFB1Rg4K2ReO+uaCieBDzSyWB5iKLIYihmucp/K9C0Q6/Ref/njSojz74OljKcMVd118nsWcgAW3B0hElky8bfpgMF4Bq7lC3BnkfzChk8ngTTNiyW81I2v2vAt9bfU90nCz5y0YpbfHd6j0nbUjA7icvCjuhS0JvxKa1I7sXcoXjK2VfiW4usruwmEDtiT8SmgZKbobtCT8cmjC6FxJcYd8vsUhUAviczGriblIpzFezloS5pls0+KdSHyFtlBZGYu2WYdCx/KHWCs/FVKEpVfXAHw/yfNhIggtgTC1DvePlnZoOcULfSD7IhYZDofwsTeR2sL5QOvdmg9OG+/ZOGd0hdS7j6twhX6oIFxfmt2GFWzm7AbgyB8vdvdG3zZ+MaOPC2LhJBgU78YWnLrg+Nphl1zhgle8GJhIcsE97JwRZuEQXJ4ttpIhIx/ThPPixd8nS5+k64OGlaUy9oHIadJ+0CFzXe4+kDB7FbIlf24GH5jZRuH8vf7la+UnhZXj7JHd09Zi25QMfv/+i4v9w5WVJe+H1AybZnT+/f4NuXt79//7918gtQtNNXS6Gy02lZXGHrYM32KjxySuu1p1HnW2bdou4txY2FKsYhtHGjBWattKvKm7Idf2Il3JuW/uRYqDe8ufxv9STnj61fAHdn7vgCVDYaz8MzRm9z2KhfGb9NgmJ8qOZ+W8ZLH5m24d/vyrVqGg00rwpn0WtMmOBOpc1a5YcFLfCgJaeGJhRKiftOWAUEcJ0NmLdaqucFbekI+/hpbdY1Whe3Vl4zDIpC1zRTvwtWAu6BM+tiWDmHzQzBHag2qlQi+G1BmfQ+RmGminpLNT+ixx1Bpr9KArEUoUhdTHUo+/UTPwRFTXHDww6HX9wKd7s6+d6g1N2xNZx5u4Y7SNy2UvlYolti3vSP1JT68zIVj282OLRhrtIjEOA8OBwoS4NUZotNKh9+l0bHZLHR9fmEkO+8iU2UFlnExv9rC6/kWEwZ2A0XNH/GVvRSx0TzH/xsS5AeqNXx85F1bgZwsTxK+HPGvQGHqOF9MJ+kZLrz30UmNGbDQ4E+kr78MstOzOe1fmNUqY8pQ3suqo9+ciqzoOg28F1IUm5hRj+ug5vZTuumJvzXEPXnuMzZb8V9jF87fVM+9AHkiV+tmk9dG+9hjk9i89iqqG2uEuvvjoQ8En3Ctc8NZj3xZ8W/fgW8bBvk0jWMViZEkz/dZv/lbm8H0IW2sY9zYfdB0vZ68eS+fPU3o9vVzYXvdbbB8ejzsUK1CpO9SMu5asaNqun8/WJ41rzX7ZWqo+fSd1ZOkli/O2DDt08Z7HHtbwqq692ay17pvHXwNYSr0gvJ+llmVdLgoQX5YEiC+LAyQ8asLL193PhHpJdM5nGT0vh+/RVLXqKrY6Q5tBRs+yP7UGd+xDpGc2ykboJZXGNuvYHrRgb8V4+6T+o3BRHnsSYtdDalJ91cSHOBdsSzkDaOjkuA5oJq2Xi92knoE1KbwSbuKgbwd31Kt8LlxlBKo7rtYg/+bFGlHeca1OYV6zVCPM+67UKdIrF2oEe991OgV75TL1VFb39D/Lv9n9AeR9DTrBOW/P/wYAAP//RQpfgQ==" } diff --git 
a/metricbeat/module/postgresql/replication/_meta/fields.yml b/metricbeat/module/postgresql/replication/_meta/fields.yml index 8266d6ee0bdf..103e2a357ba3 100644 --- a/metricbeat/module/postgresql/replication/_meta/fields.yml +++ b/metricbeat/module/postgresql/replication/_meta/fields.yml @@ -2,8 +2,8 @@ type: group description: > One row per replication, showing database replication statistics. Collected by querying - pg_stat_database - release: ga + pg_stat_replication. + release: beta fields: - name: write_lag type: long @@ -12,72 +12,79 @@ - name: state type: keyword description: > - WAL sender state. + Current WAL sender state. - name: sync_priority type: long description: > - Priority of replica server being chosen as synchronous replica. + Priority of this standby server for being chosen as the synchronous standby. - name: backend_start type: long description: > - Start time when replica connected to primary. + Time when this process was started, i.e., when the client connected to this WAL sender. + - name: backend_xmin + type: long + description: > + This standby's xmin horizon reported by hot_standby_feedback. - name: replay_lag type: long description: > Elapsed time during committed WALs from primary to the replica. - Fully committed in replica node + Fully committed to replica node. - name: flush_lag type: long description: > - Elapsed time during committed WALs from primary to the replica. - WAL's has already been flushed but not yet applied. + The elapsed time during committed WALs from primary to the replica. + WALs have already been flushed but have not yet been applied. - name: pid type: long description: > - Process id of walsender process + Process ID of a WAL sender process. - name: client.address type: long description: > - Address of replica/streaming replication + IP address of the client connected to this WAL sender. + If this field is null, it indicates that the client is + connected via a Unix socket on the server machine. 
- name: client.port type: long description: > - Port of replica/streaming replication + TCP port number that the client is using for communication + with this WAL sender, or -1 if a Unix socket is used. - name: client.hostname type: long description: > - Hostname of replica/streaming replication + Host name of the connected client, as reported by a reverse + DNS lookup of client_addr. This field will only be non-null + for IP connections, and only when log_hostname is enabled. - name: replay_lsn type: long description: > - Last transaction flush on disk at replica. + Last transaction log position replayed into the database on this standby server. - name: user.name - type: long + type: keyword description: > - Name of user which is used for Streaming replication + Name of the user logged into this WAL sender process. - name: user.id type: long description: > - ID of user which is used for Streaming replication + OID of the user logged into this WAL sender process. - name: sync_state - type: long + type: keyword description: > - Sync State of replica. + Synchronous state of this standby server. - name: flush_lsn type: long description: > - Last transaction flush on disk at replica. + Last transaction log position flushed to disk by this standby server. - name: application_name - type: long + type: keyword description: > - Number of queries canceled due to conflicts with recovery in this - database. + Name of the application that is connected to this WAL sender. - name: write_lsn type: long description: > - Last transaction written on disk at replica. + Last transaction log position written to disk by this standby server. - name: sent_lsn type: long description: > - Last transaction location sent to replica - + Last transaction log position sent on this connection. 
diff --git a/metricbeat/module/postgresql/replication/data.go b/metricbeat/module/postgresql/replication/data.go index 30b52691a886..1e490c31094e 100644 --- a/metricbeat/module/postgresql/replication/data.go +++ b/metricbeat/module/postgresql/replication/data.go @@ -24,7 +24,7 @@ import ( c "github.com/elastic/beats/v7/libbeat/common/schema/mapstrstr" ) -// Based on https://www.postgresql.org/docs/9.2/static/monitoring-stats.html#PG-STAT-DATABASE-VIEW +// Based on https://www.postgresql.org/docs/9.5/monitoring-stats.html#PG-STAT-REPLICATION-VIEW var schema = s.Schema{ "pid": c.Int("pid"), "user": s.Object{ @@ -33,12 +33,12 @@ var schema = s.Schema{ }, "application_name": c.Str("application_name"), "client": s.Object{ - "address": c.Str("client_addr"), - "hostname": c.Str("client_hostname"), + "address": c.Str("client_addr", s.Optional), + "hostname": c.Str("client_hostname", s.Optional), "port": c.Int("client_port", s.Optional), }, "backend_start": c.Time(time.RFC3339Nano, "backend_start"), - "backend_xmin": c.Time(time.RFC3339Nano, "backend_xmin", s.Optional), + "backend_xmin": c.Int("backend_xmin", s.Optional), "state": c.Str("state"), "sent_lsn": c.Str("sent_lsn"), "write_lsn": c.Str("write_lsn"), @@ -47,7 +47,6 @@ var schema = s.Schema{ "write_lag": c.Str("write_lag"), "flush_lag": c.Str("flush_lag"), "replay_lag": c.Str("replay_lag"), - "sync_priority": c.Int("sync_priority"), + "sync_priority": c.Int("sync_priority", s.Optional), "sync_state": c.Str("sync_state"), - "reply_time": c.Str("reply_time"), } diff --git a/metricbeat/module/postgresql/replication/replication.go b/metricbeat/module/postgresql/replication/replication.go index 48d7f52771a9..9674ae6ec25a 100644 --- a/metricbeat/module/postgresql/replication/replication.go +++ b/metricbeat/module/postgresql/replication/replication.go @@ -24,7 +24,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/postgresql" - // Register postgresql replication/sql 
driver + // Register postgresql as the sql driver _ "github.com/lib/pq" ) @@ -37,7 +37,7 @@ func init() { ) } -// MetricSet type defines all fields of the MetricSet +// MetricSet type defines all fields of the MetricSet. type MetricSet struct { *postgresql.MetricSet } @@ -51,14 +51,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{MetricSet: ms}, nil } -// Fetch methods implements the data gathering and data conversion to the right +// Fetch method implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { ctx := context.Background() results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_replication") if err != nil { - return fmt.Errorf("error in QueryStats %w\n", err) + return fmt.Errorf("error in QueryStats %w", err) } for _, result := range results { From c098a05754793418d59c8ffc9816dad9139de316 Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Wed, 31 May 2023 21:41:18 +0400 Subject: [PATCH 03/12] improve docs --- metricbeat/docs/fields.asciidoc | 2 +- metricbeat/module/postgresql/fields.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 84ed093f1fec..efeed5dbb011 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -56324,7 +56324,7 @@ type: long *`postgresql.replication.state`*:: + -- -Current WAL sender state +Current WAL sender state. type: keyword diff --git a/metricbeat/module/postgresql/fields.go b/metricbeat/module/postgresql/fields.go index 86f9f22211a6..630e3c950e31 100644 --- a/metricbeat/module/postgresql/fields.go +++ b/metricbeat/module/postgresql/fields.go @@ -32,5 +32,5 @@ func init() { // AssetPostgresql returns asset data.
// This is the base64 encoded zlib format compressed contents of module/postgresql. func AssetPostgresql() string { - return "eJzUW02P4zbSvs+vKOQyMy88xrvXPiwQJFnsAJOkg5lFjkaZKllEU6RCUu1Wfv2iSOrDkuz2h9zJ+tRti1UPq8jiU8XSJ3ii5gEq4/zOkvtDvQPw0it6gO8e45dff/vy3TuAjJywsvLS6Af45zsAgJ/JWykcCKMUCU8Z5NaU0I8DR/aZrFu/A3CFsX4jjM7l7gFyVI7eAVhShI4eYIfvAHJJKnMPQfgn0FjSCBp/fFPx89bUVfpmBhp/BjjKiHSdfhvqGepC4eWz9E33w5y2Exr586smyIyoS9IeKrLJBlBZI8i5FRtiL/UOpM6NLZFlsBmQ7ecN+IJA1NaS9gdyW2xgcvAF+oHAWhSADpxHT4A6a8fDHzXZZg0/dP7ZNgcyw++MpdptePSmVbIePHboovYzNuHQjBl63KKjtZHZwQOtOZXRu9EPJywarPr5xzhx6qSDL6SDLYon0hlIXoZax2l6sz4NjP+dRfZEzd7YMepXwP2CJS2ArlrMWo9xaUBrtB7JvObakV3fw1csGJTZ7SgDqcPqPgvLjH8u8MEVWrGqlBRhM25uUz6QFPfpyPdngBFKkvZrzDJLzl0G5fMjpHEtoCjtSgyFcf5ye/zbOB/kdBg65VHuiuOVpcrYGJUAwRKfFAQ//vIVlDFPdcWD4+MbntJJnCxpoeX77YdHYHGg63JLNjpxYEjpoHYcNHNjQZiyrHXr7730RbDtRGiy9QqMhU//AJkDwn+0fAFnxBMloXTEF2nwhmd04VyaqvdBOhRat/P56ORWUbCUA7QEWHvzjKKuS1BYa1GQXQ2/3Bv7RHY10aPMTgpU7NJu8fcC5n5NkqBCi0qR6r5geHzc6nE4Athb6fmR5IjOpqIg8VQZqcOvzqP1dbWCPSpLguQzf7tnwqEzsuGA3KOKwtYTJT+9eNJOGu2gxAYs7aTzZBM+F32MWSZ5Gqi6EB+MeNp/AdnsKs3QX+pZWRLsC9JxLycyAPvIA3hbrUCuab1qH5oNBBOx/FwkLPNT8Ra1Y5Zg9BtM5323aAd6h3OcBxlozd3gdTtJNZGUUVR4aHtjeZNHSiYdaDOGkhgdDRyk0PmprPk5BskbUaDezTOZWycZoTOOACtqOgJmj9LLSZyNOLbGKEJ9IRRbE9tvQqI6yyeVYDQgKCOeTpjpMt0/pCVnnolDUzLEmEf10fMZVR3DZ8+GJ0IB/i/5+wG+FTScE72QqMNcMBH22dEyU9OxrRX4KELQtO83eVni+GwfigKph5tqIlmyXQcPrGBb+2MrmT+9ay6Y0AgFfMBtoAQfGY90/f5xspQKLXOXNG4WxAFgehFUeTC6OwKDOE7Mwvz4m6FygXwEz8pFDWStmTkueCY5Ol+hLyCvdStKqZOO5iGfDsbMi86kw62ibGyPjjvxJrEontrUTZLj39txcZ6vkb3gpQt3KL348aZ476Bk5senbp99fh6EwRQvw6CQQU7kcnbsRlG2N1wrUgPvTOMLzq9ZuFuB9P3gKUHpQ2vgcxzXothTMW1Dz+M0/FXD/I7SQxgXGbCMS+9UEHsNwDXEr4i0jl0UwXCI2BdSFGM4ExAdbdlFjnRLNeSrRy+dl8IBbk3tO+WR4iVK1533ritzDKsWkW6P3drWLFqY01hzeeWiZ5Ju7URBWa0mEeHavOKXmE6YHDrJQ31xXRb4TLAl0lCRzY0tjy3PIVJLf9Tk/B2QdpIXQuplSW4d/LUux1lShJsrgxduuW/GowIsTa1jTGIik0C6iNFVIXWLQZ9DJ0dpzi87cBOpaU3y0tsXZAlyqVKaxBPwzJMMB9qnFQsupVLSkTA6O5IGTA3hGi3+l+3A+AtrtPwzlhUuMMa2znOybj0wyuKrN+no3JXVlmcx9M
NpbDPEdXlU22Y+KJ6BbZPXSi0OMKzNI4HaeVNVlAFCAMDmdAI1bCmwJ5DT9VNgNtgwBkrUTTeNk3NMh9T914W0JPg8DoWok0xpBG2T8xa4kwc6KMGE3rSEBaR3YPYagvLANeGDNrZEpcYsDo74sUCdqeBl4ygwhD7za7Vmhrlk1DVfi/l40kaolBF4j2OpdWCn4Xjy5zaWHC1aB2D62BIpF5OKRHH2HB4D1QxKp5SqvR24hVL9qgms2Ye7pVZef6vUfvNpL7MhtsNboO7mZ5ZRzaC8nEr9dXc/ofjy/4HwugItZewNU1txrD73F98GrYDKyjeXAA47YWPyTRJ5h7MzCR5kK9MbhO4+7bVaoVsLU5aTw2GBSDnQ0eW6A6sfENWI4Wi4OMBrjVJsg78WMaPgDYvH6ltbZcQTZwC4fJRlNpcUACuYoD0JqbiHt8O5OMQVIm4eTjZUDLJpyW08I0CgKDg8zpW60Yd7J+YnGApboImpLtoGPoSZGq1YoFB1Rg4K2ReO+uaCieBDzSyWB5iKLIYihmucp/K9C0Q6/Ref/njSojz74OljKcMVd118nsWcgAW3B0hElky8bfpgMF4Bq7lC3BnkfzChk8ngTTNiyW81I2v2vAt9bfU90nCz5y0YpbfHd6j0nbUjA7icvCjuhS0JvxKa1I7sXcoXjK2VfiW4usruwmEDtiT8SmgZKbobtCT8cmjC6FxJcYd8vsUhUAviczGriblIpzFezloS5pls0+KdSHyFtlBZGYu2WYdCx/KHWCs/FVKEpVfXAHw/yfNhIggtgTC1DvePlnZoOcULfSD7IhYZDofwsTeR2sL5QOvdmg9OG+/ZOGd0hdS7j6twhX6oIFxfmt2GFWzm7AbgyB8vdvdG3zZ+MaOPC2LhJBgU78YWnLrg+Nphl1zhgle8GJhIcsE97JwRZuEQXJ4ttpIhIx/ThPPixd8nS5+k64OGlaUy9oHIadJ+0CFzXe4+kDB7FbIlf24GH5jZRuH8vf7la+UnhZXj7JHd09Zi25QMfv/+i4v9w5WVJe+H1AybZnT+/f4NuXt79//7918gtQtNNXS6Gy02lZXGHrYM32KjxySuu1p1HnW2bdou4txY2FKsYhtHGjBWattKvKm7Idf2Il3JuW/uRYqDe8ufxv9STnj61fAHdn7vgCVDYaz8MzRm9z2KhfGb9NgmJ8qOZ+W8ZLH5m24d/vyrVqGg00rwpn0WtMmOBOpc1a5YcFLfCgJaeGJhRKiftOWAUEcJ0NmLdaqucFbekI+/hpbdY1Whe3Vl4zDIpC1zRTvwtWAu6BM+tiWDmHzQzBHag2qlQi+G1BmfQ+RmGminpLNT+ixx1Bpr9KArEUoUhdTHUo+/UTPwRFTXHDww6HX9wKd7s6+d6g1N2xNZx5u4Y7SNy2UvlYolti3vSP1JT68zIVj282OLRhrtIjEOA8OBwoS4NUZotNKh9+l0bHZLHR9fmEkO+8iU2UFlnExv9rC6/kWEwZ2A0XNH/GVvRSx0TzH/xsS5AeqNXx85F1bgZwsTxK+HPGvQGHqOF9MJ+kZLrz30UmNGbDQ4E+kr78MstOzOe1fmNUqY8pQ3suqo9+ciqzoOg28F1IUm5hRj+ug5vZTuumJvzXEPXnuMzZb8V9jF87fVM+9AHkiV+tmk9dG+9hjk9i89iqqG2uEuvvjoQ8En3Ctc8NZj3xZ8W/fgW8bBvk0jWMViZEkz/dZv/lbm8H0IW2sY9zYfdB0vZ68eS+fPU3o9vVzYXvdbbB8ejzsUK1CpO9SMu5asaNqun8/WJ41rzX7ZWqo+fSd1ZOkli/O2DDt08Z7HHtbwqq692ay17pvHXwNYSr0gvJ+llmVdLgoQX5YEiC+LAyQ8asLL193PhHpJdM5nGT0vh+/RVLXqKrY6Q5tBRs+yP7UGd+xDpGc2ykboJZXGNuvYHrRgb8V4+6T+o3BRHnsSYtdDalJ91cSHOBdsSzkDaOjkuA5oJq2Xi92knoE1Kb
wSbuKgbwd31Kt8LlxlBKo7rtYg/+bFGlHeca1OYV6zVCPM+67UKdIrF2oEe991OgV75TL1VFb39D/Lv9n9AeR9DTrBOW/P/wYAAP//RQpfgQ==" + return "eJzUW02P4zbSvs+vKOQyMy88xrvXPiwQJFnsAJOkg5lFjkaZKllEU6RCUu1Wfv2iSOrDkuz2h9zJ+tRti8WHVcXiU8XSJ3ii5gEq4/zOkvtDvQPw0it6gO8e45dff/vy3TuAjJywsvLS6Af45zsAgJ/JWykcCKMUCU8Z5NaU0I8DR/aZrFu/A3CFsX4jjM7l7gFyVI7eAVhShI4eYIfvAHJJKnMPQfgn0FjSCBp/fFPx89bUVfpmBhp/BjjKiHSdfhvOM5wLhZfP0jfdD3OznZiRP79qgsyIuiTtoSKbdACVNYKcW7Ei9lLvQOrc2BJZBqsBWX/egC8IRG0taX8gt8UGJgdfoB8IrEUB6MB59ASos3Y8/FGTbdbwQ2efbXMgM/zOWKrdhkdv2knWg8cOTdR+xiocqjFDj1t0tDYyO3igVacyejf64YRGg1Y//xgXTp108IV0sEXxRDoDyW6odVymN+vTwPjfWWRP1OyNHaN+BdwvWNIC6KrFtPUYXQNapfVI5meuHdn1PWzFgkGZ3Y4ykDp491lYZuxzgQ2umBWrSkkRNuPmtskHkuI+Hdn+DDBCSdJ+jVlmybnLoHx+hDSuBRSlXYmhMM5fro9/G+eDnA5DN3mUu+J4ZakyNkYlQLDEJwXBj798BWXMU13x4Pj4hpd0EidLWsh9v/3wCCwOdF1uyUYjDhQpHdSOg2ZuLAhTlrVu7b2Xvgi6nQhNul6BsfDpHyBzQPiPli/gjHiiJJSO2CIN3vCKLlxLU/U2SIdCa3Y+H53cKgqacoCWAGtvnlHUdQkKay0Ksqvhl3tjn8iuJvMos5MCFZu0c/5ewNyvSRJUaFEpUt0XDI+PWz0ORwB7Kz0/kgzR6VQUJJ4qI3X41Xm0vq5WsEdlSZB85m/3TDh0RjYckHtUUdh6MslPL560k0Y7KLEBSzvpPNmEz0UbY5ZJXgaqLsQHJZ62X0A266UZ+kstK0uCfUE67uVEBmAfeQBvqxXINa1X7UOzgWAilp+LhGV+Kd6idswSjH6D5bzvnHYw73CN8yADrbkbvG4nqSaSMooTHureWN7kkZJJB9qMoSRGRwMDKXR+Kmt+jUHyRhSod/NM5tZFRuiMI8CKMx0Bs0fp5STORhxbYxShvhCKrYn1NyFRnebTlGA0ICgjnk6o6bK5f0guZ56JQ1NSxJhH9dHzGVUdw2fPhidCAf4v2fsBvhU0XBO9kKjDWjAR9tnRMlPTsa0W+ChC0LTvN3lZ4vhsH4oCqYebaiJZsl4HD6xgW/tjnsyf3jQXLGiEAj7gNlCCj4xHun7/OFlKhZa5Sxo3C+IAML0IqjwY3R2BQRwnZmF9/M1wcoF8BM/KRQ1krZk5LnglOTpfoS8gr3UrSqmThuYhnw7GzIvOpMOtomysj4478SaxKJ7a1E2S49/bcXGdr5G9YKULdyi9+PGmeO+gZObHp26ffX4ehMEUL8OgkEFO5HJ27EZRtldcK1ID70zjC86vWbhbgfT94ClB6UNr4HMc16LYUzFtQ8/jNPxVxfyO0kMYFxmwjK53Koi9BuAa4ldEWscmimA4ROwLKYoxnAmIjrbsIke6pRry1aOXzkvhALem9t3kkeIlSted964rcwyrFpFuj83a1ixamNNYc3nlomeSbu1EQVmtJhHh2rzil5hOmBw6ycP5ol8W+EywJdJQkc2NLY+55xCppT9qcv4OSDvJCyH1siS3DvZal+MsKcLNlcELt9w341EBlqbWMSYxkUkgXcToqpC6xaDPoZOjNOeXHbiJ1OST7Hr7gixBLlVKk3gBnnmS4UD7tGLBpVRKOhJGZ0fSgKkiXKPF/7IeGH9hjZZ/xrLCBcrY1nlO1q0HSlnce9Mcnbmy2vIqhnY4jW
2GuC6PatvMB8UzsG3yWqnFAQbfPBKonTdVRRkgBACsTidQw5YCewI59Z8Cs8GGMVCibrplnFxjOqTu7xfSkuDzOBSiTjKlEbRNzlvgThbooAQVetMSFpDegdlrCJMHrgkftLElKjVmcXDEjgXqTAUrG0eBIfSZXztrZphLxrnmazEfT+oIlTIC73EstQbsZjie/LmNJUeL1gGYPrZEysWkIlGcPYfHQDXDpFNK1d4O3EKpftUE1uzD3VIrr79Var/5tJfZENvhLVB38zPLqGZQXk6l/rq7n1B8+f9AeF2BljK2hqmtOFaf+4tvg1ZAZeWbSwCHnbAx+SaJvMPZmQQPspXpDUJ3n/ZardCthSnLyeGwQKQczNHlugOtHxDViOFouDjAa41SrIO/FjGj4A2Lx+pbW2XEE2cAuHyUZTaXJgCeYIL2JKTiHtYO5+IQV4i4eTjZUDHIpiW38YwAgaLg8DhX6kYf7p2Yn2AobIEmprpoG/gQVmq0YoFC1Rk5KGRfOOqbCyaCD2dmsTzAVGQxFDFc4zyV710g0um/+PTHkxrl1QdLH0sZrrjr4vMs5gQsuD1AIrKk4m3TB4OxB6zmCnFnkP/Bgk4mgzetiCW/1Yqs2fMu9LXV90jDzZ63YJTeHt+h0nfWjgzgcvKiuBe2JPxKaFI7sncpXzC2VvqV4OoquwuHDdiS8CuhZaTobtCS8MuhCaNzJcUd8vkWh0AtiM/FrCbmIt2M8XLWkjDPZJsW70TiK7SFyspYtM06FDqWP8Ra+amQIiy96gPw/STPh4kgtATC1DrcP1raoeUUL/SB7ItYZDgcwsfeRGoL5wOtd2s+OG28Z+Oc0RVS7z6uwhX64QTh+tLsNjzBZk5vAI788WJ3r/Rt4xdT+rggFk6CQfFurMGpCY77DpvkChO8YsXARJIJ7qHnjDALh+DybLGVDBn5mCacFy/+Pln6JF0fNKwslbEPRE6T9oMOmety94GE2auQLflzM/jAzDYK5+/1L/eVnxRWjrNHNk9bi21TMvj9+y8u9g9XVpa8H1IzbFrR+ff7N+Tu7d3/799/gdQuFGY4Mnmjxaay0tjDnuFblPSYxHV3q86jzrZN20acGwtbimVs40gDxlJtW4o3dTfk2makK0n3zc1IcXCv+tP4X8oJUb8a/kDP7x2wZCiMlX+Gzuy+SbEwfpMe2+RE2fG0nH0Wm7/p3uHPv2oVKjqtBG/aZ0Gb7Ii/56p2xYKL+lYQ0MILCyNCAaWtB4RCSoDOVqxTeYXT8oZ8/DX07B4rC92rLRuHUSZtmSv6ga8Fc0Gj8LEtGcTkg26O0B9UKxWaMaTO+CAiN9NBO2Wd3aTPEke9sUYP2hKhRFFIfSz3+Bt1A09Edd3BA4Ve1xB8ujn72qXe0LU9kXW8iztG2+gue6lUrLFteUfqT3p6nwlBs58fWzTSaBeZcRgYDhRmxK0yQqeVDs1Pp2OzW+r4+MJUcthIpswOKuNkerWHp+vfRBhcChg9d8Rf9lrEQhcV869MnBug3vj9kXNhBX62MEP8esizBp2h51gxnaBv5HrtoZc6M2KnwZlIX3khZiG3O+9lmdcoYUpU3kiro+afi7TqOAy+FVAXuphTjOmj5/RWumuLvTXJPXjvMXZb8l9hF89fV8+8BHkgVepnk/yjfe8xyO3fehRVDbXDXXzz0YeKT7hYuOC1x74v+Lb2wbeMg32fRtCKxciSZhqu3/y1zOELEbbWMG5uPmg7Xk5fPZbOnqfm9fRyYX/db7F/eDzuUKxApe5QNO56sqJqu4Y+W59UrjX7ZYup+vSl1BHXSxrnbRl26OJNjz2s4V1de7VZa913j78GsJR6QXg/Sy3LulwUIL4sCRBfFgdIeFSFl/vdz4R6SXTOZxk9L4fv0VS16kq2OkObQUbPsj+1BpfsQ6RndspG6CWVxjbr2B+0YHPFePukBqRwUx6bEmLbQ+pSfVXFhzgX7Es5A2ho5bgOaCatl4tdpZ6BNU14Jd
zEQd8O7qhZ+Vy4yghUd/TWIP9mZ40o7+irU5jXuGqEeV9PnSK90lEj2Pv66RTslW7qqazuaX+Wf7P5A8j7KnSCc16f/w0AAP//9q5frw==" } From 657144572ebf5e79d00ad7c00636ad2739831788 Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Fri, 2 Jun 2023 01:01:52 +0400 Subject: [PATCH 04/12] add Uint parse --- libbeat/common/schema/mapstrstr/mapstrstr.go | 21 +++++++++++++++++++ .../module/postgresql/replication/data.go | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/libbeat/common/schema/mapstrstr/mapstrstr.go b/libbeat/common/schema/mapstrstr/mapstrstr.go index 8bde7d6f221c..a3eb306bc683 100644 --- a/libbeat/common/schema/mapstrstr/mapstrstr.go +++ b/libbeat/common/schema/mapstrstr/mapstrstr.go @@ -129,6 +129,27 @@ func Int(key string, opts ...schema.SchemaOption) schema.Conv { return schema.SetOptions(schema.Conv{Key: key, Func: toInt}, opts) } +// toInt converts value to uint. In case of error, returns 0 +func toUint(key string, data map[string]interface{}) (interface{}, error) { + str, err := getString(key, data) + if err != nil { + return false, err + } + + value, err := strconv.ParseUint(str, 10, 32) + if err != nil { + msg := fmt.Sprintf("error converting param to uint: `%s`", str) + return 0, schema.NewWrongFormatError(key, msg) + } + + return value, nil +} + +// Uint creates a Conv object for parsing integers +func Uint(key string, opts ...schema.SchemaOption) schema.Conv { + return schema.SetOptions(schema.Conv{Key: key, Func: toUint}, opts) +} + // toStr converts value to str. 
In case of error, returns "" func toStr(key string, data map[string]interface{}) (interface{}, error) { return getString(key, data) diff --git a/metricbeat/module/postgresql/replication/data.go b/metricbeat/module/postgresql/replication/data.go index 1e490c31094e..ef11886f3e9f 100644 --- a/metricbeat/module/postgresql/replication/data.go +++ b/metricbeat/module/postgresql/replication/data.go @@ -38,7 +38,7 @@ var schema = s.Schema{ "port": c.Int("client_port", s.Optional), }, "backend_start": c.Time(time.RFC3339Nano, "backend_start"), - "backend_xmin": c.Int("backend_xmin", s.Optional), + "backend_xmin": c.Uint("backend_xmin", s.Optional), "state": c.Str("state"), "sent_lsn": c.Str("sent_lsn"), "write_lsn": c.Str("write_lsn"), From 55d70a1c4ea0225e4ab9297591761e1684f34e7b Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Tue, 6 Jun 2023 13:55:39 +0400 Subject: [PATCH 05/12] Change Uint size, fix comment --- libbeat/common/schema/mapstrstr/mapstrstr.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/common/schema/mapstrstr/mapstrstr.go b/libbeat/common/schema/mapstrstr/mapstrstr.go index a3eb306bc683..b81f6b4ea841 100644 --- a/libbeat/common/schema/mapstrstr/mapstrstr.go +++ b/libbeat/common/schema/mapstrstr/mapstrstr.go @@ -129,14 +129,14 @@ func Int(key string, opts ...schema.SchemaOption) schema.Conv { return schema.SetOptions(schema.Conv{Key: key, Func: toInt}, opts) } -// toInt converts value to uint. In case of error, returns 0 +// toUint converts value to uint. 
In case of error, returns 0 func toUint(key string, data map[string]interface{}) (interface{}, error) { str, err := getString(key, data) if err != nil { return false, err } - value, err := strconv.ParseUint(str, 10, 32) + value, err := strconv.ParseUint(str, 10, 64) if err != nil { msg := fmt.Sprintf("error converting param to uint: `%s`", str) return 0, schema.NewWrongFormatError(key, msg) From 1d7027d3789c6b88238dc8ef4d40683046232b87 Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Tue, 6 Jun 2023 17:44:00 +0400 Subject: [PATCH 06/12] Change PG query --- metricbeat/module/postgresql/replication/_meta/data.json | 6 +++--- .../module/postgresql/replication/_meta/data_shared.json | 6 +++--- metricbeat/module/postgresql/replication/data.go | 6 +++--- metricbeat/module/postgresql/replication/replication.go | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/metricbeat/module/postgresql/replication/_meta/data.json b/metricbeat/module/postgresql/replication/_meta/data.json index 53927019adb3..c1d29d778cac 100644 --- a/metricbeat/module/postgresql/replication/_meta/data.json +++ b/metricbeat/module/postgresql/replication/_meta/data.json @@ -11,12 +11,12 @@ }, "postgresql": { "replication": { - "write_lag": "", + "write_lag": "0", "state": "streaming", "sync_priority": 0, "backend_start": "2023-05-23T17:01:53.697Z", - "replay_lag": "", - "flush_lag": "", + "replay_lag": "0", + "flush_lag": "0", "pid": 2516780, "client": { "address": "192.168.128.2", diff --git a/metricbeat/module/postgresql/replication/_meta/data_shared.json b/metricbeat/module/postgresql/replication/_meta/data_shared.json index ef2902438b0f..a5273171c017 100644 --- a/metricbeat/module/postgresql/replication/_meta/data_shared.json +++ b/metricbeat/module/postgresql/replication/_meta/data_shared.json @@ -11,12 +11,12 @@ }, "postgresql": { "replication": { - "write_lag": "", + "write_lag": "0", "state": "", "sync_priority": 0, "backend_start": "", - "replay_lag": "", - 
"flush_lag": "", + "replay_lag": "0", + "flush_lag": "0", "pid": 0, "client": { "address": "", diff --git a/metricbeat/module/postgresql/replication/data.go b/metricbeat/module/postgresql/replication/data.go index ef11886f3e9f..1c2ccb345cad 100644 --- a/metricbeat/module/postgresql/replication/data.go +++ b/metricbeat/module/postgresql/replication/data.go @@ -44,9 +44,9 @@ var schema = s.Schema{ "write_lsn": c.Str("write_lsn"), "flush_lsn": c.Str("flush_lsn"), "replay_lsn": c.Str("replay_lsn"), - "write_lag": c.Str("write_lag"), - "flush_lag": c.Str("flush_lag"), - "replay_lag": c.Str("replay_lag"), + "write_lag": c.Uint("write_lag", s.Optional), + "flush_lag": c.Uint("flush_lag", s.Optional), + "replay_lag": c.Uint("replay_lag", s.Optional), "sync_priority": c.Int("sync_priority", s.Optional), "sync_state": c.Str("sync_state"), } diff --git a/metricbeat/module/postgresql/replication/replication.go b/metricbeat/module/postgresql/replication/replication.go index 9674ae6ec25a..db421d9268c5 100644 --- a/metricbeat/module/postgresql/replication/replication.go +++ b/metricbeat/module/postgresql/replication/replication.go @@ -56,7 +56,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // of an error set the Error field of mb.Event or simply call report.Error(). 
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { ctx := context.Background() - results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_replication") + results, err := m.QueryStats(ctx, "SELECT EXTRACT(EPOCH FROM write_lag)::INT AS write_lag, EXTRACT(EPOCH FROM flush_lag)::INT AS flush_lag, EXTRACT(EPOCH FROM replay_lag)::INT AS replay_lag, pid, usesysid, usename, application_name, client_addr, client_hostname, client_port, backend_start, backend_xmin, state, sent_lsn, write_lsn, replay_lsn, sync_priority, sync_state FROM pg_stat_replication;") if err != nil { return fmt.Errorf("error in QueryStats %w", err) } From 50e66dab0312e45f3afc799b91beb672666a156e Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Mon, 10 Jul 2023 18:18:10 +0400 Subject: [PATCH 07/12] update changelog, fix nitpicks --- CHANGELOG.next.asciidoc | 1 + metricbeat/module/postgresql/replication/replication.go | 2 +- .../postgresql/replication/replication_integration_test.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0ecab1149134..f48f3dfcdfb7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -221,6 +221,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Improve documentation for ActiveMQ module {issue}35113[35113] {pull}35558[35558] - Fix EC2 host.cpu.usage {pull}35717[35717] - Resolve statsd module's prematurely halting of metrics parsing upon encountering an invalid packet. 
{pull}35075[35075] +- Add the opportunity to get PostgreSQL replication metrics {pull}35562[35562] *Osquerybeat* diff --git a/metricbeat/module/postgresql/replication/replication.go b/metricbeat/module/postgresql/replication/replication.go index db421d9268c5..6c9365debbb2 100644 --- a/metricbeat/module/postgresql/replication/replication.go +++ b/metricbeat/module/postgresql/replication/replication.go @@ -58,7 +58,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { ctx := context.Background() results, err := m.QueryStats(ctx, "SELECT EXTRACT(EPOCH FROM write_lag)::INT AS write_lag, EXTRACT(EPOCH FROM flush_lag)::INT AS flush_lag, EXTRACT(EPOCH FROM replay_lag)::INT AS replay_lag, pid, usesysid, usename, application_name, client_addr, client_hostname, client_port, backend_start, backend_xmin, state, sent_lsn, write_lsn, replay_lsn, sync_priority, sync_state FROM pg_stat_replication;") if err != nil { - return fmt.Errorf("error in QueryStats %w", err) + return fmt.Errorf("error in QueryStats: %w", err) } for _, result := range results { diff --git a/metricbeat/module/postgresql/replication/replication_integration_test.go b/metricbeat/module/postgresql/replication/replication_integration_test.go index 5c93f313d6c2..3949f7d5effb 100644 --- a/metricbeat/module/postgresql/replication/replication_integration_test.go +++ b/metricbeat/module/postgresql/replication/replication_integration_test.go @@ -38,7 +38,7 @@ func TestFetch(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host())) events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. 
%v\n", len(errs), errs) + t.Fatalf("Expected 0 error, had %d: %v", len(errs), errs) } assert.NotEmpty(t, events) event := events[0].MetricSetFields From f3143d8063bab72dec266e6058222a32b11d2225 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 7 Jul 2023 23:30:20 +0200 Subject: [PATCH 08/12] Fix recovering from invalid output configuration under Elastic-Agent (#36016) * Fix recovering from invalid output configuration under Elastic-Agent This commit fixes two problems in the `ManagerV2`: - 1. If the first output configuration was invalid, it would never be "saved", so if a new output configuration was received, the ManagerV2 would try to reload it even with `stopOnOutputReload` enabled. 2. When `stopOnOutputReload` was set the output reload was skipped, but the ManagerV2 would still proceed with reloading inputs. This created a race condition where some inputs/runners would receive their stop signal before even start, effectively locking the shutdown process. * Add and refactor tests This commit add tests to ensure Beats can recover from an invalid output configuration when running under Elastic-Agent. It also refactors some of the code used to mock the Elastic-Agent and aggregates some shared code into a single package. 
* PR improvements * Add changelog * PR improvements --- CHANGELOG.next.asciidoc | 1 + libbeat/tests/integration/framework.go | 89 +++++++-- libbeat/tests/integration/mockserver.go | 137 ++++++++++++++ .../tests/integration/managerV2_test.go | 171 ++++++++++++++++-- .../libbeat/management/input_reload_test.go | 3 +- x-pack/libbeat/management/managerV2.go | 46 +++-- x-pack/libbeat/management/managerV2_test.go | 66 +------ 7 files changed, 413 insertions(+), 100 deletions(-) create mode 100644 libbeat/tests/integration/mockserver.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f48f3dfcdfb7..57905b0baa89 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -73,6 +73,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Do not print context cancelled error message when running under agent {pull}36006[36006] +- Fix recovering from invalid output configuration when running under Elastic-Agent {pull}36016[36016] *Auditbeat* diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 7514a9a5bcb6..91f9bfeeab7a 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -30,21 +30,27 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "testing" "time" "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common/atomic" ) type BeatProc struct { - Binary string - Args []string - Cmd *exec.Cmd - t *testing.T - tempDir string - configFile string - beatName string - logFileOffset int64 + Args []string + Binary string + Cmd *exec.Cmd + RestartOnBeatOnExit bool + beatName string + cmdMutex sync.Mutex + configFile string + fullPath string + logFileOffset int64 + t *testing.T + tempDir string } // NewBeat createa a new Beat process from the system tests binary. 
@@ -72,27 +78,80 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) BeatProc { } // Start starts the Beat process -// args are extra arguments to be passed to the Beat +// args are extra arguments to be passed to the Beat. func (b *BeatProc) Start(args ...string) { t := b.t - b.Args = append(b.Args, args...) + fullPath, err := filepath.Abs(b.Binary) if err != nil { t.Fatalf("could not get full path from %q, err: %s", b.Binary, err) } - b.Cmd = exec.Command(fullPath, b.Args...) - if err := b.Cmd.Start(); err != nil { - t.Fatalf("could not start process: %s", err) + b.fullPath = fullPath + b.Args = append(b.Args, args...) + + done := atomic.MakeBool(false) + wg := sync.WaitGroup{} + if b.RestartOnBeatOnExit { + wg.Add(1) + go func() { + defer wg.Done() + for !done.Load() { + b.startBeat() + b.waitBeatToExit() + } + }() + } else { + b.startBeat() } + t.Cleanup(func() { - pid := b.Cmd.Process.Pid + b.cmdMutex.Lock() + // 1. Kill the Beat if err := b.Cmd.Process.Signal(os.Interrupt); err != nil { - t.Fatalf("could not stop process with PID: %d, err: %s", pid, err) + t.Fatalf("could not stop process with PID: %d, err: %s", + b.Cmd.Process.Pid, err) + } + + // Make sure the goroutine restarting the Beat has exited + if b.RestartOnBeatOnExit { + // 2. Set the done flag so the goroutine loop can exit + done.Store(true) + // 3. Release the mutex, keeping it locked until now + // ensures a new process won't start + b.cmdMutex.Unlock() + // 4. Wait for the goroutine to finish, this helps ensuring + // no other Beat process was started + wg.Wait() } }) } +// startBeat starts the Beat process. This method +// does not block nor waits the Beat to finish. +func (b *BeatProc) startBeat() { + b.cmdMutex.Lock() + + b.Cmd = exec.Command(b.fullPath, b.Args...) 
+ if err := b.Cmd.Start(); err != nil { + b.t.Fatalf("could not start %q process: %s", b.beatName, err) + } + + b.cmdMutex.Unlock() +} + +// waitBeatToExit blocks until the Beat exits, it returns +// the process' exit code. +// `startBeat` must be called before this method. +func (b *BeatProc) waitBeatToExit() int { + if err := b.Cmd.Wait(); err != nil { + b.t.Fatalf("error waiting for %q to finish: %s. Exit code: %d", + b.beatName, err, b.Cmd.ProcessState.ExitCode()) + } + + return b.Cmd.ProcessState.ExitCode() +} + // LogContains looks for `s` as a substring of every log line, // it will open the log file on every call, read it until EOF, // then close it. diff --git a/libbeat/tests/integration/mockserver.go b/libbeat/tests/integration/mockserver.go new file mode 100644 index 000000000000..0a396cb78399 --- /dev/null +++ b/libbeat/tests/integration/mockserver.go @@ -0,0 +1,137 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package integration + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// unitKey is used to identify a unique unit in a map +// the `ID` of a unit in itself is not unique without its type, only `Type` + `ID` is unique +type unitKey struct { + Type client.UnitType + ID string +} + +// NewMockServer creates a GRPC server to mock the Elastic-Agent. +// On the first check in call it will send the first element of `unit` +// as the expected unit, on successive calls, if the Beat has reached +// that state, it will move on to sending the next state. +// It will also validate the features. +// +// if `observedCallback` is not nil, it will be called on every +// check in receiving the `proto.CheckinObserved` sent by the +// Beat and index from `units` that was last sent to the Beat. +// +// If `delay` is not zero, when the Beat state matches the last +// sent units, the server will wait for `delay` before sending the +// the next state. This will block the check in call from the Beat. 
+func NewMockServer( + units [][]*proto.UnitExpected, + featuresIdxs []uint64, + features []*proto.Features, + observedCallback func(*proto.CheckinObserved, int), + delay time.Duration, +) *mock.StubServerV2 { + i := 0 + agentInfo := &proto.CheckinAgentInfo{ + Id: "elastic-agent-id", + Version: version.GetDefaultVersion(), + Snapshot: true, + } + return &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + if observedCallback != nil { + observedCallback(observed, i) + } + matches := doesStateMatch(observed, units[i], featuresIdxs[i]) + if !matches { + // send same set of units and features + return &proto.CheckinExpected{ + AgentInfo: agentInfo, + Units: units[i], + Features: features[i], + FeaturesIdx: featuresIdxs[i], + } + } + // delay sending next expected based on delay + if delay > 0 { + <-time.After(delay) + } + // send next set of units and features + i += 1 + if i >= len(units) { + // stay on last index + i = len(units) - 1 + } + return &proto.CheckinExpected{ + AgentInfo: agentInfo, + Units: units[i], + Features: features[i], + FeaturesIdx: featuresIdxs[i], + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + // actions not tested here + return nil + }, + ActionsChan: make(chan *mock.PerformAction, 100), + } +} + +func doesStateMatch( + observed *proto.CheckinObserved, + expectedUnits []*proto.UnitExpected, + expectedFeaturesIdx uint64, +) bool { + if len(observed.Units) != len(expectedUnits) { + return false + } + expectedMap := make(map[unitKey]*proto.UnitExpected) + for _, exp := range expectedUnits { + expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp + } + for _, unit := range observed.Units { + exp, ok := expectedMap[unitKey{client.UnitType(unit.Type), unit.Id}] + if !ok { + return false + } + if unit.State != exp.State || unit.ConfigStateIdx != exp.ConfigStateIdx { + return false + } + } + + return observed.FeaturesIdx == expectedFeaturesIdx +} + +func RequireNewStruct(t 
*testing.T, v map[string]interface{}) *structpb.Struct { + str, err := structpb.NewStruct(v) + if err != nil { + require.NoError(t, err, "could not convert map[string]interface{} into structpb") + } + return str +} diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 48f2069220b0..5e3111a0e09e 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -14,7 +14,6 @@ import ( "time" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/beats/v7/x-pack/libbeat/management" @@ -62,7 +61,7 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { Id: "default", Type: "elasticsearch", Name: "elasticsearch", - Source: requireNewStruct(t, + Source: integration.RequireNewStruct(t, map[string]interface{}{ "type": "elasticsearch", "hosts": []interface{}{"http://localhost:9200"}, @@ -87,7 +86,7 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { Streams: []*proto.Stream{ { Id: "log-input-1", - Source: requireNewStruct(t, map[string]interface{}{ + Source: integration.RequireNewStruct(t, map[string]interface{}{ "enabled": true, "type": "log", "paths": []interface{}{logFilePath}, @@ -108,7 +107,7 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { Id: "default", Type: "elasticsearch", Name: "elasticsearch", - Source: requireNewStruct(t, + Source: integration.RequireNewStruct(t, map[string]interface{}{ "type": "elasticsearch", "hosts": []interface{}{"http://localhost:9200"}, @@ -133,7 +132,7 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { Streams: []*proto.Stream{ { Id: "log-input-2", - Source: requireNewStruct(t, map[string]interface{}{ + Source: integration.RequireNewStruct(t, map[string]interface{}{ "enabled": true, "type": "log", "paths": []interface{}{logFilePath}, @@ -278,7 +277,7 @@ func 
TestFailedOutputReportsUnhealthy(t *testing.T) { Id: "default", Type: "logstash", Name: "logstash", - Source: requireNewStruct(t, + Source: integration.RequireNewStruct(t, map[string]interface{}{ "type": "logstash", "invalid": "configuration", @@ -300,7 +299,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { Streams: []*proto.Stream{ { Id: "log-input", - Source: requireNewStruct(t, map[string]interface{}{ + Source: integration.RequireNewStruct(t, map[string]interface{}{ "enabled": true, "type": "log", "paths": "/tmp/foo", @@ -344,12 +343,160 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { t.Cleanup(server.Stop) } -func requireNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct { - str, err := structpb.NewStruct(v) - if err != nil { - require.NoError(t, err) +func TestRecoverFromInvalidOutputConfiguration(t *testing.T) { + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + // Having the log file enables the inputs to start, while it is not + // strictly necessary for testing output issues, it allows for the + // input to start which creates a more realistic test case and + // can help uncover other issues in the startup/shutdown process. 
+ logFilePath := filepath.Join(filebeat.TempDir(), "flog.log") + generateLogFile(t, logFilePath) + + logLevel := proto.UnitLogLevel_INFO + filestreamInputHealthy := proto.UnitExpected{ + Id: "input-unit-healthy", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: logLevel, + Config: &proto.UnitExpectedConfig{ + Id: "filestream-input", + Type: "filestream", + Name: "filestream-input-healty", + Streams: []*proto.Stream{ + { + Id: "filestream-input-id", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "filestream-stream-input-id", + "enabled": true, + "type": "filestream", + "paths": logFilePath, + }), + }, + }, + }, + } + + filestreamInputStarting := proto.UnitExpected{ + Id: "input-unit-2", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_STARTING, + LogLevel: logLevel, + Config: &proto.UnitExpectedConfig{ + Id: "filestream-input", + Type: "filestream", + Name: "filestream-input-starting", + Streams: []*proto.Stream{ + { + Id: "filestream-input-id", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "filestream-stream-input-id", + "enabled": true, + "type": "filestream", + "paths": logFilePath, + }), + }, + }, + }, + } + + healthyOutput := proto.UnitExpected{ + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: logLevel, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"http://localhost:9200"}, + "username": "admin", + "password": "testing", + "protocol": "http", + "enabled": true, + }), + }, + } + + brokenOutput := proto.UnitExpected{ + Id: "output-unit-borken", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_FAILED, + LogLevel: logLevel, + Config: &proto.UnitExpectedConfig{ + Id: "default", + 
Type: "logstash", + Name: "logstash", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "logstash", + "invalid": "configuration", + }), + }, + } + + // Those are the 'states' Filebeat will go through. + // After each state is reached the mockServer will + // send the next. + protoUnits := [][]*proto.UnitExpected{ + { + &healthyOutput, + &filestreamInputHealthy, + }, + { + &brokenOutput, + &filestreamInputStarting, + }, + { + &healthyOutput, + &filestreamInputHealthy, + }, + {}, // An empty one makes the Beat exit + } + + // We use `success` to signal the test has ended successfully + // if `success` is never closed, then the test will fail with a timeout. + success := make(chan struct{}) + // The test is successful when we reach the last element of `protoUnits` + onObserved := func(observed *proto.CheckinObserved, protoUnitsIdx int) { + if protoUnitsIdx == len(protoUnits)-1 { + close(success) + } + } + + server := integration.NewMockServer( + protoUnits, + []uint64{0, 0, 0, 0}, + []*proto.Features{nil, nil, nil, nil}, + onObserved, + 100*time.Millisecond, + ) + require.NoError(t, server.Start(), "could not start the mock Elastic-Agent server") + defer server.Stop() + + filebeat.RestartOnBeatOnExit = true + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "management.restart_on_output_change=true", + ) + + select { + case <-success: + case <-time.After(60 * time.Second): + t.Fatal("Output did not recover from a invalid configuration after 60s of waiting") } - return str } // generateLogFile generates a log file by appending the current diff --git a/x-pack/libbeat/management/input_reload_test.go b/x-pack/libbeat/management/input_reload_test.go index 972c42908f11..61ed315dc7a4 100644 --- a/x-pack/libbeat/management/input_reload_test.go +++ b/x-pack/libbeat/management/input_reload_test.go @@ -17,6 +17,7 @@ import ( 
"github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) @@ -51,7 +52,7 @@ func TestInputReload(t *testing.T) { configIdx = currentIdx } - srv := mockSrv([][]*proto.UnitExpected{ + srv := integration.NewMockServer([][]*proto.UnitExpected{ { { Id: "output-unit", diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 07641a2f8c05..105ad98bca4f 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -34,6 +34,8 @@ import ( "github.com/elastic/beats/v7/libbeat/version" ) +var errStoppingOnOutputChange = errors.New("stopping Beat on output change") + // diagnosticHandler is a wrapper type that's a bit of a hack, the compiler won't let us send the raw unit struct, // since there's a type disagreement with the `client.DiagnosticHook` argument, and due to licensing issues we can't import the agent client types into the reloader type diagnosticHandler struct { @@ -593,7 +595,15 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { publisher.SetUnderAgentTrace(trace) // reload the output configuration - if err := cm.reloadOutput(outputUnit); err != nil { + restartBeat, err := cm.reloadOutput(outputUnit) + // The manager has already signalled the Beat to stop, + // there is nothing else to do. Trying to reload inputs + // will only lead to invalid state updates and possible + // race conditions. + if restartBeat { + return + } + if err != nil { // Output creation failed, there is no point in going any further // because there is no output read the events. 
// @@ -654,34 +664,40 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) { } } -func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error { +// reloadOutput reload outputs, it returns a bool and an error. +// The bool, if set, indicates that the output reload requires an restart, +// in that case the error is always `nil`. +// +// In any other case, the bool is always false and the error will be non nil +// if any error has occurred. +func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) { // Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go output := cm.registry.GetReloadableOutput() if output == nil { - return fmt.Errorf("failed to find beat reloadable type 'output'") + return false, fmt.Errorf("failed to find beat reloadable type 'output'") } if unit == nil { // output is being stopped err := output.Reload(nil) if err != nil { - return fmt.Errorf("failed to reload output: %w", err) + return false, fmt.Errorf("failed to reload output: %w", err) } cm.lastOutputCfg = nil cm.lastBeatOutputCfg = nil - return nil + return false, nil } expected := unit.Expected() if expected.Config == nil { // should not happen; hard stop - return fmt.Errorf("output unit has no config") + return false, fmt.Errorf("output unit has no config") } if cm.lastOutputCfg != nil && gproto.Equal(cm.lastOutputCfg, expected.Config) { // configuration for the output did not change; do nothing cm.logger.Debug("Skipped reloading output; configuration didn't change") - return nil + return false, nil } cm.logger.Debugf("Got output unit config '%s'", expected.Config.GetId()) @@ -690,21 +706,25 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error { cm.logger.Info("beat is restarting because output changed") _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil) cm.Stop() - return nil + return true, nil } reloadConfig, err := groupByOutputs(expected.Config) if err != nil { - return fmt.Errorf("failed 
to generate config for output: %w", err) + return false, fmt.Errorf("failed to generate config for output: %w", err) } + // Set those variables regardless of the outcome of output.Reload + // this ensures that if we're on a failed output state and a new + // output configuration is sent, the Beat will gracefully exit + cm.lastOutputCfg = expected.Config + cm.lastBeatOutputCfg = reloadConfig + err = output.Reload(reloadConfig) if err != nil { - return fmt.Errorf("failed to reload output: %w", err) + return false, fmt.Errorf("failed to reload output: %w", err) } - cm.lastOutputCfg = expected.Config - cm.lastBeatOutputCfg = reloadConfig - return nil + return false, nil } func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index aa84bb9f8f0a..65e240ec21c6 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/features" + "github.com/elastic/beats/v7/libbeat/tests/integration" ) func TestManagerV2(t *testing.T) { @@ -74,7 +75,7 @@ func TestManagerV2(t *testing.T) { t.Logf("FQDN feature flag set to %v", fqdnEnabled) } - srv := mockSrv([][]*proto.UnitExpected{ + srv := integration.NewMockServer([][]*proto.UnitExpected{ { { Id: "output-unit", @@ -99,7 +100,7 @@ func TestManagerV2(t *testing.T) { Streams: []*proto.Stream{ { Id: "system/metrics-system.filesystem-default-system-1", - Source: requireNewStruct(t, map[string]interface{}{ + Source: integration.RequireNewStruct(t, map[string]interface{}{ "metricsets": []interface{}{"filesystem"}, "period": "1m", }), @@ -120,14 +121,14 @@ func TestManagerV2(t *testing.T) { Streams: []*proto.Stream{ { Id: "system/metrics-system.filesystem-default-system-2", - Source: requireNewStruct(t, map[string]interface{}{ + Source: 
integration.RequireNewStruct(t, map[string]interface{}{ "metricsets": []interface{}{"filesystem"}, "period": "1m", }), }, { Id: "system/metrics-system.filesystem-default-system-3", - Source: requireNewStruct(t, map[string]interface{}{ + Source: integration.RequireNewStruct(t, map[string]interface{}{ "metricsets": []interface{}{"filesystem"}, "period": "1m", }), @@ -253,7 +254,7 @@ func TestOutputError(t *testing.T) { Id: "default", Type: "mock", Name: "mock", - Source: requireNewStruct(t, + Source: integration.RequireNewStruct(t, map[string]interface{}{ "Is": "this", "required?": "Yes!", @@ -280,7 +281,7 @@ func TestOutputError(t *testing.T) { Id: "default", Type: "mock", Name: "mock", - Source: requireNewStruct(t, + Source: integration.RequireNewStruct(t, map[string]interface{}{ "this": "is", "required": true, @@ -348,59 +349,6 @@ func TestOutputError(t *testing.T) { }, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached") } -func mockSrv( - units [][]*proto.UnitExpected, - featuresIdxs []uint64, - features []*proto.Features, - observedCallback func(*proto.CheckinObserved, int), - delay time.Duration, -) *mock.StubServerV2 { - i := 0 - agentInfo := &proto.CheckinAgentInfo{ - Id: "elastic-agent-id", - Version: "8.6.0", - Snapshot: true, - } - return &mock.StubServerV2{ - CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - if observedCallback != nil { - observedCallback(observed, i) - } - matches := DoesStateMatch(observed, units[i], featuresIdxs[i]) - if !matches { - // send same set of units and features - return &proto.CheckinExpected{ - AgentInfo: agentInfo, - Units: units[i], - Features: features[i], - FeaturesIdx: featuresIdxs[i], - } - } - // delay sending next expected based on delay - if delay > 0 { - <-time.After(delay) - } - // send next set of units and features - i += 1 - if i >= len(units) { - // stay on last index - i = len(units) - 1 - } - return &proto.CheckinExpected{ - AgentInfo: 
agentInfo, - Units: units[i], - Features: features[i], - FeaturesIdx: featuresIdxs[i], - } - }, - ActionImpl: func(response *proto.ActionResponse) error { - // actions not tested here - return nil - }, - ActionsChan: make(chan *mock.PerformAction, 100), - } -} - type reloadable struct { mx sync.Mutex config *reload.ConfigWithMeta From 483814553b3a5e8daa2092328e8edda928225d57 Mon Sep 17 00:00:00 2001 From: apmmachine <58790750+apmmachine@users.noreply.github.com> Date: Sun, 9 Jul 2023 22:02:32 -0400 Subject: [PATCH 09/12] chore: Updated to content "" in file "testing/environments/snapshot.yml" (#36003) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Made with ❤️️ by updatecli Co-authored-by: apmmachine --- testing/environments/snapshot.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index b4aa1ed02504..156084789c4a 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -3,7 +3,7 @@ version: '2.3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0-18649b44-SNAPSHOT + image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0-0d099cfb-SNAPSHOT # When extend is used it merges healthcheck.tests, see: # https://github.com/docker/compose/issues/8962 # healthcheck: @@ -31,7 +31,7 @@ services: - "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles" logstash: - image: docker.elastic.co/logstash/logstash:8.10.0-18649b44-SNAPSHOT + image: docker.elastic.co/logstash/logstash:8.10.0-0d099cfb-SNAPSHOT healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] retries: 600 @@ -44,7 +44,7 @@ services: - 5055:5055 kibana: - image: docker.elastic.co/kibana/kibana:8.10.0-18649b44-SNAPSHOT + image: docker.elastic.co/kibana/kibana:8.10.0-0d099cfb-SNAPSHOT environment: - "ELASTICSEARCH_USERNAME=kibana_system_user" - 
"ELASTICSEARCH_PASSWORD=testing" From 300b3e3904e5f29afbc5f9a045cfac1aa598efc3 Mon Sep 17 00:00:00 2001 From: zxcSora <68225324+zxcSora@users.noreply.github.com> Date: Tue, 11 Jul 2023 10:58:15 +0400 Subject: [PATCH 10/12] Update CHANGELOG.next.asciidoc Co-authored-by: subham sarkar --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 981f853981ac..72c10b5c5884 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -223,7 +223,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Improve documentation for ActiveMQ module {issue}35113[35113] {pull}35558[35558] - Fix EC2 host.cpu.usage {pull}35717[35717] - Resolve statsd module's prematurely halting of metrics parsing upon encountering an invalid packet. {pull}35075[35075] -- Add the opportunity to get PostgreSQL replication metrics {pull}35562[35562] +- Add support for PostgreSQL replication metrics {pull}35562[35562] *Osquerybeat* From 3adfd57a2f36231d9c5150f04366567e3f45fc20 Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Tue, 19 Sep 2023 18:33:38 +0400 Subject: [PATCH 11/12] fix panic --- .../postgresql/replication/replication_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/postgresql/replication/replication_integration_test.go b/metricbeat/module/postgresql/replication/replication_integration_test.go index 3949f7d5effb..7f381880a93e 100644 --- a/metricbeat/module/postgresql/replication/replication_integration_test.go +++ b/metricbeat/module/postgresql/replication/replication_integration_test.go @@ -40,7 +40,7 @@ func TestFetch(t *testing.T) { if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d: %v", len(errs), errs) } - assert.NotEmpty(t, events) + require.NotEmpty(t, events) event := events[0].MetricSetFields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) From 
659bb76c918de79293e7f0134b87db8ff2fcb84f Mon Sep 17 00:00:00 2001 From: Derevyashkin Aleksandr Date: Tue, 19 Sep 2023 19:04:55 +0400 Subject: [PATCH 12/12] fix copy-paste mistake --- .../postgresql/replication/replication_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/postgresql/replication/replication_integration_test.go b/metricbeat/module/postgresql/replication/replication_integration_test.go index 7f381880a93e..aafcde204e45 100644 --- a/metricbeat/module/postgresql/replication/replication_integration_test.go +++ b/metricbeat/module/postgresql/replication/replication_integration_test.go @@ -90,7 +90,7 @@ func TestData(t *testing.T) { func getConfig(host string) map[string]interface{} { return map[string]interface{}{ "module": "postgresql", - "metricsets": []string{"database"}, + "metricsets": []string{"replication"}, "hosts": []string{postgresql.GetDSN(host)}, "username": postgresql.GetEnvUsername(), "password": postgresql.GetEnvPassword(),