From 4f7efa89f76888e7deaf5438049faa8901a0c961 Mon Sep 17 00:00:00 2001 From: Julien SEITE Date: Wed, 8 Dec 2021 15:11:17 +0100 Subject: [PATCH 1/8] feat: add redis transport --- .github/workflows/cd-chart.yml | 2 +- .github/workflows/cd.yml | 20 +- .gitignore | 1 + .goreleaser.yml | 82 ++------ caddy/go.sum | 10 + go.mod | 1 + go.sum | 23 +++ redis_transport.go | 332 +++++++++++++++++++++++++++++++++ subscribe.go | 4 +- subscriber.go | 9 +- 10 files changed, 405 insertions(+), 79 deletions(-) create mode 100644 redis_transport.go diff --git a/.github/workflows/cd-chart.yml b/.github/workflows/cd-chart.yml index 9030cc29..38c71750 100644 --- a/.github/workflows/cd-chart.yml +++ b/.github/workflows/cd-chart.yml @@ -3,7 +3,7 @@ name: Release Chart on: push: tags: - - 'v*' + - 'v*' jobs: release: diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index b020a7a4..06729e04 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -18,24 +18,16 @@ jobs: uses: actions/setup-go@v2 with: go-version: '1.17' - - name: Docker Hub Login - run: docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" - env: - DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} - DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} - - - name: Import GPG key - id: import_gpg - uses: crazy-max/ghaction-import-gpg@v3 + - name: Login to GitHub Container Registry + uses: docker/login-action@v1 with: - gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} - passphrase: ${{ secrets.PASSPHRASE }} - - - name: Run GoReleaser + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Run GoReleaser uses: goreleaser/goreleaser-action@v2 with: version: latest args: release --rm-dist env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - GPG_FINGERPRINT: ${{ steps.import_gpg.outputs.fingerprint }} diff --git a/.gitignore b/.gitignore index c7996283..abcfc573 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ dist /spec/mercure.html /spec/mercure.txt /spec/mercure.xml +.vscode diff --git a/.goreleaser.yml b/.goreleaser.yml index 61cec2ec..3fecc36d 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -35,20 +35,6 @@ builds: - 5 - 6 - 7 -# Legacy build -- id: legacy - dir: cmd/mercure - ldflags: - - -s -w -X github.com/dunglas/mercure/common.version={{ .Version }} -X github.com/dunglas/mercure/common.commit={{ .ShortCommit }} -X github.com/dunglas/mercure/common.buildDate={{ .Date }} - goos: - - linux - - darwin - - windows - goarch: - - 386 - - amd64 - - arm - - arm64 archives: - builds: - caddy @@ -67,33 +53,16 @@ archives: format_overrides: - goos: windows format: zip -- id: legacy - builds: - - legacy - name_template: "{{ .ProjectName }}-legacy_{{ .Version }}_{{ .Os }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}{{ if .Mips }}_{{ .Mips }}{{ end }}" - replacements: - darwin: Darwin - linux: Linux - windows: Windows - 386: i386 - amd64: x86_64 - files: - - COPYRIGHT - - LICENSE - - README.md - format_overrides: - - goos: windows - format: zip dockers: - ids: - caddy goos: linux goarch: amd64 image_templates: - - 'dunglas/mercure:{{ .Tag }}-amd64' - - 'dunglas/mercure:v{{ .Major }}-amd64' - - 'dunglas/mercure:v{{ .Major }}.{{ .Minor }}-amd64' - - 'dunglas/mercure:latest-amd64' + - 'ghcr.io/devmachine-fr/mercure:{{ .Tag }}-amd64' + - 'ghcr.io/devmachine-fr/mercure:v{{ .Major }}-amd64' + - 'ghcr.io/devmachine-fr/mercure:v{{ .Major }}.{{ .Minor }}-amd64' + - 'ghcr.io/devmachine-fr/mercure:latest-amd64' use: buildx build_flag_templates: - "--platform=linux/amd64" @@ -105,41 +74,30 @@ dockers: goos: linux goarch: arm64 image_templates: - - 'dunglas/mercure:{{ .Tag }}-arm64v8' - - 'dunglas/mercure:v{{ .Major }}-arm64v8' - - 'dunglas/mercure:v{{ .Major }}.{{ .Minor }}-arm64v8' - - 'dunglas/mercure:latest-arm64v8' + - 'ghcr.io/devmachine-fr/mercure:{{ .Tag }}-arm64v8' + - 'ghcr.io/devmachine-fr/mercure:v{{ .Major }}-arm64v8' + - 'ghcr.io/devmachine-fr/mercure:v{{ .Major }}.{{ .Minor }}-arm64v8' + - 'ghcr.io/devmachine-fr/mercure:latest-arm64v8' use: buildx build_flag_templates: - "--platform=linux/arm64/v8" extra_files: - Caddyfile - Caddyfile.dev -- ids: - - legacy - dockerfile: Dockerfile.legacy - image_templates: - - 'dunglas/mercure:legacy-{{ .Tag }}' - - 'dunglas/mercure:legacy-v{{ .Major }}' - - 'dunglas/mercure:legacy-v{{ .Major }}.{{ .Minor }}' - - 'dunglas/mercure:legacy-latest' docker_manifests: -- name_template: dunglas/mercure:{{ .Tag }} +- name_template: ghcr.io/devmachine-fr/mercure:{{ .Tag }} image_templates: - - dunglas/mercure:{{ .Tag }}-amd64 - - dunglas/mercure:{{ .Tag }}-arm64v8 -- name_template: dunglas/mercure:v{{ .Major }} + - ghcr.io/devmachine-fr/mercure:{{ .Tag }}-amd64 + - ghcr.io/devmachine-fr/mercure:{{ .Tag }}-arm64v8 +- name_template: ghcr.io/devmachine-fr/mercure:v{{ .Major }} image_templates: - - dunglas/mercure:v{{ .Major }}-amd64 - - dunglas/mercure:v{{ .Major }}-arm64v8 -- name_template: dunglas/mercure:v{{ .Major }}.{{ .Minor }} + - ghcr.io/devmachine-fr/mercure:v{{ .Major }}-amd64 + - ghcr.io/devmachine-fr/mercure:v{{ .Major }}-arm64v8 +- name_template: ghcr.io/devmachine-fr/mercure:v{{ .Major }}.{{ .Minor }} image_templates: - - dunglas/mercure:v{{ .Major }}.{{ .Minor }}-amd64 - - dunglas/mercure:v{{ .Major }}.{{ .Minor }}-arm64v8 -- name_template: dunglas/mercure:latest + - ghcr.io/devmachine-fr/mercure:v{{ .Major }}.{{ .Minor }}-amd64 + - ghcr.io/devmachine-fr/mercure:v{{ .Major }}.{{ .Minor }}-arm64v8 +- name_template: ghcr.io/devmachine-fr/mercure:latest image_templates: - - dunglas/mercure:latest-amd64 - - dunglas/mercure:latest-arm64v8 -signs: - - artifacts: checksum - args: ["--batch", "-u", "{{ .Env.GPG_FINGERPRINT }}", "--output", "${signature}", "--detach-sign", "${artifact}"] + - ghcr.io/devmachine-fr/mercure:latest-amd64 + - ghcr.io/devmachine-fr/mercure:latest-arm64v8 \ No newline at end of file diff --git a/caddy/go.sum b/caddy/go.sum index cfe09adc..939942c7 100644 --- a/caddy/go.sum +++ b/caddy/go.sum @@ -187,6 +187,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dlclark/regexp2 v1.1.6/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= @@ -235,6 +237,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= github.com/go-piv/piv-go v1.7.0/go.mod h1:ON2WvQncm7dIkCQ7kYJs+nc3V4jHGfrrJnSF8HKy7Gk= +github.com/go-redis/redis/v8 v8.11.1 h1:Aqf/1y2eVfE9zrySM++/efzwv3mkLH7n/T96//gbo94= +github.com/go-redis/redis/v8 v8.11.1/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= @@ -342,6 +346,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -587,12 +592,14 @@ github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -957,6 +964,7 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -1054,6 +1062,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201231184435-2d18734c6014/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1158,6 +1167,7 @@ golang.org/x/tools v0.0.0-20201017001424-6003fad69a88/go.mod h1:z6u4i615ZeAfBE4X golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= diff --git a/go.mod b/go.mod index 2073f5a9..16e0883c 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 + github.com/go-redis/redis/v8 v8.11.1 github.com/hashicorp/golang-lru v0.5.4 github.com/kevburnsjr/skipfilter v0.0.1 github.com/kr/text v0.2.0 // indirect diff --git a/go.sum b/go.sum index 4d6a5016..d45ae575 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -123,6 +125,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-redis/redis/v8 v8.11.1 h1:Aqf/1y2eVfE9zrySM++/efzwv3mkLH7n/T96//gbo94= +github.com/go-redis/redis/v8 v8.11.1/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= @@ -179,6 +183,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -233,6 +238,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= @@ -290,7 +296,14 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= @@ -463,6 +476,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -488,6 +502,7 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -495,6 +510,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -530,6 +546,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -542,8 +559,10 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -568,6 +587,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -646,6 +666,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= @@ -767,10 +788,12 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/redis_transport.go b/redis_transport.go new file mode 100644 index 00000000..ec45ab8e --- /dev/null +++ b/redis_transport.go @@ -0,0 +1,332 @@ +package mercure + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "net/url" + "strconv" + "sync" + + redis "github.com/go-redis/redis/v8" + "go.uber.org/zap" +) + +func init() { //nolint:gochecknoinits + RegisterTransportFactory("redis", NewRedisTransport) +} + +const RedisDefaultCleanupFrequency = 0.3 + +const defaultRedisBucketName = "updates" + +// RedisTransport implements the TransportInterface using the Bolt database. +type RedisTransport struct { + sync.RWMutex + logger Logger + client *redis.Client + ctx context.Context + bucketName string + size uint64 + cleanupFrequency float64 + subscribers *SubscriberList + closed chan struct{} + closedOnce sync.Once + lastEventID string +} + +// NewRedisTransport create a new RedisTransport. +func NewRedisTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error) { //nolint:ireturn + var err error + q := u.Query() + bucketName := defaultRedisBucketName + if q.Get("bucket_name") != "" { + bucketName = q.Get("bucket_name") + } + + size := uint64(0) + if sizeParameter := q.Get("size"); sizeParameter != "" { + size, err = strconv.ParseUint(sizeParameter, 10, 64) + if err != nil { + return nil, &TransportError{u.Redacted(), fmt.Sprintf(`invalid "size" parameter %q`, sizeParameter), err} + } + } + + cleanupFrequency := RedisDefaultCleanupFrequency + cleanupFrequencyParameter := q.Get("cleanup_frequency") + if cleanupFrequencyParameter != "" { + cleanupFrequency, err = strconv.ParseFloat(cleanupFrequencyParameter, 64) + if err != nil { + return nil, &TransportError{u.Redacted(), fmt.Sprintf(`invalid "cleanup_frequency" parameter %q`, cleanupFrequencyParameter), err} + } + } + + path := u.Path // absolute path (bolt:///path.db) + if path == "" { + path = u.Host // relative path (bolt://path.db) + } + if path == "" { + return nil, &TransportError{u.Redacted(), "missing path", err} + } + + client := redis.NewClient(&redis.Options{ + Addr: path, + Password: "", // no password set + DB: 0, // use default DB + }) + + ctx := context.Background() + + redisTransport := &RedisTransport{ + logger: l, + client: client, + ctx: ctx, + bucketName: bucketName, + size: size, + cleanupFrequency: cleanupFrequency, + subscribers: NewSubscriberList(1e5), + closed: make(chan struct{}), + lastEventID: getRedisLastEventID(ctx, client, bucketName), + } + + go subscribeToUpdate(redisTransport) + + return redisTransport, nil +} + +func subscribeToUpdate(t *RedisTransport) { + pubsub := t.client.Subscribe(t.ctx, "update") + ch := pubsub.Channel() + for msg := range ch { + var update *Update + errUnmarshal := json.Unmarshal([]byte(msg.Payload), &update) + if errUnmarshal != nil { + t.logger.Error("error when unmarshaling message", zap.Any("message", msg), zap.Error(errUnmarshal)) + + continue + } + t.dispatch(update) + } +} + +func getRedisLastEventID(ctx context.Context, client *redis.Client, bucketName string) string { + lastEventID := EarliestLastEventID + + lastValue, err := client.LIndex(ctx, bucketName, 0).Result() + if err == nil { + var lastUpdate *Update + errUnmarshal := json.Unmarshal([]byte(lastValue), &lastUpdate) + if errUnmarshal != nil { + return lastEventID + } + lastEventID = lastUpdate.ID + } + + return lastEventID +} + +// Dispatch dispatches an update to all subscribers and persists it in Bolt DB. +func (t *RedisTransport) Dispatch(update *Update) error { + select { + case <-t.closed: + return ErrClosedTransport + default: + } + + AssignUUID(update) + + t.Lock() + defer t.Unlock() + + updateJSON, err := json.Marshal(*update) + if err != nil { + return fmt.Errorf("error when marshaling update: %w", err) + } + + if err := t.persist(update.ID, updateJSON); err != nil { + return err + } + + // publish in pubsub for others mercure instances to consume the update and dispatch it to its subscribers + if err := t.client.Publish(t.ctx, "update", updateJSON).Err(); err != nil { + return fmt.Errorf("error when publishing update: %w", err) + } + + return nil +} + +// Called when a pubsub message is received. +func (t *RedisTransport) dispatch(update *Update) error { + select { + case <-t.closed: + return ErrClosedTransport + default: + } + + t.Lock() + defer t.Unlock() + + for _, s := range t.subscribers.MatchAny(update) { + if !s.Dispatch(update, false) { + t.subscribers.Remove(s) + } + } + + return nil +} + +// persist stores update in the database. +func (t *RedisTransport) persist(updateID string, updateJSON []byte) error { + t.lastEventID = updateID + err := t.client.LPush(t.ctx, t.bucketName, updateJSON).Err() + if err != nil { + return fmt.Errorf("error while persisting to redis: %w", err) + } + + return t.cleanup() +} + +// AddSubscriber adds a new subscriber to the transport. +func (t *RedisTransport) AddSubscriber(s *Subscriber) error { + select { + case <-t.closed: + return ErrClosedTransport + default: + } + + t.Lock() + t.subscribers.Add(s) + t.Unlock() + + if s.RequestLastEventID != "" { + t.dispatchHistory(s) + } + + s.Ready() + + return nil +} + +// RemoveSubscriber removes a new subscriber from the transport. +func (t *RedisTransport) RemoveSubscriber(s *Subscriber) error { + select { + case <-t.closed: + return ErrClosedTransport + default: + } + + t.Lock() + defer t.Unlock() + t.subscribers.Remove(s) + + return nil +} + +// GetSubscribers get the list of active subscribers. +func (t *RedisTransport) GetSubscribers() (string, []*Subscriber, error) { + t.RLock() + defer t.RUnlock() + + var subscribers []*Subscriber + t.subscribers.Walk(0, func(s *Subscriber) bool { + subscribers = append(subscribers, s) + + return true + }) + + return t.lastEventID, subscribers, nil +} + +func (t *RedisTransport) dispatchHistory(s *Subscriber) { + updates, err := t.client.LRange(t.ctx, t.bucketName, 0, int64(t.size)).Result() + if err != nil { + s.HistoryDispatched(EarliestLastEventID) + + return + } + + responseLastEventID := EarliestLastEventID + afterFromID := s.RequestLastEventID == EarliestLastEventID + for _, update := range updates { + var lastUpdate *Update + errUnmarshal := json.Unmarshal([]byte(update), &lastUpdate) + if errUnmarshal != nil { + s.HistoryDispatched(responseLastEventID) + t.logger.Error("error when unmarshaling update", zap.String("update", update), zap.Error(errUnmarshal)) + + return + } + + if !afterFromID { + responseLastEventID = lastUpdate.ID + if responseLastEventID == s.RequestLastEventID { + afterFromID = true + } + + continue + } + + if !s.Dispatch(lastUpdate, true) { + s.HistoryDispatched(responseLastEventID) + + return + } + + return + } + + s.HistoryDispatched(responseLastEventID) +} + +// Close closes the Transport. +func (t *RedisTransport) Close() (err error) { + t.closedOnce.Do(func() { + close(t.closed) + + t.Lock() + defer t.Unlock() + + t.subscribers.Walk(0, func(s *Subscriber) bool { + s.Disconnect() + t.subscribers.Remove(s) + + return true + }) + err = t.client.Close() + }) + + if err == nil { + return nil + } + + return fmt.Errorf("unable to close Redis client: %w", err) +} + +// cleanup removes entries in the history above the size limit, triggered probabilistically. +func (t *RedisTransport) cleanup() error { + sizeUpdates, errLen := t.client.LLen(t.ctx, t.bucketName).Result() + if errLen != nil { + return fmt.Errorf("error when getting updates length: %w", errLen) + } + + if t.size == 0 || + t.cleanupFrequency == 0 || + t.size >= uint64(sizeUpdates) || + (t.cleanupFrequency != 1 && rand.Float64() < t.cleanupFrequency) { //nolint:gosec + return nil + } + + errTrim := t.client.LTrim(t.ctx, t.bucketName, 0, int64(t.size)).Err() + if errTrim != nil { + return fmt.Errorf("error when trimming update length: %w", errLen) + } + + return nil +} + +// Interface guards. +var ( + _ Transport = (*RedisTransport)(nil) + _ TransportSubscribers = (*RedisTransport)(nil) +) diff --git a/subscribe.go b/subscribe.go index 75ce1e9d..076506be 100644 --- a/subscribe.go +++ b/subscribe.go @@ -106,7 +106,9 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request) *Subscr s.SetTopics(topics, privateTopics) h.dispatchSubscriptionUpdate(s, true) - if err := h.transport.AddSubscriber(s); err != nil { + if err := h.transport.AddSubscriber(s); err == nil { + h.dispatchSubscriptionUpdate(s, true) + } else { http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) h.dispatchSubscriptionUpdate(s, false) if c := h.logger.Check(zap.ErrorLevel, "Unable to add subscriber"); c != nil { diff --git a/subscriber.go b/subscriber.go index 0b0a34f1..1a6e1289 100644 --- a/subscriber.go +++ b/subscriber.go @@ -9,6 +9,7 @@ import ( "github.com/gofrs/uuid" uritemplate "github.com/yosida95/uritemplate/v3" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -76,7 +77,13 @@ func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool { return false } - s.out <- u + select { + case s.out <- u: + default: + s.logger.Error("error when dispatching the update", zap.Any("update", u), zap.Any("subscriber", s)) + + return false + } return true } From 90a521a911ce84d7e07c0444a581c3be433c3939 Mon Sep 17 00:00:00 2001 From: Olivier Boudet Date: Mon, 24 Oct 2022 17:45:06 +0200 Subject: [PATCH 2/8] fix: pubsub channel name --- redis_transport.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis_transport.go b/redis_transport.go index ec45ab8e..4d045dbd 100644 --- a/redis_transport.go +++ b/redis_transport.go @@ -96,7 +96,7 @@ func NewRedisTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport } func subscribeToUpdate(t *RedisTransport) { - pubsub := t.client.Subscribe(t.ctx, "update") + pubsub := t.client.Subscribe(t.ctx, bucket_name) ch := pubsub.Channel() for msg := range ch { var update *Update @@ -149,7 +149,7 @@ func (t *RedisTransport) Dispatch(update *Update) error { } // publish in pubsub for others mercure instances to consume the update and dispatch it to its subscribers - if err := t.client.Publish(t.ctx, "update", updateJSON).Err(); err != nil { + if err := t.client.Publish(t.ctx, bucket_name, updateJSON).Err(); err != nil { return fmt.Errorf("error when publishing update: %w", err) } From 16d1ddc03217173555f282018ecf9365506970ab Mon Sep 17 00:00:00 2001 From: Julien SEITE Date: Tue, 18 Jul 2023 17:49:46 +0200 Subject: [PATCH 3/8] chore: add logs to debug redis transport --- redis_transport.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/redis_transport.go b/redis_transport.go index e1c40d42..e6f8c81b 100644 --- a/redis_transport.go +++ b/redis_transport.go @@ -96,16 +96,20 @@ func NewRedisTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport } func subscribeToUpdate(t *RedisTransport) { + t.logger.Info("subscribeToUpdate:Subscribe") pubsub := t.client.Subscribe(t.ctx, t.bucketName) + t.logger.Info("subscribeToUpdate:pubsub.Channel") ch := pubsub.Channel() for msg := range ch { var update *Update + t.logger.Info("subscribeToUpdate:Unmarshal") errUnmarshal := json.Unmarshal([]byte(msg.Payload), &update) if errUnmarshal != nil { t.logger.Error("error when unmarshaling message", zap.Any("message", msg), zap.Error(errUnmarshal)) continue } + t.logger.Info("subscribeToUpdate:dispatch") t.dispatch(update) } } @@ -113,6 +117,7 @@ func subscribeToUpdate(t *RedisTransport) { func getRedisLastEventID(ctx context.Context, client *redis.Client, bucketName string) string { lastEventID := EarliestLastEventID + t.logger.Info("getRedisLastEventID:LIndex") lastValue, err := client.LIndex(ctx, bucketName, 0).Result() if err == nil { var lastUpdate *Update @@ -128,26 +133,32 @@ func getRedisLastEventID(ctx context.Context, client *redis.Client, bucketName s // Dispatch dispatches an update to all subscribers and persists it in Bolt DB. func (t *RedisTransport) Dispatch(update *Update) error { + t.logger.Info("Dispatch:select") select { case <-t.closed: return ErrClosedTransport default: } + t.logger.Info("Dispatch:AssignUUID") AssignUUID(update) + t.logger.Info("Dispatch:Lock") t.Lock() defer t.Unlock() + t.logger.Info("Dispatch:Marshal") updateJSON, err := json.Marshal(*update) if err != nil { return fmt.Errorf("error when marshaling update: %w", err) } + t.logger.Info("Dispatch:persist") if err := t.persist(update.ID, updateJSON); err != nil { return err } + t.logger.Info("Dispatch:Publish") // publish in pubsub for others mercure instances to consume the update and dispatch it to its subscribers if err := t.client.Publish(t.ctx, t.bucketName, updateJSON).Err(); err != nil { return fmt.Errorf("error when publishing update: %w", err) @@ -158,17 +169,22 @@ func (t *RedisTransport) Dispatch(update *Update) error { // Called when a pubsub message is received. func (t *RedisTransport) dispatch(update *Update) error { + t.logger.Info("dispatch:select") select { case <-t.closed: return ErrClosedTransport default: } + t.logger.Info("dispatch:Lock") t.Lock() defer t.Unlock() + t.logger.Info("dispatch:update") for _, s := range t.subscribers.MatchAny(update) { + t.logger.Info("dispatch:Dispatch") if !s.Dispatch(update, false) { + t.logger.Info("dispatch:Remove") t.subscribers.Remove(s) } } @@ -178,31 +194,40 @@ func (t *RedisTransport) dispatch(update *Update) error { // persist stores update in the database. func (t *RedisTransport) persist(updateID string, updateJSON []byte) error { + t.logger.Info("persist") t.lastEventID = updateID + t.logger.Info("persist:LPush") err := t.client.LPush(t.ctx, t.bucketName, updateJSON).Err() if err != nil { return fmt.Errorf("error while persisting to redis: %w", err) } + t.logger.Info("persist:cleanup") return t.cleanup() } // AddSubscriber adds a new subscriber to the transport. func (t *RedisTransport) AddSubscriber(s *Subscriber) error { + t.logger.Info("AddSubscriber:select") select { case <-t.closed: return ErrClosedTransport default: } + t.logger.Info("AddSubscriber:Lock") t.Lock() + t.logger.Info("AddSubscriber:Add") t.subscribers.Add(s) + t.logger.Info("AddSubscriber:Unlock") t.Unlock() if s.RequestLastEventID != "" { + t.logger.Info("AddSubscriber:dispatchHistory") t.dispatchHistory(s) } + t.logger.Info("AddSubscriber:Ready") s.Ready() return nil @@ -210,14 +235,17 @@ func (t *RedisTransport) AddSubscriber(s *Subscriber) error { // RemoveSubscriber removes a new subscriber from the transport. func (t *RedisTransport) RemoveSubscriber(s *Subscriber) error { + t.logger.Info("RemoveSubscriber:select") select { case <-t.closed: return ErrClosedTransport default: } + t.logger.Info("RemoveSubscriber:Lock") t.Lock() defer t.Unlock() + t.logger.Info("RemoveSubscriber:Remove") t.subscribers.Remove(s) return nil @@ -225,22 +253,27 @@ func (t *RedisTransport) RemoveSubscriber(s *Subscriber) error { // GetSubscribers get the list of active subscribers. func (t *RedisTransport) GetSubscribers() (string, []*Subscriber, error) { + t.logger.Info("GetSubscribers:RLock") t.RLock() defer t.RUnlock() + t.logger.Info("GetSubscribers:append") var subscribers []*Subscriber t.subscribers.Walk(0, func(s *Subscriber) bool { subscribers = append(subscribers, s) return true }) + t.logger.Info("GetSubscribers:return") return t.lastEventID, subscribers, nil } func (t *RedisTransport) dispatchHistory(s *Subscriber) { + t.logger.Info("dispatchHistory:LRange") updates, err := t.client.LRange(t.ctx, t.bucketName, 0, int64(t.size)).Result() if err != nil { + t.logger.Info("dispatchHistory:HistoryDispatched") s.HistoryDispatched(EarliestLastEventID) return @@ -250,6 +283,7 @@ func (t *RedisTransport) dispatchHistory(s *Subscriber) { afterFromID := s.RequestLastEventID == EarliestLastEventID for _, update := range updates { var lastUpdate *Update + t.logger.Info("dispatchHistory:Unmarshal") errUnmarshal := json.Unmarshal([]byte(update), &lastUpdate) if errUnmarshal != nil { s.HistoryDispatched(responseLastEventID) @@ -267,6 +301,7 @@ func (t *RedisTransport) dispatchHistory(s *Subscriber) { continue } + t.logger.Info("dispatchHistory:Dispatch") if !s.Dispatch(lastUpdate, true) { s.HistoryDispatched(responseLastEventID) @@ -281,6 +316,7 @@ func (t *RedisTransport) dispatchHistory(s *Subscriber) { // Close closes the Transport. func (t *RedisTransport) Close() (err error) { + t.logger.Info("Close") t.closedOnce.Do(func() { close(t.closed) @@ -305,6 +341,7 @@ func (t *RedisTransport) Close() (err error) { // cleanup removes entries in the history above the size limit, triggered probabilistically. func (t *RedisTransport) cleanup() error { + t.logger.Info("cleanup:LLen") sizeUpdates, errLen := t.client.LLen(t.ctx, t.bucketName).Result() if errLen != nil { return fmt.Errorf("error when getting updates length: %w", errLen) @@ -317,6 +354,7 @@ func (t *RedisTransport) cleanup() error { return nil } + t.logger.Info("cleanup:LTrim") errTrim := t.client.LTrim(t.ctx, t.bucketName, 0, int64(t.size)).Err() if errTrim != nil { return fmt.Errorf("error when trimming update length: %w", errLen) From 890fbf2f5fc63bdd09c9daf86a7d1def8fa66338 Mon Sep 17 00:00:00 2001 From: Julien SEITE Date: Wed, 19 Jul 2023 10:03:16 +0200 Subject: [PATCH 4/8] ci: upgrade actions + fix goreleaser --- .github/workflows/cd.yml | 13 ++++++++----- .goreleaser.yml | 13 +++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 06729e04..b381fb49 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -5,29 +5,32 @@ on: tags: - 'v*' +permissions: + contents: write + jobs: release: name: Release runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: fetch-depth: 0 - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v4 with: go-version: '1.17' - name: Login to GitHub Container Registry - uses: docker/login-action@v1 + uses: docker/login-action@v2 with: registry: ghcr.io username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: Run GoReleaser - uses: goreleaser/goreleaser-action@v2 + uses: goreleaser/goreleaser-action@v4 with: version: latest - args: release --rm-dist + args: release --clean env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.goreleaser.yml b/.goreleaser.yml index 3fecc36d..c0c4d1da 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -38,12 +38,13 @@ builds: archives: - builds: - caddy - replacements: - darwin: Darwin - linux: Linux - windows: Windows - 386: i386 - amd64: x86_64 + name_template: >- + {{ .ProjectName }}_ + {{- title .Os }}_ + {{- if eq .Arch "amd64" }}x86_64 + {{- else if eq .Arch "386" }}i386 + {{- else }}{{ .Arch }}{{ end }} + {{- if .Arm }}v{{ .Arm }}{{ end }} files: - COPYRIGHT - LICENSE From 4829f8f9c4c9c74fd82f4588696e7796b035d05a Mon Sep 17 00:00:00 2001 From: Julien SEITE Date: Wed, 19 Jul 2023 10:51:37 +0200 Subject: [PATCH 5/8] fix: logger is not accessible in getRedisLastEventID --- redis_transport.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/redis_transport.go b/redis_transport.go index e6f8c81b..996f1378 100644 --- a/redis_transport.go +++ b/redis_transport.go @@ -116,8 +116,6 @@ func subscribeToUpdate(t *RedisTransport) { func getRedisLastEventID(ctx context.Context, client *redis.Client, bucketName string) string { lastEventID := EarliestLastEventID - - t.logger.Info("getRedisLastEventID:LIndex") lastValue, err := client.LIndex(ctx, bucketName, 0).Result() if err == nil { var lastUpdate *Update From fbb8373312c82086637309b4590c7fe29890b1ab Mon Sep 17 00:00:00 2001 From: Julien SEITE Date: Wed, 19 Jul 2023 11:17:23 +0200 Subject: [PATCH 6/8] ci: fix permission --- .github/workflows/cd.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index b381fb49..3df444b5 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -5,8 +5,7 @@ on: tags: - 'v*' -permissions: - contents: write +permissions: write-all jobs: release: From 04b02dde37892d273a88baa89efdc4f0d34305d4 Mon Sep 17 00:00:00 2001 From: Julien SEITE Date: Wed, 19 Jul 2023 15:04:38 +0200 Subject: [PATCH 7/8] feat: add logs --- subscribe.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/subscribe.go b/subscribe.go index 076506be..2feee276 100644 --- a/subscribe.go +++ b/subscribe.go @@ -58,7 +58,9 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { } heartbeatTimer.Reset(h.heartbeat) case update, ok := <-s.Receive(): + h.logger.Info("Subscriber:received update") if !ok || !h.write(w, s, newSerializedUpdate(update).event) { + h.logger.Info("Subscriber:received not ok") return } if heartbeatTimer != nil { From 564ddb5b6f044bc5ec355d829452502c5067e1c3 Mon Sep 17 00:00:00 2001 From: Olivier Boudet Date: Wed, 10 Apr 2024 14:41:08 +0200 Subject: [PATCH 8/8] feat(redis): use redis sentinel client --- redis_transport.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/redis_transport.go b/redis_transport.go index 996f1378..6890f3d4 100644 --- a/redis_transport.go +++ b/redis_transport.go @@ -70,10 +70,9 @@ func NewRedisTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport return nil, &TransportError{u.Redacted(), "missing path", err} } - client := redis.NewClient(&redis.Options{ - Addr: path, - Password: "", // no password set - DB: 0, // use default DB + client := redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: q.Get("master"), + SentinelAddrs: []string{path}, }) ctx := context.Background()