Documentation ¶
Index ¶
- Constants
- Variables
- func AlignStepTms(step, tms int64) int64
- func CleanLoop()
- func GetCachedAll() string
- func GetLatestTmsAndDelay(filepath string) (int64, int64, bool)
- func Init(cfg WorkerSection)
- func PostToCache(paramPoints []*dataobj.MetricValue)
- func PosterLoop()
- func PushToCount(Point *AnalysPoint) error
- func PusherLoop()
- func PusherStart()
- func ToPushQueue(strategy *stra.Strategy, tms int64, pointMap map[string]*PointCounter) error
- func UpdateConfigsLoop()
- func Zeroize()
- type AnalysPoint
- type ConfigInfo
- type GlobalCounter
- type Job
- type PointCounter
- type PointsCounter
- type SortByTms
- type StrategyCounter
- type Worker
- type WorkerGroup
- type WorkerSection
Constants ¶
View Source
const CACHED_DURATION = 60
cached时间周期
Variables ¶
View Source
var ManagerConfig map[int64]*ConfigInfo
View Source
var ManagerJob map[string]*Job //管理job,文件路径为key
View Source
var ManagerJobLock *sync.RWMutex
Functions ¶
func GetCachedAll ¶
func GetCachedAll() string
func Init ¶
func Init(cfg WorkerSection)
func PostToCache ¶
func PostToCache(paramPoints []*dataobj.MetricValue)
func PusherLoop ¶
func PusherLoop()
func PusherStart ¶
func PusherStart()
func ToPushQueue ¶
这个参数是为了最大限度的对接 pointMap的key,是打平了的tagkv
func UpdateConfigsLoop ¶
func UpdateConfigsLoop()
Types ¶
type AnalysPoint ¶
从worker往计算部分推的Point
type ConfigInfo ¶
type GlobalCounter ¶
type GlobalCounter struct { sync.RWMutex StrategyCounts map[int64]*StrategyCounter }
全局counter对象, 以key为索引,索引每个策略的统计 key : Strategy ID
var GlobalCount *GlobalCounter
func (*GlobalCounter) AddStrategyCount ¶
func (this *GlobalCounter) AddStrategyCount(st *stra.Strategy)
func (*GlobalCounter) GetIDs ¶
func (this *GlobalCounter) GetIDs() []int64
func (*GlobalCounter) GetStrategyCountByID ¶
func (this *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error)
func (*GlobalCounter) UpdateByStrategy ¶
func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy)
只做更新和删除,添加 由数据驱动
type PointCounter ¶
统计的实体
func (*PointCounter) UpdateCnt ¶
func (this *PointCounter) UpdateCnt()
func (*PointCounter) UpdateMaxMin ¶
func (this *PointCounter) UpdateMaxMin(value float64)
func (*PointCounter) UpdateSum ¶
func (this *PointCounter) UpdateSum(value float64)
type PointsCounter ¶
type PointsCounter struct { sync.RWMutex TagstringMap map[string]*PointCounter }
单策略下,单step的统计对象 以Sorted的tagkv的字符串来做索引
func (*PointsCounter) GetBytagstring ¶
func (this *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error)
type SortByTms ¶
type SortByTms []*dataobj.MetricValue
type StrategyCounter ¶
type StrategyCounter struct { sync.RWMutex Strategy *stra.Strategy //Strategy结构体扔这里,以备不时之需 TmsPoints map[int64]*PointsCounter //按照时间戳分类的分别的counter }
单策略下的对象, 以step为索引, 索引每一个Step的统计 单step统计, 推送完则删
func (*StrategyCounter) AddTms ¶
func (this *StrategyCounter) AddTms(tms int64) error
func (*StrategyCounter) DeleteTms ¶
func (this *StrategyCounter) DeleteTms(tms int64)
func (*StrategyCounter) GetByTms ¶
func (this *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error)
func (*StrategyCounter) GetTmsList ¶
func (this *StrategyCounter) GetTmsList() []int64
type Worker ¶
type Worker struct { FilePath string Counter int64 LatestTms int64 //正在处理的单条日志时间 Delay int64 //时间戳乱序差值, 每个worker独立更新 Close chan struct{} Stream chan string Mark string //标记该worker信息,方便打log及上报自监控指标, 追查问题 Analyzing bool //标记当前Worker状态是否在分析中,还是空闲状态 Callback callbackHandler }
单个worker对象
type WorkerGroup ¶
type WorkerGroup struct { WorkerNum int LatestTms int64 //日志文件最新处理的时间戳 MaxDelay int64 //日志文件存在的时间戳乱序最大差值 ResetTms int64 //maxDelay上次重置的时间 Workers []*Worker TimeFormatStrategy string }
worker组
func NewWorkerGroup ¶
func NewWorkerGroup(filePath string, stream chan string, st *stra.Strategy) *WorkerGroup
* filepath和stream依赖外部,其他的都自己创建
func (WorkerGroup) GetLatestTmsAndDelay ¶
func (this WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64)
func (*WorkerGroup) ResetMaxDelay ¶
func (wg *WorkerGroup) ResetMaxDelay()
func (*WorkerGroup) SetLatestTmsAndDelay ¶
func (this *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64)
func (*WorkerGroup) Start ¶
func (wg *WorkerGroup) Start()
func (*WorkerGroup) Stop ¶
func (wg *WorkerGroup) Stop()
type WorkerSection ¶
type WorkerSection struct { WorkerNum int `yaml:"workerNum"` QueueSize int `yaml:"queueSize"` PushInterval int `yaml:"pushInterval"` WaitPush int `yaml:"waitPush"` }
var WorkerConfig WorkerSection
Click to show internal directories.
Click to hide internal directories.