From cbf5e4ce45d26fe5409d2e63bb4084c55920008e Mon Sep 17 00:00:00 2001 From: Pawel Foremski Date: Fri, 4 Oct 2024 14:53:34 +0200 Subject: [PATCH] ADD_PATH, grep, and caps (#6) use bgpfix v0.4.0 --- core/attach.go | 22 +- core/bgpipe.go | 176 +++++++++++- core/config.go | 65 ++++- core/errors.go | 1 - core/run.go | 12 +- core/stage.go | 6 +- core/util.go | 110 +++++--- go.mod | 17 +- go.sum | 26 +- pkg/extio/extio.go | 37 +-- scripts/release.sh | 45 +++ stages/connect.go | 3 +- stages/grep.go | 678 +++++++++++++++++++++++++++++++++++++++++++++ stages/limit.go | 44 +-- stages/listen.go | 3 +- stages/repo.go | 1 + stages/util.go | 35 ++- 17 files changed, 1115 insertions(+), 166 deletions(-) create mode 100755 scripts/release.sh create mode 100644 stages/grep.go diff --git a/core/attach.go b/core/attach.go index 2dac949..b75157b 100644 --- a/core/attach.go +++ b/core/attach.go @@ -6,7 +6,7 @@ import ( "strconv" "github.com/bgpfix/bgpfix/caps" - "github.com/bgpfix/bgpfix/msg" + "github.com/bgpfix/bgpfix/dir" "github.com/bgpfix/bgpfix/pipe" "github.com/rs/zerolog" ) @@ -102,7 +102,8 @@ func (b *Bgpipe) AttachStages() error { } // log events? - if evs := b.parseEvents(k, "events", "START", "STOP", "READY", "PREPARE"); len(evs) > 0 { + if evs := ParseEvents(k.Strings("events"), "START", "STOP", "READY", "PREPARE"); len(evs) > 0 { + b.Debug().Strs("events", evs).Msg("monitored events will be logged") p.Options.AddHandler(b.LogEvent, &pipe.Handler{ Pre: true, Order: math.MinInt, @@ -111,7 +112,8 @@ func (b *Bgpipe) AttachStages() error { } // kill events? - if evs := b.parseEvents(k, "kill", "STOP"); len(evs) > 0 { + if evs := ParseEvents(k.Strings("kill"), "STOP"); len(evs) > 0 { + b.Debug().Strs("events", evs).Msg("will kill the session on given events") p.Options.AddHandler(b.KillEvent, &pipe.Handler{ Pre: true, Order: math.MinInt + 1, @@ -158,11 +160,11 @@ func (s *StageBase) attach() error { // set s.Dir if s.IsBidir { - s.Dir = msg.DIR_LR + s.Dir = dir.DIR_LR } else if s.IsLeft { - s.Dir = msg.DIR_L + s.Dir = dir.DIR_L } else { - s.Dir = msg.DIR_R + s.Dir = dir.DIR_R } // call child attach, collect what was attached to @@ -243,7 +245,7 @@ func (s *StageBase) attach() error { li.Id = s.Index li.FilterValue = fid - if li.Dir == msg.DIR_L { + if li.Dir == dir.DIR_L { li.Reverse = true // CLI gives L stages in reverse li.CallbackFilter = frev } else { @@ -263,7 +265,8 @@ func (s *StageBase) attach() error { s.wgAdd(1) // has trigger-on events? - if evs := b.parseEvents(k, "wait", "START"); len(evs) > 0 { + if evs := ParseEvents(k.Strings("wait"), "START"); len(evs) > 0 { + s.Debug().Strs("events", evs).Msg("waiting for given events before start") po.OnEventPre(s.runStart, evs...) // trigger pipe start handlers by --wait events @@ -279,7 +282,8 @@ func (s *StageBase) attach() error { } // has trigger-off events? - if evs := b.parseEvents(k, "stop", "STOP"); len(evs) > 0 { + if evs := ParseEvents(k.Strings("stop"), "STOP"); len(evs) > 0 { + s.Debug().Strs("events", evs).Msg("will stop after given events") po.OnEventPost(s.runStop, evs...) } diff --git a/core/bgpipe.go b/core/bgpipe.go index 0d456ba..0c4b96b 100644 --- a/core/bgpipe.go +++ b/core/bgpipe.go @@ -4,10 +4,15 @@ import ( "context" "errors" "fmt" + "io" "os" + "slices" + "strings" "sync" "time" + "github.com/bgpfix/bgpfix/dir" + "github.com/bgpfix/bgpfix/msg" "github.com/bgpfix/bgpfix/pipe" "github.com/knadh/koanf/v2" "github.com/rs/zerolog" @@ -82,8 +87,17 @@ func (b *Bgpipe) Run() error { return err } + // print the pipeline and quit? + if b.K.Bool("explain") { + fmt.Printf("--> MESSAGES FLOWING RIGHT -->\n") + b.StageDump(dir.DIR_R, os.Stdout) + fmt.Printf("\n<-- MESSAGES FLOWING LEFT <--\n") + b.StageDump(dir.DIR_L, os.Stdout) + return nil + } + // attach our b.Start - b.Pipe.Options.OnStart(b.Start) + b.Pipe.Options.OnStart(b.onStart) // start the pipeline and block b.Pipe.Start() // will call b.Start @@ -105,8 +119,8 @@ func (b *Bgpipe) Run() error { return err } -// Start is called after the bgpfix pipe starts -func (b *Bgpipe) Start(ev *pipe.Event) bool { +// onStart is called after the bgpfix pipe starts +func (b *Bgpipe) onStart(ev *pipe.Event) bool { // wait for writers go func() { b.wg_lwrite.Wait() @@ -139,17 +153,16 @@ func (b *Bgpipe) LogEvent(ev *pipe.Event) bool { // will b.Info() if ev.Error is nil l := b.Err(ev.Error) - if ev.Msg != nil { - j := ev.Msg.GetJSON() - l = l.Bytes("msg", j[:len(j)-1]) + if ev.Msg != "" { + l = l.Str("msg", ev.Msg) } if ev.Dir != 0 { - l = l.Stringer("dir", ev.Dir) + l = l.Stringer("evdir", ev.Dir) } if ev.Seq != 0 { - l = l.Uint64("seq", ev.Seq) + l = l.Uint64("evseq", ev.Seq) } if vals, ok := ev.Value.([]any); ok && len(vals) > 0 { @@ -165,10 +178,11 @@ func (b *Bgpipe) LogEvent(ev *pipe.Event) bool { return true } -// KillEvent kills session because of given event ev +// KillEvent brutally kills the session because of given event ev func (b *Bgpipe) KillEvent(ev *pipe.Event) bool { - // TODO: why not pipe.Stop()? - b.Cancel(fmt.Errorf("%w: %s", ErrKill, ev)) + b.LogEvent(ev) + b.Warn().Stringer("ev", ev).Msg("session killed by event") + os.Exit(1) return false } @@ -218,3 +232,143 @@ func (b *Bgpipe) AddStage(idx int, cmd string) (*StageBase, error) { func (b *Bgpipe) StageCount() int { return max(0, len(b.Stages)-1) } + +// StageDump prints all stages in dir direction in textual form to w (by default stdout) +func (b *Bgpipe) StageDump(d dir.Dir, w io.Writer) (total int) { + // use default w? + if w == nil { + w = os.Stdout + } + colors := w == os.Stdout + + // print function shortcut + pr := func(style string, format string, a ...any) { + if colors && style != StyleNone { + fmt.Fprintf(w, style+format+StyleReset, a...) + } else { + fmt.Fprintf(w, format, a...) + } + } + + // if only Go had a (simple) reverse iterator... + indices := make([]int, 0, len(b.Stages)) + for i, s := range b.Stages { + if s != nil { + indices = append(indices, i) + } + } + if d == dir.DIR_L { + slices.Reverse(indices) + } + + // iterate through stages in good direction + for i, idx := range indices { + s := b.Stages[idx] + + // analyze callbacks + var cb_count int + var cb_all bool + var cb_types []msg.Type + for _, cb := range s.callbacks { + if cb.Dir != 0 && cb.Dir&d == 0 { + continue + } + cb_count++ + if len(cb.Types) == 0 { + cb_all = true + } else { + cb_types = append(cb_types, cb.Types...) + } + } + + // is the last stage and a consumer? treat as a callback + if s.Options.IsConsumer && i == len(indices)-1 { + cb_count++ + cb_all = true + } + + // analyze inputs + var in_count int + for _, in := range s.inputs { + if in.Dir&d == 0 { + continue + } + in_count++ + } + + // analyze event handlers + var eh_count int + var eh_all bool + var eh_types []string + for _, eh := range s.handlers { + if eh.Dir != 0 && eh.Dir&d == 0 { + continue + } + eh_count++ + if len(eh.Types) == 0 { + eh_all = true + } else { + eh_types = append(eh_types, eh.Types...) + } + } + + // should skip? + switch { + case cb_count > 0: + total++ // has callbacks in this direction + case in_count > 0: + total++ // has inputs in this direction + case len(s.callbacks)+len(s.inputs) == 0 && eh_count > 0: + total++ // no inputs or callbacks at all, but reacts to events + default: + continue // skip + } + + pr(StyleNone, " [%d] ", s.Index) + pr(StyleBold, "%s", s.Name) + pr(StyleGreen, " -%s", s.Dir) + if len(s.Flags) > 0 { + pr(StyleGreen, " %s", strings.Join(s.Flags, " ")) + } + for _, arg := range s.Args { + pr(StyleNone, " ") + pr(StyleRed+StyleUnderline, "%s", arg) + } + pr(StyleNone, "\n") + + if cb_count > 0 { + pr(StyleNone, " reads messages from pipeline") + pr(StyleMagenta, " callbacks=%d", cb_count) + if !cb_all { + slices.Sort(cb_types) + pr(StyleMagenta, " types=%v", slices.Compact(cb_types)) + } else { + pr(StyleMagenta, " types=[ALL]") + } + pr(StyleNone, "\n") + } + + if in_count > 0 { + pr(StyleNone, " writes messages to pipeline") + pr(StyleMagenta, " inputs=%d\n", in_count) + } + + if eh_count > 0 { + pr(StyleNone, " handles events") + pr(StyleMagenta, " handlers=%d", eh_count) + if !eh_all { + slices.Sort(eh_types) + pr(StyleMagenta, " types=%v", slices.Compact(eh_types)) + } else { + pr(StyleMagenta, " types=[ALL]") + } + pr(StyleNone, "\n") + } + } + + if total == 0 { + pr(StyleNone, " (none)\n") + } + + return total +} diff --git a/core/config.go b/core/config.go index 56a6490..cd2dd21 100644 --- a/core/config.go +++ b/core/config.go @@ -7,6 +7,9 @@ import ( "slices" "strings" + "net/http" + _ "net/http/pprof" + "github.com/knadh/koanf/providers/posflag" "github.com/rs/zerolog" ) @@ -18,9 +21,10 @@ func (b *Bgpipe) Configure() error { if err != nil { return fmt.Errorf("could not parse CLI flags: %w", err) } + k := b.K // debugging level - if ll := b.K.String("log"); len(ll) > 0 { + if ll := k.String("log"); len(ll) > 0 { lvl, err := zerolog.ParseLevel(ll) if err != nil { return err @@ -28,6 +32,31 @@ func (b *Bgpipe) Configure() error { zerolog.SetGlobalLevel(lvl) } + // pprof? + if v := k.String("pprof"); len(v) > 0 { + go func() { + b.Fatal().Err(http.ListenAndServe(v, nil)).Msg("pprof failed") + }() + } + + // capabilities? + switch v := k.String("caps"); { + case len(v) == 0: // none + break + case v[0] == '@': // read from file + jsv, err := os.ReadFile(v[1:]) + if err != nil { + return fmt.Errorf("could not read --caps file: %w", err) + } + if err := b.Pipe.Caps.FromJSON(jsv); err != nil { + return fmt.Errorf("could not parse --caps file: %w", err) + } + default: // parse JSON + if err := b.Pipe.Caps.FromJSON([]byte(v)); err != nil { + return fmt.Errorf("could not parse --caps: %w", err) + } + } + return nil } @@ -37,14 +66,17 @@ func (b *Bgpipe) addFlags() { f.Usage = b.usage f.SetInterspersed(false) f.BoolP("version", "v", false, "print detailed version info and quit") + f.BoolP("explain", "n", false, "print the pipeline as configured and quit") f.StringP("log", "l", "info", "log level (debug/info/warn/error/disabled)") + f.String("pprof", "", "bind pprof to given listen address") f.StringSliceP("events", "e", []string{"PARSE", "ESTABLISHED", "EOR"}, "log given events (\"all\" means all events)") - f.StringSliceP("kill", "k", []string{}, "kill session on any of these events") + f.StringSliceP("kill", "k", nil, "kill session on any of these events") f.BoolP("stdin", "i", false, "read JSON from stdin") f.BoolP("stdout", "o", false, "write JSON to stdout") f.BoolP("stdin-wait", "I", false, "like --stdin but wait for EVENT_ESTABLISHED") f.BoolP("stdout-wait", "O", false, "like --stdout but wait for EVENT_EOR") f.BoolP("short-asn", "2", false, "use 2-byte ASN numbers") + f.String("caps", "", "use given BGP capabilities (JSON format)") } func (b *Bgpipe) usage() { @@ -206,28 +238,35 @@ func (s *StageBase) parseArgs(args []string) (unused []string, err error) { f.Usage = s.usage } - // parse stage flags, export to koanf + // parse stage flags if err := f.Parse(args); err != nil { return args, s.Errorf("%w", err) - } else { - s.K.Load(posflag.Provider(f, ".", s.K), nil) } - // rewrite required CLI arguments? - sargs := f.Args() + // export flags to koanf, collect remaining args + s.K.Load(posflag.Provider(f, ".", s.K), nil) + rem := f.Args() + + // compare original args vs remaining -> consumed flags + consumed := max(0, len(args)-len(rem)) + s.Flags = args[:consumed] + + // rewrite required arguments? for _, name := range o.Args { - if len(sargs) == 0 { - return sargs, s.Errorf("needs an argument: %s", name) + if len(rem) == 0 { + return rem, s.Errorf("needs an argument: %s", name) } - s.K.Set(name, sargs[0]) - sargs = sargs[1:] + s.K.Set(name, rem[0]) + s.Args = append(s.Args, rem[0]) + rem = rem[1:] } // consume the rest of arguments? if v, _ := f.GetBool("args"); v { - s.K.Set("args", sargs) + s.K.Set("args", rem) + s.Args = append(s.Args, rem...) return nil, nil } - return sargs, nil + return rem, nil } diff --git a/core/errors.go b/core/errors.go index 2712804..3aaba23 100644 --- a/core/errors.go +++ b/core/errors.go @@ -9,5 +9,4 @@ var ( ErrFirstOrLast = errors.New("must be either the first or the last stage") ErrInject = errors.New("invalid --in option value") ErrLR = errors.New("select either --left or --right, not both") - ErrKill = errors.New("session killed by an event") ) diff --git a/core/run.go b/core/run.go index 80abb6b..dec2a4f 100644 --- a/core/run.go +++ b/core/run.go @@ -23,11 +23,8 @@ func (s *StageBase) runStart(ev *pipe.Event) bool { check_fatal := func(err error) bool { if err == nil { err = context.Cause(s.Ctx) - if err == context.Canceled { - err = nil - } } - if err == nil || errors.Is(err, ErrStageStopped) { + if err == nil || err == context.Canceled || errors.Is(err, ErrStageStopped) { return false } else { s.B.Cancel(s.Errorf("%w", err)) // game over @@ -54,13 +51,6 @@ func (s *StageBase) runStart(ev *pipe.Event) bool { // wait for all stages started in this event to finish Prepare() ev.Wait() - // catch stage panics - // defer func() { - // if r := recover(); r != nil { - // s.B.Cancel(s.Errorf("panic: %v", r)) // game over - // } - // }() - // block on Run if context still valid err := context.Cause(s.Ctx) if err == nil { diff --git a/core/stage.go b/core/stage.go index 616f98a..22eb37c 100644 --- a/core/stage.go +++ b/core/stage.go @@ -6,7 +6,7 @@ import ( "strings" "sync/atomic" - "github.com/bgpfix/bgpfix/msg" + "github.com/bgpfix/bgpfix/dir" "github.com/bgpfix/bgpfix/pipe" "github.com/knadh/koanf/v2" "github.com/rs/zerolog" @@ -76,6 +76,8 @@ type StageBase struct { Index int // stage index (zero means internal) Cmd string // stage command name Name string // human-friendly stage name + Flags []string // consumed flags + Args []string // consumed args Options StageOptions // stage options // properties set during before Attach() @@ -85,7 +87,7 @@ type StageBase struct { IsRight bool // write L->R msgs + capture L->R msgs? IsLeft bool // write R->L msgs + capture R->L msgs? IsBidir bool // true iff IsRight && IsLeft - Dir msg.Dir // target direction (IsLeft/IsRight translated, can be DIR_LR) + Dir dir.Dir // target direction (IsLeft/IsRight translated, can be DIR_LR) callbacks []*pipe.Callback // registered callbacks handlers []*pipe.Handler // registered handlers diff --git a/core/util.go b/core/util.go index 1acf874..01ffdb2 100644 --- a/core/util.go +++ b/core/util.go @@ -7,7 +7,22 @@ import ( "strconv" "strings" - "github.com/knadh/koanf/v2" + "github.com/bgpfix/bgpfix/msg" +) + +const ( + StyleNone = "" + StyleBlack = "\033[30m" + StyleRed = "\033[31m" + StyleGreen = "\033[32m" + StyleYellow = "\033[33m" + StyleBlue = "\033[34m" + StyleMagenta = "\033[35m" + StyleCyan = "\033[36m" + StyleWhite = "\033[37m" + StyleBold = "\033[1m" + StyleUnderline = "\033[4m" + StyleReset = "\033[0m" ) func IsAddr(v string) bool { @@ -47,58 +62,83 @@ func IsFile(v string) bool { } } -// parseEvents returns events from given koanf key, or nil if none found -func (b *Bgpipe) parseEvents(k *koanf.Koanf, key string, sds ...string) []string { - input := k.Strings(key) - if len(input) == 0 { - return nil - } - - // rewrite - var output []string - for _, et := range input { - // special values - if et == "all" || et == "*" { - output = append(output, "*") +// ParseEvents parses events in src and returns the result, or nil. +// If stage_defaults is given, events like "foobar" are translated to "foobar/stage_defaults[:]". +func ParseEvents(src []string, stage_defaults ...string) []string { + var dst []string + for _, event := range src { + // special catch-all value? + if event == "all" || event == "*" { + dst = append(dst[:0], "*") continue } - // split slash/dot.event - slash, et, has_slash := strings.Cut(et, "/") + // split event into slash/dot.name + slash, name, has_slash := strings.Cut(event, "/") if !has_slash { - et = slash + name = slash slash = "" } - dot, et, has_dot := strings.Cut(et, ".") + dot, name, has_dot := strings.Cut(name, ".") if !has_dot { - et = dot + name = dot dot = "" } - et_lower := strings.ToLower(et) - et_upper := strings.ToUpper(et) + + // get name as UPPER / lower case + UPPER := strings.ToUpper(name) + lower := strings.ToLower(name) switch { case has_dot && has_slash: - et = fmt.Sprintf("%s/%s.%s", slash, dot, et_upper) + // eg. foo/bar.name -> foo/bar.NAME + name = fmt.Sprintf("%s/%s.%s", slash, dot, UPPER) case has_dot: - et = fmt.Sprintf("bgpfix/%s.%s", dot, et_upper) + // eg. bar.name -> bgpfix/bar.NAME + name = fmt.Sprintf("bgpfix/%s.%s", dot, UPPER) case has_slash: - et = fmt.Sprintf("%s/%s", slash, et) // stage event - default: - // stage name + stage defaults? - if et == et_lower && len(sds) > 0 { - for _, sd := range sds { - output = append(output, fmt.Sprintf("%s/%s", et, sd)) - } - continue + // eg. foo/name -> foo/name (specific stage event) + name = fmt.Sprintf("%s/%s", slash, name) + case name == lower && len(stage_defaults) > 0: + // eg. foo -> foo/sds[0], foo/sds[1], etc. (default stage events) + for _, sd := range stage_defaults { + dst = append(dst, fmt.Sprintf("%s/%s", name, sd)) } + continue + default: + // eg. established -> bgpfix/pipe.ESTABLISHED + name = fmt.Sprintf("bgpfix/pipe.%s", UPPER) + } + + dst = append(dst, name) + } + + return dst +} + +func ParseTypes(src []string, dst []msg.Type) ([]msg.Type, error) { + for _, t := range src { + // skip empty types + if len(t) == 0 { + continue + } - et = fmt.Sprintf("bgpfix/pipe.%s", et_upper) + // canonical name? + typ, err := msg.TypeString(t) + if err == nil { + dst = append(dst, typ) + continue + } + + // a plain integer? + tnum, err2 := strconv.ParseUint(t, 0, 8) + if err2 == nil { + dst = append(dst, msg.Type(tnum)) + continue } - output = append(output, et) + return dst, err } - b.Trace().Msgf("parseEvents(): %s -> %s", input, output) - return output + return dst, nil } diff --git a/go.mod b/go.mod index 8be2bf6..277af89 100644 --- a/go.mod +++ b/go.mod @@ -1,26 +1,27 @@ module github.com/bgpfix/bgpipe -go 1.21 +go 1.22.0 + +toolchain go1.23.1 require ( - github.com/bgpfix/bgpfix v0.3.0 - github.com/gorilla/websocket v1.5.1 + github.com/bgpfix/bgpfix v0.4.0 + github.com/gorilla/websocket v1.5.3 github.com/knadh/koanf/providers/posflag v0.1.0 github.com/knadh/koanf/v2 v2.1.1 - github.com/puzpuzpuz/xsync/v3 v3.1.0 - github.com/rs/zerolog v1.32.0 + github.com/puzpuzpuz/xsync/v3 v3.4.0 + github.com/rs/zerolog v1.33.0 github.com/spf13/pflag v1.0.5 github.com/valyala/bytebufferpool v1.0.0 - golang.org/x/sys v0.20.0 + golang.org/x/sys v0.25.0 ) require ( github.com/buger/jsonparser v1.1.1 // indirect - github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect - golang.org/x/net v0.25.0 // indirect ) diff --git a/go.sum b/go.sum index e98cfec..28e46c2 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,15 @@ -github.com/bgpfix/bgpfix v0.3.0 h1:Xwd9lkzRP4wF9dzfCXJfMPb7sDJSoBtMrScM2uv9q+g= -github.com/bgpfix/bgpfix v0.3.0/go.mod h1:LW9iBUXeGt6+45Q3TW75wCMLGwIyxtvAfZs+WQ2LjZk= +github.com/bgpfix/bgpfix v0.4.0 h1:TlkNzru8Pq/mtryi6JJYnAHF9e6qJni0l9ZIYwm+6NI= +github.com/bgpfix/bgpfix v0.4.0/go.mod h1:4qADd7jUW64yLoV7V6Peg57zKY+K/axpp2yQxwazN0c= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= -github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/go-viper/mapstructure/v2 v2.1.0 h1:gHnMa2Y/pIxElCH2GlZZ1lZSsn6XMtufpGyP1XxdC/w= +github.com/go-viper/mapstructure/v2 v2.1.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= -github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/posflag v0.1.0 h1:mKJlLrKPcAP7Ootf4pBZWJ6J+4wHYujwipe7Ie3qW6U= @@ -29,23 +29,21 @@ github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= -github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= +github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= -github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/extio/extio.go b/pkg/extio/extio.go index 9889e7a..96b9ff0 100644 --- a/pkg/extio/extio.go +++ b/pkg/extio/extio.go @@ -5,10 +5,10 @@ import ( "fmt" "io" "slices" - "strconv" "time" "github.com/bgpfix/bgpfix/caps" + "github.com/bgpfix/bgpfix/dir" "github.com/bgpfix/bgpfix/mrt" "github.com/bgpfix/bgpfix/msg" "github.com/bgpfix/bgpfix/pipe" @@ -121,26 +121,9 @@ func (eio *Extio) Attach() error { } // parse --type - for _, v := range k.Strings("type") { - // skip empty types - if len(v) == 0 { - continue - } - - // canonical name? - typ, err := msg.TypeString(v) - if err == nil { - eio.opt_type = append(eio.opt_type, typ) - continue - } - - // a plain integer? - tnum, err2 := strconv.Atoi(v) - if err2 == nil && tnum >= 0 && tnum <= 0xff { - eio.opt_type = append(eio.opt_type, msg.Type(tnum)) - continue - } - + var err error + eio.opt_type, err = core.ParseTypes(k.Strings("type"), nil) + if err != nil { return fmt.Errorf("--type: %w", err) } @@ -159,19 +142,19 @@ func (eio *Extio) Attach() error { // not write-only? read input to bgpipe if !eio.opt_write { if eio.IsBidir { - eio.InputL = p.AddInput(msg.DIR_L) - eio.InputR = p.AddInput(msg.DIR_R) + eio.InputL = p.AddInput(dir.DIR_L) + eio.InputR = p.AddInput(dir.DIR_R) if eio.IsLast { eio.InputD = eio.InputL } else { eio.InputD = eio.InputR } } else if eio.IsLeft { - eio.InputL = p.AddInput(msg.DIR_L) + eio.InputL = p.AddInput(dir.DIR_L) eio.InputR = eio.InputL // redirect R messages to L eio.InputD = eio.InputL } else { - eio.InputR = p.AddInput(msg.DIR_R) + eio.InputR = p.AddInput(dir.DIR_R) eio.InputL = eio.InputR // redirect L messages to R eio.InputD = eio.InputR } @@ -272,9 +255,9 @@ func (eio *Extio) ReadSingle(buf []byte, cb pipe.CallbackFunc) (parse_err error) // sail! m.CopyData() switch m.Dir { - case msg.DIR_L: + case dir.DIR_L: return eio.InputL.WriteMsg(m) - case msg.DIR_R: + case dir.DIR_R: return eio.InputR.WriteMsg(m) default: return eio.InputD.WriteMsg(m) diff --git a/scripts/release.sh b/scripts/release.sh new file mode 100755 index 0000000..41248d5 --- /dev/null +++ b/scripts/release.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +[ -z "$1" ] && { echo "Usage: release.sh VERSION" >&1; exit 1; } + +VERSION="$1" +DEST="./bin/bgpipe-$VERSION" + +############################################### + +function build() +{ + ARCH="$1" + SUFFIX="${2:+-$2}" + + echo "Building bgpipe $VERSION for $ARCH" + CGO_ENABLED=0 GOOS="${ARCH%-*}" GOARCH="${ARCH##*-}" \ + go build -o $DEST/bgpipe-${ARCH}${SUFFIX} +} + +############################################### + +echo "Building in $DEST" +rm -fr $DEST +mkdir -p $DEST + +build linux-amd64 +build linux-arm +# GOARM=6 build linux-arm 6 +# GOARM=7 build linux-arm 7 +build linux-arm64 +build linux-mips +build linux-mips64 +build linux-mips64le +build linux-ppc64 +build linux-ppc64le + +build darwin-amd64 +build darwin-arm64 + +build freebsd-amd64 +build netbsd-amd64 +build openbsd-amd64 + +build windows-amd64 +build windows-arm64 diff --git a/stages/connect.go b/stages/connect.go index 493003c..bf78bfc 100644 --- a/stages/connect.go +++ b/stages/connect.go @@ -31,6 +31,7 @@ func NewConnect(parent *core.StageBase) core.Stage { o.IsConsumer = true f.Duration("timeout", time.Minute, "connect timeout (0 means none)") + f.Duration("closed", time.Second, "half-closed timeout (0 means none)") f.String("md5", "", "TCP MD5 password") o.Args = []string{"addr"} @@ -86,5 +87,5 @@ func (s *Connect) Prepare() error { } func (s *Connect) Run() error { - return tcp_handle(s.StageBase, s.conn, s.in) + return tcp_handle(s.StageBase, s.conn, s.in, s.K.Duration("closed")) } diff --git a/stages/grep.go b/stages/grep.go new file mode 100644 index 0000000..444b664 --- /dev/null +++ b/stages/grep.go @@ -0,0 +1,678 @@ +package stages + +import ( + "fmt" + "math" + "net/netip" + "slices" + "strconv" + "strings" + + "github.com/bgpfix/bgpfix/afi" + "github.com/bgpfix/bgpfix/msg" + "github.com/bgpfix/bgpfix/nlri" + "github.com/bgpfix/bgpfix/pipe" + "github.com/bgpfix/bgpipe/core" +) + +type Grep struct { + *core.StageBase + + opt_if_type []msg.Type + opt_if_reach bool + opt_if_unreach bool + + opt_invert bool + opt_or bool + opt_strict bool + opt_fail_accept string + opt_fail_event string + opt_fail_kill bool + opt_parse bool + + enabled_matches int // number of different checks we must do for each message + opt_type []msg.Type + opt_reach bool + opt_unreach bool + opt_af []afi.AS + opt_asn []uint32 + opt_origin []uint32 + opt_prefix []nlri.NLRI + opt_prefix_len [129]bool + opt_prefix_lens int // if >0, opt_prefix_len is enabled + opt_nexthop []netip.Prefix + opt_tag map[string]string +} + +func NewGrep(parent *core.StageBase) core.Stage { + var ( + s = &Grep{StageBase: parent} + o = &s.Options + ) + + o.Usage = "grep" + o.Descr = "drop messages that do not match" + o.Bidir = true + + f := o.Flags + + f.StringSlice("if-type", nil, "run only for messages of the specified type(s)") + f.Bool("if-reach", false, "run only if the message announces a reachable prefix") + f.Bool("if-unreach", false, "run only if the message withdraws an unreachable prefix") + + f.String("fail-event", "", "on match failure, emit given event and DROP the message") + f.String("fail-accept", "", "on match failure, emit given event and ACCEPT the message") + f.Bool("fail-kill", false, "on match failure, kill the session") + + f.BoolP("invert", "v", false, "invert the final result: drop messages that matched successfully") + f.BoolP("or", "o", false, "require any match type (default: require ALL match types)") + f.BoolP("strict", "s", false, "require all values if possible (default: require ANY value)") + + f.StringSlice("type", nil, "require message type(s)") + f.Bool("parse", false, "require the messages to parse properly, do not report message parsing errors") + f.Bool("reach", false, "require announcement of reachable prefixes") + f.Bool("unreach", false, "require withdrawal of unreachable prefixes") + f.StringSlice("af", nil, "require UPDATE for given address family (format: AFI/SAFI)") + f.Bool("ipv4", false, "shortcut: --af IPV4/UNICAST") + f.Bool("ipv6", false, "shortcut: --af IPV6/UNICAST") + f.Bool("flowspec", false, "shortcut: --af IPV4/FLOWSPEC,IPV6/FLOWSPEC") + f.IntSlice("asn", nil, "require ASNs in the AS_PATH") + f.IntSlice("origin", nil, "require origin ASN") + f.StringSlice("prefix", nil, "require any of the given IP prefixes (or their subnets)") + f.StringSlice("prefix-len", nil, "require prefix length (use eg. 0-8 for ranges)") + f.StringSlice("nexthop", nil, "require NEXT_HOP inside given prefix(es)") + f.StringSlice("tag", nil, "require context tag values (format: key=value)") + + return s +} + +func (s *Grep) Attach() error { + k := s.K + + // logic switches? + s.opt_invert = k.Bool("invert") + s.opt_or = k.Bool("or") + s.opt_strict = k.Bool("strict") + + // events? + s.opt_fail_event = k.String("fail-event") + s.opt_fail_accept = k.String("fail-accept") + s.opt_fail_kill = k.Bool("fail-kill") + if s.opt_fail_accept != "" && s.opt_fail_event != "" { + return fmt.Errorf("--fail-event and --fail-accept must not be used together") + } else if s.opt_fail_kill && s.opt_fail_accept != s.opt_fail_event { + return fmt.Errorf("--fail-kill must not be used together with other --fail-* options") + } + + s.opt_parse = k.Bool("parse") + + // --------------------- + + // run-if type + var err error + s.opt_if_type, err = core.ParseTypes(k.Strings("if-type"), nil) + if err != nil { + return fmt.Errorf("--if-type: %w", err) + } + + // run-if reach/unreach + s.opt_if_reach = k.Bool("if-reach") + s.opt_if_unreach = k.Bool("if-unreach") + if s.opt_if_reach || s.opt_if_unreach { + s.opt_if_type = append(s.opt_if_type, msg.UPDATE) // --if-type UPDATE + } + + // dedup opt_if_type + slices.Sort(s.opt_if_type) + s.opt_if_type = slices.Compact(s.opt_if_type) + + // --------------------- + + // parse tags + for i, tag := range k.Strings("tag") { + if i == 0 { + s.opt_tag = map[string]string{} + } + + key, val, found := strings.Cut(tag, "=") + if found { + s.opt_tag[key] = val + } else { + return fmt.Errorf("--tag %s: invalid format, need key=value", tag) + } + } + + // types + s.opt_type, err = core.ParseTypes(k.Strings("type"), nil) + if err != nil { + return fmt.Errorf("--type: %w", err) + } + + // reach / unreach? + s.opt_reach = k.Bool("reach") + s.opt_unreach = k.Bool("unreach") + if s.opt_reach || s.opt_unreach { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + } + + // parse asns + for _, asn := range k.Ints("asn") { + if asn < 0 || asn > math.MaxUint32 { + return fmt.Errorf("--asn %d: invalid value (must be uint32)", asn) + } + s.opt_asn = append(s.opt_asn, uint32(asn)) + } + if len(s.opt_asn) > 0 { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + slices.Sort(s.opt_asn) + s.opt_asn = slices.Compact(s.opt_asn) + } + + // parse origins + for _, asn := range k.Ints("origin") { + if asn < 0 || asn > math.MaxUint32 { + return fmt.Errorf("--origin %d: invalid value (must be uint32)", asn) + } + s.opt_origin = append(s.opt_origin, uint32(asn)) + } + if len(s.opt_origin) > 0 { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + slices.Sort(s.opt_origin) + s.opt_origin = slices.Compact(s.opt_origin) + } + + // parse AF + for _, afs := range k.Strings("af") { + var as afi.AS + if err := as.FromJSON([]byte(afs)); err != nil { + return fmt.Errorf("--af %s: %w", afs, err) + } + s.opt_af = append(s.opt_af, as) + } + if k.Bool("ipv4") { + s.opt_af = append(s.opt_af, afi.AS_IPV4_UNICAST) + } + if k.Bool("ipv6") { + s.opt_af = append(s.opt_af, afi.AS_IPV6_UNICAST) + } + if len(s.opt_af) > 0 { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + slices.Sort(s.opt_af) + s.opt_af = slices.Compact(s.opt_af) + } + + // parse NEXT_HOP + for _, nhs := range k.Strings("nexthop") { + if strings.IndexByte(nhs, '/') > 0 { + p, err := netip.ParsePrefix(nhs) + if err != nil { + return fmt.Errorf("--nexthop %s: %w", nhs, err) + } + s.opt_nexthop = append(s.opt_nexthop, p) + } else { + a, err := netip.ParseAddr(nhs) + if err != nil { + return fmt.Errorf("--nexthop %s: %w", nhs, err) + } + p := netip.PrefixFrom(a, a.BitLen()) + s.opt_nexthop = append(s.opt_nexthop, p) + } + } + // require UPDATEs if --nexthop used + if len(s.opt_nexthop) > 0 { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + } + + // parse IP prefixes + for _, ps := range k.Strings("prefix") { + p, err := netip.ParsePrefix(ps) + if err != nil { + return fmt.Errorf("--prefix %s: %w", ps, err) + } + s.opt_prefix = append(s.opt_prefix, nlri.FromPrefix(p)) + } + // require UPDATEs if --prefix used + if len(s.opt_prefix) > 0 { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + } + + // prefix lengths + for _, pl := range k.Strings("prefix-len") { + before, after, ok := strings.Cut(pl, "-") + + // parse the first (or the only) + v1, err := strconv.Atoi(before) + if err != nil { + return fmt.Errorf("--prefix-len %s: %w", pl, err) + } else if v1 < 0 || v1 > 128 { + return fmt.Errorf("--prefix-len %s: invalid value", pl) + } + + // handle ranges + v2 := v1 + if ok { + v2, err = strconv.Atoi(after) + if err != nil { + return fmt.Errorf("--prefix-len %s: %w", pl, err) + } else if v2 < 0 || v2 > 128 || v2 < v1 { + return fmt.Errorf("--prefix-len %s: invalid value", pl) + } + } + + // add + for l := v1; l <= v2; l++ { + s.opt_prefix_len[l] = true + s.opt_prefix_lens++ + } + } + // require UPDATEs if --prefix-len used + if s.opt_prefix_lens > 0 { + s.opt_type = append(s.opt_type, msg.UPDATE) // --type UPDATE + } + + // dedup --type + slices.Sort(s.opt_type) + s.opt_type = slices.Compact(s.opt_type) + + // ------------------------------------------ + + // count how many checks we need to do + if s.opt_parse { + s.enabled_matches++ + } + if len(s.opt_type) > 0 { + s.enabled_matches++ + } + if s.opt_reach { + s.enabled_matches++ + } + if s.opt_unreach { + s.enabled_matches++ + } + if len(s.opt_af) > 0 { + s.enabled_matches++ + } + if len(s.opt_asn) > 0 { + s.enabled_matches++ + } + if len(s.opt_origin) > 0 { + s.enabled_matches++ + } + if len(s.opt_nexthop) > 0 { + s.enabled_matches++ + } + if len(s.opt_prefix) > 0 { + s.enabled_matches++ + } + if len(s.opt_tag) > 0 { + s.enabled_matches++ + } + if s.opt_prefix_lens > 0 { + s.enabled_matches++ + } + if s.enabled_matches == 0 { + return fmt.Errorf("nothing to do: no matches specified") + } + + // register a raw callback + cb := s.P.OnMsg(s.check, s.Dir, s.opt_if_type...) + cb.Raw = true // prevent parsing if possible + + return nil +} + +// parse parses message m and returns true on success, +// or logs the error and returns false otherwise (unless opt_parse is true) +// NB: an event can be generated by bgpipe in case of parse error as well +func (s *Grep) parse(m *msg.Msg) bool { + // optimization: already done? + if m.Upper != msg.INVALID { + return true + } + + // silent parse errors? + if s.opt_parse { + err := m.Parse(s.P.Caps) + return err == nil + } + + // standard path, emit event on parse errors + err := s.P.ParseMsg(m) + if err == nil { + return true // success + } else { + s.Err(err).Msg("could not check the message: parse error") + return false + } +} + +// should_run returns: 1 if we should run the stage for message m, -1 if not, or 0 means abort +func (s *Grep) should_run_stage(m *msg.Msg) int { + // --if-type handled using bgpfix callback mechanism + + // --if-reach + if s.opt_if_reach { + if !s.parse(m) { + return 0 + } + if !s.check_reach(m) { + return -1 + } + } + + // --if-unreach + if s.opt_if_unreach { + if !s.parse(m) { + return 0 + } + if !s.check_unreach(m) { + return -1 + } + } + + // all --if-* checks good + return 1 +} + +func (s *Grep) check(m *msg.Msg) (accept_message bool) { + // check if we should run all of this at all + switch s.should_run_stage(m) { + case 1: + break // yeah, run below checks + case -1: + return true // ignore the message, we should not run below checks + default: + return false // drop the message as-is, something is really broken + } + + // defer does the final processing of the result + abort := false // emergency stop + defer func() { + // unconditional drop? + if abort { + accept_message = false + return + } + + // invert the match result? + if s.opt_invert { + accept_message = !accept_message + } + + // if message accepted, we're done + if accept_message { + return + } + + // handle --fail-* options + switch { + case s.opt_fail_kill: + s.Fatal().Stringer("msg", m).Msg("message did not match") + case s.opt_fail_event != "": + s.Event(s.opt_fail_event, m) + case s.opt_fail_accept != "": + s.Event(s.opt_fail_accept, m) + accept_message = true // accept the message + } + }() + + // run_match calls given check function iff needed, and interprets its results + todo := s.enabled_matches // how many match types remaining? + run_match := func(check func(*msg.Msg) bool, enabled bool) (keep_going bool) { + // is this match type enabled at all? + if !enabled { + return true // no, keep going + } else { + todo-- + } + + // interpret the result respecting the --or-match flag + switch success := check(m); { + case s.opt_or: // OR logic, require any match type + if success { + accept_message = true // accept as-is + return false // stop here + } else if todo <= 0 { + accept_message = false // that was the last match type, no success = drop as-is + return false // stop here + } else { + return true // no result yet, keep going + } + default: // AND logic, require all match types + if !success { + accept_message = false // drop as-is + return false // stop here + } else if todo <= 0 { + accept_message = true // that was the last match type, all good = accept as-is + return false // stop here + } else { + return true // no result yet, keep going + } + } + } + + // does it parse properly? + if !run_match(s.parse, s.opt_parse) { + return + } + + // check message type + if !run_match(s.check_type, len(s.opt_type) > 0) { + return + } + + // check tags + if !run_match(s.check_tag, len(s.opt_tag) > 0) { + return + } + + // ------------------------------- + // past this point we must have a parsed message - lets ensure this + if !s.parse(m) { + abort = true + return + } + // ------------------------------- + + // require reach / unreach? + if !run_match(s.check_reach, s.opt_reach) { + return + } + if !run_match(s.check_unreach, s.opt_unreach) { + return + } + + // require AF? + if !run_match(s.check_af, len(s.opt_af) > 0) { + return + } + + // check AS_PATH contents? + if !run_match(s.check_asn, len(s.opt_asn) > 0) { + return + } + + // check AS_PATH origin? + if !run_match(s.check_origin, len(s.opt_origin) > 0) { + return + } + + // check nexthop? + if !run_match(s.check_nexthop, len(s.opt_nexthop) > 0) { + return + } + + // check prefix? + if !run_match(s.check_prefix, len(s.opt_prefix) > 0) { + return + } + + // check prefix length? + if !run_match(s.check_prefix_len, s.opt_prefix_lens > 0) { + return + } + + // if AND, no failures so far is a success + // if OR, no successes so far is a failure + return !s.opt_or +} + +// returns: 1 stop with success, -1 stop with failure, 0 keep going +func (s *Grep) check_if(cond bool) int { + switch { + case s.opt_strict: // AND logic, require all values to match + if !cond { + return -1 // drop as-is, stop here + } else { + return 0 // no result yet, keep going + } + default: // OR logic, require any value to match + if cond { + return 1 // accept as-is, stop here + } else { + return 0 // no result yet, keep running + } + } +} + +func (s *Grep) check_type(m *msg.Msg) bool { + _, found := slices.BinarySearch(s.opt_type, m.Type) + return found +} + +func (s *Grep) check_tag(m *msg.Msg) bool { + if !pipe.HasTags(m) { + return false + } + + mtags := pipe.MsgTags(m) + for key, val := range s.opt_tag { + ok := mtags[key] == val + switch s.check_if(ok) { + case 1: + return true + case -1: + return false + } + } + + return s.opt_strict +} + +func (s *Grep) check_reach(m *msg.Msg) bool { + return m.Update.HasReach() +} + +func (s *Grep) check_unreach(m *msg.Msg) bool { + return m.Update.HasUnreach() +} + +func (s *Grep) check_af(m *msg.Msg) bool { + val := m.Update.AS() + if val == afi.AS_INVALID { + return false + } + + _, found := slices.BinarySearch(s.opt_af, val) + return found +} + +func (s *Grep) check_asn(m *msg.Msg) bool { + aspath := m.Update.AsPath() + if aspath == nil { + return false + } + + // check anywhere in AS_PATH, including AS_SETs + for _, asn := range s.opt_asn { + ok := aspath.HasAsn(asn, 0) + switch s.check_if(ok) { + case 1: + return true + case -1: + return false + } + } + + return s.opt_strict +} + +func (s *Grep) check_origin(m *msg.Msg) bool { + aspath := m.Update.AsPath() + if aspath == nil { + return false + } + + // check at AS_PATH origin + for _, asn := range s.opt_origin { + ok := aspath.HasOrigin(asn, 0) + switch s.check_if(ok) { + case 1: + return true + case -1: + return false + } + } + + return s.opt_strict +} + +func (s *Grep) check_nexthop(m *msg.Msg) bool { + nh := m.Update.NextHop() + if !nh.IsValid() { + return false + } + + for _, p := range s.opt_nexthop { + ok := p.Contains(nh) + switch s.check_if(ok) { + case 1: + return true + case -1: + return false + } + } + + return s.opt_strict +} + +func (s *Grep) check_prefix(m *msg.Msg) bool { + // collect all prefixes in message + var inmsg []nlri.NLRI + inmsg = m.Update.GetReach(inmsg) + inmsg = m.Update.GetUnreach(inmsg) + + for i := range inmsg { + // is the prefix covered by any --prefix value? + ok := inmsg[i].FindParent(s.opt_prefix) >= 0 + + switch s.check_if(ok) { + case 1: + return true + case -1: + return false + } + } + + return s.opt_strict +} + +func (s *Grep) check_prefix_len(m *msg.Msg) bool { + // collect all prefixes in message + var inmsg []nlri.NLRI + inmsg = m.Update.GetReach(inmsg) + inmsg = m.Update.GetUnreach(inmsg) + + for i := range inmsg { + pl := inmsg[i].Bits() + ok := pl >= 0 && pl <= 128 && s.opt_prefix_len[pl] + switch s.check_if(ok) { + case 1: + return true + case -1: + return false + } + } + + return s.opt_strict +} diff --git a/stages/limit.go b/stages/limit.go index cf9725c..e62bc36 100644 --- a/stages/limit.go +++ b/stages/limit.go @@ -12,13 +12,13 @@ package stages import ( "encoding/binary" "fmt" - "net/netip" "slices" "sync" - "github.com/bgpfix/bgpfix/af" + "github.com/bgpfix/bgpfix/afi" "github.com/bgpfix/bgpfix/attrs" "github.com/bgpfix/bgpfix/msg" + "github.com/bgpfix/bgpfix/nlri" "github.com/bgpfix/bgpipe/core" "github.com/puzpuzpuz/xsync/v3" ) @@ -26,9 +26,9 @@ import ( type Limit struct { *core.StageBase - afs map[af.AF]bool // address families to consider - ipv4 bool // consider IPv4 - ipv6 bool // consider IPv6 + afs map[afi.AS]bool // address families to consider + ipv4 bool // consider IPv4 + ipv6 bool // consider IPv6 minlen int // max prefix length maxlen int // max prefix length @@ -41,9 +41,9 @@ type Limit struct { limit_origin int64 // max prefix count for single origin limit_block int64 // max prefix count for IP block - session *xsync.MapOf[netip.Prefix, *limitPrefix] // session db - origin *xsync.MapOf[uint32, *limitCounter] // per-origin db - block *xsync.MapOf[uint64, *limitCounter] // per-block db + session *xsync.MapOf[nlri.NLRI, *limitPrefix] // session db + origin *xsync.MapOf[uint32, *limitCounter] // per-origin db + block *xsync.MapOf[uint64, *limitCounter] // per-block db } func NewLimit(parent *core.StageBase) core.Stage { @@ -77,8 +77,8 @@ func NewLimit(parent *core.StageBase) core.Stage { so.Bidir = true // will aggregate both directions - s.afs = make(map[af.AF]bool) - s.session = xsync.NewMapOf[netip.Prefix, *limitPrefix]() + s.afs = make(map[afi.AS]bool) + s.session = xsync.NewMapOf[nlri.NLRI, *limitPrefix]() s.origin = xsync.NewMapOf[uint32, *limitCounter]() s.block = xsync.NewMapOf[uint64, *limitCounter]() @@ -95,15 +95,15 @@ func (s *Limit) Attach() error { s.ipv4 = true // by default, IPv4 only } if s.ipv4 { - s.afs[af.New(af.AFI_IPV4, af.SAFI_UNICAST)] = true + s.afs[afi.AS_IPV4_UNICAST] = true if k.Bool("multicast") { - s.afs[af.New(af.AFI_IPV4, af.SAFI_MULTICAST)] = true + s.afs[afi.AS_IPV4_MULTICAST] = true } } if s.ipv6 { - s.afs[af.New(af.AFI_IPV6, af.SAFI_UNICAST)] = true + s.afs[afi.AS_IPV6_UNICAST] = true if k.Bool("multicast") { - s.afs[af.New(af.AFI_IPV6, af.SAFI_MULTICAST)] = true + s.afs[afi.AS_IPV6_MULTICAST] = true } } @@ -161,16 +161,16 @@ func (s *Limit) onMsg(m *msg.Msg) bool { return true } -func (s *Limit) isShort(p netip.Prefix) bool { +func (s *Limit) isShort(p nlri.NLRI) bool { return s.minlen > 0 && p.Bits() < s.minlen } -func (s *Limit) isLong(p netip.Prefix) bool { +func (s *Limit) isLong(p nlri.NLRI) bool { return s.maxlen > 0 && p.Bits() > s.maxlen } // translates IP prefix to IP block, assuming prefix length <=/64 -func (s *Limit) p2b(p netip.Prefix) uint64 { +func (s *Limit) p2b(p nlri.NLRI) uint64 { b := p.Addr().AsSlice() switch len(b) { case 4: @@ -187,10 +187,10 @@ func (s *Limit) p2b(p netip.Prefix) uint64 { } func (s *Limit) checkReach(u *msg.Update) (before, after int) { - origin := u.Attrs.AsOrigin() + origin := u.AsPath().Origin() // drops p from u if violates the rules - dropReach := func(p netip.Prefix) (drop bool) { + dropReach := func(p nlri.NLRI) (drop bool) { defer func() { if drop { u.Msg.Modified() @@ -287,7 +287,7 @@ func (s *Limit) checkReach(u *msg.Update) (before, after int) { } // prefixes in the MP part? - if mp := u.Attrs.MPPrefixes(attrs.ATTR_MP_REACH); mp != nil && s.afs[mp.AF] { + if mp := u.MP(attrs.ATTR_MP_REACH).Prefixes(); mp != nil && s.afs[mp.AS] { before += len(mp.Prefixes) mp.Prefixes = slices.DeleteFunc(mp.Prefixes, dropReach) after += len(mp.Prefixes) @@ -303,7 +303,7 @@ func (s *Limit) checkReach(u *msg.Update) (before, after int) { func (s *Limit) checkUnreach(u *msg.Update) (before, after int) { // drops p from u if violates the rules - dropUnreach := func(p netip.Prefix) (drop bool) { + dropUnreach := func(p nlri.NLRI) (drop bool) { // too long or short? if s.isShort(p) || s.isLong(p) { u.Msg.Modified() @@ -360,7 +360,7 @@ func (s *Limit) checkUnreach(u *msg.Update) (before, after int) { } // prefixes in the MP part? - if mp := u.Attrs.MPPrefixes(attrs.ATTR_MP_UNREACH); mp != nil && s.afs[mp.AF] { + if mp := u.MP(attrs.ATTR_MP_UNREACH).Prefixes(); mp != nil && s.afs[mp.AS] { before += len(mp.Prefixes) mp.Prefixes = slices.DeleteFunc(mp.Prefixes, dropUnreach) after += len(mp.Prefixes) diff --git a/stages/listen.go b/stages/listen.go index 8ebf54e..5555c7d 100644 --- a/stages/listen.go +++ b/stages/listen.go @@ -26,6 +26,7 @@ func NewListen(parent *core.StageBase) core.Stage { ) f.Duration("timeout", 0, "connect timeout (0 means none)") + f.Duration("closed", time.Second, "half-closed timeout (0 means none)") if runtime.GOOS == "linux" { f.String("md5", "", "TCP MD5 password") } @@ -89,5 +90,5 @@ func (s *Listen) Prepare() error { } func (s *Listen) Run() error { - return tcp_handle(s.StageBase, s.conn, s.in) + return tcp_handle(s.StageBase, s.conn, s.in, s.K.Duration("closed")) } diff --git a/stages/repo.go b/stages/repo.go index c803ff5..5743dc8 100644 --- a/stages/repo.go +++ b/stages/repo.go @@ -5,6 +5,7 @@ import "github.com/bgpfix/bgpipe/core" var Repo = map[string]core.NewStage{ "connect": NewConnect, "exec": NewExec, + "grep": NewGrep, "limit": NewLimit, "listen": NewListen, "pipe": NewPipe, diff --git a/stages/util.go b/stages/util.go index bfe3b46..b361869 100644 --- a/stages/util.go +++ b/stages/util.go @@ -5,12 +5,13 @@ import ( "fmt" "io" "net" + "time" "github.com/bgpfix/bgpfix/pipe" "github.com/bgpfix/bgpipe/core" ) -func tcp_handle(s *core.StageBase, conn net.Conn, in *pipe.Input) error { +func tcp_handle(s *core.StageBase, conn net.Conn, in *pipe.Input, timeout time.Duration) error { s.Info().Msgf("connected %s -> %s", conn.LocalAddr(), conn.RemoteAddr()) defer conn.Close() @@ -39,6 +40,11 @@ func tcp_handle(s *core.StageBase, conn net.Conn, in *pipe.Input) error { s.Trace().Err(err).Msg("connection reader returned") tcp.CloseRead() rch <- retval{n, err} + + if timeout > 0 { + time.Sleep(timeout) + s.Cancel(io.EOF) + } }() // write to conn @@ -48,32 +54,39 @@ func tcp_handle(s *core.StageBase, conn net.Conn, in *pipe.Input) error { s.Trace().Err(err).Msg("connection writer returned") tcp.CloseWrite() wch <- retval{n, err} + + if timeout > 0 { + time.Sleep(timeout) + s.Cancel(io.EOF) + } }() // wait for error on any side, or both sides EOF var read, wrote int64 + var err error running := 2 - for running > 0 { + for err == nil && running > 0 { select { case <-s.Ctx.Done(): - return context.Cause(s.Ctx) + err = context.Cause(s.Ctx) + running = 0 case r := <-rch: - read = r.n + read, err = r.n, r.err running-- - if r.err != nil && r.err != io.EOF { - return r.err + if err == io.EOF { + err = nil } case w := <-wch: - wrote = w.n + wrote, err = w.n, w.err running-- - if w.err != nil && w.err != io.EOF { - return w.err + if err == io.EOF { + err = nil } } } - s.Info().Int64("read", read).Int64("wrote", wrote).Msg("connection closed") - return nil + s.Info().Err(err).Int64("read", read).Int64("wrote", wrote).Msg("connection closed") + return err } func close_safe[T any](ch chan T) (ok bool) {