logging

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const DEFAULT_LOG_FLUSH_INTERVAL = 100 * time.Millisecond
View Source
const DEFAULT_LOG_INSERT_TIMEOUT = 20 * time.Millisecond

Variables

This section is empty.

Functions

func ConvertMemoryBufferToArrowTable

func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Schema) (array.Table, error)

Takes a memory buffer of logs stored row-wise in an array, converts them to columnar format using the fcoschema generated by GetFcoSchema, and writes them to an Arrow table. Returns an Arrow table that contains all of the logs in columnar format.

func GetFullFeatureName

func GetFullFeatureName(featureViewName string, featureName string) string

Types

type FileLogStorage

type FileLogStorage struct {
	// contains filtered or unexported fields
}

func NewFileOfflineStore

func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error)

This offline store is currently only used for testing. It is instantiated during Go unit tests to log to a file, and the Parquet files are cleaned up after the test is run.

func (*FileLogStorage) FlushToStorage

func (f *FileLogStorage) FlushToStorage(tbl array.Table) error

type Log

type Log struct {
	// Example: val{int64_val: 5017}, val{int64_val: 1003}
	EntityValue []*types.Value

	FeatureValues   []*types.Value
	FeatureStatuses []serving.FieldStatus
	EventTimestamps []*timestamppb.Timestamp
	RequestContext  map[string]*types.Value
	RequestId       string
}

type LoggingService

type LoggingService struct {
	// contains filtered or unexported fields
}

func NewLoggingService

func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogProcessing bool) (*LoggingService, error)

func (*LoggingService) EmitLog

func (s *LoggingService) EmitLog(l *Log) error

func (*LoggingService) GenerateLogs

func (l *LoggingService) GenerateLogs(featureService *model.FeatureService, joinKeyToEntityValues map[string][]*types.Value, features []*serving.GetOnlineFeaturesResponse_FeatureVector, requestData map[string]*types.RepeatedValue, requestId string) error

func (*LoggingService) GetFcos

func (*LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush

func (s *LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush(t *time.Ticker)

Select statement that either ingests new logs added to the logging channel, adding them one at a time to the in-memory buffer, or flushes all of the buffered logs synchronously to the OfflineStorage on a time interval.

type MemoryBuffer

type MemoryBuffer struct {
	// contains filtered or unexported fields
}

type OfflineLogStorage

type OfflineLogStorage interface {
	// Todo: Maybe we can add a must implement function that retrieves the correct config based on type
	FlushToStorage(array.Table) error
}

func NewOfflineStore

func NewOfflineStore(config *registry.RepoConfig) (OfflineLogStorage, error)

type OfflineLogStoreConfig

type OfflineLogStoreConfig struct {
	// contains filtered or unexported fields
}

func GetFileConfig

func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error)

type Schema

type Schema struct {
	Entities      []string
	Features      []string
	EntityTypes   map[string]types.ValueType_Enum
	FeaturesTypes map[string]types.ValueType_Enum
}

func GetSchemaFromFeatureService

func GetSchemaFromFeatureService(featureService *model.FeatureService, entityMap map[string]*model.Entity, featureViews []*model.FeatureView, onDemandFeatureViews []*model.OnDemandFeatureView) (*Schema, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL