pipe

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2024 License: MIT Imports: 19 Imported by: 3

Documentation

Overview

Package pipe provides BGP message processing with callbacks.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInClosed  = errors.New("input channel closed")
	ErrOutClosed = errors.New("output channel closed")
	ErrStopped   = errors.New("pipe stopped")
)
View Source
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"
)

a collection of events generated internally by pipe

View Source
var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
}

Default BGP pipe options

Functions

func ActionAccept added in v0.1.7

func ActionAccept(m *msg.Msg) *msg.Msg

ActionAccept adds ACTION_ACCEPT to m and returns it.

func ActionBorrow added in v0.1.7

func ActionBorrow(m *msg.Msg) *msg.Msg

ActionBorrow adds ACTION_BORROW to m and returns it.

func ActionClear added in v0.1.7

func ActionClear(m *msg.Msg) *msg.Msg

ActionClear clears all action flags but ACTION_BORROW in m and returns it.

func ActionDrop added in v0.1.7

func ActionDrop(m *msg.Msg) *msg.Msg

ActionDrop adds ACTION_DROP to m and returns it.

func ActionIsAccept added in v0.1.7

func ActionIsAccept(m *msg.Msg) bool

ActionIsAccept returns true if ACTION_ACCEPT is set in m.

func ActionIsBorrow added in v0.1.7

func ActionIsBorrow(m *msg.Msg) bool

ActionIsBorrow returns true if ACTION_BORROW is set in m.

func ActionIsDrop added in v0.1.7

func ActionIsDrop(m *msg.Msg) bool

ActionIsDrop returns true if ACTION_DROP is set in m.

func HasContext added in v0.2.0

func HasContext(m *msg.Msg) bool

HasContext returns true iff m has a Context

func HasTags added in v0.2.0

func HasTags(m *msg.Msg) bool

HasTags returns true iff m has a Context and non-empty Tags

func MsgTags added in v0.2.0

func MsgTags(m *msg.Msg) map[string]string

MsgTags returns message Tags inside m, creating them first if needed

Types

type Action

type Action byte

Action requests special handling of a message or event in a Pipe

const (
	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message/event immediately (skip other calls, drop from output).
	//
	// If you want to re-inject the message later, set ACTION_BORROW too.
	// When re-injecting, clear the Action first, and remember the message will re-start
	// processing from the next callback, unless you clear its Context.
	ACTION_DROP

	// Accept the message/event immediately (skip other calls, proceed to output)
	ACTION_ACCEPT

	// Mask is logical OR of all defined actions
	ACTION_MASK Action = 1<<iota - 1
)
const ACTION_OK Action = 0

The default, zero action: keep processing as-is.

func (*Action) Accept added in v0.2.0

func (ac *Action) Accept()

Accept adds ACTION_ACCEPT

func (*Action) Add

func (ac *Action) Add(a Action)

Add adds a to action ac

func (*Action) Borrow added in v0.2.0

func (ac *Action) Borrow()

Borrow adds ACTION_BORROW

func (*Action) Clear

func (ac *Action) Clear()

Clear clears all bits except for ACTION_BORROW

func (*Action) Drop added in v0.2.0

func (ac *Action) Drop()

Drop adds ACTION_DROP

func (*Action) FromJSON added in v0.2.0

func (ac *Action) FromJSON(src []byte) error

FromJSON parses JSON representation in src

func (Action) Is

func (ac Action) Is(a Action) bool

Is returns true iff a is set in ac

func (Action) IsAccept added in v0.2.0

func (ac Action) IsAccept() bool

IsAccept returns true iff ACTION_ACCEPT is set in ac

func (Action) IsBorrow added in v0.2.0

func (ac Action) IsBorrow() bool

IsBorrow returns true iff ACTION_BORROW is set in ac

func (Action) IsDrop added in v0.2.0

func (ac Action) IsDrop() bool

IsDrop returns true iff ACTION_DROP is set in ac

func (Action) Not

func (ac Action) Not(a Action) bool

IsNot returns true iff a is NOT set in ac

func (Action) ToJSON added in v0.2.0

func (ac Action) ToJSON(dst []byte) []byte

ToJSON appends JSON representation to dst

type Callback

type Callback struct {
	Id      int          // optional callback id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner callback is run
	Enabled *atomic.Bool // if non-nil, disables the callback unless true
	Dropped bool         // if true, permanently drops (unregisters) the callback

	Pre  bool // run before non-pre callbacks?
	Raw  bool // if true, do not parse the message (which may already be parsed, but for other reasons)
	Post bool // run after non-post callbacks?

	Dir   msg.Dir      // if non-zero, limits the direction
	Types []msg.Type   // if non-empty, limits message types
	Func  CallbackFunc // the function to call
}

Callback represents a function to call for matching BGP messages

func (*Callback) Disable added in v0.2.0

func (cb *Callback) Disable() bool

Disable sets cb.Enabled to false and returns true. If cb.Enabled is nil, returns false.

func (*Callback) Drop added in v0.2.0

func (cb *Callback) Drop()

Drop drops the callback, permanently unregistering it from running

func (*Callback) Enable added in v0.2.0

func (cb *Callback) Enable() bool

Enable sets cb.Enabled to true and returns true. If cb.Enabled is nil, returns false.

func (*Callback) String added in v0.2.0

func (cb *Callback) String() string

String returns callback name and id as string

type CallbackFunc

type CallbackFunc func(m *msg.Msg) (keep_message bool)

CallbackFunc processes message m. Return false to drop the message.

type Context

type Context struct {
	Pipe     *Pipe     // pipe processing the message
	Input    *Input    // input processing the message (message source)
	Callback *Callback // currently running callback

	Action Action // requested message actions
	// contains filtered or unexported fields
}

Context tracks message processing in a Pipe, stored in Msg.Value.

func MsgContext added in v0.1.6

func MsgContext(m *msg.Msg) *Context

MsgContext returns message Context inside m, creating one if needed.

func (*Context) DropTags added in v0.2.0

func (mx *Context) DropTags()

DropTags drops all message tags

func (*Context) FromJSON added in v0.1.6

func (mx *Context) FromJSON(src []byte) error

FromJSON unmarshals Context from JSON

func (*Context) GetTag added in v0.1.6

func (mx *Context) GetTag(tag string) string

GetTag returns given Tag value, or "" if not set

func (*Context) HasTag added in v0.1.6

func (mx *Context) HasTag(tag string) bool

HasTag returns true iff the context has a particular Tag set

func (*Context) HasTags added in v0.2.0

func (mx *Context) HasTags() bool

HasTags returns true iff the context has any Tags set

func (*Context) Reset added in v0.1.6

func (mx *Context) Reset()

Reset resets pc to empty state

func (*Context) SetTag added in v0.1.6

func (mx *Context) SetTag(tag string, val string)

SetTag set given Tag to given value.

func (*Context) Tags added in v0.1.6

func (mx *Context) Tags() map[string]string

Tags returns message Tags inside mx, creating them first if needed

func (*Context) ToJSON added in v0.1.6

func (mx *Context) ToJSON(dst []byte) []byte

ToJSON marshals Context to JSON

type Event

type Event struct {
	Pipe *Pipe     `json:"-"`              // parent pipe
	Seq  uint64    `json:"seq,omitempty"`  // event sequence number
	Time time.Time `json:"time,omitempty"` // event timestamp

	Type  string   `json:"type"`  // type, usually "lib/pkg.NAME"
	Dir   msg.Dir  `json:"dir"`   // optional event direction
	Msg   *msg.Msg `json:"-"`     // optional message that caused the event
	Error error    `json:"err"`   // optional error related to the event
	Value any      `json:"value"` // optional value, type-specific

	Handler *Handler // currently running handler (may be nil)
	Action  Action   // optional event action (zero means none)
	// contains filtered or unexported fields
}

Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.

func (*Event) String

func (ev *Event) String() string

String returns event type and seq number as string

func (*Event) Wait

func (ev *Event) Wait()

Wait blocks until the event is handled

type FilterMode

type FilterMode = int
const (
	// callback filter disabled
	FILTER_NONE FilterMode = iota

	// skip if callback id == value
	FILTER_EQ

	// skip if callback id > value
	FILTER_GT

	// skip if callback id < value
	FILTER_LT

	// skip if callback id >= value
	FILTER_GE

	// skip if callback id <= value
	FILTER_LE

	// skip if callback id != value
	FILTER_NE

	// skip all callbacks
	FILTER_ALL
)

type Handler

type Handler struct {
	Id      int          // optional handler id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner handler is run
	Enabled *atomic.Bool // if non-nil, disables the handler unless true
	Dropped bool         // if true, permanently drops (unregisters) the handler

	Pre  bool // run before non-pre handlers?
	Post bool // run after non-post handlers?

	Dir   msg.Dir     // if non-zero, limits the direction
	Types []string    // if non-empty, limits event types
	Func  HandlerFunc // the function to call
}

Handler represents a function to call for matching pipe events

func (*Handler) Disable added in v0.2.0

func (h *Handler) Disable() bool

Disable sets h.Enabled to false and returns true. If h.Enabled is nil, returns false.

func (*Handler) Drop added in v0.2.0

func (h *Handler) Drop()

Drop drops the handler, permanently unregistering it from running

func (*Handler) Enable added in v0.2.0

func (h *Handler) Enable() bool

Enable sets h.Enabled to true and returns true. If h.Enabled is nil, returns false.

func (*Handler) String added in v0.2.0

func (h *Handler) String() string

String returns handler name and id as string

type HandlerFunc

type HandlerFunc func(ev *Event) (keep_handler bool)

HandlerFunc handles event ev. Return false to unregister the handler (all types).

type Input

type Input struct {
	Pipe *Pipe // attached to this Pipe (nil before pipe start)
	Line *Line // attached to this Line (nil before pipe start)

	Id   int     // optional id
	Name string  // optional name
	Dir  msg.Dir // line direction

	// In is the input for incoming messages.
	In chan *msg.Msg

	// Reverse, when true, runs callbacks in reverse order.
	Reverse bool

	// CallbackFilter controls which callbacks to skip (disabled by default)
	CallbackFilter FilterMode

	// FilterValue specifies the value for CallbackFilter
	FilterValue any
	// contains filtered or unexported fields
}

Input processes incoming BGP messages through Callbacks and (optionally) writes the result to attached Line.

func (*Input) Close

func (in *Input) Close()

Close safely closes the .In channel, which should eventually stop the Input

func (*Input) Wait

func (in *Input) Wait()

Wait blocks until the input is done processing the messages

func (*Input) Write

func (in *Input) Write(src []byte) (int, error)

Write implements io.Writer and reads all BGP messages from src into in.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if pi.In is full.

In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.

Must not be used concurrently.

func (*Input) WriteFunc added in v0.2.0

func (in *Input) WriteFunc(src []byte, cb CallbackFunc) (int, error)

WriteFunc is the same as Input.Write(), but takes an optional callback function to be called just before the message is accepted for processing. If the callback returns false, the message is silently dropped instead.

func (*Input) WriteMsg

func (in *Input) WriteMsg(m *msg.Msg) (write_error error)

WriteMsg safely sends m to in.In, avoiding a panic if it is closed. It assigns a sequence number and timestamp before writing to the channel.

type Line

type Line struct {
	Pipe *Pipe   // parent pipe
	Dir  msg.Dir // line direction

	// the default Input, which processes messages through all callbacks.
	Input

	// Out is the Line output, where you can read processed messages from.
	Out chan *msg.Msg

	// UNIX timestamp (seconds) of the last valid OPEN message
	LastOpen atomic.Int64

	// UNIX timestamp (seconds) of the last KEEPALIVE message
	LastAlive atomic.Int64

	// UNIX timestamp (seconds) of the last UPDATE message
	LastUpdate atomic.Int64

	// the OPEN message that updated LastOpen
	Open atomic.Pointer[msg.Open]
	// contains filtered or unexported fields
}

Line implements one direction of a Pipe: possibly several input processors run messages through callbacks and write the results to a common output.

func (*Line) Close

func (l *Line) Close()

Close safely closes all inputs, which should eventually stop the line

func (*Line) CloseOutput

func (l *Line) CloseOutput()

CloseOutput safely closes the output channel.

func (*Line) Read

func (l *Line) Read(dst []byte) (int, error)

Read reads l.Out and writes raw BGP data to dst. Must not be used concurrently.

func (*Line) Wait

func (l *Line) Wait()

Wait blocks until all processing is done

func (*Line) WriteOutput added in v0.2.0

func (l *Line) WriteOutput(m *msg.Msg) (write_error error)

WriteOutput safely sends m to l.Out, avoiding a panic if closed.

func (*Line) WriteTo

func (l *Line) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo interface, writing raw BGP data to w

type Options

type Options struct {
	Logger  *zerolog.Logger // if nil logging is disabled
	MsgPool *sync.Pool      // optional pool for msg.Msg

	Caps bool // overwrite pipe.Caps using OPEN messages?

	Callbacks []*Callback // message callbacks
	Handlers  []*Handler  // event handlers
	Inputs    []*Input    // input processors
}

BGP pipe options

func (*Options) AddCallback

func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback

AddCallbacks adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.

func (*Options) AddHandler

func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler

AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.

func (*Options) AddInput

func (o *Options) AddInput(dir msg.Dir, tpl ...*Input) *Input

AddInput adds input processor for given pipe direction, with optional details in tpl.

func (*Options) OnEstablished

func (o *Options) OnEstablished(hdf HandlerFunc) *Handler

OnEstablished request hdf to be called when the BGP session is established.

func (*Options) OnEvent

func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler

OnEvent request hdf to be called for given event types. If no types provided, it requests to call hdf on *every* event.

func (*Options) OnEventPost

func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler

OnEventPost is like OnEvent but requests to run hdf after other handlers

func (*Options) OnEventPre

func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler

OnEventPre is like OnEvent but requests to run hdf before other handlers

func (*Options) OnMsg

func (o *Options) OnMsg(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsg adds a callback for all messages of given types (or all types if not specified).

func (*Options) OnMsgPost

func (o *Options) OnMsgPost(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsgPost is like OnMsg but requests to run cb after other callbacks

func (*Options) OnMsgPre

func (o *Options) OnMsgPre(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsgPre is like OnMsg but requests to run cb before other callbacks

func (*Options) OnParseError

func (o *Options) OnParseError(hdf HandlerFunc) *Handler

OnParseError request hdf to be called on BGP message parse error.

func (*Options) OnStart

func (o *Options) OnStart(hdf HandlerFunc) *Handler

OnStart request hdf to be called after the pipe starts.

func (*Options) OnStop

func (o *Options) OnStop(hdf HandlerFunc) *Handler

OnStop request hdf to be called when the pipe stops.

type Pipe

type Pipe struct {
	*zerolog.Logger

	Options           // pipe options; modify before Start()
	Caps    caps.Caps // BGP capability context; always thread-safe
	L       *Line     // line processing messages from R to L
	R       *Line     // line processing messages from L to R

	// generic Key-Value store, always thread-safe
	KV *xsync.MapOf[string, any]
	// contains filtered or unexported fields
}

Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.

Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.

func NewPipe

func NewPipe(ctx context.Context) *Pipe

NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().

func (*Pipe) Event

func (p *Pipe) Event(et string, args ...any) *Event

Event announces a new event type et to the pipe, with optional arguments. The first msg.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (add ACTION_BORROW). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Val.

func (*Pipe) GetMsg added in v0.1.7

func (p *Pipe) GetMsg() (m *msg.Msg)

GetMsg returns empty msg from pool, or a new msg object

func (*Pipe) LineFor added in v0.1.6

func (p *Pipe) LineFor(dst msg.Dir) *Line

LineFor returns the line processing messages destined for dst. Returns p.R if dst is bidir (DST_LR).

func (*Pipe) PutMsg added in v0.1.7

func (p *Pipe) PutMsg(m *msg.Msg)

PutMsg resets msg and returns it to pool, which might free it

func (*Pipe) Start

func (p *Pipe) Start()

Start starts the Pipe in background and returns.

func (*Pipe) Started

func (p *Pipe) Started() bool

Started returns true iff Start() has already been called = pipe is (being) started.

func (*Pipe) Stop

func (p *Pipe) Stop()

Stop stops all inputs and blocks till they finish processing. Pipe must not be used again past this point. Closes all inputs, which should eventually close all outputs, possibly after this function returns.

func (*Pipe) Stopped

func (p *Pipe) Stopped() bool

Stopped returns true iff Stop() has already been called = pipe is (being) stopped.

func (*Pipe) Wait

func (p *Pipe) Wait()

Wait blocks until the pipe starts and stops completely.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL