stream

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2023 License: MIT Imports: 26 Imported by: 23

Documentation

Index

Constants

View Source
// Single-byte identifiers, one per compression option. The names line up with
// the methods on Compression (None, Gzip, Snappy, Lz4, Zstd).
// NOTE(review): presumably these are the codes written on the wire — confirm.
const (
	None   = byte(0) // no compression
	GZIP   = byte(1)
	SNAPPY = byte(2)
	LZ4    = byte(3)
	ZSTD   = byte(4)
)
View Source
const (
	// ClientVersion is the version string of this client.
	ClientVersion = "1.2.0"

	// Numeric command identifiers. The numbering has gaps, so only a subset
	// of the codes is exported here.
	// NOTE(review): presumably the command keys of the stream protocol — confirm.
	CommandDeletePublisher = 6

	CommandQueryOffset = 11
	CommandUnsubscribe = 12

	CommandMetadataUpdate = 16

	CommandClose = 22

	// LocalhostUriConnection is a ready-made connection URI: scheme
	// "rabbitmq-stream", user and password "guest", host "localhost",
	// port 5552 and path "%2f" (the URL-encoded form of "/").
	LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"

	// StreamTcpPort is the port used in LocalhostUriConnection, as a string.
	StreamTcpPort = "5552"
)
View Source
// SASL mechanism names.
// NOTE(review): presumably the accepted values for SaslConfiguration.Mechanism
// and for EnvironmentOptions.SetSaslConfiguration — confirm.
const (
	SaslConfigurationPlain    = "PLAIN"
	SaslConfigurationExternal = "EXTERNAL"
)
View Source
// Lower-case size-unit suffixes (kilo-, mega-, giga- and terabytes).
// NOTE(review): presumably the suffixes understood by ByteCapacity.From — confirm.
const (
	UnitMb = "mb"
	UnitKb = "kb"
	UnitGb = "gb"
	UnitTb = "tb"
)

Variables

View Source
// AlreadyClosed is a package-level error value with the message "Already Closed".
var AlreadyClosed = errors.New("Already Closed")
View Source
// AuthenticationFailure is a package-level error value with the message "Authentication Failure".
var AuthenticationFailure = errors.New("Authentication Failure")
View Source
// AuthenticationFailureLoopbackError is a package-level error value with the
// message "Authentication Failure Loopback Error".
var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
View Source
// CodeAccessRefused is a package-level error value with the message "Resources Access Refused".
var CodeAccessRefused = errors.New("Resources Access Refused")
View Source
// ConnectionClosed is a package-level error value with the message
// "Can't send the message, connection closed".
var ConnectionClosed = errors.New("Can't send the message, connection closed")
View Source
// FrameTooLarge is a package-level error value with the message
// "Frame Too Large, the buffer is too big".
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
View Source
// InternalError is a package-level error value with the message "Internal Error".
var InternalError = errors.New("Internal Error")
View Source
// LeaderNotReady is a package-level error value with the message "Leader not Ready yet".
var LeaderNotReady = errors.New("Leader not Ready yet")
View Source
// OffsetNotFoundError is a package-level error value with the message "Offset not found".
var OffsetNotFoundError = errors.New("Offset not found")
View Source
// PreconditionFailed is a package-level error value with the message "Precondition Failed".
var PreconditionFailed = errors.New("Precondition Failed")
View Source
// PublisherDoesNotExist is a package-level error value with the message "Publisher Does Not Exist".
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
View Source
// StreamAlreadyExists is a package-level error value with the message "Stream Already Exists".
var StreamAlreadyExists = errors.New("Stream Already Exists")
View Source
// StreamDoesNotExist is a package-level error value with the message "Stream Does Not Exist".
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
View Source
// StreamNotAvailable is a package-level error value with the message "Stream Not Available".
var StreamNotAvailable = errors.New("Stream Not Available")
View Source
// SubscriptionIdDoesNotExist is a package-level error value with the message
// "Subscription Id Does Not Exist".
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
View Source
// UnknownFrame is a package-level error value with the message "Unknown Frame".
var UnknownFrame = errors.New("Unknown Frame")
View Source
// VirtualHostAccessFailure is a package-level error value with the message
// "Virtual Host Access Failure".
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")

