Documentation ¶
Index ¶
- Variables
- type CallbackWithMessageContentFunc
- type CommitRanges
- type Pool
- type PublicBatch
- type PublicCommitMode
- type PublicCommitRange
- type PublicCommitRangeGetter
- type PublicCreateDecoderFunc
- type PublicGetPartitionStartOffsetFunc
- type PublicGetPartitionStartOffsetRequest
- type PublicGetPartitionStartOffsetResponse
- type PublicMessage
- type PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Build() *PublicMessage
- func (pmb *PublicMessageBuilder) Context(ctx context.Context)
- func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) PartitionID(partitionID int64)
- func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) Topic(topic string)
- func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder
- func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder
- type PublicMessageContentUnmarshaler
- type PublicReadBatchOption
- type PublicReadSelector
- type PublicReaderOption
- type RawTopicReaderStream
- type ReadMessageBatchOptions
- type Reader
- func (r *Reader) Close(ctx context.Context) error
- func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error)
- func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error
- func (r *Reader) ID() int64
- func (r *Reader) ReadMessage(ctx context.Context) (*PublicMessage, error)
- func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...PublicReadBatchOption) (batch *PublicBatch, err error)
- func (r *Reader) Tracer() *trace.Topic
- func (r *Reader) WaitInit(ctx context.Context) error
- type ReaderConfig
- type TopicSteamReaderConnect
Constants ¶
This section is empty.
Variables ¶
var ( ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled")) ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode")) )
var (
PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))
)
var PublicErrUnexpectedCodec = errors.New("unexpected codec") //nolint:revive,stylecheck
PublicErrUnexpectedCodec return when try to read message content with unknown codec
Functions ¶
This section is empty.
Types ¶
type CallbackWithMessageContentFunc ¶
CallbackWithMessageContentFunc is callback function for work with message content data bytes MUST NOT be used after f returned if you need content longer - copy content to other slice
type CommitRanges ¶
type CommitRanges struct {
// contains filtered or unexported fields
}
func NewCommitRangesFromPublicCommits ¶
func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges
func NewCommitRangesWithCapacity ¶
func NewCommitRangesWithCapacity(capacity int) CommitRanges
func (*CommitRanges) Append ¶
func (r *CommitRanges) Append(ranges ...PublicCommitRangeGetter)
func (*CommitRanges) AppendMessages ¶
func (r *CommitRanges) AppendMessages(messages ...PublicMessage)
func (*CommitRanges) GetCommitsInfo ¶ added in v3.52.0
func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo
GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
func (*CommitRanges) Reset ¶
func (r *CommitRanges) Reset()
type Pool ¶
type Pool interface { Get() interface{} Put(x interface{}) }
Pool is interface for sync.Pool and may be extended by follow to original type
type PublicBatch ¶
type PublicBatch struct { empty.DoNotCopy Messages []*PublicMessage // contains filtered or unexported fields }
PublicBatch is ordered group of message from one partition
func (*PublicBatch) Context ¶
func (m *PublicBatch) Context() context.Context
Context is cancelled when code should stop to process messages batch for example - lost connection to server or receive stop partition signal without graceful flag
func (*PublicBatch) PartitionID ¶
func (m *PublicBatch) PartitionID() int64
PartitionID of messages in the batch
func (*PublicBatch) Topic ¶
func (m *PublicBatch) Topic() string
Topic is path of source topic of the messages in the batch
type PublicCommitMode ¶
type PublicCommitMode int
const ( CommitModeAsync PublicCommitMode = iota // default CommitModeNone CommitModeSync )
type PublicCommitRange ¶
type PublicCommitRange struct {
// contains filtered or unexported fields
}
PublicCommitRange contains data for commit messages range
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type PublicCommitRangeGetter ¶
type PublicCommitRangeGetter interface {
// contains filtered or unexported methods
}
PublicCommitRangeGetter return data piece for commit messages range
type PublicCreateDecoderFunc ¶
type PublicGetPartitionStartOffsetFunc ¶
type PublicGetPartitionStartOffsetFunc func( ctx context.Context, req PublicGetPartitionStartOffsetRequest, ) (res PublicGetPartitionStartOffsetResponse, err error)
PublicGetPartitionStartOffsetFunc callback function for optional manage read progress store at own side
type PublicGetPartitionStartOffsetRequest ¶
PublicGetPartitionStartOffsetRequest info about partition
type PublicGetPartitionStartOffsetResponse ¶
type PublicGetPartitionStartOffsetResponse struct {
// contains filtered or unexported fields
}
PublicGetPartitionStartOffsetResponse allow to set start offset for read messages for the partition
func (*PublicGetPartitionStartOffsetResponse) StartFrom ¶
func (r *PublicGetPartitionStartOffsetResponse) StartFrom(offset int64)
StartFrom set start offset for read the partition
type PublicMessage ¶
type PublicMessage struct { empty.DoNotCopy SeqNo int64 CreatedAt time.Time MessageGroupID string WriteSessionMetadata map[string]string Offset int64 WrittenAt time.Time ProducerID string Metadata map[string][]byte // Metadata, nil if no metadata UncompressedSize int // as sent by sender, server/sdk doesn't check the field. It may be empty or wrong. // contains filtered or unexported fields }
PublicMessage is representation of topic message
func (*PublicMessage) Context ¶
func (m *PublicMessage) Context() context.Context
func (*PublicMessage) PartitionID ¶
func (m *PublicMessage) PartitionID() int64
func (*PublicMessage) Read ¶
func (m *PublicMessage) Read(p []byte) (n int, err error)
Read implements io.Reader Read uncompressed message content return topicreader.UnexpectedCodec if message compressed with unknown codec
func (*PublicMessage) Topic ¶
func (m *PublicMessage) Topic() string
func (*PublicMessage) UnmarshalTo ¶
func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error
UnmarshalTo can call most once per message, it read all data from internal reader and call PublicMessageContentUnmarshaler.UnmarshalYDBTopicMessage with uncompressed content
type PublicMessageBuilder ¶ added in v3.48.6
type PublicMessageBuilder struct {
// contains filtered or unexported fields
}
func NewPublicMessageBuilder ¶ added in v3.48.6
func NewPublicMessageBuilder() *PublicMessageBuilder
func (*PublicMessageBuilder) Build ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) Build() *PublicMessage
Build return builded message and reset internal state for create new message
func (*PublicMessageBuilder) Context ¶ added in v3.48.7
func (pmb *PublicMessageBuilder) Context(ctx context.Context)
Context set message Context
func (*PublicMessageBuilder) CreatedAt ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder
CreatedAt set message CreatedAt
func (*PublicMessageBuilder) DataAndUncompressedSize ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder
DataAndUncompressedSize set message uncompressed content and field UncompressedSize
func (*PublicMessageBuilder) MessageGroupID ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder
MessageGroupID set message MessageGroupID
func (*PublicMessageBuilder) Metadata ¶ added in v3.54.3
func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder
func (*PublicMessageBuilder) Offset ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder
Offset set message Offset
func (*PublicMessageBuilder) PartitionID ¶ added in v3.48.7
func (pmb *PublicMessageBuilder) PartitionID(partitionID int64)
PartitionID set message PartitionID
func (*PublicMessageBuilder) ProducerID ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder
ProducerID set message ProducerID
func (*PublicMessageBuilder) Seqno ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder
Seqno set message Seqno
func (*PublicMessageBuilder) Topic ¶ added in v3.48.7
func (pmb *PublicMessageBuilder) Topic(topic string)
Topic set message Topic
func (*PublicMessageBuilder) UncompressedSize ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder
UncompressedSize set message UncompressedSize
func (*PublicMessageBuilder) WriteSessionMetadata ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder
WriteSessionMetadata set message WriteSessionMetadata
func (*PublicMessageBuilder) WrittenAt ¶ added in v3.48.6
func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder
WrittenAt set message WrittenAt
type PublicMessageContentUnmarshaler ¶
type PublicMessageContentUnmarshaler interface { // UnmarshalYDBTopicMessage MUST NOT use data after return. // If you need content after return from Consume - copy data content to // own slice with copy(dst, data) UnmarshalYDBTopicMessage(data []byte) error }
PublicMessageContentUnmarshaler is interface for unmarshal message content
type PublicReadBatchOption ¶
type PublicReadBatchOption interface {
Apply(options ReadMessageBatchOptions) ReadMessageBatchOptions
}
PublicReadBatchOption для различных пожеланий к батчу вроде WithMaxMessages(int)
type PublicReadSelector ¶
type PublicReadSelector struct { Path string Partitions []int64 ReadFrom time.Time // zero value mean skip read from filter MaxTimeLag time.Duration // 0 mean skip time lag filter }
func (PublicReadSelector) Clone ¶
func (s PublicReadSelector) Clone() *PublicReadSelector
Clone create deep clone of the selector
type PublicReaderOption ¶
type PublicReaderOption func(cfg *ReaderConfig)
func WithCredentials ¶ added in v3.37.8
func WithCredentials(cred credentials.Credentials) PublicReaderOption
func WithTrace ¶ added in v3.38.0
func WithTrace(tracer *trace.Topic) PublicReaderOption
type RawTopicReaderStream ¶
type RawTopicReaderStream interface { Recv() (rawtopicreader.ServerMessage, error) Send(msg rawtopicreader.ClientMessage) error CloseSend() error }
type ReadMessageBatchOptions ¶
type ReadMessageBatchOptions struct {
// contains filtered or unexported fields
}
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
func NewReader ¶
func NewReader( connector TopicSteamReaderConnect, consumer string, readSelectors []PublicReadSelector, opts ...PublicReaderOption, ) Reader
func (*Reader) Commit ¶
func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error)
func (*Reader) CommitRanges ¶
func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error
func (*Reader) ReadMessage ¶
func (r *Reader) ReadMessage(ctx context.Context) (*PublicMessage, error)
ReadMessage read exactly one message
func (*Reader) ReadMessageBatch ¶
func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...PublicReadBatchOption) (batch *PublicBatch, err error)
ReadMessageBatch read batch of messages. Batch is collection of messages, which can be atomically committed
type ReaderConfig ¶
type ReaderConfig struct { config.Common RetrySettings topic.RetrySettings DefaultBatchConfig ReadMessageBatchOptions // contains filtered or unexported fields }
type TopicSteamReaderConnect ¶
type TopicSteamReaderConnect func(connectionCtx context.Context) (RawTopicReaderStream, error)
TopicSteamReaderConnect connect to grpc stream when connectionCtx closed stream must stop work and return errors for all methods