bgpipe

package
v0.1.5
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2023 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package. Each message states the condition
// it reports; the places where they are returned are not visible in this listing.
var (
	ErrStageCmd     = errors.New("invalid stage command")
	ErrStageDiff    = errors.New("already defined but different")
	ErrStageStopped = errors.New("stage stopped")
	ErrFirstOrLast  = errors.New("must be either the first or the last stage")
	ErrInject       = errors.New("invalid --in option value")
	ErrLR           = errors.New("select either --left or --right, not both")
	ErrKill         = errors.New("session killed by an event")
)

Functions

func IsAddr

func IsAddr(v string) bool

func IsBind

func IsBind(v string) bool

func IsFile

func IsFile(v string) bool

Types

type Bgpipe

// Bgpipe represents a BGP pipeline consisting of several stages, built on top
// of the bgpfix pipe held in the Pipe field.
type Bgpipe struct {
	// Embedded logger; its methods are promoted to Bgpipe.
	zerolog.Logger

	// NOTE(review): presumably the pipeline-wide context and the function that
	// cancels it with a cause — confirm against NewBgpipe.
	Ctx    context.Context
	Cancel context.CancelCauseFunc

	F      *pflag.FlagSet // global flags
	K      *koanf.Koanf   // global config
	Pipe   *pipe.Pipe     // bgpfix pipe
	Stages []*StageBase   // pipe stages
	// contains filtered or unexported fields
}

Bgpipe represents a BGP pipeline consisting of several stages, built on top of bgpfix.Pipe

func NewBgpipe

func NewBgpipe(repo ...map[string]NewStage) *Bgpipe

NewBgpipe creates a new bgpipe instance using given repositories of stage commands

func (*Bgpipe) AddRepo

func (b *Bgpipe) AddRepo(cmds map[string]NewStage)

AddRepo adds a mapping between stage commands and their NewStage constructor functions.

func (*Bgpipe) AddStage

func (b *Bgpipe) AddStage(idx int, cmd string) (*StageBase, error)

AddStage adds and returns a new stage at idx for cmd, or returns an existing instance if it's for the same cmd.

func (*Bgpipe) Attach

func (b *Bgpipe) Attach() error

Attach attaches all stages to pipe

func (*Bgpipe) Configure

func (b *Bgpipe) Configure() error

Configure configures bgpipe

func (*Bgpipe) KillEvent

func (b *Bgpipe) KillEvent(ev *pipe.Event) bool

KillEvent kills session because of given event ev

func (*Bgpipe) LogEvent

func (b *Bgpipe) LogEvent(ev *pipe.Event) bool

LogEvent logs given event

func (*Bgpipe) NewStage

func (b *Bgpipe) NewStage(cmd string) *StageBase

NewStage returns a new stage for the given cmd, or nil on error.

func (*Bgpipe) Run

func (b *Bgpipe) Run() error

Run configures and runs the bgpipe

func (*Bgpipe) StageCount

func (b *Bgpipe) StageCount() int

func (*Bgpipe) Start

func (b *Bgpipe) Start(ev *pipe.Event) bool

Start is called after the bgpfix pipe starts

type NewStage

// NewStage is a constructor function: it returns a new Stage for the given
// parent base. It should modify base.Options.
type NewStage func(base *StageBase) Stage

NewStage returns a new Stage for given parent base. It should modify base.Options.

type Stage

// Stage is the interface provided by each bgpipe stage implementation.
// Per the method comments below, Attach runs before the pipe starts, Prepare
// runs when the stage starts, Run does the actual work, and Stop is called
// when the stage is requested to stop.
type Stage interface {
	// Attach is run before the pipe starts.
	// It should check the config and attach to the bgpfix pipe.
	Attach() error

	// Prepare is called when the stage starts, but before Run, callbacks, and handlers.
	// It should prepare required I/O, e.g. files, network connections, etc.
	//
	// If no error is returned, the stage emits a "READY" event, all callbacks and handlers
	// are enabled, and Run is called when all stages starting in parallel are ready too.
	Prepare() error

	// Run runs the stage and returns after all work has finished.
	// It must respect StageBase.Ctx. Returning a non-nil error different
	// than ErrStageStopped results in a fatal error that stops the whole pipe.
	//
	// Emits "START" just before, and "STOP" after stage operation is finished.
	Run() error

	// Stop is called when the stage is requested to stop.
	// It should safely finish all I/O and make Run return if it's still running.
	Stop() error
}

Stage is the interface implemented by every bgpipe stage.

type StageBase

// StageBase represents a bgpipe stage base. It embeds a logger and the
// concrete Stage implementation, and carries the per-stage context, config,
// identity, and position/direction flags shared by all stages.
type StageBase struct {
	zerolog.Logger // logger with stage name
	Stage          // the real implementation

	Ctx    context.Context         // stage context
	Cancel context.CancelCauseFunc // cancel to stop the stage

	B *Bgpipe      // parent
	P *pipe.Pipe   // bgpfix pipe
	K *koanf.Koanf // integrated config (args / config file / etc)

	Index   int          // stage index (zero means internal)
	Cmd     string       // stage command name
	Name    string       // human-friendly stage name
	Options StageOptions // stage options, can be updated in NewStage

	IsFirst bool // is the first stage in pipe? (L peer)
	IsLast  bool // is the last stage in pipe? (R peer)
	IsLeft  bool // operates on L direction?
	IsRight bool // operates on R direction?

	Dir msg.Dir // -L/-R translated to Dir (can be DIR_LR)
	// NOTE(review): "Dst" in the two comments below is not defined anywhere in
	// the visible API — confirm what it refers to.
	Upstream   *pipe.Line // pipeline processing messages to Dst (may be nil)
	Downstream *pipe.Line // pipeline processing messages from Dst (may be nil)
	// contains filtered or unexported fields
}

StageBase represents a bgpipe stage base

func (*StageBase) Attach

func (s *StageBase) Attach() error

Attach is the default Stage implementation that does nothing.

func (*StageBase) Errorf

func (s *StageBase) Errorf(format string, a ...any) error

Errorf wraps fmt.Errorf and adds a prefix with the stage name

func (*StageBase) Event

func (s *StageBase) Event(et string, args ...any) *pipe.Event

Event sends an event, prefixing et with stage name + slash

func (*StageBase) Prepare

func (s *StageBase) Prepare() error

Prepare is the default Stage implementation that does nothing.

func (*StageBase) Run

func (s *StageBase) Run() error

Run is the default Stage implementation that just waits for the context and returns its cancel cause

func (*StageBase) Running

func (s *StageBase) Running() bool

Running returns true if the stage is in Run(), false otherwise.

func (*StageBase) Stop

func (s *StageBase) Stop() error

Stop is the default Stage implementation that does nothing.

func (*StageBase) String

func (s *StageBase) String() string

String returns stage "[index] name" or "name" if index is 0

type StageOptions

// StageOptions describes the high-level settings of a stage: its
// description and CLI surface, followed by flags describing how the stage
// interacts with messages and standard I/O.
type StageOptions struct {
	Descr  string            // one-line description
	Flags  *pflag.FlagSet    // CLI flags
	Usage  string            // usage string
	Args   []string          // argument names
	Events map[string]string // event names and descriptions

	IsProducer bool // produces messages? (writes to Line input)
	IsConsumer bool // consumes messages? (reads from Line output)
	IsStdin    bool // reads from stdin?
	IsStdout   bool // writes to stdout?
	Bidir      bool // allow -LR (bidir mode)?
}

StageOptions describe high-level settings of a stage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL