flow_tag

package
v0.0.0-...-8d187f3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WRITER_QUEUE_COUNT   = 1
	WRITER_QUEUE_SIZE    = 64 << 10
	WRITER_BATCH_SIZE    = 32 << 10
	WRITER_FLUSH_TIMEOUT = 10
)
View Source
const (
	FLOW_TAG_CACHE_INIT_SIZE = 1 << 14
	MIN_FLUSH_CACHE_TIMEOUT  = 60
	PROMETHEUS_KEYWORD       = "prometheus"
)
View Source
const (
	FLOW_TAG_DB = "flow_tag"
)

Variables

This section is empty.

Functions

func AppServiceTagColumns

func AppServiceTagColumns() []*ckdb.Column

func GenAppServiceTagCKTable

func GenAppServiceTagCKTable(cluster, storagePolicy, tableName, ckdbType string, ttl int, partition ckdb.TimeFuncType) *ckdb.Table

func ReleaseAppServiceTag

func ReleaseAppServiceTag(t *AppServiceTag)

func ReleaseFlowTag

func ReleaseFlowTag(t *FlowTag)

Types

type AppServiceCounter

type AppServiceCounter struct {
	CacheExpiredCount int64 `statsd:"cache-expired-count"`
	CacheAddCount     int64 `statsd:"cache-add-count"`
	CacheHitCount     int64 `statsd:"cache-hit-count"`
	CacheCount        int64 `statsd:"cache-count"`
}

type AppServiceTag

type AppServiceTag struct {
	Time        uint32 // s
	Table       string
	AppService  string
	AppInstance string
	TeamID      uint16
	OrgId       uint16
}

func AcquireAppServiceTag

func AcquireAppServiceTag() *AppServiceTag

func (*AppServiceTag) OrgID

func (t *AppServiceTag) OrgID() uint16

func (*AppServiceTag) Release

func (t *AppServiceTag) Release()

func (*AppServiceTag) WriteBlock

func (t *AppServiceTag) WriteBlock(block *ckdb.Block)

type AppServiceTagWriter

type AppServiceTagWriter struct {
	Cache             *lru.Cache[AppServiceTag, uint32]
	CacheFlushTimeout uint32
	CacheKeyBuf       AppServiceTag

	utils.Closable
	// contains filtered or unexported fields
}

func NewAppServiceTagWriter

func NewAppServiceTagWriter(
	decoderIndex int,
	db string,
	ttl int,
	partition ckdb.TimeFuncType,
	config *config.Config) (*AppServiceTagWriter, error)

func (*AppServiceTagWriter) GetCounter

func (w *AppServiceTagWriter) GetCounter() interface{}

func (*AppServiceTagWriter) Write

func (w *AppServiceTagWriter) Write(time uint32, table, appService, appInstance string, orgID, teamID uint16)

type Counter

type Counter struct {
	NewFieldCount        int64 `statsd:"new-field-count"`
	NewFieldValueCount   int64 `statsd:"new-field-value-count"`
	FieldCacheCount      int64 `statsd:"field-cache-count"`
	FieldValueCacheCount int64 `statsd:"field-value-cache-count"`
}

type FieldType

type FieldType uint8
const (
	FieldTag FieldType = iota
	FieldMetrics
)

func (FieldType) String

func (t FieldType) String() string

type FieldValueType

type FieldValueType uint8
const (
	FieldValueTypeAuto FieldValueType = iota
	FieldValueTypeString
	FieldValueTypeFloat
	FieldValueTypeInt
)

func (FieldValueType) String

func (t FieldValueType) String() string

type FlowTag

type FlowTag struct {
	pool.ReferenceCount
	TagType

	Timestamp uint32 // s
	FlowTagInfo
}

func AcquireFlowTag

func AcquireFlowTag(tagType TagType) *FlowTag

func (*FlowTag) Columns

func (t *FlowTag) Columns() []*ckdb.Column

func (*FlowTag) GenCKTable

func (t *FlowTag) GenCKTable(cluster, storagePolicy, tableName, ckdbType string, ttl int, partition ckdb.TimeFuncType) *ckdb.Table

func (*FlowTag) OrgID

func (t *FlowTag) OrgID() uint16

func (*FlowTag) Release

func (t *FlowTag) Release()

func (*FlowTag) WriteBlock

func (t *FlowTag) WriteBlock(block *ckdb.Block)

type FlowTagCache

type FlowTagCache struct {
	Id                          int
	FieldCache, FieldValueCache *lru.Cache[FlowTagInfo, uint32]
	CacheFlushTimeout           uint32

	// only for prometheus
	PrometheusFieldCache, PrometheusFieldValueCache *lru128.U128LRU

	// temporary buffers for generating new flow_tags
	FlowTagInfoBuffer FlowTagInfo
	Fields            []interface{}
	FieldValues       []interface{}
}

func NewFlowTagCache

func NewFlowTagCache(name string, id int, cacheFlushTimeout, cacheMaxSize uint32) *FlowTagCache

type FlowTagInfo

type FlowTagInfo struct {
	Table          string // Represents virtual_table_name in ext_metrics
	FieldName      string
	FieldValue     string
	FieldValueType FieldValueType
	VtapId         uint16

	// IDs only for prometheus
	TableId      uint32
	FieldNameId  uint32
	FieldValueId uint32

	VpcId     int32 // XXX: can use int16
	PodNsId   uint16
	FieldType FieldType

	// Not stored, only determines which database to store in.
	// When Orgid is 0 or 1, it is stored in database 'flow_tag', otherwise stored in '<OrgId>_flow_tag'.
	OrgId  uint16
	TeamID uint16
}

This structure will be used as a map key, and it is hoped to be as compact as possible in terms of memory layout. In addition, in order to distinguish as early as possible when comparing two values, put the highly distinguishable fields at the front.

type FlowTagWriter

type FlowTagWriter struct {
	Cache *FlowTagCache

	utils.Closable
	// contains filtered or unexported fields
}

func NewFlowTagWriter

func NewFlowTagWriter(
	decoderIndex int,
	name string,
	srcDB string,
	ttl int,
	partition ckdb.TimeFuncType,
	config *config.Config,
	writerConfig *config.CKWriterConfig) (*FlowTagWriter, error)

func (*FlowTagWriter) GetCounter

func (w *FlowTagWriter) GetCounter() interface{}

func (*FlowTagWriter) Write

func (w *FlowTagWriter) Write(t TagType, values ...interface{})

func (*FlowTagWriter) WriteFieldsAndFieldValuesInCache

func (w *FlowTagWriter) WriteFieldsAndFieldValuesInCache()

type TagType

type TagType uint8
const (
	TagField TagType = iota
	TagFieldValue
	TagTypeMax
)

func (TagType) String

func (t TagType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL