Documentation ¶
Overview ¶
Package gentle defines Stream and Handler interfaces and provides composable resilient implementations of them.
Stream and Handler ¶
Stream and Handler are our fundamental abstractions. Stream has Get() that emits Messages. Handler has Handle() that transforms given Messages.
Message(https://godoc.org/github.com/cfchou/go-gentle/gentle#Message) Stream(https://godoc.org/github.com/cfchou/go-gentle/gentle#Stream) Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#Handler)
Developers should implement their own logic in the forms of Stream/Handler. For simple cases, named types SimpleStream and SimpleHandler help directly make a function a Stream/Handler.
SimpleStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleStream) SimpleHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#SimpleHandler)
Example:
// GameScore implements gentle.Message interface type GameScore struct { id string // better to be unique for tracing it in log Score int } // a gentle.Message must support ID func (s GameScore) ID() string { return s.id } // scoreStream is a gentle.Stream that wraps an API call to an external service for // getting game scores. // For simple cases that the logic can be defined entirely in a function, we can // simply define it to be a gentle.SimpleStream. var scoreStream gentle.SimpleStream = func(_ context.Context) (gentle.Message, error) { // simulate a result from an external service return &GameScore{ id: "", Score: rand.Intn(100), }, nil } // DbWriter is a gentle.Handler that writes scores to the database. // Instead of using gentle.SimpleHandler, we define a struct explicitly // implementing gentle.Handler interface. type DbWriter struct { db *sql.DB table string } func (h *DbWriter) Handle(_ context.Context, msg gentle.Message) (gentle.Message, error) { gameScore := msg.(*GameScore) statement := fmt.Sprintf("INSERT INTO %s (score, date) VALUES (?, DATETIME());", h.table) _, err := h.db.Exec(statement, gameScore.Score) if err != nil { return nil, err } return msg, nil } // example continues in the next section
Gentle-ments -- our resilience Streams and Handlers ¶
Resiliency patterns are indispensable in distributed systems because external services are not always reliable. Some useful patterns in the forms of Streams/Handlers are provided in this package(pun to call them gentle-ments). They include rate-limiting, retry(back-off), bulkhead and circuit-breaker. Each of them can be freely composed with other Streams/Handlers as one sees fit.
RateLimitedStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#RateLimitedStream) RetryStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#RetryStream) BulkheadStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#BulkheadStream) CircuitStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#CircuitStream) RateLimitedHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#RateLimitedHandler) RetryHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#RetryHandler) BulkheadHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#BulkheadHandler) CircuitHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#CircuitHandler)
Generally, users call one of the option constructors like NewRetryHandlerOpts, to get an option object filled with default values which can be mutated if necessary. Then, pass it to one of the gentle-ment constructors above.
Example cont.(error handling is omitted for brevity):
func main() { db, _ := sql.Open("sqlite3", "scores.sqlite") defer db.Close() db.Exec("DROP TABLE IF EXISTS game;") db.Exec("CREATE TABLE game (score INTEGER, date DATETIME);") dbWriter := &DbWriter{ db: db, table: "game", } // Rate-limit the queries while allowing burst of some gentleScoreStream := gentle.NewRateLimitedStream( gentle.NewRateLimitedStreamOpts("myApp", "rlQuery", gentle.NewTokenBucketRateLimit(500*time.Millisecond, 5)), scoreStream) // Limit concurrent writes to Db limitedDbWriter := gentle.NewBulkheadHandler( gentle.NewBulkheadHandlerOpts("myApp", "bkWrite", 16), dbWriter) // Constantly backing off when limitedDbWriter returns an error backoffFactory := gentle.NewConstBackOffFactory( gentle.NewConstBackOffFactoryOpts(500*time.Millisecond, 5*time.Minute)) gentleDbWriter := gentle.NewRetryHandler( gentle.NewRetryHandlerOpts("myApp", "rtWrite", backoffFactory), limitedDbWriter) // Compose the final Stream stream := gentle.AppendHandlersStream(gentleScoreStream, gentleDbWriter) // Keep fetching scores from the remote service to our database. // The amount of simultaneous go-routines are capped by the size of ticketPool. ticketPool := make(chan struct{}, 1000) for { ticketPool <- struct{}{} go func() { defer func(){ <-ticketPool }() stream.Get(context.Background()) }() } }
Full example(https://gist.github.com/c2ac4060aaf0fcada38a3d85b3c07a71)
Convention ¶
Throughout the package, we follow a convention to create Streams/Handlers or other constructs. Firstly, we calling NewXxxOpts() to obtain an option object initialized with default values. An option object is open for mutation. Once its values are settled, we'll pass it to the corresponding constructor of gentle-ments.
Note that generally an option object is one-off. That is, it should not be reused to construct more than one gentle-ment instance.
Composability ¶
Like gentle-ments, users may define Streams/Handlers to compose other ones the way they want. For simple cases, there are helpers for chaining Streams/Handlers. Their semantic is that any failing element in the chain would skip the rest of all. Also note that any element can also be a nested chain itself.
AppendHandlersStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersStream) AppendHandlersHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendHandlersHandler)
We also define types of fallback and related helpers for appending a chain of fallbacks to Stream/Handler. Any successful fallback in the chain would skip the rest of all.
StreamFallback(https://godoc.org/github.com/cfchou/go-gentle/gentle#StreamFallback) AppendFallbacksStream(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendFallbacksStream) HandlerFallback(https://godoc.org/github.com/cfchou/go-gentle/gentle#HandlerFallback) AppendFallbacksHandler(https://godoc.org/github.com/cfchou/go-gentle/gentle#AppendFallbacksHandler)
Context Support ¶
Stream.Get() and Handler.Handle() both take context.Context. One of Context's common usages is to collaborate request-scoped timeout. Our gentle-ments respect timeout as much as possible and loyally pass the context to the user-defined upstreams or up-handlers which may also respect context's timeout.
Thread Safety ¶
Stream.Get() and Handler.Handle() should be thread-safe. A good practice is to make Stream/Handler state-less. A Message needs not to be immutable but it's good to be so. That said, gentle-ments' Get()/Handle() are all thread-safe and don't mutate Messages.
Logging ¶
Users may plug in whatever logging library they like as long as it supports interface Logger(https://godoc.org/github.com/cfchou/go-gentle/gentle#Logger). Fans of log15 and logurs may check out the sibling package extra/log(https://godoc.org/github.com/cfchou/go-gentle/extra/log) for adapters already available at hand.
There's a root logger gentle.Log which if not specified is a no-op logger. Every gentle-ment has its own logger. Users can get/set the logger in the option object which is then be used to initialize a gentle-ment. By default, each of these loggers is a child returned by gentle.Log.New(fields) where fields are key-value pairs of:
"namespace": "namespace of this Stream/Handler" "name": "name of this Stream/Handler" "gentle": "type of this Stream/Handler"
Logger interface doesn't have methods like SetHandler or SetLevel, because such functions are often implementation-dependent. Instead, you set up the logger and then assign it to gentle.Log or a gentle-ment's option. That way, we have fine-grained controls over logging. Check out the sibling package extra/log for examples.
Metrics ¶
Currently there're three metric interfaces of metrics collectors for gentle-ments:
Metric for RateLimitedStream/Handler, BulkheadStream/Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#Metric) RetryMetric for RetryStream/Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#RetryMetric) CbMetric for CircuitStream/Handler(https://godoc.org/github.com/cfchou/go-gentle/gentle#CbMetric)
In the sibling package extra/metric(https://godoc.org/github.com/cfchou/go-gentle/extra/metric), we have provided implementations for prometheus and statsd and examples. Generally, it's similar to Logger in that one can change an option's metrics collector before creating a gentle-ment. By default, metrics collectors are all no-op.
OpenTracing ¶
Gentle-ments integrate OpenTracing(https://github.com/opentracing/opentracing-go). Users may create a span in the root context which is then passed around by Streams/Handlers. Gentle-ments' options come with a opentracing.Tracer which is by default a global tracer. There's an example of using Uber's jaeger as the backend(https://github.com/cfchou/go-gentle/blob/master/extra/tracing/jaeger.go).
Index ¶
- Constants
- Variables
- func CircuitReset()
- type BackOff
- type BackOffFactory
- type BulkheadHandler
- type BulkheadHandlerOpts
- type BulkheadStream
- type BulkheadStreamOpts
- type CbMetric
- type CircuitConf
- type CircuitHandler
- type CircuitHandlerOpts
- type CircuitStream
- type CircuitStreamOpts
- type ConstBackOffFactory
- type ConstBackOffFactoryOpts
- type ExpBackOffFactory
- type ExpBackOffFactoryOpts
- type Handler
- type HandlerFallback
- type HandlerOpts
- type Logger
- type Message
- type Metric
- type RateLimit
- type RateLimitedHandler
- type RateLimitedHandlerOpts
- type RateLimitedStream
- type RateLimitedStreamOpts
- type RetryHandler
- type RetryHandlerOpts
- type RetryMetric
- type RetryStream
- type RetryStreamOpts
- type SimpleHandler
- type SimpleMessage
- type SimpleStream
- type Stream
- type StreamFallback
- type StreamOpts
- type TokenBucketRateLimit
- type TracingRef
Examples ¶
- AppendFallbacksHandler
- AppendFallbacksStream
- AppendHandlersHandler
- AppendHandlersStream
- NewBulkheadHandler
- NewBulkheadStream
- NewCircuitHandler
- NewCircuitHandler (CustomCircuit)
- NewCircuitStream
- NewCircuitStream (CustomCircuit)
- NewRateLimitedHandler
- NewRateLimitedStream
- NewRetryHandler (ContantBackOff)
- NewRetryHandler (ExpBackOff)
- NewRetryStream (ContantBackOff)
- NewRetryStream (ExpBackOff)
- SimpleHandler
- SimpleStream
Constants ¶
const ( // BackOffStop is a sentinel return by BackOff.Next() indicating that no // more retry should be made. BackOffStop time.Duration = -1 // DefaultMaxNumBackOffs is the default for ContantBackOffFactoryOpts.MaxNumBackOffs // and ExpBackOffFactoryOpts.MaxNumBackOffs DefaultMaxNumBackOffs = 0 // DefaultRandomizationFactor is the default for ExpBackOffFactoryOpts.RandomizationFactor DefaultRandomizationFactor = 0.5 )
const ( // StreamRateLimited and other constants are types of resilience. // They are most often used with namespace & name to form an identifier in // logging/metric/tracing. StreamRateLimited = "sRate" StreamRetry = "sRetry" StreamBulkhead = "sBulk" StreamCircuit = "sCircuit" HandlerRateLimited = "hRate" HandlerRetry = "hRetry" HandlerBulkhead = "hBulk" HandlerCircuit = "hCircuit" )
const ( // Default values of a CircuitConf DefaultCbTimeout = 10 * time.Second DefaultCbMaxConcurrent = 1024 DefaultCbVolumeThreshold = 20 DefaultCbErrPercentThreshold = 50 DefaultCbSleepWindow = 5 * time.Second )
Variables ¶
var ( // ErrCbOpen suggests the circuit is opened. ErrCbOpen = errors.New(hystrix.ErrCircuitOpen.Error()) // ErrCbMaxConcurrency suggests the circuit has reached its maximum // concurrency of operations. ErrCbMaxConcurrency = errors.New(hystrix.ErrMaxConcurrency.Error()) // ErrCbTimeout suggests the operation has run for too long. ErrCbTimeout = errors.New(hystrix.ErrTimeout.Error()) // ErrMaxConcurrency suggests BulkheadStream/BulkheadHandler has reached // its maximum concurrency of operations. ErrMaxConcurrency = errors.New("Reached Max Concurrency") )
Functions ¶
func CircuitReset ¶
func CircuitReset()
CircuitReset resets all states(incl. metrics) of all circuits.
Types ¶
type BackOff ¶
BackOff provides a series of back-offs. When Next() returning BackOffStop, it'll no longer be called Next() again.
type BackOffFactory ¶
type BackOffFactory interface {
NewBackOff() BackOff
}
BackOffFactory creates a BackOff every time when RetryStream.Get() or RetryHandler.Handle() needs a series of back-offs.
type BulkheadHandler ¶
type BulkheadHandler struct {
// contains filtered or unexported fields
}
BulkheadHandler is a Handler that limits concurrent access to the up-handler.
func NewBulkheadHandler ¶
func NewBulkheadHandler(opts *BulkheadHandlerOpts, upHandler Handler) *BulkheadHandler
NewBulkheadHandler creates a BulkheadHandler to guard the up-handler.
Example ¶
var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { return msg, nil } count := 10 // limit concurrent access to fakeHandler handler := NewBulkheadHandler( NewBulkheadHandlerOpts("", "test", 2), fakeHandler) wg := &sync.WaitGroup{} wg.Add(count) var msgID int64 for i := 0; i < count; i++ { go func() { defer wg.Done() id := atomic.AddInt64(&msgID, 1) msg, err := handler.Handle(context.Background(), SimpleMessage(strconv.FormatInt(id, 10))) if err != nil { if err == ErrMaxConcurrency { fmt.Println("Reached MaxConcurrency") } else { fmt.Println("Other err:", err) } } else { fmt.Println("msg:", msg.ID()) } }() } wg.Wait()
Output:
type BulkheadHandlerOpts ¶
type BulkheadHandlerOpts struct { HandlerOpts Metric Metric // MaxConcurrency limits the amount of concurrent Handle() MaxConcurrency int }
BulkheadHandlerOpts contains options that'll be used by NewBulkheadHandler.
func NewBulkheadHandlerOpts ¶
func NewBulkheadHandlerOpts(namespace, name string, maxConcurrency int) *BulkheadHandlerOpts
NewBulkheadHandlerOpts returns BulkheadHandlerOpts with default values.
type BulkheadStream ¶
type BulkheadStream struct {
// contains filtered or unexported fields
}
BulkheadStream is a Stream that limits concurrent access to the upstream.
func NewBulkheadStream ¶
func NewBulkheadStream(opts *BulkheadStreamOpts, upstream Stream) *BulkheadStream
NewBulkheadStream creates a BulkheadStream to guard the upstream.
Example ¶
var msgID int64 var fakeStream SimpleStream = func(_ context.Context) (Message, error) { id := atomic.AddInt64(&msgID, 1) return SimpleMessage(strconv.FormatInt(id, 10)), nil } count := 10 // limit concurrent access to fakeStream stream := NewBulkheadStream( NewBulkheadStreamOpts("", "test", 2), fakeStream) wg := &sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { defer wg.Done() msg, err := stream.Get(context.Background()) if err != nil { if err == ErrMaxConcurrency { fmt.Println("Reached MaxConcurrency") } else { fmt.Println("Other err:", err) } } else { fmt.Println("msg:", msg.ID()) } }() } wg.Wait()
Output:
type BulkheadStreamOpts ¶
type BulkheadStreamOpts struct { StreamOpts Metric Metric MaxConcurrency int }
BulkheadStreamOpts contains options that'll be used by NewBulkheadStream.
func NewBulkheadStreamOpts ¶
func NewBulkheadStreamOpts(namespace, name string, maxConcurrency int) *BulkheadStreamOpts
NewBulkheadStreamOpts returns BulkheadStreamOpts with default values.
type CbMetric ¶
type CbMetric interface { // Successful Stream.Get()/Handler.Handle() with timespan ObserveOk(timespan time.Duration) // Failed Stream.Get()/Handler.Handle() with timespan and error. ObserveErr(timespan time.Duration, err error) }
CbMetric is an interface for collecting metrics by CircuitStream/CircuitHandler
type CircuitConf ¶
type CircuitConf struct { // Timeout is how long to wait for command to complete Timeout time.Duration // MaxConcurrent is how many commands of the same type can run // at the same time MaxConcurrent int // VolumeThreshold is the minimum number of requests needed // before a circuit can be tripped due to health VolumeThreshold int // ErrorPercentThreshold causes circuits to open once the // rolling measure of errors exceeds this percent of requests ErrorPercentThreshold int // SleepWindow is how long to wait after a circuit opens before testing // for recovery is allowed SleepWindow time.Duration }
CircuitConf is the configuration of a circuit-breaker.
func (*CircuitConf) RegisterFor ¶
func (c *CircuitConf) RegisterFor(circuit string)
RegisterFor applies the CircuitConf to a circuit-breaker identified by its name.
type CircuitHandler ¶
type CircuitHandler struct {
// contains filtered or unexported fields
}
CircuitHandler is a Handler that guards the up-handler with a circuit-breaker.
func NewCircuitHandler ¶
func NewCircuitHandler(opts *CircuitHandlerOpts, upHandler Handler) *CircuitHandler
NewCircuitHandler creates a CircuitHandler to guard the up-handler.
Example ¶
rand.Seed(time.Now().UnixNano()) fakeErr := errors.New("fake err") // fakeHandler with randomized outcome var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { if rand.Intn(10)%10 == 0 { // 1/10 chances to return fakeErr return nil, fakeErr } if rand.Intn(10)%10 == 1 { // 1/10 chances to sleep until circuit's timeout time.Sleep(DefaultCbTimeout + 10*time.Millisecond) } return msg, nil } // resets all states(incl. metrics) of all circuits. CircuitReset() // create CircuitHandler to protect fakeHandler handler := NewCircuitHandler( NewCircuitHandlerOpts("", "test", xid.New().String()), fakeHandler) count := 100 wg := &sync.WaitGroup{} wg.Add(count) var msgID int64 for i := 0; i < count; i++ { go func() { defer wg.Done() id := atomic.AddInt64(&msgID, 1) msg, err := handler.Handle(context.Background(), SimpleMessage(strconv.FormatInt(id, 10))) if err != nil { switch err { case ErrCbMaxConcurrency: fmt.Println("Reached Circuit's MaxConcurrency") case ErrCbTimeout: fmt.Println("Reached Circuit's Timeout") case ErrCbOpen: fmt.Println("Reached Circuit's threshold so it opens") default: fmt.Println("Other err:", err) } return } fmt.Println("msg:", msg.ID()) }() } wg.Wait()
Output:
Example (CustomCircuit) ¶
rand.Seed(time.Now().UnixNano()) fakeErr := errors.New("fake err") // fakeHandler with randomized outcome var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { if rand.Intn(10)%10 == 0 { // 1/10 chances to return fakeErr return nil, fakeErr } if rand.Intn(10)%10 == 1 { // 1/10 chances to sleep until circuit's timeout time.Sleep(DefaultCbTimeout + 10*time.Millisecond) } return msg, nil } // customize circuit's setting circuit := xid.New().String() opts := NewCircuitHandlerOpts("", "test", circuit) opts.CircuitConf.MaxConcurrent = 20 // resets all states(incl. metrics) of all circuits. CircuitReset() // create CircuitHandler to protect fakeHandler handler := NewCircuitHandler(opts, fakeHandler) count := 100 wg := &sync.WaitGroup{} wg.Add(count) var msgID int64 for i := 0; i < count; i++ { go func() { defer wg.Done() id := atomic.AddInt64(&msgID, 1) msg, err := handler.Handle(context.Background(), SimpleMessage(strconv.FormatInt(id, 10))) if err != nil { switch err { case ErrCbMaxConcurrency: fmt.Println("Reached Circuit's MaxConcurrency") case ErrCbTimeout: fmt.Println("Reached Circuit's Timeout") case ErrCbOpen: fmt.Println("Reached Circuit's threshold so it opens") default: fmt.Println("Other err:", err) } return } fmt.Println("msg:", msg.ID()) }() } wg.Wait()
Output:
func (*CircuitHandler) GetCircuitName ¶
func (r *CircuitHandler) GetCircuitName() string
GetCircuitName returns the name of the internal circuit.
type CircuitHandlerOpts ¶
type CircuitHandlerOpts struct { HandlerOpts // CbMetric is the circuit's metric collector. Default is no-op. CbMetric CbMetric // Circuit is the name of the circuit-breaker to create. Each circuit-breaker // must have an unique name associated to its CircuitConf and internal hystrix metrics. Circuit string CircuitConf CircuitConf }
CircuitHandlerOpts contains options that'll be used by NewCircuitHandler.
func NewCircuitHandlerOpts ¶
func NewCircuitHandlerOpts(namespace, name, circuit string) *CircuitHandlerOpts
NewCircuitHandlerOpts returns CircuitHandlerOpts with default values. Note that circuit is the name of the circuit. Every circuit must have a unique name that maps to its CircuitConf.
type CircuitStream ¶
type CircuitStream struct {
// contains filtered or unexported fields
}
CircuitStream is a Stream that guards the upstream with a circuit-breaker.
func NewCircuitStream ¶
func NewCircuitStream(opts *CircuitStreamOpts, upstream Stream) *CircuitStream
NewCircuitStream creates a CircuitStream to guard the upstream.
Example ¶
rand.Seed(time.Now().UnixNano()) fakeErr := errors.New("fake err") var msgID int64 // fakeStream with randomized outcome var fakeStream SimpleStream = func(_ context.Context) (Message, error) { if rand.Intn(10)%10 == 0 { // 1/10 chances to return fakeErr return nil, fakeErr } id := atomic.AddInt64(&msgID, 1) if rand.Intn(10)%10 == 1 { // 1/10 chances to sleep until circuit's timeout time.Sleep(DefaultCbTimeout + 10*time.Millisecond) } return SimpleMessage(strconv.FormatInt(id, 10)), nil } // resets all states(incl. metrics) of all circuits. CircuitReset() // create CircuitStream to protect fakeStream stream := NewCircuitStream( NewCircuitStreamOpts("", "test", xid.New().String()), fakeStream) count := 100 wg := &sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { defer wg.Done() msg, err := stream.Get(context.Background()) if err != nil { switch err { case ErrCbMaxConcurrency: fmt.Println("Reached Circuit's MaxConcurrency") case ErrCbTimeout: fmt.Println("Reached Circuit's Timeout") case ErrCbOpen: fmt.Println("Reached Circuit's threshold so it opens") default: fmt.Println("Other err:", err) } return } fmt.Println("msg:", msg.ID()) }() } wg.Wait()
Output:
Example (CustomCircuit) ¶
rand.Seed(time.Now().UnixNano()) fakeErr := errors.New("fake err") var msgID int64 // fakeStream with randomized outcome var fakeStream SimpleStream = func(_ context.Context) (Message, error) { if rand.Intn(10)%10 == 0 { // 1/10 chances to return fakeErr return nil, fakeErr } id := atomic.AddInt64(&msgID, 1) if rand.Intn(10)%10 == 1 { // 1/10 chances to sleep until circuit's timeout time.Sleep(DefaultCbTimeout + 10*time.Millisecond) } return SimpleMessage(strconv.FormatInt(id, 10)), nil } // customize circuit's setting circuit := xid.New().String() opts := NewCircuitStreamOpts("", "test", circuit) opts.CircuitConf.MaxConcurrent = 20 // resets all states(incl. metrics) of all circuits. CircuitReset() // create CircuitStream to protect fakeStream stream := NewCircuitStream(opts, fakeStream) count := 100 wg := &sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { defer wg.Done() msg, err := stream.Get(context.Background()) if err != nil { switch err { case ErrCbMaxConcurrency: fmt.Println("Reached Circuit's MaxConcurrency") case ErrCbTimeout: fmt.Println("Reached Circuit's Timeout") case ErrCbOpen: fmt.Println("Reached Circuit's threshold so it opens") default: fmt.Println("Other err:", err) } return } fmt.Println("msg:", msg.ID()) }() } wg.Wait()
Output:
func (*CircuitStream) Get ¶
func (r *CircuitStream) Get(ctx context.Context) (Message, error)
Get may return errors that generated by the upstream and errors from circuit itself including ErrCircuitOpen, ErrCbMaxConcurrency and ErrCbTimeout.
func (*CircuitStream) GetCircuitName ¶
func (r *CircuitStream) GetCircuitName() string
GetCircuitName returns the name of the internal circuit.
type CircuitStreamOpts ¶
type CircuitStreamOpts struct { StreamOpts CbMetric CbMetric // Circuit is the name of the circuit-breaker to create. Each circuit-breaker // must have an unique name associated to its CircuitConf and internal hystrix metrics. Circuit string CircuitConf CircuitConf }
CircuitStreamOpts contains options that'll be used by NewCircuitStream.
func NewCircuitStreamOpts ¶
func NewCircuitStreamOpts(namespace, name, circuit string) *CircuitStreamOpts
NewCircuitStreamOpts returns CircuitStreamOpts with default values. Note that circuit is the name of the circuit. Every circuit must have a unique name that maps to its CircuitConf.
type ConstBackOffFactory ¶
type ConstBackOffFactory struct {
// contains filtered or unexported fields
}
ConstBackOffFactory creates non-thread-safe constant BackOffs.
func NewConstBackOffFactory ¶
func NewConstBackOffFactory(opts *ConstBackOffFactoryOpts) *ConstBackOffFactory
NewConstBackOffFactory creates a ConstBackOffFactory.
func (*ConstBackOffFactory) NewBackOff ¶
func (f *ConstBackOffFactory) NewBackOff() BackOff
NewBackOff creates a constant BackOff object.
type ConstBackOffFactoryOpts ¶
type ConstBackOffFactoryOpts struct { Interval time.Duration // After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff // stops. // If both are 0, it never stops backing off. // If only one of them is 0, then the other is checked. MaxElapsedTime time.Duration MaxNumBackOffs int64 // Clock provides a way to mock time. Mainly used in tests. Clock clock.Clock }
ConstBackOffFactoryOpts is settings for ConstBackOffFactory.
func NewConstBackOffFactoryOpts ¶
func NewConstBackOffFactoryOpts(interval time.Duration, maxElapsedTime time.Duration) *ConstBackOffFactoryOpts
NewConstBackOffFactoryOpts creates a default ConstBackOffFactoryOpts.
type ExpBackOffFactory ¶
type ExpBackOffFactory struct {
// contains filtered or unexported fields
}
ExpBackOffFactory creates non-thread-safe exponential BackOffs.
func NewExpBackOffFactory ¶
func NewExpBackOffFactory(opts *ExpBackOffFactoryOpts) *ExpBackOffFactory
NewExpBackOffFactory creates a ExpBackOffFactory
func (*ExpBackOffFactory) NewBackOff ¶
func (f *ExpBackOffFactory) NewBackOff() BackOff
NewBackOff creates an exponential BackOff object.
type ExpBackOffFactoryOpts ¶
type ExpBackOffFactoryOpts struct { // Next() returns a randomizedInterval which is: // currentInterval * rand(range [1-RandomizationFactor, 1+RandomizationFactor]) InitialInterval time.Duration RandomizationFactor float64 Multiplier float64 // If currentInterval * Multiplier >= MaxInterval, then currentInterval = b.MaxInterval // Otherwise, currentInterval *= Multiplier MaxInterval time.Duration // After MaxElapsedTime or MaxNumBackOffs(whichever comes first) the BackOff // stops. // If both are 0, it never stops backing off. // If only one of them is 0, then the other is checked. MaxElapsedTime time.Duration MaxNumBackOffs int64 // Clock provides a way to mock time. Mainly used in tests. Clock clock.Clock }
ExpBackOffFactoryOpts is settings for ExpBackOffFactory.
func NewExpBackOffFactoryOpts ¶
func NewExpBackOffFactoryOpts(initInterval time.Duration, multiplier float64, maxInterval time.Duration, maxElapsedTime time.Duration) *ExpBackOffFactoryOpts
NewExpBackOffFactoryOpts creates a default ExpBackOffFactoryOpts.
type Handler ¶
type Handler interface { // Handle() takes a Message as input and then returns either a Message or // an error exclusively. Handle(context.Context, Message) (Message, error) }
Handler transforms a Message.
func AppendFallbacksHandler ¶
func AppendFallbacksHandler(handler Handler, fallbacks ...HandlerFallback) Handler
AppendFallbacksHandler appends multiple HandlerFallbacks to a Handler. HandlerFallbacks run in sequence. The first fallback takes the error from handler.Handle() and every subsequent fallback accepts an error returned by the previous one. Moreover every fallback also takes the same Message passed to handler.
Example ¶
var firstHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { fmt.Println("handler err caused by msg:", msg.ID()) return nil, errors.New("handler err") } var fallbackA HandlerFallback = func(_ context.Context, msg Message, err error) (Message, error) { fmt.Println("fallbackA fails to deal with msg:", msg.ID()) return nil, errors.New("fallback err") } var fallbackB HandlerFallback = func(_ context.Context, msg Message, err error) (Message, error) { msgOut := SimpleMessage("xyz") fmt.Printf("fallbackB successfully deals with msg: %s, returns msg: %s\n", msg.ID(), msgOut.ID()) return msgOut, nil } handler := AppendFallbacksHandler(firstHandler, fallbackA, fallbackB) msg, _ := handler.Handle(context.Background(), SimpleMessage("abc")) fmt.Println("msg finally returned:", msg.ID())
Output: handler err caused by msg: abc fallbackA fails to deal with msg: abc fallbackB successfully deals with msg: abc, returns msg: xyz msg finally returned: xyz
func AppendHandlersHandler ¶
AppendHandlersHandler appends multiple Handlers to a Handler. Handlers run in sequence and every handler accepts a Message returned by the previous handler.
Example ¶
var repeatHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { msgOut := SimpleMessage(msg.ID() + "+") fmt.Printf("handle msg: %s, returns msg: %s\n", msg.ID(), msgOut.ID()) return msgOut, nil } handler := AppendHandlersHandler(repeatHandler, repeatHandler, repeatHandler) msg, _ := handler.Handle(context.Background(), SimpleMessage("abc")) fmt.Println("msg finally returned:", msg.ID())
Output: handle msg: abc, returns msg: abc+ handle msg: abc+, returns msg: abc++ handle msg: abc++, returns msg: abc+++ msg finally returned: abc+++
type HandlerFallback ¶
HandlerFallback is defined to be a function that takes the error returned by Handler.Handle() and the causal Message of that error. It then either returns a Message or return an error.
type HandlerOpts ¶
type HandlerOpts struct { // Namespace and Name are for logically organizing Streams/Handlers. They // appear along with the type of resilience in log or tracing Namespace string Name string Log Logger Tracer opentracing.Tracer TracingRef TracingRef }
HandlerOpts is options that every XxxHandlerOpts must have.
type Logger ¶
type Logger interface { // Log a message at the given level with key/value pairs. The number of // fields must be multiple of two for a key and a value. Debug(msg string, fields ...interface{}) Info(msg string, fields ...interface{}) Warn(msg string, fields ...interface{}) Error(msg string, fields ...interface{}) Crit(msg string, fields ...interface{}) // Logger may support structured/contextual logging. Our Streams/Handlers // by default will acquire an logger by calling package root logger // gentle.Log.New( // "namespace", "namespace of this Stream/Handler", // "name", "name of this Stream/Handler", // "gentle", "type of this stream/handler") New(fields ...interface{}) Logger }
Logger provides structural logging interface.
var Log Logger
Log is a package level logger. It's the parent logger of all loggers used by resilience Streams/Handlers defined in this package.
type Message ¶
type Message interface { // ID() should return a unique string representing this Message. ID() string }
Message is passed around Streams/Handlers.
type Metric ¶
type Metric interface { // Successful Stream.Get()/Handler.Handle() with timespan ObserveOk(timespan time.Duration) // Failed Stream.Get()/Handler.Handle() with timespan ObserveErr(timespan time.Duration) }
Metric is an interface for collecting metrics by RateLimitStream/RateLimitHandler/BulkheadStream/BulkheadHandler
type RateLimit ¶
type RateLimit interface { // Wait for $count tokens to be granted(return true) or timeout(return // false). If $timeout == 0, it would block as long as it needs. Wait(count int, timeout time.Duration) bool }
RateLimit is an interface for a "token bucket" algorithm.
type RateLimitedHandler ¶
type RateLimitedHandler struct {
// contains filtered or unexported fields
}
RateLimitedHandler is a Handler that runs the up-handler in a rate-limited manner.
func NewRateLimitedHandler ¶
func NewRateLimitedHandler(opts *RateLimitedHandlerOpts, upHandler Handler) *RateLimitedHandler
NewRateLimitedHandler creates a RateLimitedHandler to guard the up-handler.
Example ¶
var fakeHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { return msg, nil } count := 5 interval := 100 * time.Millisecond minimum := time.Duration(count-1) * interval // limit the rate to access fakeStream handler := NewRateLimitedHandler( NewRateLimitedHandlerOpts("", "test", NewTokenBucketRateLimit(interval, 1)), fakeHandler) begin := time.Now() wg := &sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { defer wg.Done() handler.Handle(context.Background(), SimpleMessage("abc")) }() } wg.Wait() fmt.Printf("Spend more than %s? %t\n", minimum, time.Now().After(begin.Add(minimum)))
Output: Spend more than 400ms? true
type RateLimitedHandlerOpts ¶
type RateLimitedHandlerOpts struct { HandlerOpts Metric Metric Limiter RateLimit }
RateLimitedHandlerOpts contains options that'll be used by NewRateLimitedHandler.
func NewRateLimitedHandlerOpts ¶
func NewRateLimitedHandlerOpts(namespace, name string, limiter RateLimit) *RateLimitedHandlerOpts
NewRateLimitedHandlerOpts returns RateLimitedHandlerOpts with default values.
type RateLimitedStream ¶
type RateLimitedStream struct {
// contains filtered or unexported fields
}
RateLimitedStream is a Stream that runs the upstream in a rate-limited manner.
func NewRateLimitedStream ¶
func NewRateLimitedStream(opts *RateLimitedStreamOpts, upstream Stream) *RateLimitedStream
NewRateLimitedStream creates a RateLimitedStream to guard the upstream.
Example ¶
var msgID int64 var fakeStream SimpleStream = func(_ context.Context) (Message, error) { id := atomic.AddInt64(&msgID, 1) return SimpleMessage(strconv.FormatInt(id, 10)), nil } count := 5 interval := 100 * time.Millisecond minimum := time.Duration(count-1) * interval // limit the rate to access fakeStream stream := NewRateLimitedStream( NewRateLimitedStreamOpts("", "test", NewTokenBucketRateLimit(interval, 1)), fakeStream) begin := time.Now() wg := &sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { defer wg.Done() stream.Get(context.Background()) }() } wg.Wait() fmt.Printf("Spend more than %s? %t\n", minimum, time.Now().After(begin.Add(minimum)))
Output: Spend more than 400ms? true
type RateLimitedStreamOpts ¶
type RateLimitedStreamOpts struct { StreamOpts Metric Metric Limiter RateLimit }
RateLimitedStreamOpts contains options that'll be used by NewRateLimitedStream.
func NewRateLimitedStreamOpts ¶
func NewRateLimitedStreamOpts(namespace, name string, limiter RateLimit) *RateLimitedStreamOpts
NewRateLimitedStreamOpts returns RateLimitedStreamOpts with default values.
type RetryHandler ¶
type RetryHandler struct {
// contains filtered or unexported fields
}
RetryHandler is a Handler that retries the up-handler with back-offs.
func NewRetryHandler ¶
func NewRetryHandler(opts *RetryHandlerOpts, upHandler Handler) *RetryHandler
NewRetryHandler creates a RetryHandler to guard the up-handler.
Example (ContantBackOff) ¶
fakeErr := errors.New("fake err") // fakeHandler keeps triggering back-offs. var fakeHandler SimpleHandler = func(_ context.Context, _ Message) (Message, error) { return nil, fakeErr } // No more back-off when total execution + back-offs elapsed more than 1s. backOffOpts := NewConstBackOffFactoryOpts(100*time.Millisecond, time.Second) backOffFactory := NewConstBackOffFactory(backOffOpts) opts := NewRetryHandlerOpts("", "test", backOffFactory) // Retry with back-offs to access fakeHandler handler := NewRetryHandler(opts, fakeHandler) _, err := handler.Handle(context.Background(), SimpleMessage("abc")) fmt.Println(err)
Output: fake err
Example (ExpBackOff) ¶
fakeErr := errors.New("fake err") // fakeHandler keeps triggering back-offs. var fakeHandler SimpleHandler = func(_ context.Context, _ Message) (Message, error) { return nil, fakeErr } // No more back-off when total execution + back-offs elapsed more than 2s. backOffOpts := NewExpBackOffFactoryOpts(100*time.Millisecond, 2, time.Second, 2*time.Second) backOffFactory := NewExpBackOffFactory(backOffOpts) opts := NewRetryHandlerOpts("", "test", backOffFactory) // Retry with back-offs to access fakeHandler handler := NewRetryHandler(opts, fakeHandler) _, err := handler.Handle(context.Background(), SimpleMessage("abc")) fmt.Println(err)
Output: fake err
type RetryHandlerOpts ¶
type RetryHandlerOpts struct { HandlerOpts RetryMetric RetryMetric // TODO // remove the dependency to package clock for this exported symbol Clock clock.Clock // BackOffFactory for instantiating BackOff objects BackOffFactory BackOffFactory }
RetryHandlerOpts contains options that'll be used by NewRetryHandler.
func NewRetryHandlerOpts ¶
func NewRetryHandlerOpts(namespace, name string, backOffFactory BackOffFactory) *RetryHandlerOpts
NewRetryHandlerOpts returns RetryHandlerOpts with default values.
type RetryMetric ¶
type RetryMetric interface { // Successful Stream.Get()/Handler.Handle() with timespan and the number of retries ObserveOk(timespan time.Duration, retry int) // Failed Stream.Get()/Handler.Handle() with timespan and the number of retries ObserveErr(timespan time.Duration, retry int) }
RetryMetric is an interface for collecting metrics by RetryStream/RetryHandler
type RetryStream ¶
type RetryStream struct {
// contains filtered or unexported fields
}
RetryStream will, when Get() encounters error, back off for some time and then retries.
func NewRetryStream ¶
func NewRetryStream(opts *RetryStreamOpts, upstream Stream) *RetryStream
NewRetryStream creates a RetryStream to guard the upstream.
Example (ContantBackOff) ¶
fakeErr := errors.New("fake err") // fakeStream keeps triggering back-offs. var fakeStream SimpleStream = func(_ context.Context) (Message, error) { return nil, fakeErr } // No more back-off when total execution + back-offs elapsed more than 1s. backOffOpts := NewConstBackOffFactoryOpts(100*time.Millisecond, time.Second) backOffFactory := NewConstBackOffFactory(backOffOpts) opts := NewRetryStreamOpts("", "test", backOffFactory) // Retry with back-offs to access fakeStream stream := NewRetryStream(opts, fakeStream) _, err := stream.Get(context.Background()) fmt.Println(err)
Output: fake err
Example (ExpBackOff) ¶
fakeErr := errors.New("fake err") // fakeStream keeps triggering back-offs. var fakeStream SimpleStream = func(_ context.Context) (Message, error) { return nil, fakeErr } // No more back-off when total execution + back-offs elapsed more than 2s. backOffOpts := NewExpBackOffFactoryOpts(100*time.Millisecond, 2, time.Second, 2*time.Second) backOffFactory := NewExpBackOffFactory(backOffOpts) opts := NewRetryStreamOpts("", "test", backOffFactory) // Retry with back-offs to access fakeStream stream := NewRetryStream(opts, fakeStream) _, err := stream.Get(context.Background()) fmt.Println(err)
Output: fake err
type RetryStreamOpts ¶
type RetryStreamOpts struct { StreamOpts RetryMetric RetryMetric // TODO // remove the dependency to package clock for this exported symbol Clock clock.Clock BackOffFactory BackOffFactory }
RetryStreamOpts contains options that'll be used by NewRetryStream.
func NewRetryStreamOpts ¶
func NewRetryStreamOpts(namespace, name string, backOffFactory BackOffFactory) *RetryStreamOpts
NewRetryStreamOpts returns RetryStreamOpts with default values.
type SimpleHandler ¶
SimpleHandler turns a function into a Handler
Example ¶
var handler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { return msg, nil } msg, _ := handler.Handle(context.Background(), SimpleMessage("abc")) fmt.Println("msg:", msg.ID())
Output: msg: abc
type SimpleMessage ¶
type SimpleMessage string
SimpleMessage essentially wraps a string to be a Message.
func (SimpleMessage) ID ¶
func (m SimpleMessage) ID() string
ID return's the identifier of the SimpleMessage.
type SimpleStream ¶
SimpleStream turns a function into a Stream
Example ¶
msgID := 0 var stream SimpleStream = func(_ context.Context) (Message, error) { msgID++ return SimpleMessage(strconv.Itoa(msgID)), nil } for i := 0; i < 5; i++ { msg, _ := stream.Get(context.Background()) fmt.Println("msg:", msg.ID()) }
Output: msg: 1 msg: 2 msg: 3 msg: 4 msg: 5
type Stream ¶
type Stream interface { // Get() returns either a Message or an error exclusively. Get(context.Context) (Message, error) }
Stream emits Message.
func AppendFallbacksStream ¶
func AppendFallbacksStream(stream Stream, fallbacks ...StreamFallback) Stream
AppendFallbacksStream appends multiple StreamFallbacks to a Stream. StreamFallbacks run in sequence. The first fallback accepts Message returned by the Stream and then every subsequent fallback accepts an error returned by the previous fallback.
Example ¶
var upstream SimpleStream = func(_ context.Context) (Message, error) { fmt.Println("upstream fails to return a msg") return nil, errors.New("stream error") } var fallbackA StreamFallback = func(_ context.Context, err error) (Message, error) { fmt.Println("fallbackA fails to correct the error:", err) return nil, errors.New("fallbackA error") } var fallbackB StreamFallback = func(_ context.Context, err error) (Message, error) { fmt.Println("fallbackB successfully deals with the error:", err) return SimpleMessage("abc"), nil } stream := AppendFallbacksStream(upstream, fallbackA, fallbackB) msg, _ := stream.Get(context.Background()) fmt.Println("msg finally returned:", msg.ID())
Output: upstream fails to return a msg fallbackA fails to correct the error: stream error fallbackB successfully deals with the error: fallbackA error msg finally returned: abc
func AppendHandlersStream ¶
AppendHandlersStream appends multiple Handlers to a Stream. Handlers run in sequence. The first handler accepts Message returned by the stream, and then every subsequent handler accepts a Message returned by the previous handler.
Example ¶
var upstream SimpleStream = func(_ context.Context) (Message, error) { msg := SimpleMessage("abc") fmt.Println("stream returns msg:", msg.ID()) return msg, nil } var repeatHandler SimpleHandler = func(_ context.Context, msg Message) (Message, error) { msgOut := SimpleMessage(msg.ID() + "+") fmt.Printf("handle msg: %s, returns msg: %s\n", msg.ID(), msgOut.ID()) return msgOut, nil } stream := AppendHandlersStream(upstream, repeatHandler, repeatHandler) msg, _ := stream.Get(context.Background()) fmt.Println("msg finally returned:", msg.ID())
Output: stream returns msg: abc handle msg: abc, returns msg: abc+ handle msg: abc+, returns msg: abc++ msg finally returned: abc++
type StreamFallback ¶
StreamFallback is defined to be a function that takes the error returned by Stream.Get() and then either returns a Message or return an error.
type StreamOpts ¶
type StreamOpts struct { Namespace string Name string Log Logger Tracer opentracing.Tracer TracingRef TracingRef }
StreamOpts is options that every XxxStreamOpts must have.
type TokenBucketRateLimit ¶
type TokenBucketRateLimit struct {
// contains filtered or unexported fields
}
func NewTokenBucketRateLimit ¶
func NewTokenBucketRateLimit(requestsInterval time.Duration, maxRequestBurst int) *TokenBucketRateLimit
$requestsInterval The minimum interval, in milliseconds, between two consecutive requests. N is the amount of requests allowed when a burst of requests coming in after not seeing requests for N * RequestsInterval. N is capped by $maxRequestBurst. If $maxRequestBurst == 1, then no burst allowed.
type TracingRef ¶
type TracingRef int
TracingRef is opentracing tracing causal reference, currently either a ChildOf or a FollowsFrom.
const ( // TracingDisabled indicates that tracing is not enabled regardless whether // a span exists in the given context. TracingDisabled TracingRef = iota // TracingChildOf represents tracing causal reference ChildOf. TracingChildOf // TracingFollowsFrom represents tracing causal reference FromFrom. TracingFollowsFrom )