Added unified alerting support (#158)
unflag committed Oct 15, 2023
1 parent 120c3cb commit e33d252
Showing 10 changed files with 650 additions and 75 deletions.
52 changes: 46 additions & 6 deletions README.md
@@ -1,6 +1,6 @@
# Cassandra DataSource for Grafana
# Apache Cassandra Datasource for Grafana

Apache Cassandra Datasource for Grafana. This datasource is to visualise **time-series data** stored in Cassandra/DSE, if you are looking for Cassandra **metrics**, you may need [datastax/metric-collector-for-apache-cassandra](https://github.com/datastax/metric-collector-for-apache-cassandra) instead.
This datasource visualises **time-series data** stored in Cassandra/DSE; if you are looking for Cassandra **metrics**, you may need [datastax/metric-collector-for-apache-cassandra](https://github.com/datastax/metric-collector-for-apache-cassandra) instead.

![Release Status](https://github.com/HadesArchitect/GrafanaCassandraDatasource/workflows/Handle%20Release/badge.svg)
![CodeQL](https://github.com/HadesArchitect/grafana-cassandra-source/workflows/CodeQL/badge.svg?branch=master)
@@ -18,10 +18,28 @@ To see the datasource in action, please follow the [Quick Demo](https://github.c
* AWS Keyspaces (limited support) ([docs](https://github.com/HadesArchitect/GrafanaCassandraDatasource/wiki/AWS-Keyspaces))
* Linux, OSX (incl. M1), Windows

**Features**:
* Connect to Cassandra using auth credentials and TLS
* Query configurator
* Raw CQL query editor
* Table mode
* Annotations
* Alerting

**Contacts**:

* [![Discord Chat](https://img.shields.io/badge/discord-chat%20with%20us-green)](https://discord.gg/FU2Cb4KTyp)
* [![Github discussions](https://img.shields.io/badge/github-discussions-green)](https://github.com/HadesArchitect/GrafanaCassandraDatasource/discussions)

**TOC**
- [About](#about)
- [Usage](#usage)
- [Installation](#installation)
- [Building a query](#building-a-query)
- [Table Mode](#table-mode)
- [Alerting](#alerting)
- [Tips and tricks](#tips-and-tricks)
- [Development](#development)

## Usage

@@ -36,7 +54,7 @@ You can find more detailed instructions in [the datasource wiki](https://github.

![Datasource Configuration](https://user-images.githubusercontent.com/1742301/148654400-3ac4a477-8ca3-4606-86e7-5d10cbdc4ea9.png)

### Panel Setup
### Building a query

There are **two ways** to query data from Cassandra/DSE: the **Query Configurator** and the **Query Editor**. The Configurator is easier to use but has limited capabilities; the Editor is more powerful but requires an understanding of [CQL](https://cassandra.apache.org/doc/latest/cql/).

@@ -106,7 +124,7 @@ SELECT sensor_id, temperature, registered_at, location FROM test.test WHERE sens

![103153625-1fd85280-4792-11eb-9c00-085297802117](https://user-images.githubusercontent.com/1742301/148654522-8e50617d-0ba9-4c5a-a3f0-7badec92e31f.png)

#### Table Mode
### Table Mode
In addition to time-series mode, the datasource supports Table mode to draw tables from Cassandra query results. Use `Merge`, `Sort by`, `Organize fields` and other transformations to shape the table in any desirable way.
There are two ways to plot only the last (most recent) values rather than a whole time series.
1. Inefficient way
@@ -120,7 +138,7 @@ AND registered_at > $__timeFrom and registered_at < $__timeTo
ORDER BY registered_at
LIMIT 1
```
Note that `WHERE IN ()` clause could not be used with `ORDER BY`, so query must be duplicated for additional `sensor_id`.
Note that the `WHERE IN ()` clause cannot be used together with `ORDER BY`, so the query must be duplicated for each additional `sensor_id`, as in the sketch below.
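
A sketch of that per-sensor duplication, assuming the same `test.test` schema used throughout this README (`DESC` assumes an ascending clustering order on `registered_at`, so it yields the most recent row in the range):

```
SELECT sensor_id, temperature, registered_at
FROM test.test
WHERE sensor_id = 99051fe9-6a9c-46c2-b949-38ef78858dd0
AND registered_at > $__timeFrom AND registered_at < $__timeTo
ORDER BY registered_at DESC
LIMIT 1
```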

2. Efficient way

@@ -144,6 +162,28 @@ PER PARTITION LIMIT 1
```
Note that `PER PARTITION LIMIT 1` is used instead of `LIMIT 1` to query one row for each partition rather than just one row in total.
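
For comparison, a minimal sketch of such a query, assuming the table clusters rows by `registered_at` in descending order so the first row of each partition is the most recent:

```
SELECT sensor_id, temperature, registered_at
FROM test.test
WHERE sensor_id IN (99051fe9-6a9c-46c2-b949-38ef78858dd0, 99051fe9-6a9c-46c2-b949-38ef78858dd1)
PER PARTITION LIMIT 1
```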

### Annotations
[Grafana Annotations documentation](https://grafana.com/docs/grafana/latest/dashboards/build-dashboards/annotate-visualizations/)

### Alerting
Alerting is supported, although with some limitations. Grafana does not support long (narrow) series in alerting,
and the query must select exactly three fields: an identifier, a numeric value and a timestamp:
```
SELECT sensor_id, temperature, registered_at
FROM test.test
WHERE sensor_id IN (99051fe9-6a9c-46c2-b949-38ef78858dd0, 99051fe9-6a9c-46c2-b949-38ef78858dd1)
AND registered_at > $__timeFrom AND registered_at < $__timeTo
```
Otherwise, Grafana returns an error:
```
Failed to evaluate queries and expressions: [sse.readDataError] [A] got error: input data must be a wide series but got type long (input refid)
```
More information on series types can be found in the [Grafana developer documentation](https://grafana.com/developers/plugin-tools/introduction/data-frames#data-frames-as-time-series).

[Grafana Alerting documentation](https://grafana.com/docs/grafana/latest/alerting/alerting-rules/create-grafana-managed-rule/)

### Tips and tricks

#### Unix epoch time format
Usually there are no problems: Cassandra can store timestamps in various formats, as shown in the [documentation](https://cassandra.apache.org/doc/latest/cassandra/cql/types.html#timestamps).
However, this is not always enough. One possible case is Unix time, which is just a number of seconds or milliseconds, usually stored as an integer type. The sketch below shows the epoch macros that cover this case.
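
A minimal sketch, assuming a hypothetical `test.test_epoch` table whose `registered_at` column is a `bigint` of epoch seconds; the `$__unixEpochFrom`/`$__unixEpochTo` macros are replaced with the dashboard range in seconds (see `backend/handler/data_query.go` below):

```
SELECT sensor_id, temperature, registered_at
FROM test.test_epoch
WHERE sensor_id = 99051fe9-6a9c-46c2-b949-38ef78858dd0
AND registered_at > $__unixEpochFrom AND registered_at < $__unixEpochTo
```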
29 changes: 29 additions & 0 deletions backend/handler/data_query.go
@@ -3,11 +3,25 @@ package handler
import (
"encoding/json"
"fmt"
"regexp"
"time"

"github.com/HadesArchitect/GrafanaCassandraDatasource/backend/plugin"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)

var (
timeFromRegexp = regexp.MustCompile(`\$__timeFrom`)
timeToRegexp = regexp.MustCompile(`\$__timeTo`)
unixEpochFromRegexp = regexp.MustCompile(`\$__unixEpochFrom`)
unixEpochToRegexp = regexp.MustCompile(`\$__unixEpochTo`)
)

const (
queryTypeQuery = "query"
queryTypeAlert = "alert"
)

type dataQuery struct {
DatasourceID int `json:"datasourceId"`
QueryType string `json:"queryType"`
@@ -35,6 +49,10 @@ func parseDataQuery(q *backend.DataQuery) (*plugin.Query, error) {
return nil, fmt.Errorf("json.Unmarshal: %w", err)
}

if dq.RawQuery {
dq.applyTimeRange(q.TimeRange.From, q.TimeRange.To)
}

return &plugin.Query{
RawQuery: dq.RawQuery,
Target: dq.Target,
@@ -49,5 +67,16 @@ func parseDataQuery(q *backend.DataQuery) (*plugin.Query, error) {
TimeTo: q.TimeRange.To,
AllowFiltering: dq.AllowFiltering,
Instant: dq.Instant,
IsAlertQuery: dq.QueryType == queryTypeAlert,
}, nil
}

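// applyTimeRange replaces Grafana time macros in the raw query text:
// $__timeFrom/$__timeTo with the range bounds in milliseconds, and
// $__unixEpochFrom/$__unixEpochTo with the bounds in seconds.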
func (dq *dataQuery) applyTimeRange(from time.Time, to time.Time) {
query := []byte(dq.Target)
query = timeFromRegexp.ReplaceAll(query, []byte(fmt.Sprintf("%d", from.UnixMilli())))
query = timeToRegexp.ReplaceAll(query, []byte(fmt.Sprintf("%d", to.UnixMilli())))
query = unixEpochFromRegexp.ReplaceAll(query, []byte(fmt.Sprintf("%d", from.Unix())))
query = unixEpochToRegexp.ReplaceAll(query, []byte(fmt.Sprintf("%d", to.Unix())))

dq.Target = string(query)
}
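
As an illustration (hypothetical values, not part of the commit), the substitution behaves as follows:

```
dq := &dataQuery{Target: "SELECT v FROM ks.t WHERE ts > $__timeFrom AND ts < $__timeTo"}
dq.applyTimeRange(time.Unix(1257894000, 0), time.Unix(1257894010, 0))
// dq.Target == "SELECT v FROM ks.t WHERE ts > 1257894000000 AND ts < 1257894010000"
```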
23 changes: 23 additions & 0 deletions backend/handler/data_query_test.go
@@ -61,6 +61,29 @@ func Test_parseDataQuery(t *testing.T) {
Instant: false,
},
},
{
name: "alert query",
timeRange: backend.TimeRange{From: time.Unix(1257894000, 0), To: time.Unix(1257894010, 0)},
jsonStr: []byte(`{"datasourceId": 1, "queryType": "alert", "rawQuery": true, "refId": "123456789",
"target": "SELECT * from Keyspace.Table", "columnTime": "Time", "columnValue": "Value",
"keyspace": "Keyspace", "table": "Table", "columnId": "ID", "valueId": "123"}`),
want: &plugin.Query{
RawQuery: true,
Target: "SELECT * from Keyspace.Table",
Keyspace: "Keyspace",
Table: "Table",
ColumnValue: "Value",
ColumnID: "ID",
ValueID: "123",
AliasID: "",
ColumnTime: "Time",
TimeFrom: time.Unix(1257894000, 0),
TimeTo: time.Unix(1257894010, 0),
AllowFiltering: false,
Instant: false,
IsAlertQuery: true,
},
},
}

for _, tc := range testCases {
2 changes: 2 additions & 0 deletions backend/handler/handler.go
@@ -41,6 +41,7 @@ func New(fn datasource.InstanceFactoryFunc) datasource.ServeOpts {
// QueryDataHandler
queryTypeMux := datasource.NewQueryTypeMux()
queryTypeMux.HandleFunc("query", h.queryMetricData)
queryTypeMux.HandleFunc("alert", h.queryMetricData)

return datasource.ServeOpts{
CheckHealthHandler: h,
@@ -59,6 +60,7 @@ func (h *handler) queryMetricData(ctx context.Context, req *backend.QueryDataReq

responses := backend.Responses{}
for _, q := range req.Queries {
backend.Logger.Debug("Process metrics request", "Request", q.JSON)
cassQuery, err := parseDataQuery(&q)
if err != nil {
backend.Logger.Error("Failed to parse query", "Message", err)
104 changes: 88 additions & 16 deletions backend/plugin/plugin.go
@@ -64,13 +64,7 @@ func (p *Plugin) execRawMetricQuery(ctx context.Context, q *Query) (data.Frames,
return nil, fmt.Errorf("repo.Select: %w", err)
}

var frames data.Frames
for id, points := range rows {
frame := makeDataFrameFromPoints(id, q.AliasID, points)
frames = append(frames, frame)
}

return frames, nil
return makeDataFrames(q, rows), nil
}

// execStrictMetricQuery executes the repository ExecStrictQuery method and transforms the response to data.Frames.
@@ -80,13 +74,7 @@ func (p *Plugin) execStrictMetricQuery(ctx context.Context, q *Query) (data.Fram
return nil, fmt.Errorf("repo.ExecStrictQuery: %w", err)
}

var frames data.Frames
for id, points := range rows {
frame := makeDataFrameFromPoints(id, q.AliasID, points)
frames = append(frames, frame)
}

return frames, nil
return makeDataFrames(q, rows), nil
}

// GetKeyspaces fetches and returns Cassandra's list of keyspaces.
@@ -143,14 +131,29 @@ func splitIDs(s string) []string {
return ids
}

// makeDataFrameFromPoints creates data frames from time series points returned by repository.
func makeDataFrameFromPoints(id string, alias string, rows []cassandra.Row) *data.Frame {
func makeDataFrames(q *Query, rows map[string][]cassandra.Row) data.Frames {
var frames data.Frames
for id, points := range rows {
frame := makeDataFrameFromRows(id, q.AliasID, points)
if q.IsAlertQuery {
// alerting doesn't support narrow frames
frame = narrowFrameToWideFrame(frame)
}
frames = append(frames, frame)
}

return frames
}

// makeDataFrameFromRows creates a data frame from the time-series rows returned by the repository.
func makeDataFrameFromRows(id string, alias string, rows []cassandra.Row) *data.Frame {
if len(rows) == 0 {
return nil
}

frame := data.NewFrame(id, nil)

// Use the first returned row to interpolate the legend alias.
alias = formatAlias(alias, rows[0].Fields)
fields := make([]*data.Field, 0, len(rows[0].Columns))
for _, colName := range rows[0].Columns {
@@ -174,6 +177,7 @@ func makeDataFrameFromPoints(id string, alias string, rows []cassandra.Row) *dat
return frame
}

// formatAlias performs legend alias interpolation.
func formatAlias(alias string, values map[string]interface{}) string {
formattedAlias := aliasFormatRegexp.ReplaceAllFunc([]byte(alias), func(in []byte) []byte {
fieldName := strings.Replace(string(in), "{{", "", 1)
@@ -200,3 +204,71 @@

return string(formattedAlias)
}
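
// Example (hypothetical values), assuming aliasFormatRegexp matches
// {{ field }} placeholders as the collapsed lines above suggest:
//
//	formatAlias("{{location}}: temperature", map[string]interface{}{"location": "kitchen"})
//	// returns "kitchen: temperature"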

// narrowFrameToWideFrame performs a rudimentary frame conversion from narrow to wide format.
// It moves non-TS fields into labels and removes them from the fields list. Conflicting labels are overwritten.
// Any other field is ignored and could cause a Grafana alerting error during alert query execution.
// https://grafana.com/developers/plugin-tools/introduction/data-frames#data-frames-as-time-series
func narrowFrameToWideFrame(frame *data.Frame) *data.Frame {
if len(frame.Fields) == 0 {
return frame
}

labels := makeLabelsFromNonTSFields(frame)
for i := 0; i < len(frame.Fields); i++ {
if frame.Fields[i].Type().Numeric() {
if frame.Fields[i].Labels == nil {
frame.Fields[i].Labels = make(map[string]string, len(labels))
}
for k, v := range labels {
frame.Fields[i].Labels[k] = v
}
}
}

return removeNonTSFields(frame)
}

// makeLabelsFromNonTSFields creates a map of labels and their corresponding
// values from all fields that are neither numeric nor timestamps. Only the first
// value of each field is used as the label value.
func makeLabelsFromNonTSFields(frame *data.Frame) map[string]string {
labels := make(map[string]string)
if frame == nil || len(frame.Fields) == 0 || frame.Fields[0].Len() == 0 {
return labels
}

for _, f := range frame.Fields {
if !f.Type().Numeric() && !f.Type().Time() {
labels[f.Name] = fmt.Sprintf("%v", f.CopyAt(0))
}
}

return labels
}

// removeNonTSFields deletes all fields that are neither numeric nor timestamps
// from the frame. Their values should previously have been stored as labels
// using makeLabelsFromNonTSFields. The result is not stable, i.e.
// elements can change their positions during filtering.
func removeNonTSFields(frame *data.Frame) *data.Frame {
if frame == nil {
return nil
}

i, j := 0, len(frame.Fields)-1
for i <= j {
// keep numeric and timestamp fields
if frame.Fields[i].Type().Numeric() || frame.Fields[i].Type().Time() {
i++
continue
}
// move all the other fields to the end of the Fields slice
frame.Fields[i], frame.Fields[j] = frame.Fields[j], frame.Fields[i]
j--
}

frame.Fields = frame.Fields[0:i]

return frame
}
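
A hedged sketch of the conversion (illustrative values), using the grafana-plugin-sdk-go `data` package imported above:

```
frame := data.NewFrame("99051fe9-6a9c-46c2-b949-38ef78858dd0",
	data.NewField("registered_at", nil, []time.Time{time.Unix(1257894000, 0)}),
	data.NewField("temperature", nil, []float64{21.5}),
	data.NewField("location", nil, []string{"kitchen"}),
)
wide := narrowFrameToWideFrame(frame)
// wide keeps registered_at and temperature; the string field "location"
// is removed from Fields and attached to temperature as the label
// location=kitchen.
```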