Documentation ¶
Index ¶
- Constants
- Variables
- func BatchIsEmpty(b *PublicBatch) bool
- func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest
- func MessageGetBufferBytesAccount(m *PublicMessage) int
- func MessageSetNilDataForTest(m *PublicMessage)
- func NextReaderID() int64
- type CallbackWithMessageContentFunc
- type CommitRange
- type CommitRanges
- func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)
- func (r *CommitRanges) AppendCommitRange(cr CommitRange)
- func (r *CommitRanges) AppendCommitRanges(ranges []CommitRange)
- func (r *CommitRanges) AppendMessages(messages ...PublicMessage)
- func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo
- func (r *CommitRanges) Len() int
- func (r *CommitRanges) Optimize()
- func (r *CommitRanges) Reset()
- func (r *CommitRanges) ToPartitionsOffsets() []rawtopicreader.PartitionCommitOffset
- type DecoderMap
- type PartitionSession
- func (s *PartitionSession) Close()
- func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset
- func (s *PartitionSession) Context() context.Context
- func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset
- func (s *PartitionSession) SetCommittedOffset(v rawtopiccommon.Offset)
- func (s *PartitionSession) SetContext(ctx context.Context)
- func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)
- func (s *PartitionSession) ToPublic() PublicPartitionSession
- type PartitionSessionStorage
- func (c *PartitionSessionStorage) Add(session *PartitionSession) error
- func (c *PartitionSessionStorage) Get(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
- func (c *PartitionSessionStorage) GetAll() []*PartitionSession
- func (c *PartitionSessionStorage) Remove(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
- type Pool
- type PublicBatch
- func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error)
- func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch)
- func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch
- func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error)
- func NewBatchFromStream(decoders DecoderMap, session *PartitionSession, sb rawtopicreader.Batch) (*PublicBatch, error)
- func ReadRawBatchesToPublicBatches(msg *rawtopicreader.ReadResponse, sessions *PartitionSessionStorage, ...) ([]*PublicBatch, error)
- type PublicCommitRange
- type PublicCommitRangeGetter
- type PublicCreateDecoderFunc
- type PublicMessage
- type PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Build() *PublicMessage
- func (pmb *PublicMessageBuilder) CommitRange(cr CommitRange) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Context(ctx context.Context) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Topic(topic string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder
- type PublicMessageContentUnmarshaler
- type PublicPartitionSession
- type PublicReadSelector
Constants ¶
const DefaultBufferSize = 1024 * 1024
Variables ¶
var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec"))
ErrPublicUnexpectedCodec return when try to read message content with unknown codec
Functions ¶
func BatchIsEmpty ¶
func BatchIsEmpty(b *PublicBatch) bool
func CreateInitMessage ¶
func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest
func MessageGetBufferBytesAccount ¶
func MessageGetBufferBytesAccount(m *PublicMessage) int
func MessageSetNilDataForTest ¶
func MessageSetNilDataForTest(m *PublicMessage)
func NextReaderID ¶
func NextReaderID() int64
Types ¶
type CallbackWithMessageContentFunc ¶
CallbackWithMessageContentFunc is callback function for work with message content data bytes MUST NOT be used after f returned if you need content longer - copy content to other slice
type CommitRange ¶
type CommitRange struct { CommitOffsetStart rawtopiccommon.Offset CommitOffsetEnd rawtopiccommon.Offset PartitionSession *PartitionSession }
func GetCommitRange ¶
func GetCommitRange(item PublicCommitRangeGetter) CommitRange
type CommitRanges ¶
type CommitRanges struct {
Ranges []CommitRange
}
func NewCommitRangesFromPublicCommits ¶
func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges
func NewCommitRangesWithCapacity ¶
func NewCommitRangesWithCapacity(capacity int) CommitRanges
func (*CommitRanges) Append ¶
func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)
func (*CommitRanges) AppendCommitRange ¶
func (r *CommitRanges) AppendCommitRange(cr CommitRange)
func (*CommitRanges) AppendCommitRanges ¶
func (r *CommitRanges) AppendCommitRanges(ranges []CommitRange)
func (*CommitRanges) AppendMessages ¶
func (r *CommitRanges) AppendMessages(messages ...PublicMessage)
func (*CommitRanges) GetCommitsInfo ¶
func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo
GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
func (*CommitRanges) Len ¶
func (r *CommitRanges) Len() int
func (*CommitRanges) Optimize ¶
func (r *CommitRanges) Optimize()
func (*CommitRanges) Reset ¶
func (r *CommitRanges) Reset()
func (*CommitRanges) ToPartitionsOffsets ¶
func (r *CommitRanges) ToPartitionsOffsets() []rawtopicreader.PartitionCommitOffset
type DecoderMap ¶
type DecoderMap struct {
// contains filtered or unexported fields
}
func NewDecoderMap ¶
func NewDecoderMap() DecoderMap
func (*DecoderMap) AddDecoder ¶
func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc)
func (*DecoderMap) Decode ¶
func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error)
type PartitionSession ¶
type PartitionSession struct { Topic string PartitionID int64 ReaderID int64 StreamPartitionSessionID rawtopicreader.PartitionSessionID ClientPartitionSessionID int64 // contains filtered or unexported fields }
func BatchGetPartitionSession ¶
func BatchGetPartitionSession(item *PublicBatch) *PartitionSession
func NewPartitionSession ¶
func NewPartitionSession( partitionContext context.Context, topic string, partitionID int64, readerID int64, connectionID string, partitionSessionID rawtopicreader.PartitionSessionID, clientPartitionSessionID int64, committedOffset rawtopiccommon.Offset, ) *PartitionSession
func (*PartitionSession) Close ¶
func (s *PartitionSession) Close()
func (*PartitionSession) CommittedOffset ¶
func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset
func (*PartitionSession) Context ¶
func (s *PartitionSession) Context() context.Context
func (*PartitionSession) LastReceivedMessageOffset ¶
func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset
func (*PartitionSession) SetCommittedOffset ¶
func (s *PartitionSession) SetCommittedOffset(v rawtopiccommon.Offset)
func (*PartitionSession) SetContext ¶
func (s *PartitionSession) SetContext(ctx context.Context)
func (*PartitionSession) SetLastReceivedMessageOffset ¶
func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)
func (*PartitionSession) ToPublic ¶
func (s *PartitionSession) ToPublic() PublicPartitionSession
type PartitionSessionStorage ¶
type PartitionSessionStorage struct {
// contains filtered or unexported fields
}
func (*PartitionSessionStorage) Add ¶
func (c *PartitionSessionStorage) Add(session *PartitionSession) error
func (*PartitionSessionStorage) Get ¶
func (c *PartitionSessionStorage) Get(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
func (*PartitionSessionStorage) GetAll ¶
func (c *PartitionSessionStorage) GetAll() []*PartitionSession
func (*PartitionSessionStorage) Remove ¶
func (c *PartitionSessionStorage) Remove(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
type Pool ¶
type Pool interface { Get() interface{} Put(x interface{}) }
Pool is interface for sync.Pool and may be extended by follow to original type
type PublicBatch ¶
type PublicBatch struct { empty.DoNotCopy Messages []*PublicMessage // contains filtered or unexported fields }
PublicBatch is ordered group of message from one partition
func BatchAppend ¶
func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error)
func BatchCutMessages ¶
func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch)
func BatchSetCommitRangeForTest ¶
func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch
func NewBatch ¶
func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error)
func NewBatchFromStream ¶
func NewBatchFromStream( decoders DecoderMap, session *PartitionSession, sb rawtopicreader.Batch, ) (*PublicBatch, error)
func ReadRawBatchesToPublicBatches ¶
func ReadRawBatchesToPublicBatches( msg *rawtopicreader.ReadResponse, sessions *PartitionSessionStorage, decoders DecoderMap, ) ([]*PublicBatch, error)
func (*PublicBatch) Context ¶
func (m *PublicBatch) Context() context.Context
Context is cancelled when code should stop to process messages batch for example - lost connection to server or receive stop partition signal without graceful flag
func (*PublicBatch) PartitionID ¶
func (m *PublicBatch) PartitionID() int64
PartitionID of messages in the batch
func (*PublicBatch) Topic ¶
func (m *PublicBatch) Topic() string
Topic is path of source topic of the messages in the batch
type PublicCommitRange ¶
type PublicCommitRange struct {
// contains filtered or unexported fields
}
PublicCommitRange contains data for commit messages range
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type PublicCommitRangeGetter ¶
type PublicCommitRangeGetter interface {
// contains filtered or unexported methods
}
PublicCommitRangeGetter return data piece for commit messages range
type PublicCreateDecoderFunc ¶
type PublicMessage ¶
type PublicMessage struct { empty.DoNotCopy SeqNo int64 CreatedAt time.Time MessageGroupID string WriteSessionMetadata map[string]string Offset int64 WrittenAt time.Time ProducerID string Metadata map[string][]byte // Metadata, nil if no metadata UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong. // contains filtered or unexported fields }
PublicMessage is representation of topic message
func MessageWithSetCommitRangeForTest ¶
func MessageWithSetCommitRangeForTest(m *PublicMessage, commitRange CommitRange) *PublicMessage
func (*PublicMessage) Context ¶
func (m *PublicMessage) Context() context.Context
func (*PublicMessage) PartitionID ¶
func (m *PublicMessage) PartitionID() int64
func (*PublicMessage) Read ¶
func (m *PublicMessage) Read(p []byte) (n int, err error)
Read implements io.Reader Read uncompressed message content return topicreader.UnexpectedCodec if message compressed with unknown codec
func (*PublicMessage) Topic ¶
func (m *PublicMessage) Topic() string
func (*PublicMessage) UnmarshalTo ¶
func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error
UnmarshalTo can call most once per message, it read all data from internal reader and call PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with uncompressed content
type PublicMessageBuilder ¶
type PublicMessageBuilder struct {
// contains filtered or unexported fields
}
func NewPublicMessageBuilder ¶
func NewPublicMessageBuilder() *PublicMessageBuilder
func (*PublicMessageBuilder) Build ¶
func (pmb *PublicMessageBuilder) Build() *PublicMessage
Build return builded message and reset internal state for create new message
func (*PublicMessageBuilder) CommitRange ¶
func (pmb *PublicMessageBuilder) CommitRange(cr CommitRange) *PublicMessageBuilder
func (*PublicMessageBuilder) Context ¶
func (pmb *PublicMessageBuilder) Context(ctx context.Context) *PublicMessageBuilder
Context set message Context
func (*PublicMessageBuilder) CreatedAt ¶
func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder
CreatedAt set message CreatedAt
func (*PublicMessageBuilder) DataAndUncompressedSize ¶
func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder
DataAndUncompressedSize set message uncompressed content and field UncompressedSize
func (*PublicMessageBuilder) MessageGroupID ¶
func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder
MessageGroupID set message MessageGroupID
func (*PublicMessageBuilder) Metadata ¶
func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder
func (*PublicMessageBuilder) Offset ¶
func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder
Offset set message Offset
func (*PublicMessageBuilder) PartitionID ¶
func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) *PublicMessageBuilder
PartitionID set message PartitionID
func (*PublicMessageBuilder) ProducerID ¶
func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder
ProducerID set message ProducerID
func (*PublicMessageBuilder) RawDataLen ¶
func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder
func (*PublicMessageBuilder) Seqno ¶
func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder
Seqno set message Seqno
func (*PublicMessageBuilder) Topic ¶
func (pmb *PublicMessageBuilder) Topic(topic string) *PublicMessageBuilder
Topic set message Topic
func (*PublicMessageBuilder) UncompressedSize ¶
func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder
UncompressedSize set message UncompressedSize
func (*PublicMessageBuilder) WriteSessionMetadata ¶
func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder
WriteSessionMetadata set message WriteSessionMetadata
func (*PublicMessageBuilder) WrittenAt ¶
func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder
WrittenAt set message WrittenAt
type PublicMessageContentUnmarshaler ¶
type PublicMessageContentUnmarshaler interface { // UnmarshalYDBTopicMessage MUST NOT use data after return. // If you need content after return from Consume - copy data content to // own slice with copy(dst, data) UnmarshalYDBTopicMessage(data []byte) error }
PublicMessageContentUnmarshaler is interface for unmarshal message content
type PublicPartitionSession ¶
type PublicPartitionSession struct { // PartitionSessionID is unique session ID per listener object PartitionSessionID int64 // TopicPath contains path for the topic TopicPath string // PartitionID contains partition id. It can be repeated for one reader if the partition will stop/start few times. PartitionID int64 }
PublicPartitionSession contains information about partition session for the event
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type PublicReadSelector ¶
type PublicReadSelector struct { Path string Partitions []int64 ReadFrom time.Time // zero value mean skip read from filter MaxTimeLag time.Duration // 0 mean skip time lag filter }
func (PublicReadSelector) Clone ¶
func (s PublicReadSelector) Clone() *PublicReadSelector
Clone create deep clone of the selector