function

package
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package function implements the server code for User Defined Function in golang.

Example Map

package main

import (

  "context"

  functionsdk "github.com/numaproj/numaflow-go/pkg/function"
  "github.com/numaproj/numaflow-go/pkg/function/server"

)

// Simply return the same msg

func handle(ctx context.Context, keys []string, data functionsdk.Datum) functionsdk.Messages {
  _ = data.EventTime() // Event time is available
  _ = data.Watermark() // Watermark is available
  return functionsdk.MessagesBuilder().Append(functionsdk.NewMessage(data.Value()))
}

func main() {
  server.New().RegisterMapper(functionsdk.MapFunc(handle)).Start(context.Background())
}

Example MapT (extracting event time from the datum payload) MapT includes both Map and EventTime assignment functionalities. Although the input datum already contains EventTime and Watermark, it's up to the MapT implementor to decide on whether to use them for generating new EventTime. MapT can be used only at source vertex by source data transformer.

  package main

  import (
	"context"
	"time"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
  )

  func mapTHandle(_ context.Context, keys []string, d functionsdk.Datum) functionsdk.MessageTs {
	eventTime := getEventTime(d.Value())
	return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(eventTime, d.Value()).WithKeys(keys)))
  }

  func getEventTime(val []byte) time.Time {
	...
  }

  func main() {
	server.New().RegisterMapperT(functionsdk.MapTFunc(mapTHandle)).Start(context.Background())
  }

Example Reduce

package main

import (
  "context"
  "strconv"

  functionsdk "github.com/numaproj/numaflow-go/pkg/function"
  "github.com/numaproj/numaflow-go/pkg/function/server"
)

// Count the incoming events

func handle(_ context.Context, keys []string, reduceCh <-chan functionsdk.Datum, md functionsdk.Metadata) functionsdk.Messages {
  var resultKeys = keys
  var resultVal []byte
  var counter = 0
  for _ = range reduceCh {
      counter++
  }
  resultVal = []byte(strconv.Itoa(counter))
  return functionsdk.MessagesBuilder().Append(functionsdk.NewMessage(resultVal).WithKeys(resultKey))
}

func main() {
  server.New().RegisterReducer(functionsdk.ReduceFunc(handle)).Start(context.Background())
}

The Datum object contains the message payload and metadata. Currently, there are two fields in metadata: the message ID, the message delivery count to indicate how many times the message has been delivered. You can use these metadata to implement customized logic. For example,

...

func handle(ctx context.Context, keys []string, data functionsdk.Datum) functionsdk.Messages {
    deliveryCount := data.Metadata().NumDelivered()
    // Choose to do specific actions, if the message delivery count reaches a certain threshold.
    if deliveryCount > 3:
        ...
}

Index

Constants

View Source
const (
	TCP      = "tcp"
	UDS      = "unix"
	UDS_ADDR = "/var/run/numaflow/function.sock"
	TCP_ADDR = ":55551"
	// DefaultMaxMessageSize overrides gRPC max message size configuration
	// https://github.com/grpc/grpc-go/blob/master/server.go#L58-L59
	//   - defaultServerMaxReceiveMessageSize
	//   - defaultServerMaxSendMessageSize
	DefaultMaxMessageSize = 1024 * 1024 * 4
	WinStartTime          = "x-numaflow-win-start-time"
	WinEndTime            = "x-numaflow-win-end-time"
	Delimiter             = ":"
)

Variables

View Source
var (
	DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)

Functions

func IsMapMultiProcEnabled added in v0.4.2

func IsMapMultiProcEnabled() bool

Types

type Client

type Client interface {
	CloseConn(ctx context.Context) error
	IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
	MapFn(ctx context.Context, datum *functionpb.DatumRequest) ([]*functionpb.DatumResponse, error)
	MapTFn(ctx context.Context, datum *functionpb.DatumRequest) ([]*functionpb.DatumResponse, error)
	ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.DatumRequest) ([]*functionpb.DatumResponse, error)
}

Client contains methods to call a gRPC client.

type Datum

type Datum interface {
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	Metadata() DatumMetadata
}

Datum contains methods to get the payload information.

type DatumMetadata added in v0.4.1

type DatumMetadata interface {
	// ID returns the ID of the datum.
	ID() string
	// NumDelivered returns the number of times the datum has been delivered.
	NumDelivered() uint64
}

DatumMetadata contains methods to get the metadata information for datum.

type IntervalWindow

type IntervalWindow interface {
	StartTime() time.Time
	EndTime() time.Time
}

IntervalWindow contains methods to get the information for a given interval window.

func NewIntervalWindow added in v0.2.4

func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow

type MapFunc

type MapFunc func(ctx context.Context, keys []string, datum Datum) Messages

MapFunc is utility type used to convert a HandleDo function to a MapHandler.

