logging

package
v0.38.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LOG_TIMESTAMP_FIELD  = "__log_timestamp"
	LOG_DATE_FIELD       = "__log_date"
	LOG_REQUEST_ID_FIELD = "__request_id"
	RECORD_SIZE          = 1000
)

Variables

View Source
var (
	DefaultOptions = LoggingOptions{
		ChannelCapacity: 100000,
		FlushInterval:   10 * time.Minute,
		WriteInterval:   10 * time.Second,
		EmitTimeout:     10 * time.Millisecond,
	}
)

Functions

This section is empty.

Types

type DummyLoggerImpl

type DummyLoggerImpl struct{}

func (*DummyLoggerImpl) Log

func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error

type FeatureServiceSchema

type FeatureServiceSchema struct {
	JoinKeys    []string
	Features    []string
	RequestData []string

	JoinKeysTypes    map[string]types.ValueType_Enum
	FeaturesTypes    map[string]types.ValueType_Enum
	RequestDataTypes map[string]types.ValueType_Enum
}

func GenerateSchemaFromFeatureService

func GenerateSchemaFromFeatureService(fs FeatureStore, featureServiceName string) (*FeatureServiceSchema, error)

type FeatureStore

type FeatureStore interface {
	GetFcosMap() (map[string]*model.Entity, map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error)
	GetFeatureService(name string) (*model.FeatureService, error)
}

type FileLogSink

type FileLogSink struct {
	// contains filtered or unexported fields
}

func NewFileLogSink

func NewFileLogSink(path string) (*FileLogSink, error)

FileLogSink is currently only used for testing. It will be instantiated during go unit tests to log to file and the parquet files will be cleaned up after the test is run.

func (*FileLogSink) Flush

func (s *FileLogSink) Flush(featureServiceName string) error

func (*FileLogSink) Write

func (s *FileLogSink) Write(records []arrow.Record) error

type Log

type Log struct {
	// Example: val{int64_val: 5017}, val{int64_val: 1003}
	EntityValue []*types.Value
	RequestData []*types.Value

	FeatureValues   []*types.Value
	FeatureStatuses []serving.FieldStatus
	EventTimestamps []*timestamppb.Timestamp

	RequestId    string
	LogTimestamp time.Time
}

type LogSink

type LogSink interface {
	// Write is used to unload logs from memory buffer.
	// Logs are not guaranteed to be flushed to sink on this point.
	// The data can just be written to local disk (depending on implementation).
	Write(data []arrow.Record) error

	// Flush actually send data to a sink.
	// We want to control amount to interaction with sink, since it could be a costly operation.
	// Also, some sinks like BigQuery might have quotes and physically limit amount of write requests per day.
	Flush(featureServiceName string) error
}

type Logger

type Logger interface {
	Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error
}

type LoggerConfig

type LoggerConfig struct {
	LoggingOptions

	SampleRate float32
}

func NewLoggerConfig

func NewLoggerConfig(sampleRate float32, opts LoggingOptions) LoggerConfig

type LoggerImpl

type LoggerImpl struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger(schema *FeatureServiceSchema, featureServiceName string, sink LogSink, config LoggerConfig) (*LoggerImpl, error)

func (*LoggerImpl) EmitLog

func (l *LoggerImpl) EmitLog(log *Log) error

func (*LoggerImpl) Log

func (l *LoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error

func (*LoggerImpl) Stop

func (l *LoggerImpl) Stop()

Stop the loop goroutine gracefully

func (*LoggerImpl) WaitUntilStopped

func (l *LoggerImpl) WaitUntilStopped()

type LoggingOptions

type LoggingOptions struct {
	// How many log items can be buffered in channel
	ChannelCapacity int

	// Waiting time when inserting new log into the channel
	EmitTimeout time.Duration

	// Interval on which logs buffered in memory will be written to sink
	WriteInterval time.Duration

	// Interval on which sink will be flushed
	// (see LogSink interface for better explanation on differences with Write)
	FlushInterval time.Duration
}

type LoggingService

type LoggingService struct {
	// contains filtered or unexported fields
}

func NewLoggingService

func NewLoggingService(fs FeatureStore, sink LogSink, opts ...LoggingOptions) (*LoggingService, error)

func (*LoggingService) GetOrCreateLogger

func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService) (Logger, error)

func (*LoggingService) Stop

func (s *LoggingService) Stop()

type MemoryBuffer

type MemoryBuffer struct {
	// contains filtered or unexported fields
}

func NewMemoryBuffer added in v0.22.0

func NewMemoryBuffer(schema *FeatureServiceSchema) (*MemoryBuffer, error)

func (*MemoryBuffer) Append

func (b *MemoryBuffer) Append(log *Log) error

func (*MemoryBuffer) Compact added in v0.22.0

func (b *MemoryBuffer) Compact() error

type OfflineStoreSink

type OfflineStoreSink struct {
	// contains filtered or unexported fields
}

func NewOfflineStoreSink

func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error)

func (*OfflineStoreSink) Flush

func (s *OfflineStoreSink) Flush(featureServiceName string) error

func (*OfflineStoreSink) Write

func (s *OfflineStoreSink) Write(records []arrow.Record) error

type OfflineStoreWriteCallback

type OfflineStoreWriteCallback func(featureServiceName, datasetDir string) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL