Documentation
Overview
Package sinker implements the server code for a user-defined sink.
Example:

    package main

    import (
        "context"
        "fmt"
        "log"

        sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
    )

    // logSink is a sinker implementation that logs the input to stdout
    type logSink struct {
    }

    func (l *logSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
        result := sinksdk.ResponsesBuilder()
        for d := range datumStreamCh {
            _ = d.EventTime()
            _ = d.Watermark()
            fmt.Println("User Defined Sink:", string(d.Value()))
            id := d.ID()
            result = result.Append(sinksdk.ResponseOK(id))
        }
        return result
    }

    func main() {
        err := sinksdk.NewServer(&logSink{}).Start(context.Background())
        if err != nil {
            log.Panic("Failed to start sink function server: ", err)
        }
    }
Constants
This section is empty.
Variables
This section is empty.
Functions
func DefaultOptions

    func DefaultOptions() *options
Types
type Datum

    type Datum interface {
        Keys() []string
        Value() []byte
        EventTime() time.Time
        Watermark() time.Time
        ID() string
    }

Datum is the interface for the incoming message payload passed to a sink function.
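Because Datum is an interface, a sink can be exercised in a unit test without a running server. The following is a minimal sketch under stated assumptions: the `Datum` interface is redeclared locally so that the block compiles without the SDK, and `stubDatum` and `feed` are hypothetical helper names that are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Datum mirrors the interface documented above. It is redeclared here only
// so that this sketch is self-contained.
type Datum interface {
	Keys() []string
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	ID() string
}

// stubDatum is a hypothetical in-memory Datum for tests.
type stubDatum struct {
	keys      []string
	value     []byte
	eventTime time.Time
	watermark time.Time
	id        string
}

func (d stubDatum) Keys() []string       { return d.keys }
func (d stubDatum) Value() []byte        { return d.value }
func (d stubDatum) EventTime() time.Time { return d.eventTime }
func (d stubDatum) Watermark() time.Time { return d.watermark }
func (d stubDatum) ID() string           { return d.id }

// feed (hypothetical helper) buffers the given datums on a channel and then
// closes it, so a loop written as `for d := range ch` terminates.
func feed(datums ...Datum) <-chan Datum {
	ch := make(chan Datum, len(datums))
	for _, d := range datums {
		ch <- d
	}
	close(ch)
	return ch
}

func main() {
	now := time.Now()
	ch := feed(
		stubDatum{keys: []string{"k"}, value: []byte("hello"), eventTime: now, watermark: now, id: "1"},
		stubDatum{value: []byte("world"), eventTime: now, watermark: now, id: "2"},
	)
	for d := range ch {
		fmt.Printf("%s: %s\n", d.ID(), d.Value())
	}
}
```

A stub like this can be handed to any function that ranges over a `<-chan Datum`, which is the shape of the Sink method.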
type Option

    type Option func(*options)

Option is a function that modifies the server options.
func WithMaxMessageSize
WithMaxMessageSize sets both the maximum receive message size and the maximum send message size of the sinkServer to the given size.
func WithServerInfoFilePath
WithServerInfoFilePath sets the sinkServer info file path.
func WithSockAddr
WithSockAddr starts the sinkServer with the given socket address. This is mainly used for testing purposes.
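The Option type and the With* functions follow the functional options pattern. The sketch below illustrates that pattern in isolation. It is not the package's implementation: the `options` fields, the default values, and the lowercase helper names (`withSockAddr`, `withMaxMessageSize`, `defaultOptions`, `build`) are all assumptions made for the example.

```go
package main

import "fmt"

// options is a local stand-in for the package's unexported options struct.
// The field names are hypothetical.
type options struct {
	sockAddr       string
	maxMessageSize int
}

// Option has the same shape as the documented type: a function that
// mutates an options value.
type Option func(*options)

// withSockAddr returns an Option that overrides the socket address.
func withSockAddr(addr string) Option {
	return func(o *options) { o.sockAddr = addr }
}

// withMaxMessageSize returns an Option that overrides the message size limit.
func withMaxMessageSize(size int) Option {
	return func(o *options) { o.maxMessageSize = size }
}

// defaultOptions returns placeholder defaults (assumed values, not the SDK's).
func defaultOptions() *options {
	return &options{sockAddr: "/tmp/example.sock", maxMessageSize: 1 << 20}
}

// build starts from the defaults and applies each Option in order, so later
// options win over earlier ones.
func build(opts ...Option) *options {
	o := defaultOptions()
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := build(withMaxMessageSize(4 << 20))
	fmt.Println(o.sockAddr, o.maxMessageSize)
}
```

The benefit of this design is that callers set only what they need, and new settings can be added later without changing existing call sites.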
type Response

    type Response struct {
        // ID corresponds to the ID in the message.
        ID string `json:"id"`
        // Successful or not. If it's false, "err" is expected to be present.
        Success bool `json:"success"`
        // Err represents the error message when "success" is false.
        Err string `json:"err,omitempty"`
    }

Response is the processing result of each message.
func ResponseFailure
func ResponseOK
type Responses

    type Responses []Response

func ResponsesBuilder

    func ResponsesBuilder() Responses

ResponsesBuilder returns an empty instance of Responses.
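A sink is expected to report one Response per message, so a batch can contain a mix of successes and failures. The sketch below shows one way to accumulate such a result. `Response` and `Responses` are redeclared locally so the block is self-contained. The helpers `responseOK`, `responseFailure`, `Append`, `ack`, and `failOn` are illustrative stand-ins, and the two-argument form of `responseFailure` is an assumption, since the signature is not shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// Response and Responses mirror the documented types.
type Response struct {
	ID      string `json:"id"`
	Success bool   `json:"success"`
	Err     string `json:"err,omitempty"`
}

type Responses []Response

// Append returns the slice with x added (local stand-in).
func (r Responses) Append(x Response) Responses { return append(r, x) }

// responseOK and responseFailure are local stand-ins with assumed signatures.
func responseOK(id string) Response { return Response{ID: id, Success: true} }

func responseFailure(id, errMsg string) Response {
	return Response{ID: id, Success: false, Err: errMsg}
}

// ack builds one Response per ID: OK when the write callback succeeds,
// failure carrying the error text otherwise.
func ack(ids []string, write func(id string) error) Responses {
	result := Responses{}
	for _, id := range ids {
		if err := write(id); err != nil {
			result = result.Append(responseFailure(id, err.Error()))
			continue
		}
		result = result.Append(responseOK(id))
	}
	return result
}

// failOn returns a write callback that fails only for the given ID.
func failOn(bad string) func(string) error {
	return func(id string) error {
		if id == bad {
			return errors.New("write failed")
		}
		return nil
	}
}

func main() {
	for _, r := range ack([]string{"1", "2", "3"}, failOn("2")) {
		fmt.Printf("id=%s success=%t err=%q\n", r.ID, r.Success, r.Err)
	}
}
```

Recording a failure and continuing, rather than returning early, keeps the result aligned with the input so every message ID gets exactly one outcome.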
type Service

    type Service struct {
        sinkpb.UnimplementedSinkServer
        Sinker Sinker
    }

Service implements the generated protobuf server interface and holds the sink function handler.
type Sinker

    type Sinker interface {
        // Sink is the function to process a list of incoming messages
        Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
    }

Sinker is the interface that a sink function implementation must satisfy.
type SinkerFunc
SinkerFunc is a utility type used to convert a Sink function to a Sinker.
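A function-to-interface adapter of this kind usually works like `http.HandlerFunc` in the standard library: a named function type gets a method that calls itself. The sketch below illustrates the idea with local stand-ins. The `SinkerFunc` definition shown here is an assumption based on the description above, `Datum` is trimmed to two methods for brevity, and `msg`, `ackAll`, and `runOnce` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Datum is trimmed to two methods here to keep the sketch short.
type Datum interface {
	ID() string
	Value() []byte
}

type Response struct {
	ID      string
	Success bool
	Err     string
}

type Responses []Response

// Sinker mirrors the documented interface.
type Sinker interface {
	Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

// SinkerFunc (assumed definition) lets a plain function satisfy Sinker.
type SinkerFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

// Sink calls the underlying function.
func (f SinkerFunc) Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses {
	return f(ctx, datumStreamCh)
}

// msg is a hypothetical Datum used to drive the example.
type msg struct {
	id    string
	value []byte
}

func (m msg) ID() string    { return m.id }
func (m msg) Value() []byte { return m.value }

// ackAll is a plain function that acknowledges every message it receives.
func ackAll(_ context.Context, datumStreamCh <-chan Datum) Responses {
	result := Responses{}
	for d := range datumStreamCh {
		result = append(result, Response{ID: d.ID(), Success: true})
	}
	return result
}

// runOnce feeds one message per ID to the given Sinker and returns its result.
func runOnce(s Sinker, ids ...string) Responses {
	ch := make(chan Datum, len(ids))
	for _, id := range ids {
		ch <- msg{id: id}
	}
	close(ch)
	return s.Sink(context.Background(), ch)
}

func main() {
	// The conversion SinkerFunc(ackAll) turns the function into a Sinker.
	for _, r := range runOnce(SinkerFunc(ackAll), "1", "2") {
		fmt.Println(r.ID, r.Success)
	}
}
```

With an adapter like this, a small sink can be written as a single function instead of a struct with one method.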