From 8f696d2d28f1e2568d776959fc7888d4bfc95eaf Mon Sep 17 00:00:00 2001 From: Joakim Saario Date: Mon, 21 Oct 2024 17:05:38 +0200 Subject: [PATCH 1/2] Add support for service discovery on Kubernetes --- Dockerfile | 2 +- docker-compose-tests.yml | 4 +- go.mod | 53 ++++- go.sum | 141 ++++++++++-- proxy/pkg/common/types.go | 1 + proxy/pkg/config/config.go | 10 +- proxy/pkg/kubernetes/topology.go | 318 ++++++++++++++++++++++++++++ proxy/pkg/zdmproxy/clienthandler.go | 136 +++++++++++- proxy/pkg/zdmproxy/controlconn.go | 87 ++++++-- proxy/pkg/zdmproxy/proxy.go | 33 ++- 10 files changed, 721 insertions(+), 64 deletions(-) create mode 100644 proxy/pkg/kubernetes/topology.go diff --git a/Dockerfile b/Dockerfile index 875366fc..b4006a81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ # $ docker build . -f ./Dockerfile -t zdm-proxy ########## -FROM golang:1.19-bullseye AS builder +FROM golang:1.23-bookworm AS builder ENV GO111MODULE=on \ CGO_ENABLED=0 \ diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index bc49ec16..e80f3f19 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -27,7 +27,7 @@ services: ipv4_address: 192.168.100.102 proxy: - image: golang:1.21.11-bookworm + image: golang:1.23.2-bookworm container_name: zdm_tests_proxy restart: unless-stopped tty: true @@ -52,4 +52,4 @@ services: - /source/compose/nosqlbench-entrypoint.sh networks: proxy: - ipv4_address: 192.168.100.104 \ No newline at end of file + ipv4_address: 192.168.100.104 diff --git a/go.mod b/go.mod index 28211350..14651c92 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,14 @@ module github.com/datastax/zdm-proxy -go 1.19 +go 1.22.0 + +toolchain go1.23.2 require ( github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211106181442-e4c1a74c66bd github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e - github.com/google/uuid v1.1.1 + github.com/google/uuid v1.6.0 github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/mcuadros/go-defaults v1.2.0 @@ -14,25 +16,56 @@ require ( github.com/prometheus/client_model v0.1.0 github.com/rs/zerolog v1.20.0 github.com/sirupsen/logrus v1.6.0 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.9.0 + gopkg.in/yaml.v3 v3.0.1 + k8s.io/apimachinery v0.31.1 + k8s.io/client-go v0.31.1 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/protobuf v1.3.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-openapi/jsonpointer v0.19.6 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.22.4 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.3 // indirect - github.com/google/go-cmp v0.5.2 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect - github.com/kr/pretty v0.2.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.0.3 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/common v0.7.0 // indirect github.com/prometheus/procfs v0.0.8 // indirect - golang.org/x/sys v0.3.0 // indirect + github.com/x448/float16 v0.8.4 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/term v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/time v0.3.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/api v0.31.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index 387403a2..14e057c4 100644 --- a/go.sum +++ b/go.sum @@ -15,68 +15,114 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d h1:UnPtAA8Ux3GvHLazSSUydERFuoQRyxHrB8puzXyjXIE= github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d/go.mod h1:6FzirJfdffakAVqmHjwVfFkpru/gNbIazUOK5rIhndc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= +github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= +github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogBU= +github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e h1:SroDcndcOU9BVAduPf/PXihXoR2ZYTQYLXbupbqxAyQ= github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc= github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= +github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/pierrec/lz4/v4 v4.0.3 h1:vNQKSVZNYUEAvRY9FaUXAF1XPbSOHJtDTiP41kzDz2E= github.com/pierrec/lz4/v4 v4.0.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc= @@ -92,6 +138,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs= github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= @@ -99,43 +147,106 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= +golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU= +k8s.io/api v0.31.1/go.mod h1:sbN1g6eY6XVLeqNsZGLnI5FwVseTrZX7Fv3O26rhAaI= +k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U= +k8s.io/apimachinery v0.31.1/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= +k8s.io/client-go v0.31.1 h1:f0ugtWSbWpxHR7sjVpQwuvw9a3ZKLXX0u0itkFXufb0= +k8s.io/client-go v0.31.1/go.mod h1:sKI8871MJN2OyeqRlmA4W4KM9KBdBUpDLu/43eGemCg= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/proxy/pkg/common/types.go b/proxy/pkg/common/types.go index 41f26f65..777b8a0d 100644 --- a/proxy/pkg/common/types.go +++ b/proxy/pkg/common/types.go @@ -10,6 +10,7 @@ import ( // - Assignment of C* hosts per proxy instance for request connections type TopologyConfig struct { VirtualizationEnabled bool // enabled if ZDM_PROXY_TOPOLOGY_ADDRESSES is not empty + KubernetesService string // comes from ZDM_PROXY_TOPOLOGY_KUBERNETES_SERVICE Addresses []net.IP // comes from ZDM_PROXY_TOPOLOGY_ADDRESSES Count int // comes from length of ZDM_PROXY_TOPOLOGY_ADDRESSES Index int // comes from ZDM_PROXY_TOPOLOGY_INDEX diff --git a/proxy/pkg/config/config.go b/proxy/pkg/config/config.go index cc62842b..cb0e8321 100644 --- a/proxy/pkg/config/config.go +++ b/proxy/pkg/config/config.go @@ -29,9 +29,10 @@ type Config struct { // Proxy Topology (also known as system.peers "virtualization") bucket - ProxyTopologyIndex int `default:"0" split_words:"true" yaml:"proxy_topology_index"` - ProxyTopologyAddresses string `split_words:"true" yaml:"proxy_topology_addresses"` - ProxyTopologyNumTokens int `default:"8" split_words:"true" yaml:"proxy_topology_num_tokens"` + ProxyTopologyIndex int `default:"0" split_words:"true" yaml:"proxy_topology_index"` + ProxyTopologyAddresses string `split_words:"true" yaml:"proxy_topology_addresses"` + ProxyTopologyNumTokens int `default:"8" split_words:"true" yaml:"proxy_topology_num_tokens"` + ProxyTopologyKubernetesService string `split_words:"true" yaml:"proxy_topology_kubernetes_service"` // Origin bucket @@ -238,7 +239,7 @@ func (c *Config) ParseTopologyConfig() (*common.TopologyConfig, error) { proxyInstanceCount := len(proxyAddressesTyped) proxyIndex := c.ProxyTopologyIndex - if proxyIndex < 0 || proxyIndex >= proxyInstanceCount { + if c.ProxyTopologyKubernetesService == "" && (proxyIndex < 0 || proxyIndex >= proxyInstanceCount) { return nil, fmt.Errorf("invalid ZDM_PROXY_TOPOLOGY_INDEX and ZDM_PROXY_TOPOLOGY_ADDRESSES values; "+ "proxy index (%d) must be less than length of addresses (%d) and non negative", proxyIndex, proxyInstanceCount) } @@ -253,6 +254,7 @@ func (c *Config) ParseTopologyConfig() (*common.TopologyConfig, error) { Index: proxyIndex, Count: proxyInstanceCount, NumTokens: c.ProxyTopologyNumTokens, + KubernetesService: c.ProxyTopologyKubernetesService, }, nil } diff --git a/proxy/pkg/kubernetes/topology.go b/proxy/pkg/kubernetes/topology.go new file mode 100644 index 00000000..3cd553c3 --- /dev/null +++ b/proxy/pkg/kubernetes/topology.go @@ -0,0 +1,318 @@ +package kubernetes + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "sync" + "time" + + log "github.com/sirupsen/logrus" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +type TopologyEventType uint8 + +const ( + AddedEvent TopologyEventType = iota + RemovedEvent + StatusChangedEvent +) + +type TopologyEvent struct { + Type TopologyEventType + Host net.IP + StatusUp bool +} + +type TopologyRegistry struct { + clientset *kubernetes.Clientset + + serviceNamespace string + serviceName string + + local net.IP + + nodes map[string]bool // Represents a set of node IPs and their status + nodesLock sync.RWMutex + + eventChannels map[chan<- TopologyEvent]struct{} + eventChannelsLock sync.RWMutex +} + +func NewTopologyRegistry(serviceName string) (*TopologyRegistry, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + currentNamespace, err := os.ReadFile( + "/var/run/secrets/kubernetes.io/serviceaccount/namespace", + ) + if err != nil { + return nil, err + } + + localIPs, err := net.LookupIP(os.Getenv("HOSTNAME")) + if err != nil { + return nil, err + } + if len(localIPs) == 0 { + return nil, errors.New("No local IP address found") + } + + return &TopologyRegistry{ + clientset: clientset, + serviceNamespace: string(currentNamespace), + serviceName: serviceName, + local: localIPs[0], + nodes: make(map[string]bool), + eventChannels: make(map[chan<- TopologyEvent]struct{}), + }, nil +} + +func (tr *TopologyRegistry) Start(ctx context.Context) { + go tr.runInformer(ctx) +} + +func (tr *TopologyRegistry) Peers() []net.IP { + tr.nodesLock.RLock() + defer tr.nodesLock.RUnlock() + + log.Debugf("[TopologyRegistry] Nodes: %v", tr.nodes) + + var result []net.IP + for host, statusUp := range tr.nodes { + if statusUp && host != tr.Local().String() { + result = append(result, net.ParseIP(host)) + } + } + + log.Debugf("[TopologyRegistry] Peers result: %v", result) + + return result +} + +func (tr *TopologyRegistry) Local() net.IP { + return tr.local +} + +func (tr *TopologyRegistry) Subscribe(eventChannel chan TopologyEvent) { + tr.eventChannelsLock.Lock() + defer tr.eventChannelsLock.Unlock() + + tr.eventChannels[eventChannel] = struct{}{} +} + +func (tr *TopologyRegistry) Unsubscribe(eventChannel chan TopologyEvent) { + tr.eventChannelsLock.Lock() + defer tr.eventChannelsLock.Unlock() + + delete(tr.eventChannels, eventChannel) +} + +func (tr *TopologyRegistry) broadcastEvents(events []TopologyEvent) { + log.Infof("[TopologyRegistry] Broadcasting topology events.") + log.Debugf("[TopologyRegistry] Broadcasting topology events: %v", events) + + tr.eventChannelsLock.RLock() + defer tr.eventChannelsLock.RUnlock() + + for _, event := range events { + for ch := range tr.eventChannels { + ch <- event + } + } +} + +func (tr *TopologyRegistry) runInformer(ctx context.Context) { + informerFactory := informers.NewFilteredSharedInformerFactory( + tr.clientset, + 5*time.Minute, + tr.serviceNamespace, + func(options *metav1.ListOptions) { + options.LabelSelector = fmt.Sprintf( + "kubernetes.io/service-name=%s", + tr.serviceName, + ) + }, + ) + + informer := informerFactory.Discovery().V1().EndpointSlices().Informer() + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if ok { + tr.handleNewEndpointSlice(endpointSlice) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldEndpointSlice, okOld := oldObj.(*discoveryv1.EndpointSlice) + newEndpointSlice, okNew := newObj.(*discoveryv1.EndpointSlice) + if okOld && okNew { + tr.handleModifiedEndpointSlice(oldEndpointSlice, newEndpointSlice) + } + }, + DeleteFunc: func(obj interface{}) { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if ok { + tr.handleRemovedEndpointSlice(endpointSlice) + } + }, + }, + ) + + log.Infof("[TopologyRegistry] Starting EndpointSlice informer.") + defer log.Infof("[TopologyRegistry] Informer stopped.") + + stopCh := make(chan struct{}) + defer close(stopCh) + + informer.Run(stopCh) + + <-ctx.Done() +} + +func (tr *TopologyRegistry) handleNewEndpointSlice( + endpointSlice *discoveryv1.EndpointSlice, +) { + var events []TopologyEvent + defer func() { + tr.broadcastEvents(events) + }() + + tr.nodesLock.Lock() + defer tr.nodesLock.Unlock() + + for _, endpoint := range endpointSlice.Endpoints { + if len(endpoint.Addresses) == 0 || endpoint.Addresses[0] == "" { + continue + } + + host := endpoint.Addresses[0] + statusUp := *endpoint.Conditions.Ready + tr.nodes[host] = statusUp + events = append(events, TopologyEvent{ + Type: AddedEvent, + Host: net.ParseIP(host), + StatusUp: statusUp, + }) + } +} + +func (tr *TopologyRegistry) handleModifiedEndpointSlice( + oldEndpointSlice, newEndpointSlice *discoveryv1.EndpointSlice, +) { + var events []TopologyEvent + defer func() { + tr.broadcastEvents(events) + }() + + oldMap := make(map[string]bool, len(oldEndpointSlice.Endpoints)) + for _, endpoint := range oldEndpointSlice.Endpoints { + if len(endpoint.Addresses) == 0 || endpoint.Addresses[0] == "" { + continue + } + + host := endpoint.Addresses[0] + statusUp := *endpoint.Conditions.Ready + oldMap[host] = statusUp + } + + newMap := make(map[string]bool, len(newEndpointSlice.Endpoints)) + for _, endpoint := range newEndpointSlice.Endpoints { + if len(endpoint.Addresses) == 0 || endpoint.Addresses[0] == "" { + continue + } + + host := endpoint.Addresses[0] + statusUp := *endpoint.Conditions.Ready + newMap[host] = statusUp + } + + tr.nodesLock.Lock() + defer tr.nodesLock.Unlock() + + for _, endpoint := range oldEndpointSlice.Endpoints { + if len(endpoint.Addresses) == 0 || endpoint.Addresses[0] == "" { + continue + } + + host := endpoint.Addresses[0] + if newStatusUp, ok := newMap[host]; ok { + oldStatusUp := *endpoint.Conditions.Ready + if oldStatusUp == newStatusUp { + continue + } + + tr.nodes[host] = newStatusUp + events = append(events, TopologyEvent{ + Type: StatusChangedEvent, + Host: net.ParseIP(host), + StatusUp: newStatusUp, + }) + } else { + delete(tr.nodes, host) + events = append(events, TopologyEvent{ + Type: RemovedEvent, + Host: net.ParseIP(host), + StatusUp: false, + }) + } + } + + for _, endpoint := range newEndpointSlice.Endpoints { + if len(endpoint.Addresses) == 0 || endpoint.Addresses[0] == "" { + continue + } + + host := endpoint.Addresses[0] + if statusUp, ok := oldMap[host]; !ok { + tr.nodes[host] = statusUp + events = append(events, TopologyEvent{ + Type: AddedEvent, + Host: net.ParseIP(host), + StatusUp: statusUp, + }) + } + } +} + +func (tr *TopologyRegistry) handleRemovedEndpointSlice( + endpointSlice *discoveryv1.EndpointSlice, +) { + var events []TopologyEvent + defer func() { + tr.broadcastEvents(events) + }() + + tr.nodesLock.Lock() + defer tr.nodesLock.Unlock() + + for _, endpoint := range endpointSlice.Endpoints { + if len(endpoint.Addresses) == 0 || endpoint.Addresses[0] == "" { + continue + } + + host := endpoint.Addresses[0] + delete(tr.nodes, host) + events = append(events, TopologyEvent{ + Type: RemovedEvent, + Host: net.ParseIP(host), + StatusUp: false, + }) + } +} diff --git a/proxy/pkg/zdmproxy/clienthandler.go b/proxy/pkg/zdmproxy/clienthandler.go index 066acf0a..28f9d13a 100644 --- a/proxy/pkg/zdmproxy/clienthandler.go +++ b/proxy/pkg/zdmproxy/clienthandler.go @@ -6,20 +6,22 @@ import ( "encoding/hex" "errors" "fmt" + "net" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + "github.com/datastax/go-cassandra-native-protocol/frame" "github.com/datastax/go-cassandra-native-protocol/message" "github.com/datastax/go-cassandra-native-protocol/primitive" "github.com/datastax/zdm-proxy/proxy/pkg/common" "github.com/datastax/zdm-proxy/proxy/pkg/config" + "github.com/datastax/zdm-proxy/proxy/pkg/kubernetes" "github.com/datastax/zdm-proxy/proxy/pkg/metrics" "github.com/google/uuid" log "github.com/sirupsen/logrus" - "net" - "sort" - "strings" - "sync" - "sync/atomic" - "time" ) /* @@ -98,6 +100,11 @@ type ClientHandler struct { conf *config.Config topologyConfig *common.TopologyConfig + topologyRegistry *kubernetes.TopologyRegistry + + clientSubscribedEventTypes map[kubernetes.TopologyEventType]struct{} + clientSubscribedEventTypesLock sync.RWMutex + localClientHandlerWg *sync.WaitGroup originHost *Host @@ -129,6 +136,7 @@ func NewClientHandler( targetControlConn *ControlConn, conf *config.Config, topologyConfig *common.TopologyConfig, + topologyRegistry *kubernetes.TopologyRegistry, targetUsername string, targetPassword string, originUsername string, @@ -306,6 +314,8 @@ func NewClientHandler( conf: conf, localClientHandlerWg: localClientHandlerWg, topologyConfig: topologyConfig, + topologyRegistry: topologyRegistry, + clientSubscribedEventTypes: make(map[kubernetes.TopologyEventType]struct{}), originHost: originHost, targetHost: targetHost, originObserver: originObserver, @@ -334,6 +344,7 @@ func (ch *ClientHandler) run(activeClients *int32) { } ch.requestLoop() ch.listenForEventMessages() + ch.listenForTopologyChanges() ch.responseLoop() addObserver(ch.originObserver, ch.originControlConn) @@ -567,6 +578,90 @@ func (ch *ClientHandler) listenForEventMessages() { }() } +// Listens for virtual topology changes using TopologyRegistry. +func (ch *ClientHandler) listenForTopologyChanges() { + // TODO: Should really be part of `listenForEventMessages`, but keeping + // them separate for now to avoid heavy refactoring of + // `listenForEventMessages`. + + if !ch.topologyConfig.VirtualizationEnabled || ch.topologyConfig.KubernetesService == "" { + return + } + + ch.localClientHandlerWg.Add(1) + go func() { + log.Debug("listenForTopologyChanges starting now.") + defer log.Debugf("Shutting down listenForTopologyChanges.") + + defer ch.localClientHandlerWg.Done() + + eventChannel := make(chan kubernetes.TopologyEvent) + ch.topologyRegistry.Subscribe(eventChannel) + defer ch.topologyRegistry.Unsubscribe(eventChannel) + + for { + var event kubernetes.TopologyEvent + select { + case <-ch.clientHandlerContext.Done(): + return + case event = <-eventChannel: + } + + ch.clientSubscribedEventTypesLock.RLock() + _, ok := ch.clientSubscribedEventTypes[event.Type] + ch.clientSubscribedEventTypesLock.RUnlock() + if !ok { + continue + } + + var eventMessage message.Message + switch event.Type { + case kubernetes.AddedEvent: + eventMessage = &message.TopologyChangeEvent{ + ChangeType: primitive.TopologyChangeTypeNewNode, + Address: &primitive.Inet{ + Addr: event.Host, + Port: int32(ch.conf.ProxyListenPort), + }, + } + case kubernetes.RemovedEvent: + eventMessage = &message.TopologyChangeEvent{ + ChangeType: primitive.TopologyChangeTypeRemovedNode, + Address: &primitive.Inet{ + Addr: event.Host, + Port: int32(ch.conf.ProxyListenPort), + }, + } + case kubernetes.StatusChangedEvent: + changeType := primitive.StatusChangeTypeDown + if event.StatusUp { + changeType = primitive.StatusChangeTypeUp + } + eventMessage = &message.StatusChangeEvent{ + ChangeType: changeType, + Address: &primitive.Inet{ + Addr: event.Host, + Port: int32(ch.conf.ProxyListenPort), + }, + } + default: + continue // Unknown event type. Ignore it. + } + + eventMessageStreamId := int16(-1) + eventFrame := frame.NewFrame( + ch.originControlConn.cqlConn.GetProtocolVersion(), + eventMessageStreamId, + eventMessage, + ) + + if rawFrame, err := defaultCodec.ConvertToRawFrame(eventFrame); err == nil { + ch.clientConnector.sendResponseToClient(rawFrame) + } + } + }() +} + // Infinite loop that blocks on receiving from the response channel // (which is written by both cluster connectors). func (ch *ClientHandler) responseLoop() { @@ -1346,6 +1441,34 @@ func (ch *ClientHandler) handleRequest(f *frame.RawFrame) { log.Warnf("error sending request with opcode %02x and streamid %d: %s", f.Header.OpCode, f.Header.StreamId, err.Error()) return } + + if f.Header.OpCode == primitive.OpCodeRegister { + if !ch.topologyConfig.VirtualizationEnabled || ch.topologyConfig.KubernetesService == "" { + return + } + + decodedFrame, err := defaultCodec.ConvertFromRawFrame(f) + if err != nil { + return + } + + message, ok := decodedFrame.Body.Message.(*message.Register) + if !ok { + return + } + + ch.clientSubscribedEventTypesLock.Lock() + for _, eventType := range message.EventTypes { + switch eventType { + case primitive.EventTypeTopologyChange: + ch.clientSubscribedEventTypes[kubernetes.AddedEvent] = struct{}{} + ch.clientSubscribedEventTypes[kubernetes.RemovedEvent] = struct{}{} + case primitive.EventTypeStatusChange: + ch.clientSubscribedEventTypes[kubernetes.StatusChangedEvent] = struct{}{} + } + } + ch.clientSubscribedEventTypesLock.Unlock() + } } // Forwards the request, parsing it and enqueuing it to the appropriate cluster connector(s)' write queue(s). @@ -1581,6 +1704,7 @@ func (ch *ClientHandler) handleInterceptedRequest( if err != nil { return nil, err } + log.Debugf("VirtualHosts: %v", virtualHosts) typeCodec := GetDefaultGenericTypeCodec() diff --git a/proxy/pkg/zdmproxy/controlconn.go b/proxy/pkg/zdmproxy/controlconn.go index 17fd8048..1fb29f1e 100644 --- a/proxy/pkg/zdmproxy/controlconn.go +++ b/proxy/pkg/zdmproxy/controlconn.go @@ -4,15 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/datastax/go-cassandra-native-protocol/frame" - "github.com/datastax/go-cassandra-native-protocol/message" - "github.com/datastax/go-cassandra-native-protocol/primitive" - "github.com/datastax/zdm-proxy/proxy/pkg/common" - "github.com/datastax/zdm-proxy/proxy/pkg/config" - "github.com/datastax/zdm-proxy/proxy/pkg/metrics" - "github.com/google/uuid" - "github.com/jpillora/backoff" - log "github.com/sirupsen/logrus" "math" "math/big" "math/rand" @@ -22,11 +13,23 @@ import ( "sync" "sync/atomic" "time" + + "github.com/datastax/go-cassandra-native-protocol/frame" + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/datastax/go-cassandra-native-protocol/primitive" + "github.com/datastax/zdm-proxy/proxy/pkg/common" + "github.com/datastax/zdm-proxy/proxy/pkg/config" + "github.com/datastax/zdm-proxy/proxy/pkg/kubernetes" + "github.com/datastax/zdm-proxy/proxy/pkg/metrics" + "github.com/google/uuid" + "github.com/jpillora/backoff" + log "github.com/sirupsen/logrus" ) type ControlConn struct { conf *config.Config topologyConfig *common.TopologyConfig + topologyRegistry *kubernetes.TopologyRegistry cqlConn CqlConnection retryBackoffPolicy *backoff.Backoff heartbeatPeriod time.Duration @@ -62,15 +65,25 @@ const ProxyVirtualPartitioner = "org.apache.cassandra.dht.Murmur3Partitioner" const ccWriteTimeout = 5 * time.Second const ccReadTimeout = 10 * time.Second -func NewControlConn(ctx context.Context, defaultPort int, connConfig ConnectionConfig, - username string, password string, conf *config.Config, topologyConfig *common.TopologyConfig, proxyRand *rand.Rand, - metricsHandler *metrics.MetricHandler) *ControlConn { +func NewControlConn( + ctx context.Context, + defaultPort int, + connConfig ConnectionConfig, + username string, + password string, + conf *config.Config, + topologyConfig *common.TopologyConfig, + topologyRegistry *kubernetes.TopologyRegistry, + proxyRand *rand.Rand, + metricsHandler *metrics.MetricHandler, +) *ControlConn { authEnabled := &atomic.Value{} authEnabled.Store(true) return &ControlConn{ - conf: conf, - topologyConfig: topologyConfig, - cqlConn: nil, + conf: conf, + topologyConfig: topologyConfig, + topologyRegistry: topologyRegistry, + cqlConn: nil, retryBackoffPolicy: &backoff.Backoff{ Factor: conf.HeartbeatRetryBackoffFactor, Jitter: true, @@ -339,6 +352,28 @@ func (cc *ControlConn) openInternal(endpoints []Endpoint, ctx context.Context) ( } }) + if cc.topologyConfig.VirtualizationEnabled && cc.topologyConfig.KubernetesService != "" { + go func() { + eventChannel := make(chan kubernetes.TopologyEvent) + defer close(eventChannel) + + cc.topologyRegistry.Subscribe(eventChannel) + defer cc.topologyRegistry.Unsubscribe(eventChannel) + + for { + select { + case <-ctx.Done(): + return + case <-eventChannel: + select { + case cc.refreshHostsDebouncer <- newConn: + default: + } + } + } + }() + } + err = newConn.SubscribeToProtocolEvents(ctx, []primitive.EventType{primitive.EventTypeTopologyChange}) if err == nil { _, err = cc.RefreshHosts(newConn, ctx) @@ -502,12 +537,19 @@ func (cc *ControlConn) RefreshHosts(conn CqlConnection, ctx context.Context) ([] return orderedLocalHosts[i].Rack < orderedLocalHosts[j].Rack }) - assignedHosts := computeAssignedHosts(cc.topologyConfig.Index, cc.topologyConfig.Count, orderedLocalHosts) + topologyCount := cc.topologyConfig.Count + if cc.topologyConfig.VirtualizationEnabled && cc.topologyConfig.KubernetesService != "" { + nodes := cc.topologyRegistry.Peers() + nodes = append(nodes, cc.topologyRegistry.Local()) + topologyCount = len(nodes) + } + + assignedHosts := computeAssignedHosts(cc.topologyConfig.Index, topologyCount, orderedLocalHosts) shuffleHosts(cc.proxyRand, assignedHosts) var virtualHosts []*VirtualHost if cc.topologyConfig.VirtualizationEnabled { - virtualHosts, err = computeVirtualHosts(cc.topologyConfig, orderedLocalHosts) + virtualHosts, err = computeVirtualHosts(cc.topologyConfig, cc.topologyRegistry, orderedLocalHosts) if err != nil { return nil, err } @@ -740,8 +782,17 @@ func shuffleHosts(rnd *rand.Rand, hosts []*Host) { }) } -func computeVirtualHosts(topologyConfig *common.TopologyConfig, orderedHosts []*Host) ([]*VirtualHost, error) { +func computeVirtualHosts( + topologyConfig *common.TopologyConfig, + topologyRegistry *kubernetes.TopologyRegistry, + orderedHosts []*Host, +) ([]*VirtualHost, error) { proxyAddresses := topologyConfig.Addresses + if topologyConfig.KubernetesService != "" { + nodes := topologyRegistry.Peers() + nodes = append(nodes, topologyRegistry.Local()) + proxyAddresses = nodes + } numTokens := topologyConfig.NumTokens twoPow64 := new(big.Int).Exp(big.NewInt(2), big.NewInt(64), nil) twoPow63 := new(big.Int).Exp(big.NewInt(2), big.NewInt(63), nil) diff --git a/proxy/pkg/zdmproxy/proxy.go b/proxy/pkg/zdmproxy/proxy.go index f1ba5afc..85fb6ee1 100644 --- a/proxy/pkg/zdmproxy/proxy.go +++ b/proxy/pkg/zdmproxy/proxy.go @@ -5,26 +5,30 @@ import ( "crypto/tls" "errors" "fmt" + "math/rand" + "net" + "runtime" + "sync" + "sync/atomic" + "time" + "github.com/datastax/zdm-proxy/proxy/pkg/common" "github.com/datastax/zdm-proxy/proxy/pkg/config" + "github.com/datastax/zdm-proxy/proxy/pkg/kubernetes" "github.com/datastax/zdm-proxy/proxy/pkg/metrics" "github.com/datastax/zdm-proxy/proxy/pkg/metrics/noopmetrics" "github.com/datastax/zdm-proxy/proxy/pkg/metrics/prommetrics" "github.com/jpillora/backoff" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" - "math/rand" - "net" - "runtime" - "sync" - "sync/atomic" - "time" ) type ZdmProxy struct { Conf *config.Config TopologyConfig *common.TopologyConfig + topologyRegistry *kubernetes.TopologyRegistry + originConnectionConfig ConnectionConfig targetConnectionConfig ConnectionConfig @@ -183,9 +187,19 @@ func (p *ZdmProxy) initializeControlConnections(ctx context.Context) error { return fmt.Errorf("failed to parse topology config: %w", err) } + var topologyRegistry *kubernetes.TopologyRegistry + if topologyConfig.VirtualizationEnabled && topologyConfig.KubernetesService != "" { + topologyRegistry, err = kubernetes.NewTopologyRegistry(topologyConfig.KubernetesService) + if err != nil { + return fmt.Errorf("failed to create topology registry: %w", err) + } + topologyRegistry.Start(ctx) + } + log.Infof("Parsed Topology Config: %v", topologyConfig) p.lock.Lock() p.TopologyConfig = topologyConfig + p.topologyRegistry = topologyRegistry p.lock.Unlock() parsedOriginContactPoints, err := p.Conf.ParseOriginContactPoints() @@ -249,7 +263,8 @@ func (p *ZdmProxy) initializeControlConnections(ctx context.Context) error { originControlConn := NewControlConn( p.controlConnShutdownCtx, p.Conf.OriginPort, p.originConnectionConfig, - p.Conf.OriginUsername, p.Conf.OriginPassword, p.Conf, topologyConfig, p.proxyRand, p.metricHandler) + p.Conf.OriginUsername, p.Conf.OriginPassword, p.Conf, topologyConfig, p.topologyRegistry, + p.proxyRand, p.metricHandler) if err := originControlConn.Start(p.controlConnShutdownWg, ctx); err != nil { return fmt.Errorf("failed to initialize origin control connection: %w", err) @@ -261,7 +276,8 @@ func (p *ZdmProxy) initializeControlConnections(ctx context.Context) error { targetControlConn := NewControlConn( p.controlConnShutdownCtx, p.Conf.TargetPort, p.targetConnectionConfig, - p.Conf.TargetUsername, p.Conf.TargetPassword, p.Conf, topologyConfig, p.proxyRand, p.metricHandler) + p.Conf.TargetUsername, p.Conf.TargetPassword, p.Conf, topologyConfig, p.topologyRegistry, + p.proxyRand, p.metricHandler) if err := targetControlConn.Start(p.controlConnShutdownWg, ctx); err != nil { return fmt.Errorf("failed to initialize target control connection: %w", err) @@ -550,6 +566,7 @@ func (p *ZdmProxy) handleNewConnection(clientConn net.Conn) { p.targetControlConn, p.Conf, p.TopologyConfig, + p.topologyRegistry, p.Conf.TargetUsername, p.Conf.TargetPassword, p.Conf.OriginUsername, From 08109c93d4e7c1ee1e64856897f6918195c88622 Mon Sep 17 00:00:00 2001 From: Joakim Saario Date: Mon, 28 Oct 2024 16:59:58 +0100 Subject: [PATCH 2/2] Index need to match the entry for the local node in cc.virtualHosts --- proxy/pkg/zdmproxy/controlconn.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/proxy/pkg/zdmproxy/controlconn.go b/proxy/pkg/zdmproxy/controlconn.go index 1fb29f1e..d933619b 100644 --- a/proxy/pkg/zdmproxy/controlconn.go +++ b/proxy/pkg/zdmproxy/controlconn.go @@ -538,13 +538,15 @@ func (cc *ControlConn) RefreshHosts(conn CqlConnection, ctx context.Context) ([] }) topologyCount := cc.topologyConfig.Count + localIndex := cc.topologyConfig.Index if cc.topologyConfig.VirtualizationEnabled && cc.topologyConfig.KubernetesService != "" { nodes := cc.topologyRegistry.Peers() nodes = append(nodes, cc.topologyRegistry.Local()) topologyCount = len(nodes) + localIndex = topologyCount - 1 } - assignedHosts := computeAssignedHosts(cc.topologyConfig.Index, topologyCount, orderedLocalHosts) + assignedHosts := computeAssignedHosts(localIndex, topologyCount, orderedLocalHosts) shuffleHosts(cc.proxyRand, assignedHosts) var virtualHosts []*VirtualHost @@ -629,6 +631,11 @@ func (cc *ControlConn) GetVirtualHosts() ([]*VirtualHost, error) { } func (cc *ControlConn) GetLocalVirtualHostIndex() int { + if cc.topologyConfig.KubernetesService != "" { + // TODO: Ideally we shouldn't rely on indices at all, but this is the + // least intrusive way I could think of. + return len(cc.virtualHosts) - 1 // Local node is the last one. + } return cc.topologyConfig.Index }