Documentation
¶
Overview ¶
Package pipe provides BGP message processing with callbacks.
Index ¶
- Variables
- func ActionAccept(m *msg.Msg) *msg.Msg
- func ActionBorrow(m *msg.Msg) *msg.Msg
- func ActionClear(m *msg.Msg) *msg.Msg
- func ActionDrop(m *msg.Msg) *msg.Msg
- func ActionIsAccept(m *msg.Msg) bool
- func ActionIsBorrow(m *msg.Msg) bool
- func ActionIsDrop(m *msg.Msg) bool
- func HasContext(m *msg.Msg) bool
- func HasTags(m *msg.Msg) bool
- func MsgTags(m *msg.Msg) map[string]string
- type Action
- func (ac *Action) Accept()
- func (ac *Action) Add(a Action)
- func (ac *Action) Borrow()
- func (ac *Action) Clear()
- func (ac *Action) Drop()
- func (ac *Action) FromJSON(src []byte) error
- func (ac Action) Is(a Action) bool
- func (ac Action) IsAccept() bool
- func (ac Action) IsBorrow() bool
- func (ac Action) IsDrop() bool
- func (ac Action) Not(a Action) bool
- func (ac Action) ToJSON(dst []byte) []byte
- type Callback
- type CallbackFunc
- type Context
- func (mx *Context) DropTags()
- func (mx *Context) FromJSON(src []byte) error
- func (mx *Context) GetTag(tag string) string
- func (mx *Context) HasTag(tag string) bool
- func (mx *Context) HasTags() bool
- func (mx *Context) Reset()
- func (mx *Context) SetTag(tag string, val string)
- func (mx *Context) Tags() map[string]string
- func (mx *Context) ToJSON(dst []byte) []byte
- type Event
- type FilterMode
- type Handler
- type HandlerFunc
- type Input
- type Line
- type Options
- func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback
- func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler
- func (o *Options) AddInput(dst dir.Dir, tpl ...*Input) *Input
- func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
- func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnMsg(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback
- func (o *Options) OnMsgPost(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback
- func (o *Options) OnMsgPre(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback
- func (o *Options) OnParseError(hdf HandlerFunc) *Handler
- func (o *Options) OnStart(hdf HandlerFunc) *Handler
- func (o *Options) OnStop(hdf HandlerFunc) *Handler
- type Pipe
- func (p *Pipe) Event(et string, args ...any) *Event
- func (p *Pipe) GetMsg() (m *msg.Msg)
- func (p *Pipe) LineFor(dst dir.Dir) *Line
- func (p *Pipe) ParseMsg(m *msg.Msg) error
- func (p *Pipe) PutMsg(m *msg.Msg)
- func (p *Pipe) Start()
- func (p *Pipe) Started() bool
- func (p *Pipe) Stop()
- func (p *Pipe) Stopped() bool
- func (p *Pipe) Wait()
Constants ¶
This section is empty.
Variables ¶
var ( ErrInClosed = errors.New("input channel closed") ErrOutClosed = errors.New("output channel closed") ErrStopped = errors.New("pipe stopped") )
var ( // pipe has finished starting EVENT_START = "bgpfix/pipe.START" // pipe is about to stop EVENT_STOP = "bgpfix/pipe.STOP" // could not parse the message before its callback EVENT_PARSE = "bgpfix/pipe.PARSE" // valid OPEN with a bigger message timestamp (seconds) made it to output EVENT_OPEN = "bgpfix/pipe.OPEN" // KEEPALIVE with a bigger message timestamp (seconds) made it to output EVENT_ALIVE = "bgpfix/pipe.ALIVE" // UPDATE with a bigger message timestamp (seconds) made it to output EVENT_UPDATE = "bgpfix/pipe.UPDATE" // session established (OPEN+KEEPALIVE made it to both sides) EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED" // End-of-RIB for new AF made it to ouput in given direction EVENT_EOR_AF = "bgpfix/pipe.EOR_AF" // End-of-RIB for all AFs in Caps made it to ouput in given direction EVENT_EOR = "bgpfix/pipe.EOR" )
a collection of events generated internally by pipe
var DefaultOptions = Options{ Logger: &log.Logger, Caps: true, }
Default BGP pipe options
Functions ¶
func ActionAccept ¶ added in v0.1.7
ActionAccept adds ACTION_ACCEPT to m and returns it.
func ActionBorrow ¶ added in v0.1.7
ActionBorrow adds ACTION_BORROW to m and returns it.
func ActionClear ¶ added in v0.1.7
ActionClear clears all action flags but ACTION_BORROW in m and returns it.
func ActionDrop ¶ added in v0.1.7
ActionDrop adds ACTION_DROP to m and returns it.
func ActionIsAccept ¶ added in v0.1.7
ActionIsAccept returns true if ACTION_ACCEPT is set in m.
func ActionIsBorrow ¶ added in v0.1.7
ActionIsBorrow returns true if ACTION_BORROW is set in m.
func ActionIsDrop ¶ added in v0.1.7
ActionIsDrop returns true if ACTION_DROP is set in m.
func HasContext ¶ added in v0.2.0
HasContext returns true iff m has a Context
Types ¶
type Action ¶
type Action byte
Action requests special handling of a message or event in a Pipe
const ( // Keep the message for later use, do not re-use its memory. // // You must use this if you wish to re-inject the message, // or keep reference to some value inside the msg. // // Once set, you must not remove this action from a message // unless you know you are the sole owner of this message. ACTION_BORROW Action = 1 << iota // Drop the message/event immediately (skip other calls, drop from output). // // If you want to re-inject the message later, set ACTION_BORROW too. // When re-injecting, clear the Action first, and remember the message will re-start // processing from the next callback, unless you clear its Context. ACTION_DROP // Accept the message/event immediately (skip other calls, proceed to output) ACTION_ACCEPT // Mask is logical OR of all defined actions ACTION_MASK Action = 1<<iota - 1 )
const ACTION_OK Action = 0
The default, zero action: keep processing as-is.
type Callback ¶
type Callback struct { Id int // optional callback id number (zero means none) Name string // optional name Order int // the lower the order, the sooner callback is run Enabled *atomic.Bool // if non-nil, disables the callback unless true Dropped bool // if true, permanently drops (unregisters) the callback Pre bool // run before non-pre callbacks? Raw bool // if true, do not parse the message (which may already be parsed, but for other reasons) Post bool // run after non-post callbacks? Dir dir.Dir // if non-zero, limits the direction Types []msg.Type // if non-empty, limits message types Func CallbackFunc // the function to call }
Callback represents a function to call for matching BGP messages
func (*Callback) Disable ¶ added in v0.2.0
Disable sets cb.Enabled to false and returns true. If cb.Enabled is nil, returns false.
func (*Callback) Drop ¶ added in v0.2.0
func (cb *Callback) Drop()
Drop drops the callback, permanently unregistering it from running
type CallbackFunc ¶
CallbackFunc processes message m. Return false to drop the message.
type Context ¶
type Context struct { Pipe *Pipe // pipe processing the message Input *Input // input processing the message (message source) Callback *Callback // currently running callback Action Action // requested message actions // contains filtered or unexported fields }
Context tracks message processing in a Pipe, stored in Msg.Value.
func GetContext ¶ added in v0.4.0
GetContext returns message Context inside m, iff it exists (or nil).
func MsgContext ¶ added in v0.1.6
MsgContext returns message Context inside m, creating one if needed.
func (*Context) DropTags ¶ added in v0.2.0
func (mx *Context) DropTags()
DropTags drops all message tags
func (*Context) HasTag ¶ added in v0.1.6
HasTag returns true iff the context has a particular Tag set
type Event ¶
type Event struct { Pipe *Pipe `json:"-"` // parent pipe Seq uint64 `json:"seq,omitempty"` // event sequence number Time time.Time `json:"time,omitempty"` // event timestamp Type string `json:"type"` // type, usually "lib/pkg.NAME" Dir dir.Dir `json:"dir"` // optional event direction Msg string `json:"msg"` // optional BGP message in JSON Error error `json:"err"` // optional error related to the event Value any `json:"value"` // optional value, type-specific Handler *Handler // currently running handler (may be nil) Action Action // optional event action (zero means none) // contains filtered or unexported fields }
Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.
type FilterMode ¶
type FilterMode = int
const ( // callback filter disabled FILTER_NONE FilterMode = iota // skip if callback id == value FILTER_EQ // skip if callback id > value FILTER_GT // skip if callback id < value FILTER_LT // skip if callback id >= value FILTER_GE // skip if callback id <= value FILTER_LE // skip if callback id != value FILTER_NE // skip all callbacks FILTER_ALL )
type Handler ¶
type Handler struct { Id int // optional handler id number (zero means none) Name string // optional name Order int // the lower the order, the sooner handler is run Enabled *atomic.Bool // if non-nil, disables the handler unless true Dropped bool // if true, permanently drops (unregisters) the handler Pre bool // run before non-pre handlers? Post bool // run after non-post handlers? Dir dir.Dir // if non-zero, limits the direction Types []string // if non-empty, limits event types Func HandlerFunc // the function to call }
Handler represents a function to call for matching pipe events
func (*Handler) Disable ¶ added in v0.2.0
Disable sets h.Enabled to false and returns true. If h.Enabled is nil, returns false.
func (*Handler) Drop ¶ added in v0.2.0
func (h *Handler) Drop()
Drop drops the handler, permanently unregistering it from running
type HandlerFunc ¶
HandlerFunc handles event ev. Return false to unregister the handler (all types).
type Input ¶
type Input struct { Pipe *Pipe // attached to this Pipe (nil before pipe start) Line *Line // attached to this Line (nil before pipe start) Id int // optional id Name string // optional name Dir dir.Dir // input direction // In is the input for incoming messages. In chan *msg.Msg // Reverse, when true, runs callbacks in reverse order. Reverse bool // CallbackFilter controls which callbacks to skip (disabled by default) CallbackFilter FilterMode // FilterValue specifies the value for CallbackFilter FilterValue any // contains filtered or unexported fields }
Input processes incoming BGP messages through Callbacks and (optionally) writes the result to attached Line.
func (*Input) Close ¶
func (in *Input) Close()
Close safely closes the .In channel, which should eventually stop the Input
func (*Input) Wait ¶
Wait blocks until the input is done processing the messages (returns true), or aborts if the Pipe context is cancelled (returns false).
func (*Input) Write ¶
Write implements io.Writer and reads all BGP messages from src into in.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if pi.In is full.
In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.
Must not be used concurrently.
func (*Input) WriteFunc ¶ added in v0.2.0
func (in *Input) WriteFunc(src []byte, cb CallbackFunc) (int, error)
WriteFunc is the same as Input.Write(), but takes an optional callback function to be called just before the message is accepted for processing. If the callback returns false, the message is silently dropped instead.
type Line ¶
type Line struct { Pipe *Pipe // parent pipe Dir dir.Dir // line direction // the default Input, which processes messages through all callbacks. Input // Out is the Line output, where you can read processed messages from. Out chan *msg.Msg // UNIX timestamp (seconds) of the last valid OPEN message LastOpen atomic.Int64 // UNIX timestamp (seconds) of the last KEEPALIVE message LastAlive atomic.Int64 // UNIX timestamp (seconds) of the last UPDATE message LastUpdate atomic.Int64 // the OPEN message that updated LastOpen Open atomic.Pointer[msg.Open] // UNIX timestamp (seconds) of the first EoR for given AF EoR *xsync.MapOf[afi.AS, int64] // contains filtered or unexported fields }
Line implements one direction of a Pipe: possibly several input processors run messages through callbacks and write the results to a common output.
func (*Line) Close ¶
func (l *Line) Close()
Close safely closes all inputs, which should eventually stop the line
func (*Line) CloseOutput ¶
func (l *Line) CloseOutput()
CloseOutput safely closes the output channel.
func (*Line) Wait ¶
Wait blocks until all processing is done (returns true), or aborts if the Pipe context is cancelled (returns false).
func (*Line) WriteOutput ¶ added in v0.2.0
WriteOutput safely sends m to l.Out, avoiding a panic if closed.
type Options ¶
type Options struct { Logger *zerolog.Logger // if nil logging is disabled MsgPool *sync.Pool // optional pool for msg.Msg Caps bool // overwrite pipe.Caps with the capabilities negotiated in OPEN messages? Callbacks []*Callback // message callbacks Handlers []*Handler // event handlers Inputs []*Input // input processors }
BGP pipe options
func (*Options) AddCallback ¶
func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback
AddCallbacks adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.
func (*Options) AddHandler ¶
func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler
AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.
func (*Options) AddInput ¶
AddInput adds input processor for given pipe direction, with optional details in tpl.
func (*Options) OnEstablished ¶
func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
OnEstablished request hdf to be called when the BGP session is established.
func (*Options) OnEvent ¶
func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
OnEvent request hdf to be called for given event types. If no types provided, it requests to call hdf on *every* event.
func (*Options) OnEventPost ¶
func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
OnEventPost is like OnEvent but requests to run hdf after other handlers
func (*Options) OnEventPre ¶
func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
OnEventPre is like OnEvent but requests to run hdf before other handlers
func (*Options) OnMsg ¶
OnMsg adds a callback for all messages of given types (or all types if not specified).
func (*Options) OnParseError ¶
func (o *Options) OnParseError(hdf HandlerFunc) *Handler
OnParseError request hdf to be called on BGP message parse error.
func (*Options) OnStart ¶
func (o *Options) OnStart(hdf HandlerFunc) *Handler
OnStart request hdf to be called after the pipe starts.
func (*Options) OnStop ¶
func (o *Options) OnStop(hdf HandlerFunc) *Handler
OnStop request hdf to be called when the pipe stops.
type Pipe ¶
type Pipe struct { *zerolog.Logger Options // pipe options; modify before Start() Caps caps.Caps // BGP capability context; always thread-safe L *Line // line processing messages from R to L R *Line // line processing messages from L to R // generic Key-Value store, always thread-safe KV *xsync.MapOf[string, any] // contains filtered or unexported fields }
Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.
Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.
func NewPipe ¶
NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().
func (*Pipe) Event ¶
Event announces a new event type et to the pipe, with optional arguments. The first dir.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (add ACTION_BORROW). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Val.
func (*Pipe) LineFor ¶ added in v0.1.6
LineFor returns the line processing messages destined for dst. Returns p.R if dst is bidir (DST_LR).
func (*Pipe) ParseMsg ¶ added in v0.4.0
ParseMsg parses given message m (if needed), in the context of this Pipe. In case of error, it emits EVENT_PARSE before returning.
func (*Pipe) Started ¶
Started returns true iff Start() has already been called = pipe is (being) started.
func (*Pipe) Stop ¶
func (p *Pipe) Stop()
Stop stops all inputs and blocks till they finish processing. Pipe must not be used again past this point. Closes all inputs, which should eventually close all outputs, possibly after this function returns.