Documentation ¶
Overview ¶
Package topicreader provides Reader, which receives messages from YDB topics. More examples are available in the examples repository:
https://github.com/ydb-platform/ydb-go-examples/tree/master/topic/topicreader
Index ¶
- Variables
- type Batch
- type CommitRangeGetter
- type Message
- type MessageContentUnmarshaler
- type ReadBatchOption
- type Reader
- func (r *Reader) Close(ctx context.Context) error
- func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error
- func (r *Reader) PopMessagesBatchTx(ctx context.Context, transaction tx.Identifier, opts ...ReadBatchOption) (resBatch *Batch, resErr error)
- func (r *Reader) ReadMessage(ctx context.Context) (*Message, error)
- func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error) (deprecated)
- func (r *Reader) ReadMessagesBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)
- func (r *Reader) WaitInit(ctx context.Context) error
- type WithBatchMaxCount
- type WithBatchPreferMinCount (deprecated)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrCommitToExpiredSession = topicreadercommon.PublicErrCommitSessionToExpiredSession
ErrCommitToExpiredSession is not a fatal error; the reader can continue working. Client code must check for it with errors.Is.
var ErrConcurrencyCall = xerrors.Wrap(errors.New("ydb: concurrency call denied"))
ErrConcurrencyCall is returned when a reader method is called concurrently in a way that is not allowed. Client code must check for it with errors.Is.
var ErrUnexpectedCodec = topicreadercommon.ErrPublicUnexpectedCodec
ErrUnexpectedCodec is returned when topicreader receives a message with an unknown codec. Client code must check for it with errors.Is.
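The reason these errors must be checked with errors.Is rather than == is that they may arrive wrapped. The following is a minimal sketch of that check. To stay runnable without the SDK it uses a local stand-in sentinel (errCommitToExpiredSession, a hypothetical name) instead of topicreader.ErrCommitToExpiredSession; the helper isRecoverableCommitError is also hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errCommitToExpiredSession is a hypothetical local stand-in for
// topicreader.ErrCommitToExpiredSession, so the sketch runs without the SDK.
var errCommitToExpiredSession = errors.New("stand-in: commit to expired session")

// isRecoverableCommitError reports whether err is, or wraps, the stand-in
// sentinel. errors.Is walks the wrap chain, so wrapped errors still match.
func isRecoverableCommitError(err error) bool {
	return errors.Is(err, errCommitToExpiredSession)
}

func main() {
	wrapped := fmt.Errorf("commit batch: %w", errCommitToExpiredSession)

	fmt.Println(isRecoverableCommitError(wrapped))             // true
	fmt.Println(wrapped == errCommitToExpiredSession)          // false: == misses wrapped errors
	fmt.Println(isRecoverableCommitError(errors.New("other"))) // false
}
```

In real code the same check would be applied to the error returned by a reader method, with the package-level variable in place of the stand-in.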
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch = topicreadercommon.PublicBatch
Batch is an ordered group of messages from one partition.
type CommitRangeGetter ¶
type CommitRangeGetter = topicreadercommon.PublicCommitRangeGetter
CommitRangeGetter is an interface for getting the offsets to commit.
type Message ¶
type Message = topicreadercommon.PublicMessage
Message contains data and metadata read from the server.
type MessageContentUnmarshaler ¶
type MessageContentUnmarshaler = topicreadercommon.PublicMessageContentUnmarshaler
MessageContentUnmarshaler is an interface for unmarshaling message content into your own struct.
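As an illustration of unmarshaling message content into your own struct, here is a minimal sketch. It does not import the SDK: contentUnmarshaler is a local interface, and its method name UnmarshalYDBTopicMessage is an assumption about the shape of MessageContentUnmarshaler that should be verified against the SDK source. The order type, the JSON payload format, and the decodeOrder helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contentUnmarshaler is a local stand-in. The method name is an ASSUMPTION
// about what MessageContentUnmarshaler requires; check the SDK before relying on it.
type contentUnmarshaler interface {
	UnmarshalYDBTopicMessage(data []byte) error
}

// order is a hypothetical application struct carried in the message body as JSON.
type order struct {
	ID    int    `json:"id"`
	Label string `json:"label"`
}

// UnmarshalYDBTopicMessage fills the struct from the raw message bytes.
func (o *order) UnmarshalYDBTopicMessage(data []byte) error {
	return json.Unmarshal(data, o)
}

// decodeOrder is hypothetical glue that plays the role of the reader
// handing the raw payload to the user-defined type.
func decodeOrder(payload string) (order, error) {
	var o order
	var dst contentUnmarshaler = &o
	err := dst.UnmarshalYDBTopicMessage([]byte(payload))
	return o, err
}

func main() {
	o, err := decodeOrder(`{"id": 7, "label": "books"}`)
	fmt.Println(o.ID, o.Label, err) // 7 books <nil>
}
```

Keeping the decoding logic on the destination type means the reading loop does not need to know the payload format.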
type ReadBatchOption ¶
type ReadBatchOption = topicreaderinternal.PublicReadBatchOption
ReadBatchOption is the type for batch read options.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader allows reading messages from YDB topics. ReadMessage or ReadMessageBatch may be called concurrently with Commit; any other concurrent call is denied.
In other words, you can have one goroutine that reads messages and one goroutine that commits them.
Concurrency table:
| Method           | ReadMessage | ReadMessageBatch | Commit | Close |
| ReadMessage      | -           | -                | +      | -     |
| ReadMessageBatch | -           | -                | +      | -     |
| Commit           | +           | +                | -      | -     |
| Close            | -           | -                | -      | -     |
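The "one goroutine reads, one goroutine commits" rule can be sketched as follows. This is only an illustration of the goroutine layout: fakeReader is a hypothetical stand-in for *topicreader.Reader with simplified method signatures (no context, no errors), so the sketch runs without a YDB server.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeReader is a hypothetical stand-in for *topicreader.Reader.
// Its methods are simplified: the real ones take a context and return errors.
type fakeReader struct {
	next      int   // touched only by the reading goroutine
	committed []int // touched only by the committing goroutine
}

func (r *fakeReader) ReadMessage() int  { r.next++; return r.next }
func (r *fakeReader) Commit(offset int) { r.committed = append(r.committed, offset) }

// run reads n messages in the calling goroutine and commits them in a second
// goroutine, so no reader method is ever called concurrently with itself.
func run(r *fakeReader, n int) []int {
	toCommit := make(chan int)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // the only goroutine that calls Commit
		defer wg.Done()
		for offset := range toCommit {
			r.Commit(offset)
		}
	}()

	for i := 0; i < n; i++ { // the only goroutine that calls ReadMessage
		toCommit <- r.ReadMessage()
	}
	close(toCommit)
	wg.Wait() // after this, it is safe to read r.committed
	return r.committed
}

func main() {
	fmt.Println(run(&fakeReader{}, 3)) // [1 2 3]
}
```

Close is not part of the concurrent section: per the table it must not overlap with any other call, so it belongs after both goroutines have finished.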
func NewReader ¶
func NewReader(internalReader topicreaderinternal.Reader) *Reader
NewReader creates a new reader. It is used internally only.
func (*Reader) Close ¶
func (r *Reader) Close(ctx context.Context) error
Close stops work with the reader. It returns when the reader has completed its internal work (flushing the commit buffer, etc.) or when ctx is cancelled.
func (*Reader) Commit ¶
func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error
Commit receives a Message, a Batch, or a single offset. It can be fast (the default) or synchronous, in which case it waits for a response from the server. See topicoptions.CommitMode for details.
In topicoptions.CommitModeSync mode the method can return ErrCommitToExpiredSession. This means the message or batch was not committed because the connection was broken or the server routed the partition to another reader. Client code should continue to work normally.
Example ¶
package main

import (
	"context"

	"github.com/UgnineSirdis/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		batch, _ := reader.ReadMessageBatch(ctx)
		processBatch(batch.Context(), batch)

		// Commit may be fast (by default) or sync, depends on reader settings
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(ctx context.Context, batch *topicreader.Batch) {
	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
func (*Reader) PopMessagesBatchTx ¶
func (r *Reader) PopMessagesBatchTx(
	ctx context.Context,
	transaction tx.Identifier,
	opts ...ReadBatchOption,
) (
	resBatch *Batch,
	resErr error,
)
PopMessagesBatchTx reads a batch of messages and commits them within the transaction. If the transaction fails, the batch will be received again.
Currently this means reconnecting to the server and re-reading the messages from the server into the reader's buffer. This is an expensive operation, so it is worth minimizing transaction failures.
The reconnect is an implementation detail and may change in the future.
Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (*Reader) ReadMessage ¶
func (r *Reader) ReadMessage(ctx context.Context) (*Message, error)
ReadMessage reads exactly one message. Exactly one of the returned message and error is nil.
Example ¶
package main

import (
	"context"

	"github.com/UgnineSirdis/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		msg, _ := reader.ReadMessage(ctx)
		processMessage(msg.Context(), msg)
		_ = reader.Commit(msg.Context(), msg)
	}
}

func processMessage(ctx context.Context, m *topicreader.Message) {
	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
func (*Reader) ReadMessageBatch (deprecated)
func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)
Deprecated: ReadMessageBatch was experimental and is no longer relevant. Use ReadMessagesBatch instead. It will be removed after Oct 2024. Read about the versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
Example ¶
package main

import (
	"context"

	"github.com/UgnineSirdis/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		batch, _ := reader.ReadMessageBatch(ctx)
		processBatch(batch.Context(), batch)
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(ctx context.Context, batch *topicreader.Batch) {
	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
func (*Reader) ReadMessagesBatch ¶
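func (r *Reader) ReadMessagesBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)
ReadMessagesBatch reads a batch of messages. A Batch is an ordered group of messages from one partition. Exactly one of the returned Batch and err is nil. If the Batch is not nil, the reader guarantees that all Batch.Messages are not nil.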
type WithBatchMaxCount ¶
type WithBatchMaxCount int
WithBatchMaxCount sets the maximum number of messages in a batch.
func (WithBatchMaxCount) Apply ¶
func (count WithBatchMaxCount) Apply(
	options topicreaderinternal.ReadMessageBatchOptions,
) topicreaderinternal.ReadMessageBatchOptions
Apply implements ReadBatchOption interface
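The pattern of an integer type that implements an option interface through Apply can be sketched in isolation. All names below (readOptions, readOption, withMaxCount, buildOptions) are hypothetical stand-ins, and the MaxCount field is an assumption; the real option struct lives in topicreaderinternal and its fields are not shown in this document.

```go
package main

import "fmt"

// readOptions is a hypothetical stand-in for the internal options struct.
type readOptions struct {
	MaxCount int // assumed field name, for illustration only
}

// readOption is a hypothetical stand-in for ReadBatchOption:
// each option takes the current options and returns an updated copy.
type readOption interface {
	Apply(options readOptions) readOptions
}

// withMaxCount mirrors the shape of WithBatchMaxCount: a plain int
// that carries its value and knows how to apply itself.
type withMaxCount int

func (count withMaxCount) Apply(options readOptions) readOptions {
	options.MaxCount = int(count)
	return options
}

// buildOptions folds all options over a zero value, as a variadic
// "opts ...ReadBatchOption" parameter would typically be consumed.
func buildOptions(opts ...readOption) readOptions {
	var res readOptions
	for _, opt := range opts {
		res = opt.Apply(res)
	}
	return res
}

func main() {
	fmt.Println(buildOptions(withMaxCount(10)).MaxCount) // 10
	fmt.Println(buildOptions().MaxCount)                 // 0
}
```

Because the option is just a typed int, a call site can write the value inline, in the style of reader.ReadMessagesBatch(ctx, topicreader.WithBatchMaxCount(10)).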
type WithBatchPreferMinCount (deprecated)
type WithBatchPreferMinCount int
WithBatchPreferMinCount sets the preferred minimum number of messages in a batch. The resulting batch can sometimes be smaller than count, for example if the internal buffer is full and cannot receive more messages, or if the server stops sending messages for the partition.
count must be 1 or greater; it panics if count < 1.
Deprecated: WithBatchPreferMinCount was experimental and is no longer relevant. The option will be removed to simplify the code internals. It will be removed after Oct 2024. Read about the versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
func (WithBatchPreferMinCount) Apply ¶
func (count WithBatchPreferMinCount) Apply(
	options topicreaderinternal.ReadMessageBatchOptions,
) topicreaderinternal.ReadMessageBatchOptions
Apply implements ReadBatchOption interface