Documentation ¶
Index ¶
- Constants
- Variables
- func InitTracing(serviceName string, tracingAgentHostPort string, opt ...Option) (tracer opentracing.Tracer, reporter jaeger.Reporter, closer io.Closer, ...)
- func RunE(fn func() error, opt ...Option) (err error)
- func StartSpan(operationName string) opentracing.Span
- func StartSpanWithParent(parent opentracing.SpanContext, operationName string) opentracing.Span
- func WrapChannel(chanToWrap interface{}) (<-chan interface{}, error)
- type Adapter
- type Args
- type CommonWorker
- func (w *CommonWorker) GetContext() context.Context
- func (w *CommonWorker) Perform(job *Job, opt ...Option) error
- func (w *CommonWorker) PerformAt(job *Job, t time.Time) error
- func (w *CommonWorker) PerformEvery(job *Job, interval time.Duration, opt ...Option) error
- func (w *CommonWorker) PerformIn(job *Job, d time.Duration) error
- func (w *CommonWorker) PerformReceive(job *Job, readChan interface{}, opt ...Option) error
- func (w *CommonWorker) Register(name string, h Handler) error
- func (w *CommonWorker) SetContext(ctx context.Context)
- func (w *CommonWorker) Start(ctx context.Context) error
- func (w *CommonWorker) Stop() error
- type ConcurrentJob
- type Context
- type Handler
- type HealthCheck
- type Job
- type Logger
- type Metric
- type Option
- func WithBanner(useBanner bool) Option
- func WithChannel(ch interface{}) Option
- func WithEngine(e *gin.Engine) Option
- func WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int) Option
- func WithErrorPercentageByRequestCount(percentageOfErrors float64, minNumOfRequests, lastNumOfRequests int) Option
- func WithHealthHandler(h gin.HandlerFunc) Option
- func WithHealthPath(path string) Option
- func WithHealthTicker(ticker *time.Ticker) Option
- func WithIgnore(handlers ...string) Option
- func WithJob(j *Job) Option
- func WithLogLevel(level logrus.Level) Option
- func WithMetricsPath(path string) Option
- func WithNamespace(ns string) Option
- func WithReducedLoggingFunc(a ReducedLoggingFunc) Option
- func WithSampleProbability(sampleProbability float64) Option
- func WithSilentNoResponse(silent bool) Option
- func WithSilentSuccess(silent bool) Option
- func WithSubSystem(sub string) Option
- func WithSync(sync bool) Option
- type Options
- type PerformWith
- type Prometheus
- type ReducedLoggingFunc
- type Session
- type Status
- type Worker
- type WorkerFactory
Constants ¶
const ( // RequestSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the request approximate size in bytes // WithPrometheus optionally will report this info when RequestSize is found in the Job.Ctxt RequestSize = "keyRequestSize" // ResponseSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the response approximate size in bytes // WithPrometheus optionally will report this info when RequestSize is found in the Job.Ctxt ResponseSize = "keyResponseSize" )
const ( // StatusUnknown was a job with an unknown status StatusUnknown Status = -1 // StatusSuccess was a successful job StatusSuccess = 200 // StatusBadRequest was a job with a bad request StatusBadRequest = 400 // StatusForbidden was a forbidden job StatusForbidden = 403 StatusUnauthorized = 401 // StatusTimeout was a job that timed out StatusTimeout = 408 // StatusNoResponse was a job that intentionally created no response (basically the conditions were met for a noop by the Job) StatusNoResponse = 444 // StatusInternalError was a job with an internal error StatusInternalError = 500 StatusUnavailable = 503 )
const AggregateLogger = "aggregateLogger"
AggregateLogger defines the const string for getting the logger from a Job context
const DefaultHealthTickerDuration = 1 * time.Minute
DefaultHealthTickerDuration is the time duration between the recalculation of the status returned by HealthCheck.GetStatus()
Variables ¶
var ContextTraceIDField string
ContextTraceIDField - used to find the trace id in the context - optional
Functions ¶
func InitTracing ¶
func InitTracing(serviceName string, tracingAgentHostPort string, opt ...Option) ( tracer opentracing.Tracer, reporter jaeger.Reporter, closer io.Closer, err error)
InitTracing will init opentracing with options WithSampleProbability defaults: constant sampling
func RunE ¶
Run the function safely knowing that if it panics, the panic will be caught and returned as an error. it produces a stack trace and if WithJob(job), the job's status is set to StatusInternalError.
func StartSpan ¶
func StartSpan(operationName string) opentracing.Span
StartSpan will start a new span with no parent span.
func StartSpanWithParent ¶
func StartSpanWithParent(parent opentracing.SpanContext, operationName string) opentracing.Span
StartSpanWithParent will start a new span with a parent span. example:
span:= StartSpanWithParent(c.Get("tracing-context"),
func WrapChannel ¶
func WrapChannel(chanToWrap interface{}) (<-chan interface{}, error)
WrapChanel takes a concrete receiving chan in as an interface{}, and wraps it with an interface{} chan so you can treat all receiving channels the same way
Types ¶
type Adapter ¶
Adapter defines the adaptor middleware type
func WithAggregateLogger ¶
func WithAggregateLogger( useBanner bool, timeFormat string, utc bool, logrusFieldNameForTraceID string, contextTraceIDField []byte, opt ...Option) Adapter
WithAggregateLogger is a middleware adapter for aggregated logging (see go-gin-logrus)
func WithHealthCheck ¶
func WithHealthCheck(health *HealthCheck, opt ...Option) Adapter
WithHealthCheck is an adpater middleware for healthcheck. it also adds a health http GET endpoint. It supports the Option. WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int) that allows you to override the default of: 1.0 (100%) errors in the last 5 min.
func WithOpenTracing ¶
WithOpenTracing is an adpater middleware that adds opentracing
func WithPrometheus ¶
func WithPrometheus(p *Prometheus) Adapter
Instrument is a gin middleware that can be used to generate metrics for a single handler
type CommonWorker ¶
type CommonWorker struct { // Logger for the worker Logger Logger // contains filtered or unexported fields }
CommonWorker defines the typical common worker
func NewCommonWorker ¶
func NewCommonWorker(l Logger) *CommonWorker
NewCommonWorker creates a new CommonWorker
func NewCommonWorkerWithContext ¶
func NewCommonWorkerWithContext(ctx context.Context, l Logger) *CommonWorker
NewCommonWorkerWithContext creates a new CommonWorker
func (*CommonWorker) GetContext ¶
func (w *CommonWorker) GetContext() context.Context
GetContext from the worker
func (*CommonWorker) Perform ¶
func (w *CommonWorker) Perform(job *Job, opt ...Option) error
Perform executes the job. If WithSync(true) Option then it's a blocking call, the default is false (so async)
func (*CommonWorker) PerformAt ¶
func (w *CommonWorker) PerformAt(job *Job, t time.Time) error
PerformAt performs a job at a particular time using a goroutine.
func (*CommonWorker) PerformEvery ¶
PerformEvery executes the job on the interval. if WithSync(true) Option, then the operation blocks until it's done which means only one instance can be executed at a time. the default is WithSync(false), so there's not blocking and you could get multiple instances running at a time if the latency is longer than the interval.
func (*CommonWorker) PerformIn ¶
func (w *CommonWorker) PerformIn(job *Job, d time.Duration) error
PerformIn performs a job after the "in" time has expired.
func (*CommonWorker) PerformReceive ¶
func (w *CommonWorker) PerformReceive(job *Job, readChan interface{}, opt ...Option) error
PerformReceive will loop on receiving data from the readChan (passed as a simple interface{}). This uses the Job ctx for timeouts and cancellation (just like the rest of the framework)
func (*CommonWorker) Register ¶
func (w *CommonWorker) Register(name string, h Handler) error
Register Handler with the worker
func (*CommonWorker) SetContext ¶
func (w *CommonWorker) SetContext(ctx context.Context)
SetContext for the worker
func (*CommonWorker) Start ¶
func (w *CommonWorker) Start(ctx context.Context) error
Start the worker
func (*CommonWorker) Stop ¶
func (w *CommonWorker) Stop() error
Stop the worker and you must call Stop() to clean up the CommonWorker internal Ctx (or it will leak memory)
type ConcurrentJob ¶
type ConcurrentJob struct { // PerformWith if the job will be performed with PerformWithEveryWithSync as a reoccuring job // or just once as PerformWithWithSync PerformWith PerformWith // PerformEvery defines the duration between executions of PerformWithEveryWithSync jobs PerformEvery time.Duration // MaxWorkers for the concurrent Job MaxWorkers int64 // Job to run concurrently Job Job PerformReceiveChan interface{} // contains filtered or unexported fields }
ConcurrentJob represents a job to be run concurrently
func NewConcurrentJob ¶
func NewConcurrentJob( job Job, workerFactory WorkerFactory, performWith PerformWith, performEvery time.Duration, maxWorker int64, startInterval time.Duration, logger Logger, opt ...Option, ) (ConcurrentJob, error)
NewConcurrentJob makes a new job
func (*ConcurrentJob) Register ¶
func (j *ConcurrentJob) Register(name string, h Handler) error
Register a handler for the Job's workers
func (*ConcurrentJob) RunningWorkers ¶
func (j *ConcurrentJob) RunningWorkers() int64
func (*ConcurrentJob) Stop ¶
func (j *ConcurrentJob) Stop()
Stop all the work and you must call Stop() to clean up the ConcurrentJob Ctx (or it will leak memory)
type Context ¶
type Context struct { // context.Context allows it to be a compatible context context.Context // contains filtered or unexported fields }
Context for the Job and is reset for every execution
type Handler ¶
Handler is executed a a Work for a given Job. It also defines the interface for handlers that can be used by middleware adapters
type HealthCheck ¶
type HealthCheck struct { // HealthPath is the GET url path for the endpoint HealthPath string // Engine is the gin.Engine that should serve the endpoint Engine *gin.Engine // Handler is the gin Hanlder to use for the endpoint Handler gin.HandlerFunc // contains filtered or unexported fields }
HealthCheck provides a healthcheck endpoint for the work
func NewHealthCheck ¶
func NewHealthCheck(opt ...Option) *HealthCheck
NewHealthCheck creates a new HealthCheck with the options provided. Options: WithEngine(*gin.Engine), WithHealthPath(string), WithHealthHander(gin.HandlerFunc), WithMetricTicker(time.Ticker)
func (*HealthCheck) Close ¶
func (h *HealthCheck) Close()
Close cleans up the all the HealthCheck resources
func (*HealthCheck) DefaultHealthHandler ¶
func (h *HealthCheck) DefaultHealthHandler() gin.HandlerFunc
func (*HealthCheck) GetStatus ¶
func (h *HealthCheck) GetStatus() int
GetStatus returns the current health status
func (*HealthCheck) SetStatus ¶
func (h *HealthCheck) SetStatus(s int)
SetStatus sets the current health status
func (*HealthCheck) WithEngine ¶
func (h *HealthCheck) WithEngine(e *gin.Engine)
WithEngine lets you set the *gin.Engine if it's created after you've created the *HealthCheck
type Job ¶
type Job struct { // Queue the job should be placed into Queue string // ctx related to the execution of a job - Perform(job) gets a new ctx everytime Ctx *Context // Args that will be passed to the Handler as the 2nd parameter when run Args Args // Handler that will be run by the worker Handler string // Timeout for every execution of job Timeout time.Duration }
Job to be processed by a Worker
type Logger ¶
type Logger interface { Debugf(string, ...interface{}) Infof(string, ...interface{}) Errorf(string, ...interface{}) Debug(...interface{}) Info(...interface{}) Error(...interface{}) }
Logger is used by worker to write logs
type Metric ¶
Metric is a single meter (a counter for now, but in the future: gauge or histogram, optionally - with history)
func NewMetricCounter ¶
NewCounter returns a counter metric that increments the value with each incoming number.
type Option ¶
type Option func(Options)
Option - how Options are passed as arguments
func WithBanner ¶
WithBanner specifies the table name to use for an outbox
func WithEngine ¶
WithEngine is an option allowing to set the gin engine when intializing with New. Example : r := gin.Default() p := work.NewPrometheus(WithEngine(r))
func WithErrorPercentage ¶
WithErrorPercentage allows you to override the default of 1.0 (100%) with the % you want for error rate.
func WithHealthHandler ¶
func WithHealthHandler(h gin.HandlerFunc) Option
WithHealthHandler override the default health endpoint handler
func WithHealthPath ¶
WithHealthPath override the default path for the health endpoint
func WithHealthTicker ¶
func WithIgnore ¶
WithIgnore is used to disable instrumentation on some routes
func WithLogLevel ¶
WithLogLevel will set the logrus log level for the job handler
func WithMetricsPath ¶
WithMetricsPath is an option allowing to set the metrics path when intializing with New. Example : work.New(work.WithMetricsPath("/mymetrics"))
func WithNamespace ¶
WithNamespace is an option allowing to set the namespace when intitializing with New. Example : work.New(work.WithNamespace("my_namespace"))
func WithReducedLoggingFunc ¶
func WithReducedLoggingFunc(a ReducedLoggingFunc) Option
WithReducedLoggingFunc specifies the function used to set custom logic around when to print logs
func WithSampleProbability ¶
WithSampleProbability - optional sample probability
func WithSilentNoResponse ¶
WithSilentNoResponse specifies that StatusNoResponse requests should be silent (no logging)
func WithSilentSuccess ¶
WithSilentSuccess specifies that StatusSuccess requests should be silent (no logging)
func WithSubSystem ¶
WithSubsystem is an option allowing to set the subsystem when intitializing with New. Example : work.New(work.WithSubsystem("my_system"))
type Options ¶
type Options map[string]interface{}
Options = how options are represented
type PerformWith ¶
type PerformWith int
const ( PerformWithUnknown PerformWith = iota PerformWithSync PerformWithAsync PerformEveryWithSync PerformEveryWithAsync PerformReceiveWithSync PerformReceiveWithAsync )
func (PerformWith) String ¶
func (p PerformWith) String() string
type Prometheus ¶
type Prometheus struct { MetricsPath string Namespace string Subsystem string Ignored isPresentMap Engine *gin.Engine // contains filtered or unexported fields }
Prometheus contains the metrics gathered by the instance and its path
func NewPrometheus ¶
func NewPrometheus(opt ...Option) *Prometheus
New will initialize a new Prometheus instance with the given options. If no options are passed, sane defaults are used. If a router is passed using the Engine() option, this instance will automatically bind to it.
func (*Prometheus) WithEngine ¶
func (p *Prometheus) WithEngine(e *gin.Engine)
WithEngine is a method that should be used if the engine is set after middleware initialization
type ReducedLoggingFunc ¶
ReducedLoggingFunc defines a function type used for custom logic on when to print logs
var DefaultReducedLoggingFunc ReducedLoggingFunc = func(s Status, l int) bool { return false }
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session that is used to pass session info to a Job this is a good spot to put things like *redis.Pool or *sqlx.DB for outbox connection pools
type Worker ¶
type Worker interface { // Start the worker Start(context.Context) error // Stop the worker Stop() error // PerformEvery a job every interval (loop) // if WithSync(true) Option, then the operation blocks until it's done which means only one instance can be executed at a time // the default is WithSync(false), so there's not blocking and you could get multiple instances running at a time if the latency is longer than the interval PerformEvery(*Job, time.Duration, ...Option) error // Perform a job as soon as possibly, If WithSync(true) Option then it's a blocking call, the default is false (so async) Perform(*Job, ...Option) error // PerformAt performs a job at a particular time and always async PerformAt(*Job, time.Time) error // PerformIn performs a job after waiting for a specified amount of time and always async PerformIn(*Job, time.Duration) error // PeformReceive peforms a job for every value received from channel PerformReceive(*Job, interface{}, ...Option) error // Register a Handler Register(string, Handler) error // GetContext returns the worker context GetContext() context.Context // SetContext sets the worker context SetContext(context.Context) }