Documentation ¶
Overview ¶
Package function implements the server code for User Defined Functions (UDFs) in Go.
Example Map
package main

import (
	"context"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
)

// Simply return the same msg
func handle(ctx context.Context, keys []string, data functionsdk.Datum) functionsdk.Messages {
	_ = data.EventTime() // Event time is available
	_ = data.Watermark() // Watermark is available
	return functionsdk.MessagesBuilder().Append(functionsdk.NewMessage(data.Value()))
}

func main() {
	server.New().RegisterMapper(functionsdk.MapFunc(handle)).Start(context.Background())
}
Example MapT (extracting event time from the datum payload)
MapT combines Map with event-time assignment. Although the input datum already carries an EventTime and a Watermark, the MapT implementor decides whether to use them when generating the new EventTime. MapT can be used only at the source vertex, by the source data transformer.
package main

import (
	"context"
	"time"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
)

// Assign a new event time, extracted from the payload, and forward the value unchanged
func mapTHandle(_ context.Context, keys []string, d functionsdk.Datum) functionsdk.MessageTs {
	eventTime := getEventTime(d.Value())
	return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(eventTime, d.Value()).WithKeys(keys))
}

func getEventTime(val []byte) time.Time {
	...
}

func main() {
	server.New().RegisterMapperT(functionsdk.MapTFunc(mapTHandle)).Start(context.Background())
}
Example Reduce
package main

import (
	"context"
	"strconv"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
)

// Count the incoming events
func handle(_ context.Context, keys []string, reduceCh <-chan functionsdk.Datum, md functionsdk.Metadata) functionsdk.Messages {
	var resultKeys = keys
	var resultVal []byte
	var counter = 0
	// Drain the channel; it is closed once all data for the window has been sent
	for range reduceCh {
		counter++
	}
	resultVal = []byte(strconv.Itoa(counter))
	return functionsdk.MessagesBuilder().Append(functionsdk.NewMessage(resultVal).WithKeys(resultKeys))
}

func main() {
	server.New().RegisterReducer(functionsdk.ReduceFunc(handle)).Start(context.Background())
}
The Datum object contains the message payload and metadata. The metadata currently has two fields: the message ID and the delivery count, which indicates how many times the message has been delivered. You can use this metadata to implement custom logic. For example,
...
func handle(ctx context.Context, keys []string, data functionsdk.Datum) functionsdk.Messages {
	deliveryCount := data.Metadata().NumDelivered()
	// Choose to do specific actions if the message delivery count exceeds a certain threshold.
	if deliveryCount > 3 {
		...
	}
}
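The threshold check can be isolated into a small helper that is easy to test on its own. The sketch below is self-contained and does not import the package: deliveryInfo is a local stand-in that declares the two methods documented for DatumMetadata, while fakeMetadata, maxDeliveries, and shouldGiveUp are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// deliveryInfo is a local stand-in declaring the two methods documented
// for DatumMetadata. It is not the package's own type.
type deliveryInfo interface {
	ID() string
	NumDelivered() uint64
}

// fakeMetadata is a hypothetical test double for the sketch.
type fakeMetadata struct {
	id        string
	delivered uint64
}

func (m fakeMetadata) ID() string           { return m.id }
func (m fakeMetadata) NumDelivered() uint64 { return m.delivered }

// maxDeliveries is an assumed threshold; choose a value that suits the pipeline.
const maxDeliveries = 3

// shouldGiveUp reports whether a message has been delivered more often
// than the assumed threshold allows.
func shouldGiveUp(md deliveryInfo) bool {
	return md.NumDelivered() > maxDeliveries
}

func main() {
	for _, md := range []fakeMetadata{{"a", 1}, {"b", 3}, {"c", 4}} {
		fmt.Printf("id=%s delivered=%d giveUp=%t\n", md.ID(), md.NumDelivered(), shouldGiveUp(md))
	}
}
```

What to do once the threshold is exceeded (drop, forward with a distinguishing key, log) is a design choice left to the handler.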
Index ¶
- Constants
- Variables
- func IsMapMultiProcEnabled() bool
- type Client
- type Datum
- type DatumMetadata
- type IntervalWindow
- type MapFunc
- type MapHandler
- type MapTFunc
- type MapTHandler
- type Message
- type MessageT
- type MessageTs
- type Messages
- type Metadata
- type ReduceFunc
- type ReduceHandler
- type Service
- func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*functionpb.ReadyResponse, error)
- func (fs *Service) MapFn(ctx context.Context, d *functionpb.DatumRequest) (*functionpb.DatumResponseList, error)
- func (fs *Service) MapTFn(ctx context.Context, d *functionpb.DatumRequest) (*functionpb.DatumResponseList, error)
- func (fs *Service) ReduceFn(stream functionpb.UserDefinedFunction_ReduceFnServer) error
Constants ¶
const (
	TCP      = "tcp"
	UDS      = "unix"
	UDS_ADDR = "/var/run/numaflow/function.sock"
	TCP_ADDR = ":55551"
	// DefaultMaxMessageSize overrides gRPC max message size configuration
	// https://github.com/grpc/grpc-go/blob/master/server.go#L58-L59
	//   - defaultServerMaxReceiveMessageSize
	//   - defaultServerMaxSendMessageSize
	DefaultMaxMessageSize = 1024 * 1024 * 4
	WinStartTime          = "x-numaflow-win-start-time"
	WinEndTime            = "x-numaflow-win-end-time"
	Delimiter             = ":"
)
Variables ¶
var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)
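The %U verb formats a rune as its Unicode code point, so the backslash rune (0x5C) becomes the prefix U+005C. The standalone snippet below reproduces the same expression to make the resulting string visible; dropTag is a hypothetical helper name and not part of the package.

```go
package main

import "fmt"

// dropTag rebuilds the DROP value using the same format expression as the
// package variable. The function name is illustrative only.
func dropTag() string {
	return fmt.Sprintf("%U__DROP__", '\\')
}

func main() {
	fmt.Println(dropTag()) // prints "U+005C__DROP__"
}
```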
Functions ¶
func IsMapMultiProcEnabled ¶ added in v0.4.2
func IsMapMultiProcEnabled() bool
Types ¶
type Client ¶
type Client interface {
	CloseConn(ctx context.Context) error
	IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
	MapFn(ctx context.Context, datum *functionpb.DatumRequest) ([]*functionpb.DatumResponse, error)
	MapTFn(ctx context.Context, datum *functionpb.DatumRequest) ([]*functionpb.DatumResponse, error)
	ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.DatumRequest) ([]*functionpb.DatumResponse, error)
}
Client contains methods to call a gRPC client.
type Datum ¶
type Datum interface {
	Value() []byte
	EventTime() time.Time
	Watermark() time.Time
	Metadata() DatumMetadata
}
Datum contains methods to get the payload information.
type DatumMetadata ¶ added in v0.4.1
type DatumMetadata interface {
	// ID returns the ID of the datum.
	ID() string
	// NumDelivered returns the number of times the datum has been delivered.
	NumDelivered() uint64
}
DatumMetadata contains methods to get the metadata information for datum.
type IntervalWindow ¶
IntervalWindow contains methods to get the information for a given interval window.
func NewIntervalWindow ¶ added in v0.2.4
func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow
type MapHandler ¶
type MapHandler interface {
	// HandleDo is the function to process each incoming message.
	HandleDo(ctx context.Context, keys []string, datum Datum) Messages
}
MapHandler is the interface of map function implementation.
type MapTFunc ¶ added in v0.3.1
MapTFunc is a utility type used to convert a HandleDo function to a MapTHandler.
type MapTHandler ¶ added in v0.3.1
type MapTHandler interface {
	// HandleDo is the function to process each incoming message.
	HandleDo(ctx context.Context, keys []string, datum Datum) MessageTs
}
MapTHandler is the interface of mapT function implementation.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is used to wrap the data returned by UDF functions.
func NewMessage ¶ added in v0.4.3
NewMessage creates a Message with value
type MessageT ¶ added in v0.3.1
type MessageT struct {
// contains filtered or unexported fields
}
MessageT is used to wrap the data returned by UDF functions. Compared with Message, MessageT contains one more field, the event time, usually extracted from the payload.
func MessageTToDrop ¶ added in v0.3.1
func MessageTToDrop() MessageT
MessageTToDrop creates a MessageT to be dropped
func NewMessageT ¶ added in v0.4.3
NewMessageT creates a MessageT with eventTime and value
type MessageTs ¶ added in v0.3.1
type MessageTs []MessageT
func MessageTsBuilder ¶ added in v0.3.1
func MessageTsBuilder() MessageTs
MessageTsBuilder returns an empty instance of MessageTs
type Messages ¶
type Messages []Message
func MessagesBuilder ¶
func MessagesBuilder() Messages
MessagesBuilder returns an empty instance of Messages
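Because Messages is a slice type, the builder-style chaining used in the examples (MessagesBuilder().Append(...)) works only if each Append call returns the extended slice. The sketch below illustrates that pattern with local stand-in types. It is not the package's implementation: the field names and method bodies are assumptions, since the real Message fields are unexported.

```go
package main

import "fmt"

// message is a local stand-in for Message; its fields are assumed.
type message struct {
	keys  []string
	value []byte
}

// newMessage creates a message that carries only a value.
func newMessage(value []byte) message {
	return message{value: value}
}

// WithKeys returns a copy of the message with the given keys set.
func (m message) WithKeys(keys []string) message {
	m.keys = keys
	return m
}

// messages is a local stand-in for Messages.
type messages []message

// messagesBuilder returns an empty messages value.
func messagesBuilder() messages {
	return messages{}
}

// Append returns the slice extended by msg. The caller must use the
// returned value, just as with the built-in append.
func (ms messages) Append(msg message) messages {
	return append(ms, msg)
}

func main() {
	out := messagesBuilder().
		Append(newMessage([]byte("a")).WithKeys([]string{"k1"})).
		Append(newMessage([]byte("b")))
	fmt.Println(len(out), out[0].keys, string(out[1].value))
}
```

Under this assumed design, discarding the return value of Append would silently lose the appended message, which is why the examples return the chained expression directly.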
type Metadata ¶
type Metadata interface {
IntervalWindow() IntervalWindow
}
Metadata contains methods to get the metadata for the reduce operation.
func NewMetadata ¶ added in v0.2.4
func NewMetadata(window IntervalWindow) Metadata
type ReduceFunc ¶
type ReduceFunc func(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
ReduceFunc is a utility type used to convert a HandleDo function to a ReduceHandler.
type ReduceHandler ¶
type ReduceHandler interface {
HandleDo(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
}
ReduceHandler is the interface of reduce function implementation.
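The relationship between a function type and a single-method handler interface follows the same adapter idiom as http.HandlerFunc in the standard library: the function type gets a HandleDo method that calls the function itself. The sketch below shows the idiom with simplified local stand-ins (datum, messages, reduceHandler, and reduceFunc are hypothetical, and the Metadata argument is omitted for brevity). Whether the package implements its adapter exactly this way is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Simplified local stand-ins, for illustration only.
type datum []byte
type messages [][]byte

// reduceHandler mirrors the shape of a single-method handler interface.
type reduceHandler interface {
	HandleDo(ctx context.Context, keys []string, in <-chan datum) messages
}

// reduceFunc adapts an ordinary function to reduceHandler.
type reduceFunc func(ctx context.Context, keys []string, in <-chan datum) messages

// HandleDo satisfies reduceHandler by calling the underlying function.
func (f reduceFunc) HandleDo(ctx context.Context, keys []string, in <-chan datum) messages {
	return f(ctx, keys, in)
}

// count drains the channel and emits the number of elements received.
func count(_ context.Context, _ []string, in <-chan datum) messages {
	n := 0
	for range in {
		n++
	}
	return messages{[]byte(strconv.Itoa(n))}
}

// runCount feeds n elements through the adapted handler and returns the result.
func runCount(n int) string {
	ch := make(chan datum, n)
	for i := 0; i < n; i++ {
		ch <- datum("x")
	}
	close(ch) // closing the channel ends the range loop in count

	var h reduceHandler = reduceFunc(count) // a conversion, not a call
	return string(h.HandleDo(context.Background(), []string{"k"}, ch)[0])
}

func main() {
	fmt.Println(runCount(3))
}
```

The adapter lets callers register a plain function without declaring a struct type just to hang a method on it.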
type Service ¶
type Service struct {
	functionpb.UnimplementedUserDefinedFunctionServer

	Mapper  MapHandler
	MapperT MapTHandler
	Reducer ReduceHandler
}
Service implements the generated protobuf server interface and holds the map, mapT, and reduce operation handlers.
func (*Service) IsReady ¶
func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*functionpb.ReadyResponse, error)
IsReady returns true to indicate the gRPC connection is ready.
func (*Service) MapFn ¶
func (fs *Service) MapFn(ctx context.Context, d *functionpb.DatumRequest) (*functionpb.DatumResponseList, error)
MapFn applies a function to each datum element.
func (*Service) MapTFn ¶ added in v0.3.1
func (fs *Service) MapTFn(ctx context.Context, d *functionpb.DatumRequest) (*functionpb.DatumResponseList, error)
MapTFn applies a function to each datum element. In addition to the map function, MapTFn also supports assigning a new event time to the datum. MapTFn can be used only at the source vertex, by the source data transformer.
func (*Service) ReduceFn ¶
func (fs *Service) ReduceFn(stream functionpb.UserDefinedFunction_ReduceFnServer) error
ReduceFn applies a reduce function to a datum stream.