Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable keepalives in server. #150

Merged
merged 5 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion zkclient/completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (z *ZKC) completeNickCommandLine(args []string) {
var c string
switch len(args) {
case 1:
c = ""
return
case 2:
c = args[1]
Expand Down
33 changes: 25 additions & 8 deletions zkserver/account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@ import (

"github.com/companyzero/zkc/inidb"
"github.com/companyzero/zkc/zkidentity"
"github.com/davecgh/go-xdr/xdr2"
xdr "github.com/davecgh/go-xdr/xdr2"
)

const (
CacheDir = "cache"
UserIdentityFilename = "user.ini"
)

type ErrAlreadyOnline struct {
err error
}

func (e ErrAlreadyOnline) Error() string {
return e.err.Error()
}

// Account opaque type that handles account related services.
type Account struct {
root string // root location of all accounts
Expand Down Expand Up @@ -315,15 +323,22 @@ func (a *Account) Delete(from [zkidentity.IdentitySize]byte, identifier string)
return nil
}

func (a *Account) Offline(who [zkidentity.IdentitySize]byte) {
a.Lock()
defer a.Unlock()

// offline closes open quit channels and deletes an account from the online
// map. This function must be called WITH the mutex held.
func (a *Account) offline(who [zkidentity.IdentitySize]byte) {
dn, found := a.online[who]
if found {
close(dn.quit)
delete(a.online, who)
}
delete(a.online, who)
}

// Offline knocks a user offline. This function must be called WITHOUT the
// mutex held.
func (a *Account) Offline(who [zkidentity.IdentitySize]byte) {
a.Lock()
defer a.Unlock()
a.offline(who)
}

// Online notifies Account that a user has become available. It reads all
Expand All @@ -333,12 +348,14 @@ func (a *Account) Online(who [zkidentity.IdentitySize]byte, ntfn chan *Notificat

cache := a.accountFile(who, CacheDir)

// make sure we are online
a.Lock()
_, found := a.online[who]
if found {
a.Unlock()
return fmt.Errorf("already online: %v ", hex.EncodeToString(who[:]))
return ErrAlreadyOnline{
err: fmt.Errorf("already online: %v ",
hex.EncodeToString(who[:])),
}
}

dn := diskNotification{
Expand Down
81 changes: 61 additions & 20 deletions zkserver/zkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/companyzero/zkc/zkserver/settings"
"github.com/companyzero/zkc/zkutil"
"github.com/davecgh/go-spew/spew"
"github.com/davecgh/go-xdr/xdr2"
xdr "github.com/davecgh/go-xdr/xdr2"
)

const (
Expand Down Expand Up @@ -61,7 +61,24 @@ type RPCWrapper struct {
Identifier string
}

type sessionContext struct {
ntfn chan *account.Notification
writer chan *RPCWrapper
quit chan struct{}
kx *session.KX
rids string
tagStack *tagstack.TagStack

// protected
sync.Mutex
tagMessage []*RPCWrapper
}

type ZKS struct {
sync.Mutex
sessions map[string]*sessionContext

// Not mutex entries
*debug.Debug
account *account.Account
settings *settings.Settings
Expand Down Expand Up @@ -199,7 +216,7 @@ func (z *ZKS) sessionWriter(sc *sessionContext) {

func (z *ZKS) sessionNtfn(sc *sessionContext) {
defer func() {
z.T(idS, "sessionNtfn exit: %v", sc.rids)
z.Dbg(idS, "sessionNtfn exit: %v", sc.rids)

// close underlying connection in order to fail read
sc.kx.Close()
Expand Down Expand Up @@ -272,20 +289,6 @@ func (z *ZKS) sessionNtfn(sc *sessionContext) {
}
}

type sessionContext struct {
ntfn chan *account.Notification
writer chan *RPCWrapper
quit chan struct{}
//done chan bool
kx *session.KX
rids string
tagStack *tagstack.TagStack

// protected
sync.Mutex
tagMessage []*RPCWrapper
}

// handleSession deals with incoming RPC calls. For now treat all errors as
// critical and return which in turns shuts down the connection.
func (z *ZKS) handleSession(kx *session.KX) error {
Expand All @@ -309,15 +312,47 @@ func (z *ZKS) handleSession(kx *session.KX) error {
// register identity
err := z.account.Online(rid, sc.ntfn)
if err != nil {
// If the account is already online we are knocking it offline.
// This may be annoying for users that have two clients open
// but it fixes the issue where phantom server connections
// remain online preventing the client from connecting to the
// server altogether.
if _, ok := err.(account.ErrAlreadyOnline); ok {
z.Dbg(idS, "handleSession forced offline: %v", rids)
z.Lock()
if oldSc, ok := z.sessions[rids]; ok {
// Closing the connection should knock
// everything offline.
oldSc.kx.Close()
delete(z.sessions, rids)
} else {
// This should not happen.
z.Unlock()
return fmt.Errorf("handleSession: account "+
"online without a session %v", rids)
}
z.Unlock()
}

// Regardless of the failure we return an error in order to
// give the server the opportunity to close the connection and
// settle down.
return fmt.Errorf("handleSession: %v %v", rids, err)
}

// mark session online
z.Lock()
z.sessions[rids] = &sc
z.Unlock()

z.Dbg(idS, "handleSession account online: %v", rids)

// populate identity in directory
if z.settings.Directory {
err := z.account.Push(rid)
if err != nil {
z.Dbg(idS, "handleSession: Push(%v) = %v", rids, err)
return fmt.Errorf("handleSession: Push(%v) = %v",
rids, err)
}
}

Expand All @@ -332,6 +367,11 @@ func (z *ZKS) handleSession(kx *session.KX) error {

z.account.Offline(rid)

// mark session offline
z.Lock()
delete(z.sessions, rids)
z.Unlock()

z.Dbg(idS, "handleSession exit: %v", rids)
}()

Expand Down Expand Up @@ -659,8 +699,7 @@ func (z *ZKS) listen() error {
continue
}

conn.(*net.TCPConn).SetKeepAlive(true)
conn.(*net.TCPConn).SetKeepAlivePeriod(time.Second)
conn.(*net.TCPConn).SetKeepAlive(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend just removing altogether

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it being explicit and I don't know what the OS is doing underneath.

conn = tls.Server(conn, &config)

go z.preSession(conn)
Expand All @@ -671,7 +710,9 @@ func (z *ZKS) listen() error {
}

func _main() error {
z := &ZKS{}
z := &ZKS{
sessions: make(map[string]*sessionContext),
}

// flags and settings
var err error
Expand Down