checkpoint

package
v2.0.0-alpha.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Barrier

// Barrier is a plain value made of a checkpoint identifier and an
// operator-related string identifier.
type Barrier struct {
	CheckpointId int64  // numeric identifier of the checkpoint
	OpId         string // NOTE(review): presumably the ID of the operator that emitted the barrier — confirm
}

type BarrierAligner

type BarrierAligner struct {
	// contains filtered or unexported fields
}

BarrierAligner is used for QoS 2: it blocks an input until all barriers are received.

func NewBarrierAligner

func NewBarrierAligner(responder Responder, inputCount int) *BarrierAligner

func (*BarrierAligner) Process

func (h *BarrierAligner) Process(data *BufferOrEvent, ctx api.StreamContext) bool

func (*BarrierAligner) SetOutput

func (h *BarrierAligner) SetOutput(output chan<- *BufferOrEvent)

type BarrierHandler

// BarrierHandler inspects incoming BufferOrEvent items and can be handed an
// output channel.
type BarrierHandler interface {
	// Process returns true if data is a barrier and false otherwise.
	Process(data *BufferOrEvent, ctx api.StreamContext) bool
	// SetOutput supplies the output channel; it is used for blocking a channel.
	SetOutput(chan<- *BufferOrEvent)
}

type BarrierTracker

type BarrierTracker struct {
	// contains filtered or unexported fields
}

BarrierTracker is used for QoS 1: it simply tracks barriers.

func NewBarrierTracker

func NewBarrierTracker(responder Responder, inputCount int) *BarrierTracker

func (*BarrierTracker) Process

func (h *BarrierTracker) Process(data *BufferOrEvent, ctx api.StreamContext) bool

func (*BarrierTracker) SetOutput

func (h *BarrierTracker) SetOutput(_ chan<- *BufferOrEvent)

type BufferOrEvent

// BufferOrEvent pairs an arbitrary payload with a channel name.
type BufferOrEvent struct {
	// Data is the payload; a value of any type may be stored here.
	Data any
	// Channel is a string associated with the payload.
	// NOTE(review): presumably the name of the input channel the item
	// arrived on — confirm against the callers.
	Channel string
}

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []SinkTask, qos def.Qos, store api.Store, interval int, ctx api.StreamContext) *Coordinator

func (*Coordinator) Activate

func (c *Coordinator) Activate() error

func (*Coordinator) Deactivate

func (c *Coordinator) Deactivate() error

func (*Coordinator) GetCompleteCount

func (c *Coordinator) GetCompleteCount() int

GetCompleteCount is intended for testing.

func (*Coordinator) GetLatest

func (c *Coordinator) GetLatest() int64

func (*Coordinator) IsActivated

func (c *Coordinator) IsActivated() bool

type Message

// Message is an integer code; the Signal type in this package carries one in
// its Message field.
// NOTE(review): the meaning of each code (STOP, ACK, DEC) is not visible
// here — confirm against the code that sends and receives signals.
type Message int
const (
	STOP Message = iota // 0
	ACK                 // 1
	DEC                 // 2
)

type NonSinkTask

// NonSinkTask is implemented by tasks that expose a Broadcast method.
type NonSinkTask interface {
	// Broadcast accepts a value of any type.
	// NOTE(review): presumably forwards data to downstream tasks — confirm
	// against the implementations.
	Broadcast(data any)
}

type NonSourceTask

// NonSourceTask extends StreamTask with input-count bookkeeping and the
// ability to receive a BarrierHandler.
type NonSourceTask interface {
	StreamTask
	// GetInputCount returns an input count as an int.
	GetInputCount() int
	// AddInputCount takes no arguments and returns nothing.
	// NOTE(review): presumably increments the input count by one — confirm.
	AddInputCount()

	// SetBarrierHandler hands the task a BarrierHandler.
	SetBarrierHandler(BarrierHandler)
}

type Responder

// Responder is something that can be asked to trigger a checkpoint and that
// has a name.
type Responder interface {
	// TriggerCheckpoint is called with a checkpoint ID and reports failure
	// through the returned error.
	TriggerCheckpoint(checkpointId int64) error
	// GetName returns the responder's name.
	GetName() string
}

type ResponderExecutor

type ResponderExecutor struct {
	// contains filtered or unexported fields
}

func NewResponderExecutor

func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor

func (*ResponderExecutor) GetName

func (re *ResponderExecutor) GetName() string

func (*ResponderExecutor) TriggerCheckpoint

func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error

type Signal

// Signal combines a Message code with an embedded Barrier. Because Barrier is
// embedded, its CheckpointId and OpId fields are promoted onto Signal.
type Signal struct {
	Message Message
	Barrier
}

type SinkTask

// SinkTask has exactly the method set of NonSourceTask; it adds no methods
// of its own.
type SinkTask interface {
	NonSourceTask
}

type SourceSubTopoTask

// SourceSubTopoTask is implemented by tasks that can have checkpointing
// enabled on them.
type SourceSubTopoTask interface {
	// EnableCheckpoint receives pointers to a slice of StreamTask and a slice
	// of NonSourceTask.
	// NOTE(review): pointers to slices suggest the implementation appends its
	// own tasks to the caller's slices — confirm.
	EnableCheckpoint(sources *[]StreamTask, ops *[]NonSourceTask)
}

type StreamCheckpointContext

// StreamCheckpointContext exposes the two checkpoint-related operations
// below; each reports failure through its returned error.
type StreamCheckpointContext interface {
	// Snapshot takes no arguments.
	// NOTE(review): presumably captures the current state in memory — confirm.
	Snapshot() error
	// SaveState is called with a checkpoint ID.
	// NOTE(review): presumably persists the state captured for that
	// checkpoint — confirm.
	SaveState(checkpointId int64) error
}

type StreamTask

// StreamTask is the base task interface of this package; NonSourceTask
// embeds it.
type StreamTask interface {
	// GetName returns the task's name.
	GetName() string
	// GetStreamContext returns the api.StreamContext associated with the task.
	GetStreamContext() api.StreamContext
	// SetQos sets the task's def.Qos value.
	SetQos(qos def.Qos)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL