Documentation ¶
Overview ¶
Package solace contains the main type definitions for the various messaging services. You can use MessagingServiceBuilder to create a client-based messaging service. If you want to use secure socket layer (SSL) endpoints, OpenSSL must installed on the systems that run your client applications. Client applications secure connections to an event broker (or broker) using SSL endpoints. For example on PubSub+ software event brokers, you can use SMF TLS/SSL (default port of 55443) and Web Transport TLS/SSL connectivity (default port 1443) for messaging. The ports that are utilized depends on the configuration broker.
For an overview of TLS/SSL Encryption, see TLS/SSL Encryption Overview in the Solace documentation at https://docs.solace.com/Overviews/TLS-SSL-Message-Encryption-Overview.htm.
MessageServiceBuilder is retrieved through the messaging package as follows.
package main import solace.dev/go/messaging import solace.dev/go/messaging/pkg/solace func main() { var messagingServiceBuilder solace.MessagingServiceBuilder messagingServiceBuilder = messaging.NewMessagingServiceBuilder() messagingService, err := messagingServiceBuilder.Build() ... }
Before the MessagingService is created, global properties can be set by environment variable. The following environment variables are recognized and handled during API initialization:
SOLCLIENT_GLOBAL_PROP_GSS_KRB_LIB: GSS (Kerberos) library name. If not set the default value is OS specific
Linux/MacOS: libgssapi_krb5.so.2
Windows: secur32.dll
SOLCLIENT_GLOBAL_PROP_SSL_LIB: TLS Protocol library name. If not set the default value is OS specific:
Linux: libssl.so
MacOS: libssl.dylib
Windows: libssl-1_1.dll
SOLCLIENT_GLOBAL_PROP_CRYPTO_LIB: TLS Cryptography library name. If not set the default value is OS specific:
Linux: libcrypto.so
MacOS: libcrypto.dylib
Windows: libcrypto-1_1.dll-
GLOBAL_GSS_KRB_LIB: Alternate name for SOLCLIENT_GLOBAL_PROP_GSS_KRB_LIB
GLOBAL_SSL_LIB: Alternate name for SOLCLIENT_GLOBAL_PROP_SSL_LIB
GLOBAL_CRYPTO_LIB: Alternate name for SOLCLIENT_GLOBAL_PROP_CRYPTO_LIB
Index ¶
- type AuthenticationError
- type DirectMessagePublisher
- type DirectMessagePublisherBuilder
- type DirectMessageReceiver
- type DirectMessageReceiverBuilder
- type EndpointProvisioner
- type Error
- type FailedPublishEvent
- type IllegalArgumentError
- type IllegalStateError
- type IncompleteMessageDeliveryError
- type InvalidConfigurationError
- type LifecycleControl
- type MessageHandler
- type MessagePublishReceiptListener
- type MessagePublisher
- type MessagePublisherHealthCheck
- type MessageReceiver
- type MessageReplayError
- type MessagingService
- type MessagingServiceBuilder
- type NativeError
- type OutboundMessageBuilder
- type PersistentMessagePublisher
- type PersistentMessagePublisherBuilder
- type PersistentMessageReceiver
- type PersistentMessageReceiverBuilder
- type PersistentReceiverInfo
- type ProvisionOutcome
- type PublishFailureListener
- type PublishReceipt
- type PublisherOverflowError
- type PublisherReadinessListener
- type ReceiverState
- type ReceiverStateChangeListener
- type ReconnectionAttemptListener
- type ReconnectionListener
- type Replier
- type ReplyMessageHandler
- type RequestMessageHandler
- type RequestReplyMessagePublisher
- type RequestReplyMessagePublisherBuilder
- type RequestReplyMessageReceiver
- type RequestReplyMessageReceiverBuilder
- type RequestReplyMessagingService
- type ResourceInfo
- type ServiceEvent
- type ServiceInterruptionListener
- type ServiceUnreachableError
- type SubscriptionChangeListener
- type SubscriptionOperation
- type TerminationEvent
- type TerminationNotificationListener
- type TimeoutError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AuthenticationError ¶
type AuthenticationError struct {
// contains filtered or unexported fields
}
AuthenticationError indicates an authentication related error occurred when connecting to a remote event broker. The pointer type *AuthenticationError is returned.
type DirectMessagePublisher ¶
type DirectMessagePublisher interface { MessagePublisher MessagePublisherHealthCheck // StartAsyncCallback starts the DirectMessagePublisher asynchronously. // Calls the specified callback when started with an error if one occurred, otherwise nil // if successful. StartAsyncCallback(callback func(DirectMessagePublisher, error)) // SetPublishFailureListener sets the listener to call if the publishing of // a direct message fails. SetPublishFailureListener(listener PublishFailureListener) // TerminateAsyncCallback terminates the DirectMessagePublisher asynchronously. // Calls the callback when terminated with nil if successful, or an error if // one occurred. If gracePeriod is less than 0, this function waits indefinitely. TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) // PublishBytes publishes a message of type byte array to the specified destination. // Returns an error if one occurred while attempting to publish, or if the publisher // is not started/terminated. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than the publisher's I/O // capabilities allow. When publishing can be resumed, the registered // PublisherReadinessListeners are called. PublishBytes(message []byte, destination *resource.Topic) error // PublishString publishes a message of type string to the specified destination. // Returns an error if one occurred. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than the publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners are called. PublishString(message string, destination *resource.Topic) error // Publish publishes the specified message of type OutboundMessage built by a // OutboundMessageBuilder to the specified destination. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than the publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners are called. Publish(message message.OutboundMessage, destination *resource.Topic) error // PublishWithProperties publishes the specified message of type OutboundMessage // with the specified properties. These properties override the properties on // the OutboundMessage instance if it is present. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than the publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners are called. PublishWithProperties(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider) error }
The DirectMessagePublisher interface is used to publish direct messages.
type DirectMessagePublisherBuilder ¶
type DirectMessagePublisherBuilder interface { // Build creates a new DirectMessagePublisher instance based on the configured properties. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build() (messagePublisher DirectMessagePublisher, err error) // OnBackPressureReject sets the publisher back pressure strategy to reject // where publish attempts will be rejected once the bufferSize, in number of messages, is reached. // If bufferSize is 0, an error will be thrown when the transport is full when publishing. // A buffer of the given size will be statically allocated when the publisher is built. // Valid bufferSize is >= 0. OnBackPressureReject(bufferSize uint) DirectMessagePublisherBuilder // OnBackPressureWait sets the publisher back pressure strategy to wait where publish // attempts may block until there is space in the buffer of size bufferSize in number of messages. // A buffer of the given size will be statically allocated when the publisher is built. // Valid bufferSize is >= 1. OnBackPressureWait(bufferSize uint) DirectMessagePublisherBuilder // FromConfigurationProvider configures the direct publisher with the specified properties. // The built-in PublisherPropertiesConfigurationProvider implementations include: // - PublisherPropertyMap - A map of PublisherProperty keys to values. FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) DirectMessagePublisherBuilder }
DirectMessagePublisherBuilder allows for configuration of direct message publisher instances.
type DirectMessageReceiver ¶
type DirectMessageReceiver interface { MessageReceiver // Include all functionality of MessageReceiver. // StartAsyncCallback starts the DirectMessageReceiver asynchronously. // Calls the callback when started with an error if one occurred, otherwise nil // if successful. StartAsyncCallback(callback func(DirectMessageReceiver, error)) // TerminateAsyncCallback terminates the DirectMessageReceiver asynchronously. // Calls the callback when terminated with nil if successful or an error if // one occurred. If gracePeriod is less than 0, the function will wait indefinitely. TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) // ReceiveAsync registers a callback to be called when new messages // are received. Returns an error if one occurred while registering the callback. // If a callback is already registered, it will be replaced by the specified // callback. ReceiveAsync(callback MessageHandler) error // ReceiveMessage receives a inbound message synchronously from the receiver. // Returns an error if the receiver has not started, or has already terminated. // ReceiveMessage waits until the specified timeout to receive a message, or will wait // forever if the timeout specified is a negative value. If a timeout occurs, a solace.TimeoutError // is returned. ReceiveMessage(timeout time.Duration) (received message.InboundMessage, err error) }
The DirectMessageReceiver is used to receive direct messages.
type DirectMessageReceiverBuilder ¶
type DirectMessageReceiverBuilder interface { // Build creates a DirectMessageReceiver with the specified properties. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build() (messageReceiver DirectMessageReceiver, err error) // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided // or the specified ShareName is invalid. BuildWithShareName(shareName *resource.ShareName) (messageReceiver DirectMessageReceiver, err error) // OnBackPressureDropLatest configures the receiver with the specified buffer size. If the buffer // is full and a message arrives, the incoming message is discarded. // A buffer of the given size will be statically allocated when the receiver is built. // The bufferCapacity must be greater than or equal to 1. OnBackPressureDropLatest(bufferCapacity uint) DirectMessageReceiverBuilder // OnBackPressureDropOldest configures the receiver with the specified buffer size, bufferCapacity. If the buffer // is full and a message arrives, the oldest message in the buffer is discarded. // A buffer of the given size will be statically allocated when the receiver is built. // The value of bufferCapacity must be greater than or equal to 1. OnBackPressureDropOldest(bufferCapacity uint) DirectMessageReceiverBuilder // WithSubscriptions sets a list of TopicSubscriptions to subscribe // to when starting the receiver. This function also accepts *resource.TopicSubscription subscriptions. WithSubscriptions(topics ...resource.Subscription) DirectMessageReceiverBuilder // FromConfigurationProvider configures the DirectMessageReceiver with the specified properties. // The built-in ReceiverPropertiesConfigurationProvider implementations include: // - ReceiverPropertyMap - A map of ReceiverProperty keys to values. FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) DirectMessageReceiverBuilder }
DirectMessageReceiverBuilder allows for configuration of DirectMessageReceiver instances.
type EndpointProvisioner ¶ added in v1.7.0
type EndpointProvisioner interface { // Provision a queue with the specified name on the broker bearing // all the properties configured on the Provisioner. // Properties left unconfigured will be set to broker defaults. // Accepts a boolean parameter to ignore a specific error response from the broker which indicates // that a queue with the same name and properties already exists. // Blocks until the operation is finished on the broker, returns the provision outcome. Provision(queueName string, ignoreExists bool) ProvisionOutcome // ProvisionAsync will asynchronously provision a queue with the specified name on // the broker bearing all the properties configured on the Provisioner. // Accepts a boolean parameter to ignore a specific error response from the broker which indicates // that a queue with the same name and properties already exists. // Properties left unconfigured will be set to broker defaults. // This function is idempotent. The only way to resume configuration operation // after this function is called is to create a new instance. // Any attempt to call this function will provision the queue // on the broker, even if this function completes. // The maximum number of outstanding requests for provision is set to 32. // This function will return an error when this limit is reached or exceeded. // Returns a channel immediately that receives the endpoint provision outcome when completed. ProvisionAsync(queueName string, ignoreExists bool) <-chan ProvisionOutcome // ProvisionAsyncWithCallback will asynchronously provision a queue with the specified name on // the broker bearing all the properties configured on the Provisioner. // Accepts a boolean parameter to ignore a specific error response from the broker which indicates // that a queue with the same name and properties already exists. // Properties left unconfigured will be set to broker defaults. // This function is idempotent. The only way to resume configuration operation // after this function is called is to create a new instance. // Any attempt to call this function will provision the queue // on the broker, even if this function completes. // Returns immediately and registers a callback that will receive an // outcome for the endpoint provision. // Please note that the callback may not be executed in network order from the broker ProvisionAsyncWithCallback(queueName string, ignoreExists bool, callback func(ProvisionOutcome)) // Deprovision (deletes) the queue with the given name from the broker. // Ignores all queue properties accumulated in the EndpointProvisioner. // Accepts the ignoreMissing boolean property, which, if set to true, // turns the "no such queue" error into nil. // Blocks until the operation is finished on the broker, returns the nil or an error Deprovision(queueName string, ignoreMissing bool) error // DeprovisionAsync will asynchronously deprovision (deletes) the queue with the given // name from the broker. Returns immediately. // Ignores all queue properties accumulated in the EndpointProvisioner. // Accepts the ignoreMissing boolean property, which, if set to true, // turns the "no such queue" error into nil. // Any error (or nil if successful) is reported through the returned channel. // Returns a channel immediately that receives nil or an error. DeprovisionAsync(queueName string, ignoreMissing bool) <-chan error // DeprovisionAsyncWithCallback will asynchronously deprovision (deletes) the queue with the // given name on the broker. // Ignores all queue properties accumulated in the EndpointProvisioner. // Accepts the ignoreMissing boolean property, which, if set to true, // turns the "no such queue" error into nil. // Returns immediately and registers a callback that will receive an // error if deprovision on the broker fails. // Please note that the callback may not be executed in network order from the broker DeprovisionAsyncWithCallback(queueName string, ignoreMissing bool, callback func(err error)) // FromConfigurationProvider sets the configuration based on the specified configuration provider. // The following are built in configuration providers: // - EndpointPropertyMap - This can be used to set an EndpointProperty to a value programatically. // // The EndpointPropertiesConfigurationProvider interface can also be implemented by a type // to have it act as a configuration factory by implementing the following: // // func (type MyType) GetConfiguration() EndpointPropertyMap {...} // // Any properties provided by the configuration provider are layered over top of any // previously set properties, including those set by specifying various strategies. // Can be used to clone a EndpointProvisioner object. FromConfigurationProvider(properties config.EndpointPropertiesConfigurationProvider) EndpointProvisioner // Returns a copy of the current configuration held. GetConfiguration() config.EndpointPropertyMap // WithProperty will set an individual queue property by name. Does not perform type checking. WithProperty(propertyName config.EndpointProperty, propertyValue interface{}) EndpointProvisioner // WithDurability will set the durability property for the endpoint. // True for durable, false for non-durable. WithDurability(durable bool) EndpointProvisioner // WithExclusiveAccess will set the endpoint access type. // True for exclusive, false for non-exclusive. WithExclusiveAccess(exclusive bool) EndpointProvisioner // WithDiscardNotification will set the notification behaviour on message discards. // True to notify senders about discards, false not to. WithDiscardNotification(notifySender bool) EndpointProvisioner // WithMaxMessageRedelivery will sets the number of times messages from the // queue will be redelivered before being diverted to the DMQ. WithMaxMessageRedelivery(count uint) EndpointProvisioner // WithMaxMessageSize will set the maximum message size in bytes the queue will accept. WithMaxMessageSize(count uint) EndpointProvisioner // WithPermission will set the queue's permission level for others. // The levels are supersets of each other, can not be combined and the last one set will take effect. WithPermission(permission config.EndpointPermission) EndpointProvisioner // WithQuotaMB will set the overall size limit of the queue in MegaBytes. WithQuotaMB(quota uint) EndpointProvisioner // WithTTLPolicy will set how the queue will handle the TTL value in messages. // True to respect it, false to ignore it. WithTTLPolicy(respect bool) EndpointProvisioner }
EndpointProvisioner aids the type-safe collection of queue properties, and can provision multiple queues with different names (but identical properties) on the broker. Warning: This is a mutable object. The fluent builder style setters modify and return the original object. Make copies explicitly.
type Error ¶
type Error interface { error // contains filtered or unexported methods }
Error is an error returned from the API.
type FailedPublishEvent ¶
type FailedPublishEvent interface { // GetMessage retrieves the message that was not delivered GetMessage() message.OutboundMessage // GetDestination retrieves the destination that the message was published to. GetDestination() resource.Destination // GetTimeStamp retrieves the timestamp of the error. GetTimeStamp() time.Time // GetError retrieves the error that failed the publish attempt. GetError() error }
FailedPublishEvent represents an event thrown when publishing a direct message fails.
type IllegalArgumentError ¶
type IllegalArgumentError struct {
// contains filtered or unexported fields
}
IllegalArgumentError indicates an invalid argument was passed to a function. The pointer type *IllegalArgumentError is returned.
type IllegalStateError ¶
type IllegalStateError struct {
// contains filtered or unexported fields
}
IllegalStateError indicates an invalid state occurred when performing an action. The pointer type *IllegalStateError is returned.
type IncompleteMessageDeliveryError ¶
type IncompleteMessageDeliveryError struct {
// contains filtered or unexported fields
}
IncompleteMessageDeliveryError indicates that some messages were not delivered. The pointer type *IncompleteMessageDeliveryError is returned.
type InvalidConfigurationError ¶
type InvalidConfigurationError struct {
// contains filtered or unexported fields
}
InvalidConfigurationError indicates that a specified configuration is invalid. These errors are returned by the Build functions of a builder. The pointer type *InvalidConfigurationError is returned.
type LifecycleControl ¶
type LifecycleControl interface { // Start starts the messaging service synchronously. // Before this function is called, the messaging service is considered // off-duty. To operate normally, this function must be called on // a receiver or publisher instance. This function is idempotent. // Returns an error if one occurred or nil if successful. Start() error // StartAsync starts the messaging service asynchronously. // Before this function is called, the messaging service is considered // off-duty. To operate normally, this function must be called on // a receiver or publisher instance. This function is idempotent. // Returns a channel that will receive an error if one occurred or // nil if successful. Subsequent calls will return additional // channels that can await an error, or nil if already started. StartAsync() <-chan error // Terminate terminates the messaging service gracefully and synchronously. // This function is idempotent. The only way to resume operation // after this function is called is to create another instance. // Any attempt to call this function renders the instance // permanently terminated, even if this function completes. // A graceful shutdown is attempted within the specified grace period (gracePeriod). // Setting gracePeriod to 0 implies a non-graceful shutdown that ignores // unfinished tasks or in-flight messages. // This function returns an error if one occurred, or // nil if it successfully and gracefully terminated. // If gracePeriod is set to less than 0, the function waits indefinitely. Terminate(gracePeriod time.Duration) error // TerminateAsync terminates the messaging service asynchronously. // This function is idempotent. The only way to resume operation // after this function is called is to create another instance. // Any attempt to call this function renders the instance // permanently terminated, even if this function completes. // A graceful shutdown is attempted within the specified grace period (gracePeriod). // Setting gracePeriod to 0 implies a non-graceful shutdown that ignores // unfinished tasks or in-flight messages. // This function returns a channel that receives an error if one occurred, or // nil if it successfully and gracefully terminated. // If gracePeriod is set to less than 0, the function waits indefinitely. TerminateAsync(gracePeriod time.Duration) <-chan error // IsRunning checks if the process was successfully started and not yet stopped. // Returns true if running, otherwise false. IsRunning() bool // IsTerminates checks if the message-delivery process is terminated. // Returns true if terminated, otherwise false. IsTerminated() bool // IsTerminating checks if the message-delivery process termination is ongoing. // Returns true if the message message-delivery process is being terminated, // but termination is not yet complete, otherwise false. IsTerminating() bool // SetTerminationNotificationListener adds a callback to listen for // non-recoverable interruption events. SetTerminationNotificationListener(listener TerminationNotificationListener) }
LifecycleControl contains lifecycle functionality common to various messaging services such as publishers and receivers.
type MessageHandler ¶
type MessageHandler func(inboundMessage message.InboundMessage)
MessageHandler is a callback that can be registered to receive messages asynchronously.
type MessagePublishReceiptListener ¶
type MessagePublishReceiptListener func(PublishReceipt)
MessagePublishReceiptListener is a listener that can be registered for the delivery receipt events.
type MessagePublisher ¶
type MessagePublisher interface { // Extend LifecycleControl for various lifecycle management functionality. LifecycleControl }
MessagePublisher represents the shared functionality between all publisher instances.
type MessagePublisherHealthCheck ¶
type MessagePublisherHealthCheck interface { // IsReady checks if the publisher can publish messages. Returns true if the // publisher can publish messages, otherwise false if the publisher is prevented from // sending messages (e.g., a full buffer or I/O problems). IsReady() bool // SetPublisherReadinessListener registers a listener to be called when the // publisher can send messages. Typically, the listener is notified after a // Publisher instance raises an error indicating that the outbound message // buffer is full. SetPublisherReadinessListener(listener PublisherReadinessListener) // NotifyWhenReady makes a request to notify the application when the // publisher is ready. This function triggers a readiness notification if one // needs to be sent, otherwise the next readiness notification is // processed. NotifyWhenReady() }
MessagePublisherHealthCheck allows applications to check and listen for events that indicate when message publishers are ready to publish. This is often used to handle various back pressure schemes, such as reject on full, and allows publishing to stop until the publisher can begin accepting more messages.
type MessageReceiver ¶
type MessageReceiver interface { // Extend LifecycleControl for various lifecycle management functionality LifecycleControl // AddSubscription will subscribe to another message source on a PubSub+ Broker to receive messages from. // Will block until subscription is added. Accepts *resource.TopicSubscription instances as the subscription. // Returns a solace/errors.*IllegalStateError if the service is not running. // Returns a solace/errors.*IllegalArgumentError if unsupported Subscription type is passed. // Returns nil if successful. AddSubscription(subscription resource.Subscription) error // RemoveSubscription will unsubscribe from a previously subscribed message source on a broker // such that no more messages will be received from it. // Will block until subscription is removed. // Accepts *resource.TopicSubscription instances as the subscription. // Returns an solace/errors.*IllegalStateError if the service is not running. // Returns a solace/errors.*IllegalArgumentError if unsupported Subscription type is passed. // Returns nil if successful. RemoveSubscription(subscription resource.Subscription) error // AddSubscriptionAsync will subscribe to another message source on a PubSub+ Broker to receive messages from. // Will block until subscription is added. Accepts *resource.TopicSubscription instances as the subscription. // Returns a solace/errors.*IllegalStateError if the service is not running. // Returns a solace/errors.*IllegalArgumentError if unsupported Subscription type is passed. // Returns nil if successful. AddSubscriptionAsync(subscription resource.Subscription, listener SubscriptionChangeListener) error // RemoveSubscriptionAsymc will unsubscribe from a previously subscribed message source on a broker // such that no more messages will be received from it. Will block until subscription is removed. // Accepts *resource.TopicSubscription instances as the subscription. // Returns an solace/errors.*IllegalStateError if the service is not running. // Returns a solace/errors.*IllegalArgumentError if unsupported Subscription type is passed. // Returns nil if successful. RemoveSubscriptionAsync(subscription resource.Subscription, listener SubscriptionChangeListener) error }
MessageReceiver represents the shared functionality between all MessageReceivers
type MessageReplayError ¶
type MessageReplayError struct {
// contains filtered or unexported fields
}
MessageReplayError indicates
type MessagingService ¶
type MessagingService interface { // Connect connects the messaging service. // This function blocks until the connection attempt is completed. // Returns nil if successful, otherwise an error containing failure details, which may be the following: // - solace/errors.*PubSubPlusClientError - If a connection error occurs. // - solace/errors.*IllegalStateError - If MessagingService has already been terminated. Connect() error // ConnectAsync connects the messaging service asynchronously. // Returns a channel that receives an event when completed. // Channel (chan) receives nil if successful, otherwise an error containing failure details. // For more information, see MessagingService.Connect. ConnectAsync() <-chan error // ConnectAsyncWithCallback connects the messaging service asynchonously. // When complete, the specified callback is called with nil if successful, // otherwise an error if not successful. In both cases, the messaging service // is passed as well. ConnectAsyncWithCallback(callback func(MessagingService, error)) // CreateDirectMessagePublisherBuilder creates a DirectMessagePublisherBuilder // that can be used to configure direct message publisher instances. CreateDirectMessagePublisherBuilder() DirectMessagePublisherBuilder // CreateDirectMessageReceiverBuilder creates a DirectMessageReceiverBuilder // that can be used to configure direct message receiver instances. CreateDirectMessageReceiverBuilder() DirectMessageReceiverBuilder // CreatePersistentMessagePublisherBuilder creates a PersistentMessagePublisherBuilder // that can be used to configure persistent message publisher instances. CreatePersistentMessagePublisherBuilder() PersistentMessagePublisherBuilder // CreatePersistentMessageReceiverBuilder creates a PersistentMessageReceiverBuilder // that can be used to configure persistent message receiver instances. CreatePersistentMessageReceiverBuilder() PersistentMessageReceiverBuilder // MessageBuilder creates an OutboundMessageBuilder that can be // used to build messages to send via a message publisher. MessageBuilder() OutboundMessageBuilder // EndpointProvisioner is used to provision and deprovision endpoints on the broker. EndpointProvisioner() EndpointProvisioner // RequestReply creates a RequestReplyMessagingService that inherits // the configuration of this MessagingService instance. RequestReply() RequestReplyMessagingService // Disconnect disconnects the messaging service. // The messaging service must be connected to disconnect. // This function blocks until the disconnection attempt is completed. // Returns nil if successful, otherwise an error containing failure details. // A disconnected messaging service may not be reconnected. // Returns solace/errors.*IllegalStateError if it is not yet connected. Disconnect() error // DisconnectAsync disconnects the messaging service asynchronously. // Returns a channel (chan) that receives an event when completed. // The channel receives nil if successful, otherwise an error containing the failure details // For more information, see MessagingService.Disconnect. DisconnectAsync() <-chan error // DisconnectAsyncWithCallback disconnects the messaging service asynchronously. // When complete, the specified callback is called with nil if successful, otherwise // an error if not successful. DisconnectAsyncWithCallback(callback func(error)) // IsConnected determines if the messaging service is operational and if Connect was previously // called successfully. // Returns true if the messaging service is connected to a remote destination, otherwise false. IsConnected() bool // AddReconnectionListener adds a new reconnection listener to the messaging service. // The reconnection listener is called when reconnection events occur. // Returns an identifier that can be used to remove the listener using RemoveReconnectionListener. AddReconnectionListener(listener ReconnectionListener) uint64 // AddReconnectionAttemptListener adds a listener to receive reconnection-attempt notifications. // The reconnection listener is called when reconnection-attempt events occur. // Returns an identifier that can be used to remove the listener using RemoveReconnectionAttemptListener. AddReconnectionAttemptListener(listener ReconnectionAttemptListener) uint64 // RemoveReconnectionListener removes a listener from the messaging service with the specified identifier. RemoveReconnectionListener(listenerID uint64) // RemoveReconnectionAttemptListener removes a listener from the messaging service with the specified identifier. RemoveReconnectionAttemptListener(listenerID uint64) // AddServiceInterruptionListener adds a listener to receive non-recoverable, service-interruption events. // Returns an identifier othat can be used to remove the listener using RemoveServiceInterruptionListener. AddServiceInterruptionListener(listener ServiceInterruptionListener) uint64 // RemoveServiceInterruptionListener removes a service listener to receive non-recoverable, // service-interruption events with the specified identifier. RemoveServiceInterruptionListener(listenerID uint64) // GetApplicationID retrieves the application identifier. GetApplicationID() string // Metrics returns the metrics for this MessagingService instance. Metrics() metrics.APIMetrics // Info returns the API Info for this MessagingService instance. Info() metrics.APIInfo // Updates the value of a modifiable service property once the service has been created. // Modifiable service properties include: // - solace/config.AuthenticationPropertySchemeOAuth2AccessToken, // whose update will be applied during the next reconnection attempt. // - solace/config.AuthenticationPropertySchemeOAuth2OIDCIDToken, // whose update will be applied during the next reconnection attempt. // // Modification of a service property may occur instantly, or may occur during the next // service reconnection. // Modification of a service property during an ongoing service reconnection may apply // to the next reconnection attempt. // property (ServiceProperty): The name of the property to modify. // value (interface{}): The new value of the property. // // - solace/errors.*IllegalArgumentError: If the specified property cannot // - be modified. // - solace/errors.*IllegalStateError: If the specified property cannot // be modified in the current service state. // - solace/errors.*NativeError: If other transport or communication related errors occur. UpdateProperty(property config.ServiceProperty, value interface{}) error }
MessagingService represents a broker that provides a messaging service.
type MessagingServiceBuilder ¶
type MessagingServiceBuilder interface { // Build creates MessagingService based on the provided configuration. // Returns the built MessagingService instance, otherwise nil if an error occurred. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build() (messagingService MessagingService, err error) // BuildWithApplicationID creates MessagingService based on the provided configuration // using the specified application identifier as the applicationID. // Returns the created MessagingService instance, otherwise nil if an error occurred. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. BuildWithApplicationID(applicationID string) (messagingService MessagingService, err error) // FromConfigurationProvider sets the configuration based on the specified configuration provider. // The following are built in configuration providers: // - ServicePropertyMap - This can be used to set a ServiceProperty to a value programatically. // // The ServicePropertiesConfigurationProvider interface can also be implemented by a type // to have it act as a configuration factory by implementing the following: // // func (type MyType) GetConfiguration() ServicePropertyMap {...} // // Any properties provided by the configuration provider are layered over top of any // previously set properties, including those set by specifying various strategies. FromConfigurationProvider(provider config.ServicePropertiesConfigurationProvider) MessagingServiceBuilder // WithAuthenticationStrategy configures the resulting messaging service // with the specified authentication configuration WithAuthenticationStrategy(authenticationStrategy config.AuthenticationStrategy) MessagingServiceBuilder // WithRetryStrategy configures the resulting messaging service // with the specified retry strategy WithConnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder // WithMessageCompression configures the resulting messaging service // with the specified compression factor. The builder attempts to use // the specified compression-level with the provided host and port. It fails // to build if an an atempt is made to use compression on a non-secured and // non-compressed port. WithMessageCompression(compressionFactor int) MessagingServiceBuilder // WithReconnectionRetryStrategy configures the resulting messaging service // with the specified reconnection strategy. WithReconnectionRetryStrategy(retryStrategy config.RetryStrategy) MessagingServiceBuilder // WithTransportSecurityStrategy configures the resulting messaging service // with the specified transport security strategy. WithTransportSecurityStrategy(transportSecurityStrategy config.TransportSecurityStrategy) MessagingServiceBuilder // WithProvisionTimeoutMs configures the timeout for provision and deprovision operations, in milliseconds. WithProvisionTimeoutMs(timeout time.Duration) MessagingServiceBuilder }
MessagingServiceBuilder is used to configure and build MessagingService instances.
type NativeError ¶
type NativeError struct {
// contains filtered or unexported fields
}
NativeError is a struct that stores the error message and subcode for the error message.
func NewNativeError ¶
func NewNativeError(message string, subcode subcode.Code) *NativeError
NewNativeError returns a new solace.NativeError with the given message and subcode when applicable.
func (*NativeError) Error ¶
func (err *NativeError) Error() string
Error returns the error message from solace.NativeError.
func (*NativeError) SubCode ¶
func (err *NativeError) SubCode() subcode.Code
SubCode returns the subcode associated with the specified error, otherwise SubCodeNone if no subcode is relevant.
type OutboundMessageBuilder ¶
type OutboundMessageBuilder interface { // Build creates an OutboundMessage instance based on the configured properties. // Accepts additional configuration providers to apply only to the built message, with the // last in the list taking precedence. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build(additionalConfiguration ...config.MessagePropertiesConfigurationProvider) (message.OutboundMessage, error) // BuildWithByteArrayPayload creates a message with a byte array payload. // Accepts additional configuration providers to apply only to the built message, with the // last in the list taking precedence. // Returns the built message, otherwise an error if one occurred. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. BuildWithByteArrayPayload(payload []byte, additionalConfiguration ...config.MessagePropertiesConfigurationProvider) (message message.OutboundMessage, err error) // BuildWithStringPayload builds a new message with a string payload. // Accepts additional configuration providers to apply only to the built message, with the // last in the list taking precedence. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. BuildWithStringPayload(payload string, additionalConfiguration ...config.MessagePropertiesConfigurationProvider) (message message.OutboundMessage, err error) // BuildWithMapPayload builds a new message with a SDTMap payload. // Accepts additional configuration providers to apply only to the built message, with the // last in the list taking precedence. // If invalid data, ie. data not allowed as SDTData, is found in the // map, this function will return a nil OutboundMessage and an error. // Returns a solace/errors.*IllegalArgumentError if an invalid payload is specified. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. BuildWithMapPayload(payload sdt.Map, additionalConfiguration ...config.MessagePropertiesConfigurationProvider) (message message.OutboundMessage, err error) // BuildWithStreamPayload builds a new message with a SDTStream payload. // Accepts additional configuration providers to apply only to the built message, with the // last in the list taking precedence. // If invalid data, ie. data not allowed as SDTData, is found in the // stream, this function returns a nil OutboundMessage and an error. // Returns a solace/errors.*IllegalArgumentError if an invalid payload is specified. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. BuildWithStreamPayload(payload sdt.Stream, additionalConfiguration ...config.MessagePropertiesConfigurationProvider) (message message.OutboundMessage, err error) // FromConfigurationProvider sets the given message properties to the resulting message. // Both Solace defined config.MessageProperty keys as well as arbitrary user-defined // property keys are accepted. If using custom defined properties, the date type can be // any of sdt.Data supported types. FromConfigurationProvider(properties config.MessagePropertiesConfigurationProvider) OutboundMessageBuilder // WithProperty sets an individual message property on the resulting message. // Both Solace defined config.MessageProperty keys as well as arbitrary user-defined // property keys are accepted. If using custom defined properties, the date type can be // any of sdt.Data supported types. WithProperty(propertyName config.MessageProperty, propertyValue interface{}) OutboundMessageBuilder // WithExpiration sets the message expiration time to the given time. WithExpiration(t time.Time) OutboundMessageBuilder // WithHTTPContentHeader sets the specified HTTP content-header on the message. WithHTTPContentHeader(contentType, contentEncoding string) OutboundMessageBuilder // WithPriority sets the priority of the message, where the priority is a value between 0 (lowest) and 255 (highest). WithPriority(priority int) OutboundMessageBuilder // WithApplicationMessageId sets the application message ID of the message. It is carried in the message metadata // and is used for application to application signaling. WithApplicationMessageID(messageID string) OutboundMessageBuilder // WithApplicationMessageType sets the application message type for a message. It is carried in the message metadata // and is used for application to application signaling. WithApplicationMessageType(messageType string) OutboundMessageBuilder // WithSequenceNumber sets the sequence number for the message. The sequence number is carried in the message metadata // and is used for application to application signaling. WithSequenceNumber(sequenceNumber uint64) OutboundMessageBuilder // WithSenderID sets the sender ID for a message from a string. If config.ServicePropertyGenerateSenderID is enabled on // the messaging service, then passing a string to this method will override the API generated sender ID. WithSenderID(senderID string) OutboundMessageBuilder // WithCorrelationID sets the correlation ID for the message. The correlation ID is user-defined and carried end-to-end. // It can be matched in a selector, but otherwise is not relevant to the event broker. The correlation ID may be used // for peer-to-peer message synchronization. In JMS applications, this field is carried as the JMSCorrelationID Message // Header Field. WithCorrelationID(correlationID string) OutboundMessageBuilder }
OutboundMessageBuilder allows construction of messages to be sent.
type PersistentMessagePublisher ¶
type PersistentMessagePublisher interface { MessagePublisher MessagePublisherHealthCheck // StartAsyncCallback starts the PersistentMessagePublisher asynchronously. // Calls the callback when started with an error if one occurred, otherwise nil // when successful. StartAsyncCallback(callback func(PersistentMessagePublisher, error)) // SetPublishReceiptListener sets the listener to receive delivery receipts. // PublishReceipt events are triggered once the API receives an acknowledgement // when a message is received from the event broker. // This should be set before the publisher is started to avoid dropping acknowledgements. // The listener does not receive events from PublishAwaitAcknowledgement calls. SetMessagePublishReceiptListener(listener MessagePublishReceiptListener) // TerminateAsyncCallback terminates the PersistentMessagePublisher asynchronously. // Calls the callback when terminated with nil if successful, otherwise an error if // one occurred. When gracePeriod is a value less than 0, the function waits indefinitely. TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) // PublishBytes sends a message of type byte array to the specified destination. // Returns an error if one occurred while attempting to publish or if the publisher // is not started/terminated. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners // are called. PublishBytes(message []byte, destination *resource.Topic) error // PublishString sends a message of type string to the specified destination. // Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners // are called. PublishString(message string, destination *resource.Topic) error // Publish sends the specified message of type OutboundMessage built by a // OutboundMessageBuilder to the specified destination. // Optionally, you can provide properties in the form of OutboundMessageProperties to override // any properties set on OutboundMessage. The properties argument can be nil to // not set any properties. // Optionally, provide a context that is available in the PublishReceiptListener // registered with SetMessagePublishReceiptListener as GetUserContext. // The context argument can be nil to not set a context. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners // are called. Publish(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider, context interface{}) error // PublishAwaitAcknowledgement sends the specified message of type OutboundMessage // and awaits a publish acknowledgement. // Optionally, you can provide properties in the form of OutboundMessageProperties to override // any properties set on OutboundMessage. The properties argument can be nil to // not set any properties. // If the specified timeout argument is less than 0, the function waits indefinitely. // Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners // are called. PublishAwaitAcknowledgement(message message.OutboundMessage, destination *resource.Topic, timeout time.Duration, properties config.MessagePropertiesConfigurationProvider) error }
PersistentMessagePublisher allows for the publishing of persistent messages (guaranteed messages).
type PersistentMessagePublisherBuilder ¶
type PersistentMessagePublisherBuilder interface { // Build returns a new PersistentMessagePublisher based on the configured properties. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build() (messagePublisher PersistentMessagePublisher, err error) // OnBackPressureReject sets the publisher back pressure strategy to reject // where the publish attempts are rejected once the bufferSize (the number of messages), is reached. // If bufferSize is 0, an error is thrown when the transport is full when attempting to publish. // A buffer of the given size will be statically allocated when the publisher is built. // Valid bufferSize is greater than or equal to 0. OnBackPressureReject(bufferSize uint) PersistentMessagePublisherBuilder // OnBackPressureWait sets the publisher back pressure strategy to wait where publish // attempts block until there is space in the buffer of size bufferSize (the number of messages). // A buffer of the given size will be statically allocated when the publisher is built. // Valid bufferSize is greater than or equal to 1. OnBackPressureWait(bufferSize uint) PersistentMessagePublisherBuilder // FromConfigurationProvider configures the persistent publisher with the given properties. // Built in PublisherPropertiesConfigurationProvider implementations include: // - PublisherPropertyMap - A map of PublisherProperty keys to values. FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) PersistentMessagePublisherBuilder }
PersistentMessagePublisherBuilder allows for configuration of persistent message publisher instances.
type PersistentMessageReceiver ¶
type PersistentMessageReceiver interface { MessageReceiver // Include all functionality of MessageReceiver. // Ack acknowledges that a message was received. Ack(message message.InboundMessage) error // StartAsyncCallback starts the PersistentMessageReceiver asynchronously. // Calls the callback when started with an error if one occurred, otherwise nil // if successful. StartAsyncCallback(callback func(PersistentMessageReceiver, error)) // TerminateAsyncCallback terminates the PersistentMessageReceiver asynchronously. // Calls the callback when terminated with nil if successful, otherwise an error if // one occurred. If gracePeriod is less than 0, the function waits indefinitely. TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) // ReceiveAsync registers a callback to be called when new messages // are received. Returns an error if one occurred while registering the callback. // If a callback is already registered, it is replaced by the specified // callback. ReceiveAsync(callback MessageHandler) error // ReceiveMessage receives a message synchronously from the receiver. // Returns an error if the receiver is not started or already terminated. // This function waits until the specified timeout to receive a message or waits // forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError // is returned. ReceiveMessage(timeout time.Duration) (message.InboundMessage, error) // Pause pauses the receiver's message delivery to asynchronous message handlers. // Pausing an already paused receiver has no effect. // Returns an IllegalStateError if the receiver has not started or has already terminated. Pause() error // Resume unpause the receiver's message delivery to asynchronous message handlers. // Resume a receiver that is not paused has no effect. // Returns an IllegalStateError if the receiver has not started or has already terminated. Resume() error // ReceiverInfo returns a runtime accessor for the receiver information such as the remote // resource to which it connects. // Returns an IllegalStateError if the receiver has not started or has already terminated. ReceiverInfo() (info PersistentReceiverInfo, err error) }
PersistentMessageReceiver allows for receiving persistent message (guaranteed messages).
type PersistentMessageReceiverBuilder ¶
type PersistentMessageReceiverBuilder interface { // Build creates a PersistentMessageReceiver with the specified properties. // Returns solace/errors.*IllegalArgumentError if the queue is nil. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build(queue *resource.Queue) (receiver PersistentMessageReceiver, err error) // WithActivationPassivationSupport sets the listener to receiver broker notifications // about state changes for the resulting receiver. This change can happen if there are // multiple instances of the same receiver for high availability and activity is exchanged. // This change is handled by the broker. WithActivationPassivationSupport(listener ReceiverStateChangeListener) PersistentMessageReceiverBuilder // WithMessageAutoAcknowledgement enables automatic acknowledgement on all receiver methods. // Auto Acknowledgement will be performed after an acknowledgement after the receive callback // is called when using ReceiveAsync, or after the message is passed to the application when // using ReceiveMessage. In cases where underlying network connectivity fails, automatic // acknowledgement processing is not guaranteed. WithMessageAutoAcknowledgement() PersistentMessageReceiverBuilder // WithMessageClientAcknowledgement disables automatic acknowledgement on all receiver methods // and instead enables support for client acknowledgement for both synchronous and asynchronous // message delivery functions. New persistent receiver builders default to client acknowledgement. WithMessageClientAcknowledgement() PersistentMessageReceiverBuilder // WithMessageSelector sets the message selector to the specified string. // If an empty string is provided, the filter is cleared. WithMessageSelector(filterSelectorExpression string) PersistentMessageReceiverBuilder // WithMissingResourcesCreationStrategy sets the missing resource creation strategy // defining what actions the API may take when missing resources are detected. WithMissingResourcesCreationStrategy(strategy config.MissingResourcesCreationStrategy) PersistentMessageReceiverBuilder // WithMessageReplay enables support for message replay using a specific replay // strategy. Once started, the receiver replays using the specified strategy. // Valid strategies include config.ReplayStrategyAllMessages(), // config.ReplayStrategyTimeBased and config.ReplicationGroupMessageIDReplayStrategy. WithMessageReplay(strategy config.ReplayStrategy) PersistentMessageReceiverBuilder // WithSubscriptions sets a list of TopicSubscriptions to subscribe // to when starting the receiver. Accepts *resource.TopicSubscription subscriptions. WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder // FromConfigurationProvider configures the persistent receiver with the specified properties. // The built-in ReceiverPropertiesConfigurationProvider implementations include: // ReceiverPropertyMap, a map of ReceiverProperty keys to values FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) PersistentMessageReceiverBuilder }
PersistentMessageReceiverBuilder is used for configuration of PersistentMessageReceiver.
type PersistentReceiverInfo ¶
type PersistentReceiverInfo interface { // GetResourceInfo returns the remote endpoint (resource) information for the receiver. GetResourceInfo() ResourceInfo }
PersistentReceiverInfo provides information about the receiver at runtime.
type ProvisionOutcome ¶ added in v1.7.0
type ProvisionOutcome interface { // GetError returns the low level error object if any. GetError() error // GetStatus retrives the actual outcome: true means success, false means failure. GetStatus() bool }
ProvisionOutcome - the EndpointProvisioner.Provision operation return this structure to indicate the success and the underlying error code. It is possible for the outcome to be successful and yet contain a non-nil error when the queue already exists on the broker, and the Provision function was invoked with the ignoreExists flag set.
type PublishFailureListener ¶
type PublishFailureListener func(FailedPublishEvent)
PublishFailureListener is a listener that can be registered for publish failure events.
type PublishReceipt ¶
type PublishReceipt interface { // GetUserContext retrieves the context associated with the publish, if provided. // Returns nil if no user context is set. GetUserContext() interface{} // GetTimeStamp retrieves the time. The time indicates when the event occurred, specifically the time when the // acknowledgement was received by the API from the event broker, or if present, when the GetError error // occurred. GetTimeStamp() time.Time // GetMessage returns an OutboundMessage that was successfully published. GetMessage() message.OutboundMessage // GetError retrieves an error if one occurred, which usually indicates a publish attempt failed. // GetError returns nil on a successful publish, otherwise an error if a failure occurred // while delivering the message. GetError() error // IsPersisted returns true if the event broker confirmed that the message was successfully received and persisted, // otherwise false. IsPersisted() bool }
PublishReceipt is the receipt for delivery of a persistent message.
type PublisherOverflowError ¶
type PublisherOverflowError struct {
// contains filtered or unexported fields
}
PublisherOverflowError indicates when publishing has stopped due to internal buffer limits. The pointer type *PublisherOverflowError is returned.
type PublisherReadinessListener ¶
type PublisherReadinessListener func()
PublisherReadinessListener defines a function that can be registered to receive notifications from a publisher instance for readiness.
type ReceiverState ¶
type ReceiverState byte
ReceiverState represents the various states the receiver can be in, such as Active or Passive. This property is controlled by the remote broker.
const ( // ReceiverActive is the state in which the receiver receives messages from a broker. ReceiverActive ReceiverState = iota // ReceiverPassive is the state in which the receiver would not receive messages from a broker. // Often, this is because another instance of the receiver became active, so this instance became // passive. ReceiverPassive )
type ReceiverStateChangeListener ¶
type ReceiverStateChangeListener func(oldState ReceiverState, newState ReceiverState, timestamp time.Time)
ReceiverStateChangeListener is a listener that can be registered on a receiver to be notified of changes in receiver state by the remote broker.
type ReconnectionAttemptListener ¶
type ReconnectionAttemptListener func(event ServiceEvent)
ReconnectionAttemptListener is a handler that can be registered to a MessagingService. It is called when a session is disconnected and reconnection attempts have begun.
type ReconnectionListener ¶
type ReconnectionListener func(event ServiceEvent)
ReconnectionListener is a handler that can be registered to a MessagingService. It is called when a session was disconnected and subsequently reconnected.
type Replier ¶ added in v1.6.0
type Replier interface { // Reply publishes a reply or response message. Reply(message message.OutboundMessage) error }
Replier allows for received request-reply messages to be replied to. The destination of these messages is automatically determined by the InboundMessage passed to a RequestMessageHandler.
type ReplyMessageHandler ¶ added in v1.6.0
type ReplyMessageHandler func(message message.InboundMessage, userContext interface{}, err error)
ReplyMessageHandler is a callback to handle a reply message. The function will be called with a message received or nil, the user context if it was set when calling RequestReplyMessagePublisher.Publish, and an error if one was thrown.
type RequestMessageHandler ¶ added in v1.6.0
type RequestMessageHandler func(message message.InboundMessage, replier Replier)
RequestMessageHandler is a callback called when a message is received. It is passed the request message as well as a replier allowing for the publishing of a reply message. The replier argument may be nil indicating that a NON-Request-Reply message has been received on the topic subscription given when building the RequestReplyMessageReceiver instance.
type RequestReplyMessagePublisher ¶ added in v1.6.0
type RequestReplyMessagePublisher interface { MessagePublisher MessagePublisherHealthCheck // StartAsyncCallback will start the RequestReplyMessagePublisher asynchronously. // Before this function is called, the service is considered // off-duty. To operate normally, this function must be called on // the RequestReplyMessageReceiver instance. This function is idempotent. // Returns immediately and will call the callback function when ready // passing the started RequestReplyMessageReceiver instance, or nil and // an error if one occurred. Subsequent calls will register additional // callbacks that will be called immediately if already started. StartAsyncCallback(callback func(RequestReplyMessagePublisher, error)) // TerminateAsyncCallback will terminate the message publisher asynchronously. // This function is idempotent. The only way to resume operation // after this function is called is to create a new instance. // Any attempt to call this function renders the instance // permanently terminated, even if this function completes. // A graceful shutdown will be attempted within the grace period. // A grace period of 0 implies a non-graceful shutdown that ignores // unfinished tasks or in-flight messages. // Returns immediately and registers a callback that will receive an // error if one occurred or nil if successfully and gracefully terminated. // If gracePeriod is less than 0, the function will wait indefinitely. TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) // PublishBytes sends a request for a reply of type byte array to the specified destination. // The API will handle correlation of messages so no additional work is requried. // Takes a requestMessage to send, a replyMessageHandler function to handle the // response, a requestsDestination to deliver the requestMessage to, a replyTimeout // indicating the maximum wait time for a response message and an optional // userContext object given to the replyMessageHandler (may be nil). // Returns an error if one occurred. If replyTimeout is less than 0, the function // will wait indefinitely. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners // are called. PublishBytes(message []byte, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error // PublishString sends a request for a reply of type string to the specified destination. // The API will handle correlation of messages so no additional work is requried. // Takes a requestMessage to send, a replyMessageHandler function to handle the // response, a requestsDestination to deliver the requestMessage to, a replyTimeout // indicating the maximum wait time for a response message and an optional // userContext object given to the replyMessageHandler (may be nil). // Returns an error if one occurred. If replyTimeout is less than 0, the function // will wait indefinitely. Possible errors include: // - solace/errors.*PubSubPlusClientError - If the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError - If messages are published faster than publisher's I/O // capabilities allow. When publishing can be resumed, the registered PublisherReadinessListeners // are called. PublishString(message string, replyMessageHandler ReplyMessageHandler, destination *resource.Topic, replyTimeout time.Duration, userContext interface{}) error // Publish sends a request for a reply non-blocking with optional user context. // The API will handle correlation of messages so no additional work is requried. // Takes a requestMessage to send, a replyMessageHandler function to handle the // response, a requestsDestination to deliver the requestMessage to, a replyTimeout // indicating the maximum wait time for a response message and an optional // userContext object given to the replyMessageHandler (may be nil). // Returns an error if one occurred. If replyTimeout is less than 0, the function // will wait indefinitely. Possible errors include: // - solace/errors.*PubSubPlusClientError if the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError if publishing messages faster than publisher's I/O // capabilities allow. When publishing can be resumed, registered PublisherReadinessListeners // will be called. Publish(requestMessage message.OutboundMessage, replyMessageHandler ReplyMessageHandler, requestsDestination *resource.Topic, replyTimeout time.Duration, properties config.MessagePropertiesConfigurationProvider, userContext interface{}) error // PublishAwaitResponse will send a request for a reply blocking until a response is // received. The API will handle correlation of messages so no additional work is required. // Takes a requestMessage to send, a requestDestination to deliver the requestMessage to, // and a replyTimeout indicating the maximum wait time for a response message. // Will return the response and an error if one occurred. If replyTimeout is less than 0, // the function will wait indefinitely. Possible errors include: // - solace/errors.*PubSubPlusClientError if the message could not be sent and all retry attempts failed. // - solace/errors.*PublisherOverflowError if publishing messages faster than publisher's I/O // capabilities allow. When publishing can be resumed, registered PublisherReadinessListeners // will be called. PublishAwaitResponse(requestMessage message.OutboundMessage, requestDestination *resource.Topic, replyTimeout time.Duration, properties config.MessagePropertiesConfigurationProvider) (message.InboundMessage, error) }
RequestReplyMessagePublisher allows for publishing of request-reply messages with handling for reply messages.
type RequestReplyMessagePublisherBuilder ¶ added in v1.6.0
type RequestReplyMessagePublisherBuilder interface { // Build will build a new RequestReplyMessagePublisher instance based on the configured properties. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build() (messagePublisher RequestReplyMessagePublisher, err error) // OnBackPressureReject will set the publisher backpressure strategy to reject // where publish attempts will be rejected once the bufferSize, in number of messages, is reached. // If bufferSize is 0, an error will be thrown when the transport is full when publishing. // Valid bufferSize is >= 0. OnBackPressureReject(bufferSize uint) RequestReplyMessagePublisherBuilder // OnBackPressureWait will set the publisher backpressure strategy to wait where publish // attempts will block until there is space in the buffer of size bufferSize in number of messages. // Valid bufferSize is >= 1. OnBackPressureWait(bufferSize uint) RequestReplyMessagePublisherBuilder // FromConfigurationProvider will configure the persistent publisher with the given properties. // Built in PublisherPropertiesConfigurationProvider implementations include: // PublisherPropertyMap, a map of PublisherProperty keys to values FromConfigurationProvider(provider config.PublisherPropertiesConfigurationProvider) RequestReplyMessagePublisherBuilder }
RequestReplyMessagePublisherBuilder allows for configuration of request reply message publisher instances
type RequestReplyMessageReceiver ¶ added in v1.6.0
type RequestReplyMessageReceiver interface { MessageReceiver // StartAsyncCallback will start the message receiver asynchronously. // Before this function is called, the service is considered // off-duty. To operate normally, this function must be called on // the RequestReplyMessageReceiver instance. This function is idempotent. // Returns immediately and will call the callback function when ready // passing the started RequestReplyMessageReceiver instance, or nil and // an error if one occurred. Subsequent calls will register additional // callbacks that will be called immediately if already started. StartAsyncCallback(callback func(RequestReplyMessageReceiver, error)) // TerminateAsyncCallback will terminate the message receiver asynchronously. // This function is idempotent. The only way to resume operation // after this function is called is to create a new instance. // Any attempt to call this function renders the instance // permanently terminated, even if this function completes. // A graceful shutdown will be attempted within the grace period. // A grace period of 0 implies a non-graceful shutdown that ignores // unfinished tasks or in-flight messages. // Returns immediately and registers a callback that will receive an // error if one occurred or nil if successfully and gracefully terminated. // If gracePeriod is less than 0, the function will wait indefinitely. TerminateAsyncCallback(gracePeriod time.Duration, callback func(error)) // ReceiveAsync registers an asynchronous message handler. The given // messageHandler will handle an ordered sequence of inbound request messages. // This function is mutually exclusive to ReceiveMessage. // Returns an error one occurred while registering the callback. // If a callback is already registered, it will be replaced by the given // callback. ReceiveAsync(messageHandler RequestMessageHandler) error // ReceiveMessage receives a message and replier synchronously from the receiver. // Returns a nil replier if the message can not be replied to. // Returns an error if the receiver is not started or already terminated. // This function waits until the specified timeout to receive a message or waits // forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError // is returned. ReceiveMessage(timeout time.Duration) (message.InboundMessage, Replier, error) }
RequestReplyMessageReceiver allows receiving of request-reply messages with handling for sending reply messages.
type RequestReplyMessageReceiverBuilder ¶ added in v1.6.0
type RequestReplyMessageReceiverBuilder interface { // Build will build a new RequestReplyMessageReceiver with the given properties. // The message receiver will subscribe to the specified topic subscription. // Accepts TopicSubscription instances as Subscriptions. See solace.TopicSubscriptionOf. // Returns solace/errors.*InvalidConfigurationError if an invalid configuration is provided. Build(requestTopicSubscription resource.Subscription) (messageReceiver RequestReplyMessageReceiver, err error) // the given properties using a shared topic subscription and the shared name. BuildWithSharedSubscription(requestTopicSubscription resource.Subscription, shareName *resource.ShareName) (messageReceiver RequestReplyMessageReceiver, err error) // OnBackPressureDropLatest configures the receiver with the specified buffer size. If the buffer // is full and a message arrives, the incoming message is discarded. // A buffer of the given size will be statically allocated when the receiver is built. // The bufferCapacity must be greater than or equal to 1. OnBackPressureDropLatest(bufferCapacity uint) RequestReplyMessageReceiverBuilder // OnBackPressureDropOldest configures the receiver with the specified buffer size, bufferCapacity. If the buffer // is full and a message arrives, the oldest message in the buffer is discarded. // A buffer of the given size will be statically allocated when the receiver is built. // The value of bufferCapacity must be greater than or equal to 1. OnBackPressureDropOldest(bufferCapacity uint) RequestReplyMessageReceiverBuilder // FromConfigurationProvider will configure the request reply receiver with the given properties. // Built in ReceiverPropertiesConfigurationProvider implementations include: // ReceiverPropertyMap, a map of ReceiverProperty keys to values FromConfigurationProvider(provider config.ReceiverPropertiesConfigurationProvider) RequestReplyMessageReceiverBuilder }
RequestReplyMessageReceiverBuilder allows for configuration of RequestReplyMessageReceiver instances
type RequestReplyMessagingService ¶ added in v1.6.0
type RequestReplyMessagingService interface { // CreateRequestReplyMessagePublisherBuilder creates a new request reply message publisher // builder that can be used to configure request reply publisher instances. CreateRequestReplyMessagePublisherBuilder() RequestReplyMessagePublisherBuilder // CreateRequestReplyMessageReceiverBuilder creates a new request reply message receiver // builder that can be used to configure request reply receiver instances. CreateRequestReplyMessageReceiverBuilder() RequestReplyMessageReceiverBuilder }
RequestReplyMessagingService allows access to request reply behaviour.
type ResourceInfo ¶
type ResourceInfo interface { // GetName returns the name of the resource. GetName() string // IsDurable returns true is a resource is durable, otherwise false. IsDurable() bool }
ResourceInfo provides information about a resouce at runtime.
type ServiceEvent ¶
type ServiceEvent interface { // GetTimestamp retrieves the timestamp of the event. GetTimestamp() time.Time // GetBrokerURI retrieves the URI of the broker. GetBrokerURI() string // GetMessage retrieves the message contents. GetMessage() string // GetCause retrieves the cause of the client error. GetCause() error }
ServiceEvent interface represents a messaging service event that applications can listen for.
type ServiceInterruptionListener ¶
type ServiceInterruptionListener func(event ServiceEvent)
ServiceInterruptionListener is a handler that can be registered to a MessagingService. It is called when a session is disconncted and the connection is unrecoverable.
type ServiceUnreachableError ¶
type ServiceUnreachableError struct {
// contains filtered or unexported fields
}
ServiceUnreachableError indicates that a remote service connection could not be established. The pointer type *ServiceUnreachableError is returned.
type SubscriptionChangeListener ¶
type SubscriptionChangeListener func(subscription resource.Subscription, operation SubscriptionOperation, errOrNil error)
SubscriptionChangeListener is a callback that can be set on async subscription operations that allows for handling of success or failure. The callback will be passed the subscription in question, the operation (either SubscriptionAdded or SubscriptionRemoved), and the error or nil if no error was thrown while adding the subscription.
type SubscriptionOperation ¶
type SubscriptionOperation byte
SubscriptionOperation represents the operation that triggered a SubscriptionChangeListener callback
const ( // SubscriptionAdded is the resulting subscription operation from AddSubscriptionAsync SubscriptionAdded SubscriptionOperation = iota // SubscriptionRemoved is the resulting subscription operation from RemoveSubscription SubscriptionRemoved )
type TerminationEvent ¶
type TerminationEvent interface { // GetTimestamp retrieves the timestamp of the event. GetTimestamp() time.Time // GetMessage retrieves the event message. GetMessage() string // GetCause retrieves the cause of the client exception, if any. // Returns the error event or nil if no cause is present. GetCause() error }
TerminationEvent represents a non-recoverable receiver or publisher unsolicited termination for which applications can listen.
type TerminationNotificationListener ¶
type TerminationNotificationListener func(TerminationEvent)
TerminationNotificationListener represents a listener that can be registered with a LifecycleControl instance to listen for termination events.
type TimeoutError ¶
type TimeoutError struct {
// contains filtered or unexported fields
}
TimeoutError indicates that a timeout error occurred. The pointer type *TimeoutError is returned.
Source Files ¶
- direct_message_publisher.go
- direct_message_receiver.go
- doc.go
- endpoint_provisioner.go
- errors.go
- lifecycle.go
- message_publisher.go
- message_receiver.go
- messaging_service.go
- outbound_message_builder.go
- persistent_message_publisher.go
- persistent_message_receiver.go
- request_reply_message_publisher.go
- request_reply_message_receiver.go
Directories ¶
Path | Synopsis |
---|---|
Package config contains the following constructs used for configuration:
|
Package config contains the following constructs used for configuration: |
Package logging allows for configuration of the API's logging levels.
|
Package logging allows for configuration of the API's logging levels. |
Package message contains the type definitions of InboundMessage and OutboundMessage.
|
Package message contains the type definitions of InboundMessage and OutboundMessage. |
rgmid
Package rgmid contains the ReplicationGroupMessageID interface.
|
Package rgmid contains the ReplicationGroupMessageID interface. |
sdt
Package sdt contains the types needed to work with Structured Data on a message.
|
Package sdt contains the types needed to work with Structured Data on a message. |
Package metrics contains the various metrics that can be retrieved as well as the interface for retrieving the metrics.
|
Package metrics contains the various metrics that can be retrieved as well as the interface for retrieving the metrics. |
Package resource contains types and factory functions for various broker resources such as topics and queues.
|
Package resource contains types and factory functions for various broker resources such as topics and queues. |
Package subcode contains the subcodes returned from the Solace PubSub+ Messaging API for C. The subcodes are generated in subcode_generated.go.
|
Package subcode contains the subcodes returned from the Solace PubSub+ Messaging API for C. The subcodes are generated in subcode_generated.go. |