feature: external state storage for netflow/ipfix template & sampling rate data #289

Open
wants to merge 27 commits into
base: main
27 commits
6472b4c
include ObservationDomainId in netflowv9
iqbalaydrus Jan 26, 2024
44cfde2
ip list renderer
iqbalaydrus Jan 26, 2024
41bfb86
Merge branch 'iqbal/netflowv9-obsdomid'
iqbalaydrus Jan 26, 2024
6d32e0c
allocate early
iqbalaydrus Jan 26, 2024
35a72e7
Merge branch 'iqbal/ip-list-renderer'
iqbalaydrus Jan 26, 2024
a4f42be
actually render is capable of rendering slice's element
iqbalaydrus Jan 26, 2024
4ba9971
Merge branch 'iqbal/ip-list-renderer'
iqbalaydrus Jan 26, 2024
1c10c42
protection against uint64 overflow - some flow might have startime/en…
iqbalaydrus Jan 26, 2024
7bc1c2a
Merge branch 'iqbal/timeflow-overflow'
iqbalaydrus Jan 26, 2024
41e41c6
fix mpls label decoding
iqbalaydrus Jan 26, 2024
a002158
update mpls label test case to cover 3 stacks
iqbalaydrus Jan 26, 2024
b3b4ab0
Merge branch 'iqbal/timeflow-overflow'
iqbalaydrus Jan 29, 2024
7436e07
state engine
iqbalaydrus Feb 14, 2024
dda293f
sampling rate system
iqbalaydrus Feb 15, 2024
2a5832b
template system
iqbalaydrus Feb 15, 2024
b30e24f
Merge branch 'main' into iqbal/state
iqbalaydrus Feb 15, 2024
55691cc
import nits
iqbalaydrus Feb 15, 2024
bf6621a
remove badger as it has cli arg conflicts
iqbalaydrus Feb 15, 2024
6c546df
Revert "Merge branch 'main' into iqbal/state"
iqbalaydrus Feb 15, 2024
ccdc7d0
import nits
iqbalaydrus Feb 15, 2024
8d18a13
fix redis
iqbalaydrus Feb 15, 2024
03f0a2c
fix & benchmark
iqbalaydrus Feb 16, 2024
580b06e
remove unused codes
iqbalaydrus Feb 16, 2024
6defb08
move templates and sampling rate to the previous position
iqbalaydrus Feb 16, 2024
04b6135
changes nits
iqbalaydrus Feb 16, 2024
5264972
state documentation
iqbalaydrus Feb 16, 2024
8a66b09
state documentation
iqbalaydrus Feb 16, 2024
14 changes: 12 additions & 2 deletions README.md
@@ -46,13 +46,17 @@ functions to marshal as JSON or text for instance.
The `transport` provides different way of processing the message. Either sending it via Kafka or
send it to a file (or stdout).

The `state` package supports external storage, which synchronizes templates and sampling rates across multiple GoFlow2
instances.

GoFlow2 is a wrapper around all of these functions and chains them together.

You can build your own collector using this base and replace parts:
* Use different transport (e.g: RabbitMQ instead of Kafka)
* Convert to another format (e.g: Cap'n Proto, Avro, instead of protobuf)
* Decode different samples (e.g: not only IP networks, add MPLS)
* Different metrics system (e.g: [OpenTelemetry](https://opentelemetry.io/))
* Other external state storage (e.g: RDBMS, MongoDB, memcached)

### Protocol difference

@@ -85,8 +89,8 @@ Production:
* Convert to protobuf or json
* Prints to the console/file
* Sends to Kafka and partition

Monitoring via Prometheus metrics
* Set up multiple GoFlow2 instances backed by the same external state storage
* Monitoring via Prometheus metrics

## Get started

@@ -165,6 +169,12 @@ $ ./goflow2 -listen 'sflow://:6343?count=4,nfl://:2055'

More information about workers and resource usage is available on the [Performance page](/docs/performance.md).

When you run multiple GoFlow2 instances, enable external state storage so that they share templates and sampling rates:
```bash
$ ./goflow2 -state.netflow.templates 'redis://127.0.0.1:6379/0?prefix=nftemplate' -state.sampling 'redis://127.0.0.1:6379/0?prefix=nfsampling'
```
Details are available on the [State page](/docs/state_storage.md).

### Docker

You can also run directly with a container:
11 changes: 11 additions & 0 deletions cmd/goflow2/main.go
@@ -178,6 +178,17 @@ func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

err = protoproducer.InitSamplingRate()
if err != nil {
log.Fatal(err)
}
defer protoproducer.CloseSamplingRate()
err = netflow.InitTemplates()
if err != nil {
log.Fatal(err)
}
defer netflow.CloseTemplates()

var receivers []*utils.UDPReceiver
var pipes []utils.FlowPipe
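
One thing worth checking in this hunk: if `log.Fatal` terminates the process (as the standard library's `log.Fatal` does through `os.Exit`), deferred calls are skipped, so `CloseSamplingRate` would not run when `InitTemplates` fails. Below is a minimal sketch of one way to keep the cleanup. The `store` type and the `run` helper are hypothetical stand-ins and are not part of GoFlow2.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical stand-in for a state backend such as the
// sampling-rate or template store; it only records whether it is open.
type store struct {
	name string
	open bool
}

func (s *store) init(fail bool) error {
	if fail {
		return errors.New(s.name + ": init failed")
	}
	s.open = true
	return nil
}

func (s *store) close() { s.open = false }

// run initialises both stores and returns errors instead of exiting, so the
// deferred close of the first store still runs when the second init fails.
func run(sampling, templates *store, failTemplates bool) error {
	if err := sampling.init(false); err != nil {
		return err
	}
	defer sampling.close()

	if err := templates.init(failTemplates); err != nil {
		return err
	}
	defer templates.close()

	// Receivers and pipes would be started here.
	return nil
}

func main() {
	sampling := &store{name: "sampling"}
	templates := &store{name: "templates"}
	err := run(sampling, templates, true)
	// A real main would log the error and exit here, after the defers ran.
	fmt.Println("error:", err, "| sampling still open:", sampling.open)
}
```

With this shape, the only place that exits the process is the caller of `run`, after every deferred close has executed.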

6 changes: 4 additions & 2 deletions decoders/netflow/netflow_test.go
@@ -8,7 +8,9 @@ import (
)

func TestDecodeNetFlowV9(t *testing.T) {
templates := CreateTemplateSystem()
err := InitTemplates()
assert.NoError(t, err)
templates := CreateTemplateSystem("TestDecodeNetFlowV9")

// Decode a template
template := []byte{
@@ -23,7 +25,7 @@ func TestDecodeNetFlowV9(t *testing.T) {
}
buf := bytes.NewBuffer(template)
var decNfv9 NFv9Packet
err := DecodeMessageVersion(buf, templates, &decNfv9, nil)
err = DecodeMessageVersion(buf, templates, &decNfv9, nil)
assert.Nil(t, err)
assert.Equal(t,
NFv9Packet{
215 changes: 165 additions & 50 deletions decoders/netflow/templates.go
@@ -1,85 +1,200 @@
package netflow

import (
"encoding/json"
"errors"
"flag"
"fmt"
"net/url"
"reflect"
"strings"
"sync"

"github.com/netsampler/goflow2/v2/state"
)

var (
ErrorTemplateNotFound = fmt.Errorf("Error template not found")
StateTemplates = flag.String("state.netflow.templates", "memory://", fmt.Sprintf("Define state templates engine URL (available schemes: %s)", strings.Join(state.SupportedSchemes, ", ")))
templatesDB state.State[templatesKey, templatesValue]
templatesInitLock = new(sync.Mutex)
)

type FlowBaseTemplateSet map[uint64]interface{}

func templateKey(version uint16, obsDomainId uint32, templateId uint16) uint64 {
return (uint64(version) << 48) | (uint64(obsDomainId) << 16) | uint64(templateId)
}

// Store interface that allows storing, removing and retrieving template data
type NetFlowTemplateSystem interface {
RemoveTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error)
GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error)
AddTemplate(version uint16, obsDomainId uint32, templateId uint16, template interface{}) error
}

func (ts *BasicTemplateSystem) GetTemplates() FlowBaseTemplateSet {
ts.templateslock.RLock()
tmp := ts.templates
ts.templateslock.RUnlock()
return tmp
type templatesKey struct {
Key string `json:"key"`
Version uint16 `json:"ver"`
ObsDomainId uint32 `json:"obs"`
TemplateID uint16 `json:"tid"`
}

func (ts *BasicTemplateSystem) AddTemplate(version uint16, obsDomainId uint32, templateId uint16, template interface{}) error {
ts.templateslock.Lock()
defer ts.templateslock.Unlock()
const (
templateTypeTest = 0
templateTypeTemplateRecord = 1
templateTypeIPFIXOptionsTemplateRecord = 2
templateTypeNFv9OptionsTemplateRecord = 3
)

/*var templateId uint16
switch templateIdConv := template.(type) {
case IPFIXOptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case NFv9OptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case TemplateRecord:
templateId = templateIdConv.TemplateId
}*/
key := templateKey(version, obsDomainId, templateId)
ts.templates[key] = template
return nil
type templatesValue struct {
TemplateType int `json:"ttype"`
Data interface{} `json:"data"`
}

type templatesValueUnmarshal struct {
TemplateType int `json:"ttype"`
Data json.RawMessage `json:"data"`
}

func (ts *BasicTemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
ts.templateslock.RLock()
defer ts.templateslock.RUnlock()
key := templateKey(version, obsDomainId, templateId)
if template, ok := ts.templates[key]; ok {
return template, nil
func (t *templatesValue) UnmarshalJSON(bytes []byte) error {
var v templatesValueUnmarshal
err := json.Unmarshal(bytes, &v)
if err != nil {
return err
}
t.TemplateType = v.TemplateType
switch v.TemplateType {
case templateTypeTest:
var data int
err = json.Unmarshal(v.Data, &data)
if err != nil {
return err
}
t.Data = data
case templateTypeTemplateRecord:
var data TemplateRecord
err = json.Unmarshal(v.Data, &data)
if err != nil {
return err
}
t.Data = data
case templateTypeIPFIXOptionsTemplateRecord:
var data IPFIXOptionsTemplateRecord
err = json.Unmarshal(v.Data, &data)
if err != nil {
return err
}
t.Data = data
case templateTypeNFv9OptionsTemplateRecord:
var data NFv9OptionsTemplateRecord
err = json.Unmarshal(v.Data, &data)
if err != nil {
return err
}
t.Data = data
default:
return fmt.Errorf("unknown template type: %d", v.TemplateType)
}
return nil, ErrorTemplateNotFound
return nil
}

type NetflowTemplate struct {
key string
}

func (ts *BasicTemplateSystem) RemoveTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
ts.templateslock.Lock()
defer ts.templateslock.Unlock()
func (t *NetflowTemplate) RemoveTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
if v, err := templatesDB.Pop(templatesKey{
Key: t.key,
Version: version,
ObsDomainId: obsDomainId,
TemplateID: templateId,
}); err != nil && errors.Is(err, state.ErrorKeyNotFound) {
return nil, ErrorTemplateNotFound
} else if err != nil {
return nil, err
} else {
return v.Data, nil
}
}

key := templateKey(version, obsDomainId, templateId)
if template, ok := ts.templates[key]; ok {
delete(ts.templates, key)
return template, nil
func (t *NetflowTemplate) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
if v, err := templatesDB.Get(templatesKey{
Key: t.key,
Version: version,
ObsDomainId: obsDomainId,
TemplateID: templateId,
}); err != nil && errors.Is(err, state.ErrorKeyNotFound) {
return nil, ErrorTemplateNotFound
} else if err != nil {
return nil, err
} else {
return v.Data, nil
}
return nil, ErrorTemplateNotFound
}

type BasicTemplateSystem struct {
templates FlowBaseTemplateSet
templateslock *sync.RWMutex
func (t *NetflowTemplate) AddTemplate(version uint16, obsDomainId uint32, templateId uint16, template interface{}) error {
k := templatesKey{
Key: t.key,
Version: version,
ObsDomainId: obsDomainId,
TemplateID: templateId,
}
var err error
switch templatec := template.(type) {
case TemplateRecord:
err = templatesDB.Add(k, templatesValue{
TemplateType: templateTypeTemplateRecord,
Data: templatec,
})
case IPFIXOptionsTemplateRecord:
err = templatesDB.Add(k, templatesValue{
TemplateType: templateTypeIPFIXOptionsTemplateRecord,
Data: templatec,
})
case NFv9OptionsTemplateRecord:
err = templatesDB.Add(k, templatesValue{
TemplateType: templateTypeNFv9OptionsTemplateRecord,
Data: templatec,
})
case int:
err = templatesDB.Add(k, templatesValue{
TemplateType: templateTypeTest,
Data: templatec,
})
default:
return fmt.Errorf("unknown template type: %s", reflect.TypeOf(template).String())
}
return err
}

// Creates a basic store for NetFlow and IPFIX templates.
// Everyting is stored in memory.
func CreateTemplateSystem() NetFlowTemplateSystem {
ts := &BasicTemplateSystem{
templates: make(FlowBaseTemplateSet),
templateslock: &sync.RWMutex{},
func CreateTemplateSystem(key string) NetFlowTemplateSystem {
ts := &NetflowTemplate{
key: key,
}
return ts
}

func InitTemplates() error {
templatesInitLock.Lock()
defer templatesInitLock.Unlock()
if templatesDB != nil {
return nil
}
templatesUrl, err := url.Parse(*StateTemplates)
if err != nil {
return err
}
if !templatesUrl.Query().Has("prefix") {
q := templatesUrl.Query()
q.Set("prefix", "goflow2:nf_templates:")
templatesUrl.RawQuery = q.Encode()
}
templatesDB, err = state.NewState[templatesKey, templatesValue](templatesUrl.String())
return err
}

func CloseTemplates() error {
templatesInitLock.Lock()
defer templatesInitLock.Unlock()
if templatesDB == nil {
return nil
}
err := templatesDB.Close()
templatesDB = nil
return err
}
6 changes: 4 additions & 2 deletions decoders/netflow/templates_test.go
@@ -11,13 +11,15 @@ func benchTemplatesAdd(ts NetFlowTemplateSystem, obs uint32, N int, b *testing.B
}

func BenchmarkTemplatesAdd(b *testing.B) {
ts := CreateTemplateSystem()
InitTemplates()
ts := CreateTemplateSystem("BenchmarkTemplatesAdd")
b.Log("Creating", b.N, "templates")
benchTemplatesAdd(ts, uint32(b.N)%0xffff+1, b.N, b)
}

func BenchmarkTemplatesAddGet(b *testing.B) {
ts := CreateTemplateSystem()
InitTemplates()
ts := CreateTemplateSystem("BenchmarkTemplatesAddGet")
templates := 1000
b.Log("Adding", templates, "templates")
benchTemplatesAdd(ts, 1, templates, b)
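
Both benchmarks call `InitTemplates()` and discard its error. Repeated calls are harmless because the function returns early once the store exists. Below is a minimal sketch of that mutex-guarded, idempotent initialisation. `backend`, `initStore`, `closeStore` and the `opens` counter are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// backend is a hypothetical stand-in for the shared state store handle.
type backend struct{ url string }

var (
	db       *backend
	initLock sync.Mutex
	opens    int // counts how many times a backend was actually created
)

// initStore creates the backend on the first call and is a no-op afterwards.
func initStore(url string) error {
	initLock.Lock()
	defer initLock.Unlock()
	if db != nil {
		return nil
	}
	db = &backend{url: url}
	opens++
	return nil
}

// closeStore drops the backend so that a later initStore creates a new one.
func closeStore() {
	initLock.Lock()
	defer initLock.Unlock()
	db = nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = initStore("memory://")
		}()
	}
	wg.Wait()
	// Only the first goroutine to take the lock creates the backend.
	fmt.Println("backends created:", opens)
}
```

In test and benchmark code it may still be worth checking the returned error, as `TestDecodeNetFlowV9` does with `assert.NoError`.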
28 changes: 28 additions & 0 deletions docs/state_storage.md
@@ -0,0 +1,28 @@
# State Storage

For protocols with a template system (NetFlow v9 and IPFIX), GoFlow2 stores templates and sampling rates in memory by default.
With in-memory storage, this information is lost whenever GoFlow2 restarts, so incoming
flows fail to decode until the agent sends the next template/option data.

An external state storage avoids this problem and brings these benefits:
- Support for per-packet UDP load balancing (e.g. with F5 or Envoy Proxy)
- Pod/container auto-scaling to handle traffic surges
- Persistent state: after a restart, GoFlow2 does not have to wait for template/option data

## Memory
The default method for storing state. It is not synced across multiple GoFlow2 instances and is lost on process restart.

## Redis
The supported URL format for Redis
is described in the [URI specification](https://github.com/redis/redis-specifications/blob/master/uri/redis.txt).
GoFlow2 uses Redis key-value storage for persistence, and pub-sub to broadcast new template data
to other GoFlow2 instances.
GoFlow2 also accepts query parameters specific to Redis:
- prefix
  - overrides the key prefix and the pub-sub channel prefix
  - e.g. redis://127.0.0.1/0?prefix=goflow
- interval
  - how often, in seconds, values are re-retrieved (in case pub-sub fails for some reason);
    defaults to `900` seconds, use `0` to disable
  - e.g. redis://127.0.0.1/0?interval=0
2 changes: 2 additions & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
github.com/libp2p/go-reuseport v0.4.0
github.com/oschwald/geoip2-golang v1.9.0
github.com/prometheus/client_golang v1.18.0
github.com/redis/go-redis/v9 v9.4.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
github.com/xdg-go/scram v1.1.2
@@ -18,6 +19,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect