Documentation
¶
Index ¶
Constants ¶
const ( AuditMsgFailureOnTargetAndInflight = "an audit revealed that the target and inflight should both be zero but neither was." AuditMsgFailureOnTarget = "an audit revealed that the target should be zero but was not." AuditMsgFailureOnInflight = "an audit revealed that inflight should be zero but was not." )
const ( BatchEvent = "batch" PauseEvent = "pause" ResumeEvent = "resume" ShutdownEvent = "shutdown" AuditPassEvent = "audit-pass" AuditFailEvent = "audit-fail" AuditSkipEvent = "audit-skip" RequestEvent = "request" CapacityEvent = "capacity" ReleasedEvent = "released" AllocatedEvent = "allocated" TargetEvent = "target" VerifiedContainerEvent = "verified-container" CreatedContainerEvent = "created-container" ProvisionStartEvent = "provision-start" ProvisionDoneEvent = "provision-done" VerifiedBlobEvent = "verified-blob" CreatedBlobEvent = "created-blob" FailedEvent = "failed" ErrorEvent = "error" FlushStartEvent = "flush-start" FlushDoneEvent = "flush-done" )
Variables ¶
var ( NoWatcherError = errors.New("the operation must have a watcher assigned.") TooManyAttemptsError = errors.New("the operation exceeded the maximum number of attempts.") TooExpensiveError = errors.New("the operation costs more than the maximum capacity.") BufferFullError = errors.New("the buffer is full, try to enqueue again later.") BufferIsShutdown = errors.New("the buffer is shutdown, you may no longer enqueue.") ImproperOrderError = errors.New("methods can only be called in this order Start() > Stop().") NoOperationError = errors.New("no operation was provided.") InitializationOnlyError = errors.New("this property can only be set before Start() is called.") )
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher interface { Eventer WithRateLimiter(rl RateLimiter) Batcher WithFlushInterval(val time.Duration) Batcher WithCapacityInterval(val time.Duration) Batcher WithAuditInterval(val time.Duration) Batcher WithMaxOperationTime(val time.Duration) Batcher WithPauseTime(val time.Duration) Batcher WithErrorOnFullBuffer() Batcher WithEmitBatch() Batcher WithEmitFlush() Batcher WithEmitRequest() Batcher WithMaxConcurrentBatches(val uint32) Batcher Enqueue(op Operation) error Pause() Flush() Inflight() uint32 OperationsInBuffer() uint32 NeedsCapacity() uint32 Start(ctx context.Context) (err error) }
func NewBatcher ¶
func NewBatcher() Batcher
This method creates a new Batcher with a buffer that can contain up to 10,000 Operations. Generally you should have 1 Batcher per datastore. Commonly after calling NewBatcher() you will chain some WithXXXX methods, for instance... `NewBatcher().WithRateLimiter(limiter)`.
func NewBatcherWithBuffer ¶
This method creates a new Batcher with a buffer that can contain up to a user-defined number of Operations. Generally you should have 1 Batcher per datastore. Commonly after calling NewBatcherWithBuffer() you will chain some WithXXXX methods, for instance... `NewBatcherWithBuffer().WithRateLimiter(limiter)`.
type EventerBase ¶
type EventerBase struct {
// contains filtered or unexported fields
}
func (*EventerBase) AddListener ¶
func (r *EventerBase) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID
You can add a listener to catch events that are raised by Batcher or a RateLimiter.
func (*EventerBase) Emit ¶
func (r *EventerBase) Emit(event string, val int, msg string, metadata interface{})
To raise an event, you may emit a unique string for the event along with val, msg, and metadata as appropriate to describe the event.
func (*EventerBase) RemoveListener ¶
func (r *EventerBase) RemoveListener(id uuid.UUID)
If you no longer need to catch events that are raised by Batcher or a RateLimiter, you can use this method to remove the listener.
type LeaseManager ¶
type LeaseManager interface { RaiseEventsTo(e Eventer) Provision(ctx context.Context) (err error) CreatePartitions(ctx context.Context, count int) LeasePartition(ctx context.Context, id string, index uint32) (leaseTime time.Duration) }
func NewAzureBlobLeaseManager ¶
func NewAzureBlobLeaseManager(accountName, containerName, masterKey string) LeaseManager
This method creates a new AzureBlobLeaseManager to allow the SharedResource to use Azure Blob Storage to manage leases across instances. You must provide an Azure Storage accountName, containerName, and a masterKey.
type Operation ¶
type RateLimiter ¶
type SharedResource ¶
type SharedResource interface { RateLimiter }
func NewSharedResource ¶
func NewSharedResource() SharedResource
This function should be called to create a new SharedResource. The accountName and containerName refer to the details of an Azure Storage Account and container that the lease blobs can be created in. If multiple processes are sharing the same capacity, they should all point to the same container. Commonly after calling NewSharedResource() you will chain some WithXXXX methods, for instance... `NewSharedResource().WithMasterKey(key)`.
type Watcher ¶
type Watcher interface { WithMaxAttempts(val uint32) Watcher WithMaxBatchSize(val uint32) Watcher WithMaxOperationTime(val time.Duration) Watcher MaxAttempts() uint32 MaxBatchSize() uint32 MaxOperationTime() time.Duration ProcessBatch(ops []Operation) }
func NewWatcher ¶
This method creates a new Watcher with a callback function. This function will be called whenever a batch of Operations is ready to be processed. When the callback function is completed, it will reduce the Target by the cost of all Operations in the batch. If for some reason the processing is "stuck" in this function, they Target will be reduced after MaxOperationTime. Every time this function is called with a batch it is run as a new goroutine so anything inside could cause race conditions with the rest of your code - use atomic, sync, etc. as appropriate.