Documentation ¶
Index ¶
- Constants
- Variables
- func SetLevelInfo(value int8)
- type AddressResolver
- type AutoCommitStrategy
- type Broker
- type Brokers
- type ByteCapacity
- func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
- func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
- type ChannelClose
- type ChannelPublishConfirm
- type Client
- func (c *Client) BrokerForConsumer(stream string) (*Broker, error)
- func (c *Client) BrokerLeader(stream string) (*Broker, error)
- func (c *Client) Close() error
- func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
- func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
- func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (c *Client) DeleteStream(streamName string) error
- func (c *Client) ReusePublisher(streamName string, existingProducer *Producer) (*Producer, error)
- func (c *Client) StreamExists(stream string) bool
- func (c *Client) StreamStats(streamName string) (*StreamStats, error)
- type ClientProperties
- type Code
- type Compression
- type ConfirmationStatus
- func (cs *ConfirmationStatus) GetError() error
- func (cs *ConfirmationStatus) GetErrorCode() uint16
- func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
- func (cs *ConfirmationStatus) GetProducerID() uint8
- func (cs *ConfirmationStatus) GetPublishingId() int64
- func (cs *ConfirmationStatus) IsConfirmed() bool
- func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
- type ConnectionProperties
- type Consumer
- func (consumer *Consumer) Close() error
- func (consumer *Consumer) GetCloseHandler() chan Event
- func (consumer *Consumer) GetLastStoredOffset() int64
- func (consumer *Consumer) GetName() string
- func (consumer *Consumer) GetOffset() int64
- func (consumer *Consumer) GetStreamName() string
- func (consumer *Consumer) NotifyClose() ChannelClose
- func (consumer *Consumer) QueryOffset() (int64, error)
- func (consumer *Consumer) StoreCustomOffset(offset int64) error
- func (consumer *Consumer) StoreOffset() error
- type ConsumerContext
- type ConsumerOptions
- func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
- func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
- func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
- func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
- func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
- func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
- type Coordinator
- func (coordinator *Coordinator) ConsumersCount() int
- func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
- func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
- func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
- func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
- func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
- func (coordinator *Coordinator) NewProducer(parameters *ProducerOptions) (*Producer, error)
- func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
- func (coordinator *Coordinator) NewResponseWitName(id string) *Response
- func (coordinator *Coordinator) Producers() map[interface{}]interface{}
- func (coordinator *Coordinator) ProducersCount() int
- func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
- func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
- func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
- func (coordinator *Coordinator) RemoveResponseByName(id string) error
- type Environment
- func (env *Environment) Close() error
- func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
- func (env *Environment) DeleteStream(streamName string) error
- func (env *Environment) IsClosed() bool
- func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
- func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
- func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
- func (env *Environment) StreamExists(streamName string) (bool, error)
- func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
- func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
- type EnvironmentOptions
- func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
- type Event
- type HeartBeat
- type MessagesHandler
- type OffsetSpecification
- func (o OffsetSpecification) First() OffsetSpecification
- func (o OffsetSpecification) Last() OffsetSpecification
- func (o OffsetSpecification) LastConsumed() OffsetSpecification
- func (o OffsetSpecification) Next() OffsetSpecification
- func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
- func (o OffsetSpecification) String() string
- func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
- type Producer
- func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
- func (producer *Producer) Close() error
- func (producer *Producer) FlushUnConfirmedMessages()
- func (producer *Producer) GetBroker() *Broker
- func (producer *Producer) GetID() uint8
- func (producer *Producer) GetLastPublishingId() (int64, error)
- func (producer *Producer) GetName() string
- func (producer *Producer) GetOptions() *ProducerOptions
- func (producer *Producer) GetStreamName() string
- func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus
- func (producer *Producer) NotifyClose() ChannelClose
- func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
- func (producer *Producer) Send(streamMessage message.StreamMessage) error
- type ProducerOptions
- func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
- func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
- func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
- func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
- type ReaderProtocol
- type Response
- type SaslConfiguration
- type StreamMetadata
- type StreamOptions
- type StreamStats
- type StreamsMetadata
- type TCPParameters
- type TuneState
Constants ¶
View Source
const ( ClientVersion = "1.2.0" CommandDeletePublisher = 6 CommandQueryOffset = 11 CommandUnsubscribe = 12 CommandMetadataUpdate = 16 CommandClose = 22 // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" StreamTcpPort = "5552" )
View Source
const ( SaslConfigurationPlain = "PLAIN" SaslConfigurationExternal = "EXTERNAL" )
View Source
const ( UnitMb = "mb" UnitKb = "kb" UnitGb = "gb" UnitTb = "tb" )
Variables ¶
View Source
var AlreadyClosed = errors.New("Already Closed")
View Source
var AuthenticationFailure = errors.New("Authentication Failure")
View Source
var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
View Source
var CodeAccessRefused = errors.New("Resources Access Refused")
View Source
var ConnectionClosed = errors.New("Can't send the message, connection closed")
View Source
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
View Source
var InternalError = errors.New("Internal Error")
View Source
var LeaderNotReady = errors.New("Leader not Ready yet")
View Source
var OffsetNotFoundError = errors.New("Offset not found")
View Source
var PreconditionFailed = errors.New("Precondition Failed")
View Source
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
View Source
var StreamAlreadyExists = errors.New("Stream Already Exists")
View Source
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
View Source
var StreamNotAvailable = errors.New("Stream Not Available")
View Source
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
View Source
var UnknownFrame = errors.New("Unknown Frame")
View Source
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
Functions ¶
func SetLevelInfo ¶
func SetLevelInfo(value int8)
Types ¶
type AddressResolver ¶
type AutoCommitStrategy ¶
type AutoCommitStrategy struct {
// contains filtered or unexported fields
}
func NewAutoCommitStrategy ¶
func NewAutoCommitStrategy() *AutoCommitStrategy
func (*AutoCommitStrategy) SetCountBeforeStorage ¶
func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy
func (*AutoCommitStrategy) SetFlushInterval ¶
func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy
type Broker ¶
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
type ByteCapacity ¶
type ByteCapacity struct {
// contains filtered or unexported fields
}
func (ByteCapacity) B ¶
func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
func (ByteCapacity) From ¶
func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
func (ByteCapacity) GB ¶
func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
func (ByteCapacity) KB ¶
func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
func (ByteCapacity) MB ¶
func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
func (ByteCapacity) TB ¶
func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
type ChannelClose ¶
type ChannelClose = <-chan Event
type ChannelPublishConfirm ¶
type ChannelPublishConfirm chan []*ConfirmationStatus
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) BrokerForConsumer ¶
func (*Client) DeclarePublisher ¶
func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
func (*Client) DeclareStream ¶
func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
func (*Client) DeclareSubscriber ¶
func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Client) DeleteStream ¶
func (*Client) ReusePublisher ¶
func (*Client) StreamExists ¶
func (*Client) StreamStats ¶ added in v1.1.0
func (c *Client) StreamStats(streamName string) (*StreamStats, error)
type ClientProperties ¶
type ClientProperties struct {
// contains filtered or unexported fields
}
type Compression ¶
type Compression struct {
// contains filtered or unexported fields
}
func (Compression) Gzip ¶
func (compression Compression) Gzip() Compression
func (Compression) Lz4 ¶
func (compression Compression) Lz4() Compression
func (Compression) None ¶
func (compression Compression) None() Compression
func (Compression) Snappy ¶
func (compression Compression) Snappy() Compression
func (Compression) String ¶
func (compression Compression) String() string
func (Compression) Zstd ¶
func (compression Compression) Zstd() Compression
type ConfirmationStatus ¶
type ConfirmationStatus struct {
// contains filtered or unexported fields
}
func (*ConfirmationStatus) GetError ¶
func (cs *ConfirmationStatus) GetError() error
func (*ConfirmationStatus) GetErrorCode ¶
func (cs *ConfirmationStatus) GetErrorCode() uint16
func (*ConfirmationStatus) GetMessage ¶
func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
func (*ConfirmationStatus) GetProducerID ¶
func (cs *ConfirmationStatus) GetProducerID() uint8
func (*ConfirmationStatus) GetPublishingId ¶
func (cs *ConfirmationStatus) GetPublishingId() int64
func (*ConfirmationStatus) IsConfirmed ¶
func (cs *ConfirmationStatus) IsConfirmed() bool
func (*ConfirmationStatus) LinkedMessages ¶
func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
type ConnectionProperties ¶
type ConnectionProperties struct {
// contains filtered or unexported fields
}
type Consumer ¶
type Consumer struct { ID uint8 MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*Consumer) GetCloseHandler ¶ added in v1.1.2
func (*Consumer) GetLastStoredOffset ¶
func (*Consumer) GetStreamName ¶
func (*Consumer) NotifyClose ¶
func (consumer *Consumer) NotifyClose() ChannelClose
func (*Consumer) QueryOffset ¶
func (*Consumer) StoreCustomOffset ¶
func (*Consumer) StoreOffset ¶
type ConsumerContext ¶
type ConsumerContext struct {
Consumer *Consumer
}
type ConsumerOptions ¶
type ConsumerOptions struct { ConsumerName string Offset OffsetSpecification CRCCheck bool // contains filtered or unexported fields }
func NewConsumerOptions ¶
func NewConsumerOptions() *ConsumerOptions
func (*ConsumerOptions) SetAutoCommit ¶
func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
func (*ConsumerOptions) SetCRCCheck ¶
func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
func (*ConsumerOptions) SetConsumerName ¶
func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
func (*ConsumerOptions) SetInitialCredits ¶ added in v1.0.2
func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
func (*ConsumerOptions) SetManualCommit ¶
func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
func (*ConsumerOptions) SetOffset ¶
func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator() *Coordinator
func (*Coordinator) ConsumersCount ¶
func (coordinator *Coordinator) ConsumersCount() int
func (*Coordinator) GetConsumerById ¶
func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
func (*Coordinator) GetProducerById ¶
func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
func (*Coordinator) GetResponseById ¶
func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
func (*Coordinator) GetResponseByName ¶
func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
func (*Coordinator) NewConsumer ¶
func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
Consumer functions
func (*Coordinator) NewProducer ¶
func (coordinator *Coordinator) NewProducer( parameters *ProducerOptions) (*Producer, error)
producersEnvironment
func (*Coordinator) NewResponse ¶
func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
func (*Coordinator) NewResponseWitName ¶
func (coordinator *Coordinator) NewResponseWitName(id string) *Response
func (*Coordinator) Producers ¶
func (coordinator *Coordinator) Producers() map[interface{}]interface{}
func (*Coordinator) ProducersCount ¶
func (coordinator *Coordinator) ProducersCount() int
func (*Coordinator) RemoveConsumerById ¶
func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
func (*Coordinator) RemoveProducerById ¶
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
func (*Coordinator) RemoveResponseById ¶
func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
func (*Coordinator) RemoveResponseByName ¶
func (coordinator *Coordinator) RemoveResponseByName(id string) error
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
func NewEnvironment ¶
func NewEnvironment(options *EnvironmentOptions) (*Environment, error)
func (*Environment) Close ¶
func (env *Environment) Close() error
func (*Environment) DeclareStream ¶
func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
func (*Environment) DeleteStream ¶
func (env *Environment) DeleteStream(streamName string) error
func (*Environment) IsClosed ¶
func (env *Environment) IsClosed() bool
func (*Environment) NewConsumer ¶
func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Environment) NewProducer ¶
func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
func (*Environment) QueryOffset ¶
func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
func (*Environment) QuerySequence ¶
func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
QuerySequence gets the last publishing ID stored for a producer. See also producer.GetLastPublishingId(), which is the easier way to get the last ID.
func (*Environment) StreamExists ¶
func (env *Environment) StreamExists(streamName string) (bool, error)
func (*Environment) StreamMetaData ¶
func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
func (*Environment) StreamStats ¶ added in v1.1.0
func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
type EnvironmentOptions ¶
type EnvironmentOptions struct { ConnectionParameters []*Broker TCPParameters *TCPParameters SaslConfiguration *SaslConfiguration MaxProducersPerClient int MaxConsumersPerClient int AddressResolver *AddressResolver }
func NewEnvironmentOptions ¶
func NewEnvironmentOptions() *EnvironmentOptions
func (*EnvironmentOptions) IsTLS ¶
func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
func (*EnvironmentOptions) SetAddressResolver ¶
func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
func (*EnvironmentOptions) SetHost ¶
func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxConsumersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxProducersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetNoDelay ¶
func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
func (*EnvironmentOptions) SetPassword ¶
func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
func (*EnvironmentOptions) SetPort ¶
func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
func (*EnvironmentOptions) SetReadBuffer ¶
func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedHeartbeat ¶
func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedMaxFrameSize ¶
func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
func (*EnvironmentOptions) SetSaslConfiguration ¶ added in v1.2.0
func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
func (*EnvironmentOptions) SetTLSConfig ¶
func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
func (*EnvironmentOptions) SetUri ¶
func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
func (*EnvironmentOptions) SetUris ¶
func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
func (*EnvironmentOptions) SetUser ¶
func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
func (*EnvironmentOptions) SetVHost ¶
func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
func (*EnvironmentOptions) SetWriteBuffer ¶
func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
type MessagesHandler ¶
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
type OffsetSpecification ¶
type OffsetSpecification struct {
// contains filtered or unexported fields
}
func (OffsetSpecification) First ¶
func (o OffsetSpecification) First() OffsetSpecification
func (OffsetSpecification) Last ¶
func (o OffsetSpecification) Last() OffsetSpecification
func (OffsetSpecification) LastConsumed ¶
func (o OffsetSpecification) LastConsumed() OffsetSpecification
func (OffsetSpecification) Next ¶
func (o OffsetSpecification) Next() OffsetSpecification
func (OffsetSpecification) Offset ¶
func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
func (OffsetSpecification) String ¶
func (o OffsetSpecification) String() string
func (OffsetSpecification) Timestamp ¶
func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) BatchSend ¶
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
func (*Producer) FlushUnConfirmedMessages ¶
func (producer *Producer) FlushUnConfirmedMessages()
func (*Producer) GetLastPublishingId ¶
func (*Producer) GetOptions ¶
func (producer *Producer) GetOptions() *ProducerOptions
func (*Producer) GetStreamName ¶
func (*Producer) GetUnConfirmed ¶
func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus
func (*Producer) NotifyClose ¶
func (producer *Producer) NotifyClose() ChannelClose
func (*Producer) NotifyPublishConfirmation ¶
func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
type ProducerOptions ¶
type ProducerOptions struct { Name string // Producer name, it is useful to handle deduplication messages QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput BatchPublishingDelay int // Period to send a batch of messages. SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id Compression Compression // Compression type, it is valid only if SubEntrySize > 1 // contains filtered or unexported fields }
func NewProducerOptions ¶
func NewProducerOptions() *ProducerOptions
func (*ProducerOptions) SetBatchPublishingDelay ¶
func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
func (*ProducerOptions) SetBatchSize ¶
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
func (*ProducerOptions) SetCompression ¶
func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
func (*ProducerOptions) SetProducerName ¶
func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
func (*ProducerOptions) SetQueueSize ¶
func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
func (*ProducerOptions) SetSubEntrySize ¶
func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
type ReaderProtocol ¶
type SaslConfiguration ¶ added in v1.2.0
type SaslConfiguration struct {
Mechanism string
}
type StreamMetadata ¶
type StreamMetadata struct { Leader *Broker Replicas []*Broker // contains filtered or unexported fields }
func (StreamMetadata) New ¶
func (StreamMetadata) New(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (StreamMetadata) String ¶
func (sm StreamMetadata) String() string
type StreamOptions ¶
type StreamOptions struct { MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity }
func NewStreamOptions ¶
func NewStreamOptions() *StreamOptions
func (*StreamOptions) SetMaxAge ¶
func (s *StreamOptions) SetMaxAge(maxAge time.Duration) *StreamOptions
func (*StreamOptions) SetMaxLengthBytes ¶
func (s *StreamOptions) SetMaxLengthBytes(maxLength *ByteCapacity) *StreamOptions
func (*StreamOptions) SetMaxSegmentSizeBytes ¶
func (s *StreamOptions) SetMaxSegmentSizeBytes(segmentSize *ByteCapacity) *StreamOptions
type StreamStats ¶ added in v1.1.0
type StreamStats struct {
// contains filtered or unexported fields
}
func (*StreamStats) CommittedChunkId ¶ added in v1.1.0
func (s *StreamStats) CommittedChunkId() (int64, error)
CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
It is the offset of the first message in the last chunk confirmed by a quorum of the stream cluster members (leader and replicas). The committed chunk ID is a good indication of what the last offset of a stream can be at a given time. The value can be stale as soon as the application reads it, though, because the committed chunk ID of a stream that is being published to changes all the time. Returns the committed offset in this stream, or an error if there is no committed chunk yet.
func (*StreamStats) FirstOffset ¶ added in v1.1.0
func (s *StreamStats) FirstOffset() (int64, error)
FirstOffset - The first offset in the stream. Returns the first offset in the stream, or an error if there is no first offset yet.
func (*StreamStats) LastOffset ¶ added in v1.1.0
func (s *StreamStats) LastOffset() (int64, error)
LastOffset - The last offset in the stream. Returns the last offset in the stream, or an error if there is no last offset yet.
type StreamsMetadata ¶
type StreamsMetadata struct {
// contains filtered or unexported fields
}
func (*StreamsMetadata) Add ¶
func (smd *StreamsMetadata) Add(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (*StreamsMetadata) Get ¶
func (smd *StreamsMetadata) Get(stream string) *StreamMetadata
func (StreamsMetadata) New ¶
func (StreamsMetadata) New() *StreamsMetadata
type TCPParameters ¶
Click to show internal directories.
Click to hide internal directories.