sinker

package
v0.5.0
Published: Aug 29, 2023 | License: Apache-2.0 | Imports: 13 | Imported by: 11

Documentation

Overview

Package sinker implements the server code for a user-defined sink.

Example:

package main

import (
	"context"
	"fmt"
	"log"

	sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
)

// logSink is a sinker implementation that logs the input to stdout
type logSink struct {
}

func (l *logSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
	result := sinksdk.ResponsesBuilder()
	for d := range datumStreamCh {
		_ = d.EventTime()
		_ = d.Watermark()
		fmt.Println("User Defined Sink:", string(d.Value()))
		id := d.ID()
		result = result.Append(sinksdk.ResponseOK(id))
	}
	return result
}

func main() {
	err := sinksdk.NewServer(&logSink{}).Start(context.Background())
	if err != nil {
		log.Panic("Failed to start sink function server: ", err)
	}
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultOptions

func DefaultOptions() *options

func NewServer

func NewServer(h Sinker, inputOptions ...Option) numaflow.Server

NewServer creates a new sinkServer object.

Types

type Datum

type Datum interface {
	Keys() []string
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	ID() string
}

Datum is the interface for the incoming message payload passed to a sink function.
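When unit-testing a sink it can help to have an in-memory Datum. The sketch below is one way to do that, not something the package provides: the Datum interface is redeclared locally so the example compiles without the SDK, and fakeDatum, feed, and collectIDs are hypothetical helper names.

```go
package main

import (
	"fmt"
	"time"
)

// Datum mirrors the interface documented above so this sketch compiles
// on its own; real code would use the sinker package's Datum instead.
type Datum interface {
	Keys() []string
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	ID() string
}

// fakeDatum is a hypothetical test double, not part of the package.
type fakeDatum struct {
	id        string
	keys      []string
	value     []byte
	eventTime time.Time
	watermark time.Time
}

func (f fakeDatum) Keys() []string       { return f.keys }
func (f fakeDatum) Value() []byte        { return f.value }
func (f fakeDatum) EventTime() time.Time { return f.eventTime }
func (f fakeDatum) Watermark() time.Time { return f.watermark }
func (f fakeDatum) ID() string           { return f.id }

// feed puts the datums on a buffered channel and closes it, giving the
// receive-only channel shape that a Sink implementation ranges over.
func feed(datums ...Datum) <-chan Datum {
	ch := make(chan Datum, len(datums))
	for _, d := range datums {
		ch <- d
	}
	close(ch)
	return ch
}

// collectIDs drains the channel and returns the IDs in arrival order.
func collectIDs(ch <-chan Datum) []string {
	var ids []string
	for d := range ch {
		ids = append(ids, d.ID())
	}
	return ids
}

func main() {
	now := time.Now()
	ch := feed(
		fakeDatum{id: "a", value: []byte("hello"), eventTime: now, watermark: now},
		fakeDatum{id: "b", value: []byte("world"), eventTime: now, watermark: now},
	)
	fmt.Println(collectIDs(ch))
}
```

Closing the channel matters here: a Sink that ranges over datumStreamCh only returns once the channel is closed.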

type Option

type Option func(*options)

Option is a function that applies a setting to the server options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the sinkServer max receive message size and the sinkServer max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(path string) Option

WithServerInfoFilePath sets the sinkServer info file path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr starts the sinkServer with the given socket address. This is mainly used for testing purposes.
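The Option functions above follow the functional-options pattern. The following is a minimal sketch of how such options are typically applied, not the package's actual implementation: the options struct is unexported in the real package, so its field names, the default values, and the defaultOptions and applyOptions helpers are all assumptions made for illustration.

```go
package main

import "fmt"

// options is a local stand-in for the package's unexported options type.
// The field names are assumptions made for this sketch.
type options struct {
	sockAddr           string
	maxMessageSize     int
	serverInfoFilePath string
}

// Option matches the documented type: a function that mutates *options.
type Option func(*options)

// WithSockAddr returns an Option that overrides the socket address.
func WithSockAddr(addr string) Option {
	return func(o *options) { o.sockAddr = addr }
}

// WithMaxMessageSize returns an Option that overrides the message size limit.
func WithMaxMessageSize(size int) Option {
	return func(o *options) { o.maxMessageSize = size }
}

// WithServerInfoFilePath returns an Option that overrides the info file path.
func WithServerInfoFilePath(path string) Option {
	return func(o *options) { o.serverInfoFilePath = path }
}

// defaultOptions returns placeholder defaults; the values are illustrative
// and are not the real defaults of the package.
func defaultOptions() *options {
	return &options{
		sockAddr:           "/tmp/example.sock",
		maxMessageSize:     64 * 1024 * 1024,
		serverInfoFilePath: "/tmp/example-server-info",
	}
}

// applyOptions starts from the defaults and applies each Option in order,
// so later options win over earlier ones.
func applyOptions(inputOptions ...Option) *options {
	opts := defaultOptions()
	for _, opt := range inputOptions {
		opt(opts)
	}
	return opts
}

func main() {
	opts := applyOptions(
		WithSockAddr("/tmp/test.sock"),
		WithMaxMessageSize(4096),
	)
	fmt.Printf("%+v\n", *opts)
}
```

With the real package, the same options would be passed as the variadic arguments of NewServer, for example to point a test at a temporary socket.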

type Response

type Response struct {
	// ID corresponds to the ID in the message.
	ID string `json:"id"`
	// Successful or not. If it's false, "err" is expected to be present.
	Success bool `json:"success"`
	// Err represents the error message when "success" is false.
	Err string `json:"err,omitempty"`
}

Response is the processing result of each message

func ResponseFailure

func ResponseFailure(id, errMsg string) Response

func ResponseOK

func ResponseOK(id string) Response

type Responses

type Responses []Response

func ResponsesBuilder

func ResponsesBuilder() Responses

ResponsesBuilder returns an empty instance of Responses

func (Responses) Append

func (r Responses) Append(rep Response) Responses

Append appends a response

func (Responses) Items

func (r Responses) Items() []Response

Items returns the response list
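A sink usually reports a mix of successes and failures, one Response per message ID. The sketch below shows that flow under stated assumptions: the types are redeclared locally so the example is self-contained, the function bodies are inferred from the doc comments above rather than copied from the package, and countFailures is a hypothetical helper.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types and signatures.
// The function bodies are assumptions inferred from the doc comments.
type Response struct {
	ID      string `json:"id"`
	Success bool   `json:"success"`
	Err     string `json:"err,omitempty"`
}

type Responses []Response

func ResponseOK(id string) Response {
	return Response{ID: id, Success: true}
}

func ResponseFailure(id, errMsg string) Response {
	return Response{ID: id, Success: false, Err: errMsg}
}

func ResponsesBuilder() Responses { return Responses{} }

// Append returns the extended list. Because the receiver is a value,
// the caller must keep the returned Responses.
func (r Responses) Append(rep Response) Responses { return append(r, rep) }

func (r Responses) Items() []Response { return r }

// countFailures is a hypothetical helper that counts unsuccessful responses.
func countFailures(r Responses) int {
	n := 0
	for _, rep := range r.Items() {
		if !rep.Success {
			n++
		}
	}
	return n
}

func main() {
	result := ResponsesBuilder()
	result = result.Append(ResponseOK("msg-1"))
	result = result.Append(ResponseFailure("msg-2", "write timed out"))

	for _, rep := range result.Items() {
		fmt.Printf("id=%s success=%t err=%q\n", rep.ID, rep.Success, rep.Err)
	}
	fmt.Println("failures:", countFailures(result))
}
```

Note the reassignment `result = result.Append(...)`: since Append has a value receiver and returns Responses, calling it without keeping the return value leaves the original list unchanged.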

type Service

type Service struct {
	sinkpb.UnimplementedSinkServer

	Sinker Sinker
}

Service implements the proto gen server interface and contains the sinkfn operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) SinkFn

func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error

SinkFn applies a sink function to every element.

type Sinker

type Sinker interface {
	// Sink is the function to process a list of incoming messages
	Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

Sinker is the interface that a sink function implementation must satisfy.

type SinkerFunc

type SinkerFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

SinkerFunc is a utility type used to convert a Sink function to a Sinker.

func (SinkerFunc) Sink

func (sf SinkerFunc) Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses

Sink implements the Sinker interface by calling the underlying function.
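SinkerFunc plays the same role that http.HandlerFunc plays for http.Handler: it lets a plain function be used where a Sinker is expected. The sketch below illustrates the adapter with trimmed local stand-ins; the Datum here carries only two of the documented methods, the Sink body is the assumed one-line delegation, and msg, rejectEmpty, and run are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local stand-ins for the package's types (assumptions for this sketch).
type Datum interface {
	ID() string
	Value() []byte
}

type Response struct {
	ID      string
	Success bool
	Err     string
}

type Responses []Response

type Sinker interface {
	Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

// SinkerFunc adapts a function with the Sink signature to the Sinker interface.
type SinkerFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

// Sink delegates to the wrapped function (assumed implementation).
func (sf SinkerFunc) Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses {
	return sf(ctx, datumStreamCh)
}

// msg is a hypothetical Datum used only for this example.
type msg struct{ id, body string }

func (m msg) ID() string    { return m.id }
func (m msg) Value() []byte { return []byte(m.body) }

// rejectEmpty is a plain function: it fails messages with an empty payload
// and acknowledges everything else.
func rejectEmpty(_ context.Context, ch <-chan Datum) Responses {
	var out Responses
	for d := range ch {
		if len(d.Value()) == 0 {
			out = append(out, Response{ID: d.ID(), Err: "empty payload"})
			continue
		}
		out = append(out, Response{ID: d.ID(), Success: true})
	}
	return out
}

// run feeds the datums to the Sinker through a closed, buffered channel.
func run(s Sinker, datums ...Datum) Responses {
	ch := make(chan Datum, len(datums))
	for _, d := range datums {
		ch <- d
	}
	close(ch)
	return s.Sink(context.Background(), ch)
}

func main() {
	var s Sinker = SinkerFunc(rejectEmpty) // conversion, not a call
	for _, r := range run(s, msg{id: "1", body: "ok"}, msg{id: "2"}) {
		fmt.Printf("id=%s success=%t err=%q\n", r.ID, r.Success, r.Err)
	}
}
```

With the real package, a value such as `SinkerFunc(rejectEmpty)` could be passed to NewServer in place of a struct like logSink, which avoids declaring an empty type just to hold one method.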
