Documentation ¶
Overview ¶
Package prometheus provides Prometheus implementations for metrics. Individual metrics are mapped to their Prometheus counterparts, and (depending on the constructor used) may be automatically registered in the global Prometheus metrics registry.
Index ¶
- Constants
- Variables
- func DefaultErrorEncoder(err error, w http.ResponseWriter)
- func GenJobRequestStringByMeta(sep string, src ...*JobRequest) (string, error)
- func LoadUserConfig(field string, obj interface{}) error
- func MarshalJobDescription(j *JobDescription) ([]byte, error)
- func MarshalJobRequest(req *JobRequest) (string, error)
- func MarshalJobRequests(sep string, reqs ...*JobRequest) (string, error)
- func NewCronAPI() *cronAPI
- func NewJobAPI(etcd *Etcd) *jobAPI
- func NewPostgres(conf *PostgresConfig) *sqlx.DB
- func NewScheduleAPI(etcd *Etcd, db *sqlx.DB, config *HttpConfig) *scheduleAPI
- func NewWorkflowAPI(db *sqlx.DB) *workFlowAPI
- func StartClientJob(job Job)
- type Counter
- type CreateWorkflowRequest
- type CreateWorkflowResponse
- type DecodeRequestFunc
- type DeleteWorkflowRequest
- type DeleteWorkflowResponse
- type Endpoint
- type Errors
- type Etcd
- func (e *Etcd) Close() error
- func (e *Etcd) DelKey(ctx context.Context, key string) error
- func (e *Etcd) Get(ctx context.Context, key string) (value []byte, err error)
- func (e *Etcd) GetWithPrefixKey(ctx context.Context, prefix string) ([]string, []string, error)
- func (e *Etcd) GrantLease(ttl int64) (clientv3.LeaseID, error)
- func (e *Etcd) InsertKV(ctx context.Context, key, val string, leaseID clientv3.LeaseID) error
- func (e *Etcd) InsertKVNoExisted(ctx context.Context, key, val string, leaseID clientv3.LeaseID) error
- func (e *Etcd) KeepaliveWithTTL(ctx context.Context, key, value string, ttl int64) error
- func (e *Etcd) RenewLease(ctx context.Context, id clientv3.LeaseID) error
- func (e *Etcd) RevokeLease(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)
- func (e *Etcd) TryLockWithTTL(key string, ttl int64) error
- func (e *Etcd) WatchWithPrefix(key string, cb func(*clientv3.Event) error) error
- type EtcdConfig
- type Executor
- type ExecutorConfig
- type ExecutorContext
- type ExprListener
- func (e *ExprListener) EnterANDOR(c *parser.ANDORContext)
- func (e *ExprListener) EnterEveryRule(ctx antlr.ParserRuleContext)
- func (e *ExprListener) EnterID(c *parser.IDContext)
- func (e *ExprListener) EnterParenthesis(c *parser.ParenthesisContext)
- func (e *ExprListener) EnterStart(c *parser.StartContext)
- func (e *ExprListener) ExitANDOR(c *parser.ANDORContext)
- func (e *ExprListener) ExitEveryRule(ctx antlr.ParserRuleContext)
- func (e *ExprListener) ExitID(c *parser.IDContext)
- func (e *ExprListener) ExitParenthesis(c *parser.ParenthesisContext)
- func (e *ExprListener) ExitStart(c *parser.StartContext)
- func (e *ExprListener) Pop() Job
- func (e *ExprListener) VisitErrorNode(node antlr.ErrorNode)
- func (e *ExprListener) VisitTerminal(node antlr.TerminalNode)
- type Gauge
- type GetCronRequest
- type GetCronResponse
- type GetJobRequest
- type GetJobResponse
- type GetWorkflowRequest
- type GetWorkflowResponse
- type HandleFunc
- type Header
- type Histogram
- type HttpConfig
- type ICounter
- type IGauge
- type IHistogram
- type Job
- type JobConfig
- type JobDescription
- type JobError
- type JobMiddlewareRequest
- type JobRequest
- type JobResponse
- type JobServer
- type JsonRPCServer
- type JsonRPCService
- type KV
- type LabelValues
- type ListJobRequest
- type ListJobResponse
- type ListWorkflowRequest
- type ListWorkflowResponse
- type LoopJob
- type MergeFunc
- type Middleware
- type ParallelJob
- type Pattern
- type PlaceHolder
- type PostgresConfig
- type ScheduleConfig
- type SerialJob
- type SplitFunc
- type Stack
- type StatusCoder
- type Summary
- type SyncMap
- type SyncRequest
- type UpdateWorkflowRequest
- type UpdateWorkflowResponse
- type WorkFlow
- type WorkFlowStatus
- type WorkerRole
- type WrapperJob
Constants ¶
const ( StateAvaiable = "available" // available StateExecuting = "executing" StateFailed = "failed" StateFinish = "finish" // prometheus PrometheusNamespace = "Jelly" PrometheusSubsystem = "Schedule" )
const ( // logger const field ProjectKey = "project" ProjectValue = "schedule" )
const DefaultConfigFilename = "/etc/config/schedule_config.yaml"
const (
EmptyJobRequest = "{}"
)
const (
HeadCountKey = "X-Total-Count"
)
const (
JsonRPCPath = `/rpc`
)
Wraps a Job as an RPC service.
const (
RemoteServerMethod = `JsonRPCService.Exec`
)
Variables ¶
var ( // etcd ErrKeyAlreadyExists = errors.New("key already exists") ErrEtcdLeaseNotFound = errors.New("lease not found") ErrJobNotFound = errors.New("job not found") ErrorJobParaInvalid = errors.New("job para invalid") // api ErrBadRequest = newApiError(http.StatusBadRequest, "StatusBadRequest") ErrBadCronExpr = newApiError(http.StatusBadRequest, "ErrBadCronExpr") ErrorInvalidPara = newApiError(http.StatusBadRequest, "ErrorInvalidPara") ErrNotFound = newApiError(http.StatusNotFound, "ErrNotFound") )
var ( JobPrefix = `/schedule/job` JobFormat = fasttemplate.New(JobPrefix+`/{Name}`, "{", "}") TTL = int64(10) )
Provided for use by callers: internally it invokes RPC, is abstracted as a service, and is registered in etcd.
var ( EParse = errors.New("parse pattern failed") EPlaceHolder = errors.New("unknown placeholder format") EDate = errors.New("not a date") ENumber = errors.New("not a number") EAlpha = errors.New("not a alpha") )
var ( WorkflowTableName = `workflow` CreateWorkflowTableDDL = `create table if not exists ` + WorkflowTableName + ` ( id varchar primary key, name varchar, description varchar, expression varchar, cron varchar, para varchar, success_limit int, failed_limit int, belong_executor varchar, state varchar, create_time bigint default extract(epoch from now())::bigint, update_time bigint default extract(epoch from now())::bigint );` WorkflowTableSelectColumn = `*` WorkflowTableColumn = `id,name,description,expression,cron,para,success_limit,failed_limit,belong_executor,state,create_time,update_time` WorkflowTableColumnSize = len(strings.Split(WorkflowTableColumn, ",")) WorkflowTableOnConflictDDL = fmt.Sprintf(` on conflict (id) do update set name = excluded.name, description = excluded.description, expression = excluded.expression, cron = excluded.cron, para = excluded.para, success_limit = excluded.success_limit, failed_limit = excluded.failed_limit, belong_executor = excluded.belong_executor, state = excluded.state, update_time = GREATEST(%s.update_time, excluded.update_time);`, WorkflowTableName) )
// 执行几次结束 SuccessLimit int64 `json:"executeLimit" yaml:"executeLimit" ` // 碰到错误的方式 ErrorPolicy string `json:"errorPolicy" yaml:"errorPolicy"` // 可以指定由哪个执行器执行 BelongExecutor string `json:"belongExecutor" yaml:"belongExecutor" `
Functions ¶
func DefaultErrorEncoder ¶
func DefaultErrorEncoder(err error, w http.ResponseWriter)
func GenJobRequestStringByMeta ¶
func GenJobRequestStringByMeta(sep string, src ...*JobRequest) (string, error)
func LoadUserConfig ¶
func MarshalJobDescription ¶
func MarshalJobDescription(j *JobDescription) ([]byte, error)
func MarshalJobRequest ¶
func MarshalJobRequest(req *JobRequest) (string, error)
func MarshalJobRequests ¶
func MarshalJobRequests(sep string, reqs ...*JobRequest) (string, error)
func NewCronAPI ¶
func NewCronAPI() *cronAPI
func NewPostgres ¶
func NewPostgres(conf *PostgresConfig) *sqlx.DB
func NewScheduleAPI ¶
func NewScheduleAPI(etcd *Etcd, db *sqlx.DB, config *HttpConfig) *scheduleAPI
func NewWorkflowAPI ¶
func StartClientJob ¶
func StartClientJob(job Job)
Types ¶
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter implements ICounter, via a Prometheus CounterVec.
func NewCounter ¶
func NewCounter(cv *prometheus.CounterVec) *Counter
NewCounter wraps the CounterVec and returns a usable Counter object.
func NewCounterFrom ¶
func NewCounterFrom(opts prometheus.CounterOpts, labelNames []string) *Counter
NewCounterFrom constructs and registers a Prometheus CounterVec, and returns a usable Counter object.
type CreateWorkflowRequest ¶
type CreateWorkflowRequest struct {
Workflows []*WorkFlow
}
type CreateWorkflowResponse ¶
type CreateWorkflowResponse struct {
Ids []string `json:"ids"`
}
type DecodeRequestFunc ¶
type DeleteWorkflowRequest ¶
type DeleteWorkflowRequest struct {
Ids []string `json:"ids"`
}
type DeleteWorkflowResponse ¶
type DeleteWorkflowResponse struct { }
type Etcd ¶
type Etcd struct {
// contains filtered or unexported fields
}
func NewEtcd ¶
func NewEtcd(config *EtcdConfig) *Etcd
func (*Etcd) GetWithPrefixKey ¶
func (*Etcd) InsertKVNoExisted ¶
func (*Etcd) KeepaliveWithTTL ¶
func (*Etcd) RevokeLease ¶
type EtcdConfig ¶
type Executor ¶
func NewExecutor ¶
func NewExecutor(etcd *Etcd, db *sqlx.DB, config ExecutorConfig) *Executor
type ExecutorConfig ¶
type ExecutorContext ¶
type ExecutorContext struct {
// contains filtered or unexported fields
}
type ExprListener ¶
type ExprListener struct {
// contains filtered or unexported fields
}
func (e *Executor) getJob(jobId string) (Job, error) {
func NewExprListener ¶
func (*ExprListener) EnterANDOR ¶
func (e *ExprListener) EnterANDOR(c *parser.ANDORContext)
func (*ExprListener) EnterEveryRule ¶
func (e *ExprListener) EnterEveryRule(ctx antlr.ParserRuleContext)
func (*ExprListener) EnterID ¶
func (e *ExprListener) EnterID(c *parser.IDContext)
func (*ExprListener) EnterParenthesis ¶
func (e *ExprListener) EnterParenthesis(c *parser.ParenthesisContext)
func (*ExprListener) EnterStart ¶
func (e *ExprListener) EnterStart(c *parser.StartContext)
func (*ExprListener) ExitANDOR ¶
func (e *ExprListener) ExitANDOR(c *parser.ANDORContext)
func (*ExprListener) ExitEveryRule ¶
func (e *ExprListener) ExitEveryRule(ctx antlr.ParserRuleContext)
func (*ExprListener) ExitID ¶
func (e *ExprListener) ExitID(c *parser.IDContext)
func (*ExprListener) ExitParenthesis ¶
func (e *ExprListener) ExitParenthesis(c *parser.ParenthesisContext)
func (*ExprListener) ExitStart ¶
func (e *ExprListener) ExitStart(c *parser.StartContext)
func (*ExprListener) Pop ¶
func (e *ExprListener) Pop() Job
func (*ExprListener) VisitErrorNode ¶
func (e *ExprListener) VisitErrorNode(node antlr.ErrorNode)
func (*ExprListener) VisitTerminal ¶
func (e *ExprListener) VisitTerminal(node antlr.TerminalNode)
type Gauge ¶
type Gauge struct {
// contains filtered or unexported fields
}
Gauge implements IGauge, via a Prometheus GaugeVec.
func NewGauge ¶
func NewGauge(gv *prometheus.GaugeVec) *Gauge
NewGauge wraps the GaugeVec and returns a usable Gauge object.
func NewGaugeFrom ¶
func NewGaugeFrom(opts prometheus.GaugeOpts, labelNames []string) *Gauge
NewGaugeFrom constructs and registers a Prometheus GaugeVec, and returns a usable Gauge object.
type GetCronRequest ¶
type GetCronResponse ¶
type GetJobRequest ¶
type GetJobRequest struct {
// contains filtered or unexported fields
}
type GetJobResponse ¶
type GetJobResponse struct {
JobStats []*JobDescription `json:"jobStats"`
}
func NewGetJobResponse ¶
func NewGetJobResponse() *GetJobResponse
type GetWorkflowRequest ¶
type GetWorkflowRequest struct {
Ids []string `json:"ids"`
}
type GetWorkflowResponse ¶
type GetWorkflowResponse struct {
Workflows []*WorkFlow `json:"workflows"`
}
type HandleFunc ¶
type HandleFunc func(w http.ResponseWriter, r *http.Request)
func HandleFuncWrapper ¶
func HandleFuncWrapper(dec DecodeRequestFunc, e Endpoint, enc encodeResponseFunc) HandleFunc
type Histogram ¶
type Histogram struct {
// contains filtered or unexported fields
}
Histogram implements IHistogram via a Prometheus HistogramVec. The difference between a Histogram and a Summary is that Histograms require predefined quantile buckets, and can be statistically aggregated.
func NewHistogram ¶
func NewHistogram(hv *prometheus.HistogramVec) *Histogram
NewHistogram wraps the HistogramVec and returns a usable Histogram object.
func NewHistogramFrom ¶
func NewHistogramFrom(opts prometheus.HistogramOpts, labelNames []string) *Histogram
NewHistogramFrom constructs and registers a Prometheus HistogramVec, and returns a usable Histogram object.
func (*Histogram) With ¶
func (h *Histogram) With(labelValues ...string) IHistogram
With implements IHistogram.
type HttpConfig ¶
type ICounter ¶
Counter describes a metric that accumulates values monotonically. An example of a counter is the number of received HTTP requests.
type IGauge ¶
Gauge describes a metric that takes specific values over time. An example of a gauge is the current depth of a job queue.
type IHistogram ¶
type IHistogram interface { With(labelValues ...string) IHistogram Observe(value float64) }
Histogram describes a metric that takes repeated observations of the same kind of thing, and produces a statistical summary of those observations, typically expressed as quantiles or buckets. An example of a histogram is HTTP request latencies.
type JobDescription ¶
type JobDescription struct { Id string `json:"id"` Name string `json:"name"` Host string `json:"host"` Port int `json:"port"` ServicePath string `json:"servicePath"` JobPath string `json:"jobPath"` }
func ListJobStats ¶
func ListJobStats() ([]*JobDescription, error)
func UnMarshalJobDescription ¶
func UnMarshalJobDescription(buf []byte) (*JobDescription, error)
func (JobDescription) String ¶
func (w JobDescription) String() string
func (JobDescription) ToJob ¶
func (w JobDescription) ToJob() Job
type JobMiddlewareRequest ¶
type JobRequest ¶
type JobRequest struct { Meta map[string]interface{} `json:"meta,omitempty"` //Values []string `json:"values,omitempty"` Values map[string][]string `json:"values,omitempty"` Pattern string `json:"pattern,omitempty"` // contains filtered or unexported fields }
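Meta is interpreted by each request. Values is the value domain presented to the job. Pattern is the value-domain expression, responsible for populating the value domain.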
//Values []string `json:"values,omitempty"`
func NewJobRequest ¶
func NewJobRequest() *JobRequest
func NewJobRequestByKey ¶
func NewJobRequestByKey(key string, src *JobRequest) *JobRequest
func NewJobRequestByMeta ¶
func NewJobRequestByMeta(src ...*JobRequest) *JobRequest
func UnMarshalJobRequest ¶
func UnMarshalJobRequest(req string) (*JobRequest, error)
func UnMarshalJobRequests ¶
func UnMarshalJobRequests(req, sep string) ([]*JobRequest, error)
func (*JobRequest) GetBoolFromMeta ¶
func (j *JobRequest) GetBoolFromMeta(key string) bool
func (*JobRequest) GetBytesFromMeta ¶
func (j *JobRequest) GetBytesFromMeta(key string) []byte
func (*JobRequest) GetInt64FromMeta ¶
func (j *JobRequest) GetInt64FromMeta(key string) int64
func (*JobRequest) GetStringFromMeta ¶
func (j *JobRequest) GetStringFromMeta(key string) string
type JobResponse ¶
type JobResponse JobRequest
type JsonRPCServer ¶
type JsonRPCServer struct {
// contains filtered or unexported fields
}
func (*JsonRPCServer) Close ¶
func (d *JsonRPCServer) Close()
func (*JsonRPCServer) Start ¶
func (d *JsonRPCServer) Start() error
type JsonRPCService ¶
type JsonRPCService struct {
// contains filtered or unexported fields
}
type LabelValues ¶
type LabelValues []string
func (LabelValues) With ¶
func (lvs LabelValues) With(labelValues ...string) LabelValues
With validates the input, and returns a new aggregate labelValues.
type ListJobRequest ¶
type ListJobRequest struct{}
type ListJobResponse ¶
type ListJobResponse struct {
JobStats []*JobDescription `json:"jobStats"`
}
func NewListJobResponse ¶
func NewListJobResponse() *ListJobResponse
type ListWorkflowRequest ¶
type ListWorkflowResponse ¶
type Middleware ¶
func Instrumenting ¶
func Instrumenting(latency IHistogram, success, failed ICounter) Middleware
type ParallelJob ¶
type ParallelJob struct {
// contains filtered or unexported fields
}
func NewParallelJob ¶
func NewParallelJob(sep string, splitFn SplitFunc, mergeFn MergeFunc, jobs ...Job) *ParallelJob
func (*ParallelJob) Name ¶
func (s *ParallelJob) Name() string
func (*ParallelJob) Progress ¶
func (s *ParallelJob) Progress() int
type Pattern ¶
type Pattern struct {
// contains filtered or unexported fields
}
func ParsePattern ¶
type PlaceHolder ¶
type PlaceHolder struct {
// contains filtered or unexported fields
}
func NewPlaceHolder ¶
func NewPlaceHolder(str string, begin, end int) (*PlaceHolder, error)
type PostgresConfig ¶
type ScheduleConfig ¶
type ScheduleConfig struct { Etcd EtcdConfig Postgres PostgresConfig Http HttpConfig Executor ExecutorConfig Job JobConfig }
func LoadScheduleConfig ¶
func LoadScheduleConfig(filename string) (*ScheduleConfig, error)
type SerialJob ¶
type SerialJob struct {
// contains filtered or unexported fields
}
func NewSerialJob ¶
type StatusCoder ¶
type StatusCoder interface {
StatusCode() int
}
type Summary ¶
type Summary struct {
// contains filtered or unexported fields
}
Summary implements IHistogram, via a Prometheus SummaryVec. The difference between a Summary and a Histogram is that Summaries don't require predefined quantile buckets, but cannot be statistically aggregated.
func NewSummary ¶
func NewSummary(sv *prometheus.SummaryVec) *Summary
NewSummary wraps the SummaryVec and returns a usable Summary object.
func NewSummaryFrom ¶
func NewSummaryFrom(opts prometheus.SummaryOpts, labelNames []string) *Summary
NewSummaryFrom constructs and registers a Prometheus SummaryVec, and returns a usable Summary object.
func (*Summary) With ¶
func (s *Summary) With(labelValues ...string) IHistogram
With implements IHistogram.
type SyncMap ¶
func NewSyncMap ¶
func NewSyncMap() *SyncMap
type SyncRequest ¶
type UpdateWorkflowRequest ¶
type UpdateWorkflowRequest struct {
WorkFlows []*WorkFlow `json:"workflows"`
}
type UpdateWorkflowResponse ¶
type UpdateWorkflowResponse struct { }
type WorkFlow ¶
type WorkFlow struct { Id string `json:"id,omitempty"` Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` Expression string `json:"expression,omitempty"` Cron string `json:"cron,omitempty"` Para string `json:"para"` ExecutorWhenDeployed bool `json:"executor_when_deployed" yaml:"executor_when_deployed" ` ExecutorWhenDeployedDelay int64 `json:"executor_when_deployed_delay" yaml:"executor_when_deployed_delay" ` // 执行几次结束 SuccessLimit int64 `json:"successLimit" yaml:"successLimit" ` // 碰到错误的方式 FailedLimit int64 `json:"failedLimit" yaml:"failedLimit"` // 可以指定由哪个执行器执行 BelongExecutor string `json:"belongExecutor" yaml:"belongExecutor" ` State string `json:"state,omitempty"` CreateTime int64 `json:"createTime,omitempty"` UpdateTime int64 `json:"updateTime,omitempty"` }
type WorkFlowStatus ¶
type WorkFlowStatus struct { Id string Executing bool MaxExecuteCount int64 SuccessExecuteCount int64 FailedExecuteCount int64 LastExecuteDuration int64 }
Mainly records the runtime status of a workflow.
type WrapperJob ¶
type WrapperJob struct {
// contains filtered or unexported fields
}
The executor obtains a job's ID from the workflow and uses this type to wrap it as a Job interface.
func NewWrapperJob ¶
func NewWrapperJob(info *JobDescription) *WrapperJob
func (*WrapperJob) Name ¶
func (e *WrapperJob) Name() string
Source Files ¶
- api.go
- api_err.go
- assert.go
- constants.go
- cron_api.go
- etcd.go
- executor.go
- expr_listener.go
- job.go
- job_api.go
- job_api_client.go
- job_description.go
- job_err.go
- job_loop.go
- job_main.go
- job_middleware.go
- job_parallel.go
- job_request.go
- job_response.go
- job_serial.go
- job_server.go
- job_wrapper.go
- logger.go
- pattern.go
- postgres.go
- prometheus.go
- prometheus_metrics.go
- prometheus_middle_instruments.go
- rpc_server.go
- schedule_config.go
- stack.go
- sync_map.go
- worker.go
- worker_role.go
- workflow.go
- workflow_api.go
- workflow_db.go
- workflow_status.go