flow

package
v1.6.0-crc0 Latest

Warning: This package is not in the latest version of its module.
Published: May 28, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Constants

const (
	DirectionIngress = uint8(0)
	DirectionEgress  = uint8(1)
)

Values according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml

const IPv6Type = 0x86DD

IPv6Type is the EtherType value for IPv6, as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml

const MacLen = 6

Variables

This section is empty.

Functions

func Accumulate added in v0.3.0

func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics)

func Decorate added in v0.2.4

func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record)

Decorate adds to the flows extra metadata fields that are not directly fetched by eBPF:
- The interface name (corresponding to the interface index in the flow).
- The IP address of the agent host.
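
The shape of such a stage can be illustrated with a minimal sketch. It is not the package's implementation: the record type, its IfIndex field, and the interfaceNamer type below are hypothetical stand-ins for Record and InterfaceNamer, and the stage leaves closing the output channel to whoever owns the pipeline.

```go
package main

import (
	"fmt"
	"net"
)

// record is a hypothetical stand-in for the package's Record type.
type record struct {
	IfIndex   int    // interface index, as it would be reported by eBPF
	Interface string // filled in by the decorator
	AgentIP   net.IP // filled in by the decorator
}

// interfaceNamer mirrors the shape of InterfaceNamer: index in, name out.
type interfaceNamer func(ifIndex int) string

// decorate returns a pipeline stage that sets Interface and AgentIP on every
// record of every batch and forwards the batch unchanged otherwise.
func decorate(agentIP net.IP, namer interfaceNamer) func(in <-chan []*record, out chan<- []*record) {
	return func(in <-chan []*record, out chan<- []*record) {
		for batch := range in {
			for _, r := range batch {
				r.Interface = namer(r.IfIndex)
				r.AgentIP = agentIP
			}
			out <- batch
		}
	}
}

func main() {
	in := make(chan []*record, 1)
	out := make(chan []*record, 1)
	namer := func(i int) string { return fmt.Sprintf("eth%d", i) }
	go decorate(net.ParseIP("10.0.0.5"), namer)(in, out)

	in <- []*record{{IfIndex: 1}}
	close(in)
	got := <-out
	fmt.Println(got[0].Interface, got[0].AgentIP) // eth1 10.0.0.5
}
```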

func Dedupe added in v0.2.1

func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer, m *metrics.Metrics) func(in <-chan []*Record, out chan<- []*Record)

Dedupe receives flows and filters out those belonging to duplicate interfaces. It forwards the flows from the first interface that reports them, until that flow expires in the cache (no activity for it during the expiration time). The justMark argument tells the deduper not to drop the duplicate flows but to set their Duplicate field instead.

func IP added in v0.3.0

func IP(ia IPAddr) net.IP

IP returns the net.IP equivalent object

func IntEncodeV4 added in v0.3.0

func IntEncodeV4(ia [net.IPv6len]uint8) uint32

IntEncodeV4 encodes an IPv4 address as an integer (in network encoding, big endian). It assumes that the passed IP is already IPv4. Otherwise it would just encode the last 4 bytes of an IPv6 address
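
As an illustration of the encoding described above, the following sketch reads the last four bytes of a 16-byte address as a big-endian integer. The function name intEncodeV4 is a local stand-in, and the body is an assumption about how such an encoding can be done, not a copy of the package's code.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// intEncodeV4 interprets the last four bytes of a 16-byte address as a
// big-endian (network order) uint32. For an IPv6 address this silently
// encodes only its last four bytes.
func intEncodeV4(ia [net.IPv6len]uint8) uint32 {
	return binary.BigEndian.Uint32(ia[net.IPv6len-net.IPv4len:])
}

func main() {
	var ia [net.IPv6len]uint8
	copy(ia[:], net.ParseIP("192.168.1.10").To16())
	fmt.Printf("%#x\n", intEncodeV4(ia)) // 0xc0a8010a
}
```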

Types

type Accounter

type Accounter struct {
	// contains filtered or unexported fields
}

Accounter accumulates flow metrics in memory and eventually evicts them via an evictor channel. The accounting process is usually done in kernel space. This type reimplements it in userspace for the edge case where packets are submitted directly via ring-buffer because the kernel-side accounting map is full.

func NewAccounter

func NewAccounter(
	maxEntries int, evictTimeout time.Duration,
	clock func() time.Time,
	monoClock func() time.Duration,
	m *metrics.Metrics,
) *Accounter

NewAccounter creates a new Accounter. The cache has no limit and it's assumed that eviction is done by the caller.

func (*Accounter) Account

func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record)

Account runs in a new goroutine. It reads all the records from the input channel and accumulates their metrics internally. Once the accumulated entries reach their maximum size or the eviction timeout expires, it evicts all the accumulated flows through the output channel.

type CapacityLimiter added in v0.2.1

type CapacityLimiter struct {
	// contains filtered or unexported fields
}

CapacityLimiter forwards the flows between two nodes but checks the status of the destination node's buffered channel. If it is already full, it drops the incoming flow and periodically logs a message about the number of lost flows.
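
A common Go idiom for this kind of drop-when-full forwarding is a non-blocking send. The sketch below is an assumption about how such a limiter could look, not the package's implementation: the limit function is hypothetical, batches are plain []int slices, and the periodic logging is replaced by a returned counter.

```go
package main

import "fmt"

// limit forwards batches from in to out. When out's buffer is full, the batch
// is dropped instead of blocking the sender. It returns the number of dropped
// items once in is closed.
func limit(in <-chan []int, out chan<- []int) (dropped int) {
	for batch := range in {
		select {
		case out <- batch:
			// forwarded
		default:
			dropped += len(batch) // destination is full: drop and count
		}
	}
	return dropped
}

func main() {
	in := make(chan []int, 3)
	out := make(chan []int, 1) // room for a single batch only
	in <- []int{1, 2}
	in <- []int{3}
	in <- []int{4, 5, 6}
	close(in)

	fmt.Println("dropped:", limit(in, out)) // dropped: 4
}
```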

func NewCapacityLimiter

func NewCapacityLimiter(m *metrics.Metrics) *CapacityLimiter

func (*CapacityLimiter) Limit added in v0.2.1

func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record)

type Direction

type Direction uint8

type HumanBytes

type HumanBytes uint64

type IPAddr added in v0.1.1

type IPAddr [net.IPv6len]uint8

IPAddr encodes v4 and v6 IPs with a fixed length. IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96 as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2 (same behavior as Go's net.IP type)
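
The fixed-length encoding can be illustrated with the standard library alone. In this sketch, ipAddr and fromNetIP are local stand-ins for IPAddr and IPAddrFromNetIP, and the conversion shown is an assumption based on the description above.

```go
package main

import (
	"fmt"
	"net"
)

// ipAddr is a local stand-in for the package's IPAddr type.
type ipAddr [net.IPv6len]uint8

// fromNetIP stores any net.IP in 16 bytes. net.IP.To16 returns IPv4 addresses
// in their IPv4-mapped IPv6 form (::ffff:a.b.c.d).
func fromNetIP(ip net.IP) ipAddr {
	var ia ipAddr
	copy(ia[:], ip.To16())
	return ia
}

func main() {
	ia := fromNetIP(net.ParseIP("192.0.2.1"))
	fmt.Println(ia[10:])       // [255 255 192 0 2 1]
	fmt.Println(net.IP(ia[:])) // 192.0.2.1
}
```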

func IPAddrFromNetIP

func IPAddrFromNetIP(netIP net.IP) IPAddr

IPAddrFromNetIP returns IPAddr from net.IP

func (*IPAddr) MarshalJSON added in v0.1.1

func (ia *IPAddr) MarshalJSON() ([]byte, error)

type InterfaceNamer added in v0.2.0

type InterfaceNamer func(ifIndex int) string

type MacAddr

type MacAddr [MacLen]uint8

func (*MacAddr) MarshalJSON

func (m *MacAddr) MarshalJSON() ([]byte, error)

func (*MacAddr) String

func (m *MacAddr) String() string

type MapTracer added in v0.2.1

type MapTracer struct {
	// contains filtered or unexported fields
}

MapTracer accesses a mapped source of flows (the eBPF PerCPU HashMap), deserializes it into a flow Record structure, and performs the accumulation of each perCPU-record into a single flow
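
Reading a per-CPU hash map yields one value per CPU for each key, so the values have to be merged. A minimal sketch of that merge follows; the metrics struct and its fields are hypothetical, and treating a zero start time as an unused slot is an assumption of this example.

```go
package main

import "fmt"

// metrics is a hypothetical, simplified per-CPU flow value.
type metrics struct {
	packets uint32
	bytes   uint64
	start   uint64 // monotonic timestamp of the first packet, 0 if unused
	end     uint64 // monotonic timestamp of the last packet
}

// aggregate merges the per-CPU values of one flow: counters are summed, the
// earliest start and the latest end are kept.
func aggregate(perCPU []metrics) metrics {
	var total metrics
	for _, m := range perCPU {
		if m.start == 0 {
			continue // this CPU never saw the flow
		}
		total.packets += m.packets
		total.bytes += m.bytes
		if total.start == 0 || m.start < total.start {
			total.start = m.start
		}
		if m.end > total.end {
			total.end = m.end
		}
	}
	return total
}

func main() {
	total := aggregate([]metrics{
		{packets: 3, bytes: 300, start: 10, end: 20},
		{}, // idle CPU
		{packets: 1, bytes: 40, start: 5, end: 15},
	})
	fmt.Println(total.packets, total.bytes, total.start, total.end) // 4 340 5 20
}
```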

func NewMapTracer added in v0.2.1

func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration, m *metrics.Metrics) *MapTracer

func (*MapTracer) Flush added in v0.2.1

func (m *MapTracer) Flush()

Flush forces reading (and removing) all the flows from the source eBPF map and sending the entries to the next stage in the pipeline

func (*MapTracer) TraceLoop added in v0.2.1

func (m *MapTracer) TraceLoop(ctx context.Context, forceGC bool) node.StartFunc[[]*Record]

type PacketRecord added in v0.3.3

type PacketRecord struct {
	Stream []byte
	Time   time.Time
}

func NewPacketRecord added in v0.3.3

func NewPacketRecord(
	stream []byte,
	len uint32,
	ts time.Time,
) *PacketRecord

NewPacketRecord creates a PacketRecord that contains the packet bytes

func ReadRawPacket added in v0.3.3

func ReadRawPacket(reader io.Reader) (*PacketRecord, error)

ReadRawPacket reads a PacketRecord from a binary source, in LittleEndian order

type PerfBuffer added in v0.3.3

type PerfBuffer struct {
	// contains filtered or unexported fields
}

func NewPerfBuffer added in v0.3.3

func NewPerfBuffer(
	maxEntries int, evictTimeout time.Duration,
) *PerfBuffer

func (*PerfBuffer) PBuffer added in v0.3.3

func (c *PerfBuffer) PBuffer(in <-chan *PacketRecord, out chan<- []*PacketRecord)

type PerfTracer added in v0.3.3

type PerfTracer struct {
	// contains filtered or unexported fields
}

RingBufTracer receives single-packet flows via ringbuffer (usually, those that couldn't be added in the eBPF kernel space due to the map being full or busy) and submits them to the userspace Aggregator map

func NewPerfTracer added in v0.3.3

func NewPerfTracer(
	reader perfReader, logTimeout time.Duration,
) *PerfTracer

func (*PerfTracer) TraceLoop added in v0.3.3

func (m *PerfTracer) TraceLoop(ctx context.Context) node.StartFunc[*PacketRecord]

type RawByte added in v0.3.3

type RawByte byte

type RawRecord added in v0.2.0

type RawRecord ebpf.BpfFlowRecordT

RawRecord is the record structure as parsed from eBPF

func ReadFrom

func ReadFrom(reader io.Reader) (*RawRecord, error)

ReadFrom reads a RawRecord from a binary source, in LittleEndian order

type Record

type Record struct {
	RawRecord
	// TODO: redundant field from RecordMetrics. Reorganize structs
	TimeFlowStart time.Time
	TimeFlowEnd   time.Time
	DNSLatency    time.Duration
	Interface     string
	// Duplicate tells whether this flow has another duplicate so it has to be excluded from
	// any metrics' aggregation (e.g. bytes/second rates between two pods).
	// The reason for this field is that the same flow can be observed from multiple interfaces,
	// so the agent needs to choose only a view of the same flow and mark the others as
	// "exclude from aggregation". Otherwise rates, sums, etc... values would be multiplied by the
	// number of interfaces this flow is observed from.
	Duplicate bool

	// AgentIP provides information about the source of the flow (the Agent that traced it)
	AgentIP net.IP
	// Calculated RTT which is set when record is created by calling NewRecord
	TimeFlowRtt time.Duration
	DupList     []map[string]uint8
}

Record contains accumulated metrics from a flow

func NewRecord added in v0.2.0

func NewRecord(
	key ebpf.BpfFlowId,
	metrics *ebpf.BpfFlowMetrics,
	currentTime time.Time,
	monotonicCurrentTime uint64,
) *Record

type RingBufTracer added in v0.2.1

type RingBufTracer struct {
	// contains filtered or unexported fields
}

RingBufTracer receives single-packet flows via ringbuffer (usually, those that couldn't be added in the eBPF kernel space due to the map being full or busy) and submits them to the userspace Aggregator map

func NewRingBufTracer added in v0.2.1

func NewRingBufTracer(reader ringBufReader, flusher mapFlusher, logTimeout time.Duration, m *metrics.Metrics) *RingBufTracer

func (*RingBufTracer) TraceLoop added in v0.2.1

func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*RawRecord]
