api

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// DeadGracePeriod is the extra time a bucket is kept alive on top of the
// feature's staleness: bucket TTL = staleness + DeadGracePeriod.
const DeadGracePeriod = 10 * time.Minute

DeadGracePeriod is the *extra* time that the bucket should be kept alive on top of the feature's Staleness. Bucket TTL = staleness + DeadGracePeriod

View Source
// ModelBuilder is the builder identifier "model".
// NOTE(review): presumably compared against FeatureDescriptor.Builder — confirm.
const ModelBuilder = "model"
View Source
// SourcelessBuilder is the builder identifier "sourceless".
// NOTE(review): presumably compared against FeatureDescriptor.Builder — confirm.
const SourcelessBuilder = "sourceless"

Variables

View Source
// ErrFeatureAlreadyExists is the sentinel error returned when a feature is
// already registered in the Core's engine manager.
// NOTE(review): the message has no format verbs, so errors.New would be the
// idiomatic constructor.
var ErrFeatureAlreadyExists = fmt.Errorf("feature already exists")

ErrFeatureAlreadyExists is returned when a feature is already registered in the Core's engine manager.

View Source
// ErrFeatureNotFound is the sentinel error returned when a feature is not
// found in the Core's engine manager.
// NOTE(review): the message has no format verbs, so errors.New would be the
// idiomatic constructor.
var ErrFeatureNotFound = fmt.Errorf("feature not found")

ErrFeatureNotFound is returned when a feature is not found in the Core's engine manager.

View Source
// ErrInvalidPipelineContext is the sentinel error returned when the context is
// invalid for pipelining.
// NOTE(review): the message has no format verbs, so errors.New would be the
// idiomatic constructor.
var ErrInvalidPipelineContext = fmt.Errorf("invalid pipeline context")

ErrInvalidPipelineContext is returned when the context is invalid for pipelining.

View Source
// ErrUnsupportedAggrError is the sentinel error returned when an aggregate
// function is not supported.
// NOTE(review): the name carries both the Err prefix and an Error suffix;
// renaming would break callers, so it is only flagged here. The message has no
// format verbs, so errors.New would be the idiomatic constructor.
var ErrUnsupportedAggrError = fmt.Errorf("unsupported aggr")

ErrUnsupportedAggrError is returned when an aggregate function is not supported.

View Source
// ErrUnsupportedPrimitiveError is the sentinel error returned when a primitive
// is not supported.
// NOTE(review): the name carries both the Err prefix and an Error suffix;
// renaming would break callers, so it is only flagged here. The message has no
// format verbs, so errors.New would be the idiomatic constructor.
var ErrUnsupportedPrimitiveError = fmt.Errorf("unsupported primitive")

ErrUnsupportedPrimitiveError is returned when a primitive is not supported.

View Source
// FQNRegExp matches a selector made of an optional "namespace." prefix, a
// name, an optional "+aggrFn" suffix, an optional "@-version" suffix and an
// optional encoding wrapped in literal square brackets. The parts are exposed
// as the named groups namespace, name, aggrFn, version and encoding.
//
// The (?si) flags make the match case-insensitive. namespace and name are
// underscore-separated runs of [a-z0-9]; version is a run of decimal digits.
// NOTE(review): aggrFn and encoding use [a-z]+_*[a-z]+, which needs at least
// two letters and allows underscores only as one contiguous run between them.
var FQNRegExp = regexp.MustCompile(`(?si)^((?P<namespace>[a-z0-9]+(?:_[a-z0-9]+)*)\.)?(?P<name>[a-z0-9]+(?:_[a-z0-9]+)*)(\+(?P<aggrFn>([a-z]+_*[a-z]+)))?(@-(?P<version>([0-9]+)))?(\[(?P<encoding>([a-z]+_*[a-z]+))])?$`)

Functions

func AliveWindowBuckets

func AliveWindowBuckets(staleness, bucketSize time.Duration) []string

AliveWindowBuckets returns a list of all the *valid* buckets up until now

func BucketDeadTime

func BucketDeadTime(bucketName string, bucketSize, staleness time.Duration) time.Time

BucketDeadTime returns the end time of a given bucket by its name

func BucketName

func BucketName(ts time.Time, bucketSize time.Duration) string

BucketName returns a bucket name for a given timestamp and a bucket size

