Documentation ¶
Overview ¶
Package active provides code for managing processing of an entire directory of task files.
Package active provides code for managing processing of an entire directory of task files.
Index ¶
- Variables
- func MustStorageClient(ctx context.Context) stiface.Client
- type Context
- type FileLister
- type GCSSource
- type GardenerAPI
- func (g *GardenerAPI) JobFileSource(ctx context.Context, job tracker.Job, ...) (*GCSSource, error)
- func (g *GardenerAPI) NextJob(ctx context.Context) (tracker.JobWithTarget, error)
- func (g *GardenerAPI) Poll(ctx context.Context, toRunnable func(o *storage.ObjectAttrs) Runnable, ...)
- func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job tracker.Job) (*errgroup.Group, error)
- func (g *GardenerAPI) Status(w http.ResponseWriter)
- type Runnable
- type RunnableSource
- type TokenSource
Constants ¶
This section is empty.
Variables ¶
var JobFailures = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "etl_job_failures", Help: "Job level failures.", }, []string{"prefix", "year", "type"}, )
JobFailures counts the all errors that result in test loss.
Provides metrics:
etl_job_failures{prefix, year, kind}
Example usage:
JobFailures.WithLabelValues("ndt/tcpinfo" "2019", "insert").Inc()
Functions ¶
Types ¶
type Context ¶
Context implements context.Context, but allows injection of an alternate Err().
type FileLister ¶
FileLister defines a function that returns a list of storage.ObjectAttrs.
func FileListerFunc ¶
FileListerFunc creates a function that returns a slice of *storage.ObjectAttrs. On certain GCS errors, it may return partial result and an error. TODO - consider moving this to GardenerAPI.
type GCSSource ¶
type GCSSource struct {
// contains filtered or unexported fields
}
GCSSource implements RunnableSource for a GCS bucket/prefix.
func NewGCSSource ¶
func NewGCSSource(ctx context.Context, label string, fl FileLister, toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error)
NewGCSSource creates a new source for active processing.
func (*GCSSource) CancelStreaming ¶
func (src *GCSSource) CancelStreaming()
CancelStreaming terminates the streaming goroutine context.
type GardenerAPI ¶
type GardenerAPI struct {
// contains filtered or unexported fields
}
GardenerAPI encapsulates the backend paths and clients to connect to gardener and GCS.
func NewGardenerAPI ¶
func NewGardenerAPI(trackerBase url.URL, gcs stiface.Client) *GardenerAPI
NewGardenerAPI creates a GardenerAPI.
func (*GardenerAPI) JobFileSource ¶
func (g *GardenerAPI) JobFileSource(ctx context.Context, job tracker.Job, toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error)
JobFileSource creates a gcsSource for the job.
func (*GardenerAPI) NextJob ¶
func (g *GardenerAPI) NextJob(ctx context.Context) (tracker.JobWithTarget, error)
NextJob requests a new job from Gardener service.
func (*GardenerAPI) Poll ¶
func (g *GardenerAPI) Poll(ctx context.Context, toRunnable func(o *storage.ObjectAttrs) Runnable, maxWorkers int, period time.Duration)
Poll requests work items from gardener, and processes them.
func (*GardenerAPI) RunAll ¶
func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job tracker.Job) (*errgroup.Group, error)
RunAll will execute functions provided by Next() until there are no more, or the context is canceled.
func (*GardenerAPI) Status ¶
func (g *GardenerAPI) Status(w http.ResponseWriter)
Status adds a small amount of status info to w.
type Runnable ¶
Runnable is just a function that does something and returns an error. A Runnable may return ErrShouldRetry if there was a non-persistent error. TODO - should this instead be and interface, with Run() and ShouldRetry()?
type RunnableSource ¶
type RunnableSource interface { // Next should return iterator.Done when there are no more Runnables. // It may block if there are no more runnables available right now, // (or if throttling is applied) Next(ctx context.Context) (Runnable, error) // Label returns a string for use in metrics and debug logs' Label() string }
RunnableSource provides a Next function that returns Runnables.
func Throttle ¶
func Throttle(src RunnableSource, tokens TokenSource) RunnableSource
Throttle applies a provided TokenSource to throttle a Source. This returns an interface, which is discouraged by Go advocates, but seems like the right thing to do here, as there is no reason to export the concrete type.
type TokenSource ¶
TokenSource specifies the interface for a source of tokens for throttling.
func NewWSTokenSource ¶
func NewWSTokenSource(n int) TokenSource
NewWSTokenSource returns a TokenSource based on semaphore.Weighted.