Documentation ¶
Index ¶
- Constants
- type Action
- type Buffer
- func (ab *Buffer) Add(ctx context.Context, obj interface{})
- func (ab *Buffer) AddMany(ctx context.Context, objs ...interface{})
- func (ab *Buffer) Background() context.Context
- func (ab *Buffer) Dispatch()
- func (ab *Buffer) FlushAsync(ctx context.Context) error
- func (ab *Buffer) NotifyStarted() <-chan struct{}
- func (ab *Buffer) NotifyStopped() <-chan struct{}
- func (ab *Buffer) Start() error
- func (ab *Buffer) Stop() error
- type Flush
- type Option
- func OptContext(ctx context.Context) Option
- func OptErrors(errors chan error) Option
- func OptInterval(d time.Duration) Option
- func OptLog(log logger.Log) Option
- func OptMaxFlushes(maxFlushes int) Option
- func OptMaxLen(maxLen int) Option
- func OptParallelism(parallelism int) Option
- func OptShutdownGracePeriod(shutdownGracePeriod time.Duration) Option
- func OptStats(stats stats.Collector) Option
- func OptTracer(tracer Tracer) Option
- type TraceFinisher
- type Tracer
Constants ¶
const ( DefaultMaxFlushes = 128 DefaultMaxLen = 512 DefaultFlushInterval = 500 * time.Millisecond DefaultShutdownGracePeriod = 10 * time.Second )
Defaults
const ( MetricFlush string = "autoflush.flush" MetricFlushItemCount string = "autoflush.flush.item_count" MetricFlushEnqueueElapsed string = "autoflush.flush.enqueue.elapsed" MetricFlushHandler string = "autoflush.flush.handler" MetricFlushHandlerElapsed string = "autoflush.flush.handler.elapsed" MetricFlushQueueLength string = "autoflush.flush.queue_length" MetricBufferLength string = "autoflush.buffer.length" MetricAdd string = "autoflush.add" MetricAddElapsed string = "autoflush.add.elapsed" MetricAddMany string = "autoflush.add_many" MetricAddManyItemCount string = "autoflush.add_many.item_count" MetricAddManyElapsed string = "autoflush.add_many.elapsed" )
Metric names
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer struct { Latch *async.Latch Context context.Context Log logger.Log Stats stats.Collector Tracer Tracer MaxLen int Interval time.Duration Parallelism int MaxFlushes int ShutdownGracePeriod time.Duration Handler Action Errors chan error // contains filtered or unexported fields }
Buffer is a backing store that operates either on a fixed length flush or a fixed interval flush. A handler should be provided but without one the buffer will just clear. Adds that would cause fixed length flushes do not block on the flush handler.
func (*Buffer) Add ¶
Add adds a new object to the buffer, blocking if it triggers a flush. If the buffer is full, it will call the flush handler on a separate goroutine.
func (*Buffer) Background ¶
Background returns a background context.
func (*Buffer) FlushAsync ¶
FlushAsync clears the buffer, if a handler is provided it is passed the contents of the buffer. This call is asynchronous, in that it will call the flush handler on its own goroutine.
func (*Buffer) NotifyStarted ¶
func (ab *Buffer) NotifyStarted() <-chan struct{}
NotifyStarted implements graceful.Graceful.
func (*Buffer) NotifyStopped ¶
func (ab *Buffer) NotifyStopped() <-chan struct{}
NotifyStopped implements graceful.Graceful.
type Option ¶
type Option func(*Buffer)
Option is an option for auto-flush buffers.
func OptContext ¶
OptContext sets the auto-flush buffer's context.
func OptInterval ¶
OptInterval sets the auto-flush buffer's interval.
func OptMaxFlushes ¶
OptMaxFlushes sets the auto-flush buffer's maximum flush queue length.
func OptParallelism ¶
OptParallelism sets the auto-flush buffer's flush worker count.
func OptShutdownGracePeriod ¶
OptShutdownGracePeriod sets the auto-flush buffer's shutdown grace period.
type Tracer ¶
type Tracer interface { StartAdd(context.Context) TraceFinisher StartAddMany(context.Context) TraceFinisher StartQueueFlush(context.Context) TraceFinisher StartFlush(context.Context) (context.Context, TraceFinisher) }
Tracer is a type that can trace actions in the Buffer.