stream

package
v1.4.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2024 License: MIT Imports: 27 Imported by: 23

Documentation

Index

Constants

View Source
// Compression codec identifiers, each a distinct single-byte value.
// NOTE(review): presumably these correspond to the selectors on the Compression
// type (None, Gzip, Snappy, Lz4, Zstd) — confirm against the implementation.
const (
	None   = byte(0)
	GZIP   = byte(1)
	SNAPPY = byte(2)
	LZ4    = byte(3)
	ZSTD   = byte(4)
)
View Source
const (
	// ClientVersion is the version string of this client release.
	ClientVersion = "1.4.8"

	// Numeric command identifiers.
	// NOTE(review): presumably the command keys of the stream protocol, as carried
	// in Event.Command — confirm against the protocol definition.
	CommandDeletePublisher = 6

	CommandQueryOffset = 11
	CommandUnsubscribe = 12

	CommandMetadataUpdate = 16

	CommandClose = 22

	// LocalhostUriConnection is a connection URI pointing at localhost:5552 with the
	// guest/guest credentials; the trailing "%2f" is a URL-encoded "/"
	// (presumably the virtual host — confirm).
	LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"

	// Reason strings and leader-locator names.
	// NOTE(review): SocketClosed and MetaDataUpdate are presumably reported in
	// Event.Reason, and the LeaderLocator* values presumably populate the
	// LeaderLocator option fields — confirm.
	SocketClosed             = "socket client closed"
	MetaDataUpdate           = "metadata Data update"
	LeaderLocatorBalanced    = "balanced"
	LeaderLocatorClientLocal = "client-local"

	// StreamTcpPort is the port number "5552" expressed as a string.
	StreamTcpPort = "5552"
)
View Source
// SASL mechanism names.
// NOTE(review): presumably the accepted values for SaslConfiguration.Mechanism
// and EnvironmentOptions.SetSaslConfiguration — confirm.
const (
	SaslConfigurationPlain    = "PLAIN"
	SaslConfigurationExternal = "EXTERNAL"
)
View Source
// Size-unit suffixes, as lowercase strings.
// NOTE(review): presumably the suffixes recognized by ByteCapacity.From when
// parsing a size string — confirm.
const (
	UnitMb = "mb"
	UnitKb = "kb"
	UnitGb = "gb"
	UnitTb = "tb"
)
View Source
// SEED is a fixed numeric seed value.
// NOTE(review): its consumer is not visible on this page; presumably the hash
// seed used when routing messages to partitions — confirm.
const SEED = 104729

Variables

View Source
var AlreadyClosed = errors.New("Already Closed")
View Source
var AuthenticationFailure = errors.New("Authentication Failure")
View Source
var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
View Source
var CodeAccessRefused = errors.New("Resources Access Refused")
View Source
var ConfirmationTimoutError = errors.New("Confirmation Timeout Error")
View Source
var ConnectionClosed = errors.New("Can't Send the message, connection closed")
View Source
var ErrEnvironmentNotDefined = errors.New("Environment not defined")
View Source
var ErrMessageRouteNotFound = errors.New("Message Route not found for the message key")
View Source
var ErrProducerNotFound = errors.New("Producer not found in the SuperStream Producer")
View Source
var ErrSuperStreamConsumerOptionsNotDefined = errors.New("SuperStreamConsumerOptions not defined.")
View Source
var ErrSuperStreamProducerOptionsNotDefined = errors.New("SuperStreamProducerOptions not defined. The SuperStreamProducerOptions is mandatory with the RoutingStrategy")
View Source
var FilterNotSupported = errors.New("Filtering is not supported by the broker " +
	"(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")
View Source
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
View Source
var InternalError = errors.New("Internal Error")
View Source
var LeaderNotReady = errors.New("Leader not Ready yet")
View Source
var OffsetNotFoundError = errors.New("Offset not found")
View Source
var PreconditionFailed = errors.New("Precondition Failed")
View Source
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
View Source
var SingleActiveConsumerNotSupported = errors.New("Single Active Consumer is not supported by the broker " +
	"(requires RabbitMQ 3.11+ and stream_single_active_consumer feature flag activated)")
View Source
var StreamAlreadyExists = errors.New("Stream Already Exists")
View Source
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
View Source
var StreamNotAvailable = errors.New("Stream Not Available")
View Source
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
View Source
var UnknownFrame = errors.New("Unknown Frame")
View Source
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")

Functions

func IsVersionGreaterOrEqual added in v1.3.3

func IsVersionGreaterOrEqual(version, target string) bool

func SetLevelInfo

func SetLevelInfo(value int8)

Types

type AddressResolver

type AddressResolver struct {
	Host string
	Port int
}

type AutoCommitStrategy

type AutoCommitStrategy struct {
	// contains filtered or unexported fields
}

func NewAutoCommitStrategy

func NewAutoCommitStrategy() *AutoCommitStrategy

func (*AutoCommitStrategy) SetCountBeforeStorage

func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy

func (*AutoCommitStrategy) SetFlushInterval

func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy

type BindingsOptions added in v1.4.0

type BindingsOptions struct {
	Bindings            []string
	MaxAge              time.Duration
	MaxLengthBytes      *ByteCapacity
	MaxSegmentSizeBytes *ByteCapacity
	LeaderLocator       string
	// contains filtered or unexported fields
}

func NewBindingsOptions added in v1.4.0

func NewBindingsOptions(bindings []string) *BindingsOptions

func (*BindingsOptions) SetBalancedLeaderLocator added in v1.4.0

func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions

func (*BindingsOptions) SetClientLocalLocator added in v1.4.0

func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions

func (*BindingsOptions) SetMaxAge added in v1.4.0

func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions

func (*BindingsOptions) SetMaxLengthBytes added in v1.4.0

func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions

func (*BindingsOptions) SetMaxSegmentSizeBytes added in v1.4.0

func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions

type Broker

type Broker struct {
	Host     string
	Port     string
	User     string
	Vhost    string
	Uri      string
	Password string
	Scheme   string
	// contains filtered or unexported fields
}

func (*Broker) GetUri

func (br *Broker) GetUri() string

type Brokers

type Brokers struct {
	// contains filtered or unexported fields
}

func (*Brokers) Add

func (brs *Brokers) Add(brokerReference int16, host string, port uint32) *Broker

func (*Brokers) Get

func (brs *Brokers) Get(brokerReference int16) *Broker

type ByteCapacity

type ByteCapacity struct {
	// contains filtered or unexported fields
}

func (ByteCapacity) B

func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity

func (ByteCapacity) From

func (byteCapacity ByteCapacity) From(value string) *ByteCapacity

func (ByteCapacity) GB

func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity

func (ByteCapacity) KB

func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity

func (ByteCapacity) MB

func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity

func (ByteCapacity) TB

func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity

type CPartitionClose added in v1.4.0

type CPartitionClose struct {
	Partition string
	Event     Event
	Context   CPartitionContext
}

CPartitionClose is a struct used to notify the user when a partition of a consumer is closed. The user can call NotifyPartitionClose to get the notification channel.

type CPartitionContext added in v1.4.0

type CPartitionContext interface {
	ConnectPartition(partition string, offset OffsetSpecification) error
}

CPartitionContext is an interface that exposes partition information and methods to the user. The user can use the CPartitionContext to reconnect a partition to the SuperStreamConsumer, specifying the offset to start from.

type ChannelClose

type ChannelClose = <-chan Event

type ChannelPublishConfirm

type ChannelPublishConfirm chan []*ConfirmationStatus

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) BrokerForConsumer

func (c *Client) BrokerForConsumer(stream string) (*Broker, error)

func (*Client) BrokerLeader

func (c *Client) BrokerLeader(stream string) (*Broker, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) DeclarePublisher

func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)

func (*Client) DeclareStream

func (c *Client) DeclareStream(streamName string, options *StreamOptions) error

func (*Client) DeclareSubscriber

func (c *Client) DeclareSubscriber(streamName string,
	messagesHandler MessagesHandler,
	options *ConsumerOptions) (*Consumer, error)

func (*Client) DeclareSuperStream added in v1.4.0

func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error

func (*Client) DeleteStream

func (c *Client) DeleteStream(streamName string) error

func (*Client) DeleteSuperStream added in v1.4.0

func (c *Client) DeleteSuperStream(superStream string) error

func (*Client) QueryPartitions added in v1.4.0

func (c *Client) QueryPartitions(superStream string) ([]string, error)

func (*Client) StreamExists

func (c *Client) StreamExists(stream string) bool

func (*Client) StreamStats added in v1.1.0

func (c *Client) StreamStats(streamName string) (*StreamStats, error)

type ClientProperties

type ClientProperties struct {
	// contains filtered or unexported fields
}

type Code

type Code struct {
	// contains filtered or unexported fields
}

type Compression

type Compression struct {
	// contains filtered or unexported fields
}

func (Compression) Gzip

func (compression Compression) Gzip() Compression

func (Compression) Lz4

func (compression Compression) Lz4() Compression

func (Compression) None

func (compression Compression) None() Compression

func (Compression) Snappy

func (compression Compression) Snappy() Compression

func (Compression) String

func (compression Compression) String() string

func (Compression) Zstd

func (compression Compression) Zstd() Compression

type ConfirmationStatus

type ConfirmationStatus struct {
	// contains filtered or unexported fields
}

func (*ConfirmationStatus) GetError

func (cs *ConfirmationStatus) GetError() error

func (*ConfirmationStatus) GetErrorCode

func (cs *ConfirmationStatus) GetErrorCode() uint16

func (*ConfirmationStatus) GetMessage

func (cs *ConfirmationStatus) GetMessage() message.StreamMessage

func (*ConfirmationStatus) GetProducerID

func (cs *ConfirmationStatus) GetProducerID() uint8

func (*ConfirmationStatus) GetPublishingId

func (cs *ConfirmationStatus) GetPublishingId() int64

func (*ConfirmationStatus) IsConfirmed

func (cs *ConfirmationStatus) IsConfirmed() bool

func (*ConfirmationStatus) LinkedMessages

func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus

type ConnectionProperties

type ConnectionProperties struct {
	// contains filtered or unexported fields
}

type Consumer

type Consumer struct {
	ID uint8

	MessagesHandler MessagesHandler
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (consumer *Consumer) Close() error

func (*Consumer) GetCloseHandler added in v1.1.2

func (consumer *Consumer) GetCloseHandler() chan Event

func (*Consumer) GetLastStoredOffset

func (consumer *Consumer) GetLastStoredOffset() int64

func (*Consumer) GetName

func (consumer *Consumer) GetName() string

func (*Consumer) GetOffset

func (consumer *Consumer) GetOffset() int64

func (*Consumer) GetStreamName

func (consumer *Consumer) GetStreamName() string

func (*Consumer) NotifyClose

func (consumer *Consumer) NotifyClose() ChannelClose

func (*Consumer) QueryOffset

func (consumer *Consumer) QueryOffset() (int64, error)

func (*Consumer) StoreCustomOffset

func (consumer *Consumer) StoreCustomOffset(offset int64) error

func (*Consumer) StoreOffset

func (consumer *Consumer) StoreOffset() error

type ConsumerContext

type ConsumerContext struct {
	Consumer *Consumer
	// contains filtered or unexported fields
}

func (ConsumerContext) GetEntriesCount added in v1.3.2

func (cc ConsumerContext) GetEntriesCount() uint16

type ConsumerFilter added in v1.3.3

type ConsumerFilter struct {
	Values          []string
	MatchUnfiltered bool
	PostFilter      PostFilter
}

func NewConsumerFilter added in v1.3.3

func NewConsumerFilter(values []string, matchUnfiltered bool, postFilter PostFilter) *ConsumerFilter

type ConsumerOptions

type ConsumerOptions struct {
	ConsumerName string

	Offset   OffsetSpecification
	CRCCheck bool

	ClientProvidedName   string
	Filter               *ConsumerFilter
	SingleActiveConsumer *SingleActiveConsumer
	// contains filtered or unexported fields
}

func NewConsumerOptions

func NewConsumerOptions() *ConsumerOptions

func (*ConsumerOptions) IsFilterEnabled added in v1.3.3

func (c *ConsumerOptions) IsFilterEnabled() bool

func (*ConsumerOptions) IsSingleActiveConsumerEnabled added in v1.4.0

func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool

func (*ConsumerOptions) SetAutoCommit

func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions

func (*ConsumerOptions) SetCRCCheck

func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions

func (*ConsumerOptions) SetClientProvidedName added in v1.3.1

func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions

func (*ConsumerOptions) SetConsumerName

func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions

func (*ConsumerOptions) SetFilter added in v1.3.3

func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions

func (*ConsumerOptions) SetInitialCredits added in v1.0.2

func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions

func (*ConsumerOptions) SetManualCommit

func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions

func (*ConsumerOptions) SetOffset

func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions

func (*ConsumerOptions) SetSingleActiveConsumer added in v1.4.0

func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions

type ConsumerUpdate added in v1.4.0

type ConsumerUpdate func(streamName string, isActive bool) OffsetSpecification

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator() *Coordinator

func (*Coordinator) ConsumersCount

func (coordinator *Coordinator) ConsumersCount() int

func (*Coordinator) GetConsumerById

func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)

func (*Coordinator) GetProducerById

func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)

func (*Coordinator) GetResponseById

func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)

func (*Coordinator) GetResponseByName

func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)

func (*Coordinator) NewConsumer

func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
	parameters *ConsumerOptions) *Consumer

Consumer functions

func (*Coordinator) NewProducer

func (coordinator *Coordinator) NewProducer(
	parameters *ProducerOptions) (*Producer, error)

producersEnvironment

func (*Coordinator) NewResponse

func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response

func (*Coordinator) NewResponseWitName

func (coordinator *Coordinator) NewResponseWitName(id string) *Response

func (*Coordinator) Producers

func (coordinator *Coordinator) Producers() map[interface{}]interface{}

func (*Coordinator) ProducersCount

func (coordinator *Coordinator) ProducersCount() int

func (*Coordinator) RemoveConsumerById

func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error

func (*Coordinator) RemoveProducerById

func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error

func (*Coordinator) RemoveResponseById

func (coordinator *Coordinator) RemoveResponseById(id interface{}) error

func (*Coordinator) RemoveResponseByName

func (coordinator *Coordinator) RemoveResponseByName(id string) error

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

func NewEnvironment

func NewEnvironment(options *EnvironmentOptions) (*Environment, error)

func (*Environment) Close

func (env *Environment) Close() error

func (*Environment) DeclareStream

func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error

func (*Environment) DeclareSuperStream added in v1.4.0

func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error

func (*Environment) DeleteStream

func (env *Environment) DeleteStream(streamName string) error

func (*Environment) DeleteSuperStream added in v1.4.0

func (env *Environment) DeleteSuperStream(superStreamName string) error

func (*Environment) IsClosed

func (env *Environment) IsClosed() bool

func (*Environment) NewConsumer

func (env *Environment) NewConsumer(streamName string,
	messagesHandler MessagesHandler,
	options *ConsumerOptions) (*Consumer, error)

func (*Environment) NewProducer

func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)

func (*Environment) NewSuperStreamConsumer added in v1.4.0

func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, options *SuperStreamConsumerOptions) (*SuperStreamConsumer, error)

func (*Environment) NewSuperStreamProducer added in v1.4.0

func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)

func (*Environment) QueryOffset

func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)

func (*Environment) QueryPartitions added in v1.4.0

func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)

func (*Environment) QueryRoute added in v1.4.0

func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)

func (*Environment) QuerySequence

func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)

QuerySequence gets the last id stored for a producer. See also producer.GetLastPublishingId(), which is the easier way to get the last id.

func (*Environment) StreamExists

func (env *Environment) StreamExists(streamName string) (bool, error)

func (*Environment) StreamMetaData

func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)

func (*Environment) StreamStats added in v1.1.0

func (env *Environment) StreamStats(streamName string) (*StreamStats, error)

type EnvironmentOptions

type EnvironmentOptions struct {
	ConnectionParameters  []*Broker
	TCPParameters         *TCPParameters
	SaslConfiguration     *SaslConfiguration
	MaxProducersPerClient int
	MaxConsumersPerClient int
	AddressResolver       *AddressResolver
	RPCTimeout            time.Duration
}

func NewEnvironmentOptions

func NewEnvironmentOptions() *EnvironmentOptions

func (*EnvironmentOptions) IsTLS

func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions

func (*EnvironmentOptions) SetAddressResolver

func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions

func (*EnvironmentOptions) SetHost

func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions

func (*EnvironmentOptions) SetMaxConsumersPerClient

func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions

func (*EnvironmentOptions) SetMaxProducersPerClient

func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions

func (*EnvironmentOptions) SetNoDelay

func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions

func (*EnvironmentOptions) SetPassword

func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions

func (*EnvironmentOptions) SetPort

func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions

func (*EnvironmentOptions) SetRPCTimeout added in v1.4.5

func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions

func (*EnvironmentOptions) SetReadBuffer

func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions

func (*EnvironmentOptions) SetRequestedHeartbeat

func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions

func (*EnvironmentOptions) SetRequestedMaxFrameSize

func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions

func (*EnvironmentOptions) SetSaslConfiguration added in v1.2.0

func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions

func (*EnvironmentOptions) SetTLSConfig

func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions

func (*EnvironmentOptions) SetUri

func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions

func (*EnvironmentOptions) SetUris

func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions

func (*EnvironmentOptions) SetUser

func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions

func (*EnvironmentOptions) SetVHost

func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions

func (*EnvironmentOptions) SetWriteBuffer

func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions

type Event

type Event struct {
	Command    uint16
	StreamName string
	Name       string
	Reason     string
	Err        error
}

type FilterValue added in v1.3.3

type FilterValue func(message message.StreamMessage) string

type HashRoutingStrategy added in v1.4.0

type HashRoutingStrategy struct {
	// RoutingKeyExtractor maps a message to a string key.
	// NOTE(review): presumably the key is hashed to select a partition — confirm.
	RoutingKeyExtractor func(message message.StreamMessage) string
}

func NewHashRoutingStrategy added in v1.4.0

func NewHashRoutingStrategy(routingKeyExtractor func(message message.StreamMessage) string) *HashRoutingStrategy

func (*HashRoutingStrategy) Route added in v1.4.0

func (h *HashRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)

func (*HashRoutingStrategy) SetRouteParameters added in v1.4.0

func (h *HashRoutingStrategy) SetRouteParameters(_ string, _ func(superStream string, routingKey string) ([]string, error))

type HeartBeat

type HeartBeat struct {
	// contains filtered or unexported fields
}

type KeyRoutingStrategy added in v1.4.0

type KeyRoutingStrategy struct {
	// RoutingKeyExtractor is provided by the user to derive the key from a message.
	RoutingKeyExtractor func(message message.StreamMessage) string
	// contains filtered or unexported fields
}

KeyRoutingStrategy is a routing strategy that uses the key of the message

func NewKeyRoutingStrategy added in v1.4.0

func NewKeyRoutingStrategy(
	routingKeyExtractor func(message message.StreamMessage) string) *KeyRoutingStrategy

func (*KeyRoutingStrategy) Route added in v1.4.0

func (k *KeyRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)

func (*KeyRoutingStrategy) SetRouteParameters added in v1.4.0

func (k *KeyRoutingStrategy) SetRouteParameters(superStream string, queryRoute func(superStream string, routingKey string) ([]string, error))

type MessagesHandler

type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)

type OffsetSpecification

type OffsetSpecification struct {
	// contains filtered or unexported fields
}

func (OffsetSpecification) First

func (OffsetSpecification) Last

func (OffsetSpecification) LastConsumed deprecated

func (o OffsetSpecification) LastConsumed() OffsetSpecification

Deprecated: The method name may be misleading. The method does not indicate the last message consumed from the stream but the last stored offset. The method was added to help the user, but it created confusion. Use `QueryOffset` instead:

	offset, err := env.QueryOffset(consumerName, streamName)
	// check the error
	....
	SetOffset(stream.OffsetSpecification{}.Offset(offset)).

This way it is possible to start from the last stored offset and customize the behavior.

func (OffsetSpecification) Next

func (OffsetSpecification) Offset

func (OffsetSpecification) String

func (o OffsetSpecification) String() string

func (OffsetSpecification) Timestamp

func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification

type PPartitionClose added in v1.4.0

type PPartitionClose struct {
	Partition string
	Event     Event
	Context   PPartitionContext
}

PPartitionClose is a struct used to notify the user when a partition of a producer is closed. The user can call NotifyPartitionClose to get the notification channel.

type PPartitionContext added in v1.4.0

type PPartitionContext interface {
	ConnectPartition(partition string) error
}

PPartitionContext is an interface that exposes partition information and methods to the user. The user can use the PPartitionContext to reconnect a partition to the SuperStreamProducer.

type PartitionPublishConfirm added in v1.4.0

type PartitionPublishConfirm struct {
	Partition          string
	ConfirmationStatus []*ConfirmationStatus
}

PartitionPublishConfirm is a struct used to notify the user, per partition, whether a message is confirmed or not. The user can call NotifyPublishConfirmation to get the notification channel.

type PartitionsOptions added in v1.4.0

type PartitionsOptions struct {
	Partitions          int
	MaxAge              time.Duration
	MaxLengthBytes      *ByteCapacity
	MaxSegmentSizeBytes *ByteCapacity
	LeaderLocator       string
	// contains filtered or unexported fields
}

func NewPartitionsOptions added in v1.4.0

func NewPartitionsOptions(partitions int) *PartitionsOptions

func (*PartitionsOptions) SetBalancedLeaderLocator added in v1.4.0

func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions

func (*PartitionsOptions) SetClientLocalLocator added in v1.4.0

func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions

func (*PartitionsOptions) SetMaxAge added in v1.4.0

func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions

func (*PartitionsOptions) SetMaxLengthBytes added in v1.4.0

func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions

func (*PartitionsOptions) SetMaxSegmentSizeBytes added in v1.4.0

func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions

type PostFilter added in v1.3.3

type PostFilter func(message *amqp.Message) bool

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) BatchSend

func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error

func (*Producer) Close

func (producer *Producer) Close() error

func (*Producer) GetBroker

func (producer *Producer) GetBroker() *Broker

func (*Producer) GetID

func (producer *Producer) GetID() uint8

func (*Producer) GetLastPublishingId

func (producer *Producer) GetLastPublishingId() (int64, error)

func (*Producer) GetName

func (producer *Producer) GetName() string

func (*Producer) GetOptions

func (producer *Producer) GetOptions() *ProducerOptions

func (*Producer) GetStreamName

func (producer *Producer) GetStreamName() string

func (*Producer) GetUnConfirmed

func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus

func (*Producer) NotifyClose

func (producer *Producer) NotifyClose() ChannelClose

func (*Producer) NotifyPublishConfirmation

func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm

func (*Producer) Send

func (producer *Producer) Send(streamMessage message.StreamMessage) error

type ProducerFilter added in v1.3.3

type ProducerFilter struct {
	FilterValue FilterValue
}

func NewProducerFilter added in v1.3.3

func NewProducerFilter(filterValue FilterValue) *ProducerFilter

type ProducerOptions

type ProducerOptions struct {
	Name                 string          // Producer name; useful for handling message deduplication.
	QueueSize            int             // Size of the internal queue used to handle back-pressure; a low value reduces the back-pressure on the server.
	BatchSize            int             // Batch aggregation size (uncompressed); a low value reduces latency, a high value increases throughput.
	BatchPublishingDelay int             // Period for sending a batch of messages (unit not stated here — TODO confirm).
	SubEntrySize         int             // Sub-entry size, used to aggregate several sub-entries under one publishing id.
	Compression          Compression     // Compression type; only valid if SubEntrySize > 1.
	ConfirmationTimeOut  time.Duration   // Time to wait for the confirmation.
	ClientProvidedName   string          // Client-provided name shown in the management UI.
	Filter               *ProducerFilter // Enables the filter feature; nil (the default) leaves it disabled.
	// contains filtered or unexported fields
}

func NewProducerOptions

func NewProducerOptions() *ProducerOptions

func (*ProducerOptions) IsFilterEnabled added in v1.3.3

func (po *ProducerOptions) IsFilterEnabled() bool

func (*ProducerOptions) SetBatchPublishingDelay

func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions

func (*ProducerOptions) SetBatchSize

func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions

func (*ProducerOptions) SetClientProvidedName added in v1.3.1

func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions

func (*ProducerOptions) SetCompression

func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions

func (*ProducerOptions) SetConfirmationTimeOut added in v1.3.1

func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions

func (*ProducerOptions) SetFilter added in v1.3.3

func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions

func (*ProducerOptions) SetProducerName

func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions

func (*ProducerOptions) SetQueueSize

func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions

func (*ProducerOptions) SetSubEntrySize

func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions

type PublishFilter added in v1.3.3

type PublishFilter struct {
}

func (PublishFilter) GetCommandKey added in v1.3.3

func (p PublishFilter) GetCommandKey() uint16

func (PublishFilter) GetMaxVersion added in v1.3.3

func (p PublishFilter) GetMaxVersion() uint16

func (PublishFilter) GetMinVersion added in v1.3.3

func (p PublishFilter) GetMinVersion() uint16

type ReaderProtocol

type ReaderProtocol struct {
	FrameLen          uint32
	CommandID         uint16
	Key               uint16
	Version           uint16
	CorrelationId     uint32
	ResponseCode      uint16
	PublishID         uint8
	PublishingIdCount uint64
}

type Response

type Response struct {
	// contains filtered or unexported fields
}

type RoutingStrategy added in v1.4.0

type RoutingStrategy interface {
	// Route returns, based on the message and the given partitions, the partitions
	// the message should be sent to. The result may hold zero, one or more partitions.
	Route(message message.StreamMessage, partitions []string) ([]string, error)

	// SetRouteParameters is useful for routing-key strategies to set the query route
	// function or, more generally, to set the parameters the routing strategy needs.
	SetRouteParameters(superStream string, queryRoute func(superStream string, routingKey string) ([]string, error))
}

type SaslConfiguration added in v1.2.0

type SaslConfiguration struct {
	Mechanism string
}

type SingleActiveConsumer added in v1.4.0

type SingleActiveConsumer struct {
	Enabled bool
	// ConsumerUpdate is the function called when the consumer is promoted, that is,
	// when the consumer becomes active. It receives the stream name and a boolean
	// telling whether the consumer is active, and returns an OffsetSpecification,
	// so the user can decide on a new offset to start from.
	ConsumerUpdate ConsumerUpdate
	// contains filtered or unexported fields
}

func NewSingleActiveConsumer added in v1.4.0

func NewSingleActiveConsumer(ConsumerUpdate ConsumerUpdate) *SingleActiveConsumer

func (*SingleActiveConsumer) SetEnabled added in v1.4.0

func (s *SingleActiveConsumer) SetEnabled(enabled bool) *SingleActiveConsumer

type StreamMetadata

type StreamMetadata struct {
	Leader   *Broker
	Replicas []*Broker
	// contains filtered or unexported fields
}

func (StreamMetadata) New

func (StreamMetadata) New(stream string, responseCode uint16,
	leader *Broker, replicas []*Broker) *StreamMetadata

func (StreamMetadata) String

func (sm StreamMetadata) String() string

type StreamOptions

type StreamOptions struct {
	MaxAge              time.Duration
	MaxLengthBytes      *ByteCapacity
	MaxSegmentSizeBytes *ByteCapacity
}

func NewStreamOptions

func NewStreamOptions() *StreamOptions

func (*StreamOptions) SetMaxAge

func (s *StreamOptions) SetMaxAge(maxAge time.Duration) *StreamOptions

func (*StreamOptions) SetMaxLengthBytes

func (s *StreamOptions) SetMaxLengthBytes(maxLength *ByteCapacity) *StreamOptions

func (*StreamOptions) SetMaxSegmentSizeBytes

func (s *StreamOptions) SetMaxSegmentSizeBytes(segmentSize *ByteCapacity) *StreamOptions

type StreamStats added in v1.1.0

type StreamStats struct {
	// contains filtered or unexported fields
}

func (*StreamStats) CommittedChunkId added in v1.1.0

func (s *StreamStats) CommittedChunkId() (int64, error)

CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.

It is the offset of the first message in the last chunk confirmed by a quorum of the stream
cluster members (leader and replicas).

The committed chunk ID is a good indication of what the last offset of a stream can be at a
given time. The value can become stale as soon as the application reads it, though, because the
committed chunk ID of a stream that is being published to changes all the time.

Returns the committed offset in this stream, or an error if there is no committed chunk yet.

func (*StreamStats) FirstOffset added in v1.1.0

func (s *StreamStats) FirstOffset() (int64, error)

FirstOffset returns the first offset in the stream, or an error if there is no first offset yet.

func (*StreamStats) LastOffset deprecated added in v1.1.0

func (s *StreamStats) LastOffset() (int64, error)

Deprecated: The method name may be misleading. It does not indicate the last offset of the stream; it indicates the last uncommitted chunk id. This information is not necessary. The user should use CommittedChunkId() instead.

type StreamsMetadata

type StreamsMetadata struct {
	// contains filtered or unexported fields
}

func (*StreamsMetadata) Add

func (smd *StreamsMetadata) Add(stream string, responseCode uint16,
	leader *Broker, replicas []*Broker) *StreamMetadata

func (*StreamsMetadata) Get

func (smd *StreamsMetadata) Get(stream string) *StreamMetadata

func (StreamsMetadata) New

type SuperStreamConsumer added in v1.4.0

type SuperStreamConsumer struct {
	SuperStream                string
	SuperStreamConsumerOptions *SuperStreamConsumerOptions
	MessagesHandler            MessagesHandler
	// contains filtered or unexported fields
}

func (*SuperStreamConsumer) Close added in v1.4.0

func (s *SuperStreamConsumer) Close() error

func (*SuperStreamConsumer) ConnectPartition added in v1.4.0

func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSpecification) error

func (*SuperStreamConsumer) NotifyPartitionClose added in v1.4.0

func (s *SuperStreamConsumer) NotifyPartitionClose(size int) chan CPartitionClose

NotifyPartitionClose returns a channel that will be notified when a partition is closed. The event gives the reason for the close. size is the size of the channel.

type SuperStreamConsumerOptions added in v1.4.0

type SuperStreamConsumerOptions struct {
	ClientProvidedName   string
	Offset               OffsetSpecification
	Filter               *ConsumerFilter
	SingleActiveConsumer *SingleActiveConsumer
	ConsumerName         string
	AutoCommitStrategy   *AutoCommitStrategy
	Autocommit           bool
}

func NewSuperStreamConsumerOptions added in v1.4.0

func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions

func (*SuperStreamConsumerOptions) SetAutoCommit added in v1.4.3

func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions

func (*SuperStreamConsumerOptions) SetClientProvidedName added in v1.4.0

func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions

func (*SuperStreamConsumerOptions) SetConsumerName added in v1.4.0

func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions

func (*SuperStreamConsumerOptions) SetFilter added in v1.4.2

func (*SuperStreamConsumerOptions) SetManualCommit added in v1.4.6

func (*SuperStreamConsumerOptions) SetOffset added in v1.4.0

func (*SuperStreamConsumerOptions) SetSingleActiveConsumer added in v1.4.0

func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions

type SuperStreamOptions added in v1.4.0

type SuperStreamOptions interface {
	// contains filtered or unexported methods
}

type SuperStreamProducer added in v1.4.0

type SuperStreamProducer struct {

	// public
	SuperStream                string
	SuperStreamProducerOptions *SuperStreamProducerOptions
	// contains filtered or unexported fields
}

func (*SuperStreamProducer) Close added in v1.4.0

func (s *SuperStreamProducer) Close() error

func (*SuperStreamProducer) ConnectPartition added in v1.4.0

func (s *SuperStreamProducer) ConnectPartition(partition string) error

ConnectPartition connects a partition to the SuperStreamProducer; it is part of the PPartitionContext interface. The super stream producer is a producer that can send messages to multiple partitions that are hidden from the user. With ConnectPartition the user can reconnect a partition to the SuperStreamProducer; it should be used only in case of disconnection.

func (*SuperStreamProducer) GetPartitions added in v1.4.0

func (s *SuperStreamProducer) GetPartitions() []string

func (*SuperStreamProducer) NotifyPartitionClose added in v1.4.0

func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose

NotifyPartitionClose returns a channel that will be notified when a partition is closed. The event gives the reason for the close. size is the size of the channel.

func (*SuperStreamProducer) NotifyPublishConfirmation added in v1.4.0

func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm

NotifyPublishConfirmation returns a channel that will be notified, per partition, when a message is confirmed or not. size is the size of the channel.

func (*SuperStreamProducer) Send added in v1.4.0

Send sends a message to the partitions based on the routing strategy

type SuperStreamProducerOptions added in v1.4.0

type SuperStreamProducerOptions struct {
	RoutingStrategy    RoutingStrategy
	ClientProvidedName string
	Filter             *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil
}

func NewSuperStreamProducerOptions added in v1.4.0

func NewSuperStreamProducerOptions(routingStrategy RoutingStrategy) *SuperStreamProducerOptions

NewSuperStreamProducerOptions creates a new SuperStreamProducerOptions. The RoutingStrategy is mandatory.

func (SuperStreamProducerOptions) SetClientProvidedName added in v1.4.0

func (o SuperStreamProducerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamProducerOptions

func (SuperStreamProducerOptions) SetFilter added in v1.4.2

type TCPParameters

type TCPParameters struct {
	RequestedHeartbeat    time.Duration
	RequestedMaxFrameSize int
	WriteBuffer           int
	ReadBuffer            int
	NoDelay               bool
	// contains filtered or unexported fields
}

type TuneState

type TuneState struct {
	// contains filtered or unexported fields
}

type Version added in v1.3.3

type Version struct {
	Major int
	Minor int
	Patch int
}

func (Version) Compare added in v1.3.3

func (v Version) Compare(other Version) int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL