Documentation ¶
Overview ¶
Package worker provides functionality for running a worker service. Its primary operation is to fetch modules from a proxy and write them to the database.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // FetchLatencyDistribution aggregates frontend fetch request // latency by status code. It does not count shedded requests. FetchLatencyDistribution = &view.View{ Name: "go-discovery/worker/fetch-latency", Measure: fetchLatency, Aggregation: ochttp.DefaultLatencyDistribution, Description: "Fetch latency by result status.", TagKeys: []tag.Key{dcensus.KeyStatus}, } // FetchResponseCount counts fetch responses by status. FetchResponseCount = &view.View{ Name: "go-discovery/worker/fetch-count", Measure: fetchLatency, Aggregation: view.Count(), Description: "Fetch request count by result status", TagKeys: []tag.Key{dcensus.KeyStatus}, } // FetchPackageCount counts how many packages were successfully fetched. FetchPackageCount = &view.View{ Name: "go-discovery/worker/fetch-package-count", Measure: fetchedPackages, Aggregation: view.Count(), Description: "Count of packages successfully fetched", } // SheddedFetchCount counts the number of fetches that were shedded. SheddedFetchCount = &view.View{ Name: "go-discovery/worker/fetch-shedded", Measure: fetchesShedded, Aggregation: view.Count(), Description: "Count of shedded fetches", } )
var ( // EnqueueResponseCount counts worker enqueue responses by response type. EnqueueResponseCount = &view.View{ Name: "go-discovery/worker-enqueue/count", Measure: enqueueStatus, Aggregation: view.Count(), Description: "Worker enqueue request count", TagKeys: []tag.Key{keyEnqueueStatus}, } ProcessingLag = &view.View{ Name: "go-discovery/worker_processing_lag", Measure: processingLag, Aggregation: view.LastValue(), Description: "worker processing lag", } UnprocessedModules = &view.View{ Name: "go-discovery/unprocessed_modules/count", Measure: unprocessedModules, Aggregation: view.LastValue(), Description: "number of unprocessed modules", } UnprocessedNewModules = &view.View{ Name: "go-discovery/unprocessed_new_modules/count", Measure: unprocessedNewModules, Aggregation: view.LastValue(), Description: "number of unprocessed new modules", } DBProcesses = &view.View{ Name: "go-discovery/db_processes/count", Measure: dbProcesses, Aggregation: view.LastValue(), Description: "number of active DB worker processes", } DBWaitingProcesses = &view.View{ Name: "go-discovery/db_waiting_processes/count", Measure: dbWaitingProcesses, Aggregation: view.LastValue(), Description: "number of waiting DB worker processes", } )
Functions ¶
Types ¶
type FetchInfo ¶
type FetchInfo struct { ModulePath string Version string ZipSize uint64 Start time.Time Finish time.Time Status int Error error }
FetchInfo describes a fetch in progress, or completed. It is used to display information on the worker home page.
func FetchInfos ¶
func FetchInfos() []*FetchInfo
FetchInfos returns information about all fetches in progress, sorted by start time.
type Fetcher ¶
type Fetcher struct { ProxyClient *proxy.Client SourceClient *source.Client DB *postgres.DB Cache *cache.Cache Source string // contains filtered or unexported fields }
A Fetcher holds state for fetching modules.
func (*Fetcher) FetchAndUpdateLatest ¶
func (f *Fetcher) FetchAndUpdateLatest(ctx context.Context, modulePath string) (_ *internal.LatestModuleVersions, err error)
FetchAndUpdateLatest fetches information about the latest versions from the proxy, and updates the database if the version has changed. It returns the most recent good information, which may be what it just fetched or may be what is already in the DB. It does not update the latest good version; that happens inside InsertModule, because it must be protected by the module-path advisory lock.
func (*Fetcher) FetchAndUpdateState ¶
func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (status int, resolvedVersion string, err error)
FetchAndUpdateState fetches and processes a module version, and then updates the module_version_states table according to the result. It returns an HTTP status code representing the result of the fetch operation, and a non-nil error if this status code is not 200.
type LoadShedStats ¶
type LoadShedStats struct { SizeInFlight uint64 MaxSizeInFlight uint64 RequestsInFlight int RequestsShed int RequestsTotal int }
LoadShedStats holds statistics about load shedding.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server can be installed to serve the go discovery worker.
func NewServer ¶
func NewServer(cfg *config.Config, scfg ServerConfig) (_ *Server, err error)
NewServer creates a new Server with the given dependencies.
func (*Server) ZipLoadShedStats ¶
func (s *Server) ZipLoadShedStats() LoadShedStats
ZipLoadShedStats returns a snapshot of the current LoadShedStats for zip files.
type ServerConfig ¶
type ServerConfig struct { DB *postgres.DB IndexClient *index.Client ProxyClient *proxy.Client SourceClient *source.Client RedisCacheClient *redis.Client RedisBetaCacheClient *redis.Client Queue queue.Queue Reporter derrors.Reporter StaticPath template.TrustedSource GetExperiments func() []*internal.Experiment }
ServerConfig contains everything needed by a Server.