collecttask

package
v0.0.0-...-5c7ffcf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

README

介绍

管理采集配置

Documentation

Index

Constants

View Source
const (
	TargetLocalhost = "localhost"
	TargetPod       = "pod"
	TargetSlsShard  = "sls_shard"
	TargetNone      = "none"
	TargetContainer = "container"

	// ext
	TargetObNodeTenant = "ob_node_tenant"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BucketDO

type BucketDO struct {
	ID          int64 `gorm:"primarykey"`
	GmtCreate   time.Time
	GmtModified time.Time
	Name        string `gorm:"unique;"`
	State       string `gorm:"not null;"`
}

存储方案1: 使用sqlite存储, 每个 CollectTaskDO 对应一条记录, columns 大概有 bucket,task_key,config_key,target_key,data(是个二进制) 存储方案2: 使用boltdb存储, 每个buckets自己一个bucket(boltdb的概念), 以 task_key 作为 boltdb 的 key, 整个对象序列化后作为 bolt db 的 value; 但bolt年代比较久远了,不太活泼, 起码得用bbolt代替 存储方案3: 使用一般的持久化kv存储, 比如 badgerdb, 但它没有boltdb的bucket的概念(类似namespace), 因此所有buckets的数据是存在一起的

type BucketInfo

type BucketInfo struct {
	// contains filtered or unexported fields
}

type ChangeListener

type ChangeListener interface {
	// OnUpdate apply config delta to listener
	OnUpdate(*Delta)
}

type CollectConfig

type CollectConfig struct {
	Key  string `json:"key"`
	Type string `json:"type"`
	// 需要有一个版本号之类的东西, key相同的通过version判断新旧, 不用判断新旧, 只要版本不一样就直接替换
	Version string `json:"version"`
	// 站在reg的角度, 它就是给agent发 "一坨配置", 就是一堆二进制格式的数据
	// 怎么去解释这些数据是业务上的事情, 产品层与agent需要约定好内容格式
	Content []byte `json:"-"`
	// ContentObj is for internal usage
	ContentObj interface{} `json:"contentObj"`
}

描述一个采集配置

type CollectTarget

type CollectTarget struct {
	Key string `json:"key"`
	// localhost 其实现在只有 localhost/pod
	Type    string `json:"type"`
	Version string `json:"version"`
	// Meta的内容根据type解释
	Meta map[string]string `json:"meta"`
}

采集目标

func (*CollectTarget) GetApp

func (t *CollectTarget) GetApp() string

func (*CollectTarget) GetHostname

func (t *CollectTarget) GetHostname() string

func (*CollectTarget) GetIP

func (t *CollectTarget) GetIP() string

func (*CollectTarget) GetNamespace

func (t *CollectTarget) GetNamespace() string

func (*CollectTarget) GetPodName

func (t *CollectTarget) GetPodName() string

func (*CollectTarget) GetTenant

func (t *CollectTarget) GetTenant() string

func (*CollectTarget) IsTypeLocalhost

func (t *CollectTarget) IsTypeLocalhost() bool

func (*CollectTarget) IsTypePod

func (t *CollectTarget) IsTypePod() bool

type CollectTask

type CollectTask struct {
	// collect task key
	Key string `json:"key"`
	// collect task version
	Version string `json:"version"`
	// collect config
	Config *CollectConfig `json:"config"`
	// collect target
	Target *CollectTarget `json:"target"`
}

CollectTask = CollectConfig + CollectTarget

func (*CollectTask) IsDifferentWith

func (t *CollectTask) IsDifferentWith(o *CollectTask) bool

type CollectTaskDO

type CollectTaskDO struct {
	ID           int64 `gorm:"primarykey"`
	GmtCreate    time.Time
	GmtModified  time.Time
	Bucket       string `gorm:"index;"`
	Key          string `gorm:"unique;"`
	Version      string `gorm:""`
	CollectBytes []byte `gorm:""`
	TargetBytes  []byte `gorm:""`
}

type CommonResources

type CommonResources struct {
	Configs map[string]*CollectConfig
	Targets map[string]*CollectTarget
}

type Delta

type Delta struct {
	// 一次增量更新的uuid
	Uuid string
	Add  []*CollectTask
	Del  []*CollectTask
}

type IManager

type IManager interface {
	GetAll() []*CollectTask
	Listen(listener ChangeListener)
	RemoveListen(listener ChangeListener)
}

管理采集配置 1. 本地缓存 2. 从Registry缓存

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(rs *registry.Service, agentId string) (*Manager, error)

依赖 regsitry 服务 建议提供一个 AgentMetaService 供获取信息

func (*Manager) AddHttpFuncs

func (m *Manager) AddHttpFuncs()

func (*Manager) AddStaticTasks

func (m *Manager) AddStaticTasks(tasks ...*CollectTask)

func (*Manager) CheckTask

func (m *Manager) CheckTask(configKey, configVersion, targetKey, targetVersion string) int

func (*Manager) GetAll

func (m *Manager) GetAll() []*CollectTask

func (*Manager) InitLoad

func (m *Manager) InitLoad()

func (*Manager) Listen

func (m *Manager) Listen(listener ChangeListener)

Listen to config change

func (*Manager) MaybeSyncOnce

func (m *Manager) MaybeSyncOnce()

func (*Manager) RemoveListen

func (m *Manager) RemoveListen(listener ChangeListener)

func (*Manager) StartListen

func (m *Manager) StartListen()

func (*Manager) Stop

func (m *Manager) Stop()

type One

type One struct {
	One *CollectTask
}

func (*One) GetAll

func (f *One) GetAll() []*CollectTask

func (*One) Listen

func (f *One) Listen(listener ChangeListener)

func (*One) RemoveListen

func (f *One) RemoveListen(listener ChangeListener)

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Collect Task Storage

func NewStorage

func NewStorage(path string) (*Storage, error)

func (*Storage) ApplyDelta

func (s *Storage) ApplyDelta() error

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) GetAll

func (s *Storage) GetAll() (map[string]*BucketInfo, error)

func (*Storage) Remove

func (s *Storage) Remove(bucket string) error

func (*Storage) Set

func (s *Storage) Set(bucket *BucketInfo) error

func (*Storage) SetBucketState

func (s *Storage) SetBucketState(bucket string, state string) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL