Skip to content

Commit

Permalink
Merge pull request #77 from fly-apps/sess-pool-support
Browse files Browse the repository at this point in the history
Kill session connection if we need to go readonly
  • Loading branch information
davissp14 authored Feb 7, 2023
2 parents a17fdae + 8cd58e1 commit 80f4bb5
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 47 deletions.
48 changes: 3 additions & 45 deletions internal/api/handle_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,25 +221,11 @@ func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Reque

defer close()

internal, err := flypg.ReadFromFile(s.node.PGConfig.InternalConfigFile())
all, err := s.node.PGConfig.CurrentConfig()
if err != nil {
renderErr(w, err)
return
}
user, err := flypg.ReadFromFile(s.node.PGConfig.UserConfigFile())
if err != nil {
renderErr(w, err)
return
}

all := map[string]interface{}{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

var in []string

Expand All @@ -266,26 +252,12 @@ func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Reque
}

func (s *Server) handleViewBouncerSettings(w http.ResponseWriter, r *http.Request) {
internal, err := flypg.ReadFromFile(s.node.PGBouncer.InternalConfigFile())
if err != nil {
renderErr(w, err)
return
}
user, err := flypg.ReadFromFile(s.node.PGBouncer.UserConfigFile())
all, err := s.node.PGBouncer.CurrentConfig()
if err != nil {
renderErr(w, err)
return
}

all := map[string]interface{}{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

var in []string

if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
Expand All @@ -307,26 +279,12 @@ func (s *Server) handleViewBouncerSettings(w http.ResponseWriter, r *http.Reques
}

func (s *Server) handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) {
internal, err := flypg.ReadFromFile(s.node.RepMgr.InternalConfigFile())
if err != nil {
renderErr(w, err)
return
}
user, err := flypg.ReadFromFile(s.node.RepMgr.UserConfigFile())
all, err := s.node.RepMgr.CurrentConfig()
if err != nil {
renderErr(w, err)
return
}

all := map[string]interface{}{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

var in []string

if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/flypg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config interface {
UserConfig() ConfigMap
SetUserConfig(configMap ConfigMap)
ConsulKey() string
CurrentConfig() (ConfigMap, error)
}

func WriteUserConfig(c Config, consul *state.Store) error {
Expand Down
22 changes: 22 additions & 0 deletions internal/flypg/flypg.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ func (c *FlyPGConfig) UserConfigFile() string {
return c.userConfigFilePath
}

func (c *FlyPGConfig) CurrentConfig() (ConfigMap, error) {
internal, err := ReadFromFile(c.InternalConfigFile())
if err != nil {
return nil, err
}
user, err := ReadFromFile(c.UserConfigFile())
if err != nil {
return nil, err
}

all := ConfigMap{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

return all, nil
}

func (c *FlyPGConfig) initialize() error {
c.SetDefaults()

Expand Down
22 changes: 22 additions & 0 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ func (c *PGConfig) UserConfigFile() string {
return c.userConfigFilePath
}

func (c *PGConfig) CurrentConfig() (ConfigMap, error) {
internal, err := ReadFromFile(c.InternalConfigFile())
if err != nil {
return nil, err
}
user, err := ReadFromFile(c.UserConfigFile())
if err != nil {
return nil, err
}

all := ConfigMap{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

return all, nil
}

func NewConfig(dataDir string, port int) *PGConfig {
return &PGConfig{
dataDir: dataDir,
Expand Down
71 changes: 71 additions & 0 deletions internal/flypg/pgbouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/jackc/pgx/v5"
)

const (
transactionPooler = "transaction"
sessionPooler = "session"
statementPooler = "statement"
)

type PGBouncer struct {
PrivateIP string
Credentials Credentials
Expand Down Expand Up @@ -68,6 +74,37 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload
return nil
}

func (p *PGBouncer) CurrentConfig() (ConfigMap, error) {
internal, err := ReadFromFile(p.InternalConfigFile())
if err != nil {
return nil, err
}
user, err := ReadFromFile(p.UserConfigFile())
if err != nil {
return nil, err
}

all := ConfigMap{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

return all, nil
}

func (p *PGBouncer) poolMode() (string, error) {
conf, err := p.CurrentConfig()
if err != nil {
return "", err
}

return conf["pool_mode"].(string), nil
}

func (p *PGBouncer) initialize() error {
cmdStr := fmt.Sprintf("mkdir -p %s", p.ConfigPath)
if err := utils.RunCommand(cmdStr); err != nil {
Expand Down Expand Up @@ -157,6 +194,40 @@ func (p *PGBouncer) forceReconnect(ctx context.Context, databases []string) erro
return nil
}

func (p *PGBouncer) killConnections(ctx context.Context, databases []string) error {
conn, err := p.NewConnection(ctx)
if err != nil {
return err
}
defer conn.Close(ctx)

for _, db := range databases {
_, err = conn.Exec(ctx, fmt.Sprintf("KILL %s;", db))
if err != nil {
return err
}
}

return nil
}

func (p *PGBouncer) resumeConnections(ctx context.Context, databases []string) error {
conn, err := p.NewConnection(ctx)
if err != nil {
return err
}
defer conn.Close(ctx)

for _, db := range databases {
_, err = conn.Exec(ctx, fmt.Sprintf("RESUME %s;", db))
if err != nil {
return err
}
}

return nil
}

func (p *PGBouncer) NewConnection(ctx context.Context) (*pgx.Conn, error) {
host := net.JoinHostPort(p.PrivateIP, strconv.Itoa(p.Port))
return openConnection(ctx, host, "pgbouncer", p.Credentials)
Expand Down
22 changes: 20 additions & 2 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,26 @@ func changeReadOnlyState(ctx context.Context, n *Node, enable bool) error {
}
defer bConn.Close(ctx)

if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil {
return fmt.Errorf("failed to force connection reset: %s", err)
poolMode, err := n.PGBouncer.poolMode()
if err != nil {
return fmt.Errorf("failed to resolve active pool mode: %s", err)
}

switch poolMode {
case transactionPooler, statementPooler:
if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil {
return fmt.Errorf("failed to force connection reset: %s", err)
}
case sessionPooler:
if err := n.PGBouncer.killConnections(ctx, dbNames); err != nil {
return fmt.Errorf("failed to kill connections: %s", err)
}

if err := n.PGBouncer.resumeConnections(ctx, dbNames); err != nil {
return fmt.Errorf("failed to resume connections: %s", err)
}
default:
return fmt.Errorf("failed to resolve valid pooler. found: %s", poolMode)
}

return nil
Expand Down
22 changes: 22 additions & 0 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ func (r *RepMgr) SetUserConfig(configMap ConfigMap) {
r.userConfig = configMap
}

func (r *RepMgr) CurrentConfig() (ConfigMap, error) {
internal, err := ReadFromFile(r.InternalConfigFile())
if err != nil {
return nil, err
}
user, err := ReadFromFile(r.UserConfigFile())
if err != nil {
return nil, err
}

all := ConfigMap{}

for k, v := range internal {
all[k] = v
}
for k, v := range user {
all[k] = v
}

return all, nil
}

func (r *RepMgr) ConsulKey() string {
return "repmgr"
}
Expand Down

0 comments on commit 80f4bb5

Please sign in to comment.