Documentation
¶
Index ¶
- Constants
- Variables
- func ParseMap(s string) map[string]string
- type ConsulAgent
- type ConsulCatalog
- type Daemon
- func (d *Daemon) ConsulNodeDoUntil(ctx context.Context, nodeName string, opts *consulApi.QueryOptions, ...) error
- func (d *Daemon) Deregister(ctx context.Context, in *katalogsync.DeregisterQuery) (*katalogsync.DeregisterResult, error)
- func (d *Daemon) Register(ctx context.Context, in *katalogsync.RegisterQuery) (*katalogsync.RegisterResult, error)
- func (d *Daemon) Run() error
- type DaemonConfig
- type Kubelet
- type KubeletClient
- type KubeletClientConfig
- type Pod
- func (p *Pod) ContainerExclusion() map[string]struct{}
- func (p *Pod) GetPort(n string) int
- func (p *Pod) GetServiceHealth(n string, defaultVal string) string
- func (p *Pod) GetServiceID(serviceName string) string
- func (p *Pod) GetServiceMeta(n string) map[string]string
- func (p *Pod) GetServiceNames() []string
- func (p *Pod) GetTags(n string) []string
- func (p *Pod) HandleReadinessGate() error
- func (p *Pod) HasChange(service *consulApi.AgentService) bool
- func (p *Pod) HasServiceName(n string) bool
- func (p *Pod) Ready() (bool, map[string]bool)
- func (p *Pod) UpdatePod(k8sPod corev1.Pod)
- func (p *Pod) WaitChanges() chan struct{}
- type SidecarState
- type SyncStatus
- type SyncStatuses
Constants ¶
const ReadinessGateType = "katalog-sync.wish.com/synced" // name of readiness gate
ReadinessGate
Variables ¶
var ( ConsulSyncSourceName = "external-sync-source" ConsulSyncSourceValue = "katalog-sync" ConsulK8sLinkName = "external-k8s-link" ConsulK8sNamespace = "external-k8s-namespace" ConsulK8sPod = "external-k8s-pod" )
var ( // Annotation names ConsulServiceNames = "katalog-sync.wish.com/service-names" // comma-separated list of service names ConsulServicePort = "katalog-sync.wish.com/service-port" // port to use for consul entry ConsulServicePortOverride = "katalog-sync.wish.com/service-port-" // port override to use for a specific service name ConsulServiceTags = "katalog-sync.wish.com/service-tags" // tags for the service ConsulServiceTagsOverride = "katalog-sync.wish.com/service-tags-" // tags override to use for a specific service name ConsulServiceMeta = "katalog-sync.wish.com/service-meta" // meta for the service ConsulServiceMetaOverride = "katalog-sync.wish.com/service-meta-" // meta override to use for a specific service name ConsulServiceHealth = "katalog-sync.wish.com/service-health" // health status for the service (passing/warning/critical) ConsulServiceHealthOverride = "katalog-sync.wish.com/service-health-" // health status override SidecarName = "katalog-sync.wish.com/sidecar" // Name of sidecar container, only to be set if it exists SyncInterval = "katalog-sync.wish.com/sync-interval" // How frequently we want to sync this service ConsulServiceCheckTTL = "katalog-sync.wish.com/service-check-ttl" // TTL for the service checks we put in consul ContainerExclusion = "katalog-sync.wish.com/container-exclude" // comma-separated list of containers to exclude from ready check )
Functions ¶
Types ¶
type ConsulAgent ¶
type ConsulAgent interface { UpdateTTL(checkID, output, status string) error Services() (map[string]*consulApi.AgentService, error) ServiceDeregister(serviceID string) error ServiceRegister(service *consulApi.AgentServiceRegistration) error }
ConsulAgent encapsulates the interface for interacting with the local agent and service API
type ConsulCatalog ¶ added in v0.0.2
type ConsulCatalog interface {
Services() (map[string]*consulApi.AgentService, error)
}
ConsulCatalog encapsulates the interface for interacting with the Catalog API
type Daemon ¶
type Daemon struct {
// contains filtered or unexported fields
}
Daemon is responsible for syncing state from k8s -> consul
func NewDaemon ¶
func NewDaemon(c DaemonConfig, k8sClient Kubelet, consulClient *consulApi.Client) *Daemon
NewDaemon is a helper function to return a new *Daemon
func (*Daemon) ConsulNodeDoUntil ¶ added in v0.0.2
func (d *Daemon) ConsulNodeDoUntil(ctx context.Context, nodeName string, opts *consulApi.QueryOptions, f consulNodeFunc) error
ConsulNodeDoUntil is a helper to wait until a change has propogated into the CatalogAPI
func (*Daemon) Deregister ¶
func (d *Daemon) Deregister(ctx context.Context, in *katalogsync.DeregisterQuery) (*katalogsync.DeregisterResult, error)
Deregister handles a sidecar request for deregistration. This will block until (2) the service has been removed from the agent services API (3) the entry has been removed from the catalog API (meaning it synced to the cluster)
func (*Daemon) Register ¶
func (d *Daemon) Register(ctx context.Context, in *katalogsync.RegisterQuery) (*katalogsync.RegisterResult, error)
Register handles a sidecar request for registration. This will block until (1) the pod excluding the sidecar container is ready (2) the service has been pushed to the agent services API (3) the entry shows up in the catalog API (meaning it synced to the cluster)
type DaemonConfig ¶
type DaemonConfig struct { MinSyncInterval time.Duration `long:"min-sync-interval" env:"MIN_SYNC_INTERVAL" description:"minimum duration allowed for sync" default:"500ms"` MaxSyncInterval time.Duration `long:"max-sync-interval" env:"MAX_SYNC_INTERVAL" description:"maximum duration allowed for sync" default:"5s"` DefaultSyncInterval time.Duration `long:"default-sync-interval" env:"DEFAULT_SYNC_INTERVAL" default:"1s"` DefaultCheckTTL time.Duration `long:"default-check-ttl" env:"DEFAULT_CHECK_TTL" default:"10s"` SyncTTLBuffer time.Duration `` /* 143-byte string literal not displayed */ }
DaemonConfig contains the configuration options for a katalog-sync-daemon
type KubeletClient ¶
type KubeletClient struct {
// contains filtered or unexported fields
}
KubeletClient is an HTTP client for kubelet that implements the Kubelet interface
func NewKubeletClient ¶
func NewKubeletClient(c KubeletClientConfig) (*KubeletClient, error)
NewKubeletClient returns a new KubeletClient based on the given config
func (*KubeletClient) GetPodList ¶
func (k *KubeletClient) GetPodList() (*k8sApi.PodList, error)
GetPodList returns the list of pods the kubelet is managing
type KubeletClientConfig ¶
type KubeletClientConfig struct { APIEndpoint string `long:"kubelet-api" env:"KUBELET_API" description:"kubelet API endpoint" default:"http://localhost:10255/pods"` InsecureSkipVerify bool `` /* 146-byte string literal not displayed */ }
KubeletClientConfig holds the config options for connecting to the kubelet API
type Pod ¶
type Pod struct { corev1.Pod *SidecarState // map servicename -> sync status SyncStatuses OutstandingReadinessGate bool // Do we have a ReadinessGate to set InitialSyncDone bool // Ready and in consul CheckTTL time.Duration SyncInterval time.Duration Ctx context.Context Cancel context.CancelFunc // contains filtered or unexported fields }
Pod is our representation of a pod in k8s
func NewPod ¶
func NewPod(pod corev1.Pod, dc *DaemonConfig) (*Pod, error)
NewPod returns a daemon pod based on a config and a k8s pod
func (*Pod) ContainerExclusion ¶ added in v0.0.9
ContainerExclusion returns the containers that should be excluded from a readiness check
func (*Pod) GetPort ¶
GetPort returns the port for a given service for this pod This first checks the service-specific port, and falls back to the service-level port
func (*Pod) GetServiceHealth ¶ added in v0.0.18
GetServiceHealth returns the service health specified in annotation, or defaultVal if not specified.
func (*Pod) GetServiceID ¶
GetServiceID returns an identifier that addresses this pod.
func (*Pod) GetServiceMeta ¶ added in v0.0.5
GetServiceMeta returns a map of metadata to be added to the ServiceMetadata
func (*Pod) GetServiceNames ¶
GetServiceNames returns the list of service names defined in the k8s annotations
func (*Pod) GetTags ¶
GetTags returns the tags for a given service for this pod This first checks the service-specific tags, and falls back to the service-level tags
func (*Pod) HandleReadinessGate ¶ added in v0.0.17
func (*Pod) HasChange ¶
func (p *Pod) HasChange(service *consulApi.AgentService) bool
HasChange will return whether a change has been made that needs a full resync if not then a simple TTL update will suffice
func (*Pod) HasServiceName ¶
HasServiceName returns whether a given name is one of the annotated service names for this pod
func (*Pod) WaitChanges ¶ added in v0.0.20
func (p *Pod) WaitChanges() chan struct{}
type SidecarState ¶
State from our sidecar service
type SyncStatus ¶
SyncStatus encapsulates the result of the last sync attempt
func (*SyncStatus) SetError ¶
func (s *SyncStatus) SetError(e error)
SetError sets the error and LastUpdated time for the status
type SyncStatuses ¶
type SyncStatuses map[string]*SyncStatus
SyncStatuses is a map of SyncStatus for each service defined in a pod (serviceName -> *SyncStatus)
func (SyncStatuses) GetError ¶
func (s SyncStatuses) GetError() error
GetError returns the first error found in the set of SyncStatuses
func (SyncStatuses) GetStatus ¶
func (s SyncStatuses) GetStatus(n string) *SyncStatus
GetStatus returns the SyncStatus for the given serviceName