Documentation ¶
Index ¶
- Constants
- Variables
- func GetMySlaveUrl(address, schema string) (uri string, err error)
- func MergeEnvTags(name string, tags map[string]interface{}) map[string]interface{}
- func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[string]interface{}
- func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error)
- func RawData(readerConfig conf.MapConf) (string, error)
- func Register(masters []string, myhost, tag string) error
- func RespError(c echo.Context, respCode int, errCode, errMsg string) error
- func RespSuccess(c echo.Context, data interface{}) error
- func SendData(senderConfig map[string]interface{}) error
- func TransformData(transformerConfig map[string]interface{}) ([]Data, error)
- type CleanInfo
- type Cluster
- type ClusterConfig
- type ClusterStatus
- type ErrorsList
- type ErrorsResult
- type LogExportRunner
- func (r *LogExportRunner) Cleaner() CleanInfo
- func (r *LogExportRunner) GetErrors() ErrorsResult
- func (r *LogExportRunner) LagStats() (rl *LagInfo, err error)
- func (r *LogExportRunner) Name() string
- func (r *LogExportRunner) Reset() (err error)
- func (r *LogExportRunner) Run()
- func (r *LogExportRunner) Status() (rs RunnerStatus)
- func (r *LogExportRunner) StatusBackup()
- func (r *LogExportRunner) StatusRestore()
- func (r *LogExportRunner) Stop()
- func (r *LogExportRunner) TokenRefresh(tokens AuthTokens) error
- type Manager
- func (m *Manager) Add(confPath string)
- func (m *Manager) AddRunner(name string, conf RunnerConfig, createTime time.Time) (err error)
- func (m *Manager) Configs() (rss map[string]RunnerConfig)
- func (m *Manager) DeleteRunner(name string) (err error)
- func (m *Manager) Error(name string) (rss ErrorsResult, err error)
- func (m *Manager) Errors() (rss map[string]ErrorsResult)
- func (m *Manager) ForkRunner(confPath string, config RunnerConfig, returnOnErr bool) error
- func (m *Manager) IsRunning(confPath string) bool
- func (m *Manager) Remove(confPath string) (err error)
- func (m *Manager) RemoveWithConfig(confPath string, isDelete bool) (err error)
- func (m *Manager) ResetRunner(name string) (err error)
- func (m *Manager) RestoreWebDir()
- func (m *Manager) StartRunner(name string) (err error)
- func (m *Manager) Status() (rss map[string]RunnerStatus)
- func (m *Manager) Stop() error
- func (m *Manager) StopRunner(name string) (err error)
- func (m *Manager) UpdateRunner(name string, conf RunnerConfig) (err error)
- func (m *Manager) UpdateToken(tokens []AuthTokens) (err error)
- func (m *Manager) Watch(confsPath []string) (err error)
- type ManagerConfig
- type PostParseRet
- type RegisterReq
- type RestService
- func (rs *RestService) ClusterStatus() echo.HandlerFunc
- func (rs *RestService) DeleteClusterConfig() echo.HandlerFunc
- func (rs *RestService) DeleteConfig() echo.HandlerFunc
- func (rs *RestService) DeleteSlaves() echo.HandlerFunc
- func (rs *RestService) GetCleanerKeyOptions() echo.HandlerFunc
- func (rs *RestService) GetClusterConfig() echo.HandlerFunc
- func (rs *RestService) GetClusterConfigs() echo.HandlerFunc
- func (rs *RestService) GetClusterRunners() echo.HandlerFunc
- func (rs *RestService) GetConfig() echo.HandlerFunc
- func (rs *RestService) GetConfigs() echo.HandlerFunc
- func (rs *RestService) GetError() echo.HandlerFunc
- func (rs *RestService) GetErrorCodeHumanize() echo.HandlerFunc
- func (rs *RestService) GetErrors() echo.HandlerFunc
- func (rs *RestService) GetParserKeyOptions() echo.HandlerFunc
- func (rs *RestService) GetParserSampleLogs() echo.HandlerFunc
- func (rs *RestService) GetParserTooltips() echo.HandlerFunc
- func (rs *RestService) GetParserUsages() echo.HandlerFunc
- func (rs *RestService) GetReaderKeyOptions() echo.HandlerFunc
- func (rs *RestService) GetReaderTooltips() echo.HandlerFunc
- func (rs *RestService) GetReaderUsages() echo.HandlerFunc
- func (rs *RestService) GetRunners() echo.HandlerFunc
- func (rs *RestService) GetTransformerOptions() echo.HandlerFunc
- func (rs *RestService) GetTransformerSampleConfigs() echo.HandlerFunc
- func (rs *RestService) GetTransformerUsages() echo.HandlerFunc
- func (rs *RestService) GetVersion() echo.HandlerFunc
- func (rs *RestService) IsMaster() echo.HandlerFunc
- func (rs *RestService) Ping() echo.HandlerFunc
- func (rs *RestService) PostClusterConfig() echo.HandlerFunc
- func (rs *RestService) PostClusterConfigReset() echo.HandlerFunc
- func (rs *RestService) PostClusterConfigStart() echo.HandlerFunc
- func (rs *RestService) PostClusterConfigStop() echo.HandlerFunc
- func (rs *RestService) PostConfig() echo.HandlerFunc
- func (rs *RestService) PostConfigReset() echo.HandlerFunc
- func (rs *RestService) PostConfigStart() echo.HandlerFunc
- func (rs *RestService) PostConfigStop() echo.HandlerFunc
- func (rs *RestService) PostParse() echo.HandlerFunc
- func (rs *RestService) PostParserCheck() echo.HandlerFunc
- func (rs *RestService) PostRead() echo.HandlerFunc
- func (rs *RestService) PostReaderCheck() echo.HandlerFunc
- func (rs *RestService) PostRegister() echo.HandlerFunc
- func (rs *RestService) PostSlaveTag() echo.HandlerFunc
- func (rs *RestService) PostTag() echo.HandlerFunc
- func (rs *RestService) PostTransform() echo.HandlerFunc
- func (rs *RestService) PostTransformerCheck() echo.HandlerFunc
- func (rs *RestService) PutClusterConfig() echo.HandlerFunc
- func (rs *RestService) PutConfig() echo.HandlerFunc
- func (rs *RestService) Register() error
- func (rs *RestService) Slaves() echo.HandlerFunc
- func (rs *RestService) Status() echo.HandlerFunc
- func (rs *RestService) Stop()
- type Runner
- func NewCustomRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, ...) (runner Runner, err error)
- func NewRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal) (runner Runner, err error)
- func NewRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, ...) (runner Runner, err error)
- type RunnerConfig
- type RunnerErrors
- type RunnerInfo
- type RunnerStatus
- type Service
- type Slave
- type SlaveConfig
- type StatusPersistable
- type TagReq
- type TokenRefreshable
- type Version
Constants ¶
const ( StatusOK = "ok" StatusBad = "bad" StatusLost = "lost" )
const ( DefaultTryTimes = 3 MetaTmp = "meta_tmp/" )
const ( StatsShell = "stats" PREFIX = "/logkit" )
const ( SpeedUp = "up" SpeedDown = "down" SpeedStable = "stable" RunnerRunning = "running" RunnerStopped = "stopped" )
const DefaultMyTag = "default"
const KeyRouterConfig = "router"
const KeySendConfig = "senders"
Variables ¶
var DEFAULT_LOGKIT_REST_DIR = "/.logkitconfs"
var DEFAULT_PORT = 3000
var DIR_NOT_EXIST_SLEEP_TIME = "300" //300 s
var KeySampleLog = "sampleLog"
Functions ¶
func GetMySlaveUrl ¶
func MergeEnvTags ¶
MergeEnvTags 获取环境变量里的内容
func MergeExtraInfoTags ¶
func RespSuccess ¶
func TransformData ¶
Types ¶
type Cluster ¶
type Cluster struct { ClusterConfig // contains filtered or unexported fields }
func NewCluster ¶
func NewCluster(cc *ClusterConfig) *Cluster
func (*Cluster) RunRegisterLoop ¶
func (*Cluster) UpdateSlaveStatus ¶
func (cc *Cluster) UpdateSlaveStatus()
type ClusterConfig ¶
type ClusterStatus ¶
type ClusterStatus struct { Status map[string]RunnerStatus `json:"status"` Tag string `json:"tag"` Err string `json:"error"` }
type ErrorsList ¶
type ErrorsList struct { ReadErrors *ErrorQueue `json:"read_errors"` ParseErrors *ErrorQueue `json:"parse_errors"` TransformErrors map[string]*ErrorQueue `json:"transform_errors"` SendErrors map[string]*ErrorQueue `json:"send_errors"` }
func (*ErrorsList) Clone ¶
func (list *ErrorsList) Clone() *ErrorsList
Clone 返回当前 ErrorsList 的完整拷贝,若无数据则会返回 nil
type ErrorsResult ¶
type LogExportRunner ¶
type LogExportRunner struct { RunnerInfo // contains filtered or unexported fields }
func NewLogExportRunner ¶
func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner *LogExportRunner, err error)
func NewLogExportRunnerWithService ¶
func NewLogExportRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser, transformers []transforms.Transformer, senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner *LogExportRunner, err error)
func (*LogExportRunner) Cleaner ¶
func (r *LogExportRunner) Cleaner() CleanInfo
func (*LogExportRunner) GetErrors ¶
func (r *LogExportRunner) GetErrors() ErrorsResult
func (*LogExportRunner) LagStats ¶
func (r *LogExportRunner) LagStats() (rl *LagInfo, err error)
func (*LogExportRunner) Name ¶
func (r *LogExportRunner) Name() string
func (*LogExportRunner) Reset ¶
func (r *LogExportRunner) Reset() (err error)
func (*LogExportRunner) Run ¶
func (r *LogExportRunner) Run()
func (*LogExportRunner) Status ¶
func (r *LogExportRunner) Status() (rs RunnerStatus)
func (*LogExportRunner) StatusBackup ¶
func (r *LogExportRunner) StatusBackup()
func (*LogExportRunner) StatusRestore ¶
func (r *LogExportRunner) StatusRestore()
func (*LogExportRunner) Stop ¶
func (r *LogExportRunner) Stop()
Stop 清理所有使用到的资源, 等待10秒尝试读取完毕。先停Reader,不再读取,然后停Run函数,让读取的都转到发送,最后停Sender结束整个过程。 Parser 无状态,无需stop。
func (*LogExportRunner) TokenRefresh ¶
func (r *LogExportRunner) TokenRefresh(tokens AuthTokens) error
type Manager ¶
type Manager struct { ManagerConfig DefaultDir string Version string SystemInfo string // contains filtered or unexported fields }
func NewCustomManager ¶
func NewManager ¶
func NewManager(conf ManagerConfig) (*Manager, error)
func (*Manager) Configs ¶
func (m *Manager) Configs() (rss map[string]RunnerConfig)
func (*Manager) DeleteRunner ¶
func (*Manager) Errors ¶
func (m *Manager) Errors() (rss map[string]ErrorsResult)
func (*Manager) ForkRunner ¶
func (m *Manager) ForkRunner(confPath string, config RunnerConfig, returnOnErr bool) error
func (*Manager) RemoveWithConfig ¶
func (*Manager) ResetRunner ¶
ResetRunner 必须在runner实例存在下才可以reset, reset是调用runner本身的方法, 而runner stop实际上是销毁实例,所以先要启动runner
func (*Manager) RestoreWebDir ¶
func (m *Manager) RestoreWebDir()
func (*Manager) StartRunner ¶
func (*Manager) Status ¶
func (m *Manager) Status() (rss map[string]RunnerStatus)
func (*Manager) StopRunner ¶
func (*Manager) UpdateRunner ¶
func (m *Manager) UpdateRunner(name string, conf RunnerConfig) (err error)
func (*Manager) UpdateToken ¶
type ManagerConfig ¶
type PostParseRet ¶
type PostParseRet struct {
SamplePoints []Data `json:"SamplePoints"`
}
PostParseRet 返回值
type RegisterReq ¶
type RestService ¶
type RestService struct {
// contains filtered or unexported fields
}
func NewRestService ¶
func NewRestService(mgr *Manager, router *echo.Echo) *RestService
func (*RestService) ClusterStatus ¶
func (rs *RestService) ClusterStatus() echo.HandlerFunc
master API GET /logkit/cluster/status?tag=tagValue&url=urlValue
func (*RestService) DeleteClusterConfig ¶
func (rs *RestService) DeleteClusterConfig() echo.HandlerFunc
DELETE /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue
func (*RestService) DeleteConfig ¶
func (rs *RestService) DeleteConfig() echo.HandlerFunc
delete /logkit/configs/<name>
func (*RestService) DeleteSlaves ¶
func (rs *RestService) DeleteSlaves() echo.HandlerFunc
DELETE /logkit/cluster/slaves?tag=tagValue&url=urlValue
func (*RestService) GetCleanerKeyOptions ¶
func (rs *RestService) GetCleanerKeyOptions() echo.HandlerFunc
get /logkit/cleaner/options 获取清理选项
func (*RestService) GetClusterConfig ¶
func (rs *RestService) GetClusterConfig() echo.HandlerFunc
master API GET /logkit/cluster/configs/:name?tag=tagValue&url=urlValue
func (*RestService) GetClusterConfigs ¶
func (rs *RestService) GetClusterConfigs() echo.HandlerFunc
master API GET /logkit/cluster/configs?tag=tagValue&url=urlValue
func (*RestService) GetClusterRunners ¶
func (rs *RestService) GetClusterRunners() echo.HandlerFunc
master API GET /logkit/cluster/runners?tag=tagValue&url=urlValue
func (*RestService) GetConfig ¶
func (rs *RestService) GetConfig() echo.HandlerFunc
get /logkit/configs/:name
func (*RestService) GetConfigs ¶
func (rs *RestService) GetConfigs() echo.HandlerFunc
get /logkit/configs
func (*RestService) GetError ¶
func (rs *RestService) GetError() echo.HandlerFunc
get /logkit/errors/<name>
func (*RestService) GetErrorCodeHumanize ¶
func (rs *RestService) GetErrorCodeHumanize() echo.HandlerFunc
get /logkit/errorcode
func (*RestService) GetErrors ¶
func (rs *RestService) GetErrors() echo.HandlerFunc
get /logkit/errors
func (*RestService) GetParserKeyOptions ¶
func (rs *RestService) GetParserKeyOptions() echo.HandlerFunc
get /logkit/parser/options 获取解析选项
func (*RestService) GetParserSampleLogs ¶
func (rs *RestService) GetParserSampleLogs() echo.HandlerFunc
get /logkit/parser/samplelogs 获取样例日志
func (*RestService) GetParserTooltips ¶
func (rs *RestService) GetParserTooltips() echo.HandlerFunc
get /logkit/parser/tooltips 获取解析用途提示
func (*RestService) GetParserUsages ¶
func (rs *RestService) GetParserUsages() echo.HandlerFunc
get /logkit/parser/usages 获得解析用途说明
func (*RestService) GetReaderKeyOptions ¶
func (rs *RestService) GetReaderKeyOptions() echo.HandlerFunc
get /logkit/reader/options 获取Reader参数配置
func (*RestService) GetReaderTooltips ¶
func (rs *RestService) GetReaderTooltips() echo.HandlerFunc
get /logkit/reader/tooltips 获取Reader用途提示
func (*RestService) GetReaderUsages ¶
func (rs *RestService) GetReaderUsages() echo.HandlerFunc
get /logkit/reader/usages 获取Reader用途
func (*RestService) GetRunners ¶
func (rs *RestService) GetRunners() echo.HandlerFunc
get /logkit/runners
func (*RestService) GetTransformerOptions ¶
func (rs *RestService) GetTransformerOptions() echo.HandlerFunc
GET /logkit/transformer/options
func (*RestService) GetTransformerSampleConfigs ¶
func (rs *RestService) GetTransformerSampleConfigs() echo.HandlerFunc
GET /logkit/transformer/sampleconfigs
func (*RestService) GetTransformerUsages ¶
func (rs *RestService) GetTransformerUsages() echo.HandlerFunc
GET /logkit/transformer/usages
func (*RestService) GetVersion ¶
func (rs *RestService) GetVersion() echo.HandlerFunc
func (*RestService) IsMaster ¶
func (rs *RestService) IsMaster() echo.HandlerFunc
master API GET /logkit/cluster/ismaster
func (*RestService) Ping ¶
func (rs *RestService) Ping() echo.HandlerFunc
master API GET /logkit/cluster/ping
func (*RestService) PostClusterConfig ¶
func (rs *RestService) PostClusterConfig() echo.HandlerFunc
POST /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue
func (*RestService) PostClusterConfigReset ¶
func (rs *RestService) PostClusterConfigReset() echo.HandlerFunc
POST /logkit/cluster/configs/<name>/reset?tag=tagValue&url=urlValue
func (*RestService) PostClusterConfigStart ¶
func (rs *RestService) PostClusterConfigStart() echo.HandlerFunc
POST /logkit/cluster/configs/<name>/start?tag=tagValue&url=urlValue
func (*RestService) PostClusterConfigStop ¶
func (rs *RestService) PostClusterConfigStop() echo.HandlerFunc
POST /logkit/cluster/configs/<name>/stop?tag=tagValue&url=urlValue
func (*RestService) PostConfig ¶
func (rs *RestService) PostConfig() echo.HandlerFunc
post /logkit/configs/<name>
func (*RestService) PostConfigReset ¶
func (rs *RestService) PostConfigReset() echo.HandlerFunc
POST /logkit/configs/<name>/reset
func (*RestService) PostConfigStart ¶
func (rs *RestService) PostConfigStart() echo.HandlerFunc
POST /logkit/configs/<name>/start
func (*RestService) PostConfigStop ¶
func (rs *RestService) PostConfigStop() echo.HandlerFunc
POST /logkit/configs/<name>/stop
func (*RestService) PostParse ¶
func (rs *RestService) PostParse() echo.HandlerFunc
post /logkit/parser/parse 接受解析请求
func (*RestService) PostParserCheck ¶
func (rs *RestService) PostParserCheck() echo.HandlerFunc
POST /logkit/parser/check
func (*RestService) PostRead ¶
func (rs *RestService) PostRead() echo.HandlerFunc
POST /logkit/reader/read 根据reader配置尝试读取原始数据
func (*RestService) PostReaderCheck ¶
func (rs *RestService) PostReaderCheck() echo.HandlerFunc
POST /logkit/reader/check 请求校验reader配置
func (*RestService) PostRegister ¶
func (rs *RestService) PostRegister() echo.HandlerFunc
master API POST /logkit/cluster/register
func (*RestService) PostSlaveTag ¶
func (rs *RestService) PostSlaveTag() echo.HandlerFunc
POST /logkit/cluster/slaves/tag?tag=tagValue&url=urlValue
func (*RestService) PostTag ¶
func (rs *RestService) PostTag() echo.HandlerFunc
slave API POST /logkit/cluster/tag
func (*RestService) PostTransform ¶
func (rs *RestService) PostTransform() echo.HandlerFunc
POST /logkit/transformer/transform Transform (multiple logs/single log) in (json array/json object) format with registered transformers Return result string in json array format
func (*RestService) PostTransformerCheck ¶
func (rs *RestService) PostTransformerCheck() echo.HandlerFunc
POST /logkit/transformer/check
func (*RestService) PutClusterConfig ¶
func (rs *RestService) PutClusterConfig() echo.HandlerFunc
PUT /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue
func (*RestService) PutConfig ¶
func (rs *RestService) PutConfig() echo.HandlerFunc
put /logkit/configs/<name>
func (*RestService) Register ¶
func (rs *RestService) Register() error
func (*RestService) Slaves ¶
func (rs *RestService) Slaves() echo.HandlerFunc
master API GET /logkit/cluster/slaves?tag=tagValue&url=urlValue
type Runner ¶
type Runner interface { Name() string Run() Stop() Cleaner() CleanInfo Status() RunnerStatus }
func NewCustomRunner ¶
func NewRunner ¶
func NewRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal) (runner Runner, err error)
NewRunner 创建Runner
func NewRunnerWithService ¶
type RunnerConfig ¶
type RunnerConfig struct { RunnerInfo SourceData string `json:"sourceData, omitempty"` // MetricConfig []MetricConfig `json:"metric,omitempty"` ReaderConfig conf.MapConf `json:"reader"` CleanerConfig conf.MapConf `json:"cleaner,omitempty"` ParserConf conf.MapConf `json:"parser"` Transforms []map[string]interface{} `json:"transforms,omitempty"` SendersConfig []conf.MapConf `json:"senders"` Router router.RouterConfig `json:"router,omitempty"` IsInWebFolder bool `json:"web_folder,omitempty"` IsStopped bool `json:"is_stopped,omitempty"` IsFromServer bool `json:"from_server,omitempty"` // 判读是否从服务器拉取的配置 }
RunnerConfig 从多数据源读取,经过解析后,发往多个数据目的地
func TrimSecretInfo ¶
func TrimSecretInfo(conf RunnerConfig, trimSk bool) RunnerConfig
TrimSecretInfo 将配置文件中的 token 等鉴权相关信息去掉
type RunnerErrors ¶
type RunnerErrors interface {
GetErrors() ErrorsResult
}
type RunnerInfo ¶
type RunnerInfo struct { RunnerName string `json:"name"` Note string `json:"note,omitempty"` CollectInterval int `json:"collect_interval,omitempty"` // metric runner收集的频率 MaxBatchLen int `json:"batch_len,omitempty"` // 每个read batch的行数 MaxBatchSize int `json:"batch_size,omitempty"` // 每个read batch的字节数 MaxBatchInterval int `json:"batch_interval,omitempty"` // 最大发送时间间隔 MaxBatchTryTimes int `json:"batch_try_times,omitempty"` // 最大发送次数,小于等于0代表无限重试 ErrorsListCap int `json:"errors_list_cap"` // 记录错误信息的最大条数 CreateTime string `json:"createtime"` EnvTag string `json:"env_tag,omitempty"` ExtraInfo bool `json:"extra_info"` }
type RunnerStatus ¶
type RunnerStatus struct { Name string `json:"name"` Logpath string `json:"logpath"` ReadDataSize int64 `json:"readDataSize"` ReadDataCount int64 `json:"readDataCount"` Elaspedtime float64 `json:"elaspedtime"` Lag LagInfo `json:"lag"` ReaderStats StatsInfo `json:"readerStats"` ParserStats StatsInfo `json:"parserStats"` SenderStats map[string]StatsInfo `json:"senderStats"` TransformStats map[string]StatsInfo `json:"transformStats"` Error string `json:"error,omitempty"` ReadSpeedKB float64 `json:"readspeed_kb"` ReadSpeed float64 `json:"readspeed"` ReadSpeedTrendKb string `json:"readspeedtrend_kb"` ReadSpeedTrend string `json:"readspeedtrend"` RunningStatus string `json:"runningStatus"` Tag string `json:"tag,omitempty"` Url string `json:"url,omitempty"` HistoryErrors *ErrorsList `json:"history_errors"` // contains filtered or unexported fields }
RunnerStatus runner运行状态,添加字段请在clone函数中相应添加
func (*RunnerStatus) Clone ¶
func (src *RunnerStatus) Clone() RunnerStatus
Clone 复制出一个完整的RunnerStatus
type SlaveConfig ¶
type SlaveConfig struct { Configs map[string]RunnerConfig `json:"configs"` Tag string `json:"tag"` Err string `json:"error"` }
type StatusPersistable ¶
type StatusPersistable interface { StatusBackup() StatusRestore() }
type TokenRefreshable ¶
type TokenRefreshable interface {
TokenRefresh(AuthTokens) error
}