Documentation ¶
Overview ¶
Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store.
Index ¶
- type AADSASCredential
- type AADSASCredentialOption
- type Credential
- type LeaserCheckpointer
- func (sl *LeaserCheckpointer) AcquireLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error)
- func (sl *LeaserCheckpointer) Close() error
- func (sl *LeaserCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error
- func (sl *LeaserCheckpointer) DeleteLease(ctx context.Context, partitionID string) error
- func (sl *LeaserCheckpointer) DeleteStore(ctx context.Context) error
- func (sl *LeaserCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error)
- func (sl *LeaserCheckpointer) EnsureLease(ctx context.Context, partitionID string) (eph.LeaseMarker, error)
- func (sl *LeaserCheckpointer) EnsureStore(ctx context.Context) error
- func (sl *LeaserCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool)
- func (sl *LeaserCheckpointer) GetLeases(ctx context.Context) ([]eph.LeaseMarker, error)
- func (sl *LeaserCheckpointer) ReleaseLease(ctx context.Context, partitionID string) (bool, error)
- func (sl *LeaserCheckpointer) RenewLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error)
- func (sl *LeaserCheckpointer) SetEventHostProcessor(eph *eph.EventProcessorHost)
- func (sl *LeaserCheckpointer) StoreExists(ctx context.Context) (bool, error)
- func (sl *LeaserCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error
- func (sl *LeaserCheckpointer) UpdateLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error)
- type LeaserCheckpointerOption
- type SASToken
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AADSASCredential ¶
type AADSASCredential struct {
	azblob.Credential
	ResourceGroup  string
	SubscriptionID string
	AccountName    string
	ContainerName  string
	// contains filtered or unexported fields
}
AADSASCredential represents a token provider for Azure Storage SAS using AAD to authorize signing
func NewAADSASCredential ¶
func NewAADSASCredential(subscriptionID, resourceGroup, accountName, containerName string, opts ...AADSASCredentialOption) (*AADSASCredential, error)
NewAADSASCredential constructs a SAS token provider for Azure storage using Azure Active Directory credentials
canonicalizedResource should be formed as described here: https://docs.microsoft.com/en-us/rest/api/storagerp/storageaccounts/listservicesas
func (*AADSASCredential) New ¶
func (cred *AADSASCredential) New(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.Policy
New creates a credential policy object.
type AADSASCredentialOption ¶
type AADSASCredentialOption func(*aad.TokenProviderConfiguration) error
AADSASCredentialOption provides options for configuring AAD SAS Token Providers
func AADSASCredentialWithEnvironmentVars ¶
func AADSASCredentialWithEnvironmentVars() AADSASCredentialOption
AADSASCredentialWithEnvironmentVars configures the TokenProvider using the environment variables available
1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
3. Managed Service Identity (MSI): attempt to authenticate via MSI
The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
type Credential ¶
type Credential interface { azblob.Credential }
Credential is a wrapper for the Azure Storage azblob.Credential
type LeaserCheckpointer ¶
type LeaserCheckpointer struct {
	// LeasePersistenceInterval is the default interval at which dirty leases will be persisted to Azure Storage
	LeasePersistenceInterval time.Duration
	// contains filtered or unexported fields
}
LeaserCheckpointer implements the eph.LeaserCheckpointer interface for Azure Storage
func NewStorageLeaserCheckpointer ¶
func NewStorageLeaserCheckpointer(credential Credential, accountName, containerName string, env azure.Environment, opts ...LeaserCheckpointerOption) (*LeaserCheckpointer, error)
NewStorageLeaserCheckpointer builds an Azure Storage Leaser Checkpointer which handles leasing and checkpointing for the EventProcessorHost
func (*LeaserCheckpointer) AcquireLease ¶
func (sl *LeaserCheckpointer) AcquireLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error)
AcquireLease acquires the lease to the Azure blob in the container
func (*LeaserCheckpointer) Close ¶
func (sl *LeaserCheckpointer) Close() error
Close will stop the leaser / checkpointer from persisting dirty leases & checkpoints to storage
func (*LeaserCheckpointer) DeleteCheckpoint ¶
func (sl *LeaserCheckpointer) DeleteCheckpoint(ctx context.Context, partitionID string) error
DeleteCheckpoint will attempt to delete the checkpoint from Azure Storage
func (*LeaserCheckpointer) DeleteLease ¶
func (sl *LeaserCheckpointer) DeleteLease(ctx context.Context, partitionID string) error
DeleteLease deletes a lease in the storage container
func (*LeaserCheckpointer) DeleteStore ¶
func (sl *LeaserCheckpointer) DeleteStore(ctx context.Context) error
DeleteStore deletes the Azure Storage container
func (*LeaserCheckpointer) EnsureCheckpoint ¶
func (sl *LeaserCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error)
EnsureCheckpoint ensures a checkpoint exists for the lease
func (*LeaserCheckpointer) EnsureLease ¶
func (sl *LeaserCheckpointer) EnsureLease(ctx context.Context, partitionID string) (eph.LeaseMarker, error)
EnsureLease creates a lease in the container if it doesn't exist
func (*LeaserCheckpointer) EnsureStore ¶
func (sl *LeaserCheckpointer) EnsureStore(ctx context.Context) error
EnsureStore creates the container if it does not exist
func (*LeaserCheckpointer) GetCheckpoint ¶
func (sl *LeaserCheckpointer) GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool)
GetCheckpoint returns the latest checkpoint for the partitionID.
func (*LeaserCheckpointer) GetLeases ¶
func (sl *LeaserCheckpointer) GetLeases(ctx context.Context) ([]eph.LeaseMarker, error)
GetLeases gets all of the partition leases
func (*LeaserCheckpointer) ReleaseLease ¶
func (sl *LeaserCheckpointer) ReleaseLease(ctx context.Context, partitionID string) (bool, error)
ReleaseLease releases the lease to the blob in Azure storage
func (*LeaserCheckpointer) RenewLease ¶
func (sl *LeaserCheckpointer) RenewLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error)
RenewLease renews the lease to the Azure blob
func (*LeaserCheckpointer) SetEventHostProcessor ¶
func (sl *LeaserCheckpointer) SetEventHostProcessor(eph *eph.EventProcessorHost)
SetEventHostProcessor sets the EventHostProcessor on the instance of the LeaserCheckpointer
func (*LeaserCheckpointer) StoreExists ¶
func (sl *LeaserCheckpointer) StoreExists(ctx context.Context) (bool, error)
StoreExists returns true if the storage container exists
func (*LeaserCheckpointer) UpdateCheckpoint ¶
func (sl *LeaserCheckpointer) UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error
UpdateCheckpoint will attempt to write the checkpoint to Azure Storage
func (*LeaserCheckpointer) UpdateLease ¶
func (sl *LeaserCheckpointer) UpdateLease(ctx context.Context, partitionID string) (eph.LeaseMarker, bool, error)
UpdateLease renews and uploads the latest lease to the blob store
type LeaserCheckpointerOption ¶
type LeaserCheckpointerOption func(*LeaserCheckpointer) error
LeaserCheckpointerOption provides a way to customize a LeaserCheckpointer
func WithInitialCheckpoint ¶
func WithInitialCheckpoint(getInitialCheckpoint func() persist.Checkpoint) LeaserCheckpointerOption
WithInitialCheckpoint is a LeaserCheckpointerOption that supplies the checkpoint to use when no checkpoint exists yet, instead of starting from the beginning of the stream
func WithPrefixInBlobPath ¶
func WithPrefixInBlobPath(prefix string) LeaserCheckpointerOption
WithPrefixInBlobPath is a LeaserCheckpointerOption that adds a prefix to the checkpoint blob path