mgr

package
v0.0.0-...-c38d673 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusOK   = "ok"
	StatusBad  = "bad"
	StatusLost = "lost"
)
View Source
const (
	DefaultTryTimes = 3
	MetaTmp         = "meta_tmp/"
)
View Source
const (
	StatsShell = "stats"
	PREFIX     = "/logkit"
)
View Source
const (
	SpeedUp     = "up"
	SpeedDown   = "down"
	SpeedStable = "stable"

	RunnerRunning = "running"
	RunnerStopped = "stopped"
)
View Source
const DefaultMyTag = "default"
View Source
const KeyRouterConfig = "router"
View Source
const KeySendConfig = "senders"

Variables

View Source
var DEFAULT_LOGKIT_REST_DIR = "/.logkitconfs"
View Source
var DEFAULT_PORT = 3000
View Source
var DIR_NOT_EXIST_SLEEP_TIME = "300" //300 s
View Source
var KeySampleLog = "sampleLog"

Functions

func GetMySlaveUrl

func GetMySlaveUrl(address, schema string) (uri string, err error)

func MergeEnvTags

func MergeEnvTags(name string, tags map[string]interface{}) map[string]interface{}

MergeEnvTags 获取环境变量里的内容

func MergeExtraInfoTags

func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[string]interface{}

func ParseData

func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error)

parse模块中各种type的日志都能获取解析后的数据

func RawData

func RawData(readerConfig conf.MapConf) (string, error)

RawData 从 reader 模块中根据 type 获取字符串形式的样例日志

func Register

func Register(masters []string, myhost, tag string) error

func RespError

func RespError(c echo.Context, respCode int, errCode, errMsg string) error

func RespSuccess

func RespSuccess(c echo.Context, data interface{}) error

func SendData

func SendData(senderConfig map[string]interface{}) error

func TransformData

func TransformData(transformerConfig map[string]interface{}) ([]Data, error)

Types

type CleanInfo

type CleanInfo struct {
	// contains filtered or unexported fields
}

type Cluster

type Cluster struct {
	ClusterConfig
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(cc *ClusterConfig) *Cluster

func (*Cluster) AddSlave

func (cc *Cluster) AddSlave(url, tag string)

func (*Cluster) RunRegisterLoop

func (cc *Cluster) RunRegisterLoop() error

func (*Cluster) UpdateSlaveStatus

func (cc *Cluster) UpdateSlaveStatus()

type ClusterConfig

type ClusterConfig struct {
	MasterUrl []string `json:"master_url"`
	IsMaster  bool     `json:"is_master"`
	Enable    bool     `json:"enable"`
	Address   string   `json:"address"`
	Tag       string   `json:"tag"`
}

type ClusterStatus

type ClusterStatus struct {
	Status map[string]RunnerStatus `json:"status"`
	Tag    string                  `json:"tag"`
	Err    string                  `json:"error"`
}

type ErrorsList

type ErrorsList struct {
	ReadErrors      *ErrorQueue            `json:"read_errors"`
	ParseErrors     *ErrorQueue            `json:"parse_errors"`
	TransformErrors map[string]*ErrorQueue `json:"transform_errors"`
	SendErrors      map[string]*ErrorQueue `json:"send_errors"`
}

func (*ErrorsList) Clone

func (list *ErrorsList) Clone() *ErrorsList

Clone 返回当前 ErrorList 的完整拷贝,若无数据则会返回 nil

func (*ErrorsList) Reset

func (list *ErrorsList) Reset()

返回队列实际容量

func (*ErrorsList) Sort

func (list *ErrorsList) Sort() (dst ErrorsResult)

复制出一个顺序的 Errors

type ErrorsResult

type ErrorsResult struct {
	ReadErrors      []ErrorInfo            `json:"read_errors"`
	ParseErrors     []ErrorInfo            `json:"parse_errors"`
	TransformErrors map[string][]ErrorInfo `json:"transform_errors"`
	SendErrors      map[string][]ErrorInfo `json:"send_errors"`
}

type LogExportRunner

type LogExportRunner struct {
	RunnerInfo
	// contains filtered or unexported fields
}

func NewLogExportRunner

func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner *LogExportRunner, err error)

func NewLogExportRunnerWithService

func NewLogExportRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser,
	transformers []transforms.Transformer, senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner *LogExportRunner, err error)

func (*LogExportRunner) Cleaner

func (r *LogExportRunner) Cleaner() CleanInfo

func (*LogExportRunner) GetErrors

func (r *LogExportRunner) GetErrors() ErrorsResult

func (*LogExportRunner) LagStats

func (r *LogExportRunner) LagStats() (rl *LagInfo, err error)

func (*LogExportRunner) Name

func (r *LogExportRunner) Name() string

func (*LogExportRunner) Reset

func (r *LogExportRunner) Reset() (err error)

func (*LogExportRunner) Run

func (r *LogExportRunner) Run()

func (*LogExportRunner) Status

func (r *LogExportRunner) Status() (rs RunnerStatus)

func (*LogExportRunner) StatusBackup

func (r *LogExportRunner) StatusBackup()

func (*LogExportRunner) StatusRestore

func (r *LogExportRunner) StatusRestore()

func (*LogExportRunner) Stop

func (r *LogExportRunner) Stop()

Stop 清理所有使用到的资源, 等待10秒尝试读取完毕 先停Reader,不再读取,然后停Run函数,让读取的都转到发送,最后停Sender结束整个过程。 Parser 无状态,无需stop。

func (*LogExportRunner) TokenRefresh

func (r *LogExportRunner) TokenRefresh(tokens AuthTokens) error

type Manager

type Manager struct {
	ManagerConfig
	DefaultDir string

	Version    string
	SystemInfo string
	// contains filtered or unexported fields
}

func NewCustomManager

func NewCustomManager(conf ManagerConfig, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (*Manager, error)

func NewManager

func NewManager(conf ManagerConfig) (*Manager, error)

func (*Manager) Add

func (m *Manager) Add(confPath string)

func (*Manager) AddRunner

func (m *Manager) AddRunner(name string, conf RunnerConfig, createTime time.Time) (err error)

func (*Manager) Configs

func (m *Manager) Configs() (rss map[string]RunnerConfig)

func (*Manager) DeleteRunner

func (m *Manager) DeleteRunner(name string) (err error)

func (*Manager) Error

func (m *Manager) Error(name string) (rss ErrorsResult, err error)

func (*Manager) Errors

func (m *Manager) Errors() (rss map[string]ErrorsResult)

func (*Manager) ForkRunner

func (m *Manager) ForkRunner(confPath string, config RunnerConfig, returnOnErr bool) error

func (*Manager) IsRunning

func (m *Manager) IsRunning(confPath string) bool

func (*Manager) Remove

func (m *Manager) Remove(confPath string) (err error)

func (*Manager) RemoveWithConfig

func (m *Manager) RemoveWithConfig(confPath string, isDelete bool) (err error)

func (*Manager) ResetRunner

func (m *Manager) ResetRunner(name string) (err error)

ResetRunner 必须在runner实例存在下才可以reset, reset是调用runner本身的方法, 而runner stop实际上是销毁实例,所以先要启动runner

func (*Manager) RestoreWebDir

func (m *Manager) RestoreWebDir()

func (*Manager) StartRunner

func (m *Manager) StartRunner(name string) (err error)

func (*Manager) Status

func (m *Manager) Status() (rss map[string]RunnerStatus)

func (*Manager) Stop

func (m *Manager) Stop() error

func (*Manager) StopRunner

func (m *Manager) StopRunner(name string) (err error)

func (*Manager) UpdateRunner

func (m *Manager) UpdateRunner(name string, conf RunnerConfig) (err error)

func (*Manager) UpdateToken

func (m *Manager) UpdateToken(tokens []AuthTokens) (err error)

func (*Manager) Watch

func (m *Manager) Watch(confsPath []string) (err error)

type ManagerConfig

type ManagerConfig struct {
	BindHost string `json:"bind_host"`

	Idc          string        `json:"idc"`
	Zone         string        `json:"zone"`
	RestDir      string        `json:"rest_dir"`
	Cluster      ClusterConfig `json:"cluster"`
	DisableWeb   bool          `json:"disable_web"`
	ServerBackup bool          `json:"-"`
}

type PostParseRet

type PostParseRet struct {
	SamplePoints []Data `json:"SamplePoints"`
}

PostParseRet 返回值

type RegisterReq

type RegisterReq struct {
	Url string `json:"url"`
	Tag string `json:"tag"`
}

type RestService

type RestService struct {
	// contains filtered or unexported fields
}

func NewRestService

func NewRestService(mgr *Manager, router *echo.Echo) *RestService

func (*RestService) ClusterStatus

func (rs *RestService) ClusterStatus() echo.HandlerFunc

master API GET /logkit/cluster/status?tag=tagValue&url=urlValue

func (*RestService) DeleteClusterConfig

func (rs *RestService) DeleteClusterConfig() echo.HandlerFunc

DELETE /logkti/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) DeleteConfig

func (rs *RestService) DeleteConfig() echo.HandlerFunc

delete /logkit/configs/<name>

func (*RestService) DeleteSlaves

func (rs *RestService) DeleteSlaves() echo.HandlerFunc

DELETE /logkit/cluster/slaves?tag=tagValue&url=urlValue

func (*RestService) GetCleanerKeyOptions

func (rs *RestService) GetCleanerKeyOptions() echo.HandlerFunc

get /logkit/cleaner/options 获取解析选项

func (*RestService) GetClusterConfig

func (rs *RestService) GetClusterConfig() echo.HandlerFunc

master API Get /logkit/cluster/configs:name?tag=tagValue&url=urlValue

func (*RestService) GetClusterConfigs

func (rs *RestService) GetClusterConfigs() echo.HandlerFunc

master API Get /logkit/cluster/configs?tag=tagValue&url=urlValue

func (*RestService) GetClusterRunners

func (rs *RestService) GetClusterRunners() echo.HandlerFunc

master API GET /logkit/cluster/runners?tag=tagValue&url=urlValue

func (*RestService) GetConfig

func (rs *RestService) GetConfig() echo.HandlerFunc

get /logkit/configs/:name

func (*RestService) GetConfigs

func (rs *RestService) GetConfigs() echo.HandlerFunc

get /logkit/configs

func (*RestService) GetError

func (rs *RestService) GetError() echo.HandlerFunc

get /logkit/errors/<name>

func (*RestService) GetErrorCodeHumanize

func (rs *RestService) GetErrorCodeHumanize() echo.HandlerFunc

get /logkit/errorcode

func (*RestService) GetErrors

func (rs *RestService) GetErrors() echo.HandlerFunc

get /logkit/errors

func (*RestService) GetParserKeyOptions

func (rs *RestService) GetParserKeyOptions() echo.HandlerFunc

get /logkit/parser/options 获取解析选项

func (*RestService) GetParserSampleLogs

func (rs *RestService) GetParserSampleLogs() echo.HandlerFunc

get /logkit/parser/samplelogs 获取样例日志

func (*RestService) GetParserTooltips

func (rs *RestService) GetParserTooltips() echo.HandlerFunc

get /logkit/parser/tooltips 获取解析用途提示

func (*RestService) GetParserUsages

func (rs *RestService) GetParserUsages() echo.HandlerFunc

get /logkit/parser/usages 获得解析用途说明

func (*RestService) GetReaderKeyOptions

func (rs *RestService) GetReaderKeyOptions() echo.HandlerFunc

get /logkit/reader/options 获取Reader参数配置

func (*RestService) GetReaderTooltips

func (rs *RestService) GetReaderTooltips() echo.HandlerFunc

get /logkit/reader/tooltips 获取Reader用途提示

func (*RestService) GetReaderUsages

func (rs *RestService) GetReaderUsages() echo.HandlerFunc

get /logkit/reader/usages 获取Reader用途

func (*RestService) GetRunners

func (rs *RestService) GetRunners() echo.HandlerFunc

get /logkit/runners

func (*RestService) GetTransformerOptions

func (rs *RestService) GetTransformerOptions() echo.HandlerFunc

GET /logkit/transformer/options

func (*RestService) GetTransformerSampleConfigs

func (rs *RestService) GetTransformerSampleConfigs() echo.HandlerFunc

GET /logkit/transformer/sampleconfigs

func (*RestService) GetTransformerUsages

func (rs *RestService) GetTransformerUsages() echo.HandlerFunc

GET /logkit/transformer/usages

func (*RestService) GetVersion

func (rs *RestService) GetVersion() echo.HandlerFunc

func (*RestService) IsMaster

func (rs *RestService) IsMaster() echo.HandlerFunc

master API GET /logkit/cluster/ismaster

func (*RestService) Ping

func (rs *RestService) Ping() echo.HandlerFunc

master API GET /logkit/cluster/ping

func (*RestService) PostClusterConfig

func (rs *RestService) PostClusterConfig() echo.HandlerFunc

POST /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigReset

func (rs *RestService) PostClusterConfigReset() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/reset?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigStart

func (rs *RestService) PostClusterConfigStart() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/start?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigStop

func (rs *RestService) PostClusterConfigStop() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/stop?tag=tagValue&url=urlValue

func (*RestService) PostConfig

func (rs *RestService) PostConfig() echo.HandlerFunc

post /logkit/configs/<name>

func (*RestService) PostConfigReset

func (rs *RestService) PostConfigReset() echo.HandlerFunc

POST /logkit/configs/<name>/reset

func (*RestService) PostConfigStart

func (rs *RestService) PostConfigStart() echo.HandlerFunc

POST /logkit/configs/<name>/start

func (*RestService) PostConfigStop

func (rs *RestService) PostConfigStop() echo.HandlerFunc

POST /logkit/configs/<name>/stop

func (*RestService) PostParse

func (rs *RestService) PostParse() echo.HandlerFunc

post /logkit/parser/parse 接受解析请求

func (*RestService) PostParserCheck

func (rs *RestService) PostParserCheck() echo.HandlerFunc

POST /logkit/parser/check

func (*RestService) PostRead

func (rs *RestService) PostRead() echo.HandlerFunc

POST /logkit/reader/read 请求校验reader配置

func (*RestService) PostReaderCheck

func (rs *RestService) PostReaderCheck() echo.HandlerFunc

POST /logkit/reader/check 请求校验reader配置

func (*RestService) PostRegister

func (rs *RestService) PostRegister() echo.HandlerFunc

master API POST /logkit/cluster/register

func (*RestService) PostSlaveTag

func (rs *RestService) PostSlaveTag() echo.HandlerFunc

POST /logkit/cluster/slaves/tag?tag=tagValue&url=urlValue

func (*RestService) PostTag

func (rs *RestService) PostTag() echo.HandlerFunc

slave API POST /logkit/cluster/tag

func (*RestService) PostTransform

func (rs *RestService) PostTransform() echo.HandlerFunc

POST /logkit/transformer/transform Transform (multiple logs/single log) in (json array/json object) format with registered transformers Return result string in json array format

func (*RestService) PostTransformerCheck

func (rs *RestService) PostTransformerCheck() echo.HandlerFunc

POST /logkit/transformer/check

func (*RestService) PutClusterConfig

func (rs *RestService) PutClusterConfig() echo.HandlerFunc

PUT /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) PutConfig

func (rs *RestService) PutConfig() echo.HandlerFunc

put /logkit/configs/<name>

func (*RestService) Register

func (rs *RestService) Register() error

func (*RestService) Slaves

func (rs *RestService) Slaves() echo.HandlerFunc

master API GET /logkit/cluster/slaves?tag=tagValue&url=urlValue

func (*RestService) Status

func (rs *RestService) Status() echo.HandlerFunc

get /logkit/status

func (*RestService) Stop

func (rs *RestService) Stop()

Stop will stop RestService

type Runner

type Runner interface {
	Name() string
	Run()
	Stop()
	Cleaner() CleanInfo
	Status() RunnerStatus
}

func NewCustomRunner

func NewCustomRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner Runner, err error)

func NewRunner

func NewRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal) (runner Runner, err error)

NewRunner 创建Runner

func NewRunnerWithService

func NewRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser, transformers []transforms.Transformer,
	senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner Runner, err error)

type RunnerConfig

type RunnerConfig struct {
	RunnerInfo
	SourceData string `json:"sourceData, omitempty"`
	// MetricConfig  []MetricConfig           `json:"metric,omitempty"`
	ReaderConfig  conf.MapConf             `json:"reader"`
	CleanerConfig conf.MapConf             `json:"cleaner,omitempty"`
	ParserConf    conf.MapConf             `json:"parser"`
	Transforms    []map[string]interface{} `json:"transforms,omitempty"`
	SendersConfig []conf.MapConf           `json:"senders"`
	Router        router.RouterConfig      `json:"router,omitempty"`
	IsInWebFolder bool                     `json:"web_folder,omitempty"`
	IsStopped     bool                     `json:"is_stopped,omitempty"`
	IsFromServer  bool                     `json:"from_server,omitempty"` // 判读是否从服务器拉取的配置
}

RunnerConfig 从多数据源读取,经过解析后,发往多个数据目的地

func Compatible

func Compatible(rc RunnerConfig) RunnerConfig

Compatible 用于新老配置的兼容

func TrimSecretInfo

func TrimSecretInfo(conf RunnerConfig, trimSk bool) RunnerConfig

TrimSecretInfo 将配置文件中的 token 等鉴权相关信息去掉

type RunnerErrors

type RunnerErrors interface {
	GetErrors() ErrorsResult
}

type RunnerInfo

type RunnerInfo struct {
	RunnerName       string `json:"name"`
	Note             string `json:"note,omitempty"`
	CollectInterval  int    `json:"collect_interval,omitempty"` // metric runner收集的频率
	MaxBatchLen      int    `json:"batch_len,omitempty"`        // 每个read batch的行数
	MaxBatchSize     int    `json:"batch_size,omitempty"`       // 每个read batch的字节数
	MaxBatchInterval int    `json:"batch_interval,omitempty"`   // 最大发送时间间隔
	MaxBatchTryTimes int    `json:"batch_try_times,omitempty"`  // 最大发送次数,小于等于0代表无限重试
	ErrorsListCap    int    `json:"errors_list_cap"`            // 记录错误信息的最大条数
	CreateTime       string `json:"createtime"`
	EnvTag           string `json:"env_tag,omitempty"`
	ExtraInfo        bool   `json:"extra_info"`
}

type RunnerStatus

type RunnerStatus struct {
	Name           string               `json:"name"`
	Logpath        string               `json:"logpath"`
	ReadDataSize   int64                `json:"readDataSize"`
	ReadDataCount  int64                `json:"readDataCount"`
	Elaspedtime    float64              `json:"elaspedtime"`
	Lag            LagInfo              `json:"lag"`
	ReaderStats    StatsInfo            `json:"readerStats"`
	ParserStats    StatsInfo            `json:"parserStats"`
	SenderStats    map[string]StatsInfo `json:"senderStats"`
	TransformStats map[string]StatsInfo `json:"transformStats"`
	Error          string               `json:"error,omitempty"`

	ReadSpeedKB      float64     `json:"readspeed_kb"`
	ReadSpeed        float64     `json:"readspeed"`
	ReadSpeedTrendKb string      `json:"readspeedtrend_kb"`
	ReadSpeedTrend   string      `json:"readspeedtrend"`
	RunningStatus    string      `json:"runningStatus"`
	Tag              string      `json:"tag,omitempty"`
	Url              string      `json:"url,omitempty"`
	HistoryErrors    *ErrorsList `json:"history_errors"`
	// contains filtered or unexported fields
}

RunnerStatus runner运行状态,添加字段请在clone函数中相应添加

func (*RunnerStatus) Clone

func (src *RunnerStatus) Clone() RunnerStatus

Clone 复制出一个完整的RunnerStatus

type Service

type Service struct {
	Prefix string
}

type Slave

type Slave struct {
	Url       string    `json:"url"`
	Tag       string    `json:"tag"`
	Status    string    `json:"status"`
	LastTouch time.Time `json:"last_touch"`
}

type SlaveConfig

type SlaveConfig struct {
	Configs map[string]RunnerConfig `json:"configs"`
	Tag     string                  `json:"tag"`
	Err     string                  `json:"error"`
}

type StatusPersistable

type StatusPersistable interface {
	StatusBackup()
	StatusRestore()
}

type TagReq

type TagReq struct {
	Tag string `json:"tag"`
}

type TokenRefreshable

type TokenRefreshable interface {
	TokenRefresh(AuthTokens) error
}

type Version

type Version struct {
	Version string `json:"version"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL