diff --git a/mailbox.go b/mailbox.go index bf1f6ca..73582d3 100644 --- a/mailbox.go +++ b/mailbox.go @@ -121,40 +121,18 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) { s.mu.Unlock() s.mumb.Lock() - defer s.mumb.Unlock() - _, ok := s.mailboxes[nsName] if ok { return nil, fmt.Errorf("%w: nsName=%s", ErrAlreadyRegistered, nsName) } + s.mumb.Unlock() - timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout) - err := s.registry.Register(timeout, nsName) - cancel() - // Check if the error is a particular fatal error - // from etcd. Some errors have no recovery. See - // the list of all possible errors here: - // - // https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go - // - // They are unfortunately not classidied into - // recoverable or non-recoverable. - if err != nil && strings.Contains(err.Error(), "etcdserver: requested lease not found") { - s.reportFatalError(err) - return nil, err - } - if err != nil { - return nil, err - } - - boxC := make(chan Request, size) cleanup := func() { s.mumb.Lock() - defer s.mumb.Unlock() - // Immediately delete the subscription so that no one // can send to it, at least from this host. delete(s.mailboxes, nsName) + s.mumb.Unlock() // Deregister the name. var err error @@ -165,13 +143,37 @@ func newMailbox(s *Server, name, nsName string, size int) (*Mailbox, error) { cancel() return err != nil }) - // Ingnore ErrNotOwner because most likely the previous owner panic'ed or exited badly. + // Ingnore ErrNotOwner because most likely the previous owner panic'ed or exited badly. // So we'll ignore the error and let the mailbox creator retry later. We don't want to panic - // in that case because it will take down more mailboxes and make it worse. + // in that case because it will take down more mailboxes and make it worse. if err != nil && err != registry.ErrNotOwner { panic(fmt.Errorf("%w: unable to deregister mailbox: %v, error: %v", errDeregisteredFailed, nsName, err)) } } + + timeout, cancel := context.WithTimeout(context.Background(), s.cfg.Timeout) + err := s.registry.Register(timeout, nsName) + cancel() + // Check if the error is a particular fatal error + // from etcd. Some errors have no recovery. See + // the list of all possible errors here: + // + // https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go + // + // They are unfortunately not classidied into + // recoverable or non-recoverable. + if err != nil && strings.Contains(err.Error(), "etcdserver: requested lease not found") { + s.reportFatalError(err) + return nil, err + } + if err != nil { + cleanup() + return nil, err + } + + s.mumb.Lock() + defer s.mumb.Unlock() + boxC := make(chan Request, size) box := &Mailbox{ name: name, nsName: nsName,