Documentation ¶
Overview ¶
Package selina is a pipeline library.
Index ¶
- Variables
- func ATPipelineContextCancel(p Pipeliner) error
- func ATPipelineStartAll(p Pipeliner) error
- func ATPipelineStats(p Pipeliner) error
- func ChannelAsSlice[T any](in <-chan T) []T
- func DefaultErrorHandler(e error) bool
- func FreeBuffer(b *bytes.Buffer)
- func GetBuffer() *bytes.Buffer
- func Graph(p Pipeliner, w io.Writer) error
- func SendContext(ctx context.Context, msg *bytes.Buffer, output chan<- *bytes.Buffer) error
- func SliceAsChannel[T any](data []T, autoClose bool) chan T
- func SliceAsChannelOfBuffer(data []string, autoClose bool) chan *bytes.Buffer
- func SliceAsChannelRaw[T any](data []T, autoClose bool) chan T
- type Broadcaster
- type DataCounter
- type ErrorHandler
- type Marshaler
- type Node
- func (n *Node) Chain(next *Node) *Node
- func (n *Node) ID() string
- func (n *Node) IsChained(other *Node) bool
- func (n *Node) Name() string
- func (n *Node) Next() []string
- func (n *Node) Running() bool
- func (n *Node) Start(ctx context.Context) error
- func (n *Node) Stats() Stats
- func (n *Node) Stop() error
- type OptionsChecker
- type Pipeliner
- type ProcessArgs
- type Receiver
- type SimplePipeline
- type Stats
- type Unmarshaler
- type Worker
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotHaveNodes is returned on an attempt to start a pipeline without nodes
	ErrNotHaveNodes = errors.New("Pipeliner does not have nodes")
	// ErrInconsistentStart is returned when a pipeline does not start all given nodes
	ErrInconsistentStart = errors.New("Pipeliner does not start all nodes")
	// ErrMissingStats is returned when some node stats are absent when Stats is called
	ErrMissingStats = errors.New("missing nodes in Stats map")
)
var (
	DefaultUnmarshaler = json.Unmarshal
	DefaultMarshaler   = json.Marshal
)
The default Marshaler and Unmarshaler, backed by encoding/json.
var ErrAlreadyStarted = errors.New("node already started")
ErrAlreadyStarted is returned if the Start method is called more than once.
var ErrNilUpstream = errors.New("nil upstream channel")
ErrNilUpstream is returned when a worker requires an upstream worker but none is provided.
var ErrStopNotStarted = errors.New("stopping a not started worker")
ErrStopNotStarted is returned when Stop is called before Start.
Functions ¶
func ATPipelineContextCancel ¶
ATPipelineContextCancel asserts that the context is propagated to all Nodes.
func ATPipelineStartAll ¶
ATPipelineStartAll asserts that all Nodes in a pipeline are started when pipeline.Start is called.
func ATPipelineStats ¶ added in v0.3.0
ATPipelineStats checks that a Pipeliner implementation gathers stats from all of its nodes.
func ChannelAsSlice ¶
func ChannelAsSlice[T any](in <-chan T) []T
ChannelAsSlice reads from in until the channel is closed and returns a slice with all messages received.
func DefaultErrorHandler ¶ added in v0.19.0
DefaultErrorHandler is a pessimistic error handler: it always returns false on error.
func FreeBuffer ¶ added in v0.23.0
FreeBuffer calls Buffer.Reset and returns the buffer to the pool.
func SendContext ¶ added in v0.3.0
SendContext tries to send msg to output; it returns an error if the context is canceled before msg is sent.
func SliceAsChannel ¶
SliceAsChannel returns a channel that reads from a slice; if autoClose is true, the channel is closed after the last message is consumed.
func SliceAsChannelOfBuffer ¶ added in v0.23.0
SliceAsChannelOfBuffer returns a channel that reads from a slice; if autoClose is true, the channel is closed after the last message is consumed.
func SliceAsChannelRaw ¶
SliceAsChannelRaw behaves the same as SliceAsChannel.
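Taken together, these helpers convert between slices and channels. A minimal round-trip sketch, re-implemented here for illustration (the real functions may differ internally):

```go
package main

import "fmt"

// sliceAsChannel sends every element of data on a channel; when autoClose
// is true the channel is closed after the last element is sent.
func sliceAsChannel[T any](data []T, autoClose bool) chan T {
	out := make(chan T, len(data))
	go func() {
		for _, v := range data {
			out <- v
		}
		if autoClose {
			close(out)
		}
	}()
	return out
}

// channelAsSlice drains in until it is closed and collects all values.
func channelAsSlice[T any](in <-chan T) []T {
	var got []T
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	// Round trip: slice -> channel -> slice.
	fmt.Println(channelAsSlice(sliceAsChannel([]int{1, 2, 3}, true))) // [1 2 3]
}
```

Note that channelAsSlice only terminates when the channel is closed, which is why the autoClose flag matters when feeding it from sliceAsChannel.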
Types ¶
type Broadcaster ¶
type Broadcaster struct {
	DataCounter
	// contains filtered or unexported fields
}
Broadcaster allows writing the same value to multiple goroutines.
func (*Broadcaster) Broadcast ¶
func (b *Broadcaster) Broadcast(input <-chan *bytes.Buffer)
Broadcast reads values from input and sends them to every output channel.
func (*Broadcaster) Client ¶
func (b *Broadcaster) Client() <-chan *bytes.Buffer
Client creates an output channel; it panics if Broadcast was already called.
type DataCounter ¶ added in v0.3.0
type DataCounter struct {
// contains filtered or unexported fields
}
DataCounter is a simple wrapper over atomic counters.
func (*DataCounter) Stats ¶ added in v0.3.0
func (c *DataCounter) Stats() (count int64, data int64)
Stats returns the current values. Each value is read as its own atomic operation, so count and data are each consistent only with themselves.
func (*DataCounter) SumData ¶ added in v0.3.0
func (c *DataCounter) SumData(msg []byte)
SumData increments count by 1 and data by len(msg). While both values are incremented atomically, it is possible to get inconsistent reads when calling Stats while the object is in use.
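The count/data pair can be sketched with sync/atomic, which also shows why a cross-value read may be inconsistent while writers are active (a minimal illustration, not the library's implementation):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dataCounter tracks message count and total bytes with independent atomics.
type dataCounter struct {
	count int64
	data  int64
}

func (c *dataCounter) SumData(msg []byte) {
	atomic.AddInt64(&c.count, 1)
	// Between these two adds, another goroutine calling Stats can observe
	// the new count paired with the old data total: each value is atomic
	// on its own, not as a pair.
	atomic.AddInt64(&c.data, int64(len(msg)))
}

func (c *dataCounter) Stats() (count, data int64) {
	return atomic.LoadInt64(&c.count), atomic.LoadInt64(&c.data)
}

func main() {
	var c dataCounter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.SumData([]byte("abc"))
		}()
	}
	wg.Wait()
	fmt.Println(c.Stats()) // 100 300
}
```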
type ErrorHandler ¶ added in v0.15.0
ErrorHandler returns true if the error was handled inside the function; when the error is handled, the Worker must continue processing and just skip the failure.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a node that can send and receive data.
func NewNode ¶
NewNode creates a new Node that wraps a Worker. name is a user-defined identifier; internally Node generates a unique ID.
func (*Node) Chain ¶
Chain sends messages emitted by the worker to the next node and returns next so it can be chained again. If next is already chained, this operation does nothing.
func (*Node) IsChained ¶ added in v0.4.0
IsChained returns true if Chain was called before with other
func (*Node) Start ¶
Start initializes the worker; worker.Process is called repeatedly until the Node is stopped or worker.Process returns an error.
type OptionsChecker ¶ added in v0.8.0
type OptionsChecker interface {
	// Check returns an error if the options hold an invalid value.
	// It must not modify values at all and should preferably be
	// implemented with a value receiver.
	Check() error
}
OptionsChecker provides a way to determine whether an options value is valid.
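A typical implementation uses a value receiver so Check cannot mutate the options it validates. The options struct below is hypothetical, for illustration only:

```go
package main

import (
	"errors"
	"fmt"
)

// csvOptions is a hypothetical options struct implementing OptionsChecker.
type csvOptions struct {
	Delimiter rune
}

// Check uses a value receiver, so it cannot modify the options it inspects,
// matching the interface's documented preference.
func (o csvOptions) Check() error {
	if o.Delimiter == 0 {
		return errors.New("delimiter must be set")
	}
	return nil
}

func main() {
	fmt.Println((csvOptions{}).Check())               // delimiter must be set
	fmt.Println((csvOptions{Delimiter: ','}).Check()) // <nil>
}
```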
type Pipeliner ¶
Pipeliner implementations must all meet the following conditions:
- Run must call Node.Start on all Nodes
- the context passed to Run must be propagatedated to all Node.Start methods
- Nodes() returns a slice with all instances of *Node
func FreePipeline ¶ added in v0.4.0
FreePipeline provides a way to run arbitrarily chained Nodes; this method does not call Node.Chain.
func LinealPipeline ¶ added in v0.4.0
LinealPipeline creates a Pipeliner whose nodes are chained in a single branch: Node(0)->Node(1)->Node(2)->...->Node(n).
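That single-branch shape amounts to stages connected by channels, each closing its output when its upstream closes. A stand-alone sketch of the idea, not using the selina API:

```go
package main

import "fmt"

// stage reads ints from in, applies f, and forwards results downstream,
// closing its output when the upstream closes: the same single-branch
// shape LinealPipeline builds out of chained Nodes.
func stage(in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 3; i++ {
			src <- i
		}
	}()
	// Node(0)->Node(1): double each value, then add one.
	double := stage(src, func(v int) int { return v * 2 })
	plusOne := stage(double, func(v int) int { return v + 1 })
	for v := range plusOne {
		fmt.Print(v, " ") // 3 5 7
	}
	fmt.Println()
}
```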
type ProcessArgs ¶ added in v0.3.0
type ProcessArgs struct {
	// Input is nil when there is no upstream channel
	Input  <-chan *bytes.Buffer
	Output chan<- *bytes.Buffer
	Err    chan error
}
ProcessArgs encapsulates the arguments to Worker.Process.
type Receiver ¶
type Receiver struct {
	DataCounter
	// contains filtered or unexported fields
}
Receiver joins multiple channels into a single output channel; this allows adding new channels after Receive is called.
type SimplePipeline ¶
type SimplePipeline struct {
// contains filtered or unexported fields
}
SimplePipeline's zero value is unusable; you must create it with NewSimplePipeline.
func (*SimplePipeline) Nodes ¶
func (p *SimplePipeline) Nodes() []*Node
Nodes returns all instances of *Node.
func (*SimplePipeline) Run ¶
func (p *SimplePipeline) Run(ctx context.Context) error
Run starts pipeline processing; it returns a non-nil error if any Node fails.
func (*SimplePipeline) Stats ¶
func (p *SimplePipeline) Stats() map[string]Stats
Stats returns a map with the Stats object of every node.
type Stats ¶ added in v0.3.0
Stats contains overall node statistics. Counters are guaranteed to be consistent only after the node finalizes.
type Unmarshaler ¶ added in v0.18.2
Unmarshaler is a function type compatible with json.Unmarshal
type Worker ¶
type Worker interface {
	// Process must close the write-only Output channel when done
	Process(ctx context.Context, args ProcessArgs) error
}
Worker is the standard interface implemented by processors; it is used to build pipeline nodes. All Worker implementations must meet the following conditions:
- if a worker does not have another worker upstream, it receives a nil Input channel; this is useful to identify the situation and return an error
- when the input channel is closed, Process must finish its work gracefully and return nil
- on context cancellation, Process must finish as soon as possible and return context.Canceled
- on finish, Process must close the output channel and return an error or nil
Source Files ¶

Directories ¶

Path | Synopsis
---|---
examples |
workers | Package workers common structure for selina workers
csv | Package csv workers to read and write csv format
custom | Package custom implements a user defined function
filesystem | Package filesystem utilities for read and write files