sourcer

package
v0.9.0
Warning

This package is not in the latest version of its module.
Published: Nov 8, 2024 License: Apache-2.0 Imports: 20 Imported by: 15

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultPartitions added in v0.6.0

func DefaultPartitions() []int32

DefaultPartitions returns the default partitions for the source. Use it in the Partitions() method of the Sourcer interface only if the source doesn't have partitions of its own. The default partition is the pod replica index of the source.
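As a rough illustration of "one partition whose id is the pod replica index", the sketch below derives a single-element partition list from an environment variable. The variable name `EXAMPLE_REPLICA_INDEX` and the helper names are hypothetical and exist only for this sketch; the package's actual lookup is not shown on this page and may differ.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// replicaEnv is a hypothetical environment variable name used only in this sketch.
const replicaEnv = "EXAMPLE_REPLICA_INDEX"

// partitionsForReplica turns a raw replica index into a single-element
// partition list. It falls back to partition 0 when the input is empty,
// malformed, or negative.
func partitionsForReplica(raw string) []int32 {
	idx, err := strconv.ParseInt(raw, 10, 32)
	if err != nil || idx < 0 {
		idx = 0
	}
	return []int32{int32(idx)}
}

// defaultPartitions reads the (assumed) replica index from the environment.
func defaultPartitions() []int32 {
	return partitionsForReplica(os.Getenv(replicaEnv))
}

func main() {
	os.Setenv(replicaEnv, "2")
	fmt.Println(defaultPartitions()) // prints [2]
}
```

Because each replica reports only its own index, two replicas of the same source never claim the same partition in this scheme.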

func NewServer

func NewServer(
	source Sourcer,
	inputOptions ...Option) numaflow.Server

NewServer creates a new server object.

Types

type AckRequest

type AckRequest interface {
	// Offsets returns the offsets to be acknowledged.
	Offsets() []Offset
}

AckRequest is the interface of ack request.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message wraps the data returned by a user-defined source (UDSource).

func NewMessage

func NewMessage(value []byte, offset Offset, eventTime time.Time) Message

NewMessage creates a Message with the given value, offset, and event time.

func (Message) EventTime

func (m Message) EventTime() time.Time

EventTime returns message event time

func (Message) Headers added in v0.7.0

func (m Message) Headers() map[string]string

Headers returns message headers

func (Message) Keys

func (m Message) Keys() []string

Keys returns message keys

func (Message) Offset

func (m Message) Offset() Offset

Offset returns message offset

func (Message) Value

func (m Message) Value() []byte

Value returns message value

func (Message) WithHeaders added in v0.7.0

func (m Message) WithHeaders(headers map[string]string) Message

WithHeaders is used to assign the headers to the message

func (Message) WithKeys

func (m Message) WithKeys(keys []string) Message

WithKeys is used to assign the keys to the message
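WithKeys and WithHeaders are declared on a value receiver and return a Message, so the caller has to use the returned value; calling them and discarding the result leaves the original message unchanged. The sketch below shows this with a local stand-in type (`message`, `newMessage`, `withKeys`, and `withHeaders` are hypothetical names that only mirror the documented signatures, not the package's own code).

```go
package main

import "fmt"

// message is a local stand-in for sourcer.Message, reduced to three fields.
type message struct {
	value   []byte
	keys    []string
	headers map[string]string
}

func newMessage(value []byte) message { return message{value: value} }

// withKeys has a value receiver: it modifies a copy and returns that copy.
func (m message) withKeys(keys []string) message {
	m.keys = keys
	return m
}

// withHeaders follows the same copy-and-return pattern.
func (m message) withHeaders(headers map[string]string) message {
	m.headers = headers
	return m
}

func main() {
	m := newMessage([]byte("payload"))

	m.withKeys([]string{"k1"}) // result discarded: m is unchanged
	fmt.Println(len(m.keys))   // prints 0

	// Chain the calls and keep the returned value.
	m = m.withKeys([]string{"k1"}).withHeaders(map[string]string{"source": "demo"})
	fmt.Println(len(m.keys), m.headers["source"]) // prints "1 demo"
}
```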

type Offset

type Offset struct {
	// contains filtered or unexported fields
}

func NewOffset

func NewOffset(value []byte, partitionId int32) Offset

NewOffset creates an Offset with value and partition id

func NewOffsetWithDefaultPartitionId added in v0.6.0

func NewOffsetWithDefaultPartitionId(value []byte) Offset

NewOffsetWithDefaultPartitionId creates an Offset with value and default partition id. This function can be used if you use DefaultPartitions() to implement the Sourcer interface. For most cases, this function can be used as long as the source does not have a concept of partitions. If you need to implement a custom partition, use `NewOffset`.
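An offset value is an opaque byte slice, so the source decides how to encode its position. One possible approach, sketched below with local stand-ins, is to store a numeric index as 8 big-endian bytes. The names `offset`, `newOffset`, `encodeIndex`, and `decodeIndex` are hypothetical; only the shape of `newOffset` mirrors the documented `NewOffset(value []byte, partitionId int32)`.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// offset is a local stand-in for sourcer.Offset.
type offset struct {
	value       []byte
	partitionID int32
}

// newOffset mirrors the documented NewOffset signature.
func newOffset(value []byte, partitionID int32) offset {
	return offset{value: value, partitionID: partitionID}
}

// encodeIndex stores a numeric position as 8 big-endian bytes.
func encodeIndex(i uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, i)
	return b
}

// decodeIndex reverses encodeIndex and rejects values of the wrong length.
func decodeIndex(b []byte) (uint64, error) {
	if len(b) != 8 {
		return 0, fmt.Errorf("offset value has %d bytes, want 8", len(b))
	}
	return binary.BigEndian.Uint64(b), nil
}

func main() {
	off := newOffset(encodeIndex(42), 0)
	i, err := decodeIndex(off.value)
	fmt.Println(i, err, off.partitionID) // prints "42 <nil> 0"
}
```

A fixed-width encoding keeps decoding in an acknowledgement handler trivial; a decimal string would work equally well if readability in logs matters more.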

func (Offset) PartitionId

func (o Offset) PartitionId() int32

PartitionId returns partition id of the offset

func (Offset) Value

func (o Offset) Value() []byte

Value returns value of the offset

type Option

type Option func(*options)

Option is a function that applies a configuration option to the server.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(f string) Option

WithServerInfoFilePath sets the server info file path to the given path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr starts the server with the given socket address. This is mainly used for testing purposes.
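The `Option func(*options)` type follows the functional options pattern: each option is a closure that mutates a private settings struct, and the constructor applies them in order over its defaults. The sketch below is a minimal, self-contained version of that pattern. The field names, the default values, and `applyOptions` are assumptions for illustration, since the package's `options` struct is unexported and not shown on this page.

```go
package main

import "fmt"

// options is a hypothetical settings struct; the real one is unexported.
type options struct {
	sockAddr       string
	maxMessageSize int
}

// Option mutates the settings, matching the documented type shape.
type Option func(*options)

// withSockAddr overrides the socket address.
func withSockAddr(addr string) Option {
	return func(o *options) { o.sockAddr = addr }
}

// withMaxMessageSize overrides the maximum message size in bytes.
func withMaxMessageSize(size int) Option {
	return func(o *options) { o.maxMessageSize = size }
}

// applyOptions starts from the defaults and applies each option in order,
// so a later option wins over an earlier one that sets the same field.
func applyOptions(defaults options, opts ...Option) options {
	for _, opt := range opts {
		opt(&defaults)
	}
	return defaults
}

func main() {
	// Hypothetical defaults, chosen only for the demo.
	defaults := options{sockAddr: "/tmp/default.sock", maxMessageSize: 1024}
	o := applyOptions(defaults, withSockAddr("/tmp/test.sock"))
	fmt.Println(o.sockAddr, o.maxMessageSize) // prints "/tmp/test.sock 1024"
}
```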

type ReadRequest

type ReadRequest interface {
	// Count returns the number of records to read.
	Count() uint64
	// TimeOut returns the timeout of the read request.
	TimeOut() time.Duration
}

ReadRequest is the interface of read request.

type Service

type Service struct {
	sourcepb.UnimplementedSourceServer
	Source Sourcer
	// contains filtered or unexported fields
}

Service implements the protobuf-generated gRPC server interface.

func (*Service) AckFn

func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error

AckFn acknowledges the data from the source.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) PartitionsFn added in v0.6.0

func (fs *Service) PartitionsFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PartitionsResponse, error)

func (*Service) PendingFn

func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PendingResponse, error)

PendingFn returns the number of pending messages.

func (*Service) ReadFn

func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error

ReadFn reads the data from the source.

type Sourcer

type Sourcer interface {
	// Read reads the data from the source and sends the data to the message channel.
	// If the read request is timed out, the function returns without reading new data.
	// Right after reading a message, the function marks the offset as to be acked.
	// Read should never attempt to close the message channel as the caller owns the channel.
	Read(ctx context.Context, readRequest ReadRequest, messageCh chan<- Message)
	// Ack acknowledges the data from the source.
	Ack(ctx context.Context, request AckRequest)
	// Pending returns the number of pending messages.
	// When the return value is negative, it indicates the pending information is not available.
	// When pending information is not available, the Numaflow platform doesn't auto-scale the source.
	Pending(ctx context.Context) int64
	// Partitions returns the partitions associated with the source, will be used by the platform to determine
	// the partitions to which the watermark should be published. If the source doesn't have partitions,
	// DefaultPartitions() can be used to return the default partitions.
	// In most cases, DefaultPartitions() should be enough; a custom Partitions() is needed
	// only for sources like Kafka, where a reader can read from multiple Kafka partitions.
	Partitions(ctx context.Context) []int32
}

Sourcer is the interface for implementation of the source.
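To make the contract in the comments above concrete, here is a minimal in-memory sketch of the four methods. It uses local stand-in types that mirror the documented signatures so that it compiles without the module; in real code, the package's own Message, Offset, ReadRequest, and AckRequest would be used instead. `counterSource`, `readReq`, `ackReq`, `readN`, and `ackAll` are hypothetical names, and returning `[]int32{0}` stands in for DefaultPartitions().

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"
)

// Local stand-ins for the package's types, reduced to what the sketch needs.
type Offset struct{ value []byte }
type Message struct {
	value  []byte
	offset Offset
}
type ReadRequest interface {
	Count() uint64
	TimeOut() time.Duration
}
type AckRequest interface{ Offsets() []Offset }

// readReq and ackReq are hypothetical request implementations for the demo.
type readReq struct {
	count   uint64
	timeout time.Duration
}

func (r readReq) Count() uint64          { return r.count }
func (r readReq) TimeOut() time.Duration { return r.timeout }

type ackReq struct{ offsets []Offset }

func (a ackReq) Offsets() []Offset { return a.offsets }

// counterSource emits increasing integers and tracks offsets awaiting ack.
type counterSource struct {
	mu    sync.Mutex
	next  int
	toAck map[string]struct{}
}

func newCounterSource() *counterSource {
	return &counterSource{toAck: make(map[string]struct{})}
}

// Read sends up to Count messages and stops early once the timeout elapses.
// It never closes messageCh, because the caller owns the channel.
func (s *counterSource) Read(ctx context.Context, req ReadRequest, messageCh chan<- Message) {
	s.mu.Lock()
	defer s.mu.Unlock()
	ctx, cancel := context.WithTimeout(ctx, req.TimeOut())
	defer cancel()
	for i := uint64(0); i < req.Count(); i++ {
		id := strconv.Itoa(s.next)
		select {
		case <-ctx.Done():
			return
		case messageCh <- Message{value: []byte(id), offset: Offset{value: []byte(id)}}:
			s.toAck[id] = struct{}{} // mark the offset as to be acked
			s.next++
		}
	}
}

// Ack forgets every offset listed in the request.
func (s *counterSource) Ack(_ context.Context, req AckRequest) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, o := range req.Offsets() {
		delete(s.toAck, string(o.value))
	}
}

// Pending returns -1: this source cannot know how much data is pending.
func (s *counterSource) Pending(context.Context) int64 { return -1 }

// Partitions returns a single partition, standing in for DefaultPartitions().
func (s *counterSource) Partitions(context.Context) []int32 { return []int32{0} }

// unacked reports how many offsets still await acknowledgement.
func (s *counterSource) unacked() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.toAck)
}

// readN plays the caller's role: it owns the channel, reads, then drains it.
func readN(s *counterSource, n uint64) []Offset {
	ch := make(chan Message, n)
	s.Read(context.Background(), readReq{count: n, timeout: time.Second}, ch)
	close(ch)
	var offsets []Offset
	for m := range ch {
		offsets = append(offsets, m.offset)
	}
	return offsets
}

// ackAll acknowledges the given offsets in a single request.
func ackAll(s *counterSource, offsets []Offset) {
	s.Ack(context.Background(), ackReq{offsets: offsets})
}

func main() {
	src := newCounterSource()
	offsets := readN(src, 3)
	fmt.Println(len(offsets), src.unacked()) // prints "3 3"
	ackAll(src, offsets)
	fmt.Println(src.unacked()) // prints "0"
}
```

The channel is created and closed in `readN`, not in `Read`, which matches the rule that the caller owns the message channel.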

Directories

Path Synopsis
examples
simple_source Module
