Documentation ¶
Index ¶
- Constants
- Variables
- func AliveWindowBuckets(staleness, bucketSize time.Duration) []string
- func BucketDeadTime(bucketName string, bucketSize, staleness time.Duration) time.Time
- func BucketName(ts time.Time, bucketSize time.Duration) string
- func BucketTime(bucketName string, bucketSize time.Duration) time.Time
- func ContextWithSelector(ctx context.Context, selector string) context.Context
- func DeadWindowBuckets(staleness, bucketSize time.Duration) []string
- func LoggerFromContext(ctx context.Context) logr.Logger
- func NormalizeAny(t any) (any, error)
- func NormalizeFQN(fqn, defaultNamespace string) (string, error)
- func NormalizeSelector(selector, defaultNamespace string) (string, error)
- func ScalarFromString(val string, scalar PrimitiveType) (any, error)
- func ScalarString(val any) string
- func SelectorFromContext(ctx context.Context) (string, error)
- func ToLowLevelValue[T LowLevelValue](v any) T
- type AggrFn
- type BindConfig
- type CollectNotification
- type CollectNotifierFactory
- type ContextKey
- type DataSource
- type DataSourceGetter
- type DataSourceManager
- type DataSourceReconcile
- type DataSourceReconcileRequest
- type Engine
- type ExtendedManager
- type FeatureApply
- type FeatureDescriptor
- type FeatureDescriptorGetter
- type FeatureManager
- type HistoricalWriter
- type HistoricalWriterFactory
- type KeepPrevious
- type Keys
- type Logger
- type LowLevelValue
- type ManagerEngine
- type Middleware
- type MiddlewareHandler
- type ModelDescriptor
- type ModelReconcileRequest
- type ModelServer
- type Notification
- type Notifier
- type NotifierFactory
- type ParsedProgram
- type Pipeliner
- type Plugins
- type PrimitiveType
- type RawBucket
- type RawBuckets
- type RuntimeManager
- type State
- type StateFactory
- type StateMethod
- type Value
- type WindowResultMap
- type WriteNotification
- type WriteNotifierFactory
Constants ¶
const DeadGracePeriod = time.Minute * 10
DeadGracePeriod is the *extra* time that the bucket should be kept alive on top of the feature's Staleness. Bucket TTL = staleness + DeadGracePeriod
const ModelBuilder = "model"
const SourcelessBuilder = "sourceless"
Variables ¶
var ErrFeatureAlreadyExists = fmt.Errorf("feature already exists")
ErrFeatureAlreadyExists is returned when a feature is already registered in the Core's engine manager.
var ErrFeatureNotFound = fmt.Errorf("feature not found")
ErrFeatureNotFound is returned when a feature is not found in the Core's engine manager.
var ErrInvalidPipelineContext = fmt.Errorf("invalid pipeline context")
ErrInvalidPipelineContext is returned when the context is invalid for pipelining.
var ErrUnsupportedAggrError = fmt.Errorf("unsupported aggr")
ErrUnsupportedAggrError is returned when an aggregate function is not supported.
var ErrUnsupportedPrimitiveError = fmt.Errorf("unsupported primitive")
ErrUnsupportedPrimitiveError is returned when a primitive is not supported.
var FQNRegExp = regexp.MustCompile(`(?si)^((?P<namespace>[a-z0-9]+(?:_[a-z0-9]+)*)\.)?(?P<name>[a-z0-9]+(?:_[a-z0-9]+)*)(\+(?P<aggrFn>([a-z]+_*[a-z]+)))?(@-(?P<version>([0-9]+)))?(\[(?P<encoding>([a-z]+_*[a-z]+))])?$`)
Functions ¶
func AliveWindowBuckets ¶
AliveWindowBuckets returns a list of all the *valid* buckets up until now
func BucketDeadTime ¶
BucketDeadTime returns the end time of a given bucket by its name
func BucketName ¶
BucketName returns a bucket name for a given timestamp and a bucket size
func BucketTime ¶
BucketTime returns the start time of a given bucket by its name
func ContextWithSelector ¶
func DeadWindowBuckets ¶
DeadWindowBuckets returns a list of the names of the *dead* buckets (buckets that are outside the window) that should be available
func LoggerFromContext ¶
LoggerFromContext returns the logger from the context. If not found it returns a discarded logger.
func NormalizeAny ¶
func NormalizeFQN ¶
NormalizeFQN returns an FQN with the namespace
func NormalizeSelector ¶
NormalizeSelector returns a selector with the default namespace if not specified
func ScalarFromString ¶
func ScalarFromString(val string, scalar PrimitiveType) (any, error)
func ScalarString ¶
func ToLowLevelValue ¶
func ToLowLevelValue[T LowLevelValue](v any) T
ToLowLevelValue returns the low level value of the feature
Types ¶
type AggrFn ¶
type AggrFn int
AggrFn is an aggregation function
func ParseSelector ¶
func StringToAggrFn ¶
func StringsToAggrFns ¶
type BindConfig ¶
BindConfig adds config flags for the plugin.
type CollectNotification ¶
type CollectNotifierFactory ¶
type CollectNotifierFactory NotifierFactory[CollectNotification]
type ContextKey ¶
type ContextKey int
ContextKey is a key to store data in context.
const ( // ContextKeyCachePostGet is a key to store the flag to cache in the storage postGet value. // If not set it is defaulting to true. ContextKeyCachePostGet ContextKey = iota // ContextKeyCacheFresh is a key to store the flag that indicate if the result from the cache was fresh. ContextKeyCacheFresh // ContextKeyFromCache is a key to store the flag to indicate if the value is from the cache. ContextKeyFromCache // ContextKeyLogger is a key to store a logger. ContextKeyLogger // ContextKeySelector is a key to store the requested Feature Selector. ContextKeySelector )
type DataSource ¶
type DataSource struct { FQN string `json:"fqn"` Kind string `json:"kind"` Config manifests.ParsedConfig `json:"config"` }
DataSource is a parsed abstracted representation of a manifests.DataSource
func DataSourceFromManifest ¶
func DataSourceFromManifest(ctx context.Context, src *manifests.DataSource, r client.Reader) (DataSource, error)
DataSourceFromManifest returns a DataSource from a manifests.DataSource
type DataSourceGetter ¶
type DataSourceGetter interface {
GetDataSource(FQN string) (DataSource, error)
}
DataSourceGetter is a simple interface that returns a DataSource
type DataSourceManager ¶
type DataSourceManager interface { BindDataSource(fd DataSource) error UnbindDataSource(FQN string) error HasDataSource(FQN string) bool }
DataSourceManager is managing DataSource(s) within Core. It is responsible for maintaining the DataSource(s) in an internal store
type DataSourceReconcile ¶
type DataSourceReconcile func(ctx context.Context, rr DataSourceReconcileRequest) (changed bool, err error)
DataSourceReconcile is the interface to be implemented by plugins that want to be reconciled in the operator. This is useful for plugins that need to spawn an external Feature Ingestion.
It returns true if the reconciliation has changed the object (and therefore the operator should re-queue).
type DataSourceReconcileRequest ¶
type DataSourceReconcileRequest struct { DataSource *manifests.DataSource RuntimeManager RuntimeManager Client client.Client Scheme *runtime.Scheme CoreAddress string }
DataSourceReconcileRequest contains metadata for the reconcile.
type Engine ¶
type Engine interface { // FeatureDescriptor returns the FeatureDescriptor for the given FQN FeatureDescriptor(ctx context.Context, selector string) (FeatureDescriptor, error) // Get returns the value for the given FQN and keys // If the feature is not available, it returns nil. // If the feature is windowed, the returned Value is a map from window function to Value. Get(ctx context.Context, selector string, keys Keys) (Value, FeatureDescriptor, error) // Set sets the raw value for the given FQN and keys // If the feature's primitive is a List, it replaces the entire list. // If the feature is windowed, it is aliased to WindowAdd instead of Set. Set(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error // Append appends to the raw value for the given FQN and keys // If the feature's primitive is NOT a List it will throw an error. Append(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error // Incr increments the raw value of the feature. // If the feature's primitive is NOT a Scalar it will throw an error. // It returns the updated value in the state, and an error if occurred. Incr(ctx context.Context, FQN string, keys Keys, by any, ts time.Time) error // Update is the common function to update a feature SimpleValue. // Under the hood, it utilizes lower-level functions depending on the type of the feature. // - Set for Scalars // - Append for Lists // - WindowAdd for Windows Update(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error }
Engine is the main engine of the Core. It is responsible for the low-level operation for the features against the feature store
type ExtendedManager ¶
type ExtendedManager interface { Engine RuntimeManager DataSourceGetter }
ExtendedManager is an Engine that has a DataSource
type FeatureApply ¶
type FeatureApply func(fd FeatureDescriptor, builder manifests.FeatureBuilder, pl Pipeliner, src ExtendedManager) error
FeatureApply applies changes on the feature abstraction.
type FeatureDescriptor ¶
type FeatureDescriptor struct { FQN string `json:"FQN"` Primitive PrimitiveType `json:"primitive"` Aggr []AggrFn `json:"aggr"` Freshness time.Duration `json:"freshness"` Staleness time.Duration `json:"staleness"` Timeout time.Duration `json:"timeout"` KeepPrevious *KeepPrevious `json:"keep_previous"` Keys []string `json:"keys"` Builder string `json:"builder"` RuntimeEnv string `json:"runtimeEnv"` DataSource string `json:"data_source"` Dependencies []string `json:"dependencies"` }
FeatureDescriptor is describing a feature definition for an internal use of the Core.
func FeatureDescriptorFromManifest ¶
func FeatureDescriptorFromManifest(in *manifests.Feature) (*FeatureDescriptor, error)
FeatureDescriptorFromManifest returns a FeatureDescriptor from a manifests.Feature
func (FeatureDescriptor) ValidWindow ¶
func (fd FeatureDescriptor) ValidWindow() bool
ValidWindow checks if the feature has aggregation enabled, and if it is valid
type FeatureDescriptorGetter ¶
type FeatureDescriptorGetter func(ctx context.Context, FQN string) (FeatureDescriptor, error)
type FeatureManager ¶
type FeatureManager interface { BindFeature(in *manifests.Feature) error UnbindFeature(FQN string) error HasFeature(FQN string) bool }
FeatureManager is managing Feature(s) within Core. It is responsible for managing features as well as operating on them
type HistoricalWriter ¶
type HistoricalWriterFactory ¶
type HistoricalWriterFactory func(viper *viper.Viper) (HistoricalWriter, error)
type KeepPrevious ¶
type LowLevelValue ¶
type LowLevelValue interface { ~int | ~string | ~float64 | time.Time | ~[]int | ~[]string | ~[]float64 | ~[]time.Time | WindowResultMap }
LowLevelValue is a low level value that can be cast to any type
type ManagerEngine ¶
type ManagerEngine interface { Logger FeatureManager DataSourceManager RuntimeManager Engine }
ManagerEngine is the business-logic implementation of the Core
type Middleware ¶
type Middleware func(next MiddlewareHandler) MiddlewareHandler
type MiddlewareHandler ¶
type ModelDescriptor ¶
type ModelReconcileRequest ¶
type ModelReconcileRequest struct { Model *manifests.Model Client client.Client Scheme *runtime.Scheme }
ModelReconcileRequest contains metadata for the reconcile.
type ModelServer ¶
type ModelServer interface { Reconcile(ctx context.Context, rr ModelReconcileRequest) (changed bool, err error) Owns() []client.Object Serve(ctx context.Context, fd FeatureDescriptor, md ModelDescriptor, val Value) (Value, error) }
ModelServer is the interface to be implemented by plugins that implement a Model Server.
type Notification ¶
type Notification interface { CollectNotification | WriteNotification }
type Notifier ¶
type Notifier[T Notification] interface { Notify(context.Context, T) error Subscribe(context.Context) (<-chan T, error) }
Notifier is the interface to be implemented by plugins that want to provide a Queue implementation. The Queue is used to sync notifications between instances
type NotifierFactory ¶
type NotifierFactory[T Notification] func(viper *viper.Viper) (Notifier[T], error)
NotifierFactory is the interface to be implemented by plugins that implement Notifier.
type ParsedProgram ¶
type ParsedProgram struct { // Primitive is the primitive that this program is returning Primitive PrimitiveType // Dependencies is a list of FQNs that this program *might* depend on Dependencies []string }
type Pipeliner ¶
type Pipeliner interface { AddPreGetMiddleware(priority int, fn Middleware) AddPostGetMiddleware(priority int, fn Middleware) AddPreSetMiddleware(priority int, fn Middleware) AddPostSetMiddleware(priority int, fn Middleware) }
Pipeliner is the interface that plugins can use to modify the Core's feature pipelines on creation time
type Plugins ¶
type Plugins interface { BindConfig | FeatureApply | DataSourceReconcile | StateFactory | CollectNotifierFactory | WriteNotifierFactory | HistoricalWriterFactory }
type PrimitiveType ¶
type PrimitiveType int
const ( PrimitiveTypeUnknown PrimitiveType = iota PrimitiveTypeString PrimitiveTypeInteger PrimitiveTypeFloat PrimitiveTypeBoolean PrimitiveTypeTimestamp PrimitiveTypeStringList PrimitiveTypeIntegerList PrimitiveTypeFloatList PrimitiveTypeBooleanList PrimitiveTypeTimestampList )
func StringToPrimitiveType ¶
func StringToPrimitiveType(s string) PrimitiveType
func TypeDetect ¶
func TypeDetect(t any) PrimitiveType
TypeDetect detects the PrimitiveType of the value.
func (PrimitiveType) Interface ¶
func (pt PrimitiveType) Interface() any
func (PrimitiveType) Plural ¶
func (pt PrimitiveType) Plural() PrimitiveType
func (PrimitiveType) Scalar ¶
func (pt PrimitiveType) Scalar() bool
func (PrimitiveType) Singular ¶
func (pt PrimitiveType) Singular() PrimitiveType
func (PrimitiveType) String ¶
func (pt PrimitiveType) String() string
type RawBucket ¶
type RawBucket struct { FQN string `json:"FQN"` Bucket string `json:"bucket"` EncodedKeys string `json:"encoded_keys"` Data WindowResultMap `json:"raw"` }
RawBucket is the data that is stored in the raw bucket.
type RawBuckets ¶
type RawBuckets []RawBucket
type RuntimeManager ¶
type RuntimeManager interface { // LoadProgram loads a program into the runtime. LoadProgram(env, fqn, program string, packages []string) (*ParsedProgram, error) // ExecuteProgram executes a program in the runtime. ExecuteProgram(ctx context.Context, env string, fqn string, keys Keys, row map[string]any, ts time.Time, dryRun bool) (value Value, keyz Keys, err error) // GetSidecars returns the sidecar containers attached to the current container. GetSidecars() []v1.Container // GetDefaultEnv returns the default environment for the current container. GetDefaultEnv() string }
type State ¶
type State interface { // Get returns the SimpleValue of the feature. // If the feature is not available, it returns nil. // If the feature is windowed, the returned SimpleValue is a map from window function to SimpleValue. // version indicates the previous version of the feature. If version is 0, the latest version is returned. Get(ctx context.Context, fd FeatureDescriptor, keys Keys, version uint) (*Value, error) // Set sets the SimpleValue of the feature. // If the feature's primitive is a List, it replaces the entire list. // If the feature is windowed, it is aliased to WindowAdd instead of Set. Set(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error // Append appends the SimpleValue to the feature. // If the feature's primitive is NOT a List it will throw an error. Append(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, ts time.Time) error // Incr increments the SimpleValue of the feature. // If the feature's primitive is NOT a Scalar it will throw an error. // It returns the updated value in the state, and an error if occurred. Incr(ctx context.Context, fd FeatureDescriptor, keys Keys, by any, timestamp time.Time) error // Update is the common function to update a feature SimpleValue. // Under the hood, it utilizes lower-level functions depending on the type of the feature. // - Set for Scalars // - Append for Lists // - WindowAdd for Windows Update(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error // WindowAdd adds a Bucket to the window that contains aggregated data internally // Later the bucket's aggregations should be aggregated for the whole Window via Get // // Buckets should last *at least* as long as the feature's staleness time + DeadGracePeriod WindowAdd(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error // WindowBuckets returns the list of RawBuckets for the feature and specific Keys. 
WindowBuckets(ctx context.Context, fd FeatureDescriptor, keys Keys, buckets []string) (RawBuckets, error) // DeadWindowBuckets returns the list of all the dead feature's RawBuckets of all the entities. DeadWindowBuckets(ctx context.Context, fd FeatureDescriptor, ignore RawBuckets) (RawBuckets, error) // Ping is a simple keepalive check for the state. // It should return an error in case an error occurred, or nil if everything is alright. Ping(ctx context.Context) error }
State is a feature state management layer
type StateFactory ¶
StateFactory is the interface to be implemented by plugins that implement storage providers.
type StateMethod ¶
type StateMethod int
StateMethod is a method that can be used with a State.
const ( StateMethodGet StateMethod = iota StateMethodSet StateMethodAppend StateMethodIncr StateMethodUpdate StateMethodWindowAdd )
func (StateMethod) String ¶
func (s StateMethod) String() string
type Value ¶
type Value struct { // Value can be cast to LowLevelValue using the ToLowLevelValue() method Value any `json:"value"` Timestamp time.Time `json:"timestamp"` Fresh bool `json:"fresh"` }
Value is storing a feature value.
type WindowResultMap ¶
WindowResultMap is a map of AggrFn and their aggregated results
type WriteNotification ¶
type WriteNotifierFactory ¶
type WriteNotifierFactory NotifierFactory[WriteNotification]
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
proto
|
|
gen/go
Module
|
|
Package v1alpha1 contains API Schema definitions for the k8s.raptor.ml v1alpha1 API group +kubebuilder:object:generate=true +groupName=k8s.raptor.ml
|
Package v1alpha1 contains API Schema definitions for the k8s.raptor.ml v1alpha1 API group +kubebuilder:object:generate=true +groupName=k8s.raptor.ml |