processor

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2022 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package processor is the smallest processor entity for which the watermark will strictly monotonically increase.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EntityOption

type EntityOption func(*entityOptions)

EntityOption set options for FromVertex.

func WithSeparateOTBuckets

func WithSeparateOTBuckets(separate bool) EntityOption

WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.

type ProcessorEntitier

type ProcessorEntitier interface {
	GetID() string
	BuildOTWatcherKey(Watermark) string
	ParseOTWatcherKey(string) (int64, bool, error)
	IsOTBucketShared() bool
}

ProcessorEntitier defines what can be a processor. The Processor is the smallest unit where the watermark will monotonically increase.

type ProcessorEntity

type ProcessorEntity struct {
	// contains filtered or unexported fields
}

ProcessorEntity implements ProcessorEntitier.

func NewProcessorEntity

func NewProcessorEntity(name string, inputOpts ...EntityOption) *ProcessorEntity

NewProcessorEntity returns a new `ProcessorEntity`.

func (*ProcessorEntity) BuildOTWatcherKey

func (p *ProcessorEntity) BuildOTWatcherKey(watermark Watermark) string

BuildOTWatcherKey builds the offset-timeline key name

func (*ProcessorEntity) GetID

func (p *ProcessorEntity) GetID() string

GetID returns the ID of the processor.

func (*ProcessorEntity) IsOTBucketShared added in v0.5.3

func (p *ProcessorEntity) IsOTBucketShared() bool

IsOTBucketShared returns true if the OT bucket is shared.

func (*ProcessorEntity) ParseOTWatcherKey

func (p *ProcessorEntity) ParseOTWatcherKey(key string) (epoch int64, skip bool, err error)

ParseOTWatcherKey parses the key of the KeyValue OT watcher and returns the epoch, a boolean to indicate whether the record can be skipped and error if any. NOTE: _defaultKeySeparator has constraints, please make sure we will not end up with multiple values

type Watermark

type Watermark time.Time

Watermark is the monotonically increasing watermark. It is tightly coupled with ProcessorEntity as the processor is responsible for monotonically increasing Watermark for that processor. NOTE: today we support only second progression of watermark, we need to support millisecond too.

func (Watermark) After added in v0.5.6

func (w Watermark) After(t time.Time) bool

func (Watermark) String

func (w Watermark) String() string
Example
location, _ := time.LoadLocation("UTC")
wm := Watermark(time.Unix(1651129200, 0).In(location))
fmt.Println(wm)
Output:

2022-04-28T07:00:00Z

func (Watermark) Unix added in v0.6.1

func (w Watermark) Unix() int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL