worker

package
v0.0.0-...-86cb477 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2024 License: BSD-3-Clause Imports: 48 Imported by: 0

Documentation

Overview

Package worker provides functionality for running a worker service. Its primary operation is to fetch modules from a proxy and write them to the database.

Index

Constants

This section is empty.

Variables

View Source
var (

	// FetchLatencyDistribution aggregates frontend fetch request
	// latency by status code. It does not count shed requests.
	// It is tagged with dcensus.KeyStatus and uses the default ochttp
	// latency distribution for its aggregation.
	FetchLatencyDistribution = &view.View{
		Name:        "go-discovery/worker/fetch-latency",
		Measure:     fetchLatency,
		Aggregation: ochttp.DefaultLatencyDistribution,
		Description: "Fetch latency by result status.",
		TagKeys:     []tag.Key{dcensus.KeyStatus},
	}
	// FetchResponseCount counts fetch responses by status.
	// It is built on the same fetchLatency measure as
	// FetchLatencyDistribution, but with a count aggregation, so the
	// intent appears to be that each recorded latency is one response.
	FetchResponseCount = &view.View{
		Name:        "go-discovery/worker/fetch-count",
		Measure:     fetchLatency,
		Aggregation: view.Count(),
		Description: "Fetch request count by result status",
		TagKeys:     []tag.Key{dcensus.KeyStatus},
	}
	// FetchPackageCount counts how many packages were successfully fetched.
	// It has no tag keys.
	// NOTE(review): view.Count() presumably counts the number of times
	// fetchedPackages is recorded rather than summing the recorded values.
	// If a single recording can represent several packages, this would
	// undercount — confirm against where fetchedPackages is recorded.
	FetchPackageCount = &view.View{
		Name:        "go-discovery/worker/fetch-package-count",
		Measure:     fetchedPackages,
		Aggregation: view.Count(),
		Description: "Count of packages successfully fetched",
	}

	// SheddedFetchCount counts the number of fetches that were shed.
	// It has no tag keys.
	SheddedFetchCount = &view.View{
		Name:        "go-discovery/worker/fetch-shedded",
		Measure:     fetchesShedded,
		Aggregation: view.Count(),
		Description: "Count of shedded fetches",
	}
)
View Source
var (

	// EnqueueResponseCount counts worker enqueue responses by response type.
	// The response type is carried in the keyEnqueueStatus tag.
	EnqueueResponseCount = &view.View{
		Name:        "go-discovery/worker-enqueue/count",
		Measure:     enqueueStatus,
		Aggregation: view.Count(),
		Description: "Worker enqueue request count",
		TagKeys:     []tag.Key{keyEnqueueStatus},
	}

	// ProcessingLag exposes the last recorded value of the processingLag
	// measure, described as the worker processing lag.
	// NOTE(review): the unit of the lag is not visible here — confirm
	// against the processingLag measure definition.
	ProcessingLag = &view.View{
		Name:        "go-discovery/worker_processing_lag",
		Measure:     processingLag,
		Aggregation: view.LastValue(),
		Description: "worker processing lag",
	}

	// UnprocessedModules exposes the last recorded value of the
	// unprocessedModules measure: the number of unprocessed modules.
	UnprocessedModules = &view.View{
		Name:        "go-discovery/unprocessed_modules/count",
		Measure:     unprocessedModules,
		Aggregation: view.LastValue(),
		Description: "number of unprocessed modules",
	}

	// UnprocessedNewModules exposes the last recorded value of the
	// unprocessedNewModules measure: the number of unprocessed new modules.
	UnprocessedNewModules = &view.View{
		Name:        "go-discovery/unprocessed_new_modules/count",
		Measure:     unprocessedNewModules,
		Aggregation: view.LastValue(),
		Description: "number of unprocessed new modules",
	}

	// DBProcesses exposes the last recorded value of the dbProcesses
	// measure: the number of active DB worker processes.
	DBProcesses = &view.View{
		Name:        "go-discovery/db_processes/count",
		Measure:     dbProcesses,
		Aggregation: view.LastValue(),
		Description: "number of active DB worker processes",
	}

	// DBWaitingProcesses exposes the last recorded value of the
	// dbWaitingProcesses measure: the number of waiting DB worker processes.
	DBWaitingProcesses = &view.View{
		Name:        "go-discovery/db_waiting_processes/count",
		Measure:     dbWaitingProcesses,
		Aggregation: view.LastValue(),
		Description: "number of waiting DB worker processes",
	}
)

Functions

func PopulateExcluded

func PopulateExcluded(ctx context.Context, cfg *config.Config, db *postgres.DB) error

PopulateExcluded adds each element of excludedPrefixes to the excluded_prefixes table if it isn't already present.

Types

type FetchInfo

// FetchInfo describes a single fetch that is in progress or has completed.
// Field roles below are inferred from names and types; confirm against
// the code that populates them.
type FetchInfo struct {
	// ModulePath and Version presumably identify the module version
	// being fetched.
	ModulePath string
	Version    string
	// ZipSize is presumably the size of the module zip — unit
	// (likely bytes) to be confirmed.
	ZipSize    uint64
	// Start and Finish bracket the fetch. NOTE(review): Finish is
	// presumably the zero time while the fetch is still in progress — confirm.
	Start      time.Time
	Finish     time.Time
	// Status is presumably the HTTP-style status code of the fetch result — confirm.
	Status     int
	// Error is presumably the error of a failed fetch, nil otherwise — confirm.
	Error      error
}

FetchInfo describes a fetch that is in progress or has completed. It is used to display information on the worker home page.

func FetchInfos

func FetchInfos() []*FetchInfo

FetchInfos returns information about all fetches in progress, sorted by start time.

type Fetcher

// Fetcher holds state for fetching modules. Its exported fields are the
// clients and stores it works with; further state is unexported.
type Fetcher struct {
	// ProxyClient is presumably the client for the module proxy — confirm.
	ProxyClient  *proxy.Client
	// SourceClient is presumably used to resolve source-code
	// locations — confirm.
	SourceClient *source.Client
	// DB is the Postgres database handle.
	DB           *postgres.DB
	// Cache is an optional-looking cache handle. NOTE(review): whether
	// nil is permitted is not visible here — confirm.
	Cache        *cache.Cache

	// Source is a free-form string. NOTE(review): its meaning (likely a
	// label for where the fetch request originated) is not visible
	// here — confirm.
	Source string
	// contains filtered or unexported fields
}

A Fetcher holds state for fetching modules.

func (*Fetcher) FetchAndUpdateLatest

func (f *Fetcher) FetchAndUpdateLatest(ctx context.Context, modulePath string) (_ *internal.LatestModuleVersions, err error)

FetchAndUpdateLatest fetches information about the latest versions from the proxy, and updates the database if the version has changed. It returns the most recent good information, which may be what it just fetched or may be what is already in the DB. It does not update the latest good version; that happens inside InsertModule, because it must be protected by the module-path advisory lock.

func (*Fetcher) FetchAndUpdateState

func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (status int, resolvedVersion string, err error)

FetchAndUpdateState fetches and processes a module version, and then updates the module_version_states table according to the result. It returns an HTTP status code representing the result of the fetch operation, and a non-nil error if this status code is not 200.

type LoadShedStats

// LoadShedStats holds statistics about load shedding.
// Field meanings below are inferred from names; confirm against the
// load shedder that fills them in.
type LoadShedStats struct {
	// SizeInFlight is presumably the total size of work currently in
	// flight, and MaxSizeInFlight the limit above which requests are
	// shed — units (likely bytes) to be confirmed.
	SizeInFlight     uint64
	MaxSizeInFlight  uint64
	// RequestsInFlight is presumably the number of requests currently
	// being processed.
	RequestsInFlight int
	// RequestsShed and RequestsTotal are presumably cumulative counts
	// of rejected requests and of all requests seen — confirm.
	RequestsShed     int
	RequestsTotal    int
}

LoadShedStats holds statistics about load shedding.

type Server

// Server can be installed to serve the Go discovery worker.
// It exposes no exported fields.
type Server struct {
	// contains filtered or unexported fields
}

Server can be installed to serve the Go discovery worker.

func NewServer

func NewServer(cfg *config.Config, scfg ServerConfig) (_ *Server, err error)

NewServer creates a new Server with the given dependencies.

func (*Server) Install

func (s *Server) Install(handle func(string, http.Handler))

Install registers server routes using the given handler registration func.

func (*Server) ZipLoadShedStats

func (s *Server) ZipLoadShedStats() LoadShedStats

ZipLoadShedStats returns a snapshot of the current LoadShedStats for zip files.

type ServerConfig

// ServerConfig contains everything needed by a Server.
// Field roles below are inferred from names and types; confirm against
// the constructor that consumes this struct.
type ServerConfig struct {
	// DB is the Postgres database handle.
	DB                   *postgres.DB
	// IndexClient, ProxyClient and SourceClient are presumably the
	// clients for the module index, the module proxy and source-code
	// hosting respectively — confirm.
	IndexClient          *index.Client
	ProxyClient          *proxy.Client
	SourceClient         *source.Client
	// RedisCacheClient and RedisBetaCacheClient are two Redis clients.
	// NOTE(review): which cache each one targets, and whether nil is
	// allowed, is not visible here — confirm.
	RedisCacheClient     *redis.Client
	RedisBetaCacheClient *redis.Client
	// Queue is presumably the task queue used to schedule fetches — confirm.
	Queue                queue.Queue
	// Reporter is presumably the error reporter — confirm.
	Reporter             derrors.Reporter
	// StaticPath is a trusted source, presumably the location of
	// static assets and templates — confirm.
	StaticPath           template.TrustedSource
	// GetExperiments returns a list of experiments. NOTE(review): when
	// and how often it is called is not visible here — confirm.
	GetExperiments       func() []*internal.Experiment
}

ServerConfig contains everything needed by a Server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL