Documentation ¶
Index ¶
- type Fetcher
- type OffsetTimeline
- func (t *OffsetTimeline) Capacity() int
- func (t *OffsetTimeline) Dump() string
- func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64
- func (t *OffsetTimeline) GetEventTimeFromInt64(inputOffsetInt64 int64) int64
- func (t *OffsetTimeline) GetHeadOffset() int64
- func (t *OffsetTimeline) GetHeadOffsetWatermark() OffsetWatermark
- func (t *OffsetTimeline) GetHeadWatermark() int64
- func (t *OffsetTimeline) GetOffset(eventTime int64) int64
- func (t *OffsetTimeline) GetReferredWatermark(idleWM int64) OffsetWatermark
- func (t *OffsetTimeline) Put(node OffsetWatermark)
- func (t *OffsetTimeline) PutIdle(node OffsetWatermark)
- type OffsetWatermark
- type ProcessorHeartbeat
- type ProcessorManager
- type ProcessorManagerOption
- type ProcessorToFetch
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Fetcher ¶
type Fetcher interface { io.Closer // GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1. GetWatermark(offset isb.Offset) wmb.Watermark // GetHeadWatermark returns the latest watermark based on the head offset GetHeadWatermark() wmb.Watermark }
Fetcher fetches watermark data from Vn-1 vertex.
func NewEdgeFetcher ¶ added in v0.5.6
func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.WatermarkStoreWatcher) Fetcher
NewEdgeFetcher returns a new edge fetcher.
func NewSourceFetcher ¶ added in v0.5.6
func NewSourceFetcher(ctx context.Context, sourceBufferName string, storeWatcher store.WatermarkStoreWatcher) Fetcher
NewSourceFetcher returns a new source fetcher, processorManager has the details about the processors responsible for writing to the buckets of the source buffer.
type OffsetTimeline ¶
type OffsetTimeline struct {
// contains filtered or unexported fields
}
OffsetTimeline is to store the event time to the offset records. Our list is sorted by event time from highest to lowest.
func NewOffsetTimeline ¶
func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline
NewOffsetTimeline returns OffsetTimeline.
func (*OffsetTimeline) Capacity ¶
func (t *OffsetTimeline) Capacity() int
Capacity returns the capacity of the OffsetTimeline list.
func (*OffsetTimeline) Dump ¶
func (t *OffsetTimeline) Dump() string
Dump dumps the in-memory representation of the OffsetTimeline. Could get very ugly if the list is large, like > 100 elements. I am assuming we will have it in 10K+ (86400 seconds are there in a day).
func (*OffsetTimeline) GetEventTime ¶
func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64
GetEventTime will return the event-time for the given offset. TODO(jyu6): will make Watermark an interface make it easy to get a Watermark and return an Offset?
func (*OffsetTimeline) GetEventTimeFromInt64 ¶ added in v0.6.2
func (t *OffsetTimeline) GetEventTimeFromInt64(inputOffsetInt64 int64) int64
func (*OffsetTimeline) GetHeadOffset ¶
func (t *OffsetTimeline) GetHeadOffset() int64
GetHeadOffset returns the head offset, that is the most recent offset which will have the highest Watermark.
func (*OffsetTimeline) GetHeadOffsetWatermark ¶ added in v0.7.1
func (t *OffsetTimeline) GetHeadOffsetWatermark() OffsetWatermark
GetHeadOffsetWatermark returns the largest offset with the largest watermark.
func (*OffsetTimeline) GetHeadWatermark ¶ added in v0.5.6
func (t *OffsetTimeline) GetHeadWatermark() int64
GetHeadWatermark returns the head watermark, which is the highest one.
func (*OffsetTimeline) GetOffset ¶
func (t *OffsetTimeline) GetOffset(eventTime int64) int64
GetOffset will return the offset for the given event-time. TODO(jyu6): will make Watermark an interface make it easy to pass an Offset and return a Watermark?
func (*OffsetTimeline) GetReferredWatermark ¶ added in v0.7.2
func (t *OffsetTimeline) GetReferredWatermark(idleWM int64) OffsetWatermark
GetReferredWatermark returns the referred watermark that will be used to replace the idle watermark value
func (*OffsetTimeline) Put ¶
func (t *OffsetTimeline) Put(node OffsetWatermark)
Put inserts the OffsetWatermark into list. It ensures that the list will remain sorted after the insert.
func (*OffsetTimeline) PutIdle ¶ added in v0.7.1
func (t *OffsetTimeline) PutIdle(node OffsetWatermark)
PutIdle inserts the assumed OffsetWatermark which replaces the idle watermark into list. It ensures that the list will remain sorted after the insert.
type OffsetWatermark ¶
type OffsetWatermark struct {
// contains filtered or unexported fields
}
OffsetWatermark stores the maximum offset for the given event time we use basic data type int64 to compare the value
type ProcessorHeartbeat ¶
type ProcessorHeartbeat struct {
// contains filtered or unexported fields
}
ProcessorHeartbeat has details about each processor heartbeat. This information is populated by watching the Vn-1th vertex's processors. It stores only the latest heartbeat value.
func NewProcessorHeartbeat ¶
func NewProcessorHeartbeat() *ProcessorHeartbeat
NewProcessorHeartbeat returns ProcessorHeartbeat.
func (*ProcessorHeartbeat) Delete ¶
func (hb *ProcessorHeartbeat) Delete(key string)
Delete deletes a processor from the ProcessorHeartbeat table.
func (*ProcessorHeartbeat) Get ¶
func (hb *ProcessorHeartbeat) Get(key string) int64
Get gets the heartbeat for a given processor.
func (*ProcessorHeartbeat) GetAll ¶
func (hb *ProcessorHeartbeat) GetAll() map[string]int64
GetAll returns all the heartbeat entries in the heartbeat table.
func (*ProcessorHeartbeat) Put ¶
func (hb *ProcessorHeartbeat) Put(key string, value int64)
Put inserts a heartbeat entry for a given processor key and value.
type ProcessorManager ¶ added in v0.5.6
type ProcessorManager struct {
// contains filtered or unexported fields
}
ProcessorManager manages the point of view of Vn-1 from Vn vertex processors (or source processor). The code is running on Vn vertex. It has the mapping of all the processors which in turn has all the information about each processor timelines.
func NewProcessorManager ¶ added in v0.5.6
func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.WatermarkStoreWatcher, inputOpts ...ProcessorManagerOption) *ProcessorManager
NewProcessorManager returns a new ProcessorManager instance
func (*ProcessorManager) DeleteProcessor ¶ added in v0.5.6
func (v *ProcessorManager) DeleteProcessor(processor string)
DeleteProcessor deletes a processor.
func (*ProcessorManager) GetAllProcessors ¶ added in v0.5.6
func (v *ProcessorManager) GetAllProcessors() map[string]*ProcessorToFetch
GetAllProcessors returns all the processors.
func (*ProcessorManager) GetProcessor ¶ added in v0.5.6
func (v *ProcessorManager) GetProcessor(processor string) *ProcessorToFetch
GetProcessor gets a processor.
type ProcessorManagerOption ¶ added in v0.5.6
type ProcessorManagerOption func(*processorManagerOptions)
ProcessorManagerOption set options for FromVertex.
func WithPodHeartbeatRate ¶
func WithPodHeartbeatRate(rate int64) ProcessorManagerOption
WithPodHeartbeatRate sets the heartbeat rate in seconds.
func WithRefreshingProcessorsRate ¶
func WithRefreshingProcessorsRate(rate int64) ProcessorManagerOption
WithRefreshingProcessorsRate sets the processor refreshing rate in seconds.
type ProcessorToFetch ¶ added in v0.5.3
type ProcessorToFetch struct {
// contains filtered or unexported fields
}
ProcessorToFetch is the smallest unit of entity (from which we fetch data) that does inorder processing or contains inorder data.
func NewProcessorToFetch ¶ added in v0.5.3
func NewProcessorToFetch(ctx context.Context, processor processor.ProcessorEntitier, capacity int) *ProcessorToFetch
NewProcessorToFetch creates ProcessorToFetch.
func (*ProcessorToFetch) IsActive ¶ added in v0.5.3
func (p *ProcessorToFetch) IsActive() bool
IsActive returns whether a processor is active.
func (*ProcessorToFetch) IsDeleted ¶ added in v0.5.3
func (p *ProcessorToFetch) IsDeleted() bool
IsDeleted returns whether a processor has been deleted.
func (*ProcessorToFetch) IsInactive ¶ added in v0.5.3
func (p *ProcessorToFetch) IsInactive() bool
IsInactive returns whether a processor is inactive (no heartbeats or any sort).
func (*ProcessorToFetch) String ¶ added in v0.5.3
func (p *ProcessorToFetch) String() string