Documentation ¶
Overview ¶
Package function implements the server code for User Defined Function in golang.
Example Map
package main import ( "context" functionsdk "github.com/ashwinidulams/numaflow-go/pkg/function" "github.com/ashwinidulams/numaflow-go/pkg/function/server" ) // Simply return the same msg func handle(ctx context.Context, key string, data functionsdk.Datum) functionsdk.Messages { _ = data.EventTime() // Event time is available _ = data.Watermark() // Watermark is available return functionsdk.MessagesBuilder().Append(functionsdk.MessageToAll(data.Value())) } func main() { server.New().RegisterMapper(functionsdk.MapFunc(handle)).Start(context.Background()) }
Example MapT (extracting event time from the datum payload) MapT includes both Map and EventTime assignment functionalities. Although the input datum already contains EventTime and Watermark, it's up to the MapT implementor to decide on whether to use them for generating new EventTime. MapT can be used only at source vertex by source data transformer.
package main import ( "context" "time" functionsdk "github.com/numaproj/numaflow-go/pkg/function" "github.com/numaproj/numaflow-go/pkg/function/server" ) func mapTHandle(_ context.Context, key string, d functionsdk.Datum) functionsdk.MessageTs { eventTime := getEventTime(d.Value()) return types.MessageTsBuilder().Append(types.MessageTTo(eventTime, key, d.Value())) } func getEventTime(val []byte) time.Time { ... } func main() { server.New().RegisterMapperT(functionsdk.MapTFunc(mapTHandle)).Start(context.Background()) }
Example Reduce
package main import ( "context" "strconv" functionsdk "github.com/ashwinidulams/numaflow-go/pkg/function" "github.com/ashwinidulams/numaflow-go/pkg/function/server" ) // Count the incoming events func handle(_ context.Context, key string, reduceCh <-chan functionsdk.Datum, md functionsdk.Metadata) functionsdk.Messages { var resultKey = key var resultVal []byte var counter = 0 for _ = range reduceCh { counter++ } resultVal = []byte(strconv.Itoa(counter)) return functionsdk.MessagesBuilder().Append(functionsdk.MessageTo(resultKey, resultVal)) } func main() { server.New().RegisterReducer(functionsdk.ReduceFunc(handle)).Start(context.Background()) }
Index ¶
- Constants
- Variables
- type Client
- type Datum
- type IntervalWindow
- type MapFunc
- type MapHandler
- type MapTFunc
- type MapTHandler
- type Message
- type MessageT
- type MessageTs
- type Messages
- type Metadata
- type ReduceFunc
- type ReduceHandler
- type Service
- func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*functionpb.ReadyResponse, error)
- func (fs *Service) MapFn(ctx context.Context, d *functionpb.Datum) (*functionpb.DatumList, error)
- func (fs *Service) MapTFn(ctx context.Context, d *functionpb.Datum) (*functionpb.DatumList, error)
- func (fs *Service) ReduceFn(stream functionpb.UserDefinedFunction_ReduceFnServer) error
Constants ¶
const ( Protocol = "unix" Addr = "/var/run/numaflow/function.sock" DatumKey = "x-numaflow-datum-key" // DefaultMaxMessageSize overrides gRPC max message size configuration // https://github.com/grpc/grpc-go/blob/master/server.go#L58-L59 // - defaultServerMaxReceiveMessageSize // - defaultServerMaxSendMessageSize DefaultMaxMessageSize = 1024 * 1024 * 4 WinStartTime = "x-numaflow-win-start-time" WinEndTime = "x-numaflow-win-end-time" )
Variables ¶
var ( DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ ALL = fmt.Sprintf("%U__ALL__", '\\') // U+005C__ALL__ )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface { CloseConn(ctx context.Context) error IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error) MapTFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error) ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error) }
Client contains methods to call a gRPC client.
type IntervalWindow ¶
IntervalWindow contains methods to get the information for a given interval window.
func NewIntervalWindow ¶
func NewIntervalWindow(startTime time.Time, endTime time.Time) IntervalWindow
type MapHandler ¶
type MapHandler interface { // HandleDo is the function to process each coming message. HandleDo(ctx context.Context, key string, datum Datum) Messages }
MapHandler is the interface of map function implementation.
type MapTFunc ¶ added in v0.1008.0
MapTFunc is utility type used to convert a HandleDo function to a MapTHandler.
type MapTHandler ¶ added in v0.1008.0
type MapTHandler interface { // HandleDo is the function to process each coming message. HandleDo(ctx context.Context, key string, datum Datum) MessageTs }
MapTHandler is the interface of mapT function implementation.
type Message ¶
Message is used to wrap the data return by UDF functions
func MessageToAll ¶
MessageToAll creates a Message that will forward to all
type MessageT ¶ added in v0.1008.0
MessageT is used to wrap the data return by UDF functions. Compared with Message, MessageT contains one more field, the event time, usually extracted from the payload.
func MessageTTo ¶ added in v0.1008.0
MessageTTo creates a MessageT that will forward to specified "to"
func MessageTToAll ¶ added in v0.1008.0
MessageTToAll creates a MessageT that will forward to all
func MessageTToDrop ¶ added in v0.1008.0
func MessageTToDrop() MessageT
MessageTToDrop creates a MessageT to be dropped
type MessageTs ¶ added in v0.1008.0
type MessageTs []MessageT
func MessageTsBuilder ¶ added in v0.1008.0
func MessageTsBuilder() MessageTs
MessageTsBuilder returns an empty instance of MessageTs
type Messages ¶
type Messages []Message
func MessagesBuilder ¶
func MessagesBuilder() Messages
MessagesBuilder returns an empty instance of Messages
type Metadata ¶
type Metadata interface {
IntervalWindow() IntervalWindow
}
Metadata contains methods to get the metadata for the reduce operation.
func NewMetadata ¶
func NewMetadata(window IntervalWindow) Metadata
type ReduceFunc ¶
ReduceFunc is utility type used to convert a HandleDo function to a ReduceHandler.
type ReduceHandler ¶
type ReduceHandler interface {
HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages
}
ReduceHandler is the interface of reduce function implementation.
type Service ¶
type Service struct { functionpb.UnimplementedUserDefinedFunctionServer Mapper MapHandler MapperT MapTHandler Reducer ReduceHandler }
Service implements the proto gen server interface and contains the map operation handler and the reduce operation handler.
func (*Service) IsReady ¶
func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*functionpb.ReadyResponse, error)
IsReady returns true to indicate the gRPC connection is ready.
func (*Service) MapFn ¶
func (fs *Service) MapFn(ctx context.Context, d *functionpb.Datum) (*functionpb.DatumList, error)
MapFn applies a function to each datum element.
func (*Service) MapTFn ¶ added in v0.1008.0
func (fs *Service) MapTFn(ctx context.Context, d *functionpb.Datum) (*functionpb.DatumList, error)
MapTFn applies a function to each datum element. In addition to map function, MapTFn also supports assigning a new event time to datum. MapTFn can be used only at source vertex by source data transformer.
func (*Service) ReduceFn ¶
func (fs *Service) ReduceFn(stream functionpb.UserDefinedFunction_ReduceFnServer) error
ReduceFn applies a reduce function to a datum stream.