Documentation
¶
Index ¶
- Constants
- Variables
- func IsVersionGreaterOrEqual(version, target string) bool
- func SetLevelInfo(value int8)
- type AddressResolver
- type AutoCommitStrategy
- type BindingsOptions
- func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions
- func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions
- func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions
- func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions
- func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions
- type BlockingQueue
- type Broker
- type Brokers
- type ByteCapacity
- func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
- func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
- type CPartitionClose
- type CPartitionContext
- type ChannelClose
- type ChannelPublishConfirm
- type Client
- func (c *Client) BrokerForConsumer(stream string) (*Broker, error)
- func (c *Client) BrokerLeader(stream string) (*Broker, error)
- func (c *Client) Close() error
- func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
- func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
- func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error
- func (c *Client) DeleteStream(streamName string) error
- func (c *Client) DeleteSuperStream(superStream string) error
- func (c *Client) QueryPartitions(superStream string) ([]string, error)
- func (c *Client) StreamExists(stream string) bool
- func (c *Client) StreamStats(streamName string) (*StreamStats, error)
- type ClientProperties
- type Code
- type Compression
- type ConfirmationStatus
- func (cs *ConfirmationStatus) GetError() error
- func (cs *ConfirmationStatus) GetErrorCode() uint16
- func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
- func (cs *ConfirmationStatus) GetProducerID() uint8
- func (cs *ConfirmationStatus) GetPublishingId() int64
- func (cs *ConfirmationStatus) IsConfirmed() bool
- func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
- type ConnectionProperties
- type Consumer
- func (consumer *Consumer) Close() error
- func (consumer *Consumer) GetCloseHandler() chan Event
- func (consumer *Consumer) GetLastStoredOffset() int64
- func (consumer *Consumer) GetName() string
- func (consumer *Consumer) GetOffset() int64
- func (consumer *Consumer) GetStreamName() string
- func (consumer *Consumer) NotifyClose() ChannelClose
- func (consumer *Consumer) QueryOffset() (int64, error)
- func (consumer *Consumer) StoreCustomOffset(offset int64) error
- func (consumer *Consumer) StoreOffset() error
- type ConsumerContext
- type ConsumerFilter
- type ConsumerOptions
- func (c *ConsumerOptions) IsFilterEnabled() bool
- func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool
- func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
- func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
- func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions
- func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
- func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions
- func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
- func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
- func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
- func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions
- type ConsumerUpdate
- type Coordinator
- func (coordinator *Coordinator) ConsumersCount() int
- func (coordinator *Coordinator) ExtractConsumerById(id interface{}) (*Consumer, error)
- func (coordinator *Coordinator) ExtractProducerById(id interface{}) (*Producer, error)
- func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
- func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
- func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
- func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
- func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
- func (coordinator *Coordinator) NewProducer(parameters *ProducerOptions) (*Producer, error)
- func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
- func (coordinator *Coordinator) NewResponseWitName(id string) *Response
- func (coordinator *Coordinator) Producers() map[interface{}]interface{}
- func (coordinator *Coordinator) ProducersCount() int
- func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
- func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
- func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
- func (coordinator *Coordinator) RemoveResponseByName(id string) error
- type Environment
- func (env *Environment) Close() error
- func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
- func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error
- func (env *Environment) DeleteStream(streamName string) error
- func (env *Environment) DeleteSuperStream(superStreamName string) error
- func (env *Environment) IsClosed() bool
- func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
- func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, ...) (*SuperStreamConsumer, error)
- func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)
- func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
- func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)
- func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)
- func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
- func (env *Environment) StreamExists(streamName string) (bool, error)
- func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
- func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
- type EnvironmentOptions
- func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
- type Event
- type FilterValue
- type HashRoutingStrategy
- type HeartBeat
- type KeyRoutingStrategy
- type MessagesHandler
- type OffsetSpecification
- func (o OffsetSpecification) First() OffsetSpecification
- func (o OffsetSpecification) Last() OffsetSpecification
- func (o OffsetSpecification) LastConsumed() OffsetSpecificationdeprecated
- func (o OffsetSpecification) Next() OffsetSpecification
- func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
- func (o OffsetSpecification) String() string
- func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
- type PPartitionClose
- type PPartitionContext
- type PartitionPublishConfirm
- type PartitionsOptions
- func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions
- func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions
- func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions
- func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions
- func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions
- type PostFilter
- type Producer
- func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
- func (producer *Producer) Close() error
- func (producer *Producer) GetBroker() *Broker
- func (producer *Producer) GetID() uint8
- func (producer *Producer) GetLastPublishingId() (int64, error)
- func (producer *Producer) GetName() string
- func (producer *Producer) GetOptions() *ProducerOptions
- func (producer *Producer) GetStreamName() string
- func (producer *Producer) NotifyClose() ChannelClose
- func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
- func (producer *Producer) Send(streamMessage message.StreamMessage) error
- type ProducerFilter
- type ProducerOptions
- func (po *ProducerOptions) IsFilterEnabled() bool
- func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptionsdeprecated
- func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions
- func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
- func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions
- func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions
- func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
- func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
- type PublishFilter
- type ReaderProtocol
- type Response
- type RoutingStrategy
- type SaslConfiguration
- type SingleActiveConsumer
- type StreamMetadata
- type StreamOptions
- type StreamStats
- type StreamsMetadata
- type SuperStreamConsumer
- type SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetFilter(filter *ConsumerFilter) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetOffset(offset OffsetSpecification) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions
- type SuperStreamOptions
- type SuperStreamProducer
- func (s *SuperStreamProducer) Close() error
- func (s *SuperStreamProducer) ConnectPartition(partition string) error
- func (s *SuperStreamProducer) GetPartitions() []string
- func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose
- func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm
- func (s *SuperStreamProducer) Send(message message.StreamMessage) error
- type SuperStreamProducerOptions
- type TCPParameters
- type TuneState
- type Version
Constants ¶
const ( ClientVersion = "1.5.0" CommandDeletePublisher = 6 CommandQueryOffset = 11 CommandUnsubscribe = 12 CommandMetadataUpdate = 16 CommandClose = 22 // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" SocketClosed = "socket client closed" MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" DeletePublisher = "deletePublisher" UnSubscribe = "unSubscribe" StreamTcpPort = "5552" )
const ( SaslConfigurationPlain = "PLAIN" SaslConfigurationExternal = "EXTERNAL" )
const ( UnitMb = "mb" UnitKb = "kb" UnitGb = "gb" UnitTb = "tb" )
const SEED = 104729
Variables ¶
var AlreadyClosed = errors.New("Already Closed")
var AuthenticationFailure = errors.New("Authentication Failure")
var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
var CodeAccessRefused = errors.New("Resources Access Refused")
var ConfirmationTimoutError = errors.New("Confirmation Timeout Error")
var ConnectionClosed = errors.New("Can't Send the message, connection closed")
var ErrBlockingQueueStopped = errors.New("blocking queue stopped")
var ErrEnvironmentNotDefined = errors.New("Environment not defined")
var ErrMessageRouteNotFound = errors.New("Message Route not found for the message key")
var ErrProducerNotFound = errors.New("Producer not found in the SuperStream Producer")
var ErrSuperStreamConsumerOptionsNotDefined = errors.New("SuperStreamConsumerOptions not defined.")
var ErrSuperStreamProducerOptionsNotDefined = errors.New("SuperStreamProducerOptions not defined. The SuperStreamProducerOptions is mandatory with the RoutingStrategy")
var FilterNotSupported = errors.New("Filtering is not supported by the broker " +
"(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
var InternalError = errors.New("Internal Error")
var LeaderNotReady = errors.New("Leader not Ready yet")
var OffsetNotFoundError = errors.New("Offset not found")
var PreconditionFailed = errors.New("Precondition Failed")
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
var SingleActiveConsumerNotSupported = errors.New("Single Active Consumer is not supported by the broker " +
"(requires RabbitMQ 3.11+ and stream_single_active_consumer feature flag activated)")
var StreamAlreadyExists = errors.New("Stream Already Exists")
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
var StreamNotAvailable = errors.New("Stream Not Available")
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
var UnknownFrame = errors.New("Unknown Frame")
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
Functions ¶
func IsVersionGreaterOrEqual ¶ added in v1.3.3
func SetLevelInfo ¶
func SetLevelInfo(value int8)
Types ¶
type AddressResolver ¶
type AutoCommitStrategy ¶
type AutoCommitStrategy struct {
// contains filtered or unexported fields
}
func NewAutoCommitStrategy ¶
func NewAutoCommitStrategy() *AutoCommitStrategy
func (*AutoCommitStrategy) SetCountBeforeStorage ¶
func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy
func (*AutoCommitStrategy) SetFlushInterval ¶
func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy
type BindingsOptions ¶ added in v1.4.0
type BindingsOptions struct { Bindings []string MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity LeaderLocator string // contains filtered or unexported fields }
func NewBindingsOptions ¶ added in v1.4.0
func NewBindingsOptions(bindings []string) *BindingsOptions
func (*BindingsOptions) SetBalancedLeaderLocator ¶ added in v1.4.0
func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions
func (*BindingsOptions) SetClientLocalLocator ¶ added in v1.4.0
func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions
func (*BindingsOptions) SetMaxAge ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions
func (*BindingsOptions) SetMaxLengthBytes ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions
func (*BindingsOptions) SetMaxSegmentSizeBytes ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions
type BlockingQueue ¶ added in v1.5.0
type BlockingQueue[T any] struct { // contains filtered or unexported fields }
func NewBlockingQueue ¶ added in v1.5.0
func NewBlockingQueue[T any](capacity int) *BlockingQueue[T]
NewBlockingQueue initializes a new BlockingQueue with the given capacity
func (*BlockingQueue[T]) Close ¶ added in v1.5.0
func (bq *BlockingQueue[T]) Close()
func (*BlockingQueue[T]) Enqueue ¶ added in v1.5.0
func (bq *BlockingQueue[T]) Enqueue(item T) error
Enqueue adds an item to the queue, blocking if the queue is full
func (*BlockingQueue[T]) GetChannel ¶ added in v1.5.0
func (bq *BlockingQueue[T]) GetChannel() chan T
func (*BlockingQueue[T]) IsEmpty ¶ added in v1.5.0
func (bq *BlockingQueue[T]) IsEmpty() bool
func (*BlockingQueue[T]) IsStopped ¶ added in v1.5.0
func (bq *BlockingQueue[T]) IsStopped() bool
func (*BlockingQueue[T]) Size ¶ added in v1.5.0
func (bq *BlockingQueue[T]) Size() int
func (*BlockingQueue[T]) Stop ¶ added in v1.5.0
func (bq *BlockingQueue[T]) Stop() []T
Stop stops the queue from accepting new items but allows some pending items. Stop is different from Close in that it allows the existing items to be processed. Drain the queue to be sure there are not pending messages
type Broker ¶
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
type ByteCapacity ¶
type ByteCapacity struct {
// contains filtered or unexported fields
}
func (ByteCapacity) B ¶
func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
func (ByteCapacity) From ¶
func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
func (ByteCapacity) GB ¶
func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
func (ByteCapacity) KB ¶
func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
func (ByteCapacity) MB ¶
func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
func (ByteCapacity) TB ¶
func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
type CPartitionClose ¶ added in v1.4.0
type CPartitionClose struct { Partition string Event Event Context CPartitionContext }
CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed The user can use the NotifyPartitionClose to get the channel
type CPartitionContext ¶ added in v1.4.0
type CPartitionContext interface {
ConnectPartition(partition string, offset OffsetSpecification) error
}
CPartitionContext is an interface that is used to expose partition information and methods to the user. The user can use the CPartitionContext to reconnect a partition to the SuperStreamConsumer Specifying the offset to start from
type ChannelClose ¶
type ChannelClose = <-chan Event
type ChannelPublishConfirm ¶
type ChannelPublishConfirm chan []*ConfirmationStatus
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) BrokerForConsumer ¶
func (*Client) DeclarePublisher ¶
func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
func (*Client) DeclareStream ¶
func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
func (*Client) DeclareSubscriber ¶
func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Client) DeclareSuperStream ¶ added in v1.4.0
func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error
func (*Client) DeleteStream ¶
func (*Client) DeleteSuperStream ¶ added in v1.4.0
func (*Client) QueryPartitions ¶ added in v1.4.0
func (*Client) StreamExists ¶
func (*Client) StreamStats ¶ added in v1.1.0
func (c *Client) StreamStats(streamName string) (*StreamStats, error)
type ClientProperties ¶
type ClientProperties struct {
// contains filtered or unexported fields
}
type Compression ¶
type Compression struct {
// contains filtered or unexported fields
}
func (Compression) Gzip ¶
func (compression Compression) Gzip() Compression
func (Compression) Lz4 ¶
func (compression Compression) Lz4() Compression
func (Compression) None ¶
func (compression Compression) None() Compression
func (Compression) Snappy ¶
func (compression Compression) Snappy() Compression
func (Compression) String ¶
func (compression Compression) String() string
func (Compression) Zstd ¶
func (compression Compression) Zstd() Compression
type ConfirmationStatus ¶
type ConfirmationStatus struct {
// contains filtered or unexported fields
}
func (*ConfirmationStatus) GetError ¶
func (cs *ConfirmationStatus) GetError() error
func (*ConfirmationStatus) GetErrorCode ¶
func (cs *ConfirmationStatus) GetErrorCode() uint16
func (*ConfirmationStatus) GetMessage ¶
func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
func (*ConfirmationStatus) GetProducerID ¶
func (cs *ConfirmationStatus) GetProducerID() uint8
func (*ConfirmationStatus) GetPublishingId ¶
func (cs *ConfirmationStatus) GetPublishingId() int64
func (*ConfirmationStatus) IsConfirmed ¶
func (cs *ConfirmationStatus) IsConfirmed() bool
func (*ConfirmationStatus) LinkedMessages ¶
func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
type ConnectionProperties ¶
type ConnectionProperties struct {
// contains filtered or unexported fields
}
type Consumer ¶
type Consumer struct { ID uint8 MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*Consumer) GetCloseHandler ¶ added in v1.1.2
func (*Consumer) GetLastStoredOffset ¶
func (*Consumer) GetStreamName ¶
func (*Consumer) NotifyClose ¶
func (consumer *Consumer) NotifyClose() ChannelClose
func (*Consumer) QueryOffset ¶
func (*Consumer) StoreCustomOffset ¶
func (*Consumer) StoreOffset ¶
type ConsumerContext ¶
type ConsumerContext struct { Consumer *Consumer // contains filtered or unexported fields }
func (ConsumerContext) GetEntriesCount ¶ added in v1.3.2
func (cc ConsumerContext) GetEntriesCount() uint16
type ConsumerFilter ¶ added in v1.3.3
type ConsumerFilter struct { Values []string MatchUnfiltered bool PostFilter PostFilter }
func NewConsumerFilter ¶ added in v1.3.3
func NewConsumerFilter(values []string, matchUnfiltered bool, postFilter PostFilter) *ConsumerFilter
type ConsumerOptions ¶
type ConsumerOptions struct { ConsumerName string Offset OffsetSpecification CRCCheck bool ClientProvidedName string Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer // contains filtered or unexported fields }
func NewConsumerOptions ¶
func NewConsumerOptions() *ConsumerOptions
func (*ConsumerOptions) IsFilterEnabled ¶ added in v1.3.3
func (c *ConsumerOptions) IsFilterEnabled() bool
func (*ConsumerOptions) IsSingleActiveConsumerEnabled ¶ added in v1.4.0
func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool
func (*ConsumerOptions) SetAutoCommit ¶
func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
func (*ConsumerOptions) SetCRCCheck ¶
func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
func (*ConsumerOptions) SetClientProvidedName ¶ added in v1.3.1
func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions
func (*ConsumerOptions) SetConsumerName ¶
func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
func (*ConsumerOptions) SetFilter ¶ added in v1.3.3
func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions
func (*ConsumerOptions) SetInitialCredits ¶ added in v1.0.2
func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
func (*ConsumerOptions) SetManualCommit ¶
func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
func (*ConsumerOptions) SetOffset ¶
func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
func (*ConsumerOptions) SetSingleActiveConsumer ¶ added in v1.4.0
func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions
type ConsumerUpdate ¶ added in v1.4.0
type ConsumerUpdate func(streamName string, isActive bool) OffsetSpecification
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator() *Coordinator
func (*Coordinator) ConsumersCount ¶
func (coordinator *Coordinator) ConsumersCount() int
func (*Coordinator) ExtractConsumerById ¶ added in v1.5.0
func (coordinator *Coordinator) ExtractConsumerById(id interface{}) (*Consumer, error)
func (*Coordinator) ExtractProducerById ¶ added in v1.5.0
func (coordinator *Coordinator) ExtractProducerById(id interface{}) (*Producer, error)
func (*Coordinator) GetConsumerById ¶
func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
func (*Coordinator) GetProducerById ¶
func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
func (*Coordinator) GetResponseById ¶
func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
func (*Coordinator) GetResponseByName ¶
func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
func (*Coordinator) NewConsumer ¶
func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
Consumer functions
func (*Coordinator) NewProducer ¶
func (coordinator *Coordinator) NewProducer( parameters *ProducerOptions) (*Producer, error)
producersEnvironment
func (*Coordinator) NewResponse ¶
func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
func (*Coordinator) NewResponseWitName ¶
func (coordinator *Coordinator) NewResponseWitName(id string) *Response
func (*Coordinator) Producers ¶
func (coordinator *Coordinator) Producers() map[interface{}]interface{}
func (*Coordinator) ProducersCount ¶
func (coordinator *Coordinator) ProducersCount() int
func (*Coordinator) RemoveConsumerById ¶
func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
func (*Coordinator) RemoveProducerById ¶
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
func (*Coordinator) RemoveResponseById ¶
func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
func (*Coordinator) RemoveResponseByName ¶
func (coordinator *Coordinator) RemoveResponseByName(id string) error
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
func NewEnvironment ¶
func NewEnvironment(options *EnvironmentOptions) (*Environment, error)
func (*Environment) Close ¶
func (env *Environment) Close() error
func (*Environment) DeclareStream ¶
func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
func (*Environment) DeclareSuperStream ¶ added in v1.4.0
func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error
func (*Environment) DeleteStream ¶
func (env *Environment) DeleteStream(streamName string) error
func (*Environment) DeleteSuperStream ¶ added in v1.4.0
func (env *Environment) DeleteSuperStream(superStreamName string) error
func (*Environment) IsClosed ¶
func (env *Environment) IsClosed() bool
func (*Environment) NewConsumer ¶
func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Environment) NewProducer ¶
func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
func (*Environment) NewSuperStreamConsumer ¶ added in v1.4.0
func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, options *SuperStreamConsumerOptions) (*SuperStreamConsumer, error)
func (*Environment) NewSuperStreamProducer ¶ added in v1.4.0
func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)
func (*Environment) QueryOffset ¶
func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
func (*Environment) QueryPartitions ¶ added in v1.4.0
func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)
func (*Environment) QueryRoute ¶ added in v1.4.0
func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)
func (*Environment) QuerySequence ¶
func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
QuerySequence gets the last id stored for a producer you can also see producer.GetLastPublishingId() that is the easier way to get the last-id
func (*Environment) StreamExists ¶
func (env *Environment) StreamExists(streamName string) (bool, error)
func (*Environment) StreamMetaData ¶
func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
func (*Environment) StreamStats ¶ added in v1.1.0
func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
type EnvironmentOptions ¶
type EnvironmentOptions struct { ConnectionParameters []*Broker TCPParameters *TCPParameters SaslConfiguration *SaslConfiguration MaxProducersPerClient int MaxConsumersPerClient int AddressResolver *AddressResolver RPCTimeout time.Duration }
func NewEnvironmentOptions ¶
func NewEnvironmentOptions() *EnvironmentOptions
func (*EnvironmentOptions) IsTLS ¶
func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
func (*EnvironmentOptions) SetAddressResolver ¶
func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
func (*EnvironmentOptions) SetHost ¶
func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxConsumersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxProducersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetNoDelay ¶
func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
func (*EnvironmentOptions) SetPassword ¶
func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
func (*EnvironmentOptions) SetPort ¶
func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
func (*EnvironmentOptions) SetRPCTimeout ¶ added in v1.4.5
func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions
func (*EnvironmentOptions) SetReadBuffer ¶
func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedHeartbeat ¶
func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedMaxFrameSize ¶
func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
func (*EnvironmentOptions) SetSaslConfiguration ¶ added in v1.2.0
func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
func (*EnvironmentOptions) SetTLSConfig ¶
func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
func (*EnvironmentOptions) SetUri ¶
func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
func (*EnvironmentOptions) SetUris ¶
func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
func (*EnvironmentOptions) SetUser ¶
func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
func (*EnvironmentOptions) SetVHost ¶
func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
func (*EnvironmentOptions) SetWriteBuffer ¶
func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
type FilterValue ¶ added in v1.3.3
type FilterValue func(message message.StreamMessage) string
type HashRoutingStrategy ¶ added in v1.4.0
type HashRoutingStrategy struct {
RoutingKeyExtractor func(message message.StreamMessage) string
}
func NewHashRoutingStrategy ¶ added in v1.4.0
func NewHashRoutingStrategy(routingKeyExtractor func(message message.StreamMessage) string) *HashRoutingStrategy
func (*HashRoutingStrategy) Route ¶ added in v1.4.0
func (h *HashRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)
func (*HashRoutingStrategy) SetRouteParameters ¶ added in v1.4.0
type KeyRoutingStrategy ¶ added in v1.4.0
type KeyRoutingStrategy struct { // provided by the user to define the key based on a message RoutingKeyExtractor func(message message.StreamMessage) string // contains filtered or unexported fields }
KeyRoutingStrategy is a routing strategy that uses the key of the message
func NewKeyRoutingStrategy ¶ added in v1.4.0
func NewKeyRoutingStrategy( routingKeyExtractor func(message message.StreamMessage) string) *KeyRoutingStrategy
func (*KeyRoutingStrategy) Route ¶ added in v1.4.0
func (k *KeyRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)
func (*KeyRoutingStrategy) SetRouteParameters ¶ added in v1.4.0
type MessagesHandler ¶
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
type OffsetSpecification ¶
type OffsetSpecification struct {
// contains filtered or unexported fields
}
func (OffsetSpecification) First ¶
func (o OffsetSpecification) First() OffsetSpecification
func (OffsetSpecification) Last ¶
func (o OffsetSpecification) Last() OffsetSpecification
func (OffsetSpecification) LastConsumed
deprecated
func (o OffsetSpecification) LastConsumed() OffsetSpecification
Deprecated: The method name may be misleading. The method does not indicate the last message consumed of the stream but the last stored offset. The method was added to help the user, but it created confusion. Use `QueryOffset` instead.:
offset, err := env.QueryOffset(consumerName, streamName) // check the error .... SetOffset(stream.OffsetSpecification{}.Offset(offset)).
So in this way it possible to start from the last offset stored and customize the behavior
func (OffsetSpecification) Next ¶
func (o OffsetSpecification) Next() OffsetSpecification
func (OffsetSpecification) Offset ¶
func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
func (OffsetSpecification) String ¶
func (o OffsetSpecification) String() string
func (OffsetSpecification) Timestamp ¶
func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
type PPartitionClose ¶ added in v1.4.0
type PPartitionClose struct { Partition string Event Event Context PPartitionContext }
PPartitionClose is a struct that is used to notify the user when a partition from a producer is closed The user can use the NotifyPartitionClose to get the channel
type PPartitionContext ¶ added in v1.4.0
PPartitionContext is an interface that is used to expose partition information and methods to the user. The user can use the PPartitionContext to reconnect a partition to the SuperStreamProducer
type PartitionPublishConfirm ¶ added in v1.4.0
type PartitionPublishConfirm struct { Partition string ConfirmationStatus []*ConfirmationStatus }
PartitionPublishConfirm is a struct that is used to notify the user when a message is confirmed or not per partition The user can use the NotifyPublishConfirmation to get the channel
type PartitionsOptions ¶ added in v1.4.0
type PartitionsOptions struct { Partitions int MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity LeaderLocator string // contains filtered or unexported fields }
func NewPartitionsOptions ¶ added in v1.4.0
func NewPartitionsOptions(partitions int) *PartitionsOptions
func (*PartitionsOptions) SetBalancedLeaderLocator ¶ added in v1.4.0
func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions
func (*PartitionsOptions) SetClientLocalLocator ¶ added in v1.4.0
func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions
func (*PartitionsOptions) SetMaxAge ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions
func (*PartitionsOptions) SetMaxLengthBytes ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions
func (*PartitionsOptions) SetMaxSegmentSizeBytes ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions
type PostFilter ¶ added in v1.3.3
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) BatchSend ¶
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent. The method is synchronous.The aggregation is up to the user. The user has to aggregate the messages and send them in a batch. BatchSend is not affected by the BatchSize and BatchPublishingDelay options. returns an error if the message could not be sent for marshal problems or if the buffer is too large
func (*Producer) Close ¶
Close closes the producer and returns an error if the producer could not be closed.
func (*Producer) GetLastPublishingId ¶
GetLastPublishingId returns the last publishing id sent by the producer given the producer name. this function is useful when you need to know the last message sent by the producer in case of deduplication.
func (*Producer) GetOptions ¶
func (producer *Producer) GetOptions() *ProducerOptions
func (*Producer) GetStreamName ¶
func (*Producer) NotifyClose ¶
func (producer *Producer) NotifyClose() ChannelClose
NotifyClose returns a channel that receives the close event of the producer.
func (*Producer) NotifyPublishConfirmation ¶
func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer.
func (*Producer) Send ¶
func (producer *Producer) Send(streamMessage message.StreamMessage) error
Send sends a message to the stream and returns an error if the message could not be sent. The Send is asynchronous. The message is sent to a channel ant then other goroutines aggregate and sent the messages The Send is dynamic so the number of messages sent decided internally based on the BatchSize and the messages in the buffer. The aggregation is up to the client. returns an error if the message could not be sent for marshal problems or if the buffer is too large
type ProducerFilter ¶ added in v1.3.3
type ProducerFilter struct {
FilterValue FilterValue
}
func NewProducerFilter ¶ added in v1.3.3
func NewProducerFilter(filterValue FilterValue) *ProducerFilter
type ProducerOptions ¶
type ProducerOptions struct { // Producer name. You need to set it to enable the deduplication feature. // Deduplication is a feature that allows the producer to avoid sending duplicate messages to the stream. // see: https://www.rabbitmq.com/blog/2021/07/28/rabbitmq-streams-message-deduplication for more information. // Don't use it if you don't need the deduplication. Name string QueueSize int // Internal queue to handle back-pressure. BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() // Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases // It is not used anymore given the dynamic batching BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() // Size of sub Entry, to aggregate more subEntry using one publishing id SubEntrySize int // Compression type, it is valid only if SubEntrySize > 1 // The messages can be compressed before sending them to the server Compression Compression // Time to wait for the confirmation, see the unConfirmed structure ConfirmationTimeOut time.Duration // Client provider name that will be shown in the management UI ClientProvidedName string // Enable the filter feature, by default is disabled. Pointer nil Filter *ProducerFilter // contains filtered or unexported fields }
func NewProducerOptions ¶
func NewProducerOptions() *ProducerOptions
func (*ProducerOptions) IsFilterEnabled ¶ added in v1.3.3
func (po *ProducerOptions) IsFilterEnabled() bool
IsFilterEnabled returns true if the filter is enabled
func (*ProducerOptions) SetBatchPublishingDelay
deprecated
func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases It is not used anymore given the dynamic batching
func (*ProducerOptions) SetBatchSize ¶
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
SetBatchSize sets the batch size for the producer The batch size is the number of messages that are aggregated before sending them to the server The SendBatch splits the messages in multiple frames if the messages are bigger than the BatchSize
func (*ProducerOptions) SetClientProvidedName ¶ added in v1.3.1
func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions
SetClientProvidedName sets the client provided name that will be shown in the management UI
func (*ProducerOptions) SetCompression ¶
func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
SetCompression sets the compression for the producer. See ProducerOptions.Compression for more details
func (*ProducerOptions) SetConfirmationTimeOut ¶ added in v1.3.1
func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions
SetConfirmationTimeOut sets the time to wait for the confirmation. See ProducerOptions.ConfirmationTimeOut for more details
func (*ProducerOptions) SetFilter ¶ added in v1.3.3
func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions
SetFilter sets the filter for the producer. See ProducerOptions.Filter for more details
func (*ProducerOptions) SetProducerName ¶
func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
SetProducerName sets the producer name. This name is used to enable the deduplication feature. See ProducerOptions.Name for more details. Don't use it if you don't need the deduplication.
func (*ProducerOptions) SetQueueSize ¶
func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
SetQueueSize See ProducerOptions.QueueSize for more details
func (*ProducerOptions) SetSubEntrySize ¶
func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
SetSubEntrySize See the ProducerOptions.SubEntrySize for more details
type PublishFilter ¶ added in v1.3.3
type PublishFilter struct { }
func (PublishFilter) GetCommandKey ¶ added in v1.3.3
func (p PublishFilter) GetCommandKey() uint16
func (PublishFilter) GetMaxVersion ¶ added in v1.3.3
func (p PublishFilter) GetMaxVersion() uint16
func (PublishFilter) GetMinVersion ¶ added in v1.3.3
func (p PublishFilter) GetMinVersion() uint16
type ReaderProtocol ¶
type RoutingStrategy ¶ added in v1.4.0
type RoutingStrategy interface { //Route Based on the message and the partitions the routing strategy returns the partitions where the message should be sent // It could be zero, one or more partitions Route(message message.StreamMessage, partitions []string) ([]string, error) // SetRouteParameters is useful for the routing key strategies to set the query route function // or in general to set the parameters needed by the routing strategy SetRouteParameters(superStream string, queryRoute func(superStream string, routingKey string) ([]string, error)) }
type SaslConfiguration ¶ added in v1.2.0
type SaslConfiguration struct {
Mechanism string
}
type SingleActiveConsumer ¶ added in v1.4.0
type SingleActiveConsumer struct { Enabled bool // ConsumerUpdate is the function that will be called when the consumer is promoted // that is when the consumer is active. The function will receive a boolean that is true // the user can decide to return a new offset to start from. ConsumerUpdate ConsumerUpdate // contains filtered or unexported fields }
func NewSingleActiveConsumer ¶ added in v1.4.0
func NewSingleActiveConsumer(ConsumerUpdate ConsumerUpdate) *SingleActiveConsumer
func (*SingleActiveConsumer) SetEnabled ¶ added in v1.4.0
func (s *SingleActiveConsumer) SetEnabled(enabled bool) *SingleActiveConsumer
type StreamMetadata ¶
type StreamMetadata struct { Leader *Broker Replicas []*Broker // contains filtered or unexported fields }
func (StreamMetadata) New ¶
func (StreamMetadata) New(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (StreamMetadata) String ¶
func (sm StreamMetadata) String() string
type StreamOptions ¶
type StreamOptions struct { MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity }
func NewStreamOptions ¶
func NewStreamOptions() *StreamOptions
func (*StreamOptions) SetMaxAge ¶
func (s *StreamOptions) SetMaxAge(maxAge time.Duration) *StreamOptions
func (*StreamOptions) SetMaxLengthBytes ¶
func (s *StreamOptions) SetMaxLengthBytes(maxLength *ByteCapacity) *StreamOptions
func (*StreamOptions) SetMaxSegmentSizeBytes ¶
func (s *StreamOptions) SetMaxSegmentSizeBytes(segmentSize *ByteCapacity) *StreamOptions
type StreamStats ¶ added in v1.1.0
type StreamStats struct {
// contains filtered or unexported fields
}
func (*StreamStats) CommittedChunkId ¶ added in v1.1.0
func (s *StreamStats) CommittedChunkId() (int64, error)
CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
It is the offset of the first message in the last chunk confirmed by a quorum of the stream cluster members (leader and replicas). The committed chunk ID is a good indication of what the last offset of a stream can be at a given time. The value can be stale as soon as the application reads it though, as the committed chunk ID for a stream that is published to changes all the time. return committed offset in this stream Error if there is no committed chunk yet
func (*StreamStats) FirstOffset ¶ added in v1.1.0
func (s *StreamStats) FirstOffset() (int64, error)
FirstOffset - The first offset in the stream. return first offset in the stream / Error if there is no first offset yet
func (*StreamStats) LastOffset
deprecated
added in
v1.1.0
func (s *StreamStats) LastOffset() (int64, error)
Deprecated: The method name may be misleading. It does not indicate the last offset of the stream. It indicates the last uncommited chunk id. This information is not necessary. The user should use CommittedChunkId().
type StreamsMetadata ¶
type StreamsMetadata struct {
// contains filtered or unexported fields
}
func (*StreamsMetadata) Add ¶
func (smd *StreamsMetadata) Add(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (*StreamsMetadata) Get ¶
func (smd *StreamsMetadata) Get(stream string) *StreamMetadata
func (StreamsMetadata) New ¶
func (StreamsMetadata) New() *StreamsMetadata
type SuperStreamConsumer ¶ added in v1.4.0
type SuperStreamConsumer struct { SuperStream string SuperStreamConsumerOptions *SuperStreamConsumerOptions MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*SuperStreamConsumer) Close ¶ added in v1.4.0
func (s *SuperStreamConsumer) Close() error
func (*SuperStreamConsumer) ConnectPartition ¶ added in v1.4.0
func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSpecification) error
func (*SuperStreamConsumer) NotifyPartitionClose ¶ added in v1.4.0
func (s *SuperStreamConsumer) NotifyPartitionClose(size int) chan CPartitionClose
NotifyPartitionClose returns a channel that will be notified when a partition is closed Event will give the reason of the close size is the size of the channel
type SuperStreamConsumerOptions ¶ added in v1.4.0
type SuperStreamConsumerOptions struct { ClientProvidedName string Offset OffsetSpecification Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer ConsumerName string AutoCommitStrategy *AutoCommitStrategy Autocommit bool }
func NewSuperStreamConsumerOptions ¶ added in v1.4.0
func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetAutoCommit ¶ added in v1.4.3
func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetClientProvidedName ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetConsumerName ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetFilter ¶ added in v1.4.2
func (s *SuperStreamConsumerOptions) SetFilter(filter *ConsumerFilter) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetManualCommit ¶ added in v1.4.6
func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetOffset ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetOffset(offset OffsetSpecification) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetSingleActiveConsumer ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions
type SuperStreamOptions ¶ added in v1.4.0
type SuperStreamOptions interface {
// contains filtered or unexported methods
}
type SuperStreamProducer ¶ added in v1.4.0
type SuperStreamProducer struct { // public SuperStream string SuperStreamProducerOptions *SuperStreamProducerOptions // contains filtered or unexported fields }
func (*SuperStreamProducer) Close ¶ added in v1.4.0
func (s *SuperStreamProducer) Close() error
func (*SuperStreamProducer) ConnectPartition ¶ added in v1.4.0
func (s *SuperStreamProducer) ConnectPartition(partition string) error
ConnectPartition connects a partition to the SuperStreamProducer part of PPartitionContext interface The super stream producer is a producer that can send messages to multiple partitions that are hidden to the user. with the ConnectPartition the user can re-connect a partition to the SuperStreamProducer that should be used only in case of disconnection
func (*SuperStreamProducer) GetPartitions ¶ added in v1.4.0
func (s *SuperStreamProducer) GetPartitions() []string
func (*SuperStreamProducer) NotifyPartitionClose ¶ added in v1.4.0
func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose
NotifyPartitionClose returns a channel that will be notified when a partition is closed Event will give the reason of the close size is the size of the channel
func (*SuperStreamProducer) NotifyPublishConfirmation ¶ added in v1.4.0
func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm
NotifyPublishConfirmation returns a channel that will be notified when a message is confirmed or not per partition size is the size of the channel
func (*SuperStreamProducer) Send ¶ added in v1.4.0
func (s *SuperStreamProducer) Send(message message.StreamMessage) error
Send sends a message to the partitions based on the routing strategy
type SuperStreamProducerOptions ¶ added in v1.4.0
type SuperStreamProducerOptions struct { RoutingStrategy RoutingStrategy ClientProvidedName string Filter *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil }
func NewSuperStreamProducerOptions ¶ added in v1.4.0
func NewSuperStreamProducerOptions(routingStrategy RoutingStrategy) *SuperStreamProducerOptions
NewSuperStreamProducerOptions creates a new SuperStreamProducerOptions The RoutingStrategy is mandatory
func (SuperStreamProducerOptions) SetClientProvidedName ¶ added in v1.4.0
func (o SuperStreamProducerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamProducerOptions
func (SuperStreamProducerOptions) SetFilter ¶ added in v1.4.2
func (o SuperStreamProducerOptions) SetFilter(filter *ProducerFilter) *SuperStreamProducerOptions
type TCPParameters ¶
Source Files
¶
- aggregation.go
- available_features.go
- blocking_queue.go
- brokers.go
- buffer_reader.go
- buffer_writer.go
- client.go
- constants.go
- consumer.go
- converters.go
- coordinator.go
- environment.go
- exchange_commands.go
- listeners.go
- producer.go
- producer_unconfirmed.go
- server_frame.go
- socket.go
- stream_options.go
- stream_stats.go
- super_stream.go
- super_stream_consumer.go
- super_stream_producer.go
- utils.go