function

package
v0.3.1
Published: Jan 20, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package function implements the server code for User Defined Functions (UDFs) in Go.

Example Map

package main

import (
  "context"

  functionsdk "github.com/numaproj/numaflow-go/pkg/function"
  "github.com/numaproj/numaflow-go/pkg/function/server"
)

// Simply return the same message
func handle(ctx context.Context, key string, data functionsdk.Datum) functionsdk.Messages {
  _ = data.EventTime() // Event time is available
  _ = data.Watermark() // Watermark is available
  return functionsdk.MessagesBuilder().Append(functionsdk.MessageToAll(data.Value()))
}

func main() {
  server.New().RegisterMapper(functionsdk.MapFunc(handle)).Start(context.Background())
}

Example MapT (extracting event time from the datum payload)

MapT combines Map with EventTime assignment. Although the input datum already contains EventTime and Watermark, it is up to the MapT implementor to decide whether to use them when generating the new EventTime. MapT can be used only at a source vertex, by the source data transformer.

  package main

  import (
	"context"
	"time"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
  )

  func mapTHandle(_ context.Context, key string, d functionsdk.Datum) functionsdk.MessageTs {
	eventTime := getEventTime(d.Value())
	return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTTo(eventTime, key, d.Value()))
  }

  func getEventTime(val []byte) time.Time {
	...
  }

  func main() {
	server.New().RegisterMapperT(functionsdk.MapTFunc(mapTHandle)).Start(context.Background())
  }
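The body of getEventTime is left open in the example above because it depends entirely on the payload format. As a minimal sketch of one possibility, the block below assumes a JSON payload with a "ts" field holding Unix milliseconds. The payload type, the field name, the encoding, and the helper names are all hypothetical; only the standard library is used.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// payload is a hypothetical message body. The "ts" field name and its
// Unix-milliseconds encoding are assumptions made for this sketch.
type payload struct {
	TS int64 `json:"ts"`
}

// eventTimeFromJSON extracts the event time from a JSON payload. The second
// return value is false when the payload cannot be parsed or carries no
// timestamp, so the caller can fall back to the datum's own EventTime.
func eventTimeFromJSON(val []byte) (time.Time, bool) {
	var p payload
	if err := json.Unmarshal(val, &p); err != nil || p.TS == 0 {
		return time.Time{}, false
	}
	return time.UnixMilli(p.TS).UTC(), true
}

// eventTimeString formats the extracted time, or "unknown" on failure.
func eventTimeString(val []byte) string {
	t, ok := eventTimeFromJSON(val)
	if !ok {
		return "unknown"
	}
	return t.Format(time.RFC3339)
}

func main() {
	fmt.Println(eventTimeString([]byte(`{"ts":1674172800000,"msg":"hello"}`))) // 2023-01-20T00:00:00Z
	fmt.Println(eventTimeString([]byte(`not json`)))                           // unknown
}
```

Returning an explicit ok flag rather than a zero time lets a MapT handler choose between the payload timestamp and the EventTime already carried by the datum.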

Example Reduce

package main

import (
  "context"
  "strconv"

  functionsdk "github.com/numaproj/numaflow-go/pkg/function"
  "github.com/numaproj/numaflow-go/pkg/function/server"
)

// Count the incoming events

func handle(_ context.Context, key string, reduceCh <-chan functionsdk.Datum, md functionsdk.Metadata) functionsdk.Messages {
  var resultKey = key
  var resultVal []byte
  var counter = 0
  for range reduceCh {
    counter++
  }
  resultVal = []byte(strconv.Itoa(counter))
  return functionsdk.MessagesBuilder().Append(functionsdk.MessageTo(resultKey, resultVal))
}

func main() {
  server.New().RegisterReducer(functionsdk.ReduceFunc(handle)).Start(context.Background())
}

Constants

const (
	Protocol = "unix"
	Addr     = "/var/run/numaflow/function.sock"
	DatumKey = "x-numaflow-datum-key"
	// DefaultMaxMessageSize overrides gRPC max message size configuration
	// https://github.com/grpc/grpc-go/blob/master/server.go#L58-L59
	//   - defaultServerMaxReceiveMessageSize
	//   - defaultServerMaxSendMessageSize
	DefaultMaxMessageSize = 1024 * 1024 * 4
	WinStartTime          = "x-numaflow-win-start-time"
	WinEndTime            = "x-numaflow-win-end-time"
)
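DefaultMaxMessageSize evaluates to 4194304 bytes (4 MiB). The sketch below copies that constant and uses it to pre-check a payload size. The fitsInMessage helper is hypothetical, and comparing the raw payload length against the limit ignores any encoding overhead, so treat it as an approximation.

```go
package main

import "fmt"

// DefaultMaxMessageSize is copied from the constant block above.
const DefaultMaxMessageSize = 1024 * 1024 * 4

// fitsInMessage is a hypothetical helper that reports whether a payload of
// n bytes is within the limit. It ignores protobuf and gRPC framing overhead.
func fitsInMessage(n int) bool {
	return n <= DefaultMaxMessageSize
}

func main() {
	fmt.Println(DefaultMaxMessageSize)  // 4194304
	fmt.Println(fitsInMessage(3 << 20)) // true  (3 MiB)
	fmt.Println(fitsInMessage(5 << 20)) // false (5 MiB)
}
```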

Variables

var (
	DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
	ALL  = fmt.Sprintf("%U__ALL__", '\\')  // U+005C__ALL__
)
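The %U verb formats a rune as its Unicode code point, which is how the backslash becomes the U+005C prefix noted in the comments. The short program below reproduces both values using only the standard library; dropKey and allKey are names introduced for this sketch.

```go
package main

import "fmt"

// dropKey rebuilds the DROP value with the same format string as above.
func dropKey() string { return fmt.Sprintf("%U__DROP__", '\\') }

// allKey rebuilds the ALL value with the same format string as above.
func allKey() string { return fmt.Sprintf("%U__ALL__", '\\') }

func main() {
	fmt.Println(dropKey()) // U+005C__DROP__
	fmt.Println(allKey())  // U+005C__ALL__
}
```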

Functions

This section is empty.

Types

type Client

type Client interface {
	CloseConn(ctx context.Context) error
	IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
	MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error)
	MapTFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error)
	ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error)
}

Client contains methods to call a gRPC client.

type Datum

type Datum interface {
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
}

Datum contains methods to get the payload information.
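Because Datum is an interface, a handler can be unit tested with a small in-memory implementation. The sketch below redeclares the interface locally so it compiles on its own; fakeDatum, isLate, and lateWithOffset are hypothetical names, and defining "late" as an event time earlier than the watermark is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Datum mirrors the interface documented above so the sketch is self-contained.
type Datum interface {
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
}

// fakeDatum is a hypothetical in-memory Datum for tests.
type fakeDatum struct {
	value     []byte
	eventTime time.Time
	watermark time.Time
}

func (d fakeDatum) Value() []byte        { return d.value }
func (d fakeDatum) EventTime() time.Time { return d.eventTime }
func (d fakeDatum) Watermark() time.Time { return d.watermark }

// isLate reports whether the event time is earlier than the watermark.
// This definition of lateness is an assumption of the sketch.
func isLate(d Datum) bool {
	return d.EventTime().Before(d.Watermark())
}

// lateWithOffset builds a fakeDatum whose event time is offsetSeconds away
// from a fixed watermark and returns the isLate result.
func lateWithOffset(offsetSeconds int) bool {
	watermark := time.Date(2023, time.January, 20, 0, 0, 0, 0, time.UTC)
	d := fakeDatum{
		value:     []byte("payload"),
		eventTime: watermark.Add(time.Duration(offsetSeconds) * time.Second),
		watermark: watermark,
	}
	return isLate(d)
}

func main() {
	fmt.Println(lateWithOffset(-5)) // true
	fmt.Println(lateWithOffset(5))  // false
}
```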

type IntervalWindow

type IntervalWindow interface {
	StartTime() time.Time
	EndTime() time.Time
}

IntervalWindow contains methods to get the information for a given interval window.

func NewIntervalWindow added in v0.2.4

func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow

type MapFunc

type MapFunc func(ctx context.Context, key string, datum Datum) Messages

MapFunc is a utility type used to convert a HandleDo function to a MapHandler.

func (MapFunc) HandleDo

func (mf MapFunc) HandleDo(ctx context.Context, key string, datum Datum) Messages

HandleDo implements the map function.

type MapHandler

type MapHandler interface {
	// HandleDo is the function that processes each incoming message.
	HandleDo(ctx context.Context, key string, datum Datum) Messages
}

MapHandler is the interface that a map function implementation must satisfy.
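MapFunc and MapHandler follow the same function-adapter pattern as http.HandlerFunc and http.Handler in the standard library: a plain function is converted to a named function type whose method satisfies the interface. The sketch below uses local stand-ins for the package types, with Datum trimmed to Value, and assumes that HandleDo simply calls the underlying function. The bytesDatum, upper, and runUpper names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins for the package types, trimmed to what the sketch needs.
type Message struct {
	Key   string
	Value []byte
}
type Messages []Message
type Datum interface{ Value() []byte }

type MapHandler interface {
	HandleDo(ctx context.Context, key string, datum Datum) Messages
}

type MapFunc func(ctx context.Context, key string, datum Datum) Messages

// HandleDo calls the function itself (assumed behavior of the adapter).
func (mf MapFunc) HandleDo(ctx context.Context, key string, datum Datum) Messages {
	return mf(ctx, key, datum)
}

// bytesDatum is a hypothetical Datum backed by a byte slice.
type bytesDatum []byte

func (b bytesDatum) Value() []byte { return b }

// upper is a plain function with the MapFunc signature.
func upper(_ context.Context, key string, d Datum) Messages {
	return Messages{{Key: key, Value: []byte(strings.ToUpper(string(d.Value())))}}
}

// runUpper converts upper to a MapHandler and invokes it once.
func runUpper(key, in string) string {
	var h MapHandler = MapFunc(upper)
	out := h.HandleDo(context.Background(), key, bytesDatum(in))
	return out[0].Key + "=" + string(out[0].Value)
}

func main() {
	fmt.Println(runUpper("k", "hello")) // k=HELLO
}
```

The adapter means a stateless handler needs no struct of its own, while a handler that carries state can still implement MapHandler directly.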

type MapTFunc added in v0.3.1

type MapTFunc func(ctx context.Context, key string, datum Datum) MessageTs

MapTFunc is a utility type used to convert a HandleDo function to a MapTHandler.

func (MapTFunc) HandleDo added in v0.3.1

func (mf MapTFunc) HandleDo(ctx context.Context, key string, datum Datum) MessageTs

HandleDo implements the mapT function.

type MapTHandler added in v0.3.1

type MapTHandler interface {
	// HandleDo is the function that processes each incoming message.
	HandleDo(ctx context.Context, key string, datum Datum) MessageTs
}

MapTHandler is the interface that a mapT function implementation must satisfy.

type Message

type Message struct {
	Key   string
	Value []byte
}

Message is used to wrap the data returned by UDFs.

func MessageTo

func MessageTo(to string, value []byte) Message

MessageTo creates a Message that will be forwarded to the specified "to".

func MessageToAll

func MessageToAll(value []byte) Message

MessageToAll creates a Message that will be forwarded to all.

func MessageToDrop

func MessageToDrop() Message

MessageToDrop creates a Message to be dropped
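The three constructors cover the routing choices a handler has: forward to one destination, forward to all, or drop. The sketch below shows a handler choosing between them. It uses local stand-ins, and it assumes that MessageToAll and MessageToDrop store the ALL and DROP values in the Key field, which this page does not state. The routeKey function and its routing rule are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Reserved values, built the same way as in the Variables section.
var (
	DROP = fmt.Sprintf("%U__DROP__", '\\')
	ALL  = fmt.Sprintf("%U__ALL__", '\\')
)

// Message is a local stand-in for the package type.
type Message struct {
	Key   string
	Value []byte
}

// Assumed constructor bodies: each one only fills in the Key field.
func MessageTo(to string, value []byte) Message { return Message{Key: to, Value: value} }
func MessageToAll(value []byte) Message        { return Message{Key: ALL, Value: value} }
func MessageToDrop() Message                   { return Message{Key: DROP, Value: []byte{}} }

// routeKey applies a hypothetical rule: drop empty payloads, send payloads
// starting with "err" to "errors", and forward everything else to all.
func routeKey(value string) string {
	var msg Message
	switch {
	case value == "":
		msg = MessageToDrop()
	case strings.HasPrefix(value, "err"):
		msg = MessageTo("errors", []byte(value))
	default:
		msg = MessageToAll([]byte(value))
	}
	return msg.Key
}

func main() {
	fmt.Println(routeKey(""))             // U+005C__DROP__
	fmt.Println(routeKey("err: timeout")) // errors
	fmt.Println(routeKey("ok"))           // U+005C__ALL__
}
```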

type MessageT added in v0.3.1

type MessageT struct {
	EventTime time.Time
	Key       string
	Value     []byte
}

MessageT is used to wrap the data returned by UDFs. Compared with Message, MessageT contains one more field, the event time, which is usually extracted from the payload.

func MessageTTo added in v0.3.1

func MessageTTo(eventTime time.Time, to string, value []byte) MessageT

MessageTTo creates a MessageT that will be forwarded to the specified "to".

func MessageTToAll added in v0.3.1

func MessageTToAll(eventTime time.Time, value []byte) MessageT

MessageTToAll creates a MessageT that will be forwarded to all.

func MessageTToDrop added in v0.3.1

func MessageTToDrop() MessageT

MessageTToDrop creates a MessageT to be dropped

type MessageTs added in v0.3.1

type MessageTs []MessageT

func MessageTsBuilder added in v0.3.1

func MessageTsBuilder() MessageTs

MessageTsBuilder returns an empty instance of MessageTs

func (MessageTs) Append added in v0.3.1

func (m MessageTs) Append(msg MessageT) MessageTs

Append appends a MessageT

func (MessageTs) Items added in v0.3.1

func (m MessageTs) Items() []MessageT

Items returns the MessageT list

type Messages

type Messages []Message

func MessagesBuilder

func MessagesBuilder() Messages

MessagesBuilder returns an empty instance of Messages

func (Messages) Append

func (m Messages) Append(msg Message) Messages

Append appends a Message

func (Messages) Items

func (m Messages) Items() []Message

Items returns the message list
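Append has a value receiver and returns Messages, so the result has to be used, either by chaining or by assigning it back. The sketch below illustrates this with local stand-ins and assumes Append is a thin wrapper around the built-in append. The countAfterChain and countWhenResultIgnored helpers are hypothetical.

```go
package main

import "fmt"

// Local stand-ins for the package types.
type Message struct {
	Key   string
	Value []byte
}
type Messages []Message

func MessagesBuilder() Messages { return Messages{} }

// Append returns the extended slice; the receiver itself is not modified
// (assumed to wrap the built-in append).
func (m Messages) Append(msg Message) Messages {
	m = append(m, msg)
	return m
}

func (m Messages) Items() []Message { return m }

// countAfterChain uses the returned value at every step.
func countAfterChain() int {
	msgs := MessagesBuilder().
		Append(Message{Key: "a", Value: []byte("1")}).
		Append(Message{Key: "b", Value: []byte("2")})
	return len(msgs.Items())
}

// countWhenResultIgnored discards the value returned by Append, so the
// original variable still holds an empty slice.
func countWhenResultIgnored() int {
	msgs := MessagesBuilder()
	msgs.Append(Message{Key: "a", Value: []byte("1")})
	return len(msgs.Items())
}

func main() {
	fmt.Println(countAfterChain())        // 2
	fmt.Println(countWhenResultIgnored()) // 0
}
```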

type Metadata

type Metadata interface {
	IntervalWindow() IntervalWindow
}

Metadata contains methods to get the metadata for the reduce operation.

func NewMetadata added in v0.2.4

func NewMetadata(window IntervalWindow) Metadata

type ReduceFunc

type ReduceFunc func(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages

ReduceFunc is a utility type used to convert a HandleDo function to a ReduceHandler.

func (ReduceFunc) HandleDo

func (rf ReduceFunc) HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages

HandleDo implements the reduce function.

type ReduceHandler

type ReduceHandler interface {
	HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages
}

ReduceHandler is the interface that a reduce function implementation must satisfy.
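A reduce handler reads from reduceCh until the channel is closed and can consult the Metadata for the window bounds. The sketch below sums integer payloads and reports the window length. It uses local stand-ins with Datum trimmed to Value, and it returns a string instead of Messages to stay short. The window, metadata, bytesDatum, sumHandle, and runSum names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

// Local stand-ins for the package interfaces.
type Datum interface{ Value() []byte }
type IntervalWindow interface {
	StartTime() time.Time
	EndTime() time.Time
}
type Metadata interface{ IntervalWindow() IntervalWindow }

type window struct{ start, end time.Time }

func (w window) StartTime() time.Time { return w.start }
func (w window) EndTime() time.Time   { return w.end }

type metadata struct{ w IntervalWindow }

func (m metadata) IntervalWindow() IntervalWindow { return m.w }

type bytesDatum []byte

func (b bytesDatum) Value() []byte { return b }

// sumHandle adds up every payload that parses as an integer and skips the rest.
func sumHandle(_ context.Context, key string, reduceCh <-chan Datum, md Metadata) string {
	sum := 0
	for d := range reduceCh {
		if n, err := strconv.Atoi(string(d.Value())); err == nil {
			sum += n
		}
	}
	w := md.IntervalWindow()
	return fmt.Sprintf("%s=%d over %s", key, sum, w.EndTime().Sub(w.StartTime()))
}

// runSum feeds the values through a closed channel with a one-minute window.
func runSum(values ...string) string {
	ch := make(chan Datum, len(values))
	for _, v := range values {
		ch <- bytesDatum(v)
	}
	close(ch)
	start := time.Unix(0, 0)
	md := metadata{w: window{start: start, end: start.Add(time.Minute)}}
	return sumHandle(context.Background(), "total", ch, md)
}

func main() {
	fmt.Println(runSum("1", "2", "3")) // total=6 over 1m0s
}
```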

type Service

Service implements the proto gen server interface and contains the map operation handler and the reduce operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) MapFn

MapFn applies a function to each datum element.

func (*Service) MapTFn added in v0.3.1

func (fs *Service) MapTFn(ctx context.Context, d *functionpb.Datum) (*functionpb.DatumList, error)

MapTFn applies a function to each datum element. In addition to the map function, MapTFn also supports assigning a new event time to the datum. MapTFn can be used only at a source vertex, by the source data transformer.

func (*Service) ReduceFn

ReduceFn applies a reduce function to a datum stream.

Directories

Path            Synopsis
examples/sum
