udf

package
v0.77.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

README

#User Defined Function Service

This service enables custom UDF registration with pre defined provider set.

Usage

Register UDF

endly -r=reqister

@register.yaml

pipeline:
    register-udf:
      action: udf:register
      udfs:
        - id: AddressBookToProto
          provider: ProtoWriter
          params:
            - /Project/proto/address_book.proto
            - AddressBook
        - id: ProtoToAddressBook
          provider: ProtoReader
          params:
            - /Project/proto/address_book.proto
            - AddressBook

Avro Reader UDF data validation:

endly -r=test

@test.yaml

pipeline:
  loadData:
    action: storage:download
    udf: AvroReader
    source:
      URL: gs://mye2ebucket/data/output/1/app_data00000.avro
    credentials: $gcpSecrets
    destKey: matchedData
    post:
      myData: $Transformed

  infoMyData:
    action: print
    message: $AsJSON(${myData})

  infoMatched:
    action: print
    message: $AsJSON(${matchedData})

  infoAll:
    action: print
    message: $AsJSON(${loadData})
    
  assert:
    action: validator:assert
    actual: $AsData(${matchedData})
    expect: $Cat(${parent.path}/expect/mydata.json)
    

Predefined UDF Providers

Provider Arguments
ProtoReader schemaFile, messageType, importPath
ProtoReader schemaFile, messageType, importPath
AvroWriter avroSchema/URL, compression
CsvReader headerFields, delimiter

Service actions

UDF Service

Service Id Action Description Request Response
udf register register custom UDF with udf provider RegisterRequest RegisterResponse

Documentation

Index

Constants

View Source
const (
	//ServiceID represents UDF service id.
	ServiceID = "udf"
)

Variables

This section is empty.

Functions

func AsProtobufMessage

func AsProtobufMessage(source interface{}, state data.Map, target proto.Message) (interface{}, error)

AsProtobufMessage generic method for converting a map, or json string into a proto message

func DateOfBirth

func DateOfBirth(source interface{}, state data.Map) (interface{}, error)

DateOfBirth returns formatted date of birth, it take desired age, [month], [day], [timeformat]

func FromProtobufMessage

func FromProtobufMessage(source interface{}, state data.Map, sourceMessage proto.Message) (interface{}, error)

FromProtobufMessage generic method for converting a proto message into a map

func GZipContentCorrupter

func GZipContentCorrupter(source interface{}, state data.Map) (interface{}, error)

GZipContentCorrupter corrupt zip content modifier

func GZipper

func GZipper(source interface{}, state data.Map) (interface{}, error)

GZipper copy modifier, mofidies source using zip udf

func Hostname

func Hostname(source interface{}, state data.Map) (interface{}, error)

Hostname return host from URL

func LoadData

func LoadData(source interface{}, state data.Map) (interface{}, error)

func New

func New() endly.Service

New creates a new udf service.

func NewAvroReader

func NewAvroReader(source interface{}, state data.Map) (interface{}, error)

NewAvroReader creates a new avro reader UDFs

func NewAvroWriter

func NewAvroWriter(args ...interface{}) (func(source interface{}, state data.Map) (interface{}, error), error)

NewAvroWriter creates a new avro writer provider

func NewCsvReader

func NewCsvReader(args ...interface{}) (func(source interface{}, state data.Map) (interface{}, error), error)

func NewProtoReader

func NewProtoReader(args ...interface{}) (func(source interface{}, state data.Map) (interface{}, error), error)

NewProtoWriter creates a new proto writer provider

func NewProtoWriter

func NewProtoWriter(args ...interface{}) (func(source interface{}, state data.Map) (interface{}, error), error)

NewProtoWriter creates a new proto writer provider

func RegisterProviders

func RegisterProviders(providers []*endly.UdfProvider) error

RegisterProviders register the supplied providers

func TransformWithUDF

func TransformWithUDF(context *endly.Context, udfName, source string, payload interface{}) (interface{}, error)

TransformWithUDF transform payload with provided UDFs name.

func URLJoin

func URLJoin(source interface{}, state data.Map) (interface{}, error)

URLJoin joins base URL and URI path

func URLPath

func URLPath(source interface{}, state data.Map) (interface{}, error)

URLPath return path from URL

Types

type ProtoCodec

type ProtoCodec struct {
	// contains filtered or unexported fields
}

ProtoCodec represent a proto codec

func NewProtoCodec

func NewProtoCodec(schemaFile, importPath string, msgType string, lowercaseKey bool) (*ProtoCodec, error)

NewProtoCodec creates a new protobuf codec

func (*ProtoCodec) AsBinary

func (c *ProtoCodec) AsBinary(msgType string, msg interface{}) ([]byte, error)

func (*ProtoCodec) AsMessage

func (c *ProtoCodec) AsMessage(msgType string, data []byte) (interface{}, error)

type RegisterRequest

type RegisterRequest struct {
	UDFs []*endly.UdfProvider `description:"collection of predefined udf provider name with custom parameters and new registration id"`
}

RegisterRequest represents a register udf request

func NewRegisterRequestFromURL

func NewRegisterRequestFromURL(URL string) (*RegisterRequest, error)

type RegisterResponse

type RegisterResponse struct{}

RegisterRequest represents a register response

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL