fetch

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RetryUntilSuccessfulWatcherCreation

func RetryUntilSuccessfulWatcherCreation(js nats.JetStreamContext, bucketName string, infiniteLoop bool, log *zap.SugaredLogger) nats.KeyWatcher

RetryUntilSuccessfulWatcherCreation creates a watcher and will wait till it is created if infiniteLoop is set to true. TODO: use `wait.ExponentialBackoffWithContext`

Types

type EdgeBuffer

type EdgeBuffer struct {
	// contains filtered or unexported fields
}

EdgeBuffer is the edge relation between two vertices.

func NewEdgeBuffer

func NewEdgeBuffer(ctx context.Context, name string, fromV *FromVertex) *EdgeBuffer

NewEdgeBuffer returns a new EdgeBuffer.

func (*EdgeBuffer) GetWatermark

func (e *EdgeBuffer) GetWatermark(inputOffset isb.Offset) processor.Watermark

GetWatermark gets the smallest timestamp for the given offset

type Fetcher

type Fetcher interface {
	// GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1.
	GetWatermark(offset isb.Offset) processor.Watermark
}

Fetcher fetches watermark data from Vn-1 vertex.

type FromProcessor

type FromProcessor struct {
	// contains filtered or unexported fields
}

FromProcessor is the smallest unit of entity who does inorder processing or contains inorder data.

func NewProcessor

func NewProcessor(ctx context.Context, processor processor.ProcessorEntitier, capacity int, watcher nats.KeyWatcher) *FromProcessor

func (*FromProcessor) IsActive

func (p *FromProcessor) IsActive() bool

IsActive returns whether a processor is active.

func (*FromProcessor) IsDeleted

func (p *FromProcessor) IsDeleted() bool

IsDeleted returns whether a processor has been deleted.

func (*FromProcessor) IsInactive

func (p *FromProcessor) IsInactive() bool

IsInactive returns whether a processor is inactive (no heartbeats or any sort).

func (*FromProcessor) String

func (p *FromProcessor) String() string

type FromVertex

type FromVertex struct {
	// contains filtered or unexported fields
}

FromVertex is the point of view of Vn-1 from Vn vertex. The code is running on Vn vertex. It has the mapping of all the processors which in turn has all the information about each processor timelines.

func NewFromVertex

func NewFromVertex(ctx context.Context, keyspace string, js nats.JetStreamContext, heartbeatWatcher nats.KeyWatcher, inputOpts ...VertexOption) *FromVertex

NewFromVertex returns `FromVertex`

func (*FromVertex) AddProcessor

func (v *FromVertex) AddProcessor(processor string, p *FromProcessor)

AddProcessor adds a new processor.

func (*FromVertex) DeleteProcessor

func (v *FromVertex) DeleteProcessor(processor string)

DeleteProcessor deletes a processor.

func (*FromVertex) GetAllProcessors

func (v *FromVertex) GetAllProcessors() map[string]*FromProcessor

GetAllProcessors returns all the processors.

func (*FromVertex) GetProcessor

func (v *FromVertex) GetProcessor(processor string) *FromProcessor

GetProcessor gets a processor.

type FromVertexer

type FromVertexer interface {
	// GetAllProcessors fetches all the processors from Vn-1 vertex. processors could be pods or when the vertex is a
	// source vertex, it could be partitions if the source is Kafka.
	GetAllProcessors() map[string]*FromProcessor
}

FromVertexer defines an interface which builds the view of Vn-th vertex from the point of view of Vn-th vertex.

type OffsetTimeline

type OffsetTimeline struct {
	// contains filtered or unexported fields
}

OffsetTimeline is to store the event time to the offset records. Our list is sorted by event time from highest to lowest.

func NewOffsetTimeline

func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline

NewOffsetTimeline returns OffsetTimeline.

func (*OffsetTimeline) Capacity

func (t *OffsetTimeline) Capacity() int

Capacity returns the capacity of the OffsetTimeline list.

func (*OffsetTimeline) Dump

func (t *OffsetTimeline) Dump() string

Dump dumps the in-memory representation of the OffsetTimeline. Could get very ugly if the list is large, like > 100 elements. I am assuming we will have it in 10K+ (86400 seconds are there in a day).

func (*OffsetTimeline) GetEventTime

func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64

GetEventTime will return the event-time for the given offset. TODO(jyu6): will make Watermark an interface make it easy to get a Watermark and return an Offset?

func (*OffsetTimeline) GetHeadOffset

func (t *OffsetTimeline) GetHeadOffset() int64

GetHeadOffset returns the head offset, that is the most recent offset which will have the highest Watermark.

func (*OffsetTimeline) GetOffset

func (t *OffsetTimeline) GetOffset(eventTime int64) int64

GetOffset will return the offset for the given event-time. TODO(jyu6): will make Watermark an interface make it easy to pass an Offset and return a Watermark?

func (*OffsetTimeline) GetTailOffset

func (t *OffsetTimeline) GetTailOffset() int64

GetTailOffset returns the smallest offset with the smallest watermark.

func (*OffsetTimeline) Put

func (t *OffsetTimeline) Put(node OffsetWatermark)

Put inserts the OffsetWatermark into list. It ensures that the list will remain sorted after the insert.

type OffsetWatermark

type OffsetWatermark struct {
	// contains filtered or unexported fields
}

OffsetWatermark stores the maximum offset for the given event time we use basic data type int64 to compare the value

type ProcessorHeartbeat

type ProcessorHeartbeat struct {
	// contains filtered or unexported fields
}

ProcessorHeartbeat has details about each processor heartbeat. This information is populated by watching the Vn-1th vertex's processors. It stores only the latest heartbeat value.

func NewProcessorHeartbeat

func NewProcessorHeartbeat() *ProcessorHeartbeat

NewProcessorHeartbeat returns ProcessorHeartbeat.

func (*ProcessorHeartbeat) Delete

func (hb *ProcessorHeartbeat) Delete(key string)

Delete deletes a processor from the ProcessorHeartbeat table.

func (*ProcessorHeartbeat) Get

func (hb *ProcessorHeartbeat) Get(key string) int64

Get gets the heartbeat for a given processor.

func (*ProcessorHeartbeat) GetAll

func (hb *ProcessorHeartbeat) GetAll() map[string]int64

GetAll returns all the heartbeat entries in the heartbeat table.

func (*ProcessorHeartbeat) Put

func (hb *ProcessorHeartbeat) Put(key string, value int64)

Put inserts a heartbeat entry for a given processor key and value.

type VertexOption

type VertexOption func(*vertexOptions)

VertexOption set options for FromVertex.

func WithPodHeartbeatRate

func WithPodHeartbeatRate(rate int64) VertexOption

WithPodHeartbeatRate sets the heartbeat rate in seconds.

func WithRefreshingProcessorsRate

func WithRefreshingProcessorsRate(rate int64) VertexOption

WithRefreshingProcessorsRate sets the processor refreshing rate in seconds.

func WithSeparateOTBuckets

func WithSeparateOTBuckets(separate bool) VertexOption

WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL