Documentation ¶
Overview ¶
The calc package implements a calculation graph for Felix's dynamic state. The graph filters and transforms updates from the backend Syncer into a stream of host-specific updates to policies, profiles, endpoints and IP sets.
The graph is available either with a synchronous callback API or as a channel-based async API. The async version of the API is recommended because it includes an EventBuffer to efficiently batch IP set updates. In addition, it converts the callbacks into structs from the felix/proto package, which are ready to be marshaled directly to the felix front-end.
// Using the async API. asyncCalcGraph := calc.NewAsyncCalcGraph("hostname", outputChannel, nil) syncer := fc.datastore.Syncer(asyncCalcGraph) syncer.Start() asyncCalcGraph.Start() for event := range outputChannel { switch event := event.(type) { case *proto.XYZ: ... ... }
The best explanation of the wiring of the calculation graph nodes is in the code comments inside NewCalculationGraph.
Copyright (c) 2016-2017 Tigera, Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func ModelHostEndpointToProto(ep *model.HostEndpoint, tiers, untrackedTiers, preDNATTiers []*proto.TierInfo, ...) *proto.HostEndpoint
- func ModelWorkloadEndpointToProto(ep *model.WorkloadEndpoint, tiers []*proto.TierInfo) *proto.WorkloadEndpoint
- func NewCalculationGraph(callbacks PipelineCallbacks, hostname string) (allUpdDispatcher *dispatcher.Dispatcher)
- func NewTierInfo(name string) *tierInfo
- type ActiveRulesCalculator
- type AsyncCalcGraph
- type ConfigBatcher
- type DataplanePassthru
- type DatastoreNotReady
- type EndpointKeyToProfileIDMap
- type EventHandler
- type EventSequencer
- func (buf *EventSequencer) Flush()
- func (buf *EventSequencer) OnConfigUpdate(globalConfig, hostConfig map[string]string)
- func (buf *EventSequencer) OnDatastoreNotReady()
- func (buf *EventSequencer) OnEndpointTierUpdate(key model.Key, endpoint interface{}, filteredTiers []tierInfo)
- func (buf *EventSequencer) OnHostIPRemove(hostname string)
- func (buf *EventSequencer) OnHostIPUpdate(hostname string, ip *net.IP)
- func (buf *EventSequencer) OnIPPoolRemove(key model.IPPoolKey)
- func (buf *EventSequencer) OnIPPoolUpdate(key model.IPPoolKey, pool *model.IPPool)
- func (buf *EventSequencer) OnIPSetAdded(setID string, ipSetType proto.IPSetUpdate_IPSetType)
- func (buf *EventSequencer) OnIPSetMemberAdded(setID string, member labelindex.IPSetMember)
- func (buf *EventSequencer) OnIPSetMemberRemoved(setID string, member labelindex.IPSetMember)
- func (buf *EventSequencer) OnIPSetRemoved(setID string)
- func (buf *EventSequencer) OnPolicyActive(key model.PolicyKey, rules *ParsedRules)
- func (buf *EventSequencer) OnPolicyInactive(key model.PolicyKey)
- func (buf *EventSequencer) OnProfileActive(key model.ProfileRulesKey, rules *ParsedRules)
- func (buf *EventSequencer) OnProfileInactive(key model.ProfileRulesKey)
- type FelixSender
- type IPSetData
- type ParsedRule
- type ParsedRules
- type PipelineCallbacks
- type PolKV
- type PolicyByOrder
- type PolicyMatchListener
- type PolicyResolver
- func (pr *PolicyResolver) OnDatamodelStatus(status api.SyncStatus)
- func (pr *PolicyResolver) OnPolicyMatch(policyKey model.PolicyKey, endpointKey interface{})
- func (pr *PolicyResolver) OnPolicyMatchStopped(policyKey model.PolicyKey, endpointKey interface{})
- func (pr *PolicyResolver) OnUpdate(update api.Update) (filterOut bool)
- func (pr *PolicyResolver) RegisterWith(allUpdDispatcher, localEndpointDispatcher *dispatcher.Dispatcher)
- type PolicyResolverCallbacks
- type PolicySorter
- type RuleScanner
- func (rs *RuleScanner) OnPolicyActive(key model.PolicyKey, policy *model.Policy)
- func (rs *RuleScanner) OnPolicyInactive(key model.PolicyKey)
- func (rs *RuleScanner) OnProfileActive(key model.ProfileRulesKey, profile *model.ProfileRules)
- func (rs *RuleScanner) OnProfileInactive(key model.ProfileRulesKey)
- type StatsCollector
- type StatsUpdate
- type SyncerCallbacksDecoupler
- type ValidationFilter
Constants ¶
const ( // Compromise: shorter is better for occupancy and readability. Longer is better for // collision-resistance. 16 chars gives us 96 bits of entropy, which is fairly collision // resistant. RuleIDLength = 16 )
Variables ¶
var AllSelector selector.Selector
AllSelector is a pre-calculated copy of the "all()" selector.
var ( DummyDropRules = model.ProfileRules{ InboundRules: []model.Rule{{Action: "deny"}}, OutboundRules: []model.Rule{{Action: "deny"}}, } )
Functions ¶
func ModelHostEndpointToProto ¶
func ModelHostEndpointToProto(ep *model.HostEndpoint, tiers, untrackedTiers, preDNATTiers []*proto.TierInfo, forwardTiers []*proto.TierInfo) *proto.HostEndpoint
func ModelWorkloadEndpointToProto ¶
func ModelWorkloadEndpointToProto(ep *model.WorkloadEndpoint, tiers []*proto.TierInfo) *proto.WorkloadEndpoint
func NewCalculationGraph ¶
func NewCalculationGraph(callbacks PipelineCallbacks, hostname string) (allUpdDispatcher *dispatcher.Dispatcher)
func NewTierInfo ¶
func NewTierInfo(name string) *tierInfo
Types ¶
type ActiveRulesCalculator ¶
type ActiveRulesCalculator struct { // Callback objects. RuleScanner ruleScanner PolicyMatchListener PolicyMatchListener // contains filtered or unexported fields }
ActiveRulesCalculator calculates the set of policies and profiles (i.e. the rules) that are active for the particular endpoints that it's been told about. It emits events when the set of active rules changes.
For example, if the ActiveRulesCalculator is fed *all* the policies/profiles along with the endpoints that are on the local host then its output (via the callback objects) will indicate exactly which policies/profiles are active on the local host.
When looking at policies, the ActiveRulesCalculator is only interested in the selector attached to the policy itself (which determines the set of endpoints that it applies to). The rules in a policy may also contain selectors; those are ignored here; they are mapped to IP sets by the RuleScanner.
func NewActiveRulesCalculator ¶
func NewActiveRulesCalculator() *ActiveRulesCalculator
func (*ActiveRulesCalculator) OnStatusUpdate ¶
func (arc *ActiveRulesCalculator) OnStatusUpdate(status api.SyncStatus)
func (*ActiveRulesCalculator) OnUpdate ¶
func (arc *ActiveRulesCalculator) OnUpdate(update api.Update) (_ bool)
func (*ActiveRulesCalculator) RegisterWith ¶
func (arc *ActiveRulesCalculator) RegisterWith(localEndpointDispatcher, allUpdDispatcher *dispatcher.Dispatcher)
type AsyncCalcGraph ¶
type AsyncCalcGraph struct { Dispatcher *dispatcher.Dispatcher // contains filtered or unexported fields }
func NewAsyncCalcGraph ¶
func NewAsyncCalcGraph(conf *config.Config, outputEvents chan<- interface{}, healthAggregator *health.HealthAggregator) *AsyncCalcGraph
func (*AsyncCalcGraph) OnStatusUpdated ¶
func (acg *AsyncCalcGraph) OnStatusUpdated(status api.SyncStatus)
func (*AsyncCalcGraph) OnUpdates ¶
func (acg *AsyncCalcGraph) OnUpdates(updates []api.Update)
func (*AsyncCalcGraph) Start ¶
func (acg *AsyncCalcGraph) Start()
type ConfigBatcher ¶
type ConfigBatcher struct {
// contains filtered or unexported fields
}
func NewConfigBatcher ¶
func NewConfigBatcher(hostname string, callbacks configCallbacks) *ConfigBatcher
func (*ConfigBatcher) OnDatamodelStatus ¶
func (cb *ConfigBatcher) OnDatamodelStatus(status api.SyncStatus)
func (*ConfigBatcher) OnUpdate ¶
func (cb *ConfigBatcher) OnUpdate(update api.Update) (filterOut bool)
func (*ConfigBatcher) RegisterWith ¶
func (cb *ConfigBatcher) RegisterWith(allUpdDispatcher *dispatcher.Dispatcher)
type DataplanePassthru ¶
type DataplanePassthru struct {
// contains filtered or unexported fields
}
DataplanePassthru passes through some datamodel updates to the dataplane layer, removing some duplicates along the way. It maps OnUpdate() calls to dedicated method calls for consistency with the rest of the dataplane API.
func NewDataplanePassthru ¶
func NewDataplanePassthru(callbacks passthruCallbacks) *DataplanePassthru
func (*DataplanePassthru) OnUpdate ¶
func (h *DataplanePassthru) OnUpdate(update api.Update) (filterOut bool)
func (*DataplanePassthru) RegisterWith ¶
func (h *DataplanePassthru) RegisterWith(dispatcher *dispatcher.Dispatcher)
type DatastoreNotReady ¶
type DatastoreNotReady struct{}
type EndpointKeyToProfileIDMap ¶
type EndpointKeyToProfileIDMap struct {
// contains filtered or unexported fields
}
EndpointKeyToProfileIDMap is a specialised map that calculates the deltas to the profile IDs when making an update.
func NewEndpointKeyToProfileIDMap ¶
func NewEndpointKeyToProfileIDMap() *EndpointKeyToProfileIDMap
type EventHandler ¶
type EventHandler func(message interface{})
type EventSequencer ¶
type EventSequencer struct { Callback EventHandler // contains filtered or unexported fields }
EventSequencer buffers and coalesces updates from the calculation graph then flushes them when Flush() is called. It flushes updates in a dependency-safe order.
func NewEventBuffer ¶
func NewEventBuffer(conf configInterface) *EventSequencer
func (*EventSequencer) Flush ¶
func (buf *EventSequencer) Flush()
func (*EventSequencer) OnConfigUpdate ¶
func (buf *EventSequencer) OnConfigUpdate(globalConfig, hostConfig map[string]string)
func (*EventSequencer) OnDatastoreNotReady ¶
func (buf *EventSequencer) OnDatastoreNotReady()
func (*EventSequencer) OnEndpointTierUpdate ¶
func (buf *EventSequencer) OnEndpointTierUpdate(key model.Key, endpoint interface{}, filteredTiers []tierInfo, )
func (*EventSequencer) OnHostIPRemove ¶
func (buf *EventSequencer) OnHostIPRemove(hostname string)
func (*EventSequencer) OnHostIPUpdate ¶
func (buf *EventSequencer) OnHostIPUpdate(hostname string, ip *net.IP)
func (*EventSequencer) OnIPPoolRemove ¶
func (buf *EventSequencer) OnIPPoolRemove(key model.IPPoolKey)
func (*EventSequencer) OnIPPoolUpdate ¶
func (buf *EventSequencer) OnIPPoolUpdate(key model.IPPoolKey, pool *model.IPPool)
func (*EventSequencer) OnIPSetAdded ¶
func (buf *EventSequencer) OnIPSetAdded(setID string, ipSetType proto.IPSetUpdate_IPSetType)
func (*EventSequencer) OnIPSetMemberAdded ¶
func (buf *EventSequencer) OnIPSetMemberAdded(setID string, member labelindex.IPSetMember)
func (*EventSequencer) OnIPSetMemberRemoved ¶
func (buf *EventSequencer) OnIPSetMemberRemoved(setID string, member labelindex.IPSetMember)
func (*EventSequencer) OnIPSetRemoved ¶
func (buf *EventSequencer) OnIPSetRemoved(setID string)
func (*EventSequencer) OnPolicyActive ¶
func (buf *EventSequencer) OnPolicyActive(key model.PolicyKey, rules *ParsedRules)
func (*EventSequencer) OnPolicyInactive ¶
func (buf *EventSequencer) OnPolicyInactive(key model.PolicyKey)
func (*EventSequencer) OnProfileActive ¶
func (buf *EventSequencer) OnProfileActive(key model.ProfileRulesKey, rules *ParsedRules)
func (*EventSequencer) OnProfileInactive ¶
func (buf *EventSequencer) OnProfileInactive(key model.ProfileRulesKey)
type FelixSender ¶
type IPSetData ¶
type IPSetData struct { // The selector and named port that this IP set represents. To represent an unfiltered named // port, set selector to AllSelector. If NamedPortProtocol == ProtocolNone then // this IP set represents a selector only, with no named port component. Selector selector.Selector // NamedPortProtocol identifies the protocol (TCP or UDP) for a named port IP set. It is // set to ProtocolNone for a selector-only IP set. NamedPortProtocol labelindex.IPSetPortProtocol // NamedPort contains the name of the named port represented by this IP set or "" for a // selector-only IP set NamedPort string // contains filtered or unexported fields }
func (*IPSetData) DataplaneProtocolType ¶
func (d *IPSetData) DataplaneProtocolType() proto.IPSetUpdate_IPSetType
DataplaneProtocolType returns the dataplane driver protocol type of this IP set. One of the proto.IPSetUpdate_IPSetType constants.
type ParsedRule ¶
type ParsedRule struct { Action string IPVersion *int Protocol *numorstring.Protocol SrcNets []*net.IPNet SrcPorts []numorstring.Port SrcNamedPortIPSetIDs []string DstNets []*net.IPNet DstPorts []numorstring.Port DstNamedPortIPSetIDs []string ICMPType *int ICMPCode *int SrcIPSetIDs []string DstIPSetIDs []string NotProtocol *numorstring.Protocol NotSrcNets []*net.IPNet NotSrcPorts []numorstring.Port NotSrcNamedPortIPSetIDs []string NotDstNets []*net.IPNet NotDstPorts []numorstring.Port NotDstNamedPortIPSetIDs []string NotICMPType *int NotICMPCode *int NotSrcIPSetIDs []string NotDstIPSetIDs []string }
ParsedRule is like a backend.model.Rule, except the tag and selector matches and named ports are replaced with pre-calculated ipset IDs.
type ParsedRules ¶
type ParsedRules struct { InboundRules []*ParsedRule OutboundRules []*ParsedRule // Untracked is true if these rules should not be "conntracked". Untracked bool // PreDNAT is true if these rules should be applied before any DNAT. PreDNAT bool }
ParsedRules holds our intermediate representation of either a policy's rules or a profile's rules. As part of its processing, the RuleScanner converts backend rules into ParsedRules. Where backend rules contain selectors, tags and named ports, ParsedRules only contain IPSet IDs. The RuleScanner calculates the relevant IDs as it processes the rules and diverts the details of the active tags, selectors and named ports to the named port index, which figures out the members that should be in those IP sets.
type PipelineCallbacks ¶
type PipelineCallbacks interface {
// contains filtered or unexported methods
}
type PolKV ¶
type PolKV struct { Key model.PolicyKey Value *model.Policy // contains filtered or unexported fields }
Note: PolKV is really internal to the calc package. It is named with an initial capital so that the test package calc_test can also use it.
func (*PolKV) GovernsEgress ¶
func (*PolKV) GovernsIngress ¶
type PolicyByOrder ¶
type PolicyByOrder []PolKV
func (PolicyByOrder) Len ¶
func (a PolicyByOrder) Len() int
func (PolicyByOrder) Less ¶
func (a PolicyByOrder) Less(i, j int) bool
func (PolicyByOrder) Swap ¶
func (a PolicyByOrder) Swap(i, j int)
type PolicyMatchListener ¶
type PolicyResolver ¶
type PolicyResolver struct { Callbacks PolicyResolverCallbacks InSync bool // contains filtered or unexported fields }
PolicyResolver marries up the active policies with local endpoints and calculates the complete, ordered set of policies that apply to each endpoint. As policies and endpoints are added/removed/updated, it emits events via the PolicyResolverCallbacks with the updated set of matching policies.
The PolicyResolver doesn't figure out which policies are currently active; it expects to be told via its OnPolicyMatch(Stopped) methods which policies match which endpoints. The ActiveRulesCalculator does that calculation.
func NewPolicyResolver ¶
func NewPolicyResolver() *PolicyResolver
func (*PolicyResolver) OnDatamodelStatus ¶
func (pr *PolicyResolver) OnDatamodelStatus(status api.SyncStatus)
func (*PolicyResolver) OnPolicyMatch ¶
func (pr *PolicyResolver) OnPolicyMatch(policyKey model.PolicyKey, endpointKey interface{})
func (*PolicyResolver) OnPolicyMatchStopped ¶
func (pr *PolicyResolver) OnPolicyMatchStopped(policyKey model.PolicyKey, endpointKey interface{})
func (*PolicyResolver) OnUpdate ¶
func (pr *PolicyResolver) OnUpdate(update api.Update) (filterOut bool)
func (*PolicyResolver) RegisterWith ¶
func (pr *PolicyResolver) RegisterWith(allUpdDispatcher, localEndpointDispatcher *dispatcher.Dispatcher)
type PolicyResolverCallbacks ¶
type PolicySorter ¶
type PolicySorter struct {
// contains filtered or unexported fields
}
func NewPolicySorter ¶
func NewPolicySorter() *PolicySorter
func (*PolicySorter) Sorted ¶
func (poc *PolicySorter) Sorted() *tierInfo
type RuleScanner ¶
type RuleScanner struct { OnIPSetActive func(ipSet *IPSetData) OnIPSetInactive func(ipSet *IPSetData) RulesUpdateCallbacks rulesUpdateCallbacks // contains filtered or unexported fields }
RuleScanner scans the rules sent to it by the ActiveRulesCalculator, looking for tags and selectors. It calculates the set of active tags and selectors and emits events when they become active/inactive.
Previously, Felix tracked tags and selectors separately, with a separate tag and label index. However, we found that had a high occupancy cost. The current code uses a shared index and maps tags onto labels, so a tag named tagName becomes a label tagName="". The RuleScanner maps tags to label selectors of the form "has(tagName)", taking advantage of the mapping. Such a selector is almost equivalent to having the tag; the only case where the behaviour would differ is if the user was using the same name for a tag and a label and the label and tags of the same name were applied to different endpoints. Since tags are being deprecated, we can live with that potential aliasing issue in return for a significant occupancy improvement at high scale.
The RuleScanner also emits events when rules are updated: since the input rule structs contain tags and selectors but, downstream, we only care about IP sets, the RuleScanner converts rules from model.Rule objects to calc.ParsedRule objects. The latter share most fields, but the tags and selector fields are replaced by lists of IP sets.
The RuleScanner only calculates which selectors and tags are active/inactive. It doesn't match endpoints against tags/selectors. (That is done downstream in a labelindex.InheritIndex created in NewCalculationGraph.)
func NewRuleScanner ¶
func NewRuleScanner() *RuleScanner
func (*RuleScanner) OnPolicyActive ¶
func (rs *RuleScanner) OnPolicyActive(key model.PolicyKey, policy *model.Policy)
func (*RuleScanner) OnPolicyInactive ¶
func (rs *RuleScanner) OnPolicyInactive(key model.PolicyKey)
func (*RuleScanner) OnProfileActive ¶
func (rs *RuleScanner) OnProfileActive(key model.ProfileRulesKey, profile *model.ProfileRules)
func (*RuleScanner) OnProfileInactive ¶
func (rs *RuleScanner) OnProfileInactive(key model.ProfileRulesKey)
type StatsCollector ¶
type StatsCollector struct { Callback func(StatsUpdate) error // contains filtered or unexported fields }
func NewStatsCollector ¶
func NewStatsCollector(callback func(StatsUpdate) error) *StatsCollector
func (*StatsCollector) OnStatusUpdate ¶
func (s *StatsCollector) OnStatusUpdate(status api.SyncStatus)
func (*StatsCollector) OnUpdate ¶
func (s *StatsCollector) OnUpdate(update api.Update) (filterOut bool)
func (*StatsCollector) RegisterWith ¶
func (s *StatsCollector) RegisterWith(allUpdDispatcher *dispatcher.Dispatcher)
type StatsUpdate ¶
func (StatsUpdate) String ¶
func (s StatsUpdate) String() string
type SyncerCallbacksDecoupler ¶
type SyncerCallbacksDecoupler struct {
// contains filtered or unexported fields
}
func NewSyncerCallbacksDecoupler ¶
func NewSyncerCallbacksDecoupler() *SyncerCallbacksDecoupler
func (*SyncerCallbacksDecoupler) OnStatusUpdated ¶
func (a *SyncerCallbacksDecoupler) OnStatusUpdated(status api.SyncStatus)
func (*SyncerCallbacksDecoupler) OnUpdates ¶
func (a *SyncerCallbacksDecoupler) OnUpdates(updates []api.Update)
func (*SyncerCallbacksDecoupler) SendTo ¶
func (a *SyncerCallbacksDecoupler) SendTo(sink api.SyncerCallbacks)
type ValidationFilter ¶
type ValidationFilter struct {
// contains filtered or unexported fields
}
func NewValidationFilter ¶
func NewValidationFilter(sink api.SyncerCallbacks) *ValidationFilter
func (*ValidationFilter) OnStatusUpdated ¶
func (v *ValidationFilter) OnStatusUpdated(status api.SyncStatus)
func (*ValidationFilter) OnUpdates ¶
func (v *ValidationFilter) OnUpdates(updates []api.Update)