Documentation ¶
Overview ¶
Package stream docs are in need of some ❤️. TODO: write package docs. See https://github.com/rabbitmq/amqp091-go/blob/main/doc.go for inspiration.
Index ¶
- Constants
- Variables
- func NewHeartBeater(duration time.Duration, client raw.Clienter, logger *slog.Logger) *heartBeater
- type ByteCapacity
- type CreateStreamOptions
- type DoneChan
- type Environment
- func (e *Environment) Close(ctx context.Context)
- func (e *Environment) CreateStream(ctx context.Context, name string, opts CreateStreamOptions) error
- func (e *Environment) DeleteStream(ctx context.Context, name string) error
- func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) (uint64, error)
- func (e *Environment) QueryPartitions(ctx context.Context, superstream string) ([]string, error)
- func (e *Environment) QuerySequence(ctx context.Context, reference, stream string) (uint64, error)
- func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats, error)
- type EnvironmentConfiguration
- type EnvironmentConfigurationOption
- func WithId(id string) EnvironmentConfigurationOption
- func WithLazyInitialization(lazy bool) EnvironmentConfigurationOption
- func WithMaxConsumersByConnection(n int) EnvironmentConfigurationOption
- func WithMaxProducersByConnection(n int) EnvironmentConfigurationOption
- func WithMaxTrackingConsumersByConnection(n int) EnvironmentConfigurationOption
- func WithUri(uri string) EnvironmentConfigurationOption
- func WithUris(uris ...string) EnvironmentConfigurationOption
- type Stats
- type StreamOptions (deprecated)
Constants ¶
const (
	DefaultUri                              = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	DefaultHost                             = "localhost"
	DefaultPort                             = 5552
	DefaultUsername                         = "guest"
	DefaultPassword                         = "guest"
	DefaultVirtualHost                      = "/"
	DefaultMaxProducersByConnection         = 255
	DefaultMaxTrackingConsumersByConnection = 50
	DefaultMaxConsumersByConnection         = 255
	DefaultLazyInitialization               = false
	DefaultId                               = "rabbitmq-stream"
)
Default values used in EnvironmentConfiguration instead of zero-values.
const (
	Kilobyte = 1_000
	Megabyte = Kilobyte * 1_000
	Gigabyte = Megabyte * 1_000
	Terabyte = Gigabyte * 1_000
)
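The unit constants above are decimal (powers of 1,000), not binary (powers of 1,024). The sketch below redeclares them locally so that it compiles on its own; `gibibyte` and `maxLength` are hypothetical names used only to show the difference and are not part of this package.

```go
package main

import "fmt"

// ByteCapacity is redeclared here only so the sketch is self-contained.
type ByteCapacity uint64

// Decimal units, copied from the constant block above.
const (
	Kilobyte = 1_000
	Megabyte = Kilobyte * 1_000
	Gigabyte = Megabyte * 1_000
	Terabyte = Gigabyte * 1_000
)

// gibibyte is a hypothetical binary unit, shown only for comparison.
const gibibyte = 1 << 30

// maxLength is a hypothetical helper that converts a count of decimal
// gigabytes into a ByteCapacity value.
func maxLength(gigabytes uint64) ByteCapacity {
	return ByteCapacity(gigabytes) * Gigabyte
}

func main() {
	decimal := maxLength(5)
	var binary ByteCapacity = 5 * gibibyte
	// The two values differ by several hundred megabytes, which matters
	// when sizing retention limits.
	fmt.Println(uint64(decimal), uint64(binary), uint64(binary-decimal))
}
```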
const (
	// DefaultTimeout in all network calls.
	DefaultTimeout = time.Second * 30
)
Variables ¶
var (
	ErrNoLocators           = fmt.Errorf("no locators configured")
	ErrUnsupportedOperation = fmt.Errorf("unsupported operation")
)
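Sentinel errors such as these are usually matched with `errors.Is`. The following is a minimal sketch that redeclares the two variables locally; `queryStats` and `isUnsupported` are hypothetical helpers, and whether the package wraps these errors before returning them is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package-level sentinel errors shown above.
var (
	ErrNoLocators           = fmt.Errorf("no locators configured")
	ErrUnsupportedOperation = fmt.Errorf("unsupported operation")
)

// queryStats is a hypothetical stand-in for a call that may fail with a
// wrapped sentinel error.
func queryStats(supported bool) error {
	if !supported {
		return fmt.Errorf("query stream stats: %w", ErrUnsupportedOperation)
	}
	return nil
}

// isUnsupported reports whether err is, or wraps, ErrUnsupportedOperation.
func isUnsupported(err error) bool {
	return errors.Is(err, ErrUnsupportedOperation)
}

func main() {
	if err := queryStats(false); isUnsupported(err) {
		fmt.Println("server does not support this command:", err)
	}
}
```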
Functions ¶
Types ¶
type ByteCapacity ¶
type ByteCapacity uint64
func (ByteCapacity) String ¶
func (b ByteCapacity) String() string
type CreateStreamOptions ¶
type CreateStreamOptions struct {
	MaxAge         time.Duration
	MaxLength      ByteCapacity
	MaxSegmentSize ByteCapacity
}
type DoneChan ¶
type DoneChan struct {
	C chan struct{}
	// contains filtered or unexported fields
}
func NewDoneChan ¶
func NewDoneChan() *DoneChan
func (*DoneChan) GracefulClose ¶
func (dc *DoneChan) GracefulClose()
GracefulClose closes the DoneChan only if the Done chan is not already closed.
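Closing an already closed channel panics in Go, which is why a guarded close is useful. One common way to get that behaviour is `sync.Once`; the sketch below uses a hypothetical `doneChan` type and is not necessarily how DoneChan is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// doneChan is a hypothetical stand-in for DoneChan.
type doneChan struct {
	C    chan struct{}
	once sync.Once
}

func newDoneChan() *doneChan {
	return &doneChan{C: make(chan struct{})}
}

// gracefulClose closes C at most once, so repeated calls do not panic.
func (dc *doneChan) gracefulClose() {
	dc.once.Do(func() { close(dc.C) })
}

// isClosed reports whether C has been closed, without blocking.
func isClosed(dc *doneChan) bool {
	select {
	case <-dc.C:
		return true
	default:
		return false
	}
}

func main() {
	dc := newDoneChan()
	dc.gracefulClose()
	dc.gracefulClose() // the second call is a no-op
	fmt.Println(isClosed(dc))
}
```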
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
func NewEnvironment ¶
func NewEnvironment(ctx context.Context, configuration EnvironmentConfiguration) (*Environment, error)
func (*Environment) Close ¶
func (e *Environment) Close(ctx context.Context)
Close the connection to RabbitMQ server. This function closes all connections to RabbitMQ gracefully. A graceful disconnection sends a close request to RabbitMQ and awaits a confirmation response. If there's any error closing a connection, the error is logged to a logger extracted from the context.
func (*Environment) CreateStream ¶
func (e *Environment) CreateStream(ctx context.Context, name string, opts CreateStreamOptions) error
CreateStream with name and given options.
func (*Environment) DeleteStream ¶
func (e *Environment) DeleteStream(ctx context.Context, name string) error
DeleteStream with given name. Returns an error if the stream does not exist, or if any unknown error occurs. The context may carry a slog.Logger to log operations and intermediate errors, if any.
See also: raw.NewContextWithLogger
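Carrying a logger in a context is typically done with `context.WithValue` and an unexported key type. The sketch below illustrates the idea with the standard library only; `withLogger`, `loggerFrom` and `deleteStream` are hypothetical names, and the real helper in package raw may differ.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
)

// loggerKey is an unexported key type, which avoids collisions with
// values stored in the context by other packages.
type loggerKey struct{}

// withLogger returns a copy of ctx that carries l.
func withLogger(ctx context.Context, l *slog.Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, l)
}

// loggerFrom extracts the logger from ctx, falling back to the default.
func loggerFrom(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok && l != nil {
		return l
	}
	return slog.Default()
}

// deleteStream is a hypothetical operation that logs through the context.
func deleteStream(ctx context.Context, name string) {
	loggerFrom(ctx).Info("deleting stream", "stream", name)
}

// logOutput runs deleteStream with a buffer-backed logger and returns
// whatever was written to it.
func logOutput() string {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, nil))
	deleteStream(withLogger(context.Background(), logger), "my-stream")
	return buf.String()
}

func main() {
	fmt.Print(logOutput())
}
```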
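func (*Environment) QueryOffset ¶
func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) (uint64, error)
QueryOffset retrieves the last consumer offset stored for a given consumer name and stream name.
func (*Environment) QueryPartitions ¶
func (e *Environment) QueryPartitions(ctx context.Context, superstream string) ([]string, error)
QueryPartitions returns a list of partition streams for a given superstream name.
func (*Environment) QuerySequence ¶
func (e *Environment) QuerySequence(ctx context.Context, reference, stream string) (uint64, error)
QuerySequence retrieves the last publishingID for a given producer (reference) and stream name.
func (*Environment) QueryStreamStats ¶
func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats, error)
QueryStreamStats queries the server for Stats from a given stream name. The stats available are 'first offset id' and 'committed chunk id'.
This command is available in RabbitMQ 3.11+.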
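Because every query method takes a `context.Context`, a caller can bound each call with a deadline. The sketch below uses the QueryOffset signature listed in the index behind a hypothetical `offsetQuerier` interface, with an in-memory `fakeEnv` in place of a real Environment. That the real methods return promptly when the context expires is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// offsetQuerier is a hypothetical interface with the QueryOffset
// signature listed in the index.
type offsetQuerier interface {
	QueryOffset(ctx context.Context, consumer, stream string) (uint64, error)
}

// fakeEnv is an in-memory stand-in that answers after a fixed delay.
type fakeEnv struct {
	delay  time.Duration
	offset uint64
}

func (f fakeEnv) QueryOffset(ctx context.Context, consumer, stream string) (uint64, error) {
	select {
	case <-time.After(f.delay):
		return f.offset, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// queryWithTimeout bounds a single QueryOffset call with a deadline.
func queryWithTimeout(q offsetQuerier, timeout time.Duration) (uint64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return q.QueryOffset(ctx, "my-consumer", "my-stream")
}

// demo returns the offset from a fast call and whether a slow call
// ended with context.DeadlineExceeded.
func demo() (uint64, bool) {
	off, _ := queryWithTimeout(fakeEnv{delay: time.Millisecond, offset: 42}, time.Second)
	_, err := queryWithTimeout(fakeEnv{delay: time.Second, offset: 42}, 10*time.Millisecond)
	return off, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	off, timedOut := demo()
	fmt.Println(off, timedOut)
}
```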
type EnvironmentConfiguration ¶
type EnvironmentConfiguration struct {
	// The URI of the nodes to try to connect to (cluster). This takes precedence
	// over URI and Host + Port.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	Uris []string
	// The URI of the node to connect to (single node). This takes precedence over Host + Port.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	Uri string
	// Host to connect to. Uris and Uri take precedence over this. Leave Uris and Uri unset
	// to use this and Port for connection.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to "localhost"
	Host string
	// Port to use. Uris and Uri take precedence over this. Leave Uris and Uri unset
	// to use this and Host for connection.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to 5552
	Port int
	// Username to use to connect.
	//
	// Default: "guest"
	Username string
	// Password to use to connect
	//
	// Default: "guest"
	Password string
	// Virtual host to connect to
	//
	// Default: "/"
	VirtualHost string
	// The maximum number of `Producer` instances a single connection can maintain
	// before a new connection is open. The value must be between 1 and 255
	//
	// Default: 255
	MaxProducersByConnection int
	// The maximum number of `Consumer` instances that store their offset a single
	// connection can maintain before a new connection is open. The value must be
	// between 1 and 255
	//
	// Default: 50
	MaxTrackingConsumersByConnection int
	// The maximum number of `Consumer` instances a single connection can maintain
	// before a new connection is open. The value must be between 1 and 255
	//
	// Default: 255
	MaxConsumersByConnection int
	// To delay the connection opening until necessary
	//
	// Default: false
	LazyInitialization bool
	// Informational ID for the environment instance. Used as a prefix for connection
	// names
	//
	// Default: "rabbitmq-stream"
	Id string
}
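The field comments describe a precedence order: Uris first, then Uri, then Host + Port, and finally the default URI. A minimal sketch of that resolution follows, using a hypothetical `locatorConfig` type and `resolveLocators` helper. Filling in a default for whichever of Host or Port is left unset, and omitting credentials from the built URI, are assumptions rather than documented behaviour.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Local copies of the documented defaults.
const (
	defaultUri  = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	defaultHost = "localhost"
	defaultPort = 5552
)

// locatorConfig is a hypothetical subset of EnvironmentConfiguration.
type locatorConfig struct {
	Uris []string
	Uri  string
	Host string
	Port int
}

// resolveLocators applies the documented precedence:
// Uris, then Uri, then Host + Port, then the default URI.
func resolveLocators(c locatorConfig) []string {
	switch {
	case len(c.Uris) > 0:
		return c.Uris
	case c.Uri != "":
		return []string{c.Uri}
	case c.Host != "" || c.Port != 0:
		host, port := c.Host, c.Port
		if host == "" {
			host = defaultHost // assumption: partial settings fall back per field
		}
		if port == 0 {
			port = defaultPort
		}
		return []string{"rabbitmq-stream://" + net.JoinHostPort(host, strconv.Itoa(port))}
	default:
		return []string{defaultUri}
	}
}

func main() {
	fmt.Println(resolveLocators(locatorConfig{}))
	fmt.Println(resolveLocators(locatorConfig{Host: "rabbit"}))
	fmt.Println(resolveLocators(locatorConfig{Uri: "rabbitmq-stream://a:5552", Host: "ignored"}))
}
```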
func NewEnvironmentConfiguration ¶
func NewEnvironmentConfiguration(options ...EnvironmentConfigurationOption) EnvironmentConfiguration
type EnvironmentConfigurationOption ¶
type EnvironmentConfigurationOption func(*EnvironmentConfiguration)
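This type follows the functional options pattern: each option is a function that mutates the configuration, and the constructor applies them in order. The sketch below reproduces the pattern with hypothetical lower-case names (`config`, `option`, `newConfig`). Seeding the documented defaults before applying the options is an assumption about how NewEnvironmentConfiguration behaves.

```go
package main

import "fmt"

// config is a hypothetical subset of EnvironmentConfiguration.
type config struct {
	Uri                string
	LazyInitialization bool
	Id                 string
}

// option mirrors the shape of EnvironmentConfigurationOption.
type option func(*config)

func withUri(uri string) option {
	return func(c *config) { c.Uri = uri }
}

func withLazyInitialization(lazy bool) option {
	return func(c *config) { c.LazyInitialization = lazy }
}

func withId(id string) option {
	return func(c *config) { c.Id = id }
}

// newConfig starts from the documented defaults and applies each option
// in order, so later options override earlier ones.
func newConfig(opts ...option) config {
	c := config{
		Uri:                "rabbitmq-stream://guest:guest@localhost:5552/%2f",
		LazyInitialization: false,
		Id:                 "rabbitmq-stream",
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withId("orders"), withLazyInitialization(true))
	fmt.Printf("%+v\n", c)
}
```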
func WithId ¶
func WithId(id string) EnvironmentConfigurationOption
WithId configures the environment informational ID for the environment instance. Used as a prefix for connection names.
func WithLazyInitialization ¶
func WithLazyInitialization(lazy bool) EnvironmentConfigurationOption
WithLazyInitialization configures the environment to use lazy initialization. With lazy initialization enabled, it will delay the connection opening until necessary.
func WithMaxConsumersByConnection ¶
func WithMaxConsumersByConnection(n int) EnvironmentConfigurationOption
WithMaxConsumersByConnection configures the environment with a maximum of N consumers per connection. Additional connections are created when this number is reached.
Valid range is 1 <= N <= 255. If N < 1, the maximum is set to 1; if N > 255, it is set to 255; otherwise it is set to N.
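The range rule above is a clamp to the interval [1, 255]. A minimal sketch with a hypothetical `clampPerConnection` helper (the package's own implementation may be structured differently):

```go
package main

import "fmt"

const (
	minPerConnection = 1
	maxPerConnection = 255
)

// clampPerConnection is a hypothetical helper that restricts n to the
// documented range: values below 1 become 1, values above 255 become 255.
func clampPerConnection(n int) int {
	if n < minPerConnection {
		return minPerConnection
	}
	if n > maxPerConnection {
		return maxPerConnection
	}
	return n
}

func main() {
	for _, n := range []int{-5, 0, 1, 100, 255, 1000} {
		fmt.Printf("%d -> %d\n", n, clampPerConnection(n))
	}
}
```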
func WithMaxProducersByConnection ¶
func WithMaxProducersByConnection(n int) EnvironmentConfigurationOption
WithMaxProducersByConnection configures the environment with a maximum of N producers per connection. Additional connections are created when this number is reached.
Valid range is 1 <= N <= 255. If N < 1, the maximum is set to 1; if N > 255, it is set to 255; otherwise it is set to N.
func WithMaxTrackingConsumersByConnection ¶
func WithMaxTrackingConsumersByConnection(n int) EnvironmentConfigurationOption
WithMaxTrackingConsumersByConnection configures the environment with a maximum of N tracking consumers per connection. Additional connections are created when this number is reached.
Consumers that do not use an automatic offset tracking strategy do not count towards this limit.
Valid range is 1 <= N <= 255. If N < 1, the maximum is set to 1; if N > 255, it is set to 255; otherwise it is set to N.
func WithUri ¶
func WithUri(uri string) EnvironmentConfigurationOption
WithUri configures the environment with the attributes from the URI. URI must conform to the general form:
[scheme:][//[userinfo@]host][/]path
Whilst it is possible to write a URI as "/some-vhost", it is advisable to write the full URI to avoid ambiguities during parsing.
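To illustrate the ambiguity, the sketch below parses a full URI and a bare path with the standard `net/url` package. `parseStreamURI` and `uriParts` are hypothetical names, and reading the virtual host from the percent-decoded path is an assumption about how this package interprets the URI.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// uriParts holds the pieces of a stream URI that matter for connecting.
type uriParts struct {
	Scheme, Username, Password, Host, Port, VirtualHost string
}

// parseStreamURI is a hypothetical helper built on net/url.
func parseStreamURI(raw string) (uriParts, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return uriParts{}, err
	}
	// EscapedPath keeps "%2f" encoded, so the leading "/" separator can be
	// removed before the virtual host itself is decoded.
	vhost, err := url.PathUnescape(strings.TrimPrefix(u.EscapedPath(), "/"))
	if err != nil {
		return uriParts{}, err
	}
	password, _ := u.User.Password()
	return uriParts{
		Scheme:      u.Scheme,
		Username:    u.User.Username(),
		Password:    password,
		Host:        u.Hostname(),
		Port:        u.Port(),
		VirtualHost: vhost,
	}, nil
}

func main() {
	full, _ := parseStreamURI("rabbitmq-stream://guest:guest@localhost:5552/%2f")
	fmt.Printf("%+v\n", full)

	// A bare path parses without error, but scheme, credentials, host and
	// port are all empty and must come from somewhere else.
	bare, _ := parseStreamURI("/some-vhost")
	fmt.Printf("%+v\n", bare)
}
```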
func WithUris ¶
func WithUris(uris ...string) EnvironmentConfigurationOption
WithUris configures the environment with the attributes from the first URI, and keeps the other URIs. Having multiple URIs is useful in clusters, so that different URIs can be tried if one RabbitMQ server becomes unavailable.
URI must conform to the general form:
[scheme:][//[userinfo@]host][/]path
Whilst it is possible to write a URI as "/some-vhost", it is advisable to write the full URI to avoid ambiguities during parsing.
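Trying several URIs in turn can be sketched as a loop that returns the first one that connects. Here `connectFirstAvailable`, `dialFunc` and `fakeDial` are hypothetical names, and the actual retry order and error handling in this package may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoLocators mirrors the message of ErrNoLocators for this sketch.
var errNoLocators = errors.New("no locators configured")

// dialFunc is a hypothetical connection attempt against one URI.
type dialFunc func(uri string) error

// connectFirstAvailable tries each URI in order and returns the first one
// that dials successfully, or the combined errors if none does.
func connectFirstAvailable(uris []string, dial dialFunc) (string, error) {
	if len(uris) == 0 {
		return "", errNoLocators
	}
	var errs []error
	for _, uri := range uris {
		if err := dial(uri); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", uri, err))
			continue
		}
		return uri, nil
	}
	return "", errors.Join(errs...)
}

// fakeDial simulates a cluster in which the listed URIs are unreachable.
func fakeDial(down map[string]bool) dialFunc {
	return func(uri string) error {
		if down[uri] {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	uris := []string{"rabbitmq-stream://node1:5552", "rabbitmq-stream://node2:5552"}
	got, err := connectFirstAvailable(uris, fakeDial(map[string]bool{uris[0]: true}))
	fmt.Println(got, err)
}
```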
type Stats ¶
type Stats struct {
// contains filtered or unexported fields
}
func (Stats) CommittedChunkId ¶
func (Stats) FirstOffset ¶
type StreamOptions (deprecated) ¶
type StreamOptions = CreateStreamOptions
StreamOptions is an alias for backwards compatibility with v1 of this client.
Deprecated: use CreateStreamOptions instead.