Documentation
¶
Index ¶
- type Barrier
- type BarrierAligner
- type BarrierHandler
- type BarrierTracker
- type BufferOrEvent
- type Coordinator
- func (c *Coordinator) Activate() error
- func (c *Coordinator) ActiveForceSaveState()
- func (c *Coordinator) Deactivate() error
- func (c *Coordinator) FinishForceSaveState()
- func (c *Coordinator) ForceSaveState() (chan any, error)
- func (c *Coordinator) GetCompleteCount() int
- func (c *Coordinator) GetLatest() int64
- func (c *Coordinator) IsActivated() bool
- type Message
- type NonSinkTask
- type NonSourceTask
- type Responder
- type ResponderExecutor
- type Signal
- type SinkTask
- type SourceSubTopoTask
- type StreamCheckpointContext
- type StreamTask
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BarrierAligner ¶
type BarrierAligner struct {
// contains filtered or unexported fields
}
Used for QoS 2: blocks an input until all barriers have been received.
func NewBarrierAligner ¶
func NewBarrierAligner(responder Responder, inputCount int) *BarrierAligner
func (*BarrierAligner) Process ¶
func (h *BarrierAligner) Process(data *BufferOrEvent, ctx api.StreamContext) bool
func (*BarrierAligner) SetOutput ¶
func (h *BarrierAligner) SetOutput(output chan<- *BufferOrEvent)
type BarrierHandler ¶
type BarrierHandler interface {
	Process(data *BufferOrEvent, ctx api.StreamContext) bool // Returns true if data is a barrier, false otherwise.
	SetOutput(chan<- *BufferOrEvent) // Used to block a channel.
}
type BarrierTracker ¶
type BarrierTracker struct {
// contains filtered or unexported fields
}
Used for QoS 1: simply tracks barriers.
func NewBarrierTracker ¶
func NewBarrierTracker(responder Responder, inputCount int) *BarrierTracker
func (*BarrierTracker) Process ¶
func (h *BarrierTracker) Process(data *BufferOrEvent, ctx api.StreamContext) bool
func (*BarrierTracker) SetOutput ¶
func (h *BarrierTracker) SetOutput(_ chan<- *BufferOrEvent)
type BufferOrEvent ¶
type BufferOrEvent struct {
	Data    interface{}
	Channel string
}
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []SinkTask, qos def.Qos, store api.Store, interval time.Duration, ctx api.StreamContext) *Coordinator
func (*Coordinator) Activate ¶
func (c *Coordinator) Activate() error
func (*Coordinator) ActiveForceSaveState ¶
func (c *Coordinator) ActiveForceSaveState()
func (*Coordinator) Deactivate ¶
func (c *Coordinator) Deactivate() error
func (*Coordinator) FinishForceSaveState ¶
func (c *Coordinator) FinishForceSaveState()
func (*Coordinator) ForceSaveState ¶
func (c *Coordinator) ForceSaveState() (chan any, error)
func (*Coordinator) GetLatest ¶
func (c *Coordinator) GetLatest() int64
func (*Coordinator) IsActivated ¶
func (c *Coordinator) IsActivated() bool
type NonSinkTask ¶
type NonSinkTask interface {
Broadcast(data any)
}
type NonSourceTask ¶
type NonSourceTask interface {
	StreamTask
	GetInputCount() int
	AddInputCount()
	SetBarrierHandler(BarrierHandler)
}
type ResponderExecutor ¶
type ResponderExecutor struct {
// contains filtered or unexported fields
}
func NewResponderExecutor ¶
func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor
func (*ResponderExecutor) GetName ¶
func (re *ResponderExecutor) GetName() string
func (*ResponderExecutor) TriggerCheckpoint ¶
func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error
type SinkTask ¶
type SinkTask interface { NonSourceTask }
type SourceSubTopoTask ¶
type SourceSubTopoTask interface {
EnableCheckpoint(sources *[]StreamTask, ops *[]NonSourceTask)
}
type StreamCheckpointContext ¶
type StreamTask ¶
type StreamTask interface {
	GetName() string
	GetStreamContext() api.StreamContext
	SetQos(qos def.Qos)
}
Click to show internal directories.
Click to hide internal directories.