Documentation ¶
Overview ¶
Package kiterunner provides the core request loop for performing highly concurrent, low-allocation requests.
This package should be used where you want to make a large number of requests across a large number of targets quickly, concurrently, and with 0 allocations.
The 0-allocation benchmark comes from BenchmarkKiterunnerEngineRunGoland1Async, located in benchmark_integration_test.go. The benchmark assumes the following:
- all your targets have been allocated
- all your routes have been allocated
- the caller releases Results back into the pool after they have completed reading the result
Hence, 0 allocations corresponds to 0 additional allocations performed while making requests. On a 2020 MacBook 13" this yields the following results.
❯ ulimit -n 20000 && go test '-bench=BenchmarkKiterunnerEngineRunGoland1Async$' -v ./... -run='^$' -count=10 -benchtime=10000x -benchmem
goos: darwin
goarch: amd64
pkg: github.com/assetnote/kiterunner/pkg/kiterunner
BenchmarkKiterunnerEngineRunGoland1Async
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             75670 ns/op               5 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             79782 ns/op               5 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             88210 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             88506 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             93852 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             78248 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74890 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74421 ns/op               7 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74885 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74722 ns/op               6 B/op          0 allocs/op
PASS
ok      github.com/assetnote/kiterunner/pkg/kiterunner  8.170s
The allocation savings primarily derive from aggressively using sync.Pools for objects that are re-used across targets, and from misusing channels as low-cost, cross-goroutine communication buffers. These optimisations have resulted in slightly difficult-to-understand code in the handleTarget function. Future work on this codebase should ensure that the baseline benchmarks aren't exceeded per commit, or where they are, that a justifiable reason for the performance degradation is provided.
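In practice this means callers follow an acquire/release discipline around Result objects. A minimal sketch of that pattern, using the exported AcquireResult and ReleaseResult helpers documented below (what you do with the Result in between is up to the caller):

r := AcquireResult()
// ... read whatever you need from r here ...
// Return it to the pool once done; do not retain r, or slices inside it, afterwards.
ReleaseResult(r)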
The concurrency model elected for the Async loop is defined in RunAsync. It uses three separate goroutine worker roles:
- One goroutine per target to supervise the scheduling of requests
- One goroutine for scheduling preflight requests, spawned by the target worker
- N goroutines (Max Conn Per Host) per target for performing requests.
- request_wkr threads process both preflight requests and normal requests
- preflight requests sent to request_wkr include the response channel where results are sent
- results for normal requests are sent to the rx channel created at RunAsync time
This is illustrated in the following flow diagram:
             (A)                 (B)
        ┌─► preflight_wkr ──┬─► request_wkr ──┐
    ┌─► target_wkr────(D)───┼─► request_wkr ──┼───┐
    │        ▲              └─► request_wkr ──┤  (E)
    │        └──────────────────────────(C)───┘   │
    │                                             │
    │   ┌─► preflight_wkr ──┬─► request_wkr ──┐   │
tx ─┼── target_wkr──────────┼─► request_wkr ──┼───┼──► rx
    │        ▲              └─► request_wkr ──┤   │
    │        └────────────────────────────────┘   │
    │                                             │
    │   ┌─► preflight_wkr ──┬─► request_wkr ──┐   │
    └─► target_wkr──────────┼─► request_wkr ──┼───┘
             ▲              └─► request_wkr ──┤
             └────────────────────────────────┘

(A) - target_wkr    - spawns preflight worker
(B) - preflight_wkr - schedules preflight requests for given baseline
(C) - request_wkr   - performs request and sends response to target_wkr for aggregation
(D) - target_wkr    - schedules a batch of requests for given baseline
(E) - request_wkr   - results are sent to rx channel for aggregation
This concurrency model was selected based on the benchmarks run in github.com/assetnote/kiterunner/benchmark/concurrency_test.go, where it was determined to provide a compromise between the following (a code-level sketch of the topology appears after this list):
- Allowing for preflight requests to occur concurrently with normal requests
- The target_wkr does not need to wait for a directory to complete before performing the next preflight requests
- Restricting the number of requests sent to a given host at any one time
- We only use the pool of request_wkr for each given target to make requests
- Minimising inter-thread waits on channels
- We batch up the requests for a directory so the request_wkr doesn't need to read from the channel for each subsequent request in a directory it already has data for
- Avoiding a single slow target from blocking other targets from processing
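For orientation, the following standalone sketch mirrors the topology only: one supervising goroutine per target feeding a bounded pool of request workers, all fanning results into a shared rx channel. It is illustrative code using placeholder job/result types and fake targets, not the actual handleTarget implementation.

package main

import (
	"fmt"
	"sync"
)

// job and result are placeholder types standing in for the package's internal
// request and response messages.
type job struct{ path string }
type result struct{ path string }

// scanTarget plays the role of a target_wkr: it schedules jobs for one host and
// fans them out to a bounded pool of request workers (request_wkr).
func scanTarget(host string, paths []string, maxConnPerHost int, rx chan<- result) {
	jobs := make(chan job)
	var wg sync.WaitGroup

	// At most maxConnPerHost requests are in flight for this target at any time.
	for i := 0; i < maxConnPerHost; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// A real worker would perform the HTTP request here.
				rx <- result{path: host + j.path}
			}
		}()
	}

	for _, p := range paths {
		jobs <- job{path: p}
	}
	close(jobs)
	wg.Wait()
}

func main() {
	rx := make(chan result)
	targets := []string{"http://a.example", "http://b.example"}

	var wg sync.WaitGroup
	for _, t := range targets {
		wg.Add(1)
		// One supervising goroutine per target, mirroring target_wkr.
		go func(t string) {
			defer wg.Done()
			scanTarget(t, []string{"/", "/admin"}, 3, rx)
		}(t)
	}
	go func() {
		wg.Wait()
		close(rx)
	}()

	for r := range rx {
		fmt.Println(r.path)
	}
}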
Kiterunner Wrappers ¶
kiterunner_wrappers.go contains a set of synchronous wrapper functions around RunAsync. These are provided as utilities for callers who don't want the complexity of managing a channel-based call. The functions that return Results do not release each Result back into the pool; this costs additional allocations, but allows you to aggregate the results after the run.
Example (KiterunnerRunAsync) ¶
KiterunnerRunAsync demonstrates how to perform a RunAsync call and how to process the results. We recommend releasing each Result after processing is complete to avoid unnecessary allocations for future results; the penalty for not doing so is just the cost of allocating the memory for a new Result, which is relatively nominal compared to the number of requests sent.
routes := MakeRedirectRoutes(1, 1)
e := NewEngine(routes,
	ReadBody(false), // adjusting this significantly affects allocations
	MaxParallelHosts(1),
	MaxConnPerHost(2),
)

ctx := context.Background()
targets := MakeTargets(1)
for _, v := range targets {
	// set the context so they can be cancelled if the task is cancelled
	v.SetContext(ctx)
	// parse the host header so we don't send garbage to the client
	v.ParseHostHeader()
}

// start the loop and get your communication channels
tx, rx, err := e.RunAsync(ctx)
if err != nil {
	// handle err
}

var res []*Result
for _, v := range targets {
	tx <- v
}
close(tx)

for r := range rx {
	res = append(res, r)
	// OR if you don't need results
	r.Release()
}
// rx will close when tx is closed
Output:
Example (KiterunnerRunSync) ¶
KiterunnerRunSync demonstrates how to perform a synchronous call against the kiterunner Engine. The results can be used as normal. Modifying the target or route that is returned may result in unexpected behaviour if the routes and targets are re-used in a later iteration of the run
ctx := context.Background()
e := NewEngine(MakeRoutes(5),
	MaxParallelHosts(5),
	TargetQuarantineThreshold(0),
)

targets := MakeTargets(5)
for _, v := range targets {
	v.ParseHostHeader()
	v.SetContext(ctx)
}

res, err := e.Run(ctx, targets)
if err != nil {
	// handle err
}

for _, v := range res {
	fmt.Println(v.String())
}
Output:
Index ¶
- Constants
- Variables
- func LogResult(r *Result, config *Config)
- func LogResults(res []*Result, config *Config)
- func LogResultsChan(ctx context.Context, res chan *Result, config *Config)
- func ReleaseResult(h *Result)
- type Attribute
- type Config
- type ConfigOption
- func AddProgressBar(p ProgressBar) ConfigOption
- func AddRequestFilter(f RequestValidator) ConfigOption
- func BlacklistDomains(in []string) ConfigOption
- func Delay(n time.Duration) ConfigOption
- func HTTPExtraHeaders(h []http.Header) ConfigOption
- func MaxConnPerHost(v int) ConfigOption
- func MaxParallelHosts(v int) ConfigOption
- func MaxRedirects(n int) ConfigOption
- func MaxTimeout(n time.Duration) ConfigOption
- func ReadBody(v bool) ConfigOption
- func ReadHeaders(v bool) ConfigOption
- func SetPreflightCheckRoutes(r []*http.Route) ConfigOption
- func SkipPreflight(enabled bool) ConfigOption
- func TargetQuarantineThreshold(n int64) ConfigOption
- func WildcardDetection(enabled bool) ConfigOption
- type ContentLengthValidator
- type Engine
- func (e *Engine) Config() *Config
- func (e *Engine) Run(ctx context.Context, input []*http.Target) ([]*Result, error)
- func (e *Engine) RunAsync(ctx context.Context) (tx chan *http.Target, rx chan *Result, err error)
- func (e *Engine) RunCallback(ctx context.Context, input []*http.Target, cb ...func(r *Result, c *Config)) ([]*Result, error)
- func (e *Engine) RunCallbackNoResult(ctx context.Context, input []*http.Target, cb ...func(r *Result, c *Config)) error
- type ErrBadConfig
- type ErrFailedPreflight
- type KnownBadSitesValidator
- type NullProgressBar
- type ProgressBar
- type ReqMsg
- type ReqMsgType
- type RequestValidator
- type Result
- type StatusCodeBlacklist
- type StatusCodeWhitelist
- type WildcardResponse
- type WildcardResponseValidator
- type WildcardResponses
Examples ¶
Constants ¶
const CheckInterval = 10
CheckInterval indicates how many requests a worker thread will process before checking for context cancellation or target quarantine
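In other words, a worker only polls for cancellation once every CheckInterval requests rather than before each one. A simplified sketch of the pattern, assuming a jobs slice, a ctx from the caller, and a hypothetical doRequest helper (this is not the package's actual worker loop, which also checks for target quarantine):

// Check for cancellation every CheckInterval iterations instead of on every request.
for i, j := range jobs {
	if i%CheckInterval == 0 {
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
	doRequest(j) // hypothetical helper standing in for performing one request
}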
Variables ¶
var (
	// ErrTargetQuarantined indicates that the target has crossed the threshold amount for consecutive
	// non-baseline requests and has consequently been quarantined. This indicates the host should not
	// be scanned anymore.
	ErrTargetQuarantined = fmt.Errorf("target quarantined")

	// DefaultRootRoute is the default base route that is used as a canary baseline against all hosts.
	DefaultRootRoute = []*http.Route{{Path: []byte("/"), Method: http.GET}}
)
var (
	ErrLengthMatch             = fmt.Errorf("failed on content length check")
	ErrScaledLengthMatch       = fmt.Errorf("failed on adjusted content length check")
	ErrWordCountMatch          = fmt.Errorf("failed on word and line count match")
	ErrContentLengthRangeMatch = fmt.Errorf("failed on content length range match")
	ErrBlacklistedStatusCode   = fmt.Errorf("failed with blacklisted status code")
	ErrWhitelistedStatusCode   = fmt.Errorf("failed with not whitelisted status code")
)
These errors correspond to errors resulting from failing various response validation checks.
var (
	ErrGoogleBadRequest        = fmt.Errorf("google bad request found")
	ErrAmazonGatewayBadRequest = fmt.Errorf("amazon gateway bad request found")
)
var (
	PreflightCheckRoutes = []*http.Route{
		{
			Method: http.GET,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16] + "/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.GET,
			Path:   []byte("/"),
		},
		{
			Method: http.GET,
			Path:   []byte("/" + strings.Repeat("A", 1500)),
		},
		{
			Method: http.POST,
			Path:   []byte("/"),
		},
		{
			Method:  http.PUT,
			Path:    []byte("/auth" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
			Headers: []http.Header{{"Authorization", "Basic MTox"}},
		},
		{
			Method:  http.GET,
			Path:    []byte("/auth" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
			Headers: []http.Header{{"Authorization", "Basic MTox"}},
		},
		{
			Method: http.GET,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.PUT,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.POST,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.DELETE,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.PATCH,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
	}
)
var (
	ReqMsgPool sync.Pool
)
Functions ¶
func LogResults ¶
func LogResults(res []*Result, config *Config)
LogResults will output the results using the configured logger
func LogResultsChan ¶
func LogResultsChan(ctx context.Context, res chan *Result, config *Config)
LogResultsChan will output the results using the configured logger
func ReleaseResult ¶
func ReleaseResult(h *Result)
ReleaseResult releases a Result back into the shared result pool
Types ¶
type Config ¶
type Config struct {
	MaxParallelHosts  int           `toml:"max_parallel_hosts" json:"max_parallel_hosts" mapstructure:"max_parallel_hosts"`
	MaxConnPerHost    int           `toml:"max_conn_per_host" json:"max_conn_per_host" mapstructure:"max_conn_per_host"`
	WildcardDetection bool          `json:"wildcard_detection"`
	Delay             time.Duration `toml:"delay_ms" json:"delay_ms" mapstructure:"delay_ms"`
	HTTP              http.Config   `toml:"http" json:"http" mapstructure:"http"`

	QuarantineThreshold int64

	// PreflightCheckRoutes are the routes used to calculate the baseline. If the slice is empty,
	// no baselines will be created, so requests will match on the status codes.
	PreflightCheckRoutes []*http.Route

	ProgressBar       ProgressBar
	RequestValidators []RequestValidator
}
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
type ConfigOption ¶
type ConfigOption func(*Config)
func AddProgressBar ¶
func AddProgressBar(p ProgressBar) ConfigOption
func AddRequestFilter ¶
func AddRequestFilter(f RequestValidator) ConfigOption
func BlacklistDomains ¶
func BlacklistDomains(in []string) ConfigOption
func Delay ¶
func Delay(n time.Duration) ConfigOption
func HTTPExtraHeaders ¶
func HTTPExtraHeaders(h []http.Header) ConfigOption
func MaxConnPerHost ¶
func MaxConnPerHost(v int) ConfigOption
func MaxParallelHosts ¶
func MaxParallelHosts(v int) ConfigOption
func MaxRedirects ¶
func MaxRedirects(n int) ConfigOption
func MaxTimeout ¶
func MaxTimeout(n time.Duration) ConfigOption
func ReadBody ¶
func ReadBody(v bool) ConfigOption
func ReadHeaders ¶
func ReadHeaders(v bool) ConfigOption
func SetPreflightCheckRoutes ¶
func SetPreflightCheckRoutes(r []*http.Route) ConfigOption
func SkipPreflight ¶
func SkipPreflight(enabled bool) ConfigOption
SkipPreflight will zero out the preflight check routes
func TargetQuarantineThreshold ¶
func TargetQuarantineThreshold(n int64) ConfigOption
func WildcardDetection ¶
func WildcardDetection(enabled bool) ConfigOption
type ContentLengthValidator ¶
func NewContentLengthValidator ¶
func NewContentLengthValidator(ranges []http.Range) *ContentLengthValidator
func (ContentLengthValidator) String ¶
func (v ContentLengthValidator) String() string
func (*ContentLengthValidator) Validate ¶
func (v *ContentLengthValidator) Validate(r http.Response, _ []WildcardResponse, _ *Config) error
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine couples a set of routes with a specified scan configuration. Calling Run or RunAsync can be done concurrently; each call will create its own threadpool. If you wish to access one threadpool from multiple workers, use RunAsync and communicate using the provided channels. The options are not configurable after instantiation, as modifying the routes or config during Run or RunAsync may lead to non-deterministic behaviour.
func NewEngine ¶
func NewEngine(routes http.RouteMap, opts ...ConfigOption) *Engine
NewEngine will create an engine with the defined routes and options. If you require a different set of routes, you should instantiate a new engine
func (*Engine) Config ¶
Config returns the config for the engine. Modifying this config will modify the config for any currently running scans
func (*Engine) Run ¶
Run will perform the same operation as RunAsync, wrapping the channels with allocated structs and returning the results. This is safe to call from concurrent threads and will use a separate worker pool for each call. Callbacks supplied via RunCallback are invoked on each result received, so you can process results asynchronously if you wish; all the results will still be returned in the []*Result slice. Modifying the result in a callback is considered undefined behaviour.
func (*Engine) RunAsync ¶
RunAsync will begin all the concurrent threads for scanning. Inputs should be fed to tx (to transmit) and results are read off rx (receive). This function can fail if the config provided to the engine is invalid. Each call instantiates its own set of workers, tx, and rx, hence this is safe to call concurrently. The Engine will terminate when the context is cancelled or when the tx channel is closed. When the rx channel is closed, all results have been returned. The caller closing rx may cause a panic and is considered unexpected behaviour.
func (*Engine) RunCallback ¶
func (e *Engine) RunCallback(ctx context.Context, input []*http.Target, cb ...func(r *Result, c *Config)) ([]*Result, error)
RunCallback will run the scan against the provided input, calling the provided callbacks on each result. The callbacks can be used to log the error in realtime, or to perform other processing. You should not modify or use the Target or route from the Result, as this may have unintended side effects.
func (*Engine) RunCallbackNoResult ¶
func (e *Engine) RunCallbackNoResult(ctx context.Context, input []*http.Target, cb ...func(r *Result, c *Config)) error
RunCallbackNoResult will run the scan against the provided input, calling the provided callbacks on each result. The callbacks can be used to log the error in realtime, or to perform other processing. You should not modify or use the Target or route from the Result, as this may have unintended side effects. This function does not return the results, as they are released immediately after all callbacks are called; it is unsafe to use a result after your callbacks return. Use this when you don't need the result after the callback, e.g. writing to disk or printing to output.
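A minimal usage sketch in the style of the examples above, reusing the MakeRoutes and MakeTargets helpers from those examples; the callback simply prints each result before it is released.

ctx := context.Background()
e := NewEngine(MakeRoutes(5), MaxParallelHosts(5))

targets := MakeTargets(5)
for _, v := range targets {
	v.ParseHostHeader()
	v.SetContext(ctx)
}

// Each Result is handed to the callback and then released; do not retain it.
// LogResult has a matching signature and can be passed as a callback directly.
err := e.RunCallbackNoResult(ctx, targets, func(r *Result, c *Config) {
	fmt.Println(r.String())
})
if err != nil {
	// handle err
}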
type ErrBadConfig ¶
type ErrBadConfig struct {
// contains filtered or unexported fields
}
func (*ErrBadConfig) Error ¶
func (e *ErrBadConfig) Error() string
type ErrFailedPreflight ¶
type ErrFailedPreflight struct {
// contains filtered or unexported fields
}
func (ErrFailedPreflight) Error ¶
func (e ErrFailedPreflight) Error() string
func (ErrFailedPreflight) Unwrap ¶
func (e ErrFailedPreflight) Unwrap() error
type KnownBadSitesValidator ¶
type KnownBadSitesValidator struct{}
func (*KnownBadSitesValidator) Validate ¶
func (v *KnownBadSitesValidator) Validate(r http.Response, wildcardResponses []WildcardResponse, c *Config) error
type NullProgressBar ¶
type NullProgressBar struct {
// contains filtered or unexported fields
}
func (*NullProgressBar) AddTotal ¶
func (n *NullProgressBar) AddTotal(v int64)
func (*NullProgressBar) Incr ¶
func (n *NullProgressBar) Incr(v int64)
type ProgressBar ¶
type ReqMsg ¶
type ReqMsg struct {
	Preflight *subpathBaseline
	Job       *job
	// contains filtered or unexported fields
}
type RequestValidator ¶
type RequestValidator interface {
Validate(r http.Response, wildcardResponses []WildcardResponse, c *Config) error
}
RequestValidator is an interface that lets you add custom validators for what are good and bad responses
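As a sketch, a hypothetical validator rejecting a specific status code could look like the following and be registered with AddRequestFilter; the StatusCode field on http.Response is an assumption for illustration, so consult the project's http package for the real response fields.

// teapotValidator is a hypothetical validator that rejects responses returned
// with status code 418. The StatusCode field on http.Response is assumed here
// for illustration; check the http package for the actual response fields.
type teapotValidator struct{}

var ErrTeapotStatusCode = fmt.Errorf("failed on teapot status code check")

func (v *teapotValidator) Validate(r http.Response, _ []WildcardResponse, _ *Config) error {
	if r.StatusCode == 418 {
		return ErrTeapotStatusCode
	}
	return nil
}

It would then be attached when constructing the engine, e.g. NewEngine(routes, AddRequestFilter(&teapotValidator{})).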
type Result ¶
func AcquireResult ¶
func AcquireResult() *Result
AcquireResult retrieves a Result from the shared result pool
func (*Result) AppendBytes ¶
<METHOD> <STATUSCODE> <URL> [redirects ...]
func (*Result) AppendPrettyBytes ¶
<METHOD> <STATUSCODE> [content-length, words, lines] <URL> [redirects ...]
type StatusCodeBlacklist ¶
type StatusCodeBlacklist struct {
Codes map[int]interface{}
}
func NewStatusCodeBlacklist ¶
func NewStatusCodeBlacklist(valid []int) *StatusCodeBlacklist
func (StatusCodeBlacklist) String ¶
func (v StatusCodeBlacklist) String() string
func (*StatusCodeBlacklist) Validate ¶
func (v *StatusCodeBlacklist) Validate(r http.Response, _ []WildcardResponse, _ *Config) error
type StatusCodeWhitelist ¶
type StatusCodeWhitelist struct {
Codes map[int]interface{}
}
func NewStatusCodeWhitelist ¶
func NewStatusCodeWhitelist(valid []int) *StatusCodeWhitelist
func (StatusCodeWhitelist) String ¶
func (v StatusCodeWhitelist) String() string
func (*StatusCodeWhitelist) Validate ¶
func (v *StatusCodeWhitelist) Validate(r http.Response, _ []WildcardResponse, _ *Config) error
type WildcardResponse ¶
type WildcardResponse struct {
	DefaultStatusCode     int
	DefaultContentLength  int
	AdjustedContentLength int // the content length adjusted for the length of the requested path
	AdjustmentScale       int // number of times the requested path appears in the request
	DefaultWordCount      int // number of spaces + 1
	DefaultLineCount      int // number of newlines + 1
}
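Per the field comments, the word and line counts are simple byte counts rather than full tokenisation; the snippet below only illustrates those definitions using the standard library bytes package, and is not the package's implementation:

// Illustration of the documented definitions only:
// word count = number of spaces + 1, line count = number of newlines + 1.
body := []byte("hello world\nsecond line")
words := bytes.Count(body, []byte(" ")) + 1  // 3
lines := bytes.Count(body, []byte("\n")) + 1 // 2
fmt.Println(words, lines)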
type WildcardResponseValidator ¶
type WildcardResponseValidator struct{}
func (*WildcardResponseValidator) Validate ¶
func (v *WildcardResponseValidator) Validate(r http.Response, wildcardResponses []WildcardResponse, c *Config) error
type WildcardResponses ¶
type WildcardResponses []WildcardResponse
func (WildcardResponses) UniqueAdd ¶
func (w WildcardResponses) UniqueAdd(wr WildcardResponse) (WildcardResponses, bool)