Documentation ¶
Index ¶
- Constants
- Variables
- func IsVersionGreaterOrEqual(version, target string) bool
- func SetLevelInfo(value int8)
- type AddressResolver
- type AutoCommitStrategy
- type BindingsOptions
- func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions
- func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions
- func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions
- func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions
- func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions
- type Broker
- type Brokers
- type ByteCapacity
- func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
- func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
- type CPartitionClose
- type CPartitionContext
- type ChannelClose
- type ChannelPublishConfirm
- type Client
- func (c *Client) BrokerForConsumer(stream string) (*Broker, error)
- func (c *Client) BrokerLeader(stream string) (*Broker, error)
- func (c *Client) Close() error
- func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
- func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
- func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error
- func (c *Client) DeleteStream(streamName string) error
- func (c *Client) DeleteSuperStream(superStream string) error
- func (c *Client) QueryPartitions(superStream string) ([]string, error)
- func (c *Client) StreamExists(stream string) bool
- func (c *Client) StreamStats(streamName string) (*StreamStats, error)
- type ClientProperties
- type Code
- type Compression
- type ConfirmationStatus
- func (cs *ConfirmationStatus) GetError() error
- func (cs *ConfirmationStatus) GetErrorCode() uint16
- func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
- func (cs *ConfirmationStatus) GetProducerID() uint8
- func (cs *ConfirmationStatus) GetPublishingId() int64
- func (cs *ConfirmationStatus) IsConfirmed() bool
- func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
- type ConnectionProperties
- type Consumer
- func (consumer *Consumer) Close() error
- func (consumer *Consumer) GetCloseHandler() chan Event
- func (consumer *Consumer) GetLastStoredOffset() int64
- func (consumer *Consumer) GetName() string
- func (consumer *Consumer) GetOffset() int64
- func (consumer *Consumer) GetStreamName() string
- func (consumer *Consumer) NotifyClose() ChannelClose
- func (consumer *Consumer) QueryOffset() (int64, error)
- func (consumer *Consumer) StoreCustomOffset(offset int64) error
- func (consumer *Consumer) StoreOffset() error
- type ConsumerContext
- type ConsumerFilter
- type ConsumerOptions
- func (c *ConsumerOptions) IsFilterEnabled() bool
- func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool
- func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
- func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
- func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions
- func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
- func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions
- func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
- func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
- func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
- func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions
- type ConsumerUpdate
- type Coordinator
- func (coordinator *Coordinator) ConsumersCount() int
- func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
- func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
- func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
- func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
- func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
- func (coordinator *Coordinator) NewProducer(parameters *ProducerOptions) (*Producer, error)
- func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
- func (coordinator *Coordinator) NewResponseWitName(id string) *Response
- func (coordinator *Coordinator) Producers() map[interface{}]interface{}
- func (coordinator *Coordinator) ProducersCount() int
- func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
- func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
- func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
- func (coordinator *Coordinator) RemoveResponseByName(id string) error
- type Environment
- func (env *Environment) Close() error
- func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
- func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error
- func (env *Environment) DeleteStream(streamName string) error
- func (env *Environment) DeleteSuperStream(superStreamName string) error
- func (env *Environment) IsClosed() bool
- func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
- func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, ...) (*SuperStreamConsumer, error)
- func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)
- func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
- func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)
- func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)
- func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
- func (env *Environment) StreamExists(streamName string) (bool, error)
- func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
- func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
- type EnvironmentOptions
- func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
- type Event
- type FilterValue
- type HashRoutingStrategy
- type HeartBeat
- type KeyRoutingStrategy
- type MessagesHandler
- type OffsetSpecification
- func (o OffsetSpecification) First() OffsetSpecification
- func (o OffsetSpecification) Last() OffsetSpecification
- func (o OffsetSpecification) LastConsumed() OffsetSpecification (deprecated)
- func (o OffsetSpecification) Next() OffsetSpecification
- func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
- func (o OffsetSpecification) String() string
- func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
- type PPartitionClose
- type PPartitionContext
- type PartitionPublishConfirm
- type PartitionsOptions
- func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions
- func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions
- func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions
- func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions
- func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions
- type PostFilter
- type Producer
- func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
- func (producer *Producer) Close() error
- func (producer *Producer) GetBroker() *Broker
- func (producer *Producer) GetID() uint8
- func (producer *Producer) GetLastPublishingId() (int64, error)
- func (producer *Producer) GetName() string
- func (producer *Producer) GetOptions() *ProducerOptions
- func (producer *Producer) GetStreamName() string
- func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus
- func (producer *Producer) NotifyClose() ChannelClose
- func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
- func (producer *Producer) Send(streamMessage message.StreamMessage) error
- type ProducerFilter
- type ProducerOptions
- func (po *ProducerOptions) IsFilterEnabled() bool
- func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
- func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions
- func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
- func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions
- func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions
- func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
- func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
- type PublishFilter
- type ReaderProtocol
- type Response
- type RoutingStrategy
- type SaslConfiguration
- type SingleActiveConsumer
- type StreamMetadata
- type StreamOptions
- type StreamStats
- type StreamsMetadata
- type SuperStreamConsumer
- type SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetFilter(filter *ConsumerFilter) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetOffset(offset OffsetSpecification) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions
- type SuperStreamOptions
- type SuperStreamProducer
- func (s *SuperStreamProducer) Close() error
- func (s *SuperStreamProducer) ConnectPartition(partition string) error
- func (s *SuperStreamProducer) GetPartitions() []string
- func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose
- func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm
- func (s *SuperStreamProducer) Send(message message.StreamMessage) error
- type SuperStreamProducerOptions
- type TCPParameters
- type TuneState
- type Version
Constants ¶
const ( ClientVersion = "1.4.11" CommandDeletePublisher = 6 CommandQueryOffset = 11 CommandUnsubscribe = 12 CommandMetadataUpdate = 16 CommandClose = 22 // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" SocketClosed = "socket client closed" MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" StreamTcpPort = "5552" )
const ( SaslConfigurationPlain = "PLAIN" SaslConfigurationExternal = "EXTERNAL" )
const ( UnitMb = "mb" UnitKb = "kb" UnitGb = "gb" UnitTb = "tb" )
const SEED = 104729
Variables ¶
var AlreadyClosed = errors.New("Already Closed")
var AuthenticationFailure = errors.New("Authentication Failure")
var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
var CodeAccessRefused = errors.New("Resources Access Refused")
var ConfirmationTimoutError = errors.New("Confirmation Timeout Error")
var ConnectionClosed = errors.New("Can't Send the message, connection closed")
var ErrEnvironmentNotDefined = errors.New("Environment not defined")
var ErrMessageRouteNotFound = errors.New("Message Route not found for the message key")
var ErrProducerNotFound = errors.New("Producer not found in the SuperStream Producer")
var ErrSuperStreamConsumerOptionsNotDefined = errors.New("SuperStreamConsumerOptions not defined.")
var ErrSuperStreamProducerOptionsNotDefined = errors.New("SuperStreamProducerOptions not defined. The SuperStreamProducerOptions is mandatory with the RoutingStrategy")
var FilterNotSupported = errors.New("Filtering is not supported by the broker " +
"(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
var InternalError = errors.New("Internal Error")
var LeaderNotReady = errors.New("Leader not Ready yet")
var OffsetNotFoundError = errors.New("Offset not found")
var PreconditionFailed = errors.New("Precondition Failed")
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
var SingleActiveConsumerNotSupported = errors.New("Single Active Consumer is not supported by the broker " +
"(requires RabbitMQ 3.11+ and stream_single_active_consumer feature flag activated)")
var StreamAlreadyExists = errors.New("Stream Already Exists")
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
var StreamNotAvailable = errors.New("Stream Not Available")
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
var UnknownFrame = errors.New("Unknown Frame")
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
Functions ¶
func IsVersionGreaterOrEqual ¶ added in v1.3.3
func SetLevelInfo ¶
func SetLevelInfo(value int8)
Types ¶
type AddressResolver ¶
type AutoCommitStrategy ¶
type AutoCommitStrategy struct {
// contains filtered or unexported fields
}
func NewAutoCommitStrategy ¶
func NewAutoCommitStrategy() *AutoCommitStrategy
func (*AutoCommitStrategy) SetCountBeforeStorage ¶
func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy
func (*AutoCommitStrategy) SetFlushInterval ¶
func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy
type BindingsOptions ¶ added in v1.4.0
type BindingsOptions struct { Bindings []string MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity LeaderLocator string // contains filtered or unexported fields }
func NewBindingsOptions ¶ added in v1.4.0
func NewBindingsOptions(bindings []string) *BindingsOptions
func (*BindingsOptions) SetBalancedLeaderLocator ¶ added in v1.4.0
func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions
func (*BindingsOptions) SetClientLocalLocator ¶ added in v1.4.0
func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions
func (*BindingsOptions) SetMaxAge ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions
func (*BindingsOptions) SetMaxLengthBytes ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions
func (*BindingsOptions) SetMaxSegmentSizeBytes ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions
type Broker ¶
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
type ByteCapacity ¶
type ByteCapacity struct {
// contains filtered or unexported fields
}
func (ByteCapacity) B ¶
func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
func (ByteCapacity) From ¶
func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
func (ByteCapacity) GB ¶
func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
func (ByteCapacity) KB ¶
func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
func (ByteCapacity) MB ¶
func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
func (ByteCapacity) TB ¶
func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
type CPartitionClose ¶ added in v1.4.0
type CPartitionClose struct { Partition string Event Event Context CPartitionContext }
CPartitionClose is a struct used to notify the user when a partition of a consumer is closed. The user can use NotifyPartitionClose to get the channel.
type CPartitionContext ¶ added in v1.4.0
type CPartitionContext interface {
ConnectPartition(partition string, offset OffsetSpecification) error
}
CPartitionContext is an interface that exposes partition information and methods to the user. The user can use the CPartitionContext to reconnect a partition to the SuperStreamConsumer, specifying the offset to start from.
type ChannelClose ¶
type ChannelClose = <-chan Event
type ChannelPublishConfirm ¶
type ChannelPublishConfirm chan []*ConfirmationStatus
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) BrokerForConsumer ¶
func (*Client) DeclarePublisher ¶
func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
func (*Client) DeclareStream ¶
func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
func (*Client) DeclareSubscriber ¶
func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Client) DeclareSuperStream ¶ added in v1.4.0
func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error
func (*Client) DeleteStream ¶
func (*Client) DeleteSuperStream ¶ added in v1.4.0
func (*Client) QueryPartitions ¶ added in v1.4.0
func (*Client) StreamExists ¶
func (*Client) StreamStats ¶ added in v1.1.0
func (c *Client) StreamStats(streamName string) (*StreamStats, error)
type ClientProperties ¶
type ClientProperties struct {
// contains filtered or unexported fields
}
type Compression ¶
type Compression struct {
// contains filtered or unexported fields
}
func (Compression) Gzip ¶
func (compression Compression) Gzip() Compression
func (Compression) Lz4 ¶
func (compression Compression) Lz4() Compression
func (Compression) None ¶
func (compression Compression) None() Compression
func (Compression) Snappy ¶
func (compression Compression) Snappy() Compression
func (Compression) String ¶
func (compression Compression) String() string
func (Compression) Zstd ¶
func (compression Compression) Zstd() Compression
type ConfirmationStatus ¶
type ConfirmationStatus struct {
// contains filtered or unexported fields
}
func (*ConfirmationStatus) GetError ¶
func (cs *ConfirmationStatus) GetError() error
func (*ConfirmationStatus) GetErrorCode ¶
func (cs *ConfirmationStatus) GetErrorCode() uint16
func (*ConfirmationStatus) GetMessage ¶
func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
func (*ConfirmationStatus) GetProducerID ¶
func (cs *ConfirmationStatus) GetProducerID() uint8
func (*ConfirmationStatus) GetPublishingId ¶
func (cs *ConfirmationStatus) GetPublishingId() int64
func (*ConfirmationStatus) IsConfirmed ¶
func (cs *ConfirmationStatus) IsConfirmed() bool
func (*ConfirmationStatus) LinkedMessages ¶
func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
type ConnectionProperties ¶
type ConnectionProperties struct {
// contains filtered or unexported fields
}
type Consumer ¶
type Consumer struct { ID uint8 MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*Consumer) GetCloseHandler ¶ added in v1.1.2
func (*Consumer) GetLastStoredOffset ¶
func (*Consumer) GetStreamName ¶
func (*Consumer) NotifyClose ¶
func (consumer *Consumer) NotifyClose() ChannelClose
func (*Consumer) QueryOffset ¶
func (*Consumer) StoreCustomOffset ¶
func (*Consumer) StoreOffset ¶
type ConsumerContext ¶
type ConsumerContext struct { Consumer *Consumer // contains filtered or unexported fields }
func (ConsumerContext) GetEntriesCount ¶ added in v1.3.2
func (cc ConsumerContext) GetEntriesCount() uint16
type ConsumerFilter ¶ added in v1.3.3
type ConsumerFilter struct { Values []string MatchUnfiltered bool PostFilter PostFilter }
func NewConsumerFilter ¶ added in v1.3.3
func NewConsumerFilter(values []string, matchUnfiltered bool, postFilter PostFilter) *ConsumerFilter
type ConsumerOptions ¶
type ConsumerOptions struct { ConsumerName string Offset OffsetSpecification CRCCheck bool ClientProvidedName string Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer // contains filtered or unexported fields }
func NewConsumerOptions ¶
func NewConsumerOptions() *ConsumerOptions
func (*ConsumerOptions) IsFilterEnabled ¶ added in v1.3.3
func (c *ConsumerOptions) IsFilterEnabled() bool
func (*ConsumerOptions) IsSingleActiveConsumerEnabled ¶ added in v1.4.0
func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool
func (*ConsumerOptions) SetAutoCommit ¶
func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
func (*ConsumerOptions) SetCRCCheck ¶
func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
func (*ConsumerOptions) SetClientProvidedName ¶ added in v1.3.1
func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions
func (*ConsumerOptions) SetConsumerName ¶
func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
func (*ConsumerOptions) SetFilter ¶ added in v1.3.3
func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions
func (*ConsumerOptions) SetInitialCredits ¶ added in v1.0.2
func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
func (*ConsumerOptions) SetManualCommit ¶
func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
func (*ConsumerOptions) SetOffset ¶
func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
func (*ConsumerOptions) SetSingleActiveConsumer ¶ added in v1.4.0
func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions
type ConsumerUpdate ¶ added in v1.4.0
type ConsumerUpdate func(streamName string, isActive bool) OffsetSpecification
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator() *Coordinator
func (*Coordinator) ConsumersCount ¶
func (coordinator *Coordinator) ConsumersCount() int
func (*Coordinator) GetConsumerById ¶
func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
func (*Coordinator) GetProducerById ¶
func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
func (*Coordinator) GetResponseById ¶
func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
func (*Coordinator) GetResponseByName ¶
func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
func (*Coordinator) NewConsumer ¶
func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
Consumer functions
func (*Coordinator) NewProducer ¶
func (coordinator *Coordinator) NewProducer( parameters *ProducerOptions) (*Producer, error)
producersEnvironment
func (*Coordinator) NewResponse ¶
func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
func (*Coordinator) NewResponseWitName ¶
func (coordinator *Coordinator) NewResponseWitName(id string) *Response
func (*Coordinator) Producers ¶
func (coordinator *Coordinator) Producers() map[interface{}]interface{}
func (*Coordinator) ProducersCount ¶
func (coordinator *Coordinator) ProducersCount() int
func (*Coordinator) RemoveConsumerById ¶
func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
func (*Coordinator) RemoveProducerById ¶
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
func (*Coordinator) RemoveResponseById ¶
func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
func (*Coordinator) RemoveResponseByName ¶
func (coordinator *Coordinator) RemoveResponseByName(id string) error
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
func NewEnvironment ¶
func NewEnvironment(options *EnvironmentOptions) (*Environment, error)
func (*Environment) Close ¶
func (env *Environment) Close() error
func (*Environment) DeclareStream ¶
func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
func (*Environment) DeclareSuperStream ¶ added in v1.4.0
func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error
func (*Environment) DeleteStream ¶
func (env *Environment) DeleteStream(streamName string) error
func (*Environment) DeleteSuperStream ¶ added in v1.4.0
func (env *Environment) DeleteSuperStream(superStreamName string) error
func (*Environment) IsClosed ¶
func (env *Environment) IsClosed() bool
func (*Environment) NewConsumer ¶
func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Environment) NewProducer ¶
func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
func (*Environment) NewSuperStreamConsumer ¶ added in v1.4.0
func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, options *SuperStreamConsumerOptions) (*SuperStreamConsumer, error)
func (*Environment) NewSuperStreamProducer ¶ added in v1.4.0
func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)
func (*Environment) QueryOffset ¶
func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
func (*Environment) QueryPartitions ¶ added in v1.4.0
func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)
func (*Environment) QueryRoute ¶ added in v1.4.0
func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)
func (*Environment) QuerySequence ¶
func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
QuerySequence gets the last id stored for a producer. See also producer.GetLastPublishingId(), which is the easier way to get the last id.
func (*Environment) StreamExists ¶
func (env *Environment) StreamExists(streamName string) (bool, error)
func (*Environment) StreamMetaData ¶
func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
func (*Environment) StreamStats ¶ added in v1.1.0
func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
type EnvironmentOptions ¶
type EnvironmentOptions struct { ConnectionParameters []*Broker TCPParameters *TCPParameters SaslConfiguration *SaslConfiguration MaxProducersPerClient int MaxConsumersPerClient int AddressResolver *AddressResolver RPCTimeout time.Duration }
func NewEnvironmentOptions ¶
func NewEnvironmentOptions() *EnvironmentOptions
func (*EnvironmentOptions) IsTLS ¶
func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
func (*EnvironmentOptions) SetAddressResolver ¶
func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
func (*EnvironmentOptions) SetHost ¶
func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxConsumersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxProducersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetNoDelay ¶
func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
func (*EnvironmentOptions) SetPassword ¶
func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
func (*EnvironmentOptions) SetPort ¶
func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
func (*EnvironmentOptions) SetRPCTimeout ¶ added in v1.4.5
func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions
func (*EnvironmentOptions) SetReadBuffer ¶
func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedHeartbeat ¶
func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedMaxFrameSize ¶
func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
func (*EnvironmentOptions) SetSaslConfiguration ¶ added in v1.2.0
func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
func (*EnvironmentOptions) SetTLSConfig ¶
func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
func (*EnvironmentOptions) SetUri ¶
func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
func (*EnvironmentOptions) SetUris ¶
func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
func (*EnvironmentOptions) SetUser ¶
func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
func (*EnvironmentOptions) SetVHost ¶
func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
func (*EnvironmentOptions) SetWriteBuffer ¶
func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
type FilterValue ¶ added in v1.3.3
type FilterValue func(message message.StreamMessage) string
type HashRoutingStrategy ¶ added in v1.4.0
type HashRoutingStrategy struct {
RoutingKeyExtractor func(message message.StreamMessage) string
}
func NewHashRoutingStrategy ¶ added in v1.4.0
func NewHashRoutingStrategy(routingKeyExtractor func(message message.StreamMessage) string) *HashRoutingStrategy
func (*HashRoutingStrategy) Route ¶ added in v1.4.0
func (h *HashRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)
func (*HashRoutingStrategy) SetRouteParameters ¶ added in v1.4.0
type KeyRoutingStrategy ¶ added in v1.4.0
type KeyRoutingStrategy struct { // provided by the user to define the key based on a message RoutingKeyExtractor func(message message.StreamMessage) string // contains filtered or unexported fields }
KeyRoutingStrategy is a routing strategy that uses the key of the message
func NewKeyRoutingStrategy ¶ added in v1.4.0
func NewKeyRoutingStrategy( routingKeyExtractor func(message message.StreamMessage) string) *KeyRoutingStrategy
func (*KeyRoutingStrategy) Route ¶ added in v1.4.0
func (k *KeyRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)
func (*KeyRoutingStrategy) SetRouteParameters ¶ added in v1.4.0
type MessagesHandler ¶
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
type OffsetSpecification ¶
type OffsetSpecification struct {
// contains filtered or unexported fields
}
func (OffsetSpecification) First ¶
func (o OffsetSpecification) First() OffsetSpecification
func (OffsetSpecification) Last ¶
func (o OffsetSpecification) Last() OffsetSpecification
func (OffsetSpecification) LastConsumed ¶ deprecated
func (o OffsetSpecification) LastConsumed() OffsetSpecification
Deprecated: The method name may be misleading. The method does not indicate the last message consumed from the stream but the last stored offset. The method was added to help the user, but it created confusion. Use `QueryOffset` instead:
offset, err := env.QueryOffset(consumerName, streamName) // check the error .... SetOffset(stream.OffsetSpecification{}.Offset(offset)).
In this way it is possible to start from the last stored offset and customize the behavior.
func (OffsetSpecification) Next ¶
func (o OffsetSpecification) Next() OffsetSpecification
func (OffsetSpecification) Offset ¶
func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
func (OffsetSpecification) String ¶
func (o OffsetSpecification) String() string
func (OffsetSpecification) Timestamp ¶
func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
type PPartitionClose ¶ added in v1.4.0
type PPartitionClose struct { Partition string Event Event Context PPartitionContext }
PPartitionClose is a struct that is used to notify the user when a partition from a producer is closed. The user can use NotifyPartitionClose to get the channel.
type PPartitionContext ¶ added in v1.4.0
PPartitionContext is an interface that is used to expose partition information and methods to the user. The user can use the PPartitionContext to reconnect a partition to the SuperStreamProducer
type PartitionPublishConfirm ¶ added in v1.4.0
type PartitionPublishConfirm struct { Partition string ConfirmationStatus []*ConfirmationStatus }
PartitionPublishConfirm is a struct that is used to notify the user, per partition, whether a message is confirmed or not. The user can use NotifyPublishConfirmation to get the channel.
type PartitionsOptions ¶ added in v1.4.0
type PartitionsOptions struct { Partitions int MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity LeaderLocator string // contains filtered or unexported fields }
func NewPartitionsOptions ¶ added in v1.4.0
func NewPartitionsOptions(partitions int) *PartitionsOptions
func (*PartitionsOptions) SetBalancedLeaderLocator ¶ added in v1.4.0
func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions
func (*PartitionsOptions) SetClientLocalLocator ¶ added in v1.4.0
func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions
func (*PartitionsOptions) SetMaxAge ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions
func (*PartitionsOptions) SetMaxLengthBytes ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions
func (*PartitionsOptions) SetMaxSegmentSizeBytes ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions
type PostFilter ¶ added in v1.3.3
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) BatchSend ¶
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent. BatchSend is synchronous. The aggregation is up to the user. The user has to aggregate the messages and send them in a batch. BatchSend is not affected by the BatchSize and BatchPublishingDelay options. BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and calls BatchSend internally.
func (*Producer) GetLastPublishingId ¶
func (*Producer) GetOptions ¶
func (producer *Producer) GetOptions() *ProducerOptions
func (*Producer) GetStreamName ¶
func (*Producer) GetUnConfirmed ¶
func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus
func (*Producer) NotifyClose ¶
func (producer *Producer) NotifyClose() ChannelClose
func (*Producer) NotifyPublishConfirmation ¶
func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
func (*Producer) Send ¶
func (producer *Producer) Send(streamMessage message.StreamMessage) error
Send sends a message to the stream and returns an error if the message could not be sent. Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached.
type ProducerFilter ¶ added in v1.3.3
type ProducerFilter struct {
FilterValue FilterValue
}
func NewProducerFilter ¶ added in v1.3.3
func NewProducerFilter(filterValue FilterValue) *ProducerFilter
type ProducerOptions ¶
type ProducerOptions struct { Name string // Producer name, valid for deduplication QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-unCompressedSize aggregation, low value reduces the latency, high value increases the throughput. Valid only for the method Send() BatchPublishingDelay int // Timeout after which the aggregated batch of messages is sent. Valid only for the method Send() SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id Compression Compression // Compression type, it is valid only if SubEntrySize > 1 ConfirmationTimeOut time.Duration // Time to wait for the confirmation ClientProvidedName string // Client provider name that will be shown in the management UI Filter *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil // contains filtered or unexported fields }
func NewProducerOptions ¶
func NewProducerOptions() *ProducerOptions
func (*ProducerOptions) IsFilterEnabled ¶ added in v1.3.3
func (po *ProducerOptions) IsFilterEnabled() bool
func (*ProducerOptions) SetBatchPublishingDelay ¶
func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
func (*ProducerOptions) SetBatchSize ¶
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
SetBatchSize sets the batch size for the producer. Valid only for the method Send().
func (*ProducerOptions) SetClientProvidedName ¶ added in v1.3.1
func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions
func (*ProducerOptions) SetCompression ¶
func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
func (*ProducerOptions) SetConfirmationTimeOut ¶ added in v1.3.1
func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions
func (*ProducerOptions) SetFilter ¶ added in v1.3.3
func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions
func (*ProducerOptions) SetProducerName ¶
func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
func (*ProducerOptions) SetQueueSize ¶
func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
func (*ProducerOptions) SetSubEntrySize ¶
func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
type PublishFilter ¶ added in v1.3.3
type PublishFilter struct { }
func (PublishFilter) GetCommandKey ¶ added in v1.3.3
func (p PublishFilter) GetCommandKey() uint16
func (PublishFilter) GetMaxVersion ¶ added in v1.3.3
func (p PublishFilter) GetMaxVersion() uint16
func (PublishFilter) GetMinVersion ¶ added in v1.3.3
func (p PublishFilter) GetMinVersion() uint16
type ReaderProtocol ¶
type RoutingStrategy ¶ added in v1.4.0
type RoutingStrategy interface { //Route Based on the message and the partitions the routing strategy returns the partitions where the message should be sent // It could be zero, one or more partitions Route(message message.StreamMessage, partitions []string) ([]string, error) // SetRouteParameters is useful for the routing key strategies to set the query route function // or in general to set the parameters needed by the routing strategy SetRouteParameters(superStream string, queryRoute func(superStream string, routingKey string) ([]string, error)) }
type SaslConfiguration ¶ added in v1.2.0
type SaslConfiguration struct {
Mechanism string
}
type SingleActiveConsumer ¶ added in v1.4.0
type SingleActiveConsumer struct { Enabled bool // ConsumerUpdate is the function that will be called when the consumer is promoted // that is when the consumer is active. The function will receive a boolean that is true // the user can decide to return a new offset to start from. ConsumerUpdate ConsumerUpdate // contains filtered or unexported fields }
func NewSingleActiveConsumer ¶ added in v1.4.0
func NewSingleActiveConsumer(ConsumerUpdate ConsumerUpdate) *SingleActiveConsumer
func (*SingleActiveConsumer) SetEnabled ¶ added in v1.4.0
func (s *SingleActiveConsumer) SetEnabled(enabled bool) *SingleActiveConsumer
type StreamMetadata ¶
type StreamMetadata struct { Leader *Broker Replicas []*Broker // contains filtered or unexported fields }
func (StreamMetadata) New ¶
func (StreamMetadata) New(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (StreamMetadata) String ¶
func (sm StreamMetadata) String() string
type StreamOptions ¶
type StreamOptions struct { MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity }
func NewStreamOptions ¶
func NewStreamOptions() *StreamOptions
func (*StreamOptions) SetMaxAge ¶
func (s *StreamOptions) SetMaxAge(maxAge time.Duration) *StreamOptions
func (*StreamOptions) SetMaxLengthBytes ¶
func (s *StreamOptions) SetMaxLengthBytes(maxLength *ByteCapacity) *StreamOptions
func (*StreamOptions) SetMaxSegmentSizeBytes ¶
func (s *StreamOptions) SetMaxSegmentSizeBytes(segmentSize *ByteCapacity) *StreamOptions
type StreamStats ¶ added in v1.1.0
type StreamStats struct {
// contains filtered or unexported fields
}
func (*StreamStats) CommittedChunkId ¶ added in v1.1.0
func (s *StreamStats) CommittedChunkId() (int64, error)
CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
It is the offset of the first message in the last chunk confirmed by a quorum of the stream cluster members (leader and replicas). The committed chunk ID is a good indication of what the last offset of a stream can be at a given time. The value can be stale as soon as the application reads it though, as the committed chunk ID for a stream that is published to changes all the time. Returns the committed offset in this stream, or an error if there is no committed chunk yet.
func (*StreamStats) FirstOffset ¶ added in v1.1.0
func (s *StreamStats) FirstOffset() (int64, error)
FirstOffset - The first offset in the stream. Returns the first offset in the stream, or an error if there is no first offset yet.
func (*StreamStats) LastOffset ¶ deprecated added in v1.1.0
func (s *StreamStats) LastOffset() (int64, error)
Deprecated: The method name may be misleading. It does not indicate the last offset of the stream. It indicates the last uncommitted chunk id. This information is not necessary. The user should use CommittedChunkId().
type StreamsMetadata ¶
type StreamsMetadata struct {
// contains filtered or unexported fields
}
func (*StreamsMetadata) Add ¶
func (smd *StreamsMetadata) Add(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (*StreamsMetadata) Get ¶
func (smd *StreamsMetadata) Get(stream string) *StreamMetadata
func (StreamsMetadata) New ¶
func (StreamsMetadata) New() *StreamsMetadata
type SuperStreamConsumer ¶ added in v1.4.0
type SuperStreamConsumer struct { SuperStream string SuperStreamConsumerOptions *SuperStreamConsumerOptions MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*SuperStreamConsumer) Close ¶ added in v1.4.0
func (s *SuperStreamConsumer) Close() error
func (*SuperStreamConsumer) ConnectPartition ¶ added in v1.4.0
func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSpecification) error
func (*SuperStreamConsumer) NotifyPartitionClose ¶ added in v1.4.0
func (s *SuperStreamConsumer) NotifyPartitionClose(size int) chan CPartitionClose
NotifyPartitionClose returns a channel that will be notified when a partition is closed. Event will give the reason for the close. size is the size of the channel.
type SuperStreamConsumerOptions ¶ added in v1.4.0
type SuperStreamConsumerOptions struct { ClientProvidedName string Offset OffsetSpecification Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer ConsumerName string AutoCommitStrategy *AutoCommitStrategy Autocommit bool }
func NewSuperStreamConsumerOptions ¶ added in v1.4.0
func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetAutoCommit ¶ added in v1.4.3
func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetClientProvidedName ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetConsumerName ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetFilter ¶ added in v1.4.2
func (s *SuperStreamConsumerOptions) SetFilter(filter *ConsumerFilter) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetManualCommit ¶ added in v1.4.6
func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetOffset ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetOffset(offset OffsetSpecification) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetSingleActiveConsumer ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions
type SuperStreamOptions ¶ added in v1.4.0
type SuperStreamOptions interface {
// contains filtered or unexported methods
}
type SuperStreamProducer ¶ added in v1.4.0
type SuperStreamProducer struct { // public SuperStream string SuperStreamProducerOptions *SuperStreamProducerOptions // contains filtered or unexported fields }
func (*SuperStreamProducer) Close ¶ added in v1.4.0
func (s *SuperStreamProducer) Close() error
func (*SuperStreamProducer) ConnectPartition ¶ added in v1.4.0
func (s *SuperStreamProducer) ConnectPartition(partition string) error
ConnectPartition connects a partition to the SuperStreamProducer; it is part of the PPartitionContext interface. The super stream producer is a producer that can send messages to multiple partitions that are hidden from the user. With ConnectPartition the user can reconnect a partition to the SuperStreamProducer; it should be used only in case of disconnection.
func (*SuperStreamProducer) GetPartitions ¶ added in v1.4.0
func (s *SuperStreamProducer) GetPartitions() []string
func (*SuperStreamProducer) NotifyPartitionClose ¶ added in v1.4.0
func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose
NotifyPartitionClose returns a channel that will be notified when a partition is closed. Event will give the reason for the close. size is the size of the channel.
func (*SuperStreamProducer) NotifyPublishConfirmation ¶ added in v1.4.0
func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm
NotifyPublishConfirmation returns a channel that will be notified, per partition, when a message is confirmed or not. size is the size of the channel.
func (*SuperStreamProducer) Send ¶ added in v1.4.0
func (s *SuperStreamProducer) Send(message message.StreamMessage) error
Send sends a message to the partitions based on the routing strategy
type SuperStreamProducerOptions ¶ added in v1.4.0
type SuperStreamProducerOptions struct { RoutingStrategy RoutingStrategy ClientProvidedName string Filter *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil }
func NewSuperStreamProducerOptions ¶ added in v1.4.0
func NewSuperStreamProducerOptions(routingStrategy RoutingStrategy) *SuperStreamProducerOptions
NewSuperStreamProducerOptions creates a new SuperStreamProducerOptions. The RoutingStrategy is mandatory.
func (SuperStreamProducerOptions) SetClientProvidedName ¶ added in v1.4.0
func (o SuperStreamProducerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamProducerOptions
func (SuperStreamProducerOptions) SetFilter ¶ added in v1.4.2
func (o SuperStreamProducerOptions) SetFilter(filter *ProducerFilter) *SuperStreamProducerOptions
type TCPParameters ¶
Source Files ¶
- aggregation.go
- available_features.go
- brokers.go
- buffer_reader.go
- buffer_writer.go
- client.go
- constants.go
- consumer.go
- converters.go
- coordinator.go
- environment.go
- exchange_commands.go
- listeners.go
- producer.go
- server_frame.go
- socket.go
- stream_options.go
- stream_stats.go
- super_stream.go
- super_stream_consumer.go
- super_stream_producer.go
- utils.go