From 285f480c01306b9cc7a4fb94dcc60dd83fd88a13 Mon Sep 17 00:00:00 2001 From: futuarmo Date: Thu, 28 Apr 2022 13:40:08 +0300 Subject: [PATCH] Cluster configuration support (#130) * Add several hosts separated by ; * Reconnect on host change * Issue #130: fix error --- backend/datasource.go | 84 ++++++++++++++++++++++++++----------------- backend/plugin.go | 13 +------ 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/backend/datasource.go b/backend/datasource.go index 4e689fb..6e21733 100644 --- a/backend/datasource.go +++ b/backend/datasource.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "strings" "strconv" "time" @@ -23,7 +24,6 @@ type CassandraDatasource struct { resourceHandler backend.CallResourceHandler builder *QueryBuilder processor *QueryProcessor - sessions map[int]*gocql.Session session *gocql.Session } @@ -43,7 +43,7 @@ func NewDataSource() *CassandraDatasource { } func (ds *CassandraDatasource) handleKeyspaces(rw http.ResponseWriter, req *http.Request) { - ds.logger.Info("Process 'keyspaces' request") + ds.logger.Debug("Process 'keyspaces' request") ctx := httpadapter.PluginConfigFromContext(req.Context()) @@ -59,7 +59,7 @@ func (ds *CassandraDatasource) handleKeyspaces(rw http.ResponseWriter, req *http } func (ds *CassandraDatasource) handleTables(rw http.ResponseWriter, req *http.Request) { - ds.logger.Info("Process 'tables' request") + ds.logger.Debug("Process 'tables' request") ctx := httpadapter.PluginConfigFromContext(req.Context()) @@ -127,10 +127,10 @@ func (ds *CassandraDatasource) HandleMetricQueries(ctx context.Context, req *bac } func (ds *CassandraDatasource) handleMetricQuery(ctx *backend.PluginContext, query backend.DataQuery) backend.DataResponse { - _, err := ds.connect(ctx) + err := ds.connectIfNeeded(ctx) if err != nil { ds.logger.Warn("Failed to connect", "Message", err) - return dataResponse(data.Frames{}, fmt.Errorf("Failed to connect to server, please inspect Grafana server log for details")) + return dataResponse(data.Frames{}, errors.New("Failed to connect to server, please inspect Grafana server log for details")) } frames, err := ds.metricQuery(&query) @@ -139,7 +139,7 @@ func (ds *CassandraDatasource) handleMetricQuery(ctx *backend.PluginContext, que } func (ds *CassandraDatasource) getConnected(ctx *backend.PluginContext, getData func() ([]string, error)) ([]string, error) { - _, err := ds.connect(ctx) + err := ds.connectIfNeeded(ctx) if err != nil { ds.logger.Warn("Failed to connect", "Message", err) @@ -223,14 +223,21 @@ func (ds *CassandraDatasource) getKeyspaces(ctx *backend.PluginContext) ([]strin return ds.getConnected(ctx, getKeyspaces) } -func (ds *CassandraDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) error { - _, err := ds.connect(&req.PluginContext) +func (ds *CassandraDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + err := ds.connect(&req.PluginContext) if err != nil { ds.logger.Warn("Failed to connect", "Message", err) - return fmt.Errorf("Failed to connect to server, please inspect Grafana server log for details") + + return &backend.CheckHealthResult{ + Status: backend.HealthStatusError, + Message: "Failed to connect to server, please inspect Grafana server log for details", + }, nil } - return nil + return &backend.CheckHealthResult{ + Status: backend.HealthStatusOk, + Message: "Database connected", + }, nil } func processQueries(req *backend.QueryDataRequest, handler func(*backend.PluginContext, backend.DataQuery) backend.DataResponse) backend.Responses { @@ -247,7 +254,8 @@ func (ds *CassandraDatasource) metricQuery(query *backend.DataQuery) (data.Frame err := json.Unmarshal(query.JSON, &cassQuery) if err != nil { ds.logger.Error("Failed to parse queries", "Message", err) - return nil, fmt.Errorf("Failed to parse queries, please inspect Grafana server log for details") + + return nil, errors.New("Failed to parse queries, please inspect Grafana server log for details") } if cassQuery.RawQuery { @@ -267,35 +275,44 @@ func (ds *CassandraDatasource) getRequestOptions(jsonData []byte) (DataSourceOpt err := json.Unmarshal(jsonData, &options) if err != nil { ds.logger.Error("Failed to parse request", "Message", err) - return options, fmt.Errorf("Failed to parse request, please inspect Grafana server log for details") + + return options, errors.New("Failed to parse request, please inspect Grafana server log for details") } return options, nil } -func (ds *CassandraDatasource) connect(context *backend.PluginContext) (bool, error) { - options, err := ds.getRequestOptions(context.DataSourceInstanceSettings.JSONData) - if err != nil { - ds.logger.Error("Failed to parse connection parameters", "Message", err) - return false, fmt.Errorf("Failed to parse connection parameters, please inspect Grafana server log for details") +func (ds *CassandraDatasource) connectIfNeeded(context *backend.PluginContext) error { + if ds.session != nil { + return nil } - datasourceID := int(context.DataSourceInstanceSettings.ID) + return ds.connect(context) +} - if ds.sessions == nil { - ds.sessions = make(map[int]*gocql.Session) - } +func (ds *CassandraDatasource) connect(context *backend.PluginContext) error { + hosts := strings.Split(context.DataSourceInstanceSettings.URL, ";") - if ds.sessions[datasourceID] != nil { - ds.session = ds.sessions[datasourceID] + err := ds.tryToConnect(hosts, context) + if err != nil { + ds.logger.Error("Failed to connect", "Message", err) - return true, nil + return errors.New("Failed to connect, please inspect Grafana server log for details") } - host := context.DataSourceInstanceSettings.URL + return nil +} + +func (ds *CassandraDatasource) tryToConnect(hosts []string, context *backend.PluginContext) error { + options, err := ds.getRequestOptions(context.DataSourceInstanceSettings.JSONData) + if err != nil { + ds.logger.Error("Failed to parse connection parameters", "Message", err) - ds.logger.Debug(fmt.Sprintf("Connecting to %s...\n", host)) + return errors.New("Failed to parse connection parameters, please inspect Grafana server log for details") + } - cluster := gocql.NewCluster(host) + ds.logger.Debug("Connecting", "Hosts", hosts) + + cluster := gocql.NewCluster(hosts...) if options.Timeout != nil { cluster.Timeout = time.Duration(*options.Timeout) * time.Second @@ -304,7 +321,8 @@ func (ds *CassandraDatasource) connect(context *backend.PluginContext) (bool, er consistency, err := parseConsistency(options.Consistency) if err != nil { ds.logger.Error("Failed to parse consistency", "Message", err, "Consistency", options.Consistency) - return false, fmt.Errorf("Failed to parse consistency, please inspect Grafana server log for details") + + return errors.New("Failed to parse consistency, please inspect Grafana server log for details") } cluster.Consistency = consistency @@ -321,7 +339,8 @@ func (ds *CassandraDatasource) connect(context *backend.PluginContext) (bool, er tlsConfig, err := PrepareTLSCfg(options.CertPath, options.RootPath, options.CaPath, options.AllowInsecureTLS) if err != nil { ds.logger.Error("Failed to create TLS config", "Message", err) - return false, fmt.Errorf("Failed to create TLS config, please inspect Grafana server log for details") + + return errors.New("Failed to create TLS config, please inspect Grafana server log for details") } cluster.SslOpts = &gocql.SslOptions{Config: tlsConfig} @@ -330,14 +349,13 @@ func (ds *CassandraDatasource) connect(context *backend.PluginContext) (bool, er session, err := cluster.CreateSession() if err != nil { ds.logger.Warn("Failed to create session", "Message", err) - return false, fmt.Errorf("Failed to create Cassandra session, please inspect Grafana server log for details") + + return errors.New("Failed to create Cassandra session, please inspect Grafana server log for details") } - ds.sessions[datasourceID] = session ds.session = session - ds.logger.Debug("Connection successful") - return true, nil + return nil } func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) { diff --git a/backend/plugin.go b/backend/plugin.go index 3f3dece..7249ddd 100644 --- a/backend/plugin.go +++ b/backend/plugin.go @@ -59,18 +59,7 @@ func (handler *Handler) CheckHealth(ctx context.Context, req *backend.CheckHealt }, nil } - err = datasource.CheckHealth(ctx, req) - if err != nil { - return &backend.CheckHealthResult{ - Status: backend.HealthStatusError, - Message: fmt.Sprintf("Connection test failed, error = %v", err), - }, nil - } - - return &backend.CheckHealthResult{ - Status: backend.HealthStatusOk, - Message: "Database connected", - }, nil + return datasource.CheckHealth(ctx, req) } func NewHandler() *Handler {