worker

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2020 License: Apache-2.0 Imports: 19 Imported by: 6

Documentation

Index

Constants

View Source
// CACHED_DURATION is the cache time period.
// NOTE(review): the unit is not shown here — presumably seconds; confirm against the code that reads it.
const CACHED_DURATION = 60

cached时间周期

Variables

View Source
// ManagerConfig maps an int64 key to its ConfigInfo.
// NOTE(review): the key is presumably ConfigInfo.Id — confirm against the code that fills the map.
var ManagerConfig map[int64]*ConfigInfo
View Source
var ManagerJob map[string]*Job // manages jobs; the key is the file path
View Source
// ManagerJobLock is a package-level read/write mutex.
// NOTE(review): presumably it guards ManagerJob — confirm against the callers.
var ManagerJobLock *sync.RWMutex

Functions

func AlignStepTms

func AlignStepTms(step, tms int64) int64

时间戳向前对齐

func CleanLoop

func CleanLoop()

func GetCachedAll

func GetCachedAll() string

func GetLatestTmsAndDelay

func GetLatestTmsAndDelay(filepath string) (int64, int64, bool)

func Init

func Init(cfg WorkerSection)

func PostToCache

func PostToCache(paramPoints []*dataobj.MetricValue)

func PosterLoop

func PosterLoop()

循环推送,10s一次

func PushToCount

func PushToCount(Point *AnalysPoint) error

提供给Worker用来Push计算后的信息,需保证线程安全。

func PusherLoop

func PusherLoop()

func PusherStart

func PusherStart()

func ToPushQueue

func ToPushQueue(strategy *stra.Strategy, tms int64, pointMap map[string]*PointCounter) error

这个参数是为了最大限度地对接;pointMap的key是打平了的tagkv。

func UpdateConfigsLoop

func UpdateConfigsLoop()

func Zeroize

func Zeroize()

Types

type AnalysPoint

// AnalysPoint is the point that a worker pushes to the computation part.
// It carries a strategy ID, a value, a timestamp (Tms) and a tag map.
type AnalysPoint struct {
	StrategyID int64
	Value      float64
	Tms        int64
	Tags       map[string]string
}

从worker往计算部分推的Point

type ConfigInfo

// ConfigInfo pairs an Id with a FilePath.
// ManagerConfig stores pointers to ConfigInfo values.
type ConfigInfo struct {
	Id       int64
	FilePath string
}

type GlobalCounter

// GlobalCounter is the global counter object. It indexes the statistics of
// every strategy; the map key is the strategy ID.
type GlobalCounter struct {
	// NOTE(review): the embedded lock presumably guards StrategyCounts — confirm.
	sync.RWMutex
	// StrategyCounts holds one StrategyCounter per strategy ID.
	StrategyCounts map[int64]*StrategyCounter
}

全局counter对象,以key为索引,索引每个策略的统计。key:Strategy ID

var GlobalCount *GlobalCounter

func (*GlobalCounter) AddStrategyCount

func (this *GlobalCounter) AddStrategyCount(st *stra.Strategy)

func (*GlobalCounter) GetIDs

func (this *GlobalCounter) GetIDs() []int64

func (*GlobalCounter) GetStrategyCountByID

func (this *GlobalCounter) GetStrategyCountByID(id int64) (*StrategyCounter, error)

func (*GlobalCounter) UpdateByStrategy

func (this *GlobalCounter) UpdateByStrategy(globalStras map[int64]*stra.Strategy)

只做更新和删除,添加 由数据驱动

type Job

// Job exposes no exported fields in this view.
// ManagerJob stores *Job values keyed by file path.
type Job struct {
	// contains filtered or unexported fields
}

type PointCounter

// PointCounter is the statistics entity: it holds a count, a sum, a maximum
// and a minimum.
type PointCounter struct {
	// NOTE(review): the embedded lock presumably guards the fields below — confirm.
	sync.RWMutex
	Count int64
	Sum   float64
	Max   float64
	Min   float64
}

统计的实体

func (*PointCounter) UpdateCnt

func (this *PointCounter) UpdateCnt()

func (*PointCounter) UpdateMaxMin

func (this *PointCounter) UpdateMaxMin(value float64)

func (*PointCounter) UpdateSum

func (this *PointCounter) UpdateSum(value float64)

type PointsCounter

// PointsCounter is the statistics object for a single step under a single
// strategy. It is indexed by the sorted tagkv string.
type PointsCounter struct {
	// NOTE(review): the embedded lock presumably guards TagstringMap — confirm.
	sync.RWMutex
	// TagstringMap holds one PointCounter per tag string.
	TagstringMap map[string]*PointCounter
}

单策略下、单step的统计对象,以Sorted的tagkv的字符串来做索引。

func (*PointsCounter) GetBytagstring

func (this *PointsCounter) GetBytagstring(tagstring string) (*PointCounter, error)

func (*PointsCounter) Update

func (this *PointsCounter) Update(tagstring string, value float64) error

type SortByTms

type SortByTms []*dataobj.MetricValue

func (SortByTms) Len

func (p SortByTms) Len() int

func (SortByTms) Less

func (p SortByTms) Less(i, j int) bool

func (SortByTms) Swap

func (p SortByTms) Swap(i, j int)

type StrategyCounter

// StrategyCounter is the per-strategy object. It is indexed by step and holds
// the statistics of each step; a step's statistics are deleted once pushed.
type StrategyCounter struct {
	sync.RWMutex
	Strategy  *stra.Strategy           // the Strategy struct is kept here in case it is needed
	TmsPoints map[int64]*PointsCounter // the separate counters, grouped by timestamp
}

单策略下的对象,以step为索引,索引每一个Step的统计。单step统计,推送完则删。

func (*StrategyCounter) AddTms

func (this *StrategyCounter) AddTms(tms int64) error

func (*StrategyCounter) DeleteTms

func (this *StrategyCounter) DeleteTms(tms int64)

func (*StrategyCounter) GetByTms

func (this *StrategyCounter) GetByTms(tms int64) (*PointsCounter, error)

func (*StrategyCounter) GetTmsList

func (this *StrategyCounter) GetTmsList() []int64

type Worker

// Worker is a single worker object.
type Worker struct {
	FilePath  string
	Counter   int64
	LatestTms int64 // time of the single log entry currently being processed
	Delay     int64 // timestamp out-of-order difference; updated independently by each worker
	Close     chan struct{}
	Stream    chan string
	Mark      string // label for this worker, used for logging, reporting self-monitoring metrics and tracing problems
	Analyzing bool   // whether this Worker is currently analyzing or is idle
	Callback  callbackHandler
}

单个worker对象

func (*Worker) Start

func (w *Worker) Start()

func (*Worker) Stop

func (w *Worker) Stop()

func (*Worker) Work

func (w *Worker) Work()

type WorkerGroup

// WorkerGroup is a group of workers.
type WorkerGroup struct {
	WorkerNum          int
	LatestTms          int64 // latest processed timestamp of the log file
	MaxDelay           int64 // maximum timestamp out-of-order difference present in the log file
	ResetTms           int64 // time at which MaxDelay was last reset
	Workers            []*Worker
	TimeFormatStrategy string
}

worker组

func NewWorkerGroup

func NewWorkerGroup(filePath string, stream chan string, st *stra.Strategy) *WorkerGroup

* filepath和stream依赖外部,其他的都自己创建

func (WorkerGroup) GetLatestTmsAndDelay

func (this WorkerGroup) GetLatestTmsAndDelay() (tms int64, delay int64)

func (*WorkerGroup) ResetMaxDelay

func (wg *WorkerGroup) ResetMaxDelay()

func (*WorkerGroup) SetLatestTmsAndDelay

func (this *WorkerGroup) SetLatestTmsAndDelay(tms int64, delay int64)

func (*WorkerGroup) Start

func (wg *WorkerGroup) Start()

func (*WorkerGroup) Stop

func (wg *WorkerGroup) Stop()

type WorkerSection

// WorkerSection holds the worker settings. Each field carries a yaml tag
// naming the YAML key it maps to. Init takes a WorkerSection.
// NOTE(review): units and exact meaning of the fields are not shown here —
// confirm against the code that reads them.
type WorkerSection struct {
	WorkerNum    int `yaml:"workerNum"`
	QueueSize    int `yaml:"queueSize"`
	PushInterval int `yaml:"pushInterval"`
	WaitPush     int `yaml:"waitPush"`
}
var WorkerConfig WorkerSection

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL