Documentation ¶
Index ¶
- Constants
- func IsLockLostError(err error) bool
- func IsNetworkError(err error) bool
- func IsRetriableAMQPError(err error) bool
- func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error)
- func NewASBMessageFromInvokeRequest(req *bindings.InvokeRequest) (*azservicebus.Message, error)
- func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error)
- func NewBulkMessageEntryFromASBMessage(asbMsg *azservicebus.ReceivedMessage) (pubsub.BulkMessageEntry, error)
- func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error)
- func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error
- type Client
- func (c *Client) Close(log logger.Logger)
- func (c *Client) CloseAllSenders(log logger.Logger)
- func (c *Client) CloseSender(queueOrTopic string, log logger.Logger)
- func (c *Client) EnsureQueue(ctx context.Context, queue string) error
- func (c *Client) EnsureSubscription(ctx context.Context, name string, topic string, opts SubscribeOptions) error
- func (c *Client) EnsureTopic(ctx context.Context, topic string) error
- func (c *Client) GetClient() *servicebus.Client
- func (c *Client) GetSender(ctx context.Context, queueOrTopic string, ensureFn ensureFn) (*servicebus.Sender, error)
- func (c *Client) PublishBinding(ctx context.Context, req *bindings.InvokeRequest, queueOrTopic string, ...) (*bindings.InvokeResponse, error)
- func (c *Client) PublishPubSub(ctx context.Context, req *pubsub.PublishRequest, ensureFn ensureFn, ...) error
- func (c *Client) PublishPubSubBulk(ctx context.Context, req *pubsub.BulkPublishRequest, ensureFn ensureFn, ...) (pubsub.BulkPublishResponse, error)
- func (c *Client) ReconnectionBackoff() backoff.BackOff
- type HandlerFn
- type HandlerResponseItem
- type MessageReceiver
- type Metadata
- type Receiver
- type SessionReceiver
- type SubscribeOptions
- type Subscription
- func (s *Subscription) AbandonMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)
- func (s *Subscription) CompleteMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)
- func (s *Subscription) Connect(ctx context.Context, newReceiverFunc func() (Receiver, error)) (Receiver, error)
- func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler HandlerFn, receiver Receiver, ...) error
- type SubscriptionOptions
Constants ¶
const ( // MessageKeyMessageID defines the metadata key for the message id. MessageKeyMessageID = "MessageId" // read, write. // MessageKeyMessageIDAlias is an alias for "MessageId" for write only, for backwards-compatibility MessageKeyMessageIDAlias = "id" // MessageKeyCorrelationID defines the metadata key for the correlation id. MessageKeyCorrelationID = "CorrelationId" // read, write. // MessageKeyCorrelationIDAlias is an alias for "CorrelationId" for write only, for backwards-compatibility MessageKeyCorrelationIDAlias = "correlationID" // MessageKeySessionID defines the metadata key for the session id. MessageKeySessionID = "SessionId" // read, write. // MessageKeyLabel defines the metadata key for the label. MessageKeyLabel = "Label" // read, write. // MessageKeyReplyTo defines the metadata key for the reply to value. MessageKeyReplyTo = "ReplyTo" // read, write. // MessageKeyTo defines the metadata key for the to value. MessageKeyTo = "To" // read, write. // MessageKeyPartitionKey defines the metadata key for the partition key. MessageKeyPartitionKey = "PartitionKey" // read, write. // MessageKeyContentType defines the metadata key for the content type. MessageKeyContentType = "ContentType" // read, write. // MessageKeyDeliveryCount defines the metadata key for the delivery count. MessageKeyDeliveryCount = "DeliveryCount" // read. // MessageKeyLockedUntilUtc defines the metadata key for the locked until utc value. MessageKeyLockedUntilUtc = "LockedUntilUtc" // read. // MessageKeyLockToken defines the metadata key for the lock token. MessageKeyLockToken = "LockToken" // read. // MessageKeyEnqueuedTimeUtc defines the metadata key for the enqueued time utc value. MessageKeyEnqueuedTimeUtc = "EnqueuedTimeUtc" // read. // MessageKeySequenceNumber defines the metadata key for the sequence number. MessageKeySequenceNumber = "SequenceNumber" // read. // MessageKeyScheduledEnqueueTimeUtc defines the metadata key for the scheduled enqueue time utc value. 
MessageKeyScheduledEnqueueTimeUtc = "ScheduledEnqueueTimeUtc" // read, write. // MessageKeyReplyToSessionID defines the metadata key for the reply to session id. // Currently unused. MessageKeyReplyToSessionID = "ReplyToSessionId" // read, write. )
const ( MetadataModeBinding byte = 1 << iota MetadataModeTopics MetadataModeQueues byte = 0 )
Modes for ParseMetadata.
const ( RequireSessionsMetadataKey = "requireSessions" SessionIdleTimeoutMetadataKey = "sessionIdleTimeoutInSec" MaxConcurrentSessionsMetadataKey = "maxConcurrentSessions" DefaultSesssionIdleTimeoutInSec = 60 DefaultMaxConcurrentSessions = 8 )
Variables ¶
This section is empty.
Functions ¶
func IsLockLostError ¶
IsLockLostError returns true if the error is "locklost".
func IsNetworkError ¶
IsNetworkError returns true if the error returned by Service Bus is a network-level one, which would require reconnecting.
func IsRetriableAMQPError ¶
IsRetriableAMQPError returns true if the error returned by Service Bus is a retriable error from AMQP, which doesn't require reconnecting.
func NewASBMessageFromBulkMessageEntry ¶
func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error)
NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry.
func NewASBMessageFromInvokeRequest ¶
func NewASBMessageFromInvokeRequest(req *bindings.InvokeRequest) (*azservicebus.Message, error)
NewASBMessageFromInvokeRequest builds a new Azure Service Bus message from a binding's Invoke request.
func NewASBMessageFromPubsubRequest ¶
func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error)
NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest.
func NewBulkMessageEntryFromASBMessage ¶
func NewBulkMessageEntryFromASBMessage(asbMsg *azservicebus.ReceivedMessage) (pubsub.BulkMessageEntry, error)
NewBulkMessageEntryFromASBMessage returns a pubsub.BulkMessageEntry from a bulk message received from ASB.
func NewPubsubMessageFromASBMessage ¶
func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error)
NewPubsubMessageFromASBMessage returns a pubsub.NewMessage from a message received from ASB.
func UpdateASBBatchMessageWithBulkPublishRequest ¶
func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error
UpdateASBBatchMessageWithBulkPublishRequest updates the batch message with messages from the bulk publish request.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client contains the clients for Service Bus and methods to get senders and to create topics, subscriptions, queues.
func (*Client) CloseAllSenders ¶
CloseAllSenders closes all sender connections.
func (*Client) CloseSender ¶
CloseSender closes a sender for a queue or topic.
func (*Client) EnsureQueue ¶
EnsureQueue creates the queue if it doesn't exist. Returns with nil error if the admin client doesn't exist.
func (*Client) EnsureSubscription ¶
func (c *Client) EnsureSubscription(ctx context.Context, name string, topic string, opts SubscribeOptions) error
EnsureSubscription creates the topic subscription if it doesn't exist. Returns with nil error if the admin client doesn't exist.
func (*Client) EnsureTopic ¶
EnsureTopic creates the topic if it doesn't exist. Returns with nil error if the admin client doesn't exist.
func (*Client) GetClient ¶
func (c *Client) GetClient() *servicebus.Client
GetClient returns the azservicebus.Client object.
func (*Client) GetSender ¶
func (c *Client) GetSender(ctx context.Context, queueOrTopic string, ensureFn ensureFn) (*servicebus.Sender, error)
GetSender returns the sender for a queue or topic, or creates a new one if it doesn't exist.
func (*Client) PublishBinding ¶
func (c *Client) PublishBinding(ctx context.Context, req *bindings.InvokeRequest, queueOrTopic string, log logger.Logger) (*bindings.InvokeResponse, error)
PublishBinding is used by binding components to publish messages. It includes a retry logic that can also cause reconnections. Note this doesn't invoke "EnsureQueue" or "EnsureTopic" because bindings don't do that on publishing.
func (*Client) PublishPubSub ¶
func (c *Client) PublishPubSub(ctx context.Context, req *pubsub.PublishRequest, ensureFn ensureFn, log logger.Logger) error
PublishPubSub is used by PubSub components to publish messages. It includes a retry logic that can also cause reconnections.
func (*Client) PublishPubSubBulk ¶
func (c *Client) PublishPubSubBulk(ctx context.Context, req *pubsub.BulkPublishRequest, ensureFn ensureFn, log logger.Logger) (pubsub.BulkPublishResponse, error)
PublishPubSubBulk is used by PubSub components to publish bulk messages.
func (*Client) ReconnectionBackoff ¶
func (c *Client) ReconnectionBackoff() backoff.BackOff
ReconnectionBackoff returns the backoff for reconnecting in a subscription.
type HandlerFn ¶
type HandlerFn func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]HandlerResponseItem, error)
HandlerFn is the type for handlers that receive messages.
func GetBulkPubSubHandlerFunc ¶
func GetBulkPubSubHandlerFunc(topic string, handler pubsub.BulkHandler, log logger.Logger, timeout time.Duration) HandlerFn
GetBulkPubSubHandlerFunc returns the handler function for bulk pubsub messages.
type HandlerResponseItem ¶
HandlerResponseItem represents a response from the handler for each message.
type MessageReceiver ¶
type MessageReceiver struct {
*azservicebus.Receiver
}
func NewMessageReceiver ¶
func NewMessageReceiver(r *azservicebus.Receiver) *MessageReceiver
type Metadata ¶
type Metadata struct { /** For bindings and pubsubs **/ ConnectionString string `mapstructure:"connectionString"` ConsumerID string `mapstructure:"consumerID"` // Only topics TimeoutInSec int `mapstructure:"timeoutInSec"` HandlerTimeoutInSec int `mapstructure:"handlerTimeoutInSec"` LockRenewalInSec int `mapstructure:"lockRenewalInSec"` MaxActiveMessages int `mapstructure:"maxActiveMessages"` MaxConnectionRecoveryInSec int `mapstructure:"maxConnectionRecoveryInSec"` MinConnectionRecoveryInSec int `mapstructure:"minConnectionRecoveryInSec"` DisableEntityManagement bool `mapstructure:"disableEntityManagement"` MaxRetriableErrorsPerSec int `mapstructure:"maxRetriableErrorsPerSec"` MaxDeliveryCount *int32 `mapstructure:"maxDeliveryCount"` // Only used during subscription creation - default is set by the server (10) LockDurationInSec *int `mapstructure:"lockDurationInSec"` // Only used during subscription creation - default is set by the server (60s) DefaultMessageTimeToLiveInSec *int `mapstructure:"defaultMessageTimeToLiveInSec"` // Only used during subscription creation - default is set by the server (depends on the tier) AutoDeleteOnIdleInSec *int `mapstructure:"autoDeleteOnIdleInSec"` // Only used during subscription creation - default is set by the server (disabled) MaxConcurrentHandlers int `mapstructure:"maxConcurrentHandlers"` PublishMaxRetries int `mapstructure:"publishMaxRetries"` PublishInitialRetryIntervalInMs int `mapstructure:"publishInitialRetryIntervalInMs"` NamespaceName string `mapstructure:"namespaceName"` // Only for Azure AD /** For bindings only **/ QueueName string `mapstructure:"queueName" only:"bindings"` // Only queues }
Metadata options for Service Bus components. Note: AzureAD-related keys are handled separately.
func ParseMetadata ¶
ParseMetadata parses metadata keys that are common to all Service Bus components.
func (Metadata) CreateQueueProperties ¶
func (a Metadata) CreateQueueProperties() *sbadmin.QueueProperties
CreateQueueProperties returns the QueueProperties object to create new Queues in Service Bus.
func (Metadata) CreateSubscriptionProperties ¶
func (a Metadata) CreateSubscriptionProperties(opts SubscribeOptions) *sbadmin.SubscriptionProperties
CreateSubscriptionProperties returns the SubscriptionProperties object to create new Subscriptions to Service Bus topics.
type Receiver ¶
type Receiver interface { ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.CompleteMessageOptions) error AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.AbandonMessageOptions) error Close(ctx context.Context) error }
type SessionReceiver ¶
type SessionReceiver struct {
*azservicebus.SessionReceiver
}
func NewSessionReceiver ¶
func NewSessionReceiver(r *azservicebus.SessionReceiver) *SessionReceiver
func (*SessionReceiver) RenewSessionLocks ¶
type SubscribeOptions ¶
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
func NewSubscription ¶
func NewSubscription(opts SubscriptionOptions, logger logger.Logger) *Subscription
NewSubscription returns a new Subscription object. Parameter "entity" is usually in the format "topic <topicname>" or "queue <queuename>" and it's only used for logging.
func (*Subscription) AbandonMessage ¶
func (s *Subscription) AbandonMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)
AbandonMessage marks a message as abandoned.
func (*Subscription) CompleteMessage ¶
func (s *Subscription) CompleteMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)
CompleteMessage marks a message as complete.
func (*Subscription) Connect ¶
func (s *Subscription) Connect(ctx context.Context, newReceiverFunc func() (Receiver, error)) (Receiver, error)
Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).
func (*Subscription) ReceiveBlocking ¶
func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler HandlerFn, receiver Receiver, onFirstSuccess func(), logMsg string) error
ReceiveBlocking is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.