Functions

func SetLevelInfo

func SetLevelInfo(value int8)

Types

type AddressResolver

// AddressResolver pairs a host with a numeric port. EnvironmentOptions keeps a
// pointer to one in its AddressResolver field.
// NOTE(review): presumably the single entry point (e.g. a load balancer) that
// connections are routed through — confirm.
type AddressResolver struct {
	Host string // host name or address
	Port int    // port number; an int here, whereas Broker.Port is a string
}

type AutoCommitStrategy

type AutoCommitStrategy struct {
	// contains filtered or unexported fields
}

func NewAutoCommitStrategy

func NewAutoCommitStrategy() *AutoCommitStrategy

func (*AutoCommitStrategy) SetCountBeforeStorage

func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy

func (*AutoCommitStrategy) SetFlushInterval

func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy

type Broker

// Broker groups the address and credential fields for one broker.
// EnvironmentOptions.ConnectionParameters is a slice of *Broker, and
// StreamMetadata refers to brokers through its Leader and Replicas fields.
type Broker struct {
	Host     string
	Port     string // kept as a string, unlike AddressResolver.Port (int)
	User     string
	Vhost    string // virtual host
	Uri      string // NOTE(review): presumably the full connection URI; confirm how it relates to GetUri
	Password string
	Scheme   string // URI scheme; LocalhostUriConnection, for example, uses "rabbitmq-stream"
	// contains filtered or unexported fields
}

func (*Broker) GetUri

func (br *Broker) GetUri() string

type Brokers

type Brokers struct {
	// contains filtered or unexported fields
}

func (*Brokers) Add

func (brs *Brokers) Add(brokerReference int16, host string, port uint32) *Broker

func (*Brokers) Get

func (brs *Brokers) Get(brokerReference int16) *Broker

type ByteCapacity

type ByteCapacity struct {
	// contains filtered or unexported fields
}

func (ByteCapacity) B

func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity

func (ByteCapacity) From

func (byteCapacity ByteCapacity) From(value string) *ByteCapacity

func (ByteCapacity) GB

func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity

func (ByteCapacity) KB

func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity

func (ByteCapacity) MB

func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity

func (ByteCapacity) TB

func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity

type ChannelClose

// ChannelClose is an alias for a receive-only channel of Event values. It is
// the return type of Producer.NotifyClose and Consumer.NotifyClose.
type ChannelClose = <-chan Event

type ChannelPublishConfirm

// ChannelPublishConfirm is a channel whose elements are slices of
// *ConfirmationStatus, so each receive yields a group of statuses rather than
// a single one. It is the return type of Producer.NotifyPublishConfirmation.
type ChannelPublishConfirm chan []*ConfirmationStatus

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) BrokerForConsumer

func (c *Client) BrokerForConsumer(stream string) (*Broker, error)

func (*Client) BrokerLeader

func (c *Client) BrokerLeader(stream string) (*Broker, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) DeclarePublisher

func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)

func (*Client) DeclareStream

func (c *Client) DeclareStream(streamName string, options *StreamOptions) error

func (*Client) DeclareSubscriber

func (c *Client) DeclareSubscriber(streamName string,
	messagesHandler MessagesHandler,
	options *ConsumerOptions) (*Consumer, error)

func (*Client) DeleteStream

func (c *Client) DeleteStream(streamName string) error

func (*Client) ReusePublisher

func (c *Client) ReusePublisher(streamName string, existingProducer *Producer) (*Producer, error)

func (*Client) StreamExists

func (c *Client) StreamExists(stream string) bool

func (*Client) StreamStats added in v1.1.0

func (c *Client) StreamStats(streamName string) (*StreamStats, error)

type ClientProperties

type ClientProperties struct {
	// contains filtered or unexported fields
}

type Code

type Code struct {
	// contains filtered or unexported fields
}

type Compression

type Compression struct {
	// contains filtered or unexported fields
}

func (Compression) Gzip

func (compression Compression) Gzip() Compression

func (Compression) Lz4

func (compression Compression) Lz4() Compression

func (Compression) None

func (compression Compression) None() Compression

func (Compression) Snappy

func (compression Compression) Snappy() Compression

func (Compression) String

func (compression Compression) String() string

func (Compression) Zstd

func (compression Compression) Zstd() Compression

type ConfirmationStatus

type ConfirmationStatus struct {
	// contains filtered or unexported fields
}

func (*ConfirmationStatus) GetError

func (cs *ConfirmationStatus) GetError() error

func (*ConfirmationStatus) GetErrorCode

func (cs *ConfirmationStatus) GetErrorCode() uint16

func (*ConfirmationStatus) GetMessage

func (cs *ConfirmationStatus) GetMessage() message.StreamMessage

func (*ConfirmationStatus) GetProducerID

func (cs *ConfirmationStatus) GetProducerID() uint8

func (*ConfirmationStatus) GetPublishingId

func (cs *ConfirmationStatus) GetPublishingId() int64

func (*ConfirmationStatus) IsConfirmed

func (cs *ConfirmationStatus) IsConfirmed() bool

func (*ConfirmationStatus) LinkedMessages

func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus

type ConnectionProperties

type ConnectionProperties struct {
	// contains filtered or unexported fields
}

type Consumer

// Consumer is the type returned by Environment.NewConsumer,
// Client.DeclareSubscriber and Coordinator.NewConsumer.
type Consumer struct {
	// ID is a one-byte identifier.
	// NOTE(review): presumably the subscription id of this consumer — confirm.
	ID uint8

	// MessagesHandler is the callback associated with this consumer.
	MessagesHandler MessagesHandler
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (consumer *Consumer) Close() error

func (*Consumer) GetCloseHandler added in v1.1.2

func (consumer *Consumer) GetCloseHandler() chan Event

func (*Consumer) GetLastStoredOffset

func (consumer *Consumer) GetLastStoredOffset() int64

func (*Consumer) GetName

func (consumer *Consumer) GetName() string

func (*Consumer) GetOffset

func (consumer *Consumer) GetOffset() int64

func (*Consumer) GetStreamName

func (consumer *Consumer) GetStreamName() string

func (*Consumer) NotifyClose

func (consumer *Consumer) NotifyClose() ChannelClose

func (*Consumer) QueryOffset

func (consumer *Consumer) QueryOffset() (int64, error)

func (*Consumer) StoreCustomOffset

func (consumer *Consumer) StoreCustomOffset(offset int64) error

func (*Consumer) StoreOffset

func (consumer *Consumer) StoreOffset() error

type ConsumerContext

// ConsumerContext is the first argument passed to a MessagesHandler. It gives
// the handler access to a Consumer.
type ConsumerContext struct {
	Consumer *Consumer // NOTE(review): presumably the consumer that received the message — confirm
}

type ConsumerOptions

// ConsumerOptions collects the settings supplied when creating a consumer
// (see Environment.NewConsumer and Client.DeclareSubscriber). Each exported
// field has a matching chainable setter: SetConsumerName, SetOffset and
// SetCRCCheck.
type ConsumerOptions struct {
	ConsumerName string

	Offset   OffsetSpecification // NOTE(review): presumably where consumption starts — confirm
	CRCCheck bool                // NOTE(review): presumably enables CRC verification of received data — confirm
	// contains filtered or unexported fields
}

func NewConsumerOptions

func NewConsumerOptions() *ConsumerOptions

func (*ConsumerOptions) SetAutoCommit

func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions

func (*ConsumerOptions) SetCRCCheck

func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions

func (*ConsumerOptions) SetConsumerName

func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions

func (*ConsumerOptions) SetInitialCredits added in v1.0.2

func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions

func (*ConsumerOptions) SetManualCommit

func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions

func (*ConsumerOptions) SetOffset

func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator() *Coordinator

func (*Coordinator) ConsumersCount

func (coordinator *Coordinator) ConsumersCount() int

func (*Coordinator) GetConsumerById

func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)

func (*Coordinator) GetProducerById

func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)

func (*Coordinator) GetResponseById

func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)

func (*Coordinator) GetResponseByName

func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)

func (*Coordinator) NewConsumer

func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
	parameters *ConsumerOptions) *Consumer

Consumer functions

func (*Coordinator) NewProducer

func (coordinator *Coordinator) NewProducer(
	parameters *ProducerOptions) (*Producer, error)

producersEnvironment

func (*Coordinator) NewResponse

func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response

func (*Coordinator) NewResponseWitName

func (coordinator *Coordinator) NewResponseWitName(id string) *Response

func (*Coordinator) Producers

func (coordinator *Coordinator) Producers() map[interface{}]interface{}

func (*Coordinator) ProducersCount

func (coordinator *Coordinator) ProducersCount() int

func (*Coordinator) RemoveConsumerById

func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error

func (*Coordinator) RemoveProducerById

func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error

func (*Coordinator) RemoveResponseById

func (coordinator *Coordinator) RemoveResponseById(id interface{}) error

func (*Coordinator) RemoveResponseByName

func (coordinator *Coordinator) RemoveResponseByName(id string) error

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

func NewEnvironment

func NewEnvironment(options *EnvironmentOptions) (*Environment, error)

func (*Environment) Close

func (env *Environment) Close() error

func (*Environment) DeclareStream

func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error

func (*Environment) DeleteStream

func (env *Environment) DeleteStream(streamName string) error

func (*Environment) IsClosed

func (env *Environment) IsClosed() bool

func (*Environment) NewConsumer

func (env *Environment) NewConsumer(streamName string,
	messagesHandler MessagesHandler,
	options *ConsumerOptions) (*Consumer, error)

func (*Environment) NewProducer

func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)

func (*Environment) QueryOffset

func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)

func (*Environment) QuerySequence

func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)

QuerySequence gets the last ID stored for a producer. See also producer.GetLastPublishingId(), which is the easier way to get the last ID.

func (*Environment) StreamExists

func (env *Environment) StreamExists(streamName string) (bool, error)

func (*Environment) StreamMetaData

func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)

func (*Environment) StreamStats added in v1.1.0

func (env *Environment) StreamStats(streamName string) (*StreamStats, error)

type EnvironmentOptions

// EnvironmentOptions is the configuration passed to NewEnvironment. Its Set*
// methods return the receiver type, so calls can be chained.
type EnvironmentOptions struct {
	ConnectionParameters  []*Broker          // one entry per broker
	TCPParameters         *TCPParameters     // TCP-level settings
	SaslConfiguration     *SaslConfiguration // SASL mechanism selection
	MaxProducersPerClient int                // see SetMaxProducersPerClient
	MaxConsumersPerClient int                // see SetMaxConsumersPerClient
	AddressResolver       *AddressResolver   // see SetAddressResolver, which takes the value (not a pointer)
}

func NewEnvironmentOptions

func NewEnvironmentOptions() *EnvironmentOptions

func (*EnvironmentOptions) IsTLS

func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions

func (*EnvironmentOptions) SetAddressResolver

func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions

func (*EnvironmentOptions) SetHost

func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions

func (*EnvironmentOptions) SetMaxConsumersPerClient

func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions

func (*EnvironmentOptions) SetMaxProducersPerClient

func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions

func (*EnvironmentOptions) SetNoDelay

func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions

func (*EnvironmentOptions) SetPassword

func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions

func (*EnvironmentOptions) SetPort

func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions

func (*EnvironmentOptions) SetReadBuffer

func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions

func (*EnvironmentOptions) SetRequestedHeartbeat

func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions

func (*EnvironmentOptions) SetRequestedMaxFrameSize

func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions

func (*EnvironmentOptions) SetSaslConfiguration added in v1.2.0

func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions

func (*EnvironmentOptions) SetTLSConfig

func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions

func (*EnvironmentOptions) SetUri

func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions

func (*EnvironmentOptions) SetUris

func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions

func (*EnvironmentOptions) SetUser

func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions

func (*EnvironmentOptions) SetVHost

func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions

func (*EnvironmentOptions) SetWriteBuffer

func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions

type Event

// Event is the element type of ChannelClose (see Producer.NotifyClose and
// Consumer.NotifyClose). It is also the type of the "reason" argument of
// Coordinator.RemoveConsumerById and Coordinator.RemoveProducerById.
type Event struct {
	Command    uint16 // NOTE(review): presumably one of the Command* codes — confirm
	StreamName string
	Name       string // NOTE(review): presumably the producer or consumer name — confirm
	Reason     string
	Err        error
}

type HeartBeat

type HeartBeat struct {
	// contains filtered or unexported fields
}

type MessagesHandler

// MessagesHandler is the callback signature supplied when creating a consumer
// (Environment.NewConsumer, Client.DeclareSubscriber, Coordinator.NewConsumer).
// It receives a ConsumerContext by value and a pointer to an amqp.Message, and
// returns nothing.
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)

type OffsetSpecification

type OffsetSpecification struct {
	// contains filtered or unexported fields
}

func (OffsetSpecification) First

func (OffsetSpecification) Last

func (OffsetSpecification) LastConsumed

func (o OffsetSpecification) LastConsumed() OffsetSpecification

func (OffsetSpecification) Next

func (OffsetSpecification) Offset

func (OffsetSpecification) String

func (o OffsetSpecification) String() string

func (OffsetSpecification) Timestamp

func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) BatchSend

func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error

func (*Producer) Close

func (producer *Producer) Close() error

func (*Producer) FlushUnConfirmedMessages

func (producer *Producer) FlushUnConfirmedMessages()

func (*Producer) GetBroker

func (producer *Producer) GetBroker() *Broker

func (*Producer) GetID

func (producer *Producer) GetID() uint8

func (*Producer) GetLastPublishingId

func (producer *Producer) GetLastPublishingId() (int64, error)

func (*Producer) GetName

func (producer *Producer) GetName() string

func (*Producer) GetOptions

func (producer *Producer) GetOptions() *ProducerOptions

func (*Producer) GetStreamName

func (producer *Producer) GetStreamName() string

func (*Producer) GetUnConfirmed

func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus

func (*Producer) NotifyClose

func (producer *Producer) NotifyClose() ChannelClose

func (*Producer) NotifyPublishConfirmation

func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm

func (*Producer) Send

func (producer *Producer) Send(streamMessage message.StreamMessage) error

type ProducerOptions

// ProducerOptions collects the settings supplied when creating a producer
// (see Environment.NewProducer and Client.DeclarePublisher). Each exported
// field has a matching chainable setter on *ProducerOptions.
type ProducerOptions struct {
	Name                 string      // Producer name; useful for message deduplication.
	QueueSize            int         // Size of the internal queue used to handle back-pressure; a low value reduces the back-pressure on the server.
	BatchSize            int         // Batch aggregation size, taken before compression; a low value reduces latency, a high value increases throughput. NOTE(review): confirm whether this counts messages or bytes.
	BatchPublishingDelay int         // Period between sends of a batch of messages. NOTE(review): the time unit is not visible here — confirm.
	SubEntrySize         int         // Size of a sub-entry; lets several sub-entries be aggregated under one publishing id.
	Compression          Compression // Compression type; only valid when SubEntrySize > 1.
	// contains filtered or unexported fields
}

func NewProducerOptions

func NewProducerOptions() *ProducerOptions

func (*ProducerOptions) SetBatchPublishingDelay

func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions

func (*ProducerOptions) SetBatchSize

func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions

func (*ProducerOptions) SetCompression

func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions

func (*ProducerOptions) SetProducerName

func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions

func (*ProducerOptions) SetQueueSize

func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions

func (*ProducerOptions) SetSubEntrySize

func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions

type ReaderProtocol

// ReaderProtocol is a plain record of unsigned integer fields.
// NOTE(review): the field names suggest values decoded from an incoming
// protocol frame (length, command, version, correlation id, response code) —
// confirm against the code that fills it in.
type ReaderProtocol struct {
	FrameLen          uint32
	CommandID         uint16
	Key               uint16
	Version           uint16
	CorrelationId     uint32 // same width as the id accepted by Coordinator.GetResponseById
	ResponseCode      uint16 // same width as the responseCode accepted by StreamMetadata.New
	PublishID         uint8  // same width as the value returned by Producer.GetID
	PublishingIdCount uint64
}

type Response

type Response struct {
	// contains filtered or unexported fields
}

type SaslConfiguration added in v1.2.0

// SaslConfiguration holds the name of a SASL mechanism. EnvironmentOptions
// keeps a pointer to one in its SaslConfiguration field.
type SaslConfiguration struct {
	// Mechanism is the mechanism name.
	// NOTE(review): presumably SaslConfigurationPlain or SaslConfigurationExternal — confirm.
	Mechanism string
}

type StreamMetadata

// StreamMetadata records the brokers associated with one stream. It is
// returned by Environment.StreamMetaData and by StreamsMetadata.Add/Get.
type StreamMetadata struct {
	Leader   *Broker   // the leader broker
	Replicas []*Broker // the replica brokers
	// contains filtered or unexported fields
}

func (StreamMetadata) New

func (StreamMetadata) New(stream string, responseCode uint16,
	leader *Broker, replicas []*Broker) *StreamMetadata

func (StreamMetadata) String

func (sm StreamMetadata) String() string

type StreamOptions

// StreamOptions collects the settings passed to DeclareStream (on Environment
// or Client). Each field has a matching chainable setter: SetMaxAge,
// SetMaxLengthBytes and SetMaxSegmentSizeBytes.
// NOTE(review): presumably these are the stream's retention limits — confirm.
type StreamOptions struct {
	MaxAge              time.Duration // maximum age
	MaxLengthBytes      *ByteCapacity // maximum length, in bytes
	MaxSegmentSizeBytes *ByteCapacity // maximum segment size, in bytes
}

func NewStreamOptions

func NewStreamOptions() *StreamOptions

func (*StreamOptions) SetMaxAge

func (s *StreamOptions) SetMaxAge(maxAge time.Duration) *StreamOptions

func (*StreamOptions) SetMaxLengthBytes

func (s *StreamOptions) SetMaxLengthBytes(maxLength *ByteCapacity) *StreamOptions

func (*StreamOptions) SetMaxSegmentSizeBytes

func (s *StreamOptions) SetMaxSegmentSizeBytes(segmentSize *ByteCapacity) *StreamOptions

type StreamStats added in v1.1.0

type StreamStats struct {
	// contains filtered or unexported fields
}

func (*StreamStats) CommittedChunkId added in v1.1.0

func (s *StreamStats) CommittedChunkId() (int64, error)

CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.

It is the offset of the first message in the last chunk confirmed by a quorum of the stream
cluster members (leader and replicas).

The committed chunk ID is a good indication of what the last offset of a stream can be at a
given time. The value can be stale as soon as the application reads it though, as the committed
chunk ID for a stream that is published to changes all the time.

Returns the committed offset in this stream, or an error if there is no committed chunk yet.

func (*StreamStats) FirstOffset added in v1.1.0

func (s *StreamStats) FirstOffset() (int64, error)

FirstOffset - The first offset in the stream. Returns the first offset in the stream, or an error if there is no first offset yet.

func (*StreamStats) LastOffset added in v1.1.0

func (s *StreamStats) LastOffset() (int64, error)

LastOffset - The last offset in the stream. Returns the last offset in the stream, or an error if there is no last offset yet.

type StreamsMetadata

type StreamsMetadata struct {
	// contains filtered or unexported fields
}

func (*StreamsMetadata) Add

func (smd *StreamsMetadata) Add(stream string, responseCode uint16,
	leader *Broker, replicas []*Broker) *StreamMetadata

func (*StreamsMetadata) Get

func (smd *StreamsMetadata) Get(stream string) *StreamMetadata

func (StreamsMetadata) New

type TCPParameters

// TCPParameters groups the TCP-level settings held by
// EnvironmentOptions.TCPParameters. Each exported field has a same-named
// setter on EnvironmentOptions (SetRequestedHeartbeat,
// SetRequestedMaxFrameSize, SetWriteBuffer, SetReadBuffer, SetNoDelay).
type TCPParameters struct {
	RequestedHeartbeat    time.Duration
	RequestedMaxFrameSize int
	WriteBuffer           int  // NOTE(review): presumably a size in bytes — confirm
	ReadBuffer            int  // NOTE(review): presumably a size in bytes — confirm
	NoDelay               bool // NOTE(review): presumably the TCP_NODELAY socket option — confirm
	// contains filtered or unexported fields
}

type TuneState

type TuneState struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL