Documentation ¶
Overview ¶
Package topology is the preferred method for creating and supervising system traces when using the Swoll API on modern container management and orchestration systems such as Kubernetes.
To better understand what this package does, it is best to start by learning a little about how Swoll creates, captures, filters, and emits data from the kernel back into our code.
The Swoll BPF has a very simple userspace-configurable filtering mechanism which allows us to either white-list or black-list the syscalls we want to monitor. Optionally, each call we want to monitor can also be associated with a specific kernel namespace. So, for example, a user can request to only see events which made the system call "open" in the kernel PID-Namespace `31337`. Any events that do not match this specific rule will be silently dropped by the kernel.
Furthermore, each filter can optionally maintain a basic sample-rate configuration, giving the developer the option to gain insight into high-load system-calls such as `sys_read` without impacting performance too much.
Since each container within a `Pod` gets its own unique (or derived, if shared) namespace, Swoll exploits the ns+syscall filter feature described above by maintaining the relationship between Kubernetes and the container runtime, dynamically updating and tuning the filters in real time.
In short (using Kubernetes as an example), when we request Swoll to monitor syscall events for the Pod "foobar", we connect to the kube-api-server, watch for Pod events that match "foobar", and, when matched, utilize the Container Runtime Interface to find process details for that Pod. Once we have obtained the init PID from the CRI, we can resolve the PID namespace needed to set the filter in the kernel.
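As a rough illustration of that last step, the kernel PID-namespace can be derived from an init PID by reading the `/proc/<pid>/ns/pid` link. The following is a minimal, hypothetical sketch (the PID `31337` is made up, and swoll's real resolution logic lives inside the topology/CRI plumbing, not in this helper):

    package main

    import (
        "fmt"
        "os"
    )

    // pidNamespace resolves the kernel PID-namespace inode for a process by
    // reading the /proc/<pid>/ns/pid symlink, whose target looks like
    // "pid:[4026532281]". The inode number is what a ns+syscall kernel
    // filter keys off of.
    func pidNamespace(pid int) (uint64, error) {
        link, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
        if err != nil {
            return 0, err
        }

        var ns uint64
        if _, err := fmt.Sscanf(link, "pid:[%d]", &ns); err != nil {
            return 0, err
        }

        return ns, nil
    }

    func main() {
        // Hypothetical init PID obtained from the CRI for a container in
        // the Pod "foobar".
        ns, err := pidNamespace(31337)
        if err != nil {
            panic(err)
        }
        fmt.Println("kernel pid-namespace:", ns)
    }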
In theory this sounds simple, but in practice things are not as easy. Swoll strives to run as lean-and-mean as possible; its goal is "One BPF Context To Mon Them All", without sacrificing performance for flexibility or vice-versa.
And the Topology API is exactly that. It "observes" events from Kubernetes and CRI (see: topology.Observer), runs one or more v1alpha1.Trace specifications as a topology.Job, which in turn dynamically updates, de-duplicates, and prunes the kernel filter inside a single BPF context, better known as the topology.Hub.
Index ¶
- Variables
- type EventType
- type Hub
- func (h *Hub) AttachPath(name string, paths []string, cb func(string, *event.TraceEvent)) pubsub.Unsubscriber
- func (h *Hub) AttachTrace(t *v1alpha1.Trace, cb func(n string, ev *event.TraceEvent)) pubsub.Unsubscriber
- func (h *Hub) DeleteTrace(t *v1alpha1.Trace) error
- func (h *Hub) MustRun(ctx context.Context)
- func (h *Hub) MustRunJob(ctx context.Context, job *Job)
- func (h *Hub) Probe() *kernel.Probe
- func (h *Hub) PushJob(job *Job, ns, nr int)
- func (h *Hub) Run(ctx context.Context) error
- func (h *Hub) RunJob(ctx context.Context, job *Job) error
- func (h *Hub) RunTrace(ctx context.Context, t *v1alpha1.Trace) error
- func (h *Hub) Topology() *Topology
- func (h *Hub) WriteEvent(ev *event.TraceEvent)
- type Job
- func (j *Job) AddContainer(pod, name string)
- func (j *Job) Duration() time.Duration
- func (j *Job) JobID() string
- func (j *Job) MonitoredHosts(all bool) []string
- func (j *Job) RemoveContainer(pod, name string)
- func (j *Job) Run(ctx context.Context, h *Hub) error
- func (j *Job) TraceSpec() *v1alpha1.TraceSpec
- func (j *Job) TraceStatus() *v1alpha1.TraceStatus
- func (j *Job) WriteEvent(h *Hub, ev *event.TraceEvent)
- type JobContext
- type JobList
- type Kubernetes
- func (k *Kubernetes) Close() error
- func (k *Kubernetes) Connect(ctx context.Context) error
- func (k *Kubernetes) Containers(ctx context.Context) ([]*types.Container, error)
- func (k *Kubernetes) Copy(opts ...interface{}) (Observer, error)
- func (k *Kubernetes) Run(ctx context.Context, out chan<- *ObservationEvent)
- type KubernetesOption
- func WithKubernetesCRI(criSocket string) KubernetesOption
- func WithKubernetesConfig(kubeConfig string) KubernetesOption
- func WithKubernetesFieldSelector(f string) KubernetesOption
- func WithKubernetesLabelSelector(l string) KubernetesOption
- func WithKubernetesNamespace(namespace string) KubernetesOption
- func WithKubernetesProcRoot(path string) KubernetesOption
- type ObservationEvent
- type Observer
- type OnEventCallback
- type Topology
- func (t *Topology) Close() error
- func (t *Topology) Connect(ctx context.Context) error
- func (t *Topology) Containers(ctx context.Context) ([]*types.Container, error)
- func (t *Topology) LookupContainer(ctx context.Context, pidns int) (*types.Container, error)
- func (t *Topology) Run(ctx context.Context, cb OnEventCallback)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrBadNamespace = errors.New("invalid kernel pid-namespace")
ErrBadNamespace is the error returned to indicate the observer was unable to resolve the PID-Namespace of the container
var ErrContainerNotFound = errors.New("container not found")
ErrContainerNotFound is the error returned to indicate the container was unable to be resolved
var ErrNilContainer = errors.New("nil container")
ErrNilContainer is the error returned to indicate the observer sent an empty container message
var ErrNilEvent = errors.New("nil event")
ErrNilEvent is the error returned to indicate the observer sent an empty message
var ErrUnknownType = errors.New("unknown event-type")
ErrUnknownType is the error returned to indicate a malformed observer event
Functions ¶
This section is empty.
Types ¶
type Hub ¶ added in v0.1.3
Hub maintains all of the underlying kernel filters, job request and output routing, metric rules, de-duplication, and garbage collection, using information it has obtained via the underlying Observer.
func NewHub ¶ added in v0.1.3
NewHub creates and initializes a Hub context and the underlying BPF, primes the kernel filter, and sets up the in-kernel metrics.
observer, _ := topology.NewKubernetes()
hub, err := topology.NewHub(assets.LoadBPFReader(), observer)
func (*Hub) AttachPath ¶ added in v0.1.3
func (h *Hub) AttachPath(name string, paths []string, cb func(string, *event.TraceEvent)) pubsub.Unsubscriber
AttachPath taps the caller into a subset of the data being sent to a running Job. Whenever an event is sent to a job, the Hub will also broadcast a copy of this event to a prefix-hash like so:
hash("kube-namespace/", "kube-pod/", "kube-container/", "syscall-name/")
Monitor ns/pod/container/syscall
hub.AttachPath("<name>", []string{"<namespace>", "<pod>", "<container>", "syscall"}, cb)
Monitor all syscalls and containers in pod:
hub.AttachPath("<name>", []string{"<namespace>", "<pod>"}, cb)
Example ¶
In this example we use AttachPath to "subscribe" to a subset of events being sent to a running Job output.
package main

import (
    "github.com/criticalstack/swoll/pkg/event"
    "github.com/criticalstack/swoll/pkg/topology"
)

func main() {
    var hub *topology.Hub

    // Assumes there is a job that matches namespace=kube-system,
    // pod=foo-pod, and a container named "boo"
    unsub := hub.AttachPath("example", []string{"kube-system", "foo-pod", "boo"},
        func(name string, ev *event.TraceEvent) {})
    defer unsub()
}
Output:
func (*Hub) AttachTrace ¶ added in v0.1.3
func (h *Hub) AttachTrace(t *v1alpha1.Trace, cb func(n string, ev *event.TraceEvent)) pubsub.Unsubscriber
AttachTrace taps the caller into the events for a running Trace
Example ¶
A simple example showing how to use the AttachTrace method; it assumes the topology.Hub is already running with an Observer.
package main

import (
    "context"

    "github.com/criticalstack/swoll/api/v1alpha1"
    "github.com/criticalstack/swoll/pkg/event"
    "github.com/criticalstack/swoll/pkg/topology"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    var hub *topology.Hub

    trace := &v1alpha1.Trace{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "kube-system",
        },
        Spec: v1alpha1.TraceSpec{
            Syscalls: []string{"execve", "openat"},
        },
        Status: v1alpha1.TraceStatus{
            JobID: "foo-bar",
        },
    }

    go hub.RunTrace(context.TODO(), trace)

    hub.AttachTrace(trace, func(name string, ev *event.TraceEvent) {})
}
Output:
func (*Hub) DeleteTrace ¶ added in v0.1.3
DeleteTrace will stop all of the running jobs that are associated with this specification. All kernel filters that were added to create this job are removed if no other jobs share the same rules.
func (*Hub) MustRunJob ¶ added in v0.1.3
MustRunJob is a wrapper around RunJob which treats any failure as fatal.
func (*Hub) PushJob ¶ added in v0.1.3
PushJob inserts a namespace+nr-specific job as a value of a list in two buckets: "nsmap", a mapping of pid-namespace+syscall to a list of jobs; and "idmap", a mapping of a job-ID to individual job contexts.
This is done to solve potential job duplication issues with overlapping rules. For example if we have two rules:
rule-A = syscall_A, syscall_B
rule-B = syscall_A, syscall_C
And if "rule-B" is deleted, we don't want the kernel filter "syscall_A" removed due to the fact it is still needed for "rule-A".
func (*Hub) Run ¶ added in v0.1.3
Run runs the main Hub event-loop. It maintains the filters and metric rules that run in the BPF, resolves and routes system-call events to all the job output queues, accepts Trace specs to run, and keeps the bpf running light.
Example ¶
Running the Hub
package main

import (
    "bytes"
    "context"

    "github.com/criticalstack/swoll/pkg/topology"
)

func main() {
    obs, err := topology.NewKubernetes()
    if err != nil {
        panic(err)
    }

    var bpf *bytes.Reader

    hub, err := topology.NewHub(bpf, obs)
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    go hub.Run(ctx)

    <-ctx.Done()
}
Output:
func (*Hub) RunJob ¶ added in v0.1.3
RunJob will schedule an already-allocated trace-job to be run inside the Hub.
func (*Hub) RunTrace ¶ added in v0.1.3
RunTrace will create and schedule a trace-job to be run inside the Hub.
Example ¶
A short example showing how to use the RunTrace call
package main

import (
    "bytes"
    "context"

    "github.com/criticalstack/swoll/api/v1alpha1"
    "github.com/criticalstack/swoll/pkg/topology"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    var (
        bpf      *bytes.Reader
        observer topology.Observer
    )

    hub, err := topology.NewHub(bpf, observer)
    if err != nil {
        panic(err)
    }

    ctx := context.Background()

    // Run the Hub.
    go hub.MustRun(ctx)

    // Monitor execve and openat in the kubernetes-namespace 'kube-system' and
    // name the job "foo-bar".
    go hub.RunTrace(ctx, &v1alpha1.Trace{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "kube-system",
        },
        Spec: v1alpha1.TraceSpec{
            Syscalls: []string{"execve", "openat"},
        },
        Status: v1alpha1.TraceStatus{
            JobID: "foo-bar",
        },
    })

    // The trace is now running inside the Hub; you must attach to it to
    // receive events.
    <-ctx.Done()
}
Output:
func (*Hub) Topology ¶ added in v0.1.3
Topology returns this Hub's current underlying topology context
func (*Hub) WriteEvent ¶ added in v0.1.3
func (h *Hub) WriteEvent(ev *event.TraceEvent)
WriteEvent writes a single TraceEvent to all subscribers using the path-subscriptions
type Job ¶ added in v0.1.3
Job stores the trace specification and a running list of hosts which have matched this job.
func (*Job) AddContainer ¶ added in v0.1.3
AddContainer tells the job to monitor a very specific container in a specific pod.
func (*Job) Duration ¶ added in v0.1.3
Duration returns how long this job has been running as seen by kube.
func (*Job) MonitoredHosts ¶ added in v0.1.3
MonitoredHosts returns a list of hosts that have been monitored by this job. If `all` is `false`, only containers that are currently being monitored are returned; otherwise it returns every host that has ever matched this job.
func (*Job) RemoveContainer ¶ added in v0.1.3
RemoveContainer removes the watch for a specific container in a specific pod
func (*Job) Run ¶ added in v0.1.3
Run will run a job inside the Hub. The primary goal of this function is to read topology events using the LabelMatch, and for each pod that matches, create the kernel filter if needed, and append the JobContext to the list of running jobs in the Hub.
func (*Job) TraceStatus ¶ added in v0.1.3
func (j *Job) TraceStatus() *v1alpha1.TraceStatus
TraceStatus returns the status of this Job
func (*Job) WriteEvent ¶ added in v0.1.3
func (j *Job) WriteEvent(h *Hub, ev *event.TraceEvent)
WriteEvent writes event `ev` to all listeners of this `Job`
type JobContext ¶ added in v0.1.3
type JobContext struct {
    *Job
    // contains filtered or unexported fields
}
JobContext contains information about the filters that were created in order to run a Job. Since multiple jobs can have shared resources (like kernel-filters), all possible rules are created and set.
For example, say we have two jobs: "job-A", and "job-B".
job-A monitors pods that match the label "app=nginx" for the syscalls "open" and "close"
job-B monitors pods that match the label "type=webserver" for just the syscall "open"
If a pod was created with both the labels above (app=nginx,type=webserver), and we were to blindly delete "job-B", any filters that were added that matched both rules would be removed.
Thus every filter is accounted for, treated much like a reference counter, only removing from the kernel-filter when no rules require it.
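The following is a hypothetical sketch of that reference-count style accounting; none of these names exist in the package, they only illustrate why deleting "job-B" leaves the shared filter in place:

    package main

    import "fmt"

    // filterKey and filterTable are illustrative stand-ins for the Hub's
    // unexported JobContext bookkeeping; they are not part of the swoll API.
    type filterKey struct {
        ns int // kernel pid-namespace
        nr int // syscall number
    }

    type filterTable struct {
        refs map[filterKey]int
    }

    func (t *filterTable) add(k filterKey) {
        t.refs[k]++
    }

    // remove drops one reference and reports whether the kernel filter can
    // now be detached (i.e. no remaining job needs it).
    func (t *filterTable) remove(k filterKey) bool {
        t.refs[k]--
        if t.refs[k] <= 0 {
            delete(t.refs, k)
            return true
        }
        return false
    }

    func main() {
        t := &filterTable{refs: map[filterKey]int{}}
        open := filterKey{ns: 4026532281, nr: 257} // e.g. openat in one namespace

        t.add(open) // job-A needs it
        t.add(open) // job-B needs it too

        fmt.Println("detach after deleting job-B?", t.remove(open)) // false
        fmt.Println("detach after deleting job-A?", t.remove(open)) // true
    }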
type JobList ¶ added in v0.1.3
JobList is a wrapper around a simple linked-list for groups of JobContexts
type Kubernetes ¶
type Kubernetes struct {
// contains filtered or unexported fields
}
Kubernetes satisfies the Observer interface for the topology. Using a combination of the kube-apiserver and the container-runtime interface, this will emit container start and stop messages to the caller.
func NewKubernetes ¶
func NewKubernetes(opts ...KubernetesOption) (*Kubernetes, error)
NewKubernetes creates an Observer for Kubernetes
Example ¶
package main

import (
    "fmt"

    "github.com/criticalstack/swoll/pkg/topology"
)

func main() {
    observer, err := topology.NewKubernetes(
        topology.WithKubernetesConfig("/root/.kube/config"),
        topology.WithKubernetesNamespace("kube-system"),
        topology.WithKubernetesCRI("/run/containerd/containerd.sock"),
        topology.WithKubernetesLabelSelector("app=nginx"),
        topology.WithKubernetesFieldSelector("status.phase=Running"))
    if err != nil {
        panic(err)
    }

    fmt.Println(observer)
}
Output:
func (*Kubernetes) Close ¶
func (k *Kubernetes) Close() error
Close frees up all the running resources of this Kubernetes observer
func (*Kubernetes) Connect ¶
func (k *Kubernetes) Connect(ctx context.Context) error
Connect establishes the connections between the kube-apiserver and the container-runtime-interface.
func (*Kubernetes) Containers ¶
Containers returns a list of all currently running containers
func (*Kubernetes) Copy ¶ added in v0.1.3
func (k *Kubernetes) Copy(opts ...interface{}) (Observer, error)
Copy will copy all underlying data minus the client communication sockets.
func (*Kubernetes) Run ¶
func (k *Kubernetes) Run(ctx context.Context, out chan<- *ObservationEvent)
Run watches and maintains a cache of all running containers for kubernetes, sending events as an Observer to the topology.
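A minimal usage sketch is shown below. Run is normally driven by the Topology and Hub, but it can also be consumed directly by handing it an ObservationEvent channel; this sketch assumes Connect is called before Run, as the Topology does:

    package main

    import (
        "context"
        "fmt"

        "github.com/criticalstack/swoll/pkg/topology"
    )

    func main() {
        obs, err := topology.NewKubernetes(
            topology.WithKubernetesNamespace("kube-system"))
        if err != nil {
            panic(err)
        }

        ctx := context.Background()
        if err := obs.Connect(ctx); err != nil {
            panic(err)
        }

        out := make(chan *topology.ObservationEvent)
        go obs.Run(ctx, out)

        // Each ObservationEvent describes a container start or stop as seen
        // via the kube-apiserver and the CRI.
        for ev := range out {
            fmt.Printf("%+v\n", ev)
        }
    }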
type KubernetesOption ¶
type KubernetesOption func(*Kubernetes) error
func WithKubernetesCRI ¶
func WithKubernetesCRI(criSocket string) KubernetesOption
WithKubernetesCRI sets the fully-qualified path to the container-runtime interface UNIX socket. This file must exist on the host that runs this code.
func WithKubernetesConfig ¶
func WithKubernetesConfig(kubeConfig string) KubernetesOption
WithKubernetesConfig sets the Kubernetes configuration file to use. By default, this will attempt to use the in-cluster Kubernetes configuration settings.
func WithKubernetesFieldSelector ¶
func WithKubernetesFieldSelector(f string) KubernetesOption
WithKubernetesFieldSelector will only match hosts that match this field-selector.
func WithKubernetesLabelSelector ¶
func WithKubernetesLabelSelector(l string) KubernetesOption
WithKubernetesLabelSelector will only match hosts that match this label.
func WithKubernetesNamespace ¶
func WithKubernetesNamespace(namespace string) KubernetesOption
WithKubernetesNamespace will limit the observation to a specific kubernetes namespace
func WithKubernetesProcRoot ¶
func WithKubernetesProcRoot(path string) KubernetesOption
WithKubernetesProcRoot sets the path under which the ProcFS is mounted. Useful if the containers you are monitoring mount it at a different path. Defaults to "/".