Documentation ¶
Index ¶
- func DefaultPartitions() []int32
- func NewServer(source Sourcer, inputOptions ...Option) numaflow.Server
- type AckRequest
- type Message
- type Offset
- type Option
- type ReadRequest
- type Service
- func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb.AckResponse, error)
- func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sourcepb.ReadyResponse, error)
- func (fs *Service) PartitionsFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PartitionsResponse, error)
- func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PendingResponse, error)
- func (fs *Service) ReadFn(d *sourcepb.ReadRequest, stream sourcepb.Source_ReadFnServer) error
- type Sourcer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultPartitions ¶ added in v0.6.0
func DefaultPartitions() []int32
DefaultPartitions returns the default partitions for the source. Use it in the Partitions() function of the Sourcer interface only if the source doesn't have partitions. The default partition ID is the pod replica index of the source.
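As a rough illustration of the idea that the default partition follows the pod replica index, the sketch below derives a single-element partition list from a replica value. It is not the package's implementation: `partitionsFromReplica` and the `REPLICA_INDEX` environment variable are hypothetical names invented for this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// partitionsFromReplica is a hypothetical stand-in for DefaultPartitions.
// It maps a replica index given as a string to a single-element partition
// list and falls back to partition 0 when the value is missing or invalid.
func partitionsFromReplica(replica string) []int32 {
	idx, err := strconv.ParseInt(replica, 10, 32)
	if err != nil || idx < 0 {
		return []int32{0}
	}
	return []int32{int32(idx)}
}

func main() {
	// REPLICA_INDEX is an illustrative variable name, not one this
	// documentation defines.
	fmt.Println(partitionsFromReplica(os.Getenv("REPLICA_INDEX")))
}
```

In a real Sourcer, Partitions() would simply return DefaultPartitions() when the source has no partition concept of its own.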
Types ¶
type AckRequest ¶
type AckRequest interface {
	// Offsets returns the offsets of the records to ack.
	Offsets() []Offset
}
AckRequest is the interface of ack request.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message wraps the data returned by the UDSource.
func NewMessage ¶
NewMessage creates a Message with value
func (Message) WithHeaders ¶ added in v0.7.0
WithHeaders is used to assign the headers to the message
type Offset ¶
type Offset struct {
// contains filtered or unexported fields
}
func NewOffsetWithDefaultPartitionId ¶ added in v0.6.0
NewOffsetWithDefaultPartitionId creates an Offset with value and default partition id. This function can be used if you use DefaultPartitions() to implement the Sourcer interface. For most cases, this function can be used as long as the source does not have a concept of partitions. If you need to implement a custom partition, use `NewOffset`.
func (Offset) PartitionId ¶
PartitionId returns the partition ID of the offset.
type Option ¶
type Option func(*options)
Option is a function that applies a setting to the server options.
func WithMaxMessageSize ¶
WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.
func WithServerInfoFilePath ¶
WithServerInfoFilePath sets the server info file path to the given path.
func WithSockAddr ¶
WithSockAddr starts the server with the given sock addr. This is mainly used for testing purposes.
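The Option type and the With... functions above follow the functional options pattern. The sketch below shows that pattern in isolation. The `options` fields, the default values, and the lowercase helper names are assumptions made for illustration; the package's real struct is unexported and its fields are not documented here.

```go
package main

import "fmt"

// options is a stand-in for the package's unexported options struct.
// The field names are assumptions chosen for this example.
type options struct {
	sockAddr       string
	maxMessageSize int
}

// Option mutates an options value, matching the shape of the documented type.
type Option func(*options)

// withSockAddr is a local analogue of WithSockAddr.
func withSockAddr(addr string) Option {
	return func(o *options) { o.sockAddr = addr }
}

// withMaxMessageSize is a local analogue of WithMaxMessageSize.
func withMaxMessageSize(size int) Option {
	return func(o *options) { o.maxMessageSize = size }
}

// newOptions starts from hypothetical defaults and applies each option in order,
// so later options override earlier ones.
func newOptions(opts ...Option) *options {
	o := &options{sockAddr: "/tmp/example.sock", maxMessageSize: 64 * 1024 * 1024}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

func main() {
	o := newOptions(withSockAddr("/tmp/test.sock"), withMaxMessageSize(1024))
	fmt.Println(o.sockAddr, o.maxMessageSize)
}
```

With the real package, the equivalent call would pass such options as the variadic arguments of NewServer.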
type ReadRequest ¶
type ReadRequest interface {
	// Count returns the number of records to read.
	Count() uint64
	// TimeOut returns the timeout of the read request.
	TimeOut() time.Duration
}
ReadRequest is the interface of read request.
type Service ¶
type Service struct {
	sourcepb.UnimplementedSourceServer
	Source Sourcer
	// contains filtered or unexported fields
}
Service implements the protobuf-generated server interface.
func (*Service) AckFn ¶
func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb.AckResponse, error)
AckFn acknowledges the data from the source.
func (*Service) PartitionsFn ¶ added in v0.6.0
func (*Service) PendingFn ¶
func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PendingResponse, error)
PendingFn returns the number of pending messages.
func (*Service) ReadFn ¶
func (fs *Service) ReadFn(d *sourcepb.ReadRequest, stream sourcepb.Source_ReadFnServer) error
ReadFn reads the data from the source.
type Sourcer ¶
type Sourcer interface {
	// Read reads the data from the source and sends the data to the message channel.
	// If the read request is timed out, the function returns without reading new data.
	// Right after reading a message, the function marks the offset as to be acked.
	// Read should never attempt to close the message channel as the caller owns the channel.
	Read(ctx context.Context, readRequest ReadRequest, messageCh chan<- Message)
	// Ack acknowledges the data from the source.
	Ack(ctx context.Context, request AckRequest)
	// Pending returns the number of pending messages.
	// When the return value is negative, it indicates the pending information is not available.
	// With pending information being not available, the Numaflow platform doesn't auto-scale the source.
	Pending(ctx context.Context) int64
	// Partitions returns the partitions associated with the source, will be used by the platform to determine
	// the partitions to which the watermark should be published. If the source doesn't have partitions,
	// DefaultPartitions() can be used to return the default partitions.
	// In most cases, the DefaultPartitions() should be enough; the cases where we need to implement custom Partitions()
	// is in a case like Kafka, where a reader can read from multiple Kafka partitions.
	Partitions(ctx context.Context) []int32
}
Sourcer is the interface for implementation of the source.