flow

package
v0.2.1
Published: Oct 19, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

const (
	DirectionIngress = uint8(0)
	DirectionEgress  = uint8(1)
)

Values follow IPFIX information element 61 (flowDirection), as listed in https://www.iana.org/assignments/ipfix/ipfix.xhtml
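As an illustration of how these constants might be consumed, the following minimal sketch maps a direction value to a readable label. The constants are redeclared locally so the snippet compiles on its own, and `directionName` is a hypothetical helper that is not part of this package.

```go
package main

import "fmt"

// Local copies of the package constants, so the sketch is self-contained.
const (
	DirectionIngress = uint8(0)
	DirectionEgress  = uint8(1)
)

// directionName is a hypothetical helper (not part of the flow package)
// that turns a direction value into a label.
func directionName(d uint8) string {
	switch d {
	case DirectionIngress:
		return "ingress"
	case DirectionEgress:
		return "egress"
	default:
		return fmt.Sprintf("unknown(%d)", d)
	}
}

func main() {
	fmt.Println(directionName(DirectionIngress)) // prints "ingress"
	fmt.Println(directionName(DirectionEgress))  // prints "egress"
}
```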

const IPv6Type = 0x86DD

IPv6Type value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml

const MacLen = 6

Variables

This section is empty.

Functions

func Dedupe added in v0.2.1

func Dedupe(expireTime time.Duration) func(in <-chan []*Record, out chan<- []*Record)

Dedupe receives flows and filters out those that are duplicates reported by other interfaces. It forwards a flow only from the first interface that reports it, until that flow expires in the cache (no activity for it during the expiration time).
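The idea described above can be sketched with a small cache keyed by flow. This is not the package's implementation: `flowKey`, `flow`, `dedupeCache` and `shouldForward` are hypothetical, simplified stand-ins, and the sketch assumes that only activity from the owning interface refreshes the expiration timer.

```go
package main

import (
	"fmt"
	"time"
)

// flowKey and flow are simplified, hypothetical stand-ins for RecordKey and Record.
type flowKey struct{ Src, Dst string }

type flow struct {
	Key       flowKey
	Interface string
}

// entry remembers which interface first reported a flow and when it was last seen.
type entry struct {
	iface    string
	lastSeen time.Time
}

type dedupeCache struct {
	expire  time.Duration
	entries map[flowKey]entry
}

func newDedupeCache(expire time.Duration) *dedupeCache {
	return &dedupeCache{expire: expire, entries: map[flowKey]entry{}}
}

// shouldForward reports whether f, observed at time now, should be forwarded.
func (c *dedupeCache) shouldForward(f flow, now time.Time) bool {
	e, ok := c.entries[f.Key]
	if !ok || now.Sub(e.lastSeen) > c.expire {
		// Unknown or expired flow: this interface becomes its owner.
		c.entries[f.Key] = entry{iface: f.Interface, lastSeen: now}
		return true
	}
	if e.iface != f.Interface {
		// Same flow seen through another interface: treat it as a duplicate.
		return false
	}
	// Activity from the owning interface refreshes the entry (an assumption).
	c.entries[f.Key] = entry{iface: e.iface, lastSeen: now}
	return true
}

// runScenario replays three observations of the same flow with a 1s expiry.
func runScenario() []bool {
	c := newDedupeCache(time.Second)
	t0 := time.Unix(0, 0)
	k := flowKey{Src: "10.0.0.1", Dst: "10.0.0.2"}
	return []bool{
		c.shouldForward(flow{Key: k, Interface: "eth0"}, t0),
		c.shouldForward(flow{Key: k, Interface: "veth1"}, t0.Add(100*time.Millisecond)),
		c.shouldForward(flow{Key: k, Interface: "veth1"}, t0.Add(5*time.Second)),
	}
}

func main() {
	fmt.Println(runScenario()) // prints [true false true]
}
```

In the scenario, the second observation is dropped because eth0 already owns the flow; the third is forwarded because the entry has been idle for longer than the expiration time.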

Types

type Accounter

type Accounter struct {
	// contains filtered or unexported fields
}

Accounter accumulates flow metrics in memory and eventually evicts them via an evictor channel. Accounting is usually done in kernel space; this type reimplements it in user space for the edge case where packets are submitted directly via the ring buffer because the kernel-side accounting map is full.

func NewAccounter

func NewAccounter(
	maxEntries int, evictTimeout time.Duration, ifaceNamer InterfaceNamer,
	clock func() time.Time,
	monoClock func() time.Duration,
) *Accounter

NewAccounter creates a new Accounter. The cache has no limit and it's assumed that eviction is done by the caller.

func (*Accounter) Account

func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record)

Account runs in a new goroutine. It reads all the records from the input channel and accumulates their metrics internally. Once the accumulated metrics reach their maximum size or the eviction timeout expires, it evicts all the accumulated flows through the out channel.
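The accumulate-then-evict loop described above could look roughly like the following sketch. It is not the package's code: `key`, `metrics`, `record` and `account` are hypothetical simplifications, and the sketch assumes that a full cache is evicted before a new flow is inserted.

```go
package main

import (
	"fmt"
	"time"
)

// key, metrics and record are simplified stand-ins for RecordKey,
// RecordMetrics and RawRecord.
type key struct{ Src, Dst string }

type metrics struct{ Packets, Bytes uint64 }

type record struct {
	Key     key
	Metrics metrics
}

// account accumulates metrics per key and evicts every accumulated flow
// when the cache is full, when the timeout fires, or when in is closed.
func account(in <-chan record, out chan<- []record, maxEntries int, evictTimeout time.Duration) {
	entries := map[key]*metrics{}
	ticker := time.NewTicker(evictTimeout)
	defer ticker.Stop()

	evict := func() {
		if len(entries) == 0 {
			return
		}
		batch := make([]record, 0, len(entries))
		for k, m := range entries {
			batch = append(batch, record{Key: k, Metrics: *m})
		}
		entries = map[key]*metrics{}
		out <- batch
	}

	for {
		select {
		case <-ticker.C:
			evict()
		case r, ok := <-in:
			if !ok {
				evict()
				close(out)
				return
			}
			if m, found := entries[r.Key]; found {
				m.Packets += r.Metrics.Packets
				m.Bytes += r.Metrics.Bytes
				continue
			}
			if len(entries) >= maxEntries {
				evict() // make room before storing a new flow
			}
			m := r.Metrics
			entries[r.Key] = &m
		}
	}
}

// demo feeds four records into a cache limited to two entries.
func demo() (batchSizes []int, abPackets uint64) {
	in := make(chan record)
	out := make(chan []record, 4)
	go account(in, out, 2, time.Hour)
	ab := key{Src: "a", Dst: "b"}
	for _, r := range []record{
		{Key: ab, Metrics: metrics{Packets: 1, Bytes: 100}},
		{Key: ab, Metrics: metrics{Packets: 1, Bytes: 50}},
		{Key: key{Src: "c", Dst: "d"}, Metrics: metrics{Packets: 1, Bytes: 10}},
		{Key: key{Src: "e", Dst: "f"}, Metrics: metrics{Packets: 1, Bytes: 10}},
	} {
		in <- r
	}
	close(in)
	for batch := range out {
		batchSizes = append(batchSizes, len(batch))
		for _, r := range batch {
			if r.Key == ab {
				abPackets += r.Metrics.Packets
			}
		}
	}
	return batchSizes, abPackets
}

func main() {
	fmt.Println(demo()) // prints [2 1] 2
}
```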

type CapacityLimiter added in v0.2.1

type CapacityLimiter struct {
	// contains filtered or unexported fields
}

CapacityLimiter forwards the flows between two nodes but checks the status of the destination node's buffered channel. If the channel is already full, it drops the incoming flows and periodically logs a message with the number of lost flows.
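A common Go idiom for this kind of drop-when-full forwarding is a non-blocking send with `select` and `default`. The sketch below uses that idiom as an assumption about how the behavior could be achieved; it does not reproduce the package's code, it uses `[]string` batches instead of `[]*Record`, and it returns counters instead of logging periodically.

```go
package main

import "fmt"

// limit forwards each batch to out unless out's buffer is full, in which
// case the batch is dropped. It returns how many flows were forwarded and
// how many were dropped (hypothetical helper, not the package's Limit method).
func limit(in <-chan []string, out chan<- []string) (forwarded, dropped int) {
	for batch := range in {
		select {
		case out <- batch:
			forwarded += len(batch)
		default:
			// The destination cannot accept more data right now.
			dropped += len(batch)
		}
	}
	return forwarded, dropped
}

// demo sends three batches to a destination that can buffer only one.
func demo() (forwarded, dropped int) {
	in := make(chan []string, 3)
	out := make(chan []string, 1)
	in <- []string{"f1", "f2"}
	in <- []string{"f3"}
	in <- []string{"f4", "f5", "f6"}
	close(in)
	return limit(in, out)
}

func main() {
	fmt.Println(demo()) // prints 2 4
}
```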

func (*CapacityLimiter) Limit added in v0.2.1

func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record)

type DataLink

type DataLink struct {
	SrcMac MacAddr
	DstMac MacAddr
}

type Direction

type Direction uint8

type HumanBytes

type HumanBytes uint64

type IPAddr added in v0.1.1

type IPAddr [net.IPv6len]uint8

IPAddr encodes IPv4 and IPv6 addresses with a fixed length. IPv4 addresses are encoded as IPv4-mapped IPv6 addresses (prefix ::ffff:0:0/96), as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2 (same behavior as Go's net.IP type).
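To make the fixed-length encoding concrete, this sketch fills a 16-byte array from a textual address using the standard library. `IPAddr` is redeclared locally and `parse` is a hypothetical helper, not a function of this package.

```go
package main

import (
	"fmt"
	"net"
)

// IPAddr is a local copy of the package type, so the sketch is self-contained.
type IPAddr [net.IPv6len]uint8

// parse is a hypothetical helper that stores an IPv4 or IPv6 address in
// its 16-byte form. It returns false if s is not a valid IP address.
func parse(s string) (IPAddr, bool) {
	var a IPAddr
	ip16 := net.ParseIP(s).To16()
	if ip16 == nil {
		return a, false
	}
	copy(a[:], ip16)
	return a, true
}

func main() {
	a, _ := parse("192.0.2.1")
	fmt.Println(a[:10])  // ten zero bytes
	fmt.Println(a[10:12]) // prints [255 255]
	fmt.Println(a[12:])  // prints [192 0 2 1]
}
```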

func (*IPAddr) IP added in v0.1.1

func (ia *IPAddr) IP() net.IP

IP returns the net.IP equivalent object

func (*IPAddr) IntEncodeV4 added in v0.1.1

func (ia *IPAddr) IntEncodeV4() uint32

IntEncodeV4 encodes an IPv4 address as an integer (in network byte order, big endian). It assumes that the stored IP is already IPv4; otherwise, it encodes only the last 4 bytes of the IPv6 address.
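The documented behavior corresponds to reading the last four bytes of the array as a big-endian integer. The following sketch shows that reading with `encoding/binary`; `intEncodeV4` and `encodeString` are hypothetical local helpers, written under the assumption that the method does nothing beyond what the description above states.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// IPAddr is a local copy of the package type, so the sketch is self-contained.
type IPAddr [net.IPv6len]uint8

// intEncodeV4 reads the last 4 bytes of the address as a big-endian uint32.
func intEncodeV4(ia *IPAddr) uint32 {
	return binary.BigEndian.Uint32(ia[net.IPv6len-net.IPv4len:])
}

// encodeString is a hypothetical convenience wrapper for the example.
// An invalid address leaves the array zeroed, so the result is 0.
func encodeString(s string) uint32 {
	var a IPAddr
	copy(a[:], net.ParseIP(s).To16())
	return intEncodeV4(&a)
}

func main() {
	fmt.Printf("%#x\n", encodeString("192.0.2.1")) // prints 0xc0000201
}
```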

func (*IPAddr) MarshalJSON added in v0.1.1

func (ia *IPAddr) MarshalJSON() ([]byte, error)

type InterfaceNamer added in v0.2.0

type InterfaceNamer func(ifIndex int) string

type MacAddr

type MacAddr [MacLen]uint8

func (*MacAddr) MarshalJSON

func (m *MacAddr) MarshalJSON() ([]byte, error)

func (*MacAddr) String

func (m *MacAddr) String() string

type MapTracer added in v0.2.1

type MapTracer struct {
	// contains filtered or unexported fields
}

MapTracer accesses a mapped source of flows (the eBPF PerCPU HashMap), deserializes it into a flow Record structure, and accumulates the per-CPU records into a single flow.
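Merging per-CPU entries into one flow can be pictured as follows. The merge rule used here (sum the counters, keep the earliest non-zero start and the latest end) is an assumption made for illustration, and `metrics`, `accumulate` and `aggregate` are hypothetical names rather than the package's own.

```go
package main

import "fmt"

// metrics is a simplified stand-in for RecordMetrics.
type metrics struct {
	Packets         uint32
	Bytes           uint64
	StartMonoTimeNs uint64
	EndMonoTimeNs   uint64
}

// accumulate merges src into dst: counters are summed, the earliest
// non-zero start time and the latest end time are kept (assumed rule).
func accumulate(dst *metrics, src metrics) {
	if dst.StartMonoTimeNs == 0 ||
		(src.StartMonoTimeNs != 0 && src.StartMonoTimeNs < dst.StartMonoTimeNs) {
		dst.StartMonoTimeNs = src.StartMonoTimeNs
	}
	if src.EndMonoTimeNs > dst.EndMonoTimeNs {
		dst.EndMonoTimeNs = src.EndMonoTimeNs
	}
	dst.Packets += src.Packets
	dst.Bytes += src.Bytes
}

// aggregate folds the per-CPU entries of one flow into a single value.
func aggregate(perCPU []metrics) metrics {
	var total metrics
	for _, m := range perCPU {
		accumulate(&total, m)
	}
	return total
}

func main() {
	total := aggregate([]metrics{
		{Packets: 2, Bytes: 200, StartMonoTimeNs: 100, EndMonoTimeNs: 150},
		{}, // a CPU that never saw this flow
		{Packets: 3, Bytes: 300, StartMonoTimeNs: 90, EndMonoTimeNs: 180},
	})
	fmt.Printf("%+v\n", total)
}
```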

func NewMapTracer added in v0.2.1

func NewMapTracer(fetcher mapFetcher, namer InterfaceNamer, evictionTimeout time.Duration) *MapTracer

func (*MapTracer) Flush added in v0.2.1

func (m *MapTracer) Flush()

Flush forces reading (and removing) all the flows from the source eBPF map and sending the entries to the next stage in the pipeline

func (*MapTracer) TraceLoop added in v0.2.1

func (m *MapTracer) TraceLoop(ctx context.Context) node.InitFunc

type Network

type Network struct {
	SrcAddr IPAddr
	DstAddr IPAddr
}

type RawRecord added in v0.2.0

type RawRecord struct {
	RecordKey
	RecordMetrics
}

RawRecord is the record structure as parsed from eBPF. The fields in this structure must match, byte by byte, the flow_record_t structure in the bpf/flow.h file.

func ReadFrom

func ReadFrom(reader io.Reader) (*RawRecord, error)

ReadFrom reads a RawRecord from a binary source, in little-endian order.
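Reading a fixed-layout struct in little-endian order is typically done with `binary.Read` from the standard library. The sketch below applies it to `miniRecord`, a deliberately small, hypothetical struct; the real RawRecord has more fields, and its layout is dictated by bpf/flow.h.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// miniRecord is a hypothetical, reduced record used only for this sketch.
// encoding/binary reads its fields back to back: 2 + 4 + 8 = 14 bytes.
type miniRecord struct {
	EthProtocol uint16
	Packets     uint32
	Bytes       uint64
}

// readFrom decodes one miniRecord from r in little-endian order.
func readFrom(r io.Reader) (*miniRecord, error) {
	var rec miniRecord
	if err := binary.Read(r, binary.LittleEndian, &rec); err != nil {
		return nil, err
	}
	return &rec, nil
}

// decode is a convenience wrapper that reads from a byte slice.
func decode(b []byte) (*miniRecord, error) {
	return readFrom(bytes.NewReader(b))
}

func main() {
	raw := []byte{
		0xDD, 0x86, // EthProtocol = 0x86DD (IPv6)
		0x02, 0x00, 0x00, 0x00, // Packets = 2
		0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Bytes = 256
	}
	rec, err := decode(raw)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%#x %d %d\n", rec.EthProtocol, rec.Packets, rec.Bytes) // prints 0x86dd 2 256
}
```

A source shorter than the struct size makes `binary.Read` return an error, so truncated input is reported rather than silently zero-filled.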

type Record

type Record struct {
	RawRecord
	// TODO: redundant field from RecordMetrics. Reorganize structs
	TimeFlowStart time.Time
	TimeFlowEnd   time.Time
	Interface     string
}

Record contains accumulated metrics from a flow

func NewRecord added in v0.2.0

func NewRecord(
	key RecordKey,
	metrics RecordMetrics,
	currentTime time.Time,
	monotonicCurrentTime uint64,
	namer InterfaceNamer,
) *Record

type RecordKey added in v0.2.0

type RecordKey struct {
	EthProtocol uint16 `json:"Etype"`
	Direction   uint8  `json:"FlowDirection"`
	DataLink
	Network
	Transport
	IFIndex uint32
}

RecordKey identifies a flow. It must coincide, byte by byte, with the kernel-side flow_id_t (bpf/flow.h).

type RecordMetrics added in v0.2.0

type RecordMetrics struct {
	Packets uint32
	Bytes   uint64
	// StartMonoTimeNs and EndMonoTimeNs are the start and end times as system monotonic timestamps
	// in nanoseconds, as output from bpf_ktime_get_ns() (kernel space)
	// and monotime.Now() (user space)
	StartMonoTimeNs uint64
	EndMonoTimeNs   uint64

	Errno uint8
}

RecordMetrics provides flow metrics and timing information. It must coincide, byte by byte, with the kernel-side flow_metrics_t (bpf/flow.h).
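Because StartMonoTimeNs and EndMonoTimeNs are monotonic timestamps, they need a reference point to become wall-clock times. One plausible approach, shown here as an assumption rather than as the package's implementation, is to sample the wall clock and the monotonic clock together and shift each timestamp by its distance from the monotonic sample. `offsetFromNow` and `toWallClock` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// offsetFromNow returns how far monoTs lies from monoNow, both given as
// monotonic nanoseconds. The result is negative for past timestamps.
func offsetFromNow(monoNow, monoTs uint64) time.Duration {
	return time.Duration(int64(monoTs) - int64(monoNow))
}

// toWallClock shifts the wall-clock sample now by that offset.
func toWallClock(now time.Time, monoNow, monoTs uint64) time.Time {
	return now.Add(offsetFromNow(monoNow, monoTs))
}

func main() {
	now := time.Date(2022, time.October, 19, 12, 0, 0, 0, time.UTC)
	// The monotonic clock reads 10s; the flow started when it read 7s.
	start := toWallClock(now, 10_000_000_000, 7_000_000_000)
	fmt.Println(start.Format(time.RFC3339)) // prints 2022-10-19T11:59:57Z
}
```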

func (*RecordMetrics) Accumulate added in v0.2.0

func (r *RecordMetrics) Accumulate(src *RecordMetrics)

type RingBufTracer added in v0.2.1

type RingBufTracer struct {
	// contains filtered or unexported fields
}

RingBufTracer receives single-packet flows via the ring buffer (usually those that couldn't be added in the eBPF kernel space because the map was full or busy) and submits them to the userspace Aggregator map.

func NewRingBufTracer added in v0.2.1

func NewRingBufTracer(
	reader ringBufReader, flusher mapFlusher, logTimeout time.Duration,
) *RingBufTracer

func (*RingBufTracer) TraceLoop added in v0.2.1

func (m *RingBufTracer) TraceLoop(ctx context.Context) node.InitFunc

type Transport

type Transport struct {
	SrcPort  uint16
	DstPort  uint16
	Protocol uint8 `json:"Proto"`
}
