Skip to content

Commit

Permalink
fingerprint: convert consul and vault fingerprinters to be reloadable
Browse files Browse the repository at this point in the history
This PR changes the Consul and Vault fingerprint implementations to be
reloadable rather than periodic. Reasons described in the issue.

Closes: #24049
  • Loading branch information
shoenig committed Dec 4, 2024
1 parent 76e39b1 commit 21ed04c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 190 deletions.
122 changes: 46 additions & 76 deletions client/fingerprint/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ import (

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr"
"github.com/hashicorp/go-version"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
Expand All @@ -28,34 +26,34 @@ var (
// perform different fingerprinting depending on which version of Consul it
// is communicating with.
consulGRPCPortChangeVersion = version.Must(version.NewVersion("1.14.0"))

// consulBaseFingerprintInterval is the initial interval for periodic
// fingerprinting
consulBaseFingerprintInterval = 15 * time.Second
)

// ConsulFingerprint is used to fingerprint for Consul
type ConsulFingerprint struct {
logger log.Logger
states map[string]*consulFingerprintState
logger hclog.Logger

// clusters maintains the latest fingerprinted state for each cluster
// defined in nomad consul client configuration(s).
clusters map[string]*consulState
}

type consulFingerprintState struct {
client *consulapi.Client
isAvailable bool
extractors map[string]consulExtractor
nextCheck time.Time
type consulState struct {
client *consulapi.Client

// readers associates a function used to parse the value associated
// with the given key from a consul api response
readers map[string]valueReader
}

// consulExtractor is used to parse out one attribute from consulInfo. Returns
// valueReader is used to parse out one attribute from the Consul agent self API response. Returns
// the value of the attribute, and whether the attribute exists.
type consulExtractor func(agentconsul.Self) (string, bool)
type valueReader func(agentconsul.Self) (string, bool)

// NewConsulFingerprint is used to create a Consul fingerprint
func NewConsulFingerprint(logger log.Logger) Fingerprint {
func NewConsulFingerprint(logger hclog.Logger) Fingerprint {
return &ConsulFingerprint{
logger: logger.Named("consul"),
states: map[string]*consulFingerprintState{},
logger: logger.Named("consul"),
clusters: map[string]*consulState{},
}
}

Expand All @@ -73,16 +71,12 @@ func (f *ConsulFingerprint) Fingerprint(req *FingerprintRequest, resp *Fingerpri
}

func (f *ConsulFingerprint) fingerprintImpl(cfg *config.ConsulConfig, resp *FingerprintResponse) error {

logger := f.logger.With("cluster", cfg.Name)

state, ok := f.states[cfg.Name]
state, ok := f.clusters[cfg.Name]
if !ok {
state = &consulFingerprintState{}
f.states[cfg.Name] = state
}
if state.nextCheck.After(time.Now()) {
return nil
state = &consulState{}
f.clusters[cfg.Name] = state
}

if err := state.initialize(cfg, logger); err != nil {
Expand All @@ -92,12 +86,13 @@ func (f *ConsulFingerprint) fingerprintImpl(cfg *config.ConsulConfig, resp *Fing
// query consul for agent self api
info := state.query(logger)
if len(info) == 0 {
// unable to reach consul, nothing to do this time
// unable to reach consul, clear out existing attributes
resp.Detected = true
return nil
}

// apply the extractor for each attribute
for attr, extractor := range state.extractors {
for attr, extractor := range state.readers {
if s, ok := extractor(info); !ok {
logger.Warn("unable to fingerprint consul", "attribute", attr)
} else if s != "" {
Expand All @@ -108,38 +103,18 @@ func (f *ConsulFingerprint) fingerprintImpl(cfg *config.ConsulConfig, resp *Fing
// create link for consul
f.link(resp)

// indicate Consul is now available
if !state.isAvailable {
logger.Info("consul agent is available")
}

// Widen the minimum window to the next check so that if one out of a set of
// Consuls is unhealthy we don't greatly increase requests to the healthy
// ones. This is less than the minimum window if all Consuls are healthy so
// that we don't desync from the larger window provided by Periodic
state.nextCheck = time.Now().Add(29 * time.Second)
state.isAvailable = true
resp.Detected = true
return nil
}

func (f *ConsulFingerprint) Periodic() (bool, time.Duration) {
if len(f.states) == 0 {
return true, consulBaseFingerprintInterval
}
for _, state := range f.states {
if !state.isAvailable {
return true, consulBaseFingerprintInterval
}
}

// Once all Consuls are initially discovered and healthy we fingerprint with
// a wide jitter to avoid thundering herds of fingerprints against central
// Consul servers.
return true, (30 * time.Second) + helper.RandomStagger(90*time.Second)
return false, 0
}

func (cfs *consulFingerprintState) initialize(cfg *config.ConsulConfig, logger hclog.Logger) error {
// Reload satisfies ReloadableFingerprint.
func (f *ConsulFingerprint) Reload() {}

func (cfs *consulState) initialize(cfg *config.ConsulConfig, logger hclog.Logger) error {
if cfs.client != nil {
return nil // already initialized!
}
Expand All @@ -155,7 +130,7 @@ func (cfs *consulFingerprintState) initialize(cfg *config.ConsulConfig, logger h
}

if cfg.Name == structs.ConsulDefaultCluster {
cfs.extractors = map[string]consulExtractor{
cfs.readers = map[string]valueReader{
"consul.server": cfs.server,
"consul.version": cfs.version,
"consul.sku": cfs.sku,
Expand All @@ -171,7 +146,7 @@ func (cfs *consulFingerprintState) initialize(cfg *config.ConsulConfig, logger h
"consul.dns.addr": cfs.dnsAddr(logger),
}
} else {
cfs.extractors = map[string]consulExtractor{
cfs.readers = map[string]valueReader{
fmt.Sprintf("consul.%s.server", cfg.Name): cfs.server,
fmt.Sprintf("consul.%s.version", cfg.Name): cfs.version,
fmt.Sprintf("consul.%s.sku", cfg.Name): cfs.sku,
Expand All @@ -190,17 +165,12 @@ func (cfs *consulFingerprintState) initialize(cfg *config.ConsulConfig, logger h
return nil
}

func (cfs *consulFingerprintState) query(logger hclog.Logger) agentconsul.Self {
func (cfs *consulState) query(logger hclog.Logger) agentconsul.Self {
// We'll try to detect consul by making a query to the agent's self API.
// If we can't hit this URL consul is probably not running on this machine.
info, err := cfs.client.Agent().Self()
if err != nil {
// indicate consul no longer available
if cfs.isAvailable {
logger.Info("consul agent is unavailable", "error", err)
}
cfs.isAvailable = false
cfs.nextCheck = time.Time{} // force check on next interval
logger.Warn("failed to acquire consul self endpoint", "error", err)
return nil
}
return info
Expand All @@ -216,36 +186,36 @@ func (f *ConsulFingerprint) link(resp *FingerprintResponse) {
}
}

func (cfs *consulFingerprintState) server(info agentconsul.Self) (string, bool) {
func (cfs *consulState) server(info agentconsul.Self) (string, bool) {
s, ok := info["Config"]["Server"].(bool)
return strconv.FormatBool(s), ok
}

func (cfs *consulFingerprintState) version(info agentconsul.Self) (string, bool) {
func (cfs *consulState) version(info agentconsul.Self) (string, bool) {
v, ok := info["Config"]["Version"].(string)
return v, ok
}

func (cfs *consulFingerprintState) sku(info agentconsul.Self) (string, bool) {
func (cfs *consulState) sku(info agentconsul.Self) (string, bool) {
return agentconsul.SKU(info)
}

func (cfs *consulFingerprintState) revision(info agentconsul.Self) (string, bool) {
func (cfs *consulState) revision(info agentconsul.Self) (string, bool) {
r, ok := info["Config"]["Revision"].(string)
return r, ok
}

func (cfs *consulFingerprintState) name(info agentconsul.Self) (string, bool) {
func (cfs *consulState) name(info agentconsul.Self) (string, bool) {
n, ok := info["Config"]["NodeName"].(string)
return n, ok
}

func (cfs *consulFingerprintState) dc(info agentconsul.Self) (string, bool) {
func (cfs *consulState) dc(info agentconsul.Self) (string, bool) {
d, ok := info["Config"]["Datacenter"].(string)
return d, ok
}

func (cfs *consulFingerprintState) segment(info agentconsul.Self) (string, bool) {
func (cfs *consulState) segment(info agentconsul.Self) (string, bool) {
tags, tagsOK := info["Member"]["Tags"].(map[string]interface{})
if !tagsOK {
return "", false
Expand All @@ -254,12 +224,12 @@ func (cfs *consulFingerprintState) segment(info agentconsul.Self) (string, bool)
return s, ok
}

func (cfs *consulFingerprintState) connect(info agentconsul.Self) (string, bool) {
func (cfs *consulState) connect(info agentconsul.Self) (string, bool) {
c, ok := info["DebugConfig"]["ConnectEnabled"].(bool)
return strconv.FormatBool(c), ok
}

func (cfs *consulFingerprintState) grpc(scheme string, logger hclog.Logger) func(info agentconsul.Self) (string, bool) {
func (cfs *consulState) grpc(scheme string, logger hclog.Logger) func(info agentconsul.Self) (string, bool) {
return func(info agentconsul.Self) (string, bool) {

// The version is needed in order to understand which config object to
Expand Down Expand Up @@ -294,24 +264,24 @@ func (cfs *consulFingerprintState) grpc(scheme string, logger hclog.Logger) func
}
}

func (cfs *consulFingerprintState) grpcPort(info agentconsul.Self) (string, bool) {
func (cfs *consulState) grpcPort(info agentconsul.Self) (string, bool) {
p, ok := info["DebugConfig"]["GRPCPort"].(float64)
return fmt.Sprintf("%d", int(p)), ok
}

func (cfs *consulFingerprintState) grpcTLSPort(info agentconsul.Self) (string, bool) {
func (cfs *consulState) grpcTLSPort(info agentconsul.Self) (string, bool) {
p, ok := info["DebugConfig"]["GRPCTLSPort"].(float64)
return fmt.Sprintf("%d", int(p)), ok
}

func (cfs *consulFingerprintState) dnsPort(info agentconsul.Self) (string, bool) {
func (cfs *consulState) dnsPort(info agentconsul.Self) (string, bool) {
p, ok := info["DebugConfig"]["DNSPort"].(float64)
return fmt.Sprintf("%d", int(p)), ok
}

// dnsAddr fingerprints the Consul DNS address, but only if Nomad can use it
// usefully to provide an iptables rule to a task
func (cfs *consulFingerprintState) dnsAddr(logger hclog.Logger) func(info agentconsul.Self) (string, bool) {
func (cfs *consulState) dnsAddr(logger hclog.Logger) func(info agentconsul.Self) (string, bool) {
return func(info agentconsul.Self) (string, bool) {

var listenOnEveryIP bool
Expand Down Expand Up @@ -382,11 +352,11 @@ func (cfs *consulFingerprintState) dnsAddr(logger hclog.Logger) func(info agentc
}
}

func (cfs *consulFingerprintState) namespaces(info agentconsul.Self) (string, bool) {
func (cfs *consulState) namespaces(info agentconsul.Self) (string, bool) {
return strconv.FormatBool(agentconsul.Namespaces(info)), true
}

func (cfs *consulFingerprintState) partition(info agentconsul.Self) (string, bool) {
func (cfs *consulState) partition(info agentconsul.Self) (string, bool) {
sku, ok := agentconsul.SKU(info)
if ok && sku == "ent" {
p, ok := info["Config"]["Partition"].(string)
Expand Down
Loading

0 comments on commit 21ed04c

Please sign in to comment.