func BucketTime

func BucketTime(bucketName string, bucketSize time.Duration) time.Time

BucketTime returns the start time of a given bucket by its name

func ContextWithSelector

func ContextWithSelector(ctx context.Context, selector string) context.Context

func DeadWindowBuckets

func DeadWindowBuckets(staleness, bucketSize time.Duration) []string

DeadWindowBuckets returns the names of the *dead* buckets (buckets that are outside the window) that should be available.

func LoggerFromContext

func LoggerFromContext(ctx context.Context) logr.Logger

LoggerFromContext returns the logger stored in the context. If none is found, it returns a logger that discards all output.

func NormalizeAny

func NormalizeAny(t any) (any, error)

func NormalizeFQN

func NormalizeFQN(fqn, defaultNamespace string) (string, error)

NormalizeFQN returns an FQN with the namespace

func NormalizeSelector

func NormalizeSelector(selector, defaultNamespace string) (string, error)

NormalizeSelector returns a selector with the default namespace if not specified

func ScalarFromString

func ScalarFromString(val string, scalar PrimitiveType) (any, error)

func ScalarString

func ScalarString(val any) string

func SelectorFromContext

func SelectorFromContext(ctx context.Context) (string, error)

func ToLowLevelValue

func ToLowLevelValue[T LowLevelValue](v any) T

ToLowLevelValue returns the low level value of the feature

Types

type AggrFn

// AggrFn identifies an aggregation function. Its values are the AggrFn*
// constants: sum, avg, max, min and count, plus an unknown zero value.
type AggrFn int

AggrFn is an aggregation function

// Aggregation functions available as AggrFn values. AggrFnUnknown is the zero
// value of AggrFn.
const (
	AggrFnUnknown AggrFn = iota
	AggrFnSum
	AggrFnAvg
	AggrFnMax
	AggrFnMin
	AggrFnCount
)

func ParseSelector

func ParseSelector(fqn string) (namespace, name string, aggrFn AggrFn, version uint, encoding string, err error)

func StringToAggrFn

func StringToAggrFn(s string) AggrFn

func StringsToAggrFns

func StringsToAggrFns(fns []string) ([]AggrFn, error)

func (AggrFn) String

func (w AggrFn) String() string

type BindConfig

// BindConfig is the callback signature used to add a plugin's config flags to
// the given pflag.FlagSet. It returns an error.
type BindConfig func(set *pflag.FlagSet) error

BindConfig adds config flags for the plugin.

type CollectNotification

// CollectNotification is one of the two payload types admitted by the
// Notification constraint. It carries an FQN, the encoded keys and a bucket;
// Bucket is omitted from the JSON encoding when empty.
type CollectNotification struct {
	FQN         string `json:"fqn"`
	EncodedKeys string `json:"encoded_keys"`
	Bucket      string `json:"bucket,omitempty"`
}

type CollectNotifierFactory

// CollectNotifierFactory is NotifierFactory instantiated for CollectNotification.
type CollectNotifierFactory NotifierFactory[CollectNotification]

type ContextKey

// ContextKey is the integer key type used to store data in a context. Its
// values are the ContextKey* constants.
type ContextKey int

ContextKey is a key to store data in context.

// Context keys defined by this package.
const (
	// ContextKeyCachePostGet is a key to store the flag controlling whether the postGet value is cached in the storage.
	// If not set, it defaults to true.
	ContextKeyCachePostGet ContextKey = iota

	// ContextKeyCacheFresh is a key to store the flag that indicates whether the result from the cache was fresh.
	ContextKeyCacheFresh

	// ContextKeyFromCache is a key to store the flag that indicates whether the value is from the cache.
	ContextKeyFromCache

	// ContextKeyLogger is a key to store a logger.
	ContextKeyLogger

	// ContextKeySelector is a key to store the requested Feature Selector.
	ContextKeySelector
)

type DataSource

// DataSource is the parsed, abstracted representation of a
// manifests.DataSource: its FQN, its kind, and its configuration as a
// manifests.ParsedConfig.
type DataSource struct {
	FQN    string                 `json:"fqn"`
	Kind   string                 `json:"kind"`
	Config manifests.ParsedConfig `json:"config"`
}

DataSource is a parsed abstracted representation of a manifests.DataSource

func DataSourceFromManifest

func DataSourceFromManifest(ctx context.Context, src *manifests.DataSource, r client.Reader) (DataSource, error)

DataSourceFromManifest returns a DataSource from a manifests.DataSource

type DataSourceGetter

// DataSourceGetter is a single-method interface that returns a DataSource.
type DataSourceGetter interface {
	// GetDataSource returns the DataSource for the given FQN, or an error.
	GetDataSource(FQN string) (DataSource, error)
}

DataSourceGetter is a simple interface that returns a DataSource

type DataSourceManager

// DataSourceManager manages the DataSource(s) within Core and is responsible
// for maintaining them in an internal store.
// NOTE(review): presumably BindDataSource adds, UnbindDataSource removes and
// HasDataSource tests membership by FQN — confirm against the implementation.
type DataSourceManager interface {
	BindDataSource(fd DataSource) error
	UnbindDataSource(FQN string) error
	HasDataSource(FQN string) bool
}

DataSourceManager manages the DataSource(s) within Core. It is responsible for maintaining the DataSource(s) in an internal store.

type DataSourceReconcile

// DataSourceReconcile is the function signature implemented by plugins that
// want to be reconciled in the operator. changed is true when the
// reconciliation has changed the object, in which case the operator should
// re-queue.
type DataSourceReconcile func(ctx context.Context, rr DataSourceReconcileRequest) (changed bool, err error)

DataSourceReconcile is the interface to be implemented by plugins that want to be reconciled in the operator. This is useful for plugins that need to spawn an external Feature Ingestion.

It returns true if the reconciliation has changed the object (and therefore the operator should re-queue).

type DataSourceReconcileRequest

// DataSourceReconcileRequest bundles the inputs passed to a
// DataSourceReconcile: the manifests.DataSource, a RuntimeManager, a client
// and scheme, and the Core address.
type DataSourceReconcileRequest struct {
	DataSource     *manifests.DataSource
	RuntimeManager RuntimeManager
	Client         client.Client
	Scheme         *runtime.Scheme
	CoreAddress    string
}

DataSourceReconcileRequest contains metadata for the reconcile.

type Engine

// Engine is the main engine of the Core. It is responsible for the low-level
// operations for the features against the feature store.
type Engine interface {
	// FeatureDescriptor returns the FeatureDescriptor for the given selector
	FeatureDescriptor(ctx context.Context, selector string) (FeatureDescriptor, error)
	// Get returns the value for the given selector and keys
	// If the feature is not available, it returns nil.
	// NOTE(review): Value is a struct here, so "nil" presumably means a zero Value — confirm.
	// If the feature is windowed, the returned Value is a map from window function to Value.
	Get(ctx context.Context, selector string, keys Keys) (Value, FeatureDescriptor, error)
	// Set sets the raw value for the given FQN and keys
	// If the feature's primitive is a List, it replaces the entire list.
	// If the feature is windowed, it is aliased to WindowAdd instead of Set.
	Set(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error
	// Append appends to the raw value for the given FQN and keys
	// If the feature's primitive is NOT a List it will throw an error.
	Append(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error
	// Incr increments the raw value of the feature.
	// If the feature's primitive is NOT a Scalar it will throw an error.
	// It returns an error if one occurred; the updated value is not returned.
	Incr(ctx context.Context, FQN string, keys Keys, by any, ts time.Time) error
	// Update is the common function to update a feature SimpleValue.
	// Under the hood, it utilizes lower-level functions depending on the type of the feature.
	//  - Set for Scalars
	//  - Append for Lists
	//  - WindowAdd for Windows
	Update(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error
}

Engine is the main engine of the Core. It is responsible for the low-level operations for the features against the feature store.

type ExtendedManager

// ExtendedManager combines Engine, RuntimeManager and DataSourceGetter in a
// single interface.
type ExtendedManager interface {
	Engine
	RuntimeManager
	DataSourceGetter
}

ExtendedManager is an Engine that has a DataSource

type FeatureApply

// FeatureApply is the callback signature used to apply changes on the feature
// abstraction. It receives the FeatureDescriptor, the manifests.FeatureBuilder,
// a Pipeliner and an ExtendedManager, and returns an error.
type FeatureApply func(fd FeatureDescriptor, builder manifests.FeatureBuilder, pl Pipeliner, src ExtendedManager) error

FeatureApply applies changes on the feature abstraction.

type FeatureDescriptor

// FeatureDescriptor describes a feature definition for internal use of the
// Core.
// NOTE(review): the JSON tags mix three conventions — "FQN" (upper case),
// "runtimeEnv" (camelCase) and snake_case for "keep_previous" and
// "data_source". Changing them would alter the wire format, so this is only
// flagged here.
type FeatureDescriptor struct {
	FQN          string        `json:"FQN"`
	Primitive    PrimitiveType `json:"primitive"`
	Aggr         []AggrFn      `json:"aggr"`
	Freshness    time.Duration `json:"freshness"`
	Staleness    time.Duration `json:"staleness"`
	Timeout      time.Duration `json:"timeout"`
	KeepPrevious *KeepPrevious `json:"keep_previous"`
	Keys         []string      `json:"keys"`
	Builder      string        `json:"builder"`
	RuntimeEnv   string        `json:"runtimeEnv"`
	DataSource   string        `json:"data_source"`
	Dependencies []string      `json:"dependencies"`
}

FeatureDescriptor is describing a feature definition for an internal use of the Core.

func FeatureDescriptorFromManifest

func FeatureDescriptorFromManifest(in *manifests.Feature) (*FeatureDescriptor, error)

FeatureDescriptorFromManifest returns a FeatureDescriptor from a manifests.Feature

func (FeatureDescriptor) ValidWindow

func (fd FeatureDescriptor) ValidWindow() bool

ValidWindow checks if the feature has aggregation enabled, and if it is valid.

type FeatureDescriptorGetter

// FeatureDescriptorGetter is a function that returns the FeatureDescriptor for
// the given FQN, or an error.
type FeatureDescriptorGetter func(ctx context.Context, FQN string) (FeatureDescriptor, error)

type FeatureManager

// FeatureManager manages the Feature(s) within Core. It is responsible for
// managing features as well as operating on them.
// NOTE(review): presumably BindFeature adds, UnbindFeature removes and
// HasFeature tests membership by FQN — confirm against the implementation.
type FeatureManager interface {
	BindFeature(in *manifests.Feature) error
	UnbindFeature(FQN string) error
	HasFeature(FQN string) bool
}

FeatureManager manages the Feature(s) within Core. It is responsible for managing features as well as operating on them.

type HistoricalWriter

// HistoricalWriter accepts WriteNotifications through Commit and exposes
// flushing for a single FQN or for everything, closing, and binding of a
// feature together with its model spec and a FeatureDescriptorGetter.
// NOTE(review): presumably the sink for historical feature data — confirm
// against an implementation.
type HistoricalWriter interface {
	Commit(context.Context, WriteNotification) error
	Flush(ctx context.Context, fqn string) error
	FlushAll(context.Context) error
	Close(ctx context.Context) error
	BindFeature(fd *FeatureDescriptor, model *manifests.ModelSpec, getter FeatureDescriptorGetter) error
}

type HistoricalWriterFactory

// HistoricalWriterFactory builds a HistoricalWriter from a viper
// configuration, or returns an error.
type HistoricalWriterFactory func(viper *viper.Viper) (HistoricalWriter, error)

type KeepPrevious

// KeepPrevious is the type of FeatureDescriptor.KeepPrevious; it holds a
// version count and a duration.
// NOTE(review): the fields carry no JSON tags, so encoding/json emits them
// under their Go names ("Versions", "Over"), unlike the tagged fields of
// FeatureDescriptor — confirm this is intended.
type KeepPrevious struct {
	Versions uint
	Over     time.Duration
}

type Keys

// Keys is a string-to-string map. Its Encode and Decode methods convert it to
// and from an encoded string with the help of a FeatureDescriptor.
type Keys map[string]string

func (*Keys) Decode

func (k *Keys) Decode(encodedKeys string, fd FeatureDescriptor) error

func (*Keys) Encode

func (k *Keys) Encode(fd FeatureDescriptor) (string, error)

func (*Keys) String

func (k *Keys) String() string

type Logger

// Logger is a single-method interface that returns a logr.Logger.
type Logger interface {
	Logger() logr.Logger
}

Logger is a simple interface that returns a logr.Logger

type LowLevelValue

// LowLevelValue is the type constraint used by ToLowLevelValue. It admits
// int-, string- and float64-based types, time.Time, slices of those, and
// WindowResultMap.
// NOTE(review): there is no bool or []bool term even though
// PrimitiveTypeBoolean and PrimitiveTypeBooleanList exist — confirm this is
// intended.
type LowLevelValue interface {
	~int | ~string | ~float64 | time.Time | ~[]int | ~[]string | ~[]float64 | ~[]time.Time | WindowResultMap
}

LowLevelValue is a low level value that can be cast to any type

type ManagerEngine

// ManagerEngine is the business-logic interface of the Core. It combines
// Logger, FeatureManager, DataSourceManager, RuntimeManager and Engine.
type ManagerEngine interface {
	Logger
	FeatureManager
	DataSourceManager
	RuntimeManager
	Engine
}

ManagerEngine is the business-logic implementation of the Core

type Middleware

// Middleware takes the next MiddlewareHandler and returns a MiddlewareHandler.
// Pipeliner registers values of this type.
type Middleware func(next MiddlewareHandler) MiddlewareHandler

type MiddlewareHandler

// MiddlewareHandler is the function signature chained by Middleware. It
// receives the FeatureDescriptor, the Keys and a Value, and returns a Value or
// an error.
type MiddlewareHandler func(ctx context.Context, fd FeatureDescriptor, keys Keys, val Value) (Value, error)

type ModelDescriptor

// ModelDescriptor is the model description passed to ModelServer.Serve. It
// lists features and keys, an optional key feature, the model framework and
// server, and the inference configuration as a manifests.ParsedConfig.
// KeyFeature is omitted from the JSON encoding when empty.
type ModelDescriptor struct {
	Features        []string               `json:"features"`
	KeyFeature      string                 `json:"keyFeature,omitempty"`
	Keys            []string               `json:"keys"`
	ModelFramework  string                 `json:"modelFramework"`
	ModelServer     string                 `json:"modelServer"`
	InferenceConfig manifests.ParsedConfig `json:"inferenceConfig"`
}

type ModelReconcileRequest

// ModelReconcileRequest contains the metadata for a model reconcile: the
// manifests.Model, a client and a scheme. It is the request type of
// ModelServer.Reconcile.
type ModelReconcileRequest struct {
	Model  *manifests.Model
	Client client.Client
	Scheme *runtime.Scheme
}

ModelReconcileRequest contains metadata for the reconcile.

type ModelServer

// ModelServer is the interface to be implemented by plugins that implement a
// Model Server.
type ModelServer interface {
	// Reconcile handles a ModelReconcileRequest and reports through changed
	// whether something was modified.
	// NOTE(review): presumably the same re-queue semantics as DataSourceReconcile — confirm.
	Reconcile(ctx context.Context, rr ModelReconcileRequest) (changed bool, err error)
	// Owns returns a list of client.Object values.
	// NOTE(review): presumably the object kinds owned by this server — confirm.
	Owns() []client.Object
	// Serve takes a FeatureDescriptor, a ModelDescriptor and a Value, and returns a Value or an error.
	Serve(ctx context.Context, fd FeatureDescriptor, md ModelDescriptor, val Value) (Value, error)
}

ModelServer is the interface to be implemented by plugins that implement a Model Server.

type Notification

// Notification is a type constraint satisfied only by CollectNotification and
// WriteNotification. It is the type parameter bound of Notifier and
// NotifierFactory.
type Notification interface {
	CollectNotification | WriteNotification
}

type Notifier

// Notifier is the interface to be implemented by plugins that want to provide
// a Queue implementation. The Queue is used to sync notifications between
// instances.
type Notifier[T Notification] interface {
	// Notify takes a notification of type T and returns an error.
	Notify(context.Context, T) error
	// Subscribe returns a receive-only channel of T, or an error.
	Subscribe(context.Context) (<-chan T, error)
}

Notifier is the interface to be implemented by plugins that want to provide a Queue implementation. The Queue is used to sync notifications between instances.

type NotifierFactory

// NotifierFactory builds a Notifier[T] from a viper configuration, or returns
// an error. It is implemented by plugins that implement Notifier.
type NotifierFactory[T Notification] func(viper *viper.Viper) (Notifier[T], error)

NotifierFactory is the interface to be implemented by plugins that implement Notifier.

type ParsedProgram

// ParsedProgram is the result type of RuntimeManager.LoadProgram.
type ParsedProgram struct {
	// Primitive is the primitive that this program returns
	Primitive PrimitiveType

	// Dependencies is a list of FQNs that this program *might* depend on
	Dependencies []string
}

type Pipeliner

type Pipeliner interface {
	AddPreGetMiddleware(priority int, fn Middleware)
	AddPostGetMiddleware(priority int, fn Middleware)
	AddPreSetMiddleware(priority int, fn Middleware)
	AddPostSetMiddleware(priority int, fn Middleware)
}

Pipeliner is the interface that plugins can use to modify the Core's feature pipelines at creation time

type PrimitiveType

// PrimitiveType enumerates the primitive kinds referenced by
// FeatureDescriptor.Primitive and ParsedProgram.Primitive.
type PrimitiveType int
// Primitive types. PrimitiveTypeUnknown is the zero value. The five non-list
// primitives are declared first, followed by their list counterparts in the
// same order.
const (
	PrimitiveTypeUnknown PrimitiveType = iota
	PrimitiveTypeString
	PrimitiveTypeInteger
	PrimitiveTypeFloat
	PrimitiveTypeBoolean
	PrimitiveTypeTimestamp

	PrimitiveTypeStringList
	PrimitiveTypeIntegerList
	PrimitiveTypeFloatList
	PrimitiveTypeBooleanList
	PrimitiveTypeTimestampList
)

func StringToPrimitiveType

func StringToPrimitiveType(s string) PrimitiveType

func TypeDetect

func TypeDetect(t any) PrimitiveType

TypeDetect detects the PrimitiveType of the value.

func (PrimitiveType) Interface

func (pt PrimitiveType) Interface() any

func (PrimitiveType) Plural

func (pt PrimitiveType) Plural() PrimitiveType

func (PrimitiveType) Scalar

func (pt PrimitiveType) Scalar() bool

func (PrimitiveType) Singular

func (pt PrimitiveType) Singular() PrimitiveType

func (PrimitiveType) String

func (pt PrimitiveType) String() string

type RawBucket

// RawBucket is the data that is stored in the raw bucket: an FQN, a bucket,
// the encoded keys and a WindowResultMap. Data is encoded under the JSON key
// "raw".
// NOTE(review): the FQN tag is upper-case "FQN" here but lower-case "fqn" on
// CollectNotification and WriteNotification — confirm this is intended.
type RawBucket struct {
	FQN         string          `json:"FQN"`
	Bucket      string          `json:"bucket"`
	EncodedKeys string          `json:"encoded_keys"`
	Data        WindowResultMap `json:"raw"`
}

RawBucket is the data that is stored in the raw bucket.

type RawBuckets

// RawBuckets is a slice of RawBucket. State.WindowBuckets and
// State.DeadWindowBuckets return it.
type RawBuckets []RawBucket

type RuntimeManager

// RuntimeManager loads and executes programs in a runtime, and exposes the
// sidecar containers and default environment of the current container.
type RuntimeManager interface {
	// LoadProgram loads a program into the runtime.
	// It returns the ParsedProgram, or an error.
	LoadProgram(env, fqn, program string, packages []string) (*ParsedProgram, error)

	// ExecuteProgram executes a program in the runtime.
	// It returns a Value and a Keys, or an error.
	ExecuteProgram(ctx context.Context, env string, fqn string, keys Keys, row map[string]any, ts time.Time, dryRun bool) (value Value, keyz Keys, err error)

	// GetSidecars returns the sidecar containers attached to the current container.
	GetSidecars() []v1.Container

	// GetDefaultEnv returns the default environment for the current container.
	GetDefaultEnv() string
}

type State

// State is a feature state management layer.
type State interface {
	// Get returns the SimpleValue of the feature.
	// If the feature is not available, it returns nil.
	// If the feature is windowed, the returned SimpleValue is a map from window function to SimpleValue.
	// version indicates the previous version of the feature. If version is 0, the latest version is returned.
	Get(ctx context.Context, fd FeatureDescriptor, keys Keys, version uint) (*Value, error)

	// Set sets the SimpleValue of the feature.
	// If the feature's primitive is a List, it replaces the entire list.
	// If the feature is windowed, it is aliased to WindowAdd instead of Set.
	Set(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error

	// Append appends the SimpleValue to the feature.
	// If the feature's primitive is NOT a List it will throw an error.
	Append(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, ts time.Time) error

	// Incr increments the SimpleValue of the feature.
	// If the feature's primitive is NOT a Scalar it will throw an error.
	// It returns an error if one occurred; the updated value is not returned.
	Incr(ctx context.Context, fd FeatureDescriptor, keys Keys, by any, timestamp time.Time) error
	// Update is the common function to update a feature SimpleValue.
	// Under the hood, it utilizes lower-level functions depending on the type of the feature.
	//  - Set for Scalars
	//  - Append for Lists
	//  - WindowAdd for Windows
	Update(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error

	// WindowAdd adds a Bucket to the window that contains aggregated data internally
	// Later the bucket's aggregations should be aggregated for the whole Window via Get
	//
	// Buckets should last *at least* as long as the feature's staleness time + DeadGracePeriod
	WindowAdd(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error

	// WindowBuckets returns the list of RawBuckets for the feature and specific Keys.
	WindowBuckets(ctx context.Context, fd FeatureDescriptor, keys Keys, buckets []string) (RawBuckets, error)

	// DeadWindowBuckets returns the list of all the dead feature's RawBuckets of all the entities.
	// NOTE(review): the role of the ignore parameter is not documented — presumably buckets to exclude; confirm.
	DeadWindowBuckets(ctx context.Context, fd FeatureDescriptor, ignore RawBuckets) (RawBuckets, error)

	// Ping is a simple keepalive check for the state.
	// It should return an error in case an error occurred, or nil if everything is alright.
	Ping(ctx context.Context) error
}

State is a feature state management layer

type StateFactory

// StateFactory builds a State from a viper configuration, or returns an
// error. It is implemented by plugins that implement storage providers.
type StateFactory func(viper *viper.Viper) (State, error)

StateFactory is the interface to be implemented by plugins that implement storage providers.

type StateMethod

// StateMethod is a method that can be used with a State. Its values are the
// StateMethod* constants.
type StateMethod int

StateMethod is a method that can be used with a State.

// State methods, named after the Get, Set, Append, Incr, Update and WindowAdd
// methods of State. StateMethodGet is the zero value.
const (
	StateMethodGet StateMethod = iota
	StateMethodSet
	StateMethodAppend
	StateMethodIncr
	StateMethodUpdate
	StateMethodWindowAdd
)

func (StateMethod) String

func (s StateMethod) String() string

type Value

// Value stores a feature value together with its timestamp and a freshness
// flag.
type Value struct {
	// Value can be cast to LowLevelValue using the ToLowLevelValue() method
	Value     any       `json:"value"`
	Timestamp time.Time `json:"timestamp"`
	Fresh     bool      `json:"fresh"`
}

Value is storing a feature value.

type WindowResultMap

// WindowResultMap maps each AggrFn to its aggregated float64 result.
type WindowResultMap map[AggrFn]float64

WindowResultMap is a map of AggrFn and their aggregated results

type WriteNotification

// WriteNotification is one of the two payload types admitted by the
// Notification constraint, and the argument of HistoricalWriter.Commit. It
// carries an FQN and the encoded keys, plus an optional bucket, active-bucket
// flag and Value; the optional fields are omitted from the JSON encoding when
// empty.
type WriteNotification struct {
	FQN          string `json:"fqn"`
	EncodedKeys  string `json:"encoded_keys"`
	Bucket       string `json:"bucket,omitempty"`
	ActiveBucket bool   `json:"active_bucket,omitempty"`
	Value        *Value `json:"value,omitempty"`
}

type WriteNotifierFactory

// WriteNotifierFactory is NotifierFactory instantiated for WriteNotification.
type WriteNotifierFactory NotifierFactory[WriteNotification]

Directories

Path Synopsis
proto
gen/go Module
Package v1alpha1 contains API Schema definitions for the k8s.raptor.ml v1alpha1 API group +kubebuilder:object:generate=true +groupName=k8s.raptor.ml
Package v1alpha1 contains API Schema definitions for the k8s.raptor.ml v1alpha1 API group +kubebuilder:object:generate=true +groupName=k8s.raptor.ml

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL