package sink

v0.4.4

This package is not in the latest version of its module.

Published: Apr 20, 2023 | License: Apache-2.0 | Imports: 6 | Imported by: 0

Documentation

Overview

Package sink implements the server code for a User Defined Sink in Go.

Example:

package main

import (
  "context"
  "fmt"

  sinksdk "github.com/numaproj/numaflow-go/pkg/sink"
  "github.com/numaproj/numaflow-go/pkg/sink/server"
)

func handle(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
  result := sinksdk.ResponsesBuilder()
  for datum := range datumStreamCh {
    fmt.Println(string(datum.Value()))
    result = result.Append(sinksdk.ResponseOK(datum.ID()))
  }
  return result
}

func main() {
  server.New().RegisterSinker(sinksdk.SinkFunc(handle)).Start(context.Background())
}

Constants

const (
	Protocol = "unix"
	Addr     = "/var/run/numaflow/udsink.sock"
	// DefaultMaxMessageSize overrides gRPC max message size configuration
	// https://github.com/grpc/grpc-go/blob/master/server.go#L58-L59
	//   - defaultServerMaxReceiveMessageSize
	//   - defaultServerMaxSendMessageSize
	DefaultMaxMessageSize = 1024 * 1024 * 4
)
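
The constants above describe a unix domain socket and a 4 MiB message limit (1024 * 1024 * 4 = 4194304 bytes). How the sink server consumes them is not shown on this page; the following is a minimal sketch, assuming the server listens on a unix socket at Addr. The helper listenOn is hypothetical, and the sketch substitutes a temporary path because /var/run/numaflow normally exists only inside a Numaflow pod.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
)

// Values copied from the constants documented above.
const (
	Protocol              = "unix"
	Addr                  = "/var/run/numaflow/udsink.sock"
	DefaultMaxMessageSize = 1024 * 1024 * 4
)

// listenOn is a hypothetical helper, not part of the package. It removes a
// stale socket file, if one exists, and listens on the given unix socket path.
func listenOn(path string) (net.Listener, error) {
	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	return net.Listen(Protocol, path)
}

func main() {
	// Assumption: a temporary directory stands in for the directory of Addr.
	dir, err := os.MkdirTemp("", "udsink")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	lis, err := listenOn(filepath.Join(dir, "udsink.sock"))
	if err != nil {
		panic(err)
	}
	defer lis.Close()

	fmt.Println(lis.Addr().Network(), DefaultMaxMessageSize)
}
```

Removing a leftover socket file before listening avoids an "address already in use" error after an unclean shutdown.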

Types

type Client

type Client interface {
	CloseConn(ctx context.Context) error
	IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
	SinkFn(ctx context.Context, datumList []*sinkpb.DatumRequest) ([]*sinkpb.Response, error)
}

Client contains methods to call a gRPC client.

type Datum

type Datum interface {
	Keys() []string
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	ID() string
}
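
Because Datum is an interface, a handler can be exercised in unit tests without a running gRPC server. The following is a minimal sketch of a test double. The Datum interface is copied locally so the example compiles on its own, and fakeDatum is a hypothetical type that is not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Datum mirrors the interface documented above (local copy for this sketch).
type Datum interface {
	Keys() []string
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	ID() string
}

// fakeDatum is a hypothetical in-memory Datum for tests.
type fakeDatum struct {
	id        string
	keys      []string
	value     []byte
	eventTime time.Time
	watermark time.Time
}

func (d fakeDatum) Keys() []string       { return d.keys }
func (d fakeDatum) Value() []byte        { return d.value }
func (d fakeDatum) EventTime() time.Time { return d.eventTime }
func (d fakeDatum) Watermark() time.Time { return d.watermark }
func (d fakeDatum) ID() string           { return d.id }

// Compile-time check that fakeDatum satisfies Datum.
var _ Datum = fakeDatum{}

func main() {
	now := time.Now()
	var d Datum = fakeDatum{
		id:        "msg-1",
		keys:      []string{"k"},
		value:     []byte("hello"),
		eventTime: now,
		watermark: now.Add(-time.Second),
	}
	fmt.Println(d.ID(), d.Keys(), string(d.Value()))
}
```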

type Response

type Response struct {
	// ID corresponds to the ID in the message.
	ID string `json:"id"`
	// Successful or not. If it's false, "err" is expected to be present.
	Success bool `json:"success"`
	// Err represents the error message when "success" is false.
	Err string `json:"err,omitempty"`
}

Response is the processing result of each message.

func ResponseFailure

func ResponseFailure(id, errMsg string) Response

func ResponseOK

func ResponseOK(id string) Response

type Responses

type Responses []Response

func ResponsesBuilder

func ResponsesBuilder() Responses

ResponsesBuilder returns an empty instance of Responses.

func (Responses) Append

func (r Responses) Append(rep Response) Responses

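Append appends a response and returns the resulting Responses. Because the receiver is a value, callers must use the return value.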

func (Responses) Items

func (r Responses) Items() []Response

Items returns the response list.
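
Append has a value receiver, so its result must be assigned back, much like the built-in append. The sketch below illustrates this with local stand-ins. The function bodies are assumptions inferred from the signatures and field comments above, not the package's actual source.

```go
package main

import "fmt"

// Local stand-ins for the documented types (bodies are assumptions).
type Response struct {
	ID      string
	Success bool
	Err     string
}

type Responses []Response

func ResponseOK(id string) Response { return Response{ID: id, Success: true} }

func ResponseFailure(id, errMsg string) Response {
	return Response{ID: id, Success: false, Err: errMsg}
}

func ResponsesBuilder() Responses { return Responses{} }

func (r Responses) Append(rep Response) Responses { return append(r, rep) }

func (r Responses) Items() []Response { return r }

func main() {
	rs := ResponsesBuilder()

	// The return value is discarded here, so rs stays empty.
	rs.Append(ResponseOK("dropped"))
	fmt.Println(len(rs.Items())) // 0

	// Assigning the result back keeps the appended responses.
	rs = rs.Append(ResponseOK("1"))
	rs = rs.Append(ResponseFailure("2", "downstream unavailable"))
	for _, r := range rs.Items() {
		fmt.Printf("%s success=%t err=%q\n", r.ID, r.Success, r.Err)
	}
}
```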

type Service

type Service struct {
	sinkpb.UnimplementedUserDefinedSinkServer

	Sinker SinkHandler
}

Service implements the proto gen server interface and contains the sink operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) SinkFn

func (fs *Service) SinkFn(stream sinkpb.UserDefinedSink_SinkFnServer) error

SinkFn applies the sink function to a stream of datum elements.

type SinkFunc

type SinkFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

SinkFunc is a utility type that adapts a function with the HandleDo signature to a SinkHandler.

func (SinkFunc) HandleDo

func (sf SinkFunc) HandleDo(ctx context.Context, datumStreamCh <-chan Datum) Responses

HandleDo implements SinkHandler by invoking the function sf.

type SinkHandler

type SinkHandler interface {
	// HandleDo processes the incoming messages received on datumStreamCh.
	HandleDo(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

SinkHandler is the interface that a sink function implementation must satisfy.
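
SinkFunc follows the same adapter pattern as http.HandlerFunc: a plain function gains a HandleDo method and so satisfies SinkHandler. The sketch below uses trimmed local stand-ins so that it compiles without the SDK. Datum is reduced to the two methods the sketch uses, and msg, rejectEmpty, and run are hypothetical names introduced for illustration only.

```go
package main

import (
	"context"
	"fmt"
)

// Trimmed local stand-ins for the documented types.
type Datum interface {
	Value() []byte
	ID() string
}

type Response struct {
	ID      string
	Success bool
	Err     string
}

type Responses []Response

type SinkHandler interface {
	HandleDo(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

type SinkFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

// HandleDo calls the underlying function (assumed behavior of the adapter).
func (sf SinkFunc) HandleDo(ctx context.Context, datumStreamCh <-chan Datum) Responses {
	return sf(ctx, datumStreamCh)
}

// msg is a hypothetical Datum carrying an ID and a payload.
type msg struct{ id, body string }

func (m msg) Value() []byte { return []byte(m.body) }
func (m msg) ID() string    { return m.id }

// rejectEmpty is a hypothetical handler: empty payloads fail, others succeed.
// It emits exactly one Response per Datum read from the channel.
func rejectEmpty(ctx context.Context, in <-chan Datum) Responses {
	var out Responses
	for d := range in {
		if len(d.Value()) == 0 {
			out = append(out, Response{ID: d.ID(), Err: "empty payload"})
			continue
		}
		out = append(out, Response{ID: d.ID(), Success: true})
	}
	return out
}

// run feeds datums to a handler. Closing the channel ends the handler's
// range loop, which lets HandleDo return.
func run(h SinkHandler, datums ...Datum) Responses {
	ch := make(chan Datum, len(datums))
	for _, d := range datums {
		ch <- d
	}
	close(ch)
	return h.HandleDo(context.Background(), ch)
}

func main() {
	var h SinkHandler = SinkFunc(rejectEmpty)
	for _, r := range run(h, msg{"1", "hello"}, msg{"2", ""}) {
		fmt.Printf("%s success=%t err=%q\n", r.ID, r.Success, r.Err)
	}
}
```

A struct with a HandleDo method would satisfy SinkHandler equally well; the function adapter is simply convenient when the handler holds no state.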

Directories

Path Synopsis
examples
log