func (MapFunc) HandleDo

func (mf MapFunc) HandleDo(ctx context.Context, keys []string, datum Datum) Messages

HandleDo implements the function of map function.

type MapHandler

type MapHandler interface {
	// HandleDo is the function to process each coming message.
	HandleDo(ctx context.Context, keys []string, datum Datum) Messages
}

MapHandler is the interface of map function implementation.

type MapTFunc added in v0.3.1

type MapTFunc func(ctx context.Context, keys []string, datum Datum) MessageTs

MapTFunc is utility type used to convert a HandleDo function to a MapTHandler.

func (MapTFunc) HandleDo added in v0.3.1

func (mf MapTFunc) HandleDo(ctx context.Context, keys []string, datum Datum) MessageTs

HandleDo implements the function of mapT function.

type MapTHandler added in v0.3.1

type MapTHandler interface {
	// HandleDo is the function to process each coming message.
	HandleDo(ctx context.Context, keys []string, datum Datum) MessageTs
}

MapTHandler is the interface of mapT function implementation.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is used to wrap the data return by UDF functions

func MessageToDrop

func MessageToDrop() Message

MessageToDrop creates a Message to be dropped

func NewMessage added in v0.4.3

func NewMessage(value []byte) Message

NewMessage creates a Message with value

func (Message) Keys added in v0.4.0

func (m Message) Keys() []string

Keys returns message keys

func (Message) Value

func (m Message) Value() []byte

Value returns message value

func (Message) WithKeys added in v0.4.3

func (m Message) WithKeys(keys []string) Message

WithKeys is used to assign the keys to the message

func (Message) WithTags added in v0.4.3

func (m Message) WithTags(tags []string) Message

WithTags is used to assign the tags to the message tags will be used for conditional forwarding

type MessageT added in v0.3.1

type MessageT struct {
	// contains filtered or unexported fields
}

MessageT is used to wrap the data return by UDF functions. Compared with Message, MessageT contains one more field, the event time, usually extracted from the payload.

func MessageTToDrop added in v0.3.1

func MessageTToDrop() MessageT

MessageTToDrop creates a MessageT to be dropped

func NewMessageT added in v0.4.3

func NewMessageT(value []byte, eventTime time.Time) MessageT

NewMessageT creates a Message with eventTime and value

func (MessageT) EventTime added in v0.3.1

func (m MessageT) EventTime() time.Time

EventTime returns message eventTime

func (MessageT) Keys added in v0.4.0

func (m MessageT) Keys() []string

Keys returns message keys

func (MessageT) Value added in v0.3.1

func (m MessageT) Value() []byte

Value returns message value

func (MessageT) WithKeys added in v0.4.3

func (m MessageT) WithKeys(keys []string) MessageT

WithKeys is used to assign the keys to messageT

func (MessageT) WithTags added in v0.4.3

func (m MessageT) WithTags(tags []string) MessageT

WithTags is used to assign the tags to messageT tags will be used for conditional forwarding

type MessageTs added in v0.3.1

type MessageTs []MessageT

func MessageTsBuilder added in v0.3.1

func MessageTsBuilder() MessageTs

MessageTsBuilder returns an empty instance of MessageTs

func (MessageTs) Append added in v0.3.1

func (m MessageTs) Append(msg MessageT) MessageTs

Append appends a MessageT

func (MessageTs) Items added in v0.3.1

func (m MessageTs) Items() []MessageT

Items returns the MessageT list

type Messages

type Messages []Message

func MessagesBuilder

func MessagesBuilder() Messages

MessagesBuilder returns an empty instance of Messages

func (Messages) Append

func (m Messages) Append(msg Message) Messages

Append appends a Message

func (Messages) Items

func (m Messages) Items() []Message

Items returns the message list

type Metadata

type Metadata interface {
	IntervalWindow() IntervalWindow
}

Metadata contains methods to get the metadata for the reduce operation.

func NewMetadata added in v0.2.4

func NewMetadata(window IntervalWindow) Metadata

type ReduceFunc

type ReduceFunc func(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages

ReduceFunc is utility type used to convert a HandleDo function to a ReduceHandler.

func (ReduceFunc) HandleDo

func (rf ReduceFunc) HandleDo(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages

HandleDo implements the function of reduce function.

type ReduceHandler

type ReduceHandler interface {
	HandleDo(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
}

ReduceHandler is the interface of reduce function implementation.

type Service

Service implements the proto gen server interface and contains the map operation handler and the reduce operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) MapFn

MapFn applies a function to each datum element.

func (*Service) MapTFn added in v0.3.1

MapTFn applies a function to each datum element. In addition to map function, MapTFn also supports assigning a new event time to datum. MapTFn can be used only at source vertex by source data transformer.

func (*Service) ReduceFn

ReduceFn applies a reduce function to a datum stream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL