Documentation ¶
Index ¶
- Constants
- Variables
- func BatchIsEmpty(b *PublicBatch) bool
- func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest
- func MessageGetBufferBytesAccount(m *PublicMessage) int
- func MessageSetNilDataForTest(m *PublicMessage)
- func NextReaderID() int64
- type CallbackWithMessageContentFunc
- type CommitRange
- type CommitRanges
- func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)
- func (r *CommitRanges) AppendCommitRange(cr CommitRange)
- func (r *CommitRanges) AppendCommitRanges(ranges []CommitRange)
- func (r *CommitRanges) AppendMessages(messages ...PublicMessage)
- func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo
- func (r *CommitRanges) Len() int
- func (r *CommitRanges) Optimize()
- func (r *CommitRanges) Reset()
- func (r *CommitRanges) ToPartitionsOffsets() []rawtopicreader.PartitionCommitOffset
- func (r *CommitRanges) ToRawMessage() *rawtopicreader.CommitOffsetRequest
- type Committer
- type DecoderMap
- type PartitionSession
- func (s *PartitionSession) Close()
- func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset
- func (s *PartitionSession) Context() context.Context
- func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset
- func (s *PartitionSession) SetCommittedOffsetForward(v rawtopiccommon.Offset)
- func (s *PartitionSession) SetContext(ctx context.Context)
- func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)
- func (s *PartitionSession) ToPublic() PublicPartitionSession
- type PartitionSessionStorage
- func (c *PartitionSessionStorage) Add(session *PartitionSession) error
- func (c *PartitionSessionStorage) Get(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
- func (c *PartitionSessionStorage) GetAll() []*PartitionSession
- func (c *PartitionSessionStorage) Remove(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
- type Pool
- type PublicBatch
- func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error)
- func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch)
- func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch
- func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error)
- func NewBatchFromStream(decoders DecoderMap, session *PartitionSession, sb rawtopicreader.Batch) (*PublicBatch, error)
- func ReadRawBatchesToPublicBatches(msg *rawtopicreader.ReadResponse, sessions *PartitionSessionStorage, ...) ([]*PublicBatch, error)
- type PublicCommitMode
- type PublicCommitRange
- type PublicCommitRangeGetter
- type PublicCreateDecoderFunc
- type PublicMessage
- type PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Build() *PublicMessage
- func (pmb *PublicMessageBuilder) CommitRange(cr CommitRange) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Context(ctx context.Context) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) PartitionSession(session *PartitionSession) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Topic(topic string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder
- type PublicMessageContentUnmarshaler
- type PublicPartitionSession
- type PublicReadSelector
- type RawTopicReaderStream
- type SendMessageToServerFunc
- type SyncedStream
Constants ¶
const DefaultBufferSize = 1024 * 1024
Variables ¶
var ( ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled")) ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode")) )
var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec"))
ErrPublicUnexpectedCodec return when try to read message content with unknown codec
var PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))
Functions ¶
func BatchIsEmpty ¶
func BatchIsEmpty(b *PublicBatch) bool
func CreateInitMessage ¶
func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest
func MessageGetBufferBytesAccount ¶
func MessageGetBufferBytesAccount(m *PublicMessage) int
func MessageSetNilDataForTest ¶
func MessageSetNilDataForTest(m *PublicMessage)
func NextReaderID ¶
func NextReaderID() int64
Types ¶
type CallbackWithMessageContentFunc ¶
CallbackWithMessageContentFunc is callback function for work with message content data bytes MUST NOT be used after f returned if you need content longer - copy content to other slice
type CommitRange ¶
type CommitRange struct { CommitOffsetStart rawtopiccommon.Offset CommitOffsetEnd rawtopiccommon.Offset PartitionSession *PartitionSession }
func GetCommitRange ¶
func GetCommitRange(item PublicCommitRangeGetter) CommitRange
type CommitRanges ¶
type CommitRanges struct {
Ranges []CommitRange
}
func NewCommitRangesFromPublicCommits ¶
func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges
func NewCommitRangesWithCapacity ¶
func NewCommitRangesWithCapacity(capacity int) CommitRanges
func (*CommitRanges) Append ¶
func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)
func (*CommitRanges) AppendCommitRange ¶
func (r *CommitRanges) AppendCommitRange(cr CommitRange)
func (*CommitRanges) AppendCommitRanges ¶
func (r *CommitRanges) AppendCommitRanges(ranges []CommitRange)
func (*CommitRanges) AppendMessages ¶
func (r *CommitRanges) AppendMessages(messages ...PublicMessage)
func (*CommitRanges) GetCommitsInfo ¶
func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo
GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
func (*CommitRanges) Len ¶
func (r *CommitRanges) Len() int
func (*CommitRanges) Optimize ¶
func (r *CommitRanges) Optimize()
func (*CommitRanges) Reset ¶
func (r *CommitRanges) Reset()
func (*CommitRanges) ToPartitionsOffsets ¶
func (r *CommitRanges) ToPartitionsOffsets() []rawtopicreader.PartitionCommitOffset
func (*CommitRanges) ToRawMessage ¶
func (r *CommitRanges) ToRawMessage() *rawtopicreader.CommitOffsetRequest
type Committer ¶
type Committer struct { BufferTimeLagTrigger time.Duration // 0 mean no additional time lag BufferCountTrigger int // contains filtered or unexported fields }
func NewCommitterStopped ¶
func NewCommitterStopped( tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send SendMessageToServerFunc, ) *Committer
func (*Committer) Commit ¶
func (c *Committer) Commit(ctx context.Context, commitRange CommitRange) error
func (*Committer) OnCommitNotify ¶
func (c *Committer) OnCommitNotify(session *PartitionSession, offset rawtopiccommon.Offset)
type DecoderMap ¶
type DecoderMap struct {
// contains filtered or unexported fields
}
func NewDecoderMap ¶
func NewDecoderMap() DecoderMap
func (*DecoderMap) AddDecoder ¶
func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc)
func (*DecoderMap) Decode ¶
func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error)
type PartitionSession ¶
type PartitionSession struct { Topic string PartitionID int64 ReaderID int64 StreamPartitionSessionID rawtopicreader.PartitionSessionID ClientPartitionSessionID int64 // contains filtered or unexported fields }
func BatchGetPartitionSession ¶
func BatchGetPartitionSession(item *PublicBatch) *PartitionSession
func NewPartitionSession ¶
func NewPartitionSession( partitionContext context.Context, topic string, partitionID int64, readerID int64, connectionID string, partitionSessionID rawtopicreader.PartitionSessionID, clientPartitionSessionID int64, committedOffset rawtopiccommon.Offset, ) *PartitionSession
func (*PartitionSession) Close ¶
func (s *PartitionSession) Close()
func (*PartitionSession) CommittedOffset ¶
func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset
func (*PartitionSession) Context ¶
func (s *PartitionSession) Context() context.Context
func (*PartitionSession) LastReceivedMessageOffset ¶
func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset
func (*PartitionSession) SetCommittedOffsetForward ¶
func (s *PartitionSession) SetCommittedOffsetForward(v rawtopiccommon.Offset)
SetCommittedOffsetForward set new offset if new offset greater, then old
func (*PartitionSession) SetContext ¶
func (s *PartitionSession) SetContext(ctx context.Context)
func (*PartitionSession) SetLastReceivedMessageOffset ¶
func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)
func (*PartitionSession) ToPublic ¶
func (s *PartitionSession) ToPublic() PublicPartitionSession
type PartitionSessionStorage ¶
type PartitionSessionStorage struct {
// contains filtered or unexported fields
}
func (*PartitionSessionStorage) Add ¶
func (c *PartitionSessionStorage) Add(session *PartitionSession) error
func (*PartitionSessionStorage) Get ¶
func (c *PartitionSessionStorage) Get(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
func (*PartitionSessionStorage) GetAll ¶
func (c *PartitionSessionStorage) GetAll() []*PartitionSession
func (*PartitionSessionStorage) Remove ¶
func (c *PartitionSessionStorage) Remove(id rawtopicreader.PartitionSessionID) (*PartitionSession, error)
type Pool ¶
type Pool interface { Get() interface{} Put(x interface{}) }
Pool is interface for sync.Pool and may be extended by follow to original type
type PublicBatch ¶
type PublicBatch struct { empty.DoNotCopy Messages []*PublicMessage // contains filtered or unexported fields }
PublicBatch is ordered group of message from one partition
func BatchAppend ¶
func BatchAppend(original, appended *PublicBatch) (*PublicBatch, error)
func BatchCutMessages ¶
func BatchCutMessages(b *PublicBatch, count int) (head, rest *PublicBatch)
func BatchSetCommitRangeForTest ¶
func BatchSetCommitRangeForTest(b *PublicBatch, commitRange CommitRange) *PublicBatch
func NewBatch ¶
func NewBatch(session *PartitionSession, messages []*PublicMessage) (*PublicBatch, error)
func NewBatchFromStream ¶
func NewBatchFromStream( decoders DecoderMap, session *PartitionSession, sb rawtopicreader.Batch, ) (*PublicBatch, error)
func ReadRawBatchesToPublicBatches ¶
func ReadRawBatchesToPublicBatches( msg *rawtopicreader.ReadResponse, sessions *PartitionSessionStorage, decoders DecoderMap, ) ([]*PublicBatch, error)
func (*PublicBatch) Context ¶
func (m *PublicBatch) Context() context.Context
Context is cancelled when code should stop to process messages batch for example - lost connection to server or receive stop partition signal without graceful flag
func (*PublicBatch) PartitionID ¶
func (m *PublicBatch) PartitionID() int64
PartitionID of messages in the batch
func (*PublicBatch) Topic ¶
func (m *PublicBatch) Topic() string
Topic is path of source topic of the messages in the batch
type PublicCommitMode ¶
type PublicCommitMode int
const ( CommitModeAsync PublicCommitMode = iota // default CommitModeNone CommitModeSync )
func (PublicCommitMode) CommitsEnabled ¶
func (m PublicCommitMode) CommitsEnabled() bool
type PublicCommitRange ¶
type PublicCommitRange struct {
// contains filtered or unexported fields
}
PublicCommitRange contains data for commit messages range
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type PublicCommitRangeGetter ¶
type PublicCommitRangeGetter interface {
// contains filtered or unexported methods
}
PublicCommitRangeGetter return data piece for commit messages range
type PublicCreateDecoderFunc ¶
type PublicMessage ¶
type PublicMessage struct { empty.DoNotCopy SeqNo int64 CreatedAt time.Time MessageGroupID string WriteSessionMetadata map[string]string Offset int64 WrittenAt time.Time ProducerID string Metadata map[string][]byte // Metadata, nil if no metadata UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong. // contains filtered or unexported fields }
PublicMessage is representation of topic message
func MessageWithSetCommitRangeForTest ¶
func MessageWithSetCommitRangeForTest(m *PublicMessage, commitRange CommitRange) *PublicMessage
func (*PublicMessage) Context ¶
func (m *PublicMessage) Context() context.Context
func (*PublicMessage) PartitionID ¶
func (m *PublicMessage) PartitionID() int64
func (*PublicMessage) Read ¶
func (m *PublicMessage) Read(p []byte) (n int, err error)
Read implements io.Reader Read uncompressed message content return topicreader.UnexpectedCodec if message compressed with unknown codec
Content of the message released from the memory after first read error including io.EOF.
func (*PublicMessage) Topic ¶
func (m *PublicMessage) Topic() string
func (*PublicMessage) UnmarshalTo ¶
func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error
UnmarshalTo can call most once per message, it read all data from internal reader and call PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with uncompressed content
type PublicMessageBuilder ¶
type PublicMessageBuilder struct {
// contains filtered or unexported fields
}
func NewPublicMessageBuilder ¶
func NewPublicMessageBuilder() *PublicMessageBuilder
func (*PublicMessageBuilder) Build ¶
func (pmb *PublicMessageBuilder) Build() *PublicMessage
Build return builded message and reset internal state for create new message
func (*PublicMessageBuilder) CommitRange ¶
func (pmb *PublicMessageBuilder) CommitRange(cr CommitRange) *PublicMessageBuilder
func (*PublicMessageBuilder) Context ¶
func (pmb *PublicMessageBuilder) Context(ctx context.Context) *PublicMessageBuilder
Context set message Context
func (*PublicMessageBuilder) CreatedAt ¶
func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder
CreatedAt set message CreatedAt
func (*PublicMessageBuilder) DataAndUncompressedSize ¶
func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder
DataAndUncompressedSize set message uncompressed content and field UncompressedSize
func (*PublicMessageBuilder) MessageGroupID ¶
func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder
MessageGroupID set message MessageGroupID
func (*PublicMessageBuilder) Metadata ¶
func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder
func (*PublicMessageBuilder) Offset ¶
func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder
Offset set message Offset
func (*PublicMessageBuilder) PartitionID ¶
func (pmb *PublicMessageBuilder) PartitionID(partitionID int64) *PublicMessageBuilder
PartitionID set message PartitionID
func (*PublicMessageBuilder) PartitionSession ¶
func (pmb *PublicMessageBuilder) PartitionSession(session *PartitionSession) *PublicMessageBuilder
func (*PublicMessageBuilder) ProducerID ¶
func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder
ProducerID set message ProducerID
func (*PublicMessageBuilder) RawDataLen ¶
func (pmb *PublicMessageBuilder) RawDataLen(val int) *PublicMessageBuilder
func (*PublicMessageBuilder) Seqno ¶
func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder
Seqno set message Seqno
func (*PublicMessageBuilder) Topic ¶
func (pmb *PublicMessageBuilder) Topic(topic string) *PublicMessageBuilder
Topic set message Topic
func (*PublicMessageBuilder) UncompressedSize ¶
func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder
UncompressedSize set message UncompressedSize
func (*PublicMessageBuilder) WriteSessionMetadata ¶
func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder
WriteSessionMetadata set message WriteSessionMetadata
func (*PublicMessageBuilder) WrittenAt ¶
func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder
WrittenAt set message WrittenAt
type PublicMessageContentUnmarshaler ¶
type PublicMessageContentUnmarshaler interface { // UnmarshalYDBTopicMessage MUST NOT use data after return. // If you need content after return from Consume - copy data content to // own slice with copy(dst, data) UnmarshalYDBTopicMessage(data []byte) error }
PublicMessageContentUnmarshaler is interface for unmarshal message content
type PublicPartitionSession ¶
type PublicPartitionSession struct { // PartitionSessionID is unique session ID per listener object PartitionSessionID int64 // TopicPath contains path for the topic TopicPath string // PartitionID contains partition id. It can be repeated for one reader if the partition will stop/start few times. PartitionID int64 }
PublicPartitionSession contains information about partition session for the event
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type PublicReadSelector ¶
type PublicReadSelector struct { Path string Partitions []int64 ReadFrom time.Time // zero value mean skip read from filter MaxTimeLag time.Duration // 0 mean skip time lag filter }
func (PublicReadSelector) Clone ¶
func (s PublicReadSelector) Clone() *PublicReadSelector
Clone create deep clone of the selector
type RawTopicReaderStream ¶
type RawTopicReaderStream interface { Recv() (rawtopicreader.ServerMessage, error) Send(msg rawtopicreader.ClientMessage) error CloseSend() error }
type SendMessageToServerFunc ¶
type SendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error
type SyncedStream ¶
type SyncedStream struct {
// contains filtered or unexported fields
}
func NewSyncedStream ¶
func NewSyncedStream(stream RawTopicReaderStream) *SyncedStream
func (*SyncedStream) CloseSend ¶
func (s *SyncedStream) CloseSend() error
func (*SyncedStream) Recv ¶
func (s *SyncedStream) Recv() (rawtopicreader.ServerMessage, error)
func (*SyncedStream) Send ¶
func (s *SyncedStream) Send(msg rawtopicreader.ClientMessage) error