batcher

package module
v2.0.0-...-3963fac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AuditMsgFailureOnTargetAndInflight = "an audit revealed that the target and inflight should both be zero but neither was."
	AuditMsgFailureOnTarget            = "an audit revealed that the target should be zero but was not."
	AuditMsgFailureOnInflight          = "an audit revealed that inflight should be zero but was not."
)
View Source
const (
	BatchEvent             = "batch"
	PauseEvent             = "pause"
	ResumeEvent            = "resume"
	ShutdownEvent          = "shutdown"
	AuditPassEvent         = "audit-pass"
	AuditFailEvent         = "audit-fail"
	AuditSkipEvent         = "audit-skip"
	RequestEvent           = "request"
	CapacityEvent          = "capacity"
	ReleasedEvent          = "released"
	AllocatedEvent         = "allocated"
	TargetEvent            = "target"
	VerifiedContainerEvent = "verified-container"
	CreatedContainerEvent  = "created-container"
	ProvisionStartEvent    = "provision-start"
	ProvisionDoneEvent     = "provision-done"
	VerifiedBlobEvent      = "verified-blob"
	CreatedBlobEvent       = "created-blob"
	FailedEvent            = "failed"
	ErrorEvent             = "error"
	FlushStartEvent        = "flush-start"
	FlushDoneEvent         = "flush-done"
)

Variables

View Source
var (
	NoWatcherError               = errors.New("the operation must have a watcher assigned.")
	TooManyAttemptsError         = errors.New("the operation exceeded the maximum number of attempts.")
	TooExpensiveError            = errors.New("the operation costs more than the maximum capacity.")
	BufferFullError              = errors.New("the buffer is full, try to enqueue again later.")
	BufferIsShutdown             = errors.New("the buffer is shutdown, you may no longer enqueue.")
	ImproperOrderError           = errors.New("methods can only be called in this order Start() > Stop().")
	NoOperationError             = errors.New("no operation was provided.")
	InitializationOnlyError      = errors.New("this property can only be set before Start() is called.")
	SharedCapacityNotProvisioned = errors.New("shared capacity cannot be set if it was not provisioned.")
)

Functions

This section is empty.

Types

type Batcher

type Batcher interface {
	Eventer
	WithRateLimiter(rl RateLimiter) Batcher
	WithFlushInterval(val time.Duration) Batcher
	WithCapacityInterval(val time.Duration) Batcher
	WithAuditInterval(val time.Duration) Batcher
	WithMaxOperationTime(val time.Duration) Batcher
	WithPauseTime(val time.Duration) Batcher
	WithErrorOnFullBuffer() Batcher
	WithEmitBatch() Batcher
	WithEmitFlush() Batcher
	WithEmitRequest() Batcher
	WithMaxConcurrentBatches(val uint32) Batcher
	Enqueue(op Operation) error
	Pause()
	Flush()
	Inflight() uint32
	OperationsInBuffer() uint32
	NeedsCapacity() uint32
	Start(ctx context.Context) (err error)
}

func NewBatcher

func NewBatcher() Batcher

This method creates a new Batcher with a buffer that can contain up to 10,000 Operations. Generally you should have 1 Batcher per datastore. Commonly after calling NewBatcher() you will chain some WithXXXX methods, for instance... `NewBatcher().WithRateLimiter(limiter)`.

func NewBatcherWithBuffer

func NewBatcherWithBuffer(maxBufferSize uint32) Batcher

This method creates a new Batcher with a buffer that can contain up to a user-defined number of Operations. Generally you should have 1 Batcher per datastore. Commonly after calling NewBatcherWithBuffer() you will chain some WithXXXX methods, for instance... `NewBatcherWithBuffer().WithRateLimiter(limiter)`.

type Eventer

type Eventer interface {
	AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID
	RemoveListener(id uuid.UUID)
	Emit(event string, val int, msg string, metadata interface{})
}

type EventerBase

type EventerBase struct {
	// contains filtered or unexported fields
}

func (*EventerBase) AddListener

func (r *EventerBase) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID

You can add a listener to catch events that are raised by Batcher or a RateLimiter.

func (*EventerBase) Emit

func (r *EventerBase) Emit(event string, val int, msg string, metadata interface{})

To raise an event, you may emit a unique string for the event along with val, msg, and metadata as appropriate to describe the event.

func (*EventerBase) RemoveListener

func (r *EventerBase) RemoveListener(id uuid.UUID)

If you no longer need to catch events that are raised by Batcher or a RateLimiter, you can use this method to remove the listener.

type LeaseManager

type LeaseManager interface {
	RaiseEventsTo(e Eventer)
	Provision(ctx context.Context) (err error)
	CreatePartitions(ctx context.Context, count int)
	LeasePartition(ctx context.Context, id string, index uint32) (leaseTime time.Duration)
}

func NewAzureBlobLeaseManager

func NewAzureBlobLeaseManager(accountName, containerName, masterKey string) LeaseManager

This method creates a new AzureBlobLeaseManager to allow the SharedResource to use Azure Blob Storage to manage leases across instances. You must provide an Azure Storage accountName, containerName, and a masterKey.

type Operation

type Operation interface {
	Payload() interface{}
	Attempt() uint32
	Cost() uint32
	Watcher() Watcher
	IsBatchable() bool
	MakeAttempt()
}

func NewOperation

func NewOperation(watcher Watcher, cost uint32, payload interface{}, batchable bool) Operation

This method creates a new Operation with a Watcher, cost, payload, and a flag determining whether or not the Operation is batchable. An Operation will be Enqueued into a Batcher.

type RateLimiter

type RateLimiter interface {
	Eventer
	MaxCapacity() uint32
	Capacity() uint32
	GiveMe(target uint32)
	Start(ctx context.Context) error
}

type SharedResource

type SharedResource interface {
	RateLimiter
	WithFactor(val uint32) SharedResource
	WithReservedCapacity(val uint32) SharedResource
	WithSharedCapacity(val uint32, mgr LeaseManager) SharedResource
	WithMaxInterval(val uint32) SharedResource
	SetReservedCapacity(capacity uint32)
	SetSharedCapacity(capacity uint32) error
}

func NewSharedResource

func NewSharedResource() SharedResource

This function should be called to create a new SharedResource. The accountName and containerName refer to the details of an Azure Storage Account and container that the lease blobs can be created in. If multiple processes are sharing the same capacity, they should all point to the same container. Commonly after calling NewSharedResource() you will chain some WithXXXX methods, for instance... `NewSharedResource().WithMasterKey(key)`.

type Watcher

type Watcher interface {
	WithMaxAttempts(val uint32) Watcher
	WithMaxBatchSize(val uint32) Watcher
	WithMaxOperationTime(val time.Duration) Watcher
	MaxAttempts() uint32
	MaxBatchSize() uint32
	MaxOperationTime() time.Duration
	ProcessBatch(ops []Operation)
}

func NewWatcher

func NewWatcher(onReady func(batch []Operation)) Watcher

This method creates a new Watcher with a callback function. This function will be called whenever a batch of Operations is ready to be processed. When the callback function is completed, it will reduce the Target by the cost of all Operations in the batch. If for some reason the processing is "stuck" in this function, they Target will be reduced after MaxOperationTime. Every time this function is called with a batch it is run as a new goroutine so anything inside could cause race conditions with the rest of your code - use atomic, sync, etc. as appropriate.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL