Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BarrierAligner ¶
type BarrierAligner struct {
// contains filtered or unexported fields
}
For qos 2, block an input until all barriers are received
func NewBarrierAligner ¶
func NewBarrierAligner(responder Responder, inputCount int) *BarrierAligner
func (*BarrierAligner) Process ¶
func (h *BarrierAligner) Process(data *BufferOrEvent, ctx api.StreamContext) bool
func (*BarrierAligner) SetOutput ¶
func (h *BarrierAligner) SetOutput(output chan<- *BufferOrEvent)
type BarrierHandler ¶
type BarrierHandler interface { Process(data *BufferOrEvent, ctx api.StreamContext) bool // If data is barrier return true, else return false SetOutput(chan<- *BufferOrEvent) // It is using for block a channel }
type BarrierTracker ¶
type BarrierTracker struct {
// contains filtered or unexported fields
}
For qos 1, simple track barriers
func NewBarrierTracker ¶
func NewBarrierTracker(responder Responder, inputCount int) *BarrierTracker
func (*BarrierTracker) Process ¶
func (h *BarrierTracker) Process(data *BufferOrEvent, ctx api.StreamContext) bool
func (*BarrierTracker) SetOutput ¶
func (h *BarrierTracker) SetOutput(_ chan<- *BufferOrEvent)
type BufferOrEvent ¶
type BufferOrEvent struct { Data interface{} Channel string }
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []SinkTask, qos def.Qos, store api.Store, interval int, ctx api.StreamContext) *Coordinator
func (*Coordinator) Activate ¶
func (c *Coordinator) Activate() error
func (*Coordinator) Deactivate ¶
func (c *Coordinator) Deactivate() error
func (*Coordinator) GetLatest ¶
func (c *Coordinator) GetLatest() int64
func (*Coordinator) IsActivated ¶
func (c *Coordinator) IsActivated() bool
type NonSinkTask ¶
type NonSinkTask interface {
Broadcast(data any)
}
type NonSourceTask ¶
type NonSourceTask interface { StreamTask GetInputCount() int AddInputCount() SetBarrierHandler(BarrierHandler) }
type ResponderExecutor ¶
type ResponderExecutor struct {
// contains filtered or unexported fields
}
func NewResponderExecutor ¶
func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor
func (*ResponderExecutor) GetName ¶
func (re *ResponderExecutor) GetName() string
func (*ResponderExecutor) TriggerCheckpoint ¶
func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error
type SinkTask ¶
type SinkTask interface { NonSourceTask }
type SourceSubTopoTask ¶
type SourceSubTopoTask interface {
EnableCheckpoint(sources *[]StreamTask, ops *[]NonSourceTask)
}
type StreamCheckpointContext ¶
type StreamTask ¶
type StreamTask interface { GetName() string GetStreamContext() api.StreamContext SetQos(qos def.Qos) }
Click to show internal directories.
Click to hide internal directories.