Documentation
¶
Index ¶
Constants ¶
View Source
const ( LOG_TIMESTAMP_FIELD = "__log_timestamp" LOG_DATE_FIELD = "__log_date" LOG_REQUEST_ID_FIELD = "__request_id" RECORD_SIZE = 1000 )
Variables ¶
View Source
var ( DefaultOptions = LoggingOptions{ ChannelCapacity: 100000, FlushInterval: 10 * time.Minute, WriteInterval: 10 * time.Second, EmitTimeout: 10 * time.Millisecond, } )
Functions ¶
This section is empty.
Types ¶
type DummyLoggerImpl ¶
type DummyLoggerImpl struct{}
func (*DummyLoggerImpl) Log ¶
func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error
type FeatureServiceSchema ¶
type FeatureServiceSchema struct { JoinKeys []string Features []string RequestData []string JoinKeysTypes map[string]types.ValueType_Enum FeaturesTypes map[string]types.ValueType_Enum RequestDataTypes map[string]types.ValueType_Enum }
func GenerateSchemaFromFeatureService ¶
func GenerateSchemaFromFeatureService(fs FeatureStore, featureServiceName string) (*FeatureServiceSchema, error)
type FeatureStore ¶
type FeatureStore interface { GetFcosMap() (map[string]*model.Entity, map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error) GetFeatureService(name string) (*model.FeatureService, error) }
type FileLogSink ¶
type FileLogSink struct {
// contains filtered or unexported fields
}
func NewFileLogSink ¶
func NewFileLogSink(path string) (*FileLogSink, error)
FileLogSink is currently only used for testing. It is instantiated during Go unit tests to log to a file, and the Parquet files are cleaned up after the test has run.
func (*FileLogSink) Flush ¶
func (s *FileLogSink) Flush(featureServiceName string) error
type LogSink ¶
type LogSink interface { // Write is used to unload logs from the memory buffer. // Logs are not guaranteed to be flushed to the sink at this point. // The data can just be written to local disk (depending on implementation). Write(data []arrow.Record) error // Flush actually sends data to the sink. // We want to control the amount of interaction with the sink, since it could be a costly operation. // Also, some sinks like BigQuery might have quotas and physically limit the number of write requests per day. Flush(featureServiceName string) error }
type Logger ¶
type Logger interface {
Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error
}
type LoggerConfig ¶
type LoggerConfig struct { LoggingOptions SampleRate float32 }
func NewLoggerConfig ¶
func NewLoggerConfig(sampleRate float32, opts LoggingOptions) LoggerConfig
type LoggerImpl ¶
type LoggerImpl struct {
// contains filtered or unexported fields
}
func NewLogger ¶
func NewLogger(schema *FeatureServiceSchema, featureServiceName string, sink LogSink, config LoggerConfig) (*LoggerImpl, error)
func (*LoggerImpl) EmitLog ¶
func (l *LoggerImpl) EmitLog(log *Log) error
func (*LoggerImpl) Log ¶
func (l *LoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error
func (*LoggerImpl) WaitUntilStopped ¶
func (l *LoggerImpl) WaitUntilStopped()
type LoggingOptions ¶
type LoggingOptions struct { // How many log items can be buffered in channel ChannelCapacity int // Waiting time when inserting new log into the channel EmitTimeout time.Duration // Interval on which logs buffered in memory will be written to sink WriteInterval time.Duration // Interval on which sink will be flushed // (see LogSink interface for better explanation on differences with Write) FlushInterval time.Duration }
type LoggingService ¶
type LoggingService struct {
// contains filtered or unexported fields
}
func NewLoggingService ¶
func NewLoggingService(fs FeatureStore, sink LogSink, opts ...LoggingOptions) (*LoggingService, error)
func (*LoggingService) GetOrCreateLogger ¶
func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService) (Logger, error)
func (*LoggingService) Stop ¶
func (s *LoggingService) Stop()
type MemoryBuffer ¶
type MemoryBuffer struct {
// contains filtered or unexported fields
}
func NewMemoryBuffer ¶ added in v0.22.0
func NewMemoryBuffer(schema *FeatureServiceSchema) (*MemoryBuffer, error)
func (*MemoryBuffer) Append ¶
func (b *MemoryBuffer) Append(log *Log) error
func (*MemoryBuffer) Compact ¶ added in v0.22.0
func (b *MemoryBuffer) Compact() error
type OfflineStoreSink ¶
type OfflineStoreSink struct {
// contains filtered or unexported fields
}
func NewOfflineStoreSink ¶
func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error)
func (*OfflineStoreSink) Flush ¶
func (s *OfflineStoreSink) Flush(featureServiceName string) error
Click to show internal directories.
Click to hide internal directories.