autoflush

package
v1.20211025.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxFlushes          = 128
	DefaultMaxLen              = 512
	DefaultFlushInterval       = 500 * time.Millisecond
	DefaultShutdownGracePeriod = 10 * time.Second
)

Defaults

View Source
const (
	MetricFlush               string = "autoflush.flush"
	MetricFlushItemCount      string = "autoflush.flush.item_count"
	MetricFlushEnqueueElapsed string = "autoflush.flush.enqueue.elapsed"
	MetricFlushHandler        string = "autoflush.flush.handler"
	MetricFlushHandlerElapsed string = "autoflush.flush.handler.elapsed"
	MetricFlushQueueLength    string = "autoflush.flush.queue_length"
	MetricBufferLength        string = "autoflush.buffer.length"
	MetricAdd                 string = "autoflush.add"
	MetricAddElapsed          string = "autoflush.add.elapsed"
	MetricAddMany             string = "autoflush.add_many"
	MetricAddManyItemCount    string = "autoflush.add_many.item_count"
	MetricAddManyElapsed      string = "autoflush.add_many.elapsed"
)

Metric names

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action func(context.Context, []interface{}) error

Action is an action called by an buffer.

type Buffer

type Buffer struct {
	Latch   *async.Latch
	Context context.Context

	Log    logger.Log
	Stats  stats.Collector
	Tracer Tracer

	MaxLen              int
	Interval            time.Duration
	Parallelism         int
	MaxFlushes          int
	ShutdownGracePeriod time.Duration

	Handler Action
	Errors  chan error
	// contains filtered or unexported fields
}

Buffer is a backing store that operates either on a fixed length flush or a fixed interval flush. A handler should be provided but without one the buffer will just clear. Adds that would cause fixed length flushes do not block on the flush handler.

func New

func New(handler Action, options ...Option) *Buffer

New creates a new buffer.

func (*Buffer) Add

func (ab *Buffer) Add(ctx context.Context, obj interface{})

Add adds a new object to the buffer, blocking if it triggers a flush. If the buffer is full, it will call the flush handler on a separate goroutine.

func (*Buffer) AddMany

func (ab *Buffer) AddMany(ctx context.Context, objs ...interface{})

AddMany adds many objects to the buffer at once.

func (*Buffer) Background

func (ab *Buffer) Background() context.Context

Background returns a background context.

func (*Buffer) Dispatch

func (ab *Buffer) Dispatch()

Dispatch is the main run loop.

func (*Buffer) FlushAsync

func (ab *Buffer) FlushAsync(ctx context.Context) error

FlushAsync clears the buffer, if a handler is provided it is passed the contents of the buffer. This call is asynchronous, in that it will call the flush handler on its own goroutine.

func (*Buffer) NotifyStarted

func (ab *Buffer) NotifyStarted() <-chan struct{}

NotifyStarted implements graceful.Graceful.

func (*Buffer) NotifyStopped

func (ab *Buffer) NotifyStopped() <-chan struct{}

NotifyStopped implements graceful.Graceful.

func (*Buffer) Start

func (ab *Buffer) Start() error

Start starts the auto-flush buffer.

This call blocks. To call it asynchronously:

go afb.Start()
<-afb.NotifyStarted()

func (*Buffer) Stop

func (ab *Buffer) Stop() error

Stop stops the buffer flusher.

Any in flight flushes will be given ShutdownGracePeriod amount of time.

Stop is _very_ complicated.

type Flush

type Flush struct {
	Context  context.Context
	Contents []interface{}
}

Flush is an inflight flush attempt.

type Option

type Option func(*Buffer)

Option is an option for auto-flush buffers.

func OptContext

func OptContext(ctx context.Context) Option

OptContext sets the auto-flush buffer's context.

func OptErrors

func OptErrors(errors chan error) Option

OptErrors sets the auto-flush buffer's error return channel.

func OptInterval

func OptInterval(d time.Duration) Option

OptInterval sets the auto-flush buffer's interval.

func OptLog

func OptLog(log logger.Log) Option

OptLog sets the Buffer logger.

func OptMaxFlushes

func OptMaxFlushes(maxFlushes int) Option

OptMaxFlushes sets the auto-flush buffer's maximum flush queue length.

func OptMaxLen

func OptMaxLen(maxLen int) Option

OptMaxLen sets the auto-flush buffer's maximum length.

func OptParallelism

func OptParallelism(parallelism int) Option

OptParallelism sets the auto-flush buffer's flush worker count.

func OptShutdownGracePeriod

func OptShutdownGracePeriod(shutdownGracePeriod time.Duration) Option

OptShutdownGracePeriod sets the auto-flush buffer's shutdown grace period.

func OptStats

func OptStats(stats stats.Collector) Option

OptStats sets the Buffer stats collector.

func OptTracer

func OptTracer(tracer Tracer) Option

OptTracer sets the Buffer logger.

type TraceFinisher

type TraceFinisher interface {
	Finish(error)
}

TraceFinisher finishes traces.

type Tracer

type Tracer interface {
	StartAdd(context.Context) TraceFinisher
	StartAddMany(context.Context) TraceFinisher
	StartQueueFlush(context.Context) TraceFinisher
	StartFlush(context.Context) (context.Context, TraceFinisher)
}

Tracer is a type that can trace actions in the Buffer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL