aggregator

package
v0.0.0-...-2e24269 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2025 License: Apache-2.0 Imports: 33 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewController = component.Declare(
	func(ControllerArgs) string { return "aggregator" },
	func(_ ControllerArgs, fs *flag.FlagSet) ControllerOptions {
		return ControllerOptions{
			cellId: fs.String(
				"cell-id",
				"default",
				"the cell this aggregator instance is deployed for",
			),
			pprLabelSelector: utilflag.LabelSelectorEverything(
				fs,
				"podprotector-label-selector",
				"skip handling PodProtectors that do not match this label selector; "+
					"this currently does not affect the list-watch and is only used for fault injection",
			),
			podLabelSelector: utilflag.LabelSelectorEverything(
				fs,
				"pod-label-selector",
				"only watch pods matching this label selector, used to reduce memory usage by excluding irrelevant pods",
			),
			podRelistPeriod: fs.Duration(
				"pod-relist-period",
				0,
				"pod informer relist frequency, enable if pod updates are infrequent",
			),
		}
	},
	func(args ControllerArgs, requests *component.DepRequests) ControllerDeps {
		return ControllerDeps{
			sourceProvider: component.DepPtr(requests, pprutil.RequestSourceProvider()),
			syncTimeAlgo:   component.DepPtr(requests, synctime.RequestPodInterpreter()),
			workerClient: component.DepPtr(
				requests,
				kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}),
			),
			pprInformer: component.DepPtr(requests, pprutil.NewIndexedInformer(pprutil.IndexedInformerArgs{
				Suffix:  "",
				Elector: optional.Some(constants.ElectorArgs),
			})),
			observer: o11y.Request[observer.Observer](requests),
			elector:  component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)),
			worker: component.DepPtr(requests, worker.New[pprutil.PodProtectorKey](
				"aggregator",
				args.Clock,
			)),
			defaultConfig: component.DepPtr(requests, defaultconfig.New(util.Empty{})),
		}
	},
	initController,
	component.Lifecycle[ControllerArgs, ControllerOptions, ControllerDeps, ControllerState]{
		Start: func(ctx context.Context, _ *ControllerArgs, _ *ControllerOptions, deps *ControllerDeps, state *ControllerState) error {
			go func() {
				ctx, err := deps.elector.Get().Await(ctx)
				if err != nil {
					return
				}

				go state.runPodInformer(ctx.Done())

				go deps.observer.Get().NextEventPoolCurrentSize(
					ctx, util.Empty{},
					state.caches.notifyOnInformerEvent.ItemCount,
				)
				go deps.observer.Get().NextEventPoolCurrentLatency(
					ctx, util.Empty{},
					state.caches.notifyOnInformerEvent.TimeSinceLastDrain,
				)
			}()

			return nil
		},
		Join:         nil,
		HealthChecks: nil,
	},
	func(*component.Data[ControllerArgs, ControllerOptions, ControllerDeps, ControllerState]) util.Empty {
		return util.Empty{}
	},
)

Functions

This section is empty.

Types

type Caches

type Caches struct {
	// contains filtered or unexported fields
}

type ControllerArgs

type ControllerArgs struct {
	Clock clock.WithTicker
}

type ControllerDeps

type ControllerDeps struct {
	// contains filtered or unexported fields
}

type ControllerOptions

type ControllerOptions struct {
	// contains filtered or unexported fields
}

type ControllerState

type ControllerState struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL