Documentation ¶
Overview ¶
Package topicreader provides Reader to receive messages from YDB topics. More examples are available in the examples repository:
https://github.com/ydb-platform/ydb-go-examples/tree/master/topic/topicreader
Constants ¶
This section is empty.
Variables ¶
var ErrConcurrencyCall = xerrors.Wrap(errors.New("ydb: concurrency call denied"))
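ErrConcurrencyCall is returned if a reader method is called concurrently with another call that it is not allowed to overlap with (see the concurrency table for Reader).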
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
var ErrUnexpectedCodec = topicreaderinternal.PublicErrUnexpectedCodec
ErrUnexpectedCodec is returned if topicreader receives a message with an unknown codec. Client code must check for this error with errors.Is.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
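The requirement to use errors.Is matters because a sentinel error can arrive wrapped in additional context, in which case a plain == comparison does not match. A minimal sketch of that behaviour follows. It uses only the standard library: errUnexpectedCodec is a local stand-in for topicreader.ErrUnexpectedCodec, and readOnce is a hypothetical helper, not part of the SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnexpectedCodec is a local stand-in for topicreader.ErrUnexpectedCodec,
// defined here only so the sketch compiles without the SDK.
var errUnexpectedCodec = errors.New("stand-in: unexpected codec")

// readOnce is a hypothetical helper that imitates a read call failing
// with the sentinel wrapped in extra context.
func readOnce() error {
	return fmt.Errorf("read message from partition: %w", errUnexpectedCodec)
}

// isUnexpectedCodec reports whether err wraps the sentinel anywhere in its chain.
func isUnexpectedCodec(err error) bool {
	return errors.Is(err, errUnexpectedCodec)
}

func main() {
	err := readOnce()
	// A direct comparison fails because the sentinel is wrapped.
	fmt.Println("direct comparison:", err == errUnexpectedCodec)
	// errors.Is walks the wrap chain and finds the sentinel.
	fmt.Println("errors.Is:", isUnexpectedCodec(err))
}
```

With the real package, the same check would presumably be written as errors.Is(err, topicreader.ErrUnexpectedCodec).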
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch = topicreaderinternal.PublicBatch
Batch is a group of ordered messages from one partition.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type CommitRangeGetter ¶
type CommitRangeGetter = topicreaderinternal.PublicCommitRangeGetter
CommitRangeGetter
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type Message ¶
type Message = topicreaderinternal.PublicMessage
Message
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type MessageContentUnmarshaler ¶
type MessageContentUnmarshaler = topicreaderinternal.PublicMessageContentUnmarshaler
MessageContentUnmarshaler
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type ReadBatchOption ¶
type ReadBatchOption = topicreaderinternal.PublicReadBatchOption
ReadBatchOption
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader allows reading messages from YDB topics. ReadMessage or ReadMessageBatch can be called concurrently with Commit; any other concurrent call is denied.
In other words, you can have one goroutine for reading messages and one goroutine for committing messages.
Concurrency table (+ means the two methods may be called concurrently, - means the call is denied):
| Method           | ReadMessage | ReadMessageBatch | Commit | Close |
| ReadMessage      | -           | -                | +      | -     |
| ReadMessageBatch | -           | -                | +      | -     |
| Commit           | +           | +                | -      | -     |
| Close            | -           | -                | -      | -     |
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
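The allowed combination from the concurrency table, one goroutine reading and one goroutine committing, can be sketched with a channel that hands messages from the reading side to the committing side. This is only an illustration under stated assumptions: offsetMessage and fakeReader are hypothetical stand-ins for topicreader.Message and *topicreader.Reader, and they imitate only the shape of the ReadMessage and Commit calls.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// offsetMessage and fakeReader are hypothetical stand-ins, not SDK types.
type offsetMessage struct{ offset int }

type fakeReader struct {
	mu        sync.Mutex
	next      int
	committed []int
}

func (r *fakeReader) ReadMessage(ctx context.Context) (*offsetMessage, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	return &offsetMessage{offset: r.next}, nil
}

func (r *fakeReader) Commit(ctx context.Context, m *offsetMessage) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.committed = append(r.committed, m.offset)
	return nil
}

// runReadCommit reads n messages in one goroutine and commits them in the
// calling goroutine, so each method is used by exactly one goroutine.
func runReadCommit(ctx context.Context, r *fakeReader, n int) ([]int, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // releases the reading goroutine if committing fails early

	toCommit := make(chan *offsetMessage)
	readErr := make(chan error, 1)

	go func() {
		defer close(toCommit)
		for i := 0; i < n; i++ {
			msg, err := r.ReadMessage(ctx)
			if err != nil {
				readErr <- err
				return
			}
			select {
			case toCommit <- msg:
			case <-ctx.Done():
				readErr <- ctx.Err()
				return
			}
		}
		readErr <- nil
	}()

	for msg := range toCommit {
		if err := r.Commit(ctx, msg); err != nil {
			return nil, err
		}
	}
	if err := <-readErr; err != nil {
		return nil, err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]int(nil), r.committed...), nil
}

// demoReadCommit runs the sketch against a fresh fake reader.
func demoReadCommit(n int) ([]int, error) {
	return runReadCommit(context.Background(), &fakeReader{}, n)
}

func main() {
	committed, err := demoReadCommit(3)
	fmt.Println(committed, err)
}
```

Because the channel is unbuffered and there is a single committer, commits happen in the order the messages were read. A real application would likely use a buffered channel so that reading can run ahead of committing.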
func NewReader ¶
func NewReader(internalReader topicreaderinternal.Reader) *Reader
NewReader
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (*Reader) Close ¶
Close stops the reader. It returns when the reader has completed its internal work (flushing the commit buffer, etc.) or when ctx is cancelled.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
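Since Close returns either when the internal work is done or when ctx is cancelled, a caller can bound shutdown time by passing a context with a timeout. The sketch below shows that pattern with the standard library only. slowCloser is a hypothetical stand-in for a reader, and the assumption that cancellation surfaces as the context's own error is made for illustration; the source does not say which error the real Close returns.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCloser is a hypothetical stand-in for a reader whose Close needs
// flushDelay to finish its internal work.
type slowCloser struct{ flushDelay time.Duration }

// Close returns nil once the simulated flush completes, or the context
// error if ctx is done first (an assumption made for this sketch).
func (c slowCloser) Close(ctx context.Context) error {
	select {
	case <-time.After(c.flushDelay):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// closeWithTimeout gives Close at most timeout to finish.
func closeWithTimeout(c slowCloser, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return c.Close(ctx)
}

// closeTimedOut reports whether err indicates that the deadline expired.
func closeTimedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// demoFastClose finishes well within its timeout.
func demoFastClose() error {
	return closeWithTimeout(slowCloser{flushDelay: 10 * time.Millisecond}, 2*time.Second)
}

// demoSlowClose cannot finish before its timeout expires.
func demoSlowClose() error {
	return closeWithTimeout(slowCloser{flushDelay: 2 * time.Second}, 10*time.Millisecond)
}

func main() {
	fmt.Println("fast close error:", demoFastClose())
	fmt.Println("slow close timed out:", closeTimedOut(demoSlowClose()))
}
```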
func (*Reader) Commit ¶
func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error
Commit receives a Message, a Batch, or a single offset. It can be fast (the default) or synchronous, in which case it waits for a response from the server. See topicoptions.CommitMode for details.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
Example ¶
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		batch, _ := reader.ReadMessageBatch(ctx)
		processBatch(batch.Context(), batch)

		// Commit may be fast (by default) or sync, depends on reader settings
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(ctx context.Context, batch *topicreader.Batch) {
	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
func (*Reader) ReadMessage ¶
ReadMessage reads exactly one message. Exactly one of message, error is nil.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
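The guarantee that exactly one of message, error is nil means the error must be checked before the message is touched. A minimal sketch of a read loop that does so is given below. It is not SDK code: payloadMessage, readFunc, scripted and errStreamClosed are hypothetical names introduced only so the loop can run without a server.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// payloadMessage is a hypothetical stand-in for topicreader.Message.
type payloadMessage struct{ data string }

// readFunc has the same shape as the ReadMessage call used in the examples:
// it takes a context and returns a message or an error.
type readFunc func(ctx context.Context) (*payloadMessage, error)

// errStreamClosed is an invented error used to end the scripted stream.
var errStreamClosed = errors.New("stand-in: stream closed")

// consume calls read until it fails. It returns the payloads seen so far
// together with the error that stopped the loop.
func consume(ctx context.Context, read readFunc) ([]string, error) {
	var seen []string
	for {
		msg, err := read(ctx)
		if err != nil {
			// msg is nil here, so it must not be dereferenced.
			return seen, err
		}
		seen = append(seen, msg.data)
	}
}

// scripted returns a readFunc that yields the given payloads and then fails.
func scripted(payloads ...string) readFunc {
	i := 0
	return func(ctx context.Context) (*payloadMessage, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		if i == len(payloads) {
			return nil, errStreamClosed
		}
		i++
		return &payloadMessage{data: payloads[i-1]}, nil
	}
}

// demoConsume runs the loop over two scripted messages.
func demoConsume() ([]string, error) {
	return consume(context.Background(), scripted("a", "b"))
}

func main() {
	seen, err := demoConsume()
	fmt.Println(seen, err)
}
```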
Example ¶
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		msg, _ := reader.ReadMessage(ctx)
		processMessage(msg.Context(), msg)
		_ = reader.Commit(msg.Context(), msg)
	}
}

func processMessage(ctx context.Context, m *topicreader.Message) {
	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
func (*Reader) ReadMessageBatch ¶
ReadMessageBatch reads a batch of messages. A Batch is an ordered group of messages from one partition. Exactly one of Batch, err is nil. If Batch is not nil, the reader guarantees that all Batch.Messages are not nil.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
Example ¶
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		batch, _ := reader.ReadMessageBatch(ctx)
		processBatch(batch.Context(), batch)
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(ctx context.Context, batch *topicreader.Batch) {
	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
type WithBatchMaxCount ¶
type WithBatchMaxCount int
WithBatchMaxCount
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (WithBatchMaxCount) Apply ¶
func (count WithBatchMaxCount) Apply( options topicreaderinternal.ReadMessageBatchOptions, ) topicreaderinternal.ReadMessageBatchOptions
Apply
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
type WithBatchPreferMinCount ¶
type WithBatchPreferMinCount int
WithBatchPreferMinCount sets the preferred minimum number of messages in a batch. The resulting batch can sometimes contain fewer than count messages, for example if the internal buffer is full and can't receive more messages, or if the server stops sending messages for the partition.
count must be 1 or greater; it will panic if count < 1.
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (WithBatchPreferMinCount) Apply ¶
func (count WithBatchPreferMinCount) Apply( options topicreaderinternal.ReadMessageBatchOptions, ) topicreaderinternal.ReadMessageBatchOptions
Apply
Experimental ¶
Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
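WithBatchMaxCount and WithBatchPreferMinCount are both integer types with an Apply method that takes an options value and returns an updated one. The sketch below illustrates that pattern in isolation. Everything in it is hypothetical: batchOptions and its field names stand in for topicreaderinternal.ReadMessageBatchOptions, whose fields are not shown in this documentation, and placing the count < 1 panic inside Apply is an assumption.

```go
package main

import "fmt"

// batchOptions is a hypothetical stand-in for
// topicreaderinternal.ReadMessageBatchOptions; the field names are invented.
type batchOptions struct {
	minCount int
	maxCount int
}

// batchOption mirrors the shape of the Apply methods documented above.
type batchOption interface {
	Apply(batchOptions) batchOptions
}

// withMaxCount imitates WithBatchMaxCount.
type withMaxCount int

func (c withMaxCount) Apply(o batchOptions) batchOptions {
	o.maxCount = int(c)
	return o
}

// withPreferMinCount imitates WithBatchPreferMinCount.
type withPreferMinCount int

func (c withPreferMinCount) Apply(o batchOptions) batchOptions {
	if c < 1 {
		// Assumed location of the documented panic.
		panic("prefer min count must be 1 or greater")
	}
	o.minCount = int(c)
	return o
}

// buildOptions folds the given options over a default value, the way a
// variadic option list is typically consumed.
func buildOptions(opts ...batchOption) batchOptions {
	res := batchOptions{minCount: 1}
	for _, opt := range opts {
		res = opt.Apply(res)
	}
	return res
}

func main() {
	o := buildOptions(withPreferMinCount(10), withMaxCount(100))
	fmt.Printf("%+v\n", o)
}
```

Because each option is a plain value that returns a modified copy, options compose in call order and the defaults stay untouched.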