Documentation ¶
Index ¶
- type Command
- type CommandType
- type Message
- func BarrierMessage(barrierTs model.Ts) Message
- func CommandMessage(command *Command) Message
- func PolymorphicEventMessage(event *model.PolymorphicEvent) Message
- func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outputCh chan Message) (Message, error)
- func TickMessage() Message
- type MessageType
- type Node
- type NodeContext
- type Pipeline
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CommandType ¶
type CommandType int
CommandType is the type of Command
const (
	// CommandTypeUnknown is unknown message type
	CommandTypeUnknown CommandType = iota
	// CommandTypeStop means the table pipeline should stop at once
	CommandTypeStop
)
type Message ¶
type Message struct {
	// TODO add more kind of messages
	// Tp is the type of Message
	Tp MessageType
	// Command is the command in this message
	Command *Command
	// PolymorphicEvent represents the row change event
	PolymorphicEvent *model.PolymorphicEvent
	// BarrierTs
	BarrierTs model.Ts
}
Message is a vehicle for transferring information between nodes
func BarrierMessage ¶
func BarrierMessage(barrierTs model.Ts) Message
BarrierMessage creates the message of Barrier
func CommandMessage ¶
func CommandMessage(command *Command) Message
CommandMessage creates the message of Command
func PolymorphicEventMessage ¶
func PolymorphicEventMessage(event *model.PolymorphicEvent) Message
PolymorphicEventMessage creates the message of PolymorphicEvent
func SendMessageToNode4Test ¶
func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outputCh chan Message) (Message, error)
SendMessageToNode4Test sends the messages to the specified `Node` through `Receive`, in order. This function is for testing only. Only `Node.Receive` is called; no other method of `Node` is ever called. If the `Receive` function of the `Node` returns an error, this function returns the message that caused the error together with the error.
func TickMessage ¶
func TickMessage() Message
TickMessage creates the message of Tick. Note: the returned message is READ-ONLY.
type MessageType ¶
type MessageType int
MessageType is the type of Message
const (
	MessageTypeUnknown MessageType = iota
	MessageTypeCommand
	MessageTypePolymorphicEvent
	MessageTypeBarrier
	MessageTypeTick
)
types of Message
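A receiver typically branches on the message type. The sketch below redeclares `MessageType` and its constants locally so that it compiles on its own; `describe` is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// MessageType and its constants mirror the documented declaration; they are
// redeclared here only to keep the sketch self-contained.
type MessageType int

const (
	MessageTypeUnknown MessageType = iota
	MessageTypeCommand
	MessageTypePolymorphicEvent
	MessageTypeBarrier
	MessageTypeTick
)

// describe is a hypothetical helper that maps a message type to a label.
// Any value outside the known constants falls through to "unknown".
func describe(tp MessageType) string {
	switch tp {
	case MessageTypeCommand:
		return "command"
	case MessageTypePolymorphicEvent:
		return "polymorphic event"
	case MessageTypeBarrier:
		return "barrier"
	case MessageTypeTick:
		return "tick"
	default:
		return "unknown"
	}
}

func main() {
	for tp := MessageTypeUnknown; tp <= MessageTypeTick; tp++ {
		fmt.Println(int(tp), describe(tp))
	}
}
```

Because the constants are declared with `iota`, `MessageTypeUnknown` is the zero value, so a `Message` whose `Tp` was never set is reported as unknown rather than mistaken for a real message kind.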
type Node ¶
type Node interface {
	// Init initializes the node
	// when the pipeline is started, this function will be called in order
	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
	// but it will return nil if you try to call the `ctx.Message()`
	Init(ctx NodeContext) error
	// Receive receives the message from the previous node
	// when the node receives a message, this function will be called
	// you can call `ctx.Message()` to receive the message
	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
	Receive(ctx NodeContext) error
	// Destroy frees the resources in this node
	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
	// but it will return nil if you try to call the `ctx.Message()`
	Destroy(ctx NodeContext) error
}
Node represents a handling unit for the message stream in the pipeline. All functions in this interface are called from a single goroutine, so there is no need to consider concurrency issues.
type NodeContext ¶
type NodeContext interface {
	context.Context
	// Message returns the message sent by the previous node
	Message() Message
	// SendToNextNode sends the message to the next node
	SendToNextNode(msg Message)
}
NodeContext adds two functions to `context.Context` and is created by the pipeline.
func MockNodeContext4Test ¶
func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext
MockNodeContext4Test creates a node context with a message and an output channel for tests.
func NewNodeContext ¶
func NewNodeContext(ctx context.Context, msg Message, outputCh chan<- Message) NodeContext
NewNodeContext returns a new NodeContext.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a pipeline that consists of a number of nodes.
func NewPipeline ¶
func NewPipeline(
	ctx context.Context,
	tickDuration time.Duration,
	initRunnerSize, outputChSize int,
) *Pipeline
NewPipeline creates a new pipeline
func (*Pipeline) AppendNode ¶
AppendNode appends the node to the pipeline
func (*Pipeline) SendToFirstNode ¶
SendToFirstNode sends the message to the first node