sinker

package
v0.7.0
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2024 License: Apache-2.0 Imports: 13 Imported by: 10

Documentation

Constants

const (
	EnvUDContainerType      = "NUMAFLOW_UD_CONTAINER_TYPE"
	UDContainerFallbackSink = "fb-udsink"
)

Variables

This section is empty.

Functions

func NewServer

func NewServer(h Sinker, inputOptions ...Option) numaflow.Server

NewServer creates a new sinkServer object.

Types

type Datum

type Datum interface {
	// Keys returns the keys of the message.
	Keys() []string
	// Value returns the payload of the message.
	Value() []byte
	// EventTime returns the event time of the message.
	EventTime() time.Time
	// Watermark returns the watermark of the message.
	Watermark() time.Time
	// ID returns the ID of the message.
	ID() string
	// Headers returns the headers of the message.
	Headers() map[string]string
}

Datum is the interface for an incoming message passed to the sink function.

type Option

type Option func(*options)

Option is a function that applies a setting to the server's options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the sinkServer max receive message size and the sinkServer max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(path string) Option

WithServerInfoFilePath sets the sinkServer info file path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr starts the sinkServer with the given sock addr. This is mainly used for testing purposes.
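
The option functions above follow the functional options pattern: each one returns a closure that mutates an options value. The sketch below shows how such options could be applied. The real options struct is unexported and its fields are not shown on this page, so the struct, its field names, the default values, and the lower-case helpers here are all assumptions made for illustration.

```go
package main

// options is a hypothetical stand-in for the package's unexported options
// struct; the real field names are not documented here.
type options struct {
	sockAddr       string
	maxMessageSize int
}

// Option mirrors the documented type: a function that mutates *options.
type Option func(*options)

// withSockAddr is a local analogue of WithSockAddr.
func withSockAddr(addr string) Option {
	return func(o *options) { o.sockAddr = addr }
}

// withMaxMessageSize is a local analogue of WithMaxMessageSize.
func withMaxMessageSize(size int) Option {
	return func(o *options) { o.maxMessageSize = size }
}

// applyOptions starts from assumed defaults and applies each option in order,
// so a later option overrides an earlier one that sets the same field.
func applyOptions(inputOptions ...Option) options {
	opts := options{sockAddr: "/tmp/example.sock", maxMessageSize: 1 << 20}
	for _, apply := range inputOptions {
		apply(&opts)
	}
	return opts
}
```

With this shape, a call such as applyOptions(withSockAddr("/tmp/test.sock")) changes only the socket address and leaves the other defaults untouched, which is presumably why NewServer accepts a variadic list of Option values.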

type Response

type Response struct {
	// ID corresponds to the ID in the message.
	ID string `json:"id"`
	// Successful or not. If it's false, "err" is expected to be present.
	Success bool `json:"success"`
	// Err represents the error message when "success" is false.
	Err string `json:"err,omitempty"`
	// Fallback is true if the message is to be sent to the fallback sink.
	Fallback bool `json:"fallback,omitempty"`
}

Response is the processing result of each message.

func ResponseFailure

func ResponseFailure(id, errMsg string) Response

ResponseFailure creates a failed Response with the given id and error message. The Success field is set to false and the Err field is set to the provided error message.

func ResponseFallback added in v0.7.0

func ResponseFallback(id string) Response

ResponseFallback creates a Response with the Fallback field set to true. This indicates that the message should be sent to the fallback sink.

func ResponseOK

func ResponseOK(id string) Response

ResponseOK creates a successful Response with the given id. The Success field is set to true.

type Responses

type Responses []Response

func ResponsesBuilder

func ResponsesBuilder() Responses

ResponsesBuilder returns an empty instance of Responses.

func (Responses) Append

func (r Responses) Append(rep Response) Responses

Append appends a response and returns the resulting Responses.

func (Responses) Items

func (r Responses) Items() []Response

Items returns the response list.
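
Because Append has a value receiver and returns Responses, the caller has to keep the returned value; calling it without reassigning leaves the original unchanged. The sketch below illustrates this with local copies of Response and Responses so that it compiles on its own. The method bodies are inferred from the documented signatures, and buildResponses with its failed map is a hypothetical helper, not part of the package.

```go
package main

// Response and Responses are local copies of the documented types; real code
// would use the package's own types and constructors instead.
type Response struct {
	ID       string
	Success  bool
	Err      string
	Fallback bool
}

type Responses []Response

// Append mirrors the documented value-receiver signature. The body is an
// assumption: it returns the extended slice rather than mutating the receiver.
func (r Responses) Append(rep Response) Responses {
	return append(r, rep)
}

// Items mirrors the documented accessor.
func (r Responses) Items() []Response { return r }

// buildResponses is a hypothetical helper: it emits one Response per message
// ID, marking an ID as failed when it has an entry in the failed map.
func buildResponses(ids []string, failed map[string]string) Responses {
	result := Responses{}
	for _, id := range ids {
		if msg, bad := failed[id]; bad {
			// Keep the return value; Append does not modify result in place.
			result = result.Append(Response{ID: id, Success: false, Err: msg})
			continue
		}
		result = result.Append(Response{ID: id, Success: true})
	}
	return result
}
```

In code that imports the package, the struct literals would normally be replaced by ResponseOK, ResponseFailure, or ResponseFallback.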

type Service

type Service struct {
	sinkpb.UnimplementedSinkServer

	Sinker Sinker
}

Service implements the generated protobuf server interface and holds the Sinker that handles the SinkFn operation.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) SinkFn

func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error

SinkFn applies a sink function to every element.

type Sinker

type Sinker interface {
	// Sink is the function to process a stream of incoming messages
	Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

Sinker is the interface that a sink function implementation must satisfy.

type SinkerFunc

type SinkerFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

SinkerFunc is a utility type used to convert a Sink function to a Sinker.

func (SinkerFunc) Sink

func (sf SinkerFunc) Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses

Sink calls the underlying function, so that a SinkerFunc satisfies the Sinker interface.
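
The adapter relationship between SinkerFunc and Sinker can be sketched as follows. To keep the example self-contained, the types are local copies of the documented declarations, with Datum trimmed to ID and Value and Response trimmed to three fields. The names rejectEmpty, testDatum, and runSink are hypothetical and exist only for this illustration.

```go
package main

import "context"

// Datum is a trimmed local stand-in for the documented interface.
type Datum interface {
	ID() string
	Value() []byte
}

// Response and Responses are reduced local copies of the documented types.
type Response struct {
	ID      string
	Success bool
	Err     string
}

type Responses []Response

// Sinker and SinkerFunc mirror the documented declarations.
type Sinker interface {
	Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

type SinkerFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

// Sink forwards to the function itself, which makes SinkerFunc a Sinker.
func (sf SinkerFunc) Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses {
	return sf(ctx, datumStreamCh)
}

// rejectEmpty is a hypothetical sink function: it drains the channel and
// returns one Response per datum, failing those with an empty payload.
func rejectEmpty(_ context.Context, datumStreamCh <-chan Datum) Responses {
	var result Responses
	for d := range datumStreamCh {
		if len(d.Value()) == 0 {
			result = append(result, Response{ID: d.ID(), Err: "empty payload"})
			continue
		}
		result = append(result, Response{ID: d.ID(), Success: true})
	}
	return result
}

// testDatum is a hypothetical in-memory Datum used to drive the sketch.
type testDatum struct {
	id    string
	value []byte
}

func (d testDatum) ID() string    { return d.id }
func (d testDatum) Value() []byte { return d.value }

// runSink feeds the given data through a buffered channel, closes it, and
// returns whatever the Sinker produces.
func runSink(s Sinker, data ...Datum) Responses {
	ch := make(chan Datum, len(data))
	for _, d := range data {
		ch <- d
	}
	close(ch)
	return s.Sink(context.Background(), ch)
}
```

The conversion SinkerFunc(rejectEmpty) is what lets a plain function be passed wherever a Sinker is expected, for example as the first argument to NewServer.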
