Documentation ¶
Index ¶
- func CreateProcessorBucketIfMissing(bucketName string, js nats.JetStreamContext) (err error)
- func GetFetchKeyspace(v *dfv1.Vertex) string
- func GetHeartbeatBucket(js nats.JetStreamContext, publishKeyspace string) (nats.KeyValue, error)
- func GetJetStreamConnection(ctx context.Context) (nats.JetStreamContext, error)
- func GetPublishKeySpace(v *dfv1.Vertex) string
- type GenericProgress
- type GenericProgressOption
- type Progressor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateProcessorBucketIfMissing ¶ added in v0.5.2
CreateProcessorBucketIfMissing creates the KV bucket if missing TODO: this should be moved to controller
func GetFetchKeyspace ¶ added in v0.5.2
GetFetchKeyspace gets the fetch keyspace name from the vertex.
func GetHeartbeatBucket ¶ added in v0.5.2
GetHeartbeatBucket returns the heartbeat bucket.
func GetJetStreamConnection ¶ added in v0.5.2
func GetPublishKeySpace ¶ added in v0.5.2
GetPublishKeySpace gets the publish keyspace name from the vertex
Types ¶
type GenericProgress ¶
type GenericProgress struct {
// contains filtered or unexported fields
}
GenericProgress implements `Progressor` to progress the watermark for UDFs and Sinks.
func NewGenericProgress ¶
func NewGenericProgress(ctx context.Context, processorName string, fetchKeyspace string, publishKeyspace string, js nats.JetStreamContext, inputOpts ...GenericProgressOption) *GenericProgress
NewGenericProgress will move the watermark for all the vertices once consumed from the source.
func (*GenericProgress) GetLatestWatermark ¶
func (u *GenericProgress) GetLatestWatermark() processor.Watermark
GetLatestWatermark returns the latest head watermark.
func (*GenericProgress) GetWatermark ¶
func (u *GenericProgress) GetWatermark(offset isb.Offset) processor.Watermark
GetWatermark gets the watermark.
func (*GenericProgress) PublishWatermark ¶
func (u *GenericProgress) PublishWatermark(watermark processor.Watermark, offset isb.Offset)
PublishWatermark publishes the watermark.
func (*GenericProgress) StopPublisher ¶ added in v0.5.2
func (g *GenericProgress) StopPublisher()
type GenericProgressOption ¶
type GenericProgressOption func(options *genericProgressOptions)
GenericProgressOption sets options for GenericProgress.
func WithSeparateOTBuckets ¶
func WithSeparateOTBuckets(separate bool) GenericProgressOption
WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.