Documentation
¶
Index ¶
- Constants
- func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Schema) (array.Table, error)
- func GetFullFeatureName(featureViewName string, featureName string) string
- type FileLogStorage
- type Log
- type LoggingService
- func (s *LoggingService) EmitLog(l *Log) error
- func (l *LoggingService) GenerateLogs(featureService *model.FeatureService, ...) error
- func (s *LoggingService) GetFcos() (map[string]*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView, ...)
- func (s *LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush(t *time.Ticker)
- type MemoryBuffer
- type OfflineLogStorage
- type OfflineLogStoreConfig
- type Schema
Constants ¶
View Source
const DEFAULT_LOG_FLUSH_INTERVAL = 100 * time.Millisecond
View Source
const DEFAULT_LOG_INSERT_TIMEOUT = 20 * time.Millisecond
Variables ¶
This section is empty.
Functions ¶
func ConvertMemoryBufferToArrowTable ¶
func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Schema) (array.Table, error)
Takes memory buffer of logs in array row and converts them to columnar with generated fcoschema generated by GetFcoSchema and writes them to arrow table. Returns arrow table that contains all of the logs in columnar format.
func GetFullFeatureName ¶
Types ¶
type FileLogStorage ¶
type FileLogStorage struct {
// contains filtered or unexported fields
}
func NewFileOfflineStore ¶
func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error)
This offline store is currently only used for testing. It will be instantiated during go unit tests to log to file and the parquet files will be cleaned up after the test is run.
func (*FileLogStorage) FlushToStorage ¶
func (f *FileLogStorage) FlushToStorage(tbl array.Table) error
type LoggingService ¶
type LoggingService struct {
// contains filtered or unexported fields
}
func NewLoggingService ¶
func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogProcessing bool) (*LoggingService, error)
func (*LoggingService) EmitLog ¶
func (s *LoggingService) EmitLog(l *Log) error
func (*LoggingService) GenerateLogs ¶
func (l *LoggingService) GenerateLogs(featureService *model.FeatureService, joinKeyToEntityValues map[string][]*types.Value, features []*serving.GetOnlineFeaturesResponse_FeatureVector, requestData map[string]*types.RepeatedValue, requestId string) error
func (*LoggingService) GetFcos ¶
func (s *LoggingService) GetFcos() (map[string]*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView, error)
func (*LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush ¶
func (s *LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush(t *time.Ticker)
Select that either ingests new logs that are added to the logging channel, one at a time to add to the in-memory buffer or flushes all of them synchronously to the OfflineStorage on a time interval.
type MemoryBuffer ¶
type MemoryBuffer struct {
// contains filtered or unexported fields
}
type OfflineLogStorage ¶
type OfflineLogStorage interface { // Todo: Maybe we can add a must implement function that retrieves the correct config based on type FlushToStorage(array.Table) error }
func NewOfflineStore ¶
func NewOfflineStore(config *registry.RepoConfig) (OfflineLogStorage, error)
type OfflineLogStoreConfig ¶
type OfflineLogStoreConfig struct {
// contains filtered or unexported fields
}
func GetFileConfig ¶
func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error)
type Schema ¶
type Schema struct { Entities []string Features []string EntityTypes map[string]types.ValueType_Enum FeaturesTypes map[string]types.ValueType_Enum }
func GetSchemaFromFeatureService ¶
func GetSchemaFromFeatureService(featureService *model.FeatureService, entityMap map[string]*model.Entity, featureViews []*model.FeatureView, onDemandFeatureViews []*model.OnDemandFeatureView) (*Schema, error)
Click to show internal directories.
Click to hide internal directories.