Documentation ¶
Index ¶
- Variables
- func CleanupLeftovers(ipt iptablesutil.Interface) (encounteredError bool)
- func GetLocalAddrSet() utilnet.IPSet
- func GetLocalAddrs() ([]net.IP, error)
- func ProxyTCP(in, out *net.TCPConn)
- func ShouldSkipService(service *localv1.Service) bool
- func ShuffleStrings(s []string) []string
- func ToCIDR(ip net.IP) string
- func TryConnectEndpoints(service iptables.ServicePortName, srcAddr net.Addr, protocol string, ...) (out net.Conn, err error)
- type Backend
- func (s *Backend) BindFlags(flags *pflag.FlagSet)
- func (s *Backend) DeleteEndpoint(namespace, serviceName, epKey string)
- func (s *Backend) DeleteService(namespace, name string)
- func (s *Backend) Reset()
- func (s *Backend) SetEndpoint(namespace, serviceName, epKey string, endpoint *localv1.Endpoint)
- func (s *Backend) SetService(svc *localv1.Service)
- func (s *Backend) Setup()
- func (s *Backend) Sink() localsink.Sink
- func (s *Backend) Sync()
- type BoundedFrequencyRunner
- type ClientCache
- type Interface
- type LoadBalancer
- type LoadBalancerRR
- func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort iptables.ServicePortName)
- func (lb *LoadBalancerRR) DeleteService(svcPort iptables.ServicePortName)
- func (lb *LoadBalancerRR) NewService(svcPort iptables.ServicePortName, affinityType *localv1.ClientIPAffinity, ...) error
- func (lb *LoadBalancerRR) NextEndpoint(svcPort iptables.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
- func (lb *LoadBalancerRR) OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)
- func (lb *LoadBalancerRR) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)
- func (lb *LoadBalancerRR) OnEndpointsSynced()
- func (lb *LoadBalancerRR) ServiceHasEndpoints(svcPort iptables.ServicePortName) bool
- type PortAllocator
- type ProxySocket
- type ProxySocketFunc
- type ServiceInfo
- type ServicePortName
- type UserspaceLinux
- func (proxier *UserspaceLinux) OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)
- func (proxier *UserspaceLinux) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)
- func (proxier *UserspaceLinux) OnEndpointsSynced()
- func (proxier *UserspaceLinux) OnEndpointsUpdate(oldEndpoints, endpoints *localv1.Endpoint)
- func (proxier *UserspaceLinux) OnServiceAdd(service *localv1.Service)
- func (proxier *UserspaceLinux) OnServiceDelete(service *localv1.Service)
- func (proxier *UserspaceLinux) OnServiceSynced()
- func (proxier *UserspaceLinux) OnServiceUpdate(oldService, service *localv1.Service)
- func (proxier *UserspaceLinux) Sync()
- func (proxier *UserspaceLinux) SyncLoop()
- type UserspaceServiceChangeTracker
Constants ¶
This section is empty.
Variables ¶
var ( ErrMissingServiceEntry = errors.New("missing service entry") ErrMissingEndpoints = errors.New("missing endpoints") )
var EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
How long we wait for a connection to a backend in seconds
var ( // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on // the loopback address. May be checked for by callers of NewProxier to know whether // the caller provided invalid input. ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost") )
Functions ¶
func CleanupLeftovers ¶
func CleanupLeftovers(ipt iptablesutil.Interface) (encounteredError bool)
CleanupLeftovers removes all iptables rules and chains created by the Proxier It returns true if an error was encountered. Errors are logged.
func GetLocalAddrSet ¶
GetLocalAddrSet return a local IPSet. If failed to get local addr, will assume no local ips.
func GetLocalAddrs ¶
GetLocalAddrs returns a list of all network addresses on the local system
func ShouldSkipService ¶
ShouldSkipService checks if a given service should skip proxying
func ShuffleStrings ¶
ShuffleStrings copies strings from the specified slice into a copy in random order. It returns a new slice.
func ToCIDR ¶
ToCIDR returns a host address of the form <ip-address>/32 for IPv4 and <ip-address>/128 for IPv6
func TryConnectEndpoints ¶
func TryConnectEndpoints(service iptables.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error)
TryConnectEndpoints attempts to connect to the next available endpoint for the given service, cycling through until it is able to successfully connect, or it has tried with all timeouts in EndpointDialTimeouts.
Types ¶
type Backend ¶
func (*Backend) DeleteEndpoint ¶
func (*Backend) DeleteService ¶
func (*Backend) SetEndpoint ¶
name of the endpoint is the same as the service name
func (*Backend) SetService ¶
type BoundedFrequencyRunner ¶
type BoundedFrequencyRunner struct {
// contains filtered or unexported fields
}
BoundedFrequencyRunner manages runs of a user-provided function. See NewBoundedFrequencyRunner for examples.
func (*BoundedFrequencyRunner) Loop ¶
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{})
Loop handles the periodic timer and run requests. This is expected to be called as a goroutine.
func (*BoundedFrequencyRunner) RetryAfter ¶
func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration)
RetryAfter ensures that the function will run again after no later than interval. This can be called from inside a run of the BoundedFrequencyRunner's function, or asynchronously.
func (*BoundedFrequencyRunner) Run ¶
func (bfr *BoundedFrequencyRunner) Run()
Run the function as soon as possible. If this is called while Loop is not running, the call may be deferred indefinitely. If there is already a queued request to call the underlying function, it may be dropped - it is just guaranteed that we will try calling the underlying function as soon as possible starting from now.
type ClientCache ¶
Holds all the known UDP clients that have not timed out.
type Interface ¶
type Interface interface { // EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true. EnsureChain(table iptables.Table, chain iptables.Chain) (bool, error) // FlushChain clears the specified chain. If the chain did not exist, return error. FlushChain(table iptables.Table, chain iptables.Chain) error // DeleteChain deletes the specified chain. If the chain did not exist, return error. DeleteChain(table iptables.Table, chain iptables.Chain) error // ChainExists tests whether the specified chain exists, returning an error if it // does not, or if it is unable to check. ChainExists(table iptables.Table, chain iptables.Chain) (bool, error) // EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true. EnsureRule(position iptables.RulePosition, table iptables.Table, chain iptables.Chain, args ...string) (bool, error) // DeleteRule checks if the specified rule is present and, if so, deletes it. DeleteRule(table iptables.Table, chain iptables.Chain, args ...string) error // IsIPv6 returns true if this is managing ipv6 tables. IsIPv6() bool // Protocol returns the IP family this instance is managing, Protocol() iptables.Protocol // SaveInto calls `iptables-save` for table and stores result in a given buffer. SaveInto(table iptables.Table, buffer *bytes.Buffer) error // Restore runs `iptables-restore` passing data through []byte. // table is the Table to restore // data should be formatted like the output of SaveInto() // flush sets the presence of the "--noflush" flag. see: FlushFlag // counters sets the "--counters" flag. see: RestoreCountersFlag Restore(table iptables.Table, data []byte, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error // RestoreAll is the same as Restore except that no table is specified. RestoreAll(data []byte, flush iptables.FlushFlag, counters iptables.RestoreCountersFlag) error // Monitor detects when the given iptables tables have been flushed by an external // tool (e.g. a firewall reload) by creating canary chains and polling to see if // they have been deleted. (Specifically, it polls tables[0] every interval until // the canary has been deleted from there, then waits a short additional time for // the canaries to be deleted from the remaining tables as well. You can optimize // the polling by listing a relatively empty table in tables[0]). When a flush is // detected, this calls the reloadFunc so the caller can reload their own iptables // rules. If it is unable to create the canary chains (either initially or after // a reload) it will log an error and stop monitoring. // (This function should be called from a goroutine.) Monitor(canary iptables.Chain, tables []iptables.Table, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) // HasRandomFully reveals whether `-j MASQUERADE` takes the // `--random-fully` option. This is helpful to work around a // Linux kernel bug that sometimes causes multiple flows to get // mapped to the same IP:PORT and consequently some suffer packet // drops. HasRandomFully() bool }
Interface is an injectable interface for running iptables commands. Implementations must be goroutine-safe.
type LoadBalancer ¶
type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given // service-port and source address. NextEndpoint(service iptables.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) NewService(service iptables.ServicePortName, affinityClientIP *localv1.ClientIPAffinity, stickyMaxAgeSeconds int) error DeleteService(service iptables.ServicePortName) CleanupStaleStickySessions(service iptables.ServicePortName) ServiceHasEndpoints(service iptables.ServicePortName) bool // For userspace because we dont have an EndpointChangeTracker which can auto lookup services behind the scenes, // we need to send this explicitly. OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service) OnEndpointsSynced() }
LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancerRR ¶
type LoadBalancerRR struct {
// contains filtered or unexported fields
}
LoadBalancerRR is a round-robin load balancer.
func NewLoadBalancerRR ¶
func NewLoadBalancerRR() *LoadBalancerRR
NewLoadBalancerRR returns a new LoadBalancerRR.
func (*LoadBalancerRR) CleanupStaleStickySessions ¶
func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort iptables.ServicePortName)
func (*LoadBalancerRR) DeleteService ¶
func (lb *LoadBalancerRR) DeleteService(svcPort iptables.ServicePortName)
func (*LoadBalancerRR) NewService ¶
func (lb *LoadBalancerRR) NewService(svcPort iptables.ServicePortName, affinityType *localv1.ClientIPAffinity, ttlSeconds int) error
func (*LoadBalancerRR) NextEndpoint ¶
func (lb *LoadBalancerRR) NextEndpoint(svcPort iptables.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NextEndpoint returns a service endpoint. The service endpoint is chosen using the round-robin algorithm.
func (*LoadBalancerRR) OnEndpointsAdd ¶
func (lb *LoadBalancerRR) OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)
func (*LoadBalancerRR) OnEndpointsDelete ¶
func (lb *LoadBalancerRR) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)
func (*LoadBalancerRR) OnEndpointsSynced ¶
func (lb *LoadBalancerRR) OnEndpointsSynced()
func (*LoadBalancerRR) ServiceHasEndpoints ¶
func (lb *LoadBalancerRR) ServiceHasEndpoints(svcPort iptables.ServicePortName) bool
ServiceHasEndpoints checks whether a service entry has endpoints.
type PortAllocator ¶
type ProxySocket ¶
type ProxySocket interface { // Addr gets the net.Addr for a ProxySocket. Addr() net.Addr // Close stops the ProxySocket from accepting incoming connections. // Each implementation should comment on the impact of calling Close // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. ProxyLoop(service iptables.ServicePortName, info *ServiceInfo, loadBalancer LoadBalancer) // ListenPort returns the host port that the ProxySocket is listening on ListenPort() int }
Abstraction over TCP/UDP sockets which are proxied.
type ProxySocketFunc ¶
ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
type ServiceInfo ¶
type ServiceInfo struct { // Timeout is the read/write timeout (used for UDP connections) Timeout time.Duration // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service ActiveClients *ClientCache // contains filtered or unexported fields }
ServiceInfo contains information and state for a particular proxied service
func (*ServiceInfo) IsAlive ¶
func (info *ServiceInfo) IsAlive() bool
func (*ServiceInfo) IsFinished ¶
func (info *ServiceInfo) IsFinished() bool
func (*ServiceInfo) IsStarted ¶
func (info *ServiceInfo) IsStarted() bool
type ServicePortName ¶
type ServicePortName struct { types.NamespacedName Port string Protocol localv1.Protocol PortName string // FYI Jay added this, because we needed it for the BuildPortsToEndpointsMap function by KPNG }
ServicePortName carries a namespace + name + portname. This is the unique identifier for a load-balanced service.
type UserspaceLinux ¶
type UserspaceLinux struct {
// contains filtered or unexported fields
}
Proxier is a simple proxy for TCP connections between a localhost:lport and services that provide the actual implementations.
func NewCustomProxier ¶
func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptablesutil.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*UserspaceLinux, error)
NewCustomProxier functions similarly to NewProxier, returning a new Proxier for the given LoadBalancer and address. The new proxier is constructed using the ProxySocket constructor provided, however, instead of constructing the default ProxySockets.
func NewUserspaceLinux ¶
func NewUserspaceLinux(loadBalancer LoadBalancer, listenIP net.IP, iptables iptablesutil.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*UserspaceLinux, error)
func (*UserspaceLinux) OnEndpointsAdd ¶
func (proxier *UserspaceLinux) OnEndpointsAdd(ep *localv1.Endpoint, svc *localv1.Service)
OnEndpointsAdd is called whenever creation of new endpoints object is observed.
func (*UserspaceLinux) OnEndpointsDelete ¶
func (proxier *UserspaceLinux) OnEndpointsDelete(ep *localv1.Endpoint, svc *localv1.Service)
OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
func (*UserspaceLinux) OnEndpointsSynced ¶
func (proxier *UserspaceLinux) OnEndpointsSynced()
OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
func (*UserspaceLinux) OnEndpointsUpdate ¶
func (proxier *UserspaceLinux) OnEndpointsUpdate(oldEndpoints, endpoints *localv1.Endpoint)
OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
func (*UserspaceLinux) OnServiceAdd ¶
func (proxier *UserspaceLinux) OnServiceAdd(service *localv1.Service)
OnServiceAdd is called whenever creation of new service object is observed.
func (*UserspaceLinux) OnServiceDelete ¶
func (proxier *UserspaceLinux) OnServiceDelete(service *localv1.Service)
OnServiceDelete is called whenever deletion of an existing service object is observed.
func (*UserspaceLinux) OnServiceSynced ¶
func (proxier *UserspaceLinux) OnServiceSynced()
OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
func (*UserspaceLinux) OnServiceUpdate ¶
func (proxier *UserspaceLinux) OnServiceUpdate(oldService, service *localv1.Service)
OnServiceUpdate is called whenever modification of an existing service object is observed.
func (*UserspaceLinux) Sync ¶
func (proxier *UserspaceLinux) Sync()
Sync is called to synchronize the proxier state to iptables as soon as possible.
func (*UserspaceLinux) SyncLoop ¶
func (proxier *UserspaceLinux) SyncLoop()
SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
type UserspaceServiceChangeTracker ¶
type UserspaceServiceChangeTracker struct {
// contains filtered or unexported fields
}
func (*UserspaceServiceChangeTracker) Delete ¶
func (sct *UserspaceServiceChangeTracker) Delete(namespace, name string) bool
func (*UserspaceServiceChangeTracker) Update ¶
func (sct *UserspaceServiceChangeTracker) Update(current *localv1.Service) bool
Update updates given service's change map based on the <previous, current> service pair. It returns true if items changed, otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example, Add item
- pass <nil, service> as the <previous, current> pair.
Update item
- pass <oldService, service> as the <previous, current> pair.
Delete item
- pass <service, nil> as the <previous, current> pair.