Skip to content

Commit

Permalink
switch to bgpfix@main + proper exec backend
Browse files Browse the repository at this point in the history
  • Loading branch information
pforemski committed Oct 25, 2023
1 parent 93a20c1 commit 198d732
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 211 deletions.
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ $ cat input.json | bgpipe --stdin speaker 1.2.3.4 | tee output.json
$ bgpipe updates.20230301.0000.bz2 > output.json

# proxy a connection, print the conversation to stdout
# 1st stage: bind to TCP *:179
# 2nd stage: wait for connection, and proxy to 1.2.3.4 adding TCP-MD5
# 1st stage: listen on TCP *:179 for new connection
# 2nd stage: wait for new connection and proxy it to 1.2.3.4, adding TCP-MD5
$ bgpipe \
-- listen :179 \
-- connect --wait listen/connected --md5 solarwinds123 1.2.3.4
-- connect --wait listen --md5 solarwinds123 1.2.3.4

# a BGP speaker that streams MRT file after the session is established
# 1st stage: active BGP speaker (AS65055), starting when client connects
# 2nd stage: MRT file reader, starting after session is established
# 3rd stage: bind to TCP *:179
# a BGP speaker that streams an MRT file
# 1st stage: active BGP speaker for AS65055
# 2nd stage: MRT file reader, starting when the BGP session is established
# 3rd stage: listen on TCP *:179 for new connection
$ bgpipe \
-- speaker --wait listen/connected --active --asn 65055 \
-- mrt --wait established updates.20230301.0000.bz2 \
-- speaker --active --asn 65055 \
-- mrt --wait ESTABLISHED updates.20230301.0000.bz2 \
-- listen :179
```

Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/bgpfix/bgpipe
go 1.21.0

require (
github.com/bgpfix/bgpfix v0.0.0-20231003232953-99970152bc36
github.com/bgpfix/bgpfix v0.0.0-20231025125100-6d78c672faa6
github.com/spf13/pflag v1.0.5
)

Expand All @@ -19,10 +19,10 @@ require (
github.com/knadh/koanf/providers/posflag v0.1.0
github.com/knadh/koanf/v2 v2.0.1
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/puzpuzpuz/xsync/v2 v2.4.1 // indirect
github.com/rs/zerolog v1.30.0
golang.org/x/sys v0.12.0
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect
github.com/rs/zerolog v1.31.0
golang.org/x/sys v0.13.0
)

// replace github.com/bgpfix/bgpfix => ../bgpfix
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/bgpfix/bgpfix v0.0.0-20231003232953-99970152bc36 h1:Ubl0c9kAuepqI7RQ+hbNIn38l2dXzjgayxujcDh5QKQ=
github.com/bgpfix/bgpfix v0.0.0-20231003232953-99970152bc36/go.mod h1:EkeLwGiW9F1LSOO4ziPXEoJRiWeEw/C7p7iJhGQbDYw=
github.com/bgpfix/bgpfix v0.0.0-20231025125100-6d78c672faa6 h1:73ShjqaNqxzyLvvN+9pH+DaLoXEJylIxMe+PCHJE5AA=
github.com/bgpfix/bgpfix v0.0.0-20231025125100-6d78c672faa6/go.mod h1:EkeLwGiW9F1LSOO4ziPXEoJRiWeEw/C7p7iJhGQbDYw=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand All @@ -19,6 +21,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand All @@ -30,9 +34,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/puzpuzpuz/xsync/v2 v2.4.1 h1:aGdE1C/HaR/QC6YAFdtZXi60Df8/qBIrs8PKrzkItcM=
github.com/puzpuzpuz/xsync/v2 v2.4.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU=
github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand All @@ -43,5 +51,7 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
125 changes: 88 additions & 37 deletions pkg/bgpipe/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"strconv"

"github.com/bgpfix/bgpfix/caps"
"github.com/bgpfix/bgpfix/msg"
"github.com/bgpfix/bgpfix/pipe"
"github.com/rs/zerolog"
)

// Attach attaches all stages to pipe
Expand Down Expand Up @@ -120,10 +122,10 @@ func (s *StageBase) attach() error {
s.IsLeft = k.Bool("left")
s.IsRight = k.Bool("right")
if s.IsLeft && s.IsRight {
if !s.Options.AllowLR {
if !s.Options.Bidir {
return ErrLR
}
} else if s.IsLeft == s.IsRight { // both false
} else if s.IsLeft == s.IsRight { // both false = apply a default
s.IsRight = true // the default

// exceptions
Expand All @@ -137,36 +139,23 @@ func (s *StageBase) attach() error {
s.IsLeft = !s.IsRight
}

// where to inject new messages?
switch v := k.String("in"); v {
case "next", "":
s.SkipId = -s.Index
case "here":
s.SkipId = s.Index
case "first":
s.SkipId = 0
default:
if id, err := strconv.Atoi(v); err == nil {
// an integer
s.SkipId = id
break
} else if len(v) > 0 && v[0] == '@' {
// a stage name reference?
for _, s2 := range s.B.Stages {
if s2 != nil && s2.Name == v {
s.SkipId = s2.Index
break
}
}
}
if s.SkipId == 0 {
return fmt.Errorf("%w: %s", ErrInject, v)
}
// set s.Dir
if s.IsLeft && s.IsRight {
s.Dir = msg.DIR_LR
} else if s.IsLeft {
s.Dir = msg.DIR_L
s.Upstream = p.L
s.Downstream = p.R
} else {
s.Dir = msg.DIR_R
s.Upstream = p.R
s.Downstream = p.L
}

// call child attach
cbs := len(po.Callbacks)
hds := len(po.Handlers)
ins := len(po.Inputs)
if err := s.Stage.Attach(); err != nil {
return err
}
Expand All @@ -180,24 +169,73 @@ func (s *StageBase) attach() error {
}
s.Logger = s.B.With().Str("stage", name).Logger()

// needs raw stream access?
if s.Options.IsRawReader || s.Options.IsRawWriter {
// consumes messages?
if s.Options.IsConsumer {
if !(s.IsFirst || s.IsLast) {
return ErrFirstOrLast
}
}

// make stage callbacks and handlers depend on s.enabled
s.Callbacks = po.Callbacks[cbs:]
for _, cb := range s.Callbacks {
// fix callbacks
s.callbacks = po.Callbacks[cbs:]
for _, cb := range s.callbacks {
cb.Id = s.Index
cb.Enabled = &s.running
}
s.Handlers = po.Handlers[hds:]
for _, h := range s.Handlers {

// fix handlers
s.handlers = po.Handlers[hds:]
for _, h := range s.handlers {
h.Id = s.Index
h.Enabled = &s.running
}

// where to inject new messages?
var frev, ffwd pipe.FilterMode // input filter mode
var fid int // input filter callback id
switch v := k.String("in"); v {
case "next", "":
frev, ffwd = pipe.FILTER_GE, pipe.FILTER_LE
fid = s.Index
case "here":
frev, ffwd = pipe.FILTER_GT, pipe.FILTER_LT
fid = s.Index
case "first":
frev, ffwd = pipe.FILTER_NONE, pipe.FILTER_NONE
case "last":
frev, ffwd = pipe.FILTER_ALL, pipe.FILTER_ALL
default:
frev, ffwd = pipe.FILTER_GE, pipe.FILTER_LE
if id, err := strconv.Atoi(v); err == nil {
fid = id
} else if len(v) > 0 && v[0] == '@' {
// a stage name reference?
for _, s2 := range s.B.Stages {
if s2 != nil && s2.Name == v {
fid = s2.Index
break
}
}
}
if fid <= 0 {
return fmt.Errorf("%w: %s", ErrInject, v)
}
}

// fix inputs
s.inputs = po.Inputs[ins:]
for _, li := range s.inputs {
li.Id = s.Index
li.FilterValue = fid

if li.Dir == msg.DIR_L {
li.Reverse = true // CLI gives L stages in reverse
li.CallbackFilter = frev
} else {
li.Reverse = false
li.CallbackFilter = ffwd
}
}
}

// update related waitgroups
Expand All @@ -208,7 +246,7 @@ func (s *StageBase) attach() error {
po.OnEventPre(s.runStart, evs...)

// re-target pipe.EVENT_START handlers to the --wait events
for _, h := range s.Handlers {
for _, h := range s.handlers {
for i, t := range h.Types {
if t == pipe.EVENT_START {
h.Types[i] = evs[0]
Expand All @@ -225,8 +263,21 @@ func (s *StageBase) attach() error {
po.OnEventPost(s.runStop, evs...)
}

s.Trace().Msgf("attached [%d] first/last=%v/%v L/R=%v,%v startat=%d",
s.Index, s.IsFirst, s.IsLast, s.IsLeft, s.IsRight, s.SkipId)
// debug?
if s.GetLevel() <= zerolog.TraceLevel {
s.Trace().Msgf("[%d] attached %s first/last=%v/%v L/R=%v,%v",
s.Index, s.Name, s.IsFirst, s.IsLast, s.IsLeft, s.IsRight)
for _, cb := range s.callbacks {
s.Trace().Msgf(" callback %#v", cb)
}
for _, hd := range s.handlers {
s.Trace().Msgf(" handler %#v", hd)
}
for _, in := range s.inputs {
s.Trace().Msgf(" input %s dir=%s reverse=%v filt=%d filt_id=%d",
in.Name, in.Dir, in.Reverse, in.CallbackFilter, in.FilterValue)
}
}

return nil
}
36 changes: 23 additions & 13 deletions pkg/bgpipe/bgpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func NewBgpipe(repo ...map[string]NewStage) *Bgpipe {
b.Pipe = pipe.NewPipe(b.Ctx)
po := &b.Pipe.Options
po.Logger = &b.Logger
po.Lreverse = true // it's just the case for bgpipe

// global config
b.K = koanf.New(".")
Expand Down Expand Up @@ -92,6 +91,8 @@ func (b *Bgpipe) Run() error {
b.Pipe.Start() // will call b.Start
b.Pipe.Wait() // until error or all processing is done

// TODO: wait until all pipe output is read

// any errors on the global context?
err := context.Cause(b.Ctx)
switch {
Expand All @@ -111,13 +112,13 @@ func (b *Bgpipe) Start(ev *pipe.Event) bool {
// wait for writers
go func() {
b.wg_lwrite.Wait()
b.Debug().Msg("closing L input")
b.Pipe.L.CloseInput()
b.Debug().Msg("closing L inputs")
b.Pipe.L.Close()
}()
go func() {
b.wg_rwrite.Wait()
b.Debug().Msg("closing R input")
b.Pipe.R.CloseInput()
b.Debug().Msg("closing R inputs")
b.Pipe.R.Close()
}()

// wait for readers
Expand All @@ -137,18 +138,27 @@ func (b *Bgpipe) Start(ev *pipe.Event) bool {

// LogEvent logs given event
func (b *Bgpipe) LogEvent(ev *pipe.Event) bool {
// will b.Info() if ev.Error is nil
l := b.Err(ev.Error)

if ev.Msg != nil {
b.logbuf = ev.Msg.ToJSON(b.logbuf[:0])
} else {
b.logbuf = b.logbuf[:0]
l = l.Bytes("msg", b.logbuf)
}

if ev.Dir != 0 {
l = l.Stringer("dir", ev.Dir)
}

if ev.Seq != 0 {
l = l.Uint64("seq", ev.Seq)
}

if ev.Value != nil {
l = l.Interface("val", ev.Value)
}

b.
Err(ev.Error). // will b.Info() if nil
Uint64("seq", ev.Seq).
Bytes("msg", b.logbuf).
Interface("val", ev.Value).
Msgf("event %s", ev.Type)
l.Msgf("event %s", ev.Type)
return true
}

Expand Down
Loading

0 comments on commit 198d732

Please sign in to comment.