Documentation
Overview
Package reassembly provides TCP stream re-assembly.
The reassembly package implements uni-directional TCP reassembly, for use in packet-sniffing applications. The caller reads packets off the wire, then presents them to an Assembler in the form of gopacket layers.TCP packets (github.com/funbinary/gopacket, github.com/funbinary/gopacket/layers).
The Assembler uses a user-supplied StreamFactory to create a user-defined Stream interface, then passes packet data in stream order to that object. A concurrency-safe StreamPool keeps track of all current Streams being reassembled, so multiple Assemblers may run at once to assemble packets while taking advantage of multiple cores.
TODO: Add simplest example
Index
- Constants
- Variables
- type Assembler
- func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP)
- func (a *Assembler) AssembleWithContext(netFlow gopacket.Flow, t *layers.TCP, ac AssemblerContext)
- func (a *Assembler) Dump() string
- func (a *Assembler) FlushAll() (closed int)
- func (a *Assembler) FlushCloseOlderThan(t time.Time) (flushed, closed int)
- func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int)
- type AssemblerContext
- type AssemblerOptions
- type FlushOptions
- type ScatterGather
- type Sequence
- type Stream
- type StreamFactory
- type StreamPool
- type TCPAssemblyStats
- type TCPFlowDirection
- type TCPOptionCheck
- type TCPSimpleFSM
- type TCPSimpleFSMOptions
Constants
const (
	TCPStateClosed      = 0
	TCPStateSynSent     = 1
	TCPStateEstablished = 2
	TCPStateCloseWait   = 3
	TCPStateLastAck     = 4
	TCPStateReset       = 5
)
Internal values of the state machine.
Variables
var DefaultAssemblerOptions = AssemblerOptions{
MaxBufferedPagesPerConnection: 0,
MaxBufferedPagesTotal: 0,
}
DefaultAssemblerOptions provides default options for an assembler. These options are used by default when calling NewAssembler, so if modified before a NewAssembler call they'll affect the resulting Assembler.
Note that because both limits default to 0 (meaning no limit), the default options can result in ever-increasing memory usage unless one of the Flush* methods is called on a regular basis.
Types
type Assembler
type Assembler struct {
	AssemblerOptions
	// contains filtered or unexported fields
}
Assembler handles reassembling TCP streams. It is not safe for concurrent use: after passing a packet in via the Assemble call, the caller must wait for that call to return before calling Assemble again. Callers can get around this by creating multiple assemblers that share a StreamPool. In that case, each individual stream is still handled serially (each stream has its own mutex); however, multiple assemblers can assemble different connections concurrently.
The Assembler aims to provide fast TCP stream reassembly for sniffing applications written in Go. It uses the following techniques to keep packet processing fast:
Avoids Lock Contention
Assemblers lock connections, but each connection has its own lock, and two Assemblers will rarely look at the same connection. Assemblers lock the StreamPool when looking up connections, but they take read locks initially and only take a write lock if they need to create a new connection or close one down. These events happen much less frequently than individual packet handling.
Each assembler runs in its own goroutine, and the only state shared between goroutines is through the StreamPool. Thus all internal Assembler state can be handled without any locking.
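The read-lock-first lookup described above can be sketched with a sync.RWMutex. This is a minimal, hypothetical model: flowKey, conn, pool, and getOrCreate are illustrative names, not the package's unexported internals, and the real StreamPool also handles closing connections, which is omitted here.

```go
package main

import (
	"fmt"
	"sync"
)

// flowKey is a hypothetical stand-in for the (netFlow, tcpFlow) pair
// that identifies a connection.
type flowKey string

// conn is a hypothetical per-connection record; its own mutex ensures
// that data for a single stream is handled serially.
type conn struct {
	mu    sync.Mutex
	bytes int
}

// pool takes a read lock for lookups and a write lock only to create
// a missing connection.
type pool struct {
	mu    sync.RWMutex
	conns map[flowKey]*conn
}

func newPool() *pool { return &pool{conns: make(map[flowKey]*conn)} }

func (p *pool) getOrCreate(k flowKey) *conn {
	// Fast path: most packets belong to an already known connection.
	p.mu.RLock()
	c, ok := p.conns[k]
	p.mu.RUnlock()
	if ok {
		return c
	}
	// Slow path: take the write lock and re-check, because another
	// goroutine may have created the connection in the meantime.
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok = p.conns[k]; !ok {
		c = &conn{}
		p.conns[k] = c
	}
	return c
}

func main() {
	p := newPool()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c := p.getOrCreate("10.0.0.1:1234->10.0.0.2:80")
			// The per-connection lock serializes updates to one stream.
			c.mu.Lock()
			c.bytes += 100
			c.mu.Unlock()
		}()
	}
	wg.Wait()
	// One connection shared by all goroutines: prints 1 400.
	fmt.Println(len(p.conns), p.getOrCreate("10.0.0.1:1234->10.0.0.2:80").bytes)
}
```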
NOTE: If you can guarantee that the packets going to a set of Assemblers contain different connections per Assembler (for example, because they have already been hashed by PF_RING hashing or some other hashing mechanism), then we recommend using a separate StreamPool per Assembler, thus avoiding all lock contention. A StreamPool should be shared only when different Assemblers could receive packets for the same Stream.
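If packets are dispatched to workers in-process rather than by PF_RING, a symmetric hash of the connection's endpoints gives the same guarantee. The sketch below is hypothetical (endpoint and shardFor are illustrative names, and FNV-1a is an arbitrary hash choice); the key property is that both directions of a connection reach the same worker, which can then own a private Assembler and StreamPool.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// endpoint is a hypothetical IP/port pair used only for illustration.
type endpoint struct {
	ip   string
	port uint16
}

// shardFor picks a worker for a packet. Ordering the two endpoints before
// hashing makes the hash symmetric, so both directions of a connection map
// to the same worker (and thus the same Assembler and StreamPool).
func shardFor(src, dst endpoint, workers int) int {
	a, b := src, dst
	if a.ip > b.ip || (a.ip == b.ip && a.port > b.port) {
		a, b = b, a
	}
	h := fnv.New32a()
	fmt.Fprintf(h, "%s:%d|%s:%d", a.ip, a.port, b.ip, b.port)
	return int(h.Sum32() % uint32(workers))
}

func main() {
	client := endpoint{"10.0.0.1", 51515}
	server := endpoint{"10.0.0.2", 80}
	// Both directions land on the same worker: prints true.
	fmt.Println(shardFor(client, server, 4) == shardFor(server, client, 4))
}
```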
Avoids Memory Copying
In the common case, handling a single TCP packet should result in zero memory allocations. The Assembler looks up the connection, determines that the packet has arrived in order, and immediately passes that packet on to the appropriate connection's handling code. Only if a packet arrives out of order are its contents copied and stored in memory for later.
Avoids Memory Allocation
Assemblers try very hard to avoid memory allocation unless absolutely necessary. Packet data for sequential packets is passed directly to streams with no copying or allocation. Packet data for out-of-order packets is copied into reusable pages, and new pages are allocated only when the page cache runs out, which should be rare. Page caches are Assembler-specific, so they are never used concurrently and require no locking.
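The page-reuse idea can be sketched as a singly linked free list. pageSize, page, and pageCache are hypothetical names, and the page size is arbitrary; because the cache belongs to a single Assembler (one goroutine), it needs no mutex. Like the Assembler's page caches, this sketch never releases pages once allocated.

```go
package main

import "fmt"

// pageSize is a hypothetical page size chosen for illustration.
const pageSize = 1900

// page holds a copy of out-of-order payload bytes.
type page struct {
	buf  [pageSize]byte
	n    int
	next *page
}

// pageCache is a hypothetical free list owned by a single Assembler, so it
// needs no locking. Pages are recycled rather than left to the garbage
// collector, and the list never shrinks.
type pageCache struct {
	free      *page
	allocated int // number of pages ever allocated
}

// get returns a recycled page if one is available, else allocates one.
func (c *pageCache) get() *page {
	if p := c.free; p != nil {
		c.free = p.next
		p.next, p.n = nil, 0
		return p
	}
	c.allocated++
	return new(page)
}

// put returns a page to the free list once its data has been delivered.
func (c *pageCache) put(p *page) {
	p.next = c.free
	c.free = p
}

func main() {
	var c pageCache
	p := c.get()
	p.n = copy(p.buf[:], "out-of-order payload")
	c.put(p)
	q := c.get()
	// The page is reused without a second allocation: prints true 1.
	fmt.Println(p == q, c.allocated)
}
```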
Internal representations for connection objects are also reused over time. Because of this, the most common memory allocation done by the Assembler is generally whatever the caller does in StreamFactory.New. If no allocation is done there, then very little allocation happens at all, mostly to handle large increases in bandwidth or in the number of connections.
TODO: The page caches used by an Assembler will grow to the size necessary to handle a workload, and currently will never shrink. This means that traffic spikes can result in large memory usage which isn't garbage collected when typical traffic levels return.
func NewAssembler
func NewAssembler(pool *StreamPool) *Assembler
NewAssembler creates a new assembler. Pass in the StreamPool to use; it may be shared across assemblers.
This sets some sane defaults for the assembler options; see DefaultAssemblerOptions for details.
func (*Assembler) Assemble
func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP)
Assemble calls AssembleWithContext with the current timestamp. It is useful for packets being read directly off the wire.
func (*Assembler) AssembleWithContext
func (a *Assembler) AssembleWithContext(netFlow gopacket.Flow, t *layers.TCP, ac AssemblerContext)
AssembleWithContext reassembles the given TCP packet into its appropriate stream.
The timestamp passed in (through the AssemblerContext's CaptureInfo) must be the timestamp at which the packet was seen. For packets read off the wire, time.Now() should be fine. For packets read from PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp affects which streams are flushed by a call to FlushCloseOlderThan.
Each AssembleWithContext call results in, in order:
- zero or one call to StreamFactory.New, creating a stream
- zero or one call to ReassembledSG on a single stream
- zero or one call to ReassemblyComplete on the same stream
func (*Assembler) FlushAll
func (a *Assembler) FlushAll() (closed int)
FlushAll flushes all remaining data into all remaining connections and closes those connections. It returns the total number of connections flushed and closed by the call.
func (*Assembler) FlushCloseOlderThan
func (a *Assembler) FlushCloseOlderThan(t time.Time) (flushed, closed int)
FlushCloseOlderThan flushes and closes streams older than the given time.
func (*Assembler) FlushWithOptions
func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int)
FlushWithOptions finds any streams waiting for packets older than the given time T and pushes through the data they have (i.e., tells them to stop waiting and skip the data they're waiting for).
It also closes streams older than TC. TC can be left zero to keep long-lived streams alive while still flushing their data.
Each Stream maintains a list of zero or more sets of bytes it has received out of order. For example, if it has processed up through sequence number 10, it might have bytes [15,20), [20,25), and [30,50) in its list. Each set of bytes also records the timestamp at which it was first seen. A flush call looks at the smallest subsequent set of bytes, in this case [15,20), and if its timestamp is older than the passed-in time, it pushes that set and all contiguous sets out to the Stream's ReassembledSG method. In this case, it pushes [15,20) but also [20,25), since that is contiguous. It only pushes [30,50) if its timestamp is also older than the passed-in time; otherwise it waits until the next FlushCloseOlderThan call to see whether bytes [25,30) arrive.
It returns the number of connections flushed and, of those, the number closed because of the flush.
type AssemblerContext
type AssemblerContext interface {
GetCaptureInfo() gopacket.CaptureInfo
}
AssemblerContext provides a method to get packet metadata (the packet's gopacket.CaptureInfo).
type AssemblerOptions
type AssemblerOptions struct {
	// MaxBufferedPagesTotal is an upper limit on the total number of pages to
	// buffer while waiting for out-of-order packets. Once this limit is
	// reached, the assembler will degrade to flushing every connection it
	// gets a packet for. If <= 0, this is ignored.
	MaxBufferedPagesTotal int
	// MaxBufferedPagesPerConnection is an upper limit on the number of pages
	// buffered for a single connection. Should this limit be reached for a
	// particular connection, the smallest sequence number will be flushed, along
	// with any contiguous data. If <= 0, this is ignored.
	MaxBufferedPagesPerConnection int
}
AssemblerOptions controls the behavior of each assembler. Modify the options of each assembler you create to change their behavior.
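Both limits follow the convention that a value <= 0 disables them, which is why the zero-valued DefaultAssemblerOptions never bound memory. The helper below is a hypothetical illustration of that check, not the package's internal logic, and the numbers are arbitrary. Since Assembler embeds AssemblerOptions, the real fields can be set directly on the value returned by NewAssembler.

```go
package main

import "fmt"

// options mirrors the two documented AssemblerOptions fields.
type options struct {
	MaxBufferedPagesTotal         int
	MaxBufferedPagesPerConnection int
}

// limitReached is a hypothetical helper showing the documented convention
// that a limit <= 0 is ignored, i.e. means "unlimited".
func limitReached(used, limit int) bool {
	return limit > 0 && used >= limit
}

func main() {
	// Both limits are zero, as in DefaultAssemblerOptions.
	defaults := options{}
	// Arbitrary example limits.
	bounded := options{MaxBufferedPagesTotal: 150000, MaxBufferedPagesPerConnection: 4000}
	// Prints false (unbounded), then true (per-connection limit hit).
	fmt.Println(limitReached(1000000, defaults.MaxBufferedPagesTotal))
	fmt.Println(limitReached(4000, bounded.MaxBufferedPagesPerConnection))
}
```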
type FlushOptions
type FlushOptions struct {
	T  time.Time // If nonzero, only connections with data older than T are flushed
	TC time.Time // If nonzero, only connections with data older than TC are closed (if no FIN/RST received)
}
FlushOptions provides options for flushing connections.
type ScatterGather
type ScatterGather interface {
	// Returns the length of available bytes and saved bytes
	Lengths() (int, int)
	// Returns the bytes up to length (shall be <= available bytes)
	Fetch(length int) []byte
	// Tell to keep from offset
	KeepFrom(offset int)
	// Return CaptureInfo of packet corresponding to given offset
	CaptureInfo(offset int) gopacket.CaptureInfo
	// Return some info about the reassembled chunks
	Info() (direction TCPFlowDirection, start bool, end bool, skip int)
	// Return some stats regarding the state of the stream
	Stats() TCPAssemblyStats
}
ScatterGather is used to pass reassembled data and metadata of reassembled packets to a Stream via ReassembledSG.
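Because the ScatterGather is reused after each ReassembledSG call, a Stream should copy the bytes it wants to keep before returning (or use KeepFrom). The sketch mirrors only Lengths and Fetch with hypothetical types; reusedSG simulates buffer reuse so the effect of copying is visible.

```go
package main

import (
	"bytes"
	"fmt"
)

// scatterGather is a reduced, hypothetical mirror of ScatterGather that
// keeps only the two methods used here.
type scatterGather interface {
	Lengths() (available, saved int)
	Fetch(length int) []byte
}

// reusedSG simulates an implementation that reuses one buffer across calls.
type reusedSG struct{ buf []byte }

func (s *reusedSG) Lengths() (int, int)     { return len(s.buf), 0 }
func (s *reusedSG) Fetch(length int) []byte { return s.buf[:length] }

// collector is a hypothetical Stream-side consumer.
type collector struct{ data bytes.Buffer }

// ReassembledSG copies the available bytes; bytes.Buffer.Write makes its own
// copy, so later reuse of the ScatterGather's buffer does not affect it.
func (c *collector) ReassembledSG(sg scatterGather) {
	available, _ := sg.Lengths()
	c.data.Write(sg.Fetch(available))
}

func main() {
	sg := &reusedSG{buf: []byte("GET / ")}
	var c collector
	c.ReassembledSG(sg)
	// Simulate the assembler reusing its buffer for the next chunk.
	copy(sg.buf, "XXXXXX")
	// The collector still holds the original bytes: prints "GET / ".
	fmt.Printf("%q\n", c.data.String())
}
```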
type Sequence
type Sequence int64
Sequence is a TCP sequence number. It provides a few convenience functions for handling TCP wrap-around. The sequence should always be in the range [0, 0xFFFFFFFF]; its other bits are used only in wrap-around calculations and should never be set.
func (Sequence) Difference
Difference defines an ordering for comparing TCP sequences that's safe for roll-overs. It returns:
  > 0 if t comes after s
  < 0 if t comes before s
  0 if t == s
The number returned is the sequence difference, so Sequence(4).Difference(8) returns 4.
It handles rollovers by considering any sequence in the first quarter of the uint32 space to be after any sequence in the last quarter of that space, thus wrapping the uint32 space.
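The documented rule can be sketched as follows. This is an illustrative reimplementation, not the package's source: it adds 1<<32 to the wrapped value and uses >= at the quarter boundaries, so its results at the exact edges may differ from the package's own Difference.

```go
package main

import "fmt"

// Sequence mirrors the documented type: a TCP sequence number held in an
// int64 whose value stays within [0, 0xFFFFFFFF].
type Sequence int64

// Boundaries of the 32-bit sequence space used for wrap-around handling.
const (
	seqSpace   = 1 << 32
	seqQuarter = seqSpace / 4
	seqLast    = seqSpace - seqQuarter
)

// Difference returns t - s, treating a sequence in the first quarter of the
// space as coming after one in the last quarter.
func (s Sequence) Difference(t Sequence) int {
	if s >= seqLast && t < seqQuarter {
		t += seqSpace // t has wrapped past 0xFFFFFFFF
	} else if t >= seqLast && s < seqQuarter {
		s += seqSpace // s has wrapped past 0xFFFFFFFF
	}
	return int(t - s)
}

func main() {
	// Prints 4, then 32 (t treated as wrapped), then -32 (s treated as wrapped).
	fmt.Println(Sequence(4).Difference(8))
	fmt.Println(Sequence(0xFFFFFFF0).Difference(0x10))
	fmt.Println(Sequence(0x10).Difference(0xFFFFFFF0))
}
```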
type Stream
type Stream interface {
	// Tell whether the TCP packet should be accepted; start could be modified to force a start even if no SYN has been seen
	Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool, ac AssemblerContext) bool

	// ReassembledSG is called zero or more times.
	// ScatterGather is reused after each Reassembled call,
	// so it's important to copy anything you need out of it,
	// especially bytes (or use KeepFrom())
	ReassembledSG(sg ScatterGather, ac AssemblerContext)

	// ReassemblyComplete is called when assembly decides there is
	// no more data for this Stream, either because a FIN or RST packet
	// was seen, or because the stream has timed out without any new
	// packet data (due to a call to FlushCloseOlderThan).
	// It should return true if the connection should be removed from the pool.
	// It can return false if it wants to see subsequent packets with Accept(), e.g. to
	// see FIN-ACK, for deeper state-machine analysis.
	ReassemblyComplete(ac AssemblerContext) bool
}
Stream is implemented by the caller to handle incoming reassembled TCP data. Callers create a StreamFactory, then StreamPool uses it to create a new Stream for every TCP stream.
Assembly will, in order:
- Create the stream via StreamFactory.New
- Call ReassembledSG 0 or more times, passing in reassembled TCP data in order
- Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.
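The call order above can be modeled with reduced, hypothetical versions of the interfaces: stream and factory take a string flow ID and plain byte slices instead of gopacket flows and a ScatterGather, and driver stands in for the Assembler and StreamPool. As in the Stream documentation, returning true from ReassemblyComplete removes the stream.

```go
package main

import "fmt"

// stream is a reduced, hypothetical mirror of the Stream interface: data
// arrives as plain byte slices rather than through a ScatterGather.
type stream interface {
	Reassembled(data []byte)
	ReassemblyComplete() bool
}

// factory is a reduced, hypothetical mirror of StreamFactory, keyed by a
// string flow ID instead of gopacket flows.
type factory interface {
	New(flow string) stream
}

// countingStream counts the bytes it receives.
type countingStream struct {
	bytes int
	done  bool
}

func (s *countingStream) Reassembled(data []byte) { s.bytes += len(data) }

// ReassemblyComplete returns true so that the stream is removed.
func (s *countingStream) ReassemblyComplete() bool {
	s.done = true
	return true
}

// countingFactory remembers every stream it creates.
type countingFactory struct{ created []*countingStream }

func (f *countingFactory) New(flow string) stream {
	s := &countingStream{}
	f.created = append(f.created, s)
	return s
}

// driver plays the role of the Assembler and StreamPool: New once per
// flow, Reassembled zero or more times, ReassemblyComplete once.
type driver struct {
	f       factory
	streams map[string]stream
}

func (d *driver) data(flow string, payload []byte) {
	s, ok := d.streams[flow]
	if !ok {
		s = d.f.New(flow)
		d.streams[flow] = s
	}
	s.Reassembled(payload)
}

func (d *driver) finish(flow string) {
	if s, ok := d.streams[flow]; ok && s.ReassemblyComplete() {
		delete(d.streams, flow)
	}
}

func main() {
	f := &countingFactory{}
	d := &driver{f: f, streams: map[string]stream{}}
	d.data("a", []byte("hello "))
	d.data("a", []byte("world"))
	d.finish("a")
	// One stream created, 11 bytes delivered, none left: prints 1 11 0.
	fmt.Println(len(f.created), f.created[0].bytes, len(d.streams))
}
```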
type StreamFactory
type StreamFactory interface {
	// New should return a new stream for the given TCP key.
	New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac AssemblerContext) Stream
}
StreamFactory is used by assembly to create a new stream for each new TCP session.
type StreamPool
type StreamPool struct {
// contains filtered or unexported fields
}
StreamPool stores all streams created by Assemblers, allowing multiple assemblers to work together on stream processing while enforcing the fact that a single stream receives its data serially. It is safe for concurrency, usable by multiple Assemblers at once.
StreamPool handles the creation and storage of Stream objects used by one or more Assembler objects. When a new TCP stream is found by an Assembler, it creates an associated Stream by calling its StreamFactory's New method. Thereafter (until the stream is closed), that Stream object receives assembled TCP data via the Assembler's calls to the stream's ReassembledSG method.
Like the Assembler, StreamPool attempts to minimize allocation. Unlike the Assembler, though, it does have to do some locking to make sure that the connection objects it stores are accessible to multiple Assemblers.
func NewStreamPool
func NewStreamPool(factory StreamFactory) *StreamPool
NewStreamPool creates a new connection pool. Streams will be created as necessary using the passed-in StreamFactory.
type TCPAssemblyStats
type TCPAssemblyStats struct {
	// For this ScatterGather
	Chunks  int
	Packets int
	// For the half connection, since last call to ReassembledSG()
	QueuedBytes    int
	QueuedPackets  int
	OverlapBytes   int
	OverlapPackets int
}
TCPAssemblyStats provides some statistics for a ScatterGather.
type TCPFlowDirection
type TCPFlowDirection bool
TCPFlowDirection distinguishes the directions of the two half-connections.
TCPDirClientToServer is assigned to the half-connection of the first received packet, so it might be wrong if packets are not received in order. It's up to the caller (e.g. in Accept()) to decide whether the direction should be interpreted differently.
const (
	TCPDirClientToServer TCPFlowDirection = false
	TCPDirServerToClient TCPFlowDirection = true
)
The underlying bool values carry no particular meaning.
func (TCPFlowDirection) Reverse
func (dir TCPFlowDirection) Reverse() TCPFlowDirection
Reverse returns the reversed direction.
func (TCPFlowDirection) String
func (dir TCPFlowDirection) String() string
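Since TCPFlowDirection is a bool, Reverse amounts to negation. The sketch below re-declares the type locally so it runs on its own; the labels returned by String are an assumption and may not match the package's actual output.

```go
package main

import "fmt"

// TCPFlowDirection mirrors the documented bool-based type.
type TCPFlowDirection bool

const (
	TCPDirClientToServer TCPFlowDirection = false
	TCPDirServerToClient TCPFlowDirection = true
)

// Reverse flips the direction; with a bool representation this is negation.
func (dir TCPFlowDirection) Reverse() TCPFlowDirection { return !dir }

// String gives a readable label (assumed text, for illustration only).
func (dir TCPFlowDirection) String() string {
	if dir == TCPDirClientToServer {
		return "client->server"
	}
	return "server->client"
}

func main() {
	d := TCPDirClientToServer
	// Prints client->server server->client.
	fmt.Println(d, d.Reverse())
}
```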
type TCPOptionCheck
type TCPOptionCheck struct {
// contains filtered or unexported fields
}
TCPOptionCheck contains options for the two directions.
func NewTCPOptionCheck
func NewTCPOptionCheck() TCPOptionCheck
NewTCPOptionCheck creates default options.
func (*TCPOptionCheck) Accept
func (t *TCPOptionCheck) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool) error
Accept checks whether the packet should be accepted by checking TCP options.
type TCPSimpleFSM
type TCPSimpleFSM struct {
// contains filtered or unexported fields
}
TCPSimpleFSM implements a very simple TCP state machine.
Usage: when implementing the Stream interface, Accept() can call TCPSimpleFSM.CheckState() to avoid considering packets that the client's or server's TCP stack would reject.
Limitations:
- packets should be received in order
- no check on sequence numbers is performed
- no RST
func NewTCPSimpleFSM
func NewTCPSimpleFSM(options TCPSimpleFSMOptions) *TCPSimpleFSM
NewTCPSimpleFSM creates a new TCPSimpleFSM.
func (*TCPSimpleFSM) CheckState
func (t *TCPSimpleFSM) CheckState(tcp *layers.TCP, dir TCPFlowDirection) bool
CheckState returns false if tcp is invalid with respect to the current state; otherwise it updates the state machine's state and returns true.
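To illustrate the kind of transitions such a check performs, here is a simplified, hypothetical state machine. It borrows the names of the TCPState* constants but not the package's code: flags stands in for layers.TCP, directions are plain bools, and, in line with the documented limitations, it ignores RST and sequence numbers. The package's TCPSimpleFSM (including SupportMissingEstablishment) may accept or reject packets differently.

```go
package main

import "fmt"

// Hypothetical states, named after the documented TCPState* constants.
const (
	stateClosed = iota
	stateSynSent
	stateEstablished
	stateCloseWait
	stateLastAck
)

// flags is a hypothetical stand-in for the relevant fields of layers.TCP.
type flags struct{ SYN, ACK, FIN bool }

// fsm is a simplified sketch, not the package's TCPSimpleFSM. dir records
// which side opened the connection or sent the first FIN.
type fsm struct {
	state int
	dir   bool
}

// checkState returns false for a packet that does not fit the current
// state; otherwise it accepts the packet and may advance the state.
func (f *fsm) checkState(p flags, dir bool) bool {
	switch f.state {
	case stateClosed:
		// Only a bare SYN can open the connection.
		if p.SYN && !p.ACK {
			f.state, f.dir = stateSynSent, dir
			return true
		}
	case stateSynSent:
		// SYN+ACK must come from the other side.
		if p.SYN && p.ACK && dir != f.dir {
			f.state = stateEstablished
			return true
		}
		// A retransmitted SYN from the initiator is also accepted.
		return p.SYN && !p.ACK && dir == f.dir
	case stateEstablished:
		// Any packet is accepted; a FIN starts the close sequence.
		if p.FIN {
			f.state, f.dir = stateCloseWait, dir
		}
		return true
	case stateCloseWait:
		if p.FIN && p.ACK && dir != f.dir {
			f.state = stateLastAck
			return true
		}
		return p.ACK
	case stateLastAck:
		// The final ACK comes from the side that sent the first FIN.
		if p.ACK && dir == f.dir {
			f.state = stateClosed
			return true
		}
	}
	return false
}

func main() {
	var f fsm
	c2s, s2c := false, true
	// An ACK before any SYN is rejected: prints false.
	fmt.Println(f.checkState(flags{ACK: true}, c2s))
	steps := []struct {
		p   flags
		dir bool
	}{
		{flags{SYN: true}, c2s},
		{flags{SYN: true, ACK: true}, s2c},
		{flags{ACK: true}, c2s},
		{flags{FIN: true, ACK: true}, c2s},
		{flags{FIN: true, ACK: true}, s2c},
		{flags{ACK: true}, c2s},
	}
	for _, s := range steps {
		f.checkState(s.p, s.dir)
	}
	// Full handshake and close: prints true.
	fmt.Println(f.state == stateClosed)
}
```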
func (*TCPSimpleFSM) String
func (t *TCPSimpleFSM) String() string
type TCPSimpleFSMOptions
type TCPSimpleFSMOptions struct {
SupportMissingEstablishment bool // Allow missing SYN, SYN+ACK, ACK
}
TCPSimpleFSMOptions holds options for TCPSimpleFSM.