Skip to content

Commit

Permalink
ADD_PATH, grep, and caps (#6)
Browse files Browse the repository at this point in the history
use bgpfix v0.4.0
  • Loading branch information
pforemski authored Oct 4, 2024
1 parent 53488a4 commit cbf5e4c
Show file tree
Hide file tree
Showing 17 changed files with 1,115 additions and 166 deletions.
22 changes: 13 additions & 9 deletions core/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"

"github.com/bgpfix/bgpfix/caps"
"github.com/bgpfix/bgpfix/msg"
"github.com/bgpfix/bgpfix/dir"
"github.com/bgpfix/bgpfix/pipe"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -102,7 +102,8 @@ func (b *Bgpipe) AttachStages() error {
}

// log events?
if evs := b.parseEvents(k, "events", "START", "STOP", "READY", "PREPARE"); len(evs) > 0 {
if evs := ParseEvents(k.Strings("events"), "START", "STOP", "READY", "PREPARE"); len(evs) > 0 {
b.Debug().Strs("events", evs).Msg("monitored events will be logged")
p.Options.AddHandler(b.LogEvent, &pipe.Handler{
Pre: true,
Order: math.MinInt,
Expand All @@ -111,7 +112,8 @@ func (b *Bgpipe) AttachStages() error {
}

// kill events?
if evs := b.parseEvents(k, "kill", "STOP"); len(evs) > 0 {
if evs := ParseEvents(k.Strings("kill"), "STOP"); len(evs) > 0 {
b.Debug().Strs("events", evs).Msg("will kill the session on given events")
p.Options.AddHandler(b.KillEvent, &pipe.Handler{
Pre: true,
Order: math.MinInt + 1,
Expand Down Expand Up @@ -158,11 +160,11 @@ func (s *StageBase) attach() error {

// set s.Dir
if s.IsBidir {
s.Dir = msg.DIR_LR
s.Dir = dir.DIR_LR
} else if s.IsLeft {
s.Dir = msg.DIR_L
s.Dir = dir.DIR_L
} else {
s.Dir = msg.DIR_R
s.Dir = dir.DIR_R
}

// call child attach, collect what was attached to
Expand Down Expand Up @@ -243,7 +245,7 @@ func (s *StageBase) attach() error {
li.Id = s.Index
li.FilterValue = fid

if li.Dir == msg.DIR_L {
if li.Dir == dir.DIR_L {
li.Reverse = true // CLI gives L stages in reverse
li.CallbackFilter = frev
} else {
Expand All @@ -263,7 +265,8 @@ func (s *StageBase) attach() error {
s.wgAdd(1)

// has trigger-on events?
if evs := b.parseEvents(k, "wait", "START"); len(evs) > 0 {
if evs := ParseEvents(k.Strings("wait"), "START"); len(evs) > 0 {
s.Debug().Strs("events", evs).Msg("waiting for given events before start")
po.OnEventPre(s.runStart, evs...)

// trigger pipe start handlers by --wait events
Expand All @@ -279,7 +282,8 @@ func (s *StageBase) attach() error {
}

// has trigger-off events?
if evs := b.parseEvents(k, "stop", "STOP"); len(evs) > 0 {
if evs := ParseEvents(k.Strings("stop"), "STOP"); len(evs) > 0 {
s.Debug().Strs("events", evs).Msg("will stop after given events")
po.OnEventPost(s.runStop, evs...)
}

Expand Down
176 changes: 165 additions & 11 deletions core/bgpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"slices"
"strings"
"sync"
"time"

"github.com/bgpfix/bgpfix/dir"
"github.com/bgpfix/bgpfix/msg"
"github.com/bgpfix/bgpfix/pipe"
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -82,8 +87,17 @@ func (b *Bgpipe) Run() error {
return err
}

// print the pipeline and quit?
if b.K.Bool("explain") {
fmt.Printf("--> MESSAGES FLOWING RIGHT -->\n")
b.StageDump(dir.DIR_R, os.Stdout)
fmt.Printf("\n<-- MESSAGES FLOWING LEFT <--\n")
b.StageDump(dir.DIR_L, os.Stdout)
return nil
}

// attach our b.Start
b.Pipe.Options.OnStart(b.Start)
b.Pipe.Options.OnStart(b.onStart)

// start the pipeline and block
b.Pipe.Start() // will call b.Start
Expand All @@ -105,8 +119,8 @@ func (b *Bgpipe) Run() error {
return err
}

// Start is called after the bgpfix pipe starts
func (b *Bgpipe) Start(ev *pipe.Event) bool {
// onStart is called after the bgpfix pipe starts
func (b *Bgpipe) onStart(ev *pipe.Event) bool {
// wait for writers
go func() {
b.wg_lwrite.Wait()
Expand Down Expand Up @@ -139,17 +153,16 @@ func (b *Bgpipe) LogEvent(ev *pipe.Event) bool {
// will b.Info() if ev.Error is nil
l := b.Err(ev.Error)

if ev.Msg != nil {
j := ev.Msg.GetJSON()
l = l.Bytes("msg", j[:len(j)-1])
if ev.Msg != "" {
l = l.Str("msg", ev.Msg)
}

if ev.Dir != 0 {
l = l.Stringer("dir", ev.Dir)
l = l.Stringer("evdir", ev.Dir)
}

if ev.Seq != 0 {
l = l.Uint64("seq", ev.Seq)
l = l.Uint64("evseq", ev.Seq)
}

if vals, ok := ev.Value.([]any); ok && len(vals) > 0 {
Expand All @@ -165,10 +178,11 @@ func (b *Bgpipe) LogEvent(ev *pipe.Event) bool {
return true
}

// KillEvent kills session because of given event ev
// KillEvent brutally kills the session because of given event ev
func (b *Bgpipe) KillEvent(ev *pipe.Event) bool {
// TODO: why not pipe.Stop()?
b.Cancel(fmt.Errorf("%w: %s", ErrKill, ev))
b.LogEvent(ev)
b.Warn().Stringer("ev", ev).Msg("session killed by event")
os.Exit(1)
return false
}

Expand Down Expand Up @@ -218,3 +232,143 @@ func (b *Bgpipe) AddStage(idx int, cmd string) (*StageBase, error) {
func (b *Bgpipe) StageCount() int {
return max(0, len(b.Stages)-1)
}

// StageDump prints all stages in dir direction in textual form to w (by default stdout)
func (b *Bgpipe) StageDump(d dir.Dir, w io.Writer) (total int) {
// use default w?
if w == nil {
w = os.Stdout
}
colors := w == os.Stdout

// print function shortcut
pr := func(style string, format string, a ...any) {
if colors && style != StyleNone {
fmt.Fprintf(w, style+format+StyleReset, a...)
} else {
fmt.Fprintf(w, format, a...)
}
}

// if only Go had a (simple) reverse iterator...
indices := make([]int, 0, len(b.Stages))
for i, s := range b.Stages {
if s != nil {
indices = append(indices, i)
}
}
if d == dir.DIR_L {
slices.Reverse(indices)
}

// iterate through stages in good direction
for i, idx := range indices {
s := b.Stages[idx]

// analyze callbacks
var cb_count int
var cb_all bool
var cb_types []msg.Type
for _, cb := range s.callbacks {
if cb.Dir != 0 && cb.Dir&d == 0 {
continue
}
cb_count++
if len(cb.Types) == 0 {
cb_all = true
} else {
cb_types = append(cb_types, cb.Types...)
}
}

// is the last stage and a consumer? treat as a callback
if s.Options.IsConsumer && i == len(indices)-1 {
cb_count++
cb_all = true
}

// analyze inputs
var in_count int
for _, in := range s.inputs {
if in.Dir&d == 0 {
continue
}
in_count++
}

// analyze event handlers
var eh_count int
var eh_all bool
var eh_types []string
for _, eh := range s.handlers {
if eh.Dir != 0 && eh.Dir&d == 0 {
continue
}
eh_count++
if len(eh.Types) == 0 {
eh_all = true
} else {
eh_types = append(eh_types, eh.Types...)
}
}

// should skip?
switch {
case cb_count > 0:
total++ // has callbacks in this direction
case in_count > 0:
total++ // has inputs in this direction
case len(s.callbacks)+len(s.inputs) == 0 && eh_count > 0:
total++ // no inputs or callbacks at all, but reacts to events
default:
continue // skip
}

pr(StyleNone, " [%d] ", s.Index)
pr(StyleBold, "%s", s.Name)
pr(StyleGreen, " -%s", s.Dir)
if len(s.Flags) > 0 {
pr(StyleGreen, " %s", strings.Join(s.Flags, " "))
}
for _, arg := range s.Args {
pr(StyleNone, " ")
pr(StyleRed+StyleUnderline, "%s", arg)
}
pr(StyleNone, "\n")

if cb_count > 0 {
pr(StyleNone, " reads messages from pipeline")
pr(StyleMagenta, " callbacks=%d", cb_count)
if !cb_all {
slices.Sort(cb_types)
pr(StyleMagenta, " types=%v", slices.Compact(cb_types))
} else {
pr(StyleMagenta, " types=[ALL]")
}
pr(StyleNone, "\n")
}

if in_count > 0 {
pr(StyleNone, " writes messages to pipeline")
pr(StyleMagenta, " inputs=%d\n", in_count)
}

if eh_count > 0 {
pr(StyleNone, " handles events")
pr(StyleMagenta, " handlers=%d", eh_count)
if !eh_all {
slices.Sort(eh_types)
pr(StyleMagenta, " types=%v", slices.Compact(eh_types))
} else {
pr(StyleMagenta, " types=[ALL]")
}
pr(StyleNone, "\n")
}
}

if total == 0 {
pr(StyleNone, " (none)\n")
}

return total
}
Loading

0 comments on commit cbf5e4c

Please sign in to comment.