Skip to content

Commit

Permalink
deadlock on publisher
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Malusev <[email protected]>
  • Loading branch information
CodeLieutenant committed Feb 23, 2024
1 parent 9f77828 commit 039d2d7
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 59 deletions.
99 changes: 99 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
run:
concurrency: 16
timeout: 5m
issues-exit-code: 1
tests: true

skip-dirs:
- docs/
- nanodocker/
- cli/migrations

skip-files:
- "*_gen\\.go$"

modules-download-mode: mod
allow-parallel-runners: true
go: '1.21'

output:
format: colored-line-number
print-issued-lines: true
print-linter-name: true
uniq-by-line: true
sort-results: true

linters:
disable-all: true
enable:
- asasalint
- bidichk
- bodyclose
- decorder
- dupl
- durationcheck
- errcheck
- errchkjson
- errname
- errorlint
- exportloopref
- forbidigo
- gofumpt
- ginkgolinter
- gocheckcompilerdirectives
- gochecknoglobals
- gocognit
- goconst
- gocritic
- gocyclo
- godox
- goerr113
- goheader
- goimports
- gomodguard
- goprintffuncname
- gosec
- gosimple
- gosmopolitan
- govet
- grouper
- importas
- ineffassign
- interfacebloat
- loggercheck
- maintidx
- makezero
- mirror
- misspell
- nakedret
- nestif
- nilerr
- nilnil
- noctx
- nolintlint
- nonamedreturns
- nosprintfhostport
- paralleltest
- prealloc
- predeclared
- promlinter
- reassign
- revive
- rowserrcheck
- sqlclosecheck
- staticcheck
- stylecheck
- tenv
- testableexamples
- testpackage
- thelper
- tparallel
- unconvert
- unparam
- unused
- usestdlibvars
- wastedassign
- whitespace
- zerologlint
- prealloc
# - perfsprint
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
# dev
# 2.0.7


## Bufix

* Publisher deadlock on channel close

# 2.0.4


## Bufix
Expand Down
43 changes: 43 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
version: '3'

tasks:
fmt:
cmds:
- gofumpt -l -w .
lint:
cmds:
- golangci-lint run --color "always" -v -j 8
sec:
cmds:
- gosec ./...
tidy:
cmds:
- rm -f go.sum
- go mod tidy
update:
cmds:
- go get -u ./... # Updates regular packages
- go get -u -t ./... # Updates Test packages
cli-tools:
cmds:
- go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
- go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
- go install github.com/securego/gosec/v2/cmd/gosec@latest
- go install github.com/uudashr/gopkgs/v2/cmd/gopkgs@latest
- go install github.com/ramya-rao-a/go-outline@latest
- go install github.com/cweill/gotests/gotests@latest
- go install github.com/fatih/gomodifytags@latest
- go install github.com/daixiang0/gci@latest
- go install github.com/josharian/impl@latest
- go install github.com/haya14busa/goplay/cmd/goplay@latest
- go install github.com/go-delve/delve/cmd/dlv@latest
- go install mvdan.cc/gofumpt@latest
- go install github.com/swaggo/swag/cmd/swag@latest
- go install github.com/cosmtrek/air@latest
- go install google.golang.org/protobuf/cmd/[email protected]
- go install google.golang.org/grpc/cmd/[email protected]
test:
cmds:
- go test -covermode=atomic -race -coverprofile=coverage.txt -timeout 5m -json -v ./... | gotestfmt -showteststatus
env:
GOMAXPROCS: 4
55 changes: 26 additions & 29 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ type (
onBeforeConnectionReady OnReconnectingFunc
onConnectionReady OnConnectionReady
onError OnErrorFunc
once sync.Once
closing atomic.Bool
once func()
}

Config struct {
Expand Down Expand Up @@ -71,14 +70,18 @@ func New(ctx context.Context, config Config, events Events) (*Connection, error)
onError: events.OnError,
}

var err error
c.once = sync.OnceFunc(func() {
c.cancel()
c.connectionDispose()
})

connect := c.connect()

if err = connect(newCtx); err == nil {
if err := connect(newCtx); err == nil {
return c, nil
}

var err error
for i := 0; i < config.ReconnectRetry; i++ {
time.Sleep(config.ReconnectInterval)

Expand All @@ -90,6 +93,14 @@ func New(ctx context.Context, config Config, events Events) (*Connection, error)
return nil, err
}

func (c *Connection) hasConnectionClosed(err error) bool {
return !errors.Is(err, amqp091.ErrClosed) && c.conn.Load().IsClosed()
}

func (c *Connection) hasChannelClosed(err error) bool {
return errors.Is(err, amqp091.ErrClosed) && !c.conn.Load().IsClosed()
}

func (c *Connection) handleReconnect(ctx context.Context, connection *amqp091.Connection, connect func(ctx context.Context) error) {
notifyClose := connection.NotifyClose(make(chan *amqp091.Error))
for {
Expand All @@ -101,19 +112,12 @@ func (c *Connection) handleReconnect(ctx context.Context, connection *amqp091.Co
return
}

if c.closing.Load() {
if err := c.connectionDispose(); err != nil && c.onError != nil {
c.onError(&OnConnectionCloseError{inner: err})
}

return
}

// No need to reconnect if connection is not closed (this error means that channel is closed)
if !errors.Is(amqpErr, amqp091.ErrClosed) && !c.conn.Load().IsClosed() {
if c.hasChannelClosed(amqpErr) {
continue
}

c.connectionDispose()

var i int

for i = 0; i < c.ReconnectRetry; i++ {
Expand Down Expand Up @@ -143,9 +147,7 @@ func (c *Connection) connect() func(ctx context.Context) error {
properties.SetClientConnectionName(c.ConnectionName)

return func(ctx context.Context) error {
if err := c.connectionDispose(); err != nil && c.onError != nil {
c.onError(&OnConnectionCloseError{inner: err})
}
c.connectionDispose()

config := amqp091.Config{
Vhost: c.Vhost,
Expand All @@ -170,7 +172,6 @@ func (c *Connection) connect() func(ctx context.Context) error {
return err
}

defer c.closing.Store(false)
c.conn.Store(conn)

go c.handleReconnect(ctx, conn, c.connect())
Expand All @@ -187,24 +188,20 @@ func (c *Connection) connect() func(ctx context.Context) error {
}
}

func (c *Connection) connectionDispose() error {
c.closing.Store(true)
func (c *Connection) connectionDispose() {
conn := c.conn.Load()

if conn == nil || conn.IsClosed() {
return nil
return
}

return conn.Close()
if err := conn.Close(); err != nil && c.onError != nil {
c.onError(&OnConnectionCloseError{inner: err})
}
}

func (c *Connection) Close() error {
var err error

c.once.Do(func() {
c.cancel()
err = c.connectionDispose()
})
c.once()

return err
return nil
}
6 changes: 4 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/nano-interactive/go-amqp/v2/serializer"
)

var _ io.Closer = Consumer[any]{}
var _ io.Closer = (*Consumer[any])(nil)
var (
_ io.Closer = Consumer[any]{}
_ io.Closer = (*Consumer[any])(nil)
)

type (
Message interface{}
Expand Down
1 change: 0 additions & 1 deletion consumer/watch_dog.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func watchdog[T any](
queueDeclare.NoWait,
nil,
)

if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
module github.com/nano-interactive/go-amqp/v2

go 1.20
go 1.21

require (
github.com/rabbitmq/amqp091-go v1.5.0
github.com/stretchr/testify v1.8.1
github.com/rabbitmq/amqp091-go v1.9.0
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,27 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg=
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0=
github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
2 changes: 0 additions & 2 deletions publisher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ type (
messageBuffering int
}

PublisherConfig struct{}

Option[T any] func(*Config[T])
)

Expand Down
Loading

0 comments on commit 039d2d7

Please sign in to comment.