aggregator

package
v0.0.0-...-c0ae054 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: Apache-2.0 Imports: 34 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewController = component.Declare(
	func(ControllerArgs) string { return "aggregator" },
	func(args ControllerArgs, fs *flag.FlagSet) ControllerOptions {
		return ControllerOptions{
			cellId: fs.String(
				"cell-id",
				"default",
				"the cell this aggregator instance is deployed for",
			),
			pprLabelSelector: utilflag.LabelSelectorEverything(
				fs,
				"podprotector-label-selector",
				"skip handling PodProtectors that do not match this label selector; "+
					"this currently does not affect the list-watch and is only used for fault injection",
			),
			podLabelSelector: utilflag.LabelSelectorEverything(
				fs,
				"pod-label-selector",
				"only watch pods matching this label selector, used to reduce memory usage by excluding irrelevant pods",
			),
			informerSyncTimeAlgo: utilflag.EnumFromMap(args.InformerSyncTimeAlgos).
				Default(args.DefaultInformerSyncTimeAlgo).
				Flag(
					fs,
					"informer-synctime-algorithm",
					"the algorithm to infer reflector update time from pod events",
				),
			podRelistPeriod: fs.Duration(
				"pod-relist-period",
				0,
				"pod informer relist frequency, enable if pod updates are infrequent",
			),
		}
	},
	func(args ControllerArgs, requests *component.DepRequests) ControllerDeps {
		return ControllerDeps{
			coreClient: component.DepPtr(
				requests,
				kube.NewClient(kube.ClientArgs{ClusterName: constants.CoreClusterName}),
			),
			workerClient: component.DepPtr(
				requests,
				kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}),
			),
			pprInformer: component.DepPtr(
				requests,
				pprutil.NewIndexedInformer(pprutil.IndexedInformerArgs{
					ClusterName:   constants.CoreClusterName,
					InformerPhase: "leader",
					Elector:       optional.Some(constants.ElectorArgs),
				}),
			),
			observer: o11y.Request[observer.Observer](requests),
			elector:  component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)),
			worker: component.DepPtr(requests, worker.New[types.NamespacedName](
				"aggregator",
				args.Clock,
			)),
			defaultConfig: component.DepPtr(requests, defaultconfig.New(util.Empty{})),
		}
	},
	initController,
	component.Lifecycle[ControllerArgs, ControllerOptions, ControllerDeps, ControllerState]{
		Start: func(ctx context.Context, _ *ControllerArgs, _ *ControllerOptions, deps *ControllerDeps, state *ControllerState) error {
			go func() {
				ctx, err := deps.elector.Get().Await(ctx)
				if err != nil {
					return
				}

				go state.runPodInformer(ctx.Done())

				go deps.observer.Get().NextEventPoolCurrentSize(
					ctx, util.Empty{},
					state.caches.notifyOnInformerEvent.ItemCount,
				)
				go deps.observer.Get().NextEventPoolCurrentLatency(
					ctx, util.Empty{},
					state.caches.notifyOnInformerEvent.TimeSinceLastDrain,
				)
			}()

			return nil
		},
		Join:         nil,
		HealthChecks: nil,
	},
	func(*component.Data[ControllerArgs, ControllerOptions, ControllerDeps, ControllerState]) util.Empty {
		return util.Empty{}
	},
)

Functions

func DefaultArg

func DefaultArg() component.Declared[util.Empty]

Types

type Caches

type Caches struct {
	// contains filtered or unexported fields
}

type ControllerArgs

type ControllerArgs struct {
	InformerSyncTimeAlgos       map[string]synctime.PodInterpreter
	DefaultInformerSyncTimeAlgo string
	Clock                       clock.WithTicker
}

type ControllerDeps

type ControllerDeps struct {
	// contains filtered or unexported fields
}

type ControllerOptions

type ControllerOptions struct {
	// contains filtered or unexported fields
}

type ControllerState

type ControllerState struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL