Documentation ¶
Index ¶
- func DOTConverter(r io.ReadCloser) (io.ReadCloser, error)
- func NewPrefetchPolicy(q s3iface.S3API, maxBytes int64, maxConcurrent int) func(BucketIterator) FileManager
- type BucketFilter
- type BucketIterator
- type BucketIteratorReader
- type BucketStateIterator
- type Converter
- type Digester
- type FileManager
- type LogFile
- type LogFileAccountFilter
- type LogFileFilter
- type LogFileRegionFilter
- type LogFileTimeFilter
- type MultiLogFileFilter
- type PrefetchFileManager
- type ReaderDigester
- type Semaphore
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DOTConverter ¶
func DOTConverter(r io.ReadCloser) (io.ReadCloser, error)
DOTConverter takes as input a single AWS VPC Flow Log file, or a digest of VPC Flow Logs, and converts the data into a DOT graph. The input ReadCloser will be closed after conversion; the caller should close the output ReadCloser when done reading.
func NewPrefetchPolicy ¶
func NewPrefetchPolicy(q s3iface.S3API, maxBytes int64, maxConcurrent int) func(BucketIterator) FileManager
NewPrefetchPolicy returns a function with the signature required by BucketIteratorReader.FetchPolicy; the FileManager it produces pre-fetches content before it is requested. This can dramatically speed up reading, but maxConcurrent and maxBytes must be tuned to the concurrency and memory limits of the system.
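Because FetchPolicy is a function from a BucketIterator to a FileManager, a constructor like NewPrefetchPolicy can capture its tuning parameters in a closure. The sketch below shows only that pattern. `Iterator`, `manager`, and `newPolicy` are hypothetical, simplified stand-ins (no S3 client, and a concrete return type instead of the FileManager interface), and using a buffered channel sized by maxConcurrent as the concurrency limit is an assumption.

```go
package main

// Iterator is a hypothetical, simplified stand-in for BucketIterator.
type Iterator interface {
	Iterate() bool
}

// manager is a hypothetical stand-in for a prefetching FileManager.
type manager struct {
	it       Iterator
	maxBytes int64
	slots    chan struct{} // buffer size caps concurrent downloads
}

// newPolicy captures the tuning knobs and returns a factory analogous to
// BucketIteratorReader.FetchPolicy: given an iterator, build a manager
// bound to it.
func newPolicy(maxBytes int64, maxConcurrent int) func(Iterator) *manager {
	return func(it Iterator) *manager {
		return &manager{
			it:       it,
			maxBytes: maxBytes,
			slots:    make(chan struct{}, maxConcurrent),
		}
	}
}
```

The configuration is fixed once, while the iterator is supplied later by whoever owns the reader.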
Types ¶
type BucketFilter ¶
type BucketFilter struct {
	Filter LogFileFilter
	BucketIterator
}
BucketFilter is a BucketIterator wrapper that drops anything that fails the filter check.
func (*BucketFilter) Iterate ¶
func (it *BucketFilter) Iterate() bool
Iterate will consume from the wrapped iterator until an element is found that passes the filter.
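The skip-until-match behavior of Iterate can be sketched with in-memory stand-ins. The trimmed LogFile, the `sliceIterator` source, and the `regionOnly` filter are hypothetical, and LogFileFilter is assumed to be a single-method interface exposing FilterLogFile(LogFile) bool, as the filter types in this package suggest.

```go
package main

// LogFile is trimmed to the fields used here.
type LogFile struct {
	Key    string
	Region string
}

// LogFileFilter is assumed to expose FilterLogFile, as the filter types do.
type LogFileFilter interface {
	FilterLogFile(LogFile) bool
}

type BucketIterator interface {
	Iterate() bool
	Current() LogFile
	Close() error
}

// sliceIterator is a hypothetical in-memory BucketIterator.
type sliceIterator struct {
	files []LogFile
	pos   int
}

func (s *sliceIterator) Iterate() bool {
	if s.pos >= len(s.files) {
		return false
	}
	s.pos++
	return true
}

func (s *sliceIterator) Current() LogFile { return s.files[s.pos-1] }
func (s *sliceIterator) Close() error     { return nil }

// regionOnly is a hypothetical filter that keeps a single region.
type regionOnly string

func (r regionOnly) FilterLogFile(lf LogFile) bool { return lf.Region == string(r) }

// BucketFilter embeds the wrapped iterator, so Current and Close are
// promoted; only Iterate is overridden to skip non-matching files.
type BucketFilter struct {
	Filter LogFileFilter
	BucketIterator
}

func (it *BucketFilter) Iterate() bool {
	// Call the embedded Iterate explicitly to avoid recursing into this one.
	for it.BucketIterator.Iterate() {
		if it.Filter.FilterLogFile(it.Current()) {
			return true
		}
	}
	return false
}
```

Embedding the iterator means Current and Close pass through unchanged, so only Iterate needs to be written.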
type BucketIterator ¶
type BucketIterator interface {
	// Iterate pushes the cursor one record forward such that
	// the current value is fetched when calling Current().
	// This method should return false after all records have
	// been iterated over or an error is encountered attempting
	// to fetch records.
	Iterate() bool
	// Get the current value of the iterator.
	Current() LogFile
	// Close cleans up any resources used by the iterator and
	// returns an error, if any, that caused iterations to stop.
	Close() error
}
BucketIterator scans an S3 bucket and converts AWS API responses to LogFile records.
type BucketIteratorReader ¶
type BucketIteratorReader struct {
	BucketIterator BucketIterator
	FetchPolicy    func(BucketIterator) FileManager
	// contains filtered or unexported fields
}
BucketIteratorReader converts an implementation of the BucketIterator interface into an io.ReadCloser that acts as a continuous stream of data from all the files returned by the iterator.
func (*BucketIteratorReader) Close ¶
func (r *BucketIteratorReader) Close() error
Close the reader and the underlying BucketIterator. The reader may not be used again after calling Close().
type BucketStateIterator ¶
type BucketStateIterator struct {
	Bucket string
	Prefix string
	Queue  s3iface.S3API
	// contains filtered or unexported fields
}
BucketStateIterator holds the current state of the iterator.
func (BucketStateIterator) Close ¶
func (iter BucketStateIterator) Close() error
Close cleans up any resources used by the iterator and returns an error, if any, that caused iterations to stop.
func (*BucketStateIterator) Current ¶
func (iter *BucketStateIterator) Current() LogFile
Current gets the current value of the iterator.
func (*BucketStateIterator) Iterate ¶
func (iter *BucketStateIterator) Iterate() bool
Iterate pushes the cursor one record forward such that the current value is fetched when calling Current(). This method should return false after all records have been iterated over or an error is encountered attempting to fetch records.
type Converter ¶
type Converter func(io.ReadCloser) (io.ReadCloser, error)
Converter is a function type that converts input data into a different format, made available through the output io.ReadCloser.
type Digester ¶
type Digester interface {
	Digest() (io.ReadCloser, error)
}
Digester digests input data and outputs an io.ReadCloser from which the compacted data can be read.
type FileManager ¶
type FileManager interface {
	// Get a consumable reader for the next object from the manager.
	Get() (io.Reader, error)
	// Put a consumed reader back in the manager for cleanup.
	Put(io.Reader)
}
FileManager is the binding between an S3 file download strategy and the BucketIteratorReader. It will be called by the BucketIteratorReader to fetch a new S3 Object reader as well as to return consumed readers for cleanup.
type LogFile ¶
type LogFile struct {
	// Bucket is the S3 bucket in which the logs are stored.
	Bucket string
	// Key is the object key used by ListObjects and GetObject.
	Key string
	// Account is the AWS account ID extracted from the log path.
	Account string
	// Region is the AWS region extracted from the log path.
	Region string
	// Timestamp is the value from the log file name.
	Timestamp time.Time
	// FlowLogID is the key for the VPC log resource in AWS.
	FlowLogID string
	// Hash is the checksum value extracted from the log file name.
	Hash string
	// Size of the file containing the logs.
	Size int64
}
LogFile is a structured representation of a VPC Flow log file. It should contain enough data that a consumer could fetch the file contents.
type LogFileAccountFilter ¶
LogFileAccountFilter reduces the set to only those from a particular set of accounts.
func (LogFileAccountFilter) FilterLogFile ¶
func (f LogFileAccountFilter) FilterLogFile(lf LogFile) bool
FilterLogFile compares against the set of allowed accounts.
type LogFileFilter ¶
LogFileFilter is used to inspect LogFile instances and determine whether they should be emitted from a BucketIterator.
type LogFileRegionFilter ¶
LogFileRegionFilter reduces the set to only those from a particular set of regions.
func (LogFileRegionFilter) FilterLogFile ¶
func (f LogFileRegionFilter) FilterLogFile(lf LogFile) bool
FilterLogFile compares against the set of allowed regions.
type LogFileTimeFilter ¶
LogFileTimeFilter applies an inclusive start/end time bound to all files.
func (LogFileTimeFilter) FilterLogFile ¶
func (f LogFileTimeFilter) FilterLogFile(lf LogFile) bool
FilterLogFile applies the time bound checks.
type MultiLogFileFilter ¶
type MultiLogFileFilter []LogFileFilter
MultiLogFileFilter composes any number of filters into a single filter that returns false on the first failed filter or true if all filters pass.
func (MultiLogFileFilter) FilterLogFile ¶
func (f MultiLogFileFilter) FilterLogFile(lf LogFile) bool
FilterLogFile executes all enclosed filters until one of them returns false.
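The short-circuit composition can be sketched as follows. The map-based `accountSet` and `regionSet` filters are hypothetical stand-ins for LogFileAccountFilter and LogFileRegionFilter, whose definitions are not shown, and LogFileFilter is assumed to be the single-method interface implied by FilterLogFile.

```go
package main

// LogFile is trimmed to the fields used here.
type LogFile struct{ Account, Region string }

// LogFileFilter is assumed to be a single-method interface.
type LogFileFilter interface{ FilterLogFile(LogFile) bool }

// accountSet and regionSet are hypothetical set-based filters.
type accountSet map[string]bool
type regionSet map[string]bool

func (s accountSet) FilterLogFile(lf LogFile) bool { return s[lf.Account] }
func (s regionSet) FilterLogFile(lf LogFile) bool  { return s[lf.Region] }

// MultiLogFileFilter returns false on the first filter that rejects.
type MultiLogFileFilter []LogFileFilter

func (f MultiLogFileFilter) FilterLogFile(lf LogFile) bool {
	for _, filter := range f {
		if !filter.FilterLogFile(lf) {
			return false
		}
	}
	return true
}
```

With this loop, an empty MultiLogFileFilter accepts every file, since no filter can fail.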
type PrefetchFileManager ¶
type PrefetchFileManager struct {
	// Queue is any implementation of the S3API and will be used
	// to download the individual object contents.
	Queue s3iface.S3API
	// BucketIterator is the source from which the manager will
	// pull when deciding what to prefetch.
	BucketIterator BucketIterator
	// Lock is used to control concurrency of downloads.
	// Using a sync.Mutex will result in sequential downloads,
	// while using a Semaphore will allow up to N concurrent
	// downloads in the background, where N is the buffer size
	// of the Semaphore's channel.
	Lock sync.Locker
	// MaxBytes is used to control the amount of data fetched
	// into memory from S3. While the Lock attribute controls
	// the maximum number of concurrent downloads, this attribute
	// controls how much data is actually prefetched. Ideally, this
	// value is larger than several individual objects combined so
	// that real prefetching can occur. If prefetching the next
	// object would put the buffer over the limit, prefetching stops
	// until the buffer is drained enough to contain the next file.
	//
	// One exception is when the buffer is empty and the next
	// file is, on its own, still larger than MaxBytes. In this
	// case, the prefetcher will still download the file but will then
	// wait for the buffer to drain before downloading the next file.
	// This means that if MaxBytes is set to less than the average
	// file size, the prefetcher may degenerate into sequential
	// downloads.
	MaxBytes int64
	// Ready is the channel/buffer on which downloaded files are placed
	// while awaiting consumption. This channel may be buffered or
	// unbuffered (blocking).
	Ready chan io.Reader
	// contains filtered or unexported fields
}
PrefetchFileManager implements the FileManager interface by eagerly fetching content in the background to maximize the chances of having a reader ready whenever a consumer asks for one. Typically, it is created with the NewPrefetchPolicy function and used as the FetchPolicy for the BucketIteratorReader.
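The MaxBytes rule from the struct documentation reduces to a small admission check. The sketch below restates it as a hypothetical `mayPrefetch` function; treating an exact fit (buffered + next == maxBytes) as within the limit is an assumption.

```go
package main

// mayPrefetch is a hypothetical restatement of the MaxBytes rule:
// buffered is the number of bytes currently waiting in Ready and next is
// the size of the next object to download.
func mayPrefetch(buffered, next, maxBytes int64) bool {
	if buffered == 0 {
		// An empty buffer always admits the next file, even one larger
		// than maxBytes, so downloading never stalls completely.
		return true
	}
	// Otherwise only admit the file if it fits in the remaining budget.
	return buffered+next <= maxBytes
}
```

With maxBytes = 5 and files of size 10, every file exceeds the budget on its own, so the check only passes once the buffer is empty again. That is the sequential degeneration described for MaxBytes.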
func (*PrefetchFileManager) Get ¶
func (f *PrefetchFileManager) Get() (io.Reader, error)
Get a prefetched file. If prefetch is lagging behind then this call will block until a file is available. If any error was encountered since the last call to Get then it is returned.
func (*PrefetchFileManager) Prefetch ¶
func (f *PrefetchFileManager) Prefetch()
Prefetch starts a loop that consumes from the attached BucketIterator and attempts to load that content before it is needed.
func (*PrefetchFileManager) Put ¶
func (f *PrefetchFileManager) Put(r io.Reader)
Put returns a file to the manager for cleanup.
type ReaderDigester ¶
type ReaderDigester struct {
	Reader io.ReadCloser
}
ReaderDigester is responsible for compacting multiple VPC flow log lines into fewer, summarized lines.
func (*ReaderDigester) Digest ¶
func (d *ReaderDigester) Digest() (io.ReadCloser, error)
Digest reads from the underlying Reader and compacts multiple VPC flow log lines, producing a digest of the material made available via the returned io.ReadCloser. A digest is created by squashing "stable" values together and aggregating more volatile values. The stable values are srcaddr, dstaddr, dstport, protocol, and action; these change far less often than the volatile values such as srcport, start, end, log-status, bytes, and packets. For the most part, the volatile values change with every entry, even when the stable values are exactly the same.
type Semaphore ¶
type Semaphore struct {
	C chan interface{}
}
Semaphore implements the sync.Locker interface to help with concurrency control for fetching files. The buffer size of C defines the max concurrent callers of Lock.