Documentation ¶
Overview ¶
Package function implements the server code for User Defined Functions (UDFs) in Go.
Example:
package main

import (
	"context"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
)

// Simply return the same msg
func handle(ctx context.Context, key string, data functionsdk.Datum) functionsdk.Messages {
	_ = data.EventTime() // Event time is available
	_ = data.Watermark() // Watermark is available
	return functionsdk.MessagesBuilder().Append(functionsdk.MessageToAll(data.Value()))
}

func main() {
	server.New().RegisterMapper(functionsdk.MapFunc(handle)).Start(context.Background())
}
Constants ¶
const (
	Protocol = "unix"
	Addr     = "/var/run/numaflow/function.sock"
	DatumKey = "x-numaflow-datum-key"
	// DefaultMaxMessageSize overrides gRPC max message size configuration
	// https://github.com/grpc/grpc-go/blob/master/server.go#L58-L59
	//   - defaultServerMaxReceiveMessageSize
	//   - defaultServerMaxSendMessageSize
	DefaultMaxMessageSize = 1024 * 1024 * 4
	WinStartTime          = "x-numaflow-win-start-time"
	WinEndTime            = "x-numaflow-win-end-time"
)
Variables ¶
var (
	DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
	ALL  = fmt.Sprintf("%U__ALL__", '\\')  // U+005C__ALL__
)
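The commented values follow from how the %U verb formats a rune: the backslash is rendered as its Unicode code point, U+005C. A minimal sketch that reproduces both strings; reservedKey is a hypothetical helper introduced here for illustration and is not part of the package:

```go
package main

import "fmt"

// reservedKey is a hypothetical helper (not part of the package). It mirrors
// how DROP and ALL are built: %U formats the backslash rune as "U+005C".
func reservedKey(name string) string {
	return fmt.Sprintf("%U__%s__", '\\', name)
}

func main() {
	fmt.Println(reservedKey("DROP")) // U+005C__DROP__
	fmt.Println(reservedKey("ALL"))  // U+005C__ALL__
}
```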
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
	CloseConn(ctx context.Context) error
	IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
	MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error)
	ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error)
}
Client contains methods to call a gRPC client.
type IntervalWindow ¶
IntervalWindow contains methods to get the information for a given interval window.
func NewIntervalWindow ¶ added in v0.2.4
func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow
type MapHandler ¶
type MapHandler interface {
	// HandleDo is the function to process each incoming message
	HandleDo(ctx context.Context, key string, datum Datum) Messages
}
MapHandler is the interface of map function implementation.
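A plain function can satisfy MapHandler through a function-type adapter, the same pattern as http.HandlerFunc. The sketch below is self-contained and uses simplified stand-ins: Datum, Message, Messages and MapFunc are redeclared locally and may differ from the package's real definitions, while bytesDatum, upper and apply are hypothetical names used only for the demonstration.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Datum, Message and Messages are simplified stand-ins for the package's
// types; only the parts this sketch needs are modeled.
type Datum interface{ Value() []byte }

type Message struct {
	Key   string
	Value []byte
}

type Messages []Message

// MapHandler matches the interface documented above.
type MapHandler interface {
	HandleDo(ctx context.Context, key string, datum Datum) Messages
}

// MapFunc adapts a plain function to MapHandler (assumed shape; the real
// MapFunc may differ in detail).
type MapFunc func(ctx context.Context, key string, datum Datum) Messages

// HandleDo calls the wrapped function.
func (mf MapFunc) HandleDo(ctx context.Context, key string, datum Datum) Messages {
	return mf(ctx, key, datum)
}

// bytesDatum is a hypothetical Datum used only for this demonstration.
type bytesDatum []byte

func (b bytesDatum) Value() []byte { return b }

// upper is a sample handler: it upper-cases the payload and keeps the key.
func upper(_ context.Context, key string, d Datum) Messages {
	return Messages{{Key: key, Value: []byte(strings.ToUpper(string(d.Value())))}}
}

// apply feeds a single payload through a handler.
func apply(h MapHandler, key string, payload []byte) Messages {
	return h.HandleDo(context.Background(), key, bytesDatum(payload))
}

func main() {
	out := apply(MapFunc(upper), "k1", []byte("hello"))
	fmt.Printf("%s=%s\n", out[0].Key, out[0].Value)
}
```

The adapter keeps handlers as ordinary functions, so they can be unit-tested without starting a server.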
type Message ¶
Message wraps the data returned by UDFs.
func MessageToAll ¶
MessageToAll creates a Message that will be forwarded to all downstream vertices.
type Messages ¶
type Messages []Message
func MessagesBuilder ¶
func MessagesBuilder() Messages
MessagesBuilder returns an empty instance of Messages.
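The overview example chains MessagesBuilder().Append(...), which suggests a builder in which Append returns the extended slice. A minimal sketch of that pattern with local stand-ins: the Message fields, the use of ALL as the message key, and the Append implementation are assumptions, since none of them are shown in this listing.

```go
package main

import "fmt"

// ALL matches the value documented in the Variables section.
var ALL = fmt.Sprintf("%U__ALL__", '\\')

// Message is a stand-in; the real type's fields are not shown in this listing.
type Message struct {
	Key   string
	Value []byte
}

// MessageToAll tags the value with the ALL key (assumed behavior).
func MessageToAll(value []byte) Message {
	return Message{Key: ALL, Value: value}
}

type Messages []Message

// MessagesBuilder returns an empty Messages to start a chain.
func MessagesBuilder() Messages { return Messages{} }

// Append returns the extended slice so that calls can be chained
// (assumed, consistent with the overview example).
func (m Messages) Append(msg Message) Messages { return append(m, msg) }

func main() {
	msgs := MessagesBuilder().
		Append(MessageToAll([]byte("a"))).
		Append(MessageToAll([]byte("b")))
	fmt.Println(len(msgs), msgs[0].Key)
}
```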
type Metadata ¶
type Metadata interface {
	IntervalWindow() IntervalWindow
}
Metadata contains methods to get the metadata for the reduce operation.
func NewMetadata ¶ added in v0.2.4
func NewMetadata(window IntervalWindow) Metadata
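The two constructors compose: an IntervalWindow is built from a start and end time and then wrapped in Metadata. The sketch below shows one way this could look. The methods of IntervalWindow are not listed here, so StartTime and EndTime are assumptions, and the unexported types plus the windowLength and exampleMetadata helpers are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// IntervalWindow's methods are not listed in this documentation; StartTime
// and EndTime are assumed here for illustration.
type IntervalWindow interface {
	StartTime() time.Time
	EndTime() time.Time
}

// Metadata matches the interface documented above.
type Metadata interface {
	IntervalWindow() IntervalWindow
}

type intervalWindow struct{ start, end time.Time }

func (w intervalWindow) StartTime() time.Time { return w.start }
func (w intervalWindow) EndTime() time.Time   { return w.end }

// NewIntervalWindow has the documented signature; the body is a sketch.
func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow {
	return intervalWindow{start: startTime, end: endTime}
}

type metadata struct{ window IntervalWindow }

func (m metadata) IntervalWindow() IntervalWindow { return m.window }

// NewMetadata has the documented signature; the body is a sketch.
func NewMetadata(window IntervalWindow) Metadata {
	return metadata{window: window}
}

// windowLength reports how long the window attached to md lasts.
func windowLength(md Metadata) time.Duration {
	w := md.IntervalWindow()
	return w.EndTime().Sub(w.StartTime())
}

// exampleMetadata builds metadata for a one-minute window.
func exampleMetadata() Metadata {
	start := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	return NewMetadata(NewIntervalWindow(start, start.Add(time.Minute)))
}

func main() {
	fmt.Println(windowLength(exampleMetadata())) // 1m0s
}
```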
type ReduceFunc ¶
ReduceFunc is a utility type used to convert a HandleDo function to a ReduceHandler.
type ReduceHandler ¶
type ReduceHandler interface {
	HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages
}
ReduceHandler is the interface of reduce function implementation.
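A reduce handler receives its input as a channel rather than a single datum. The sketch below counts the data for a key, assuming the channel is closed once the window's input is exhausted; that closing behavior is an assumption, not something stated here. All types are local stand-ins, IntervalWindow is left empty because its methods are not listed, and countMessages, bytesDatum and reduceAll are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Simplified stand-ins for the package's types.
type Datum interface{ Value() []byte }

type Message struct {
	Key   string
	Value []byte
}

type Messages []Message

// IntervalWindow is left empty because its methods are not listed above.
type IntervalWindow interface{}

type Metadata interface {
	IntervalWindow() IntervalWindow
}

// ReduceHandler matches the interface documented above.
type ReduceHandler interface {
	HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages
}

// ReduceFunc adapts a plain function to ReduceHandler (assumed shape).
type ReduceFunc func(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages

func (rf ReduceFunc) HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages {
	return rf(ctx, key, reduceCh, md)
}

// countMessages drains the channel until it is closed and emits one count.
func countMessages(_ context.Context, key string, reduceCh <-chan Datum, _ Metadata) Messages {
	n := 0
	for range reduceCh {
		n++
	}
	return Messages{{Key: key, Value: []byte(strconv.Itoa(n))}}
}

// bytesDatum is a hypothetical Datum used only for this demonstration.
type bytesDatum []byte

func (b bytesDatum) Value() []byte { return b }

// reduceAll sends the payloads through h over a buffered channel that is
// closed before the handler runs. Metadata is nil because the sample
// handler ignores it.
func reduceAll(h ReduceHandler, key string, payloads ...string) Messages {
	ch := make(chan Datum, len(payloads))
	for _, p := range payloads {
		ch <- bytesDatum(p)
	}
	close(ch)
	return h.HandleDo(context.Background(), key, ch, nil)
}

func main() {
	out := reduceAll(ReduceFunc(countMessages), "k1", "a", "b", "c")
	fmt.Printf("%s=%s\n", out[0].Key, out[0].Value)
}
```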
type Service ¶
type Service struct {
	functionpb.UnimplementedUserDefinedFunctionServer

	Mapper  MapHandler
	Reducer ReduceHandler
}
Service implements the proto gen server interface and contains the map operation handler and the reduce operation handler.
func (*Service) IsReady ¶
func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*functionpb.ReadyResponse, error)
IsReady returns true to indicate the gRPC connection is ready.
func (*Service) MapFn ¶
func (fs *Service) MapFn(ctx context.Context, d *functionpb.Datum) (*functionpb.DatumList, error)
MapFn applies a function to each datum element.
func (*Service) ReduceFn ¶
func (fs *Service) ReduceFn(stream functionpb.UserDefinedFunction_ReduceFnServer) error
ReduceFn applies a reduce function to a datum stream.