Documentation ¶
Index ¶
- func RetryUntilSuccessfulWatcherCreation(js nats.JetStreamContext, bucketName string, infiniteLoop bool, ...) nats.KeyWatcher
- type EdgeBuffer
- type Fetcher
- type FromProcessor
- type FromVertex
- type FromVertexer
- type OffsetTimeline
- func (t *OffsetTimeline) Capacity() int
- func (t *OffsetTimeline) Dump() string
- func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64
- func (t *OffsetTimeline) GetHeadOffset() int64
- func (t *OffsetTimeline) GetOffset(eventTime int64) int64
- func (t *OffsetTimeline) GetTailOffset() int64
- func (t *OffsetTimeline) Put(node OffsetWatermark)
- type OffsetWatermark
- type ProcessorHeartbeat
- type VertexOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RetryUntilSuccessfulWatcherCreation ¶
func RetryUntilSuccessfulWatcherCreation(js nats.JetStreamContext, bucketName string, infiniteLoop bool, log *zap.SugaredLogger) nats.KeyWatcher
RetryUntilSuccessfulWatcherCreation creates a watcher. If infiniteLoop is set to true, it keeps retrying until the watcher is created. TODO: use `wait.ExponentialBackoffWithContext`.
Types ¶
type EdgeBuffer ¶
type EdgeBuffer struct {
// contains filtered or unexported fields
}
EdgeBuffer is the edge relation between two vertices.
func NewEdgeBuffer ¶
func NewEdgeBuffer(ctx context.Context, name string, fromV *FromVertex) *EdgeBuffer
NewEdgeBuffer returns a new EdgeBuffer.
func (*EdgeBuffer) GetWatermark ¶
func (e *EdgeBuffer) GetWatermark(inputOffset isb.Offset) processor.Watermark
GetWatermark gets the smallest timestamp for the given offset.
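One plausible reading of "smallest timestamp" is a minimum taken across the upstream processors. The sketch below shows that idea only. The `processorView` type, skipping inactive processors, and using -1 to mean "no data" are assumptions, not documented behaviour.

```go
package main

import (
	"fmt"
	"math"
)

// processorView is a hypothetical, simplified view of one upstream processor.
type processorView struct {
	active bool
	// eventTimeFor returns the event time recorded for an offset, or -1 if unknown.
	eventTimeFor func(offset int64) int64
}

// smallestEventTime returns the minimum event time that the active processors
// report for offset, or -1 when no active processor has data for it.
func smallestEventTime(procs map[string]processorView, offset int64) int64 {
	var smallest int64 = math.MaxInt64
	for _, p := range procs {
		if !p.active {
			continue // assumption: inactive processors do not hold back the watermark
		}
		if t := p.eventTimeFor(offset); t != -1 && t < smallest {
			smallest = t
		}
	}
	if smallest == math.MaxInt64 {
		return -1
	}
	return smallest
}

func main() {
	constant := func(t int64) func(int64) int64 {
		return func(int64) int64 { return t }
	}
	procs := map[string]processorView{
		"p1": {active: true, eventTimeFor: constant(200)},
		"p2": {active: true, eventTimeFor: constant(150)},
		"p3": {active: false, eventTimeFor: constant(50)},
	}
	fmt.Println(smallestEventTime(procs, 42))
}
```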
type Fetcher ¶
type Fetcher interface {
	// GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1.
	GetWatermark(offset isb.Offset) processor.Watermark
}
Fetcher fetches watermark data from Vn-1 vertex.
type FromProcessor ¶
type FromProcessor struct {
// contains filtered or unexported fields
}
FromProcessor is the smallest entity that does in-order processing or contains in-order data.
func NewProcessor ¶
func NewProcessor(ctx context.Context, processor processor.ProcessorEntitier, capacity int, watcher nats.KeyWatcher) *FromProcessor
func (*FromProcessor) IsActive ¶
func (p *FromProcessor) IsActive() bool
IsActive returns whether a processor is active.
func (*FromProcessor) IsDeleted ¶
func (p *FromProcessor) IsDeleted() bool
IsDeleted returns whether a processor has been deleted.
func (*FromProcessor) IsInactive ¶
func (p *FromProcessor) IsInactive() bool
IsInactive returns whether a processor is inactive (no heartbeats of any sort).
func (*FromProcessor) String ¶
func (p *FromProcessor) String() string
type FromVertex ¶
type FromVertex struct {
// contains filtered or unexported fields
}
FromVertex is the view of the Vn-1 vertex from the Vn vertex. The code runs on the Vn vertex. It holds a mapping of all the processors, each of which in turn holds the information about its own timeline.
func NewFromVertex ¶
func NewFromVertex(ctx context.Context, keyspace string, js nats.JetStreamContext, heartbeatWatcher nats.KeyWatcher, inputOpts ...VertexOption) *FromVertex
NewFromVertex returns a new `FromVertex`.
func (*FromVertex) AddProcessor ¶
func (v *FromVertex) AddProcessor(processor string, p *FromProcessor)
AddProcessor adds a new processor.
func (*FromVertex) DeleteProcessor ¶
func (v *FromVertex) DeleteProcessor(processor string)
DeleteProcessor deletes a processor.
func (*FromVertex) GetAllProcessors ¶
func (v *FromVertex) GetAllProcessors() map[string]*FromProcessor
GetAllProcessors returns all the processors.
func (*FromVertex) GetProcessor ¶
func (v *FromVertex) GetProcessor(processor string) *FromProcessor
GetProcessor gets a processor.
type FromVertexer ¶
type FromVertexer interface {
	// GetAllProcessors fetches all the processors from Vn-1 vertex. processors could be pods or when the vertex is a
	// source vertex, it could be partitions if the source is Kafka.
	GetAllProcessors() map[string]*FromProcessor
}
FromVertexer defines an interface that builds the view of the Vn-1 vertex from the point of view of the Vn vertex.
type OffsetTimeline ¶
type OffsetTimeline struct {
// contains filtered or unexported fields
}
OffsetTimeline stores event-time-to-offset records. The list is sorted by event time from highest to lowest.
func NewOffsetTimeline ¶
func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline
NewOffsetTimeline returns OffsetTimeline.
func (*OffsetTimeline) Capacity ¶
func (t *OffsetTimeline) Capacity() int
Capacity returns the capacity of the OffsetTimeline list.
func (*OffsetTimeline) Dump ¶
func (t *OffsetTimeline) Dump() string
Dump dumps the in-memory representation of the OffsetTimeline. The output can get very ugly if the list is large (more than about 100 elements). The list is assumed to hold 10K+ entries (there are 86400 seconds in a day).
func (*OffsetTimeline) GetEventTime ¶
func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64
GetEventTime returns the event time for the given offset. TODO(jyu6): would making Watermark an interface make it easier to get a Watermark and return an Offset?
func (*OffsetTimeline) GetHeadOffset ¶
func (t *OffsetTimeline) GetHeadOffset() int64
GetHeadOffset returns the head offset, that is, the most recent offset, which has the highest Watermark.
func (*OffsetTimeline) GetOffset ¶
func (t *OffsetTimeline) GetOffset(eventTime int64) int64
GetOffset returns the offset for the given event time. TODO(jyu6): would making Watermark an interface make it easier to pass an Offset and return a Watermark?
func (*OffsetTimeline) GetTailOffset ¶
func (t *OffsetTimeline) GetTailOffset() int64
GetTailOffset returns the smallest offset with the smallest watermark.
func (*OffsetTimeline) Put ¶
func (t *OffsetTimeline) Put(node OffsetWatermark)
Put inserts the OffsetWatermark into the list and ensures that the list remains sorted after the insert.
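A sorted insert into a bounded list, ordered by event time from highest to lowest, might look roughly like the sketch below. The `entry` and `timeline` types are hypothetical stand-ins, not the package's types. Keeping the larger offset on a duplicate event time, dropping the lowest event time once capacity is exceeded, and returning -1 for an empty list are assumptions of this sketch.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for OffsetWatermark.
type entry struct {
	eventTime int64
	offset    int64
}

// timeline keeps entries sorted by event time, highest first, up to capacity.
type timeline struct {
	capacity int
	entries  []entry
}

// put inserts e so that the slice stays sorted from highest to lowest event time.
func (t *timeline) put(e entry) {
	// Find the first position whose event time is not greater than e's.
	i := 0
	for i < len(t.entries) && t.entries[i].eventTime > e.eventTime {
		i++
	}
	// Same event time already present: keep the maximum offset (assumption).
	if i < len(t.entries) && t.entries[i].eventTime == e.eventTime {
		if e.offset > t.entries[i].offset {
			t.entries[i].offset = e.offset
		}
		return
	}
	// Grow by one, shift the tail right, and place e at position i.
	t.entries = append(t.entries, entry{})
	copy(t.entries[i+1:], t.entries[i:])
	t.entries[i] = e
	// Drop the lowest event times once capacity is exceeded (assumption).
	if len(t.entries) > t.capacity {
		t.entries = t.entries[:t.capacity]
	}
}

// headOffset returns the offset with the highest event time, or -1 when empty.
func (t *timeline) headOffset() int64 {
	if len(t.entries) == 0 {
		return -1
	}
	return t.entries[0].offset
}

// tailOffset returns the offset with the lowest event time, or -1 when empty.
func (t *timeline) tailOffset() int64 {
	if len(t.entries) == 0 {
		return -1
	}
	return t.entries[len(t.entries)-1].offset
}

func main() {
	t := &timeline{capacity: 3}
	t.put(entry{eventTime: 100, offset: 10})
	t.put(entry{eventTime: 300, offset: 30})
	t.put(entry{eventTime: 200, offset: 20})
	t.put(entry{eventTime: 400, offset: 40}) // pushes out event time 100
	fmt.Println(t.headOffset(), t.tailOffset()) // prints "40 20"
}
```

A slice keeps the sketch short. A linked list would avoid shifting elements on insert, at the cost of slower indexed access.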
type OffsetWatermark ¶
type OffsetWatermark struct {
// contains filtered or unexported fields
}
OffsetWatermark stores the maximum offset for a given event time. The basic data type int64 is used to compare the values.
type ProcessorHeartbeat ¶
type ProcessorHeartbeat struct {
// contains filtered or unexported fields
}
ProcessorHeartbeat has details about each processor heartbeat. This information is populated by watching the Vn-1th vertex's processors. It stores only the latest heartbeat value.
func NewProcessorHeartbeat ¶
func NewProcessorHeartbeat() *ProcessorHeartbeat
NewProcessorHeartbeat returns ProcessorHeartbeat.
func (*ProcessorHeartbeat) Delete ¶
func (hb *ProcessorHeartbeat) Delete(key string)
Delete deletes a processor from the ProcessorHeartbeat table.
func (*ProcessorHeartbeat) Get ¶
func (hb *ProcessorHeartbeat) Get(key string) int64
Get gets the heartbeat for a given processor.
func (*ProcessorHeartbeat) GetAll ¶
func (hb *ProcessorHeartbeat) GetAll() map[string]int64
GetAll returns all the heartbeat entries in the heartbeat table.
func (*ProcessorHeartbeat) Put ¶
func (hb *ProcessorHeartbeat) Put(key string, value int64)
Put inserts a heartbeat entry for a given processor key and value.
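The four methods above suggest a small concurrency-safe table keyed by processor name. The sketch below is an illustration under assumptions: `heartbeatTable` is a hypothetical type, and the use of a read-write mutex, returning -1 for a missing key, and returning a copy from `getAll` are choices made here, not documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// heartbeatTable is a hypothetical stand-in for ProcessorHeartbeat.
// It stores only the latest heartbeat value per processor key.
type heartbeatTable struct {
	mu    sync.RWMutex
	beats map[string]int64
}

func newHeartbeatTable() *heartbeatTable {
	return &heartbeatTable{beats: make(map[string]int64)}
}

// put records the latest heartbeat for key, overwriting any earlier value.
func (h *heartbeatTable) put(key string, value int64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.beats[key] = value
}

// get returns the heartbeat for key, or -1 when the key is absent (assumption).
func (h *heartbeatTable) get(key string) int64 {
	h.mu.RLock()
	defer h.mu.RUnlock()
	if v, ok := h.beats[key]; ok {
		return v
	}
	return -1
}

// remove deletes key from the table.
func (h *heartbeatTable) remove(key string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.beats, key)
}

// getAll returns a copy so callers cannot mutate the table without the lock.
func (h *heartbeatTable) getAll() map[string]int64 {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make(map[string]int64, len(h.beats))
	for k, v := range h.beats {
		out[k] = v
	}
	return out
}

func main() {
	hb := newHeartbeatTable()
	hb.put("p1", 100)
	hb.put("p2", 200)
	hb.put("p1", 150) // only the latest value is kept
	hb.remove("p2")
	fmt.Println(hb.get("p1"), hb.get("p2"), len(hb.getAll())) // prints "150 -1 1"
}
```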
type VertexOption ¶
type VertexOption func(*vertexOptions)
VertexOption sets options for FromVertex.
func WithPodHeartbeatRate ¶
func WithPodHeartbeatRate(rate int64) VertexOption
WithPodHeartbeatRate sets the heartbeat rate in seconds.
func WithRefreshingProcessorsRate ¶
func WithRefreshingProcessorsRate(rate int64) VertexOption
WithRefreshingProcessorsRate sets the processor refreshing rate in seconds.
func WithSeparateOTBuckets ¶
func WithSeparateOTBuckets(separate bool) VertexOption
WithSeparateOTBuckets creates a different bucket for maintaining each processor's offset timeline.
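VertexOption has the shape of Go's functional options pattern: each option is a function that mutates an unexported options struct. The sketch below shows the pattern in isolation. All names and the default values are hypothetical and are not taken from the package.

```go
package main

import "fmt"

// options is a hypothetical counterpart of the unexported vertexOptions struct.
type options struct {
	heartbeatRate   int64 // seconds
	refreshRate     int64 // seconds
	separateBuckets bool
}

// option mutates options, mirroring the shape of VertexOption.
type option func(*options)

func withHeartbeatRate(rate int64) option {
	return func(o *options) { o.heartbeatRate = rate }
}

func withRefreshRate(rate int64) option {
	return func(o *options) { o.refreshRate = rate }
}

func withSeparateBuckets(separate bool) option {
	return func(o *options) { o.separateBuckets = separate }
}

// newOptions starts from defaults (assumed values) and applies each option in order.
func newOptions(opts ...option) *options {
	o := &options{heartbeatRate: 5, refreshRate: 5}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

func main() {
	o := newOptions(withHeartbeatRate(10), withSeparateBuckets(true))
	fmt.Println(o.heartbeatRate, o.refreshRate, o.separateBuckets) // prints "10 5 true"
}
```

Because the constructor takes a variadic list of options, new settings can be added later without changing its signature.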