pipeline

package
v0.0.0-...-a345a4b Latest
Warning

This package is not in the latest version of its module.
Published: Aug 4, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Command

type Command struct {
	Tp CommandType
}

Command is a command sent to the table pipeline

type CommandType

type CommandType int

CommandType is the type of Command

const (
	// CommandTypeUnknown is the unknown command type
	CommandTypeUnknown CommandType = iota
	// CommandTypeStop means the table pipeline should stop at once
	CommandTypeStop
)

type Message

type Message struct {
	// TODO: add more kinds of messages
	// Tp is the type of Message
	Tp MessageType
	// Command is the command in this message
	Command *Command
	// PolymorphicEvent represents the row change event
	PolymorphicEvent *model.PolymorphicEvent
	// BarrierTs is the barrier timestamp carried by this message
	BarrierTs model.Ts
}

Message is a vehicle for transferring information between nodes

func BarrierMessage

func BarrierMessage(barrierTs model.Ts) Message

BarrierMessage creates the message of Barrier

func CommandMessage

func CommandMessage(command *Command) Message

CommandMessage creates the message of Command

func PolymorphicEventMessage

func PolymorphicEventMessage(event *model.PolymorphicEvent) Message

PolymorphicEventMessage creates the message of PolymorphicEvent
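
The constructors above pair a MessageType with one payload field. The following is a minimal sketch of that pattern, not the package's actual implementation. It assumes each constructor only sets `Tp` and the matching field, which this page does not confirm. The types are local stand-ins copied from the declarations on this page; `Ts` is a hypothetical stand-in for `model.Ts`, assumed here to be an unsigned 64-bit timestamp, and the `PolymorphicEvent` field is left out because the `model` package is not available in a standalone example.

```go
package main

import "fmt"

// Ts is a hypothetical stand-in for model.Ts (assumed to be an unsigned 64-bit timestamp).
type Ts uint64

// CommandType and Command mirror the declarations documented on this page.
type CommandType int

const (
	CommandTypeUnknown CommandType = iota
	CommandTypeStop
)

type Command struct {
	Tp CommandType
}

// MessageType mirrors the documented constants.
type MessageType int

const (
	MessageTypeUnknown MessageType = iota
	MessageTypeCommand
	MessageTypePolymorphicEvent
	MessageTypeBarrier
	MessageTypeTick
)

// Message is a reduced stand-in: the PolymorphicEvent field is omitted.
type Message struct {
	Tp        MessageType
	Command   *Command
	BarrierTs Ts
}

// BarrierMessage is an assumed implementation: it tags the message as a
// barrier and stores the timestamp.
func BarrierMessage(barrierTs Ts) Message {
	return Message{Tp: MessageTypeBarrier, BarrierTs: barrierTs}
}

// CommandMessage is an assumed implementation: it tags the message as a
// command and stores the pointer it was given.
func CommandMessage(command *Command) Message {
	return Message{Tp: MessageTypeCommand, Command: command}
}

func main() {
	stop := CommandMessage(&Command{Tp: CommandTypeStop})
	barrier := BarrierMessage(42)
	fmt.Println(stop.Tp == MessageTypeCommand, stop.Command.Tp == CommandTypeStop)
	fmt.Println(barrier.Tp == MessageTypeBarrier, barrier.BarrierTs)
}
```

A receiver would normally check `Tp` first and only then read the payload field that belongs to that type, since the other fields keep their zero values.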

func SendMessageToNode4Test

func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outputCh chan Message) (Message, error)

SendMessageToNode4Test sends the messages to the specified `Node` through `Receive`, in order. This function is only for testing. Only `Node.Receive` is called; no other method of `Node` is ever called. If `Receive` returns an error, this function returns the message that caused the error together with the error.

func TickMessage

func TickMessage() Message

TickMessage creates the message of Tick. Note: the returned message is READ-ONLY.

type MessageType

type MessageType int

MessageType is the type of Message

const (
	MessageTypeUnknown MessageType = iota
	MessageTypeCommand
	MessageTypePolymorphicEvent
	MessageTypeBarrier
	MessageTypeTick
)

types of Message
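
A node usually branches on the message type before touching any payload field. The sketch below copies the constants from this page into a local stand-in type so it runs on its own; `describe` is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// MessageType mirrors the constants documented on this page.
type MessageType int

const (
	MessageTypeUnknown MessageType = iota
	MessageTypeCommand
	MessageTypePolymorphicEvent
	MessageTypeBarrier
	MessageTypeTick
)

// describe is a hypothetical helper that maps each type to a short label.
// Any value outside the known range falls through to "unknown".
func describe(tp MessageType) string {
	switch tp {
	case MessageTypeCommand:
		return "command"
	case MessageTypePolymorphicEvent:
		return "polymorphic event"
	case MessageTypeBarrier:
		return "barrier"
	case MessageTypeTick:
		return "tick"
	default:
		return "unknown"
	}
}

func main() {
	for tp := MessageTypeUnknown; tp <= MessageTypeTick; tp++ {
		fmt.Printf("%d: %s\n", tp, describe(tp))
	}
}
```

Because the constants are declared with `iota`, `MessageTypeUnknown` is the zero value, so a `Message` that was never initialized reports the unknown type.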

type Node

type Node interface {
	// Init initializes the node
	// when the pipeline is started, this function will be called in order
	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
	// but it will return nil if you try to call the `ctx.Message()`
	Init(ctx NodeContext) error

	// Receive receives the message from the previous node
	// when the node receives a message, this function will be called
	// you can call `ctx.Message()` to receive the message
	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
	Receive(ctx NodeContext) error

	// Destroy frees the resources in this node
	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
	// but it will return nil if you try to call the `ctx.Message()`
	Destroy(ctx NodeContext) error
}

Node represents a handling unit for the message stream in the pipeline. All functions in this interface are called from a single goroutine, so implementations do not need to consider concurrency issues.
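
As an illustration of the interface, here is a sketch of a node that forwards every message except ticks. The interfaces are local copies of the declarations on this page so the example is self-contained; `dropTickNode`, `recordingContext`, and `run` are hypothetical names, and `Message` is reduced to two fields. The node keeps no lock because, per the note above, all three methods are called from one goroutine.

```go
package main

import (
	"context"
	"fmt"
)

// MessageType and Message are reduced stand-ins for the documented types.
type MessageType int

const (
	MessageTypeUnknown MessageType = iota
	MessageTypeCommand
	MessageTypePolymorphicEvent
	MessageTypeBarrier
	MessageTypeTick
)

type Message struct {
	Tp        MessageType
	BarrierTs uint64
}

// NodeContext and Node mirror the documented interfaces.
type NodeContext interface {
	context.Context
	Message() Message
	SendToNextNode(msg Message)
}

type Node interface {
	Init(ctx NodeContext) error
	Receive(ctx NodeContext) error
	Destroy(ctx NodeContext) error
}

// dropTickNode is a hypothetical node that swallows ticks and forwards the rest.
type dropTickNode struct{}

// Compile-time check that dropTickNode satisfies Node.
var _ Node = (*dropTickNode)(nil)

func (n *dropTickNode) Init(ctx NodeContext) error    { return nil }
func (n *dropTickNode) Destroy(ctx NodeContext) error { return nil }

func (n *dropTickNode) Receive(ctx NodeContext) error {
	msg := ctx.Message()
	if msg.Tp == MessageTypeTick {
		return nil // drop ticks
	}
	ctx.SendToNextNode(msg)
	return nil
}

// recordingContext is a hypothetical test double that records forwarded messages.
type recordingContext struct {
	context.Context
	msg  Message
	sent []Message
}

func (c *recordingContext) Message() Message           { return c.msg }
func (c *recordingContext) SendToNextNode(msg Message) { c.sent = append(c.sent, msg) }

// run feeds inputs to the node one by one and returns what it forwarded.
func run(inputs []Message) []Message {
	node := &dropTickNode{}
	ctx := &recordingContext{Context: context.Background()}
	for _, msg := range inputs {
		ctx.msg = msg
		if err := node.Receive(ctx); err != nil {
			break
		}
	}
	return ctx.sent
}

func main() {
	out := run([]Message{{Tp: MessageTypeTick}, {Tp: MessageTypeBarrier, BarrierTs: 7}})
	fmt.Println(len(out), out[0].BarrierTs)
}
```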

type NodeContext

type NodeContext interface {
	context.Context

	// Message returns the message sent by the previous node
	Message() Message
	// SendToNextNode sends the message to the next node
	SendToNextNode(msg Message)
}

NodeContext extends `context.Context` with two functions and is created by the pipeline

func MockNodeContext4Test

func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext

MockNodeContext4Test creates a node context with a message and an output channel for tests.

func NewNodeContext

func NewNodeContext(ctx context.Context, msg Message, outputCh chan<- Message) NodeContext

NewNodeContext returns a new NodeContext.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a pipeline that includes a number of nodes

func NewPipeline

func NewPipeline(
	ctx context.Context, tickDuration time.Duration, initRunnerSize, outputChSize int,
) *Pipeline

NewPipeline creates a new pipeline

func (*Pipeline) AppendNode

func (p *Pipeline) AppendNode(ctx context.Context, name string, node Node)

AppendNode appends the node to the pipeline

func (*Pipeline) SendToFirstNode

func (p *Pipeline) SendToFirstNode(msg Message) error

SendToFirstNode sends the message to the first node

func (*Pipeline) Wait

func (p *Pipeline) Wait()

Wait waits until all the nodes have exited
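
The general idea of appending nodes, sending to the first one, and waiting for all of them can be illustrated with a toy pipeline that runs one goroutine per node and links the nodes with buffered channels. This is a sketch of the technique only and makes no claim about how `Pipeline` is implemented. Everything in it is hypothetical: `toyPipeline`, `handler`, and the method names are invented for the example, there is no tick, context, or error handling, and `Message` is reduced to one field.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a reduced stand-in for the documented Message type.
type Message struct {
	BarrierTs uint64
}

// handler is a simplified stand-in for Node.Receive: it gets one message and
// a function that sends to the next node.
type handler func(msg Message, send func(Message))

// toyPipeline is hypothetical and much simpler than the documented Pipeline.
type toyPipeline struct {
	first chan Message // input of the first node
	last  chan Message // output of the most recently appended node
	wg    sync.WaitGroup
}

func newToyPipeline(chSize int) *toyPipeline {
	ch := make(chan Message, chSize)
	return &toyPipeline{first: ch, last: ch}
}

// appendNode starts a goroutine that reads from the previous node's output
// and writes to a fresh channel, which becomes the new tail.
func (p *toyPipeline) appendNode(h handler, chSize int) {
	in, out := p.last, make(chan Message, chSize)
	p.last = out
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer close(out) // propagate shutdown to the next node
		for msg := range in {
			h(msg, func(m Message) { out <- m })
		}
	}()
}

func (p *toyPipeline) sendToFirstNode(msg Message) { p.first <- msg }

// closeAndWait closes the input, drains the tail, and waits for every node.
func (p *toyPipeline) closeAndWait() []Message {
	close(p.first)
	var results []Message
	for msg := range p.last {
		results = append(results, msg)
	}
	p.wg.Wait()
	return results
}

func main() {
	p := newToyPipeline(4)
	// First node doubles the timestamp.
	p.appendNode(func(msg Message, send func(Message)) {
		msg.BarrierTs *= 2
		send(msg)
	}, 4)
	// Second node forwards only timestamps greater than 2.
	p.appendNode(func(msg Message, send func(Message)) {
		if msg.BarrierTs > 2 {
			send(msg)
		}
	}, 4)
	for _, ts := range []uint64{1, 2, 3} {
		p.sendToFirstNode(Message{BarrierTs: ts})
	}
	fmt.Println(p.closeAndWait())
}
```

Since nothing drains the tail until `closeAndWait` runs, the channel sizes bound how many messages can be sent beforehand without blocking; the example stays within that bound.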
