From 997e72cf1295b79c95946f50010db241ae49c899 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Mon, 6 Feb 2023 17:37:42 -0600 Subject: [PATCH 1/4] Kill session connection if we need to go readonly --- internal/api/handle_admin.go | 1 + internal/flypg/pgbouncer.go | 73 +++++++++++++++++++++++++++++++++++- internal/flypg/readonly.go | 26 ++++++++++++- 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/internal/api/handle_admin.go b/internal/api/handle_admin.go index ff36ab9b..6d1416f1 100644 --- a/internal/api/handle_admin.go +++ b/internal/api/handle_admin.go @@ -216,6 +216,7 @@ func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Reque } defer close() + internal := s.node.PGConfig.InternalConfig() user := s.node.PGConfig.UserConfig() diff --git a/internal/flypg/pgbouncer.go b/internal/flypg/pgbouncer.go index 238e04fb..06f4a1cb 100644 --- a/internal/flypg/pgbouncer.go +++ b/internal/flypg/pgbouncer.go @@ -12,6 +12,12 @@ import ( "github.com/jackc/pgx/v5" ) +const ( + transactionPooler = "transaction" + sessionPooler = "session" + statementPooler = "statement" +) + type PGBouncer struct { PrivateIP string Credentials Credentials @@ -68,6 +74,37 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload return nil } +func (p *PGBouncer) CurrentConfig() (map[string]interface{}, error) { + internal, err := ReadFromFile(p.InternalConfigFile()) + if err != nil { + return nil, err + } + user, err := ReadFromFile(p.UserConfigFile()) + if err != nil { + return nil, err + } + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + return all, nil +} + +func (p *PGBouncer) PoolMode() (string, error) { + conf, err := p.CurrentConfig() + if err != nil { + return "", err + } + + return conf["pool_mode"].(string), nil +} + func (p *PGBouncer) initialize() error { cmdStr := fmt.Sprintf("mkdir -p %s", p.ConfigPath) if err := utils.RunCommand(cmdStr); err != nil { @@ -110,7 +147,7 @@ func (p *PGBouncer) setDefaults() { "auth_file": fmt.Sprintf("%s/pgbouncer.auth", p.ConfigPath), "admin_users": "postgres", "user": "postgres", - "pool_mode": "transaction", + "pool_mode": "session", "min_pool_size": "5", "reserve_pool_size": "5", "reserve_pool_timeout": "3", @@ -157,6 +194,40 @@ func (p *PGBouncer) forceReconnect(ctx context.Context, databases []string) erro return nil } +func (p *PGBouncer) killConnections(ctx context.Context, databases []string) error { + conn, err := p.NewConnection(ctx) + if err != nil { + return err + } + defer conn.Close(ctx) + + for _, db := range databases { + _, err = conn.Exec(ctx, fmt.Sprintf("KILL %s;", db)) + if err != nil { + return err + } + } + + return nil +} + +func (p *PGBouncer) resumeConnections(ctx context.Context, databases []string) error { + conn, err := p.NewConnection(ctx) + if err != nil { + return err + } + defer conn.Close(ctx) + + for _, db := range databases { + _, err = conn.Exec(ctx, fmt.Sprintf("RESUME %s;", db)) + if err != nil { + return err + } + } + + return nil +} + func (p *PGBouncer) NewConnection(ctx context.Context) (*pgx.Conn, error) { host := net.JoinHostPort(p.PrivateIP, strconv.Itoa(p.Port)) return openConnection(ctx, host, "pgbouncer", p.Credentials) diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index a43ad711..fcbb528e 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -148,8 +148,30 @@ func changeReadOnlyState(ctx context.Context, n *Node, enable bool) error { } defer bConn.Close(ctx) - if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil { - return fmt.Errorf("failed to force connection reset: %s", err) + poolMode, err := n.PGBouncer.PoolMode() + if err != nil { + return fmt.Errorf("failed to resolve active pool mode: %s", err) + } + + switch poolMode { + case transactionPooler: + if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil { + return fmt.Errorf("failed to force connection reset: %s", err) + } + case sessionPooler: + if err := n.PGBouncer.killConnections(ctx, dbNames); err != nil { + return fmt.Errorf("failed to kill connections: %s", err) + } + + if err := n.PGBouncer.resumeConnections(ctx, dbNames); err != nil { + return fmt.Errorf("failed to resume connections: %s", err) + } + case statementPooler: + if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil { + return fmt.Errorf("failed to force connection reset: %s", err) + } + default: + return fmt.Errorf("failed to resolve valid pooler. found: %s", poolMode) } return nil From c5d4b2c1ba9e359730efafb0b4b1d4789d6a81c6 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Mon, 6 Feb 2023 17:39:15 -0600 Subject: [PATCH 2/4] Turn default back to transaction for now --- internal/flypg/pgbouncer.go | 4 ++-- internal/flypg/readonly.go | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/flypg/pgbouncer.go b/internal/flypg/pgbouncer.go index 06f4a1cb..ab8e7040 100644 --- a/internal/flypg/pgbouncer.go +++ b/internal/flypg/pgbouncer.go @@ -96,7 +96,7 @@ func (p *PGBouncer) CurrentConfig() (map[string]interface{}, error) { return all, nil } -func (p *PGBouncer) PoolMode() (string, error) { +func (p *PGBouncer) poolMode() (string, error) { conf, err := p.CurrentConfig() if err != nil { return "", err @@ -147,7 +147,7 @@ func (p *PGBouncer) setDefaults() { "auth_file": fmt.Sprintf("%s/pgbouncer.auth", p.ConfigPath), "admin_users": "postgres", "user": "postgres", - "pool_mode": "session", + "pool_mode": "transaction", "min_pool_size": "5", "reserve_pool_size": "5", "reserve_pool_timeout": "3", diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index fcbb528e..107b2208 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -148,13 +148,13 @@ func changeReadOnlyState(ctx context.Context, n *Node, enable bool) error { } defer bConn.Close(ctx) - poolMode, err := n.PGBouncer.PoolMode() + poolMode, err := n.PGBouncer.poolMode() if err != nil { return fmt.Errorf("failed to resolve active pool mode: %s", err) } switch poolMode { - case transactionPooler: + case transactionPooler, statementPooler: if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil { return fmt.Errorf("failed to force connection reset: %s", err) } @@ -166,10 +166,6 @@ func changeReadOnlyState(ctx context.Context, n *Node, enable bool) error { if err := n.PGBouncer.resumeConnections(ctx, dbNames); err != nil { return fmt.Errorf("failed to resume connections: %s", err) } - case statementPooler: - if err := n.PGBouncer.forceReconnect(ctx, dbNames); err != nil { - return fmt.Errorf("failed to force connection reset: %s", err) - } default: return fmt.Errorf("failed to resolve valid pooler. found: %s", poolMode) } From dae1ab7af7779644f04f431cab8912d9878d01a9 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Mon, 6 Feb 2023 17:58:44 -0600 Subject: [PATCH 3/4] Keep things consistent --- internal/api/handle_admin.go | 48 +++--------------------------------- internal/flypg/flypg.go | 22 +++++++++++++++++ internal/flypg/pg.go | 22 +++++++++++++++++ internal/flypg/repmgr.go | 22 +++++++++++++++++ 4 files changed, 69 insertions(+), 45 deletions(-) diff --git a/internal/api/handle_admin.go b/internal/api/handle_admin.go index 76943229..8d1972e7 100644 --- a/internal/api/handle_admin.go +++ b/internal/api/handle_admin.go @@ -221,25 +221,11 @@ func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Reque defer close() - internal, err := flypg.ReadFromFile(s.node.PGConfig.InternalConfigFile()) + all, err := s.node.PGConfig.CurrentConfig() if err != nil { renderErr(w, err) return } - user, err := flypg.ReadFromFile(s.node.PGConfig.UserConfigFile()) - if err != nil { - renderErr(w, err) - return - } - - all := map[string]interface{}{} - - for k, v := range internal { - all[k] = v - } - for k, v := range user { - all[k] = v - } var in []string @@ -266,26 +252,12 @@ func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Reque } func (s *Server) handleViewBouncerSettings(w http.ResponseWriter, r *http.Request) { - internal, err := flypg.ReadFromFile(s.node.PGBouncer.InternalConfigFile()) - if err != nil { - renderErr(w, err) - return - } - user, err := flypg.ReadFromFile(s.node.PGBouncer.UserConfigFile()) + all, err := s.node.PGBouncer.CurrentConfig() if err != nil { renderErr(w, err) return } - all := map[string]interface{}{} - - for k, v := range internal { - all[k] = v - } - for k, v := range user { - all[k] = v - } - var in []string if err := json.NewDecoder(r.Body).Decode(&in); err != nil { @@ -307,26 +279,12 @@ func (s *Server) handleViewBouncerSettings(w http.ResponseWriter, r *http.Reques } func (s *Server) handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) { - internal, err := flypg.ReadFromFile(s.node.RepMgr.InternalConfigFile()) - if err != nil { - renderErr(w, err) - return - } - user, err := flypg.ReadFromFile(s.node.RepMgr.UserConfigFile()) + all, err := s.node.RepMgr.CurrentConfig() if err != nil { renderErr(w, err) return } - all := map[string]interface{}{} - - for k, v := range internal { - all[k] = v - } - for k, v := range user { - all[k] = v - } - var in []string if err := json.NewDecoder(r.Body).Decode(&in); err != nil { diff --git a/internal/flypg/flypg.go b/internal/flypg/flypg.go index 515eb3e3..c25e8688 100644 --- a/internal/flypg/flypg.go +++ b/internal/flypg/flypg.go @@ -55,6 +55,28 @@ func (c *FlyPGConfig) UserConfigFile() string { return c.userConfigFilePath } +func (c *FlyPGConfig) CurrentConfig() (map[string]interface{}, error) { + internal, err := ReadFromFile(c.InternalConfigFile()) + if err != nil { + return nil, err + } + user, err := ReadFromFile(c.UserConfigFile()) + if err != nil { + return nil, err + } + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + return all, nil +} + func (c *FlyPGConfig) initialize() error { c.SetDefaults() diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index 050c4cb0..b6fbea8b 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -55,6 +55,28 @@ func (c *PGConfig) UserConfigFile() string { return c.userConfigFilePath } +func (c *PGConfig) CurrentConfig() (map[string]interface{}, error) { + internal, err := ReadFromFile(c.InternalConfigFile()) + if err != nil { + return nil, err + } + user, err := ReadFromFile(c.UserConfigFile()) + if err != nil { + return nil, err + } + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + return all, nil +} + func NewConfig(dataDir string, port int) *PGConfig { return &PGConfig{ dataDir: dataDir, diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 73526d04..50f726aa 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -58,6 +58,28 @@ func (r *RepMgr) SetUserConfig(configMap ConfigMap) { r.userConfig = configMap } +func (r *RepMgr) CurrentConfig() (map[string]interface{}, error) { + internal, err := ReadFromFile(r.InternalConfigFile()) + if err != nil { + return nil, err + } + user, err := ReadFromFile(r.UserConfigFile()) + if err != nil { + return nil, err + } + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + return all, nil +} + func (r *RepMgr) ConsulKey() string { return "repmgr" } From 8cd58e1ede0415a5735caeeb523d0d1e6331b290 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Mon, 6 Feb 2023 18:04:27 -0600 Subject: [PATCH 4/4] Cleanup --- internal/flypg/config.go | 1 + internal/flypg/flypg.go | 4 ++-- internal/flypg/pg.go | 4 ++-- internal/flypg/pgbouncer.go | 4 ++-- internal/flypg/repmgr.go | 4 ++-- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/flypg/config.go b/internal/flypg/config.go index 87c48ba7..6702f528 100644 --- a/internal/flypg/config.go +++ b/internal/flypg/config.go @@ -19,6 +19,7 @@ type Config interface { UserConfig() ConfigMap SetUserConfig(configMap ConfigMap) ConsulKey() string + CurrentConfig() (ConfigMap, error) } func WriteUserConfig(c Config, consul *state.Store) error { diff --git a/internal/flypg/flypg.go b/internal/flypg/flypg.go index c25e8688..ef362526 100644 --- a/internal/flypg/flypg.go +++ b/internal/flypg/flypg.go @@ -55,7 +55,7 @@ func (c *FlyPGConfig) UserConfigFile() string { return c.userConfigFilePath } -func (c *FlyPGConfig) CurrentConfig() (map[string]interface{}, error) { +func (c *FlyPGConfig) CurrentConfig() (ConfigMap, error) { internal, err := ReadFromFile(c.InternalConfigFile()) if err != nil { return nil, err @@ -65,7 +65,7 @@ func (c *FlyPGConfig) CurrentConfig() (map[string]interface{}, error) { return nil, err } - all := map[string]interface{}{} + all := ConfigMap{} for k, v := range internal { all[k] = v diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index b6fbea8b..eba4229a 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -55,7 +55,7 @@ func (c *PGConfig) UserConfigFile() string { return c.userConfigFilePath } -func (c *PGConfig) CurrentConfig() (map[string]interface{}, error) { +func (c *PGConfig) CurrentConfig() (ConfigMap, error) { internal, err := ReadFromFile(c.InternalConfigFile()) if err != nil { return nil, err @@ -65,7 +65,7 @@ func (c *PGConfig) CurrentConfig() (map[string]interface{}, error) { return nil, err } - all := map[string]interface{}{} + all := ConfigMap{} for k, v := range internal { all[k] = v diff --git a/internal/flypg/pgbouncer.go b/internal/flypg/pgbouncer.go index ab8e7040..9383c04b 100644 --- a/internal/flypg/pgbouncer.go +++ b/internal/flypg/pgbouncer.go @@ -74,7 +74,7 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload return nil } -func (p *PGBouncer) CurrentConfig() (map[string]interface{}, error) { +func (p *PGBouncer) CurrentConfig() (ConfigMap, error) { internal, err := ReadFromFile(p.InternalConfigFile()) if err != nil { return nil, err @@ -84,7 +84,7 @@ func (p *PGBouncer) CurrentConfig() (map[string]interface{}, error) { return nil, err } - all := map[string]interface{}{} + all := ConfigMap{} for k, v := range internal { all[k] = v diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 50f726aa..bd555b08 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -58,7 +58,7 @@ func (r *RepMgr) SetUserConfig(configMap ConfigMap) { r.userConfig = configMap } -func (r *RepMgr) CurrentConfig() (map[string]interface{}, error) { +func (r *RepMgr) CurrentConfig() (ConfigMap, error) { internal, err := ReadFromFile(r.InternalConfigFile()) if err != nil { return nil, err @@ -68,7 +68,7 @@ func (r *RepMgr) CurrentConfig() (map[string]interface{}, error) { return nil, err } - all := map[string]interface{}{} + all := ConfigMap{} for k, v := range internal { all[k] = v