topicreader

package
v3.31.0-rc0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2022 License: Apache-2.0 Imports: 5 Imported by: 4

Documentation

Overview

Example (EffectiveUnmarshalMessageContentToJSONStruct)
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	type S struct {
		MyField int `json:"my_field"`
	}

	var v S
	msg, _ := reader.ReadMessage(ctx)
	_ = topicsugar.JSONUnmarshal(msg, &v)
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Example (EffectiveUnmarshalMessageContentToOwnType)
package main

import (
	"context"
	"encoding/binary"
	"errors"
)

type MyMessage struct {
	ID         byte
	ChangeType byte
	Delta      uint32
}

func (m *MyMessage) UnmarshalYDBTopicMessage(data []byte) error {
	if len(data) != 6 {
		return errors.New("bad data len")
	}
	m.ID = data[0]
	m.ChangeType = data[1]
	m.Delta = binary.BigEndian.Uint32(data[2:])
	return nil
}

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	var v MyMessage
	mess, _ := reader.ReadMessage(ctx)
	_ = mess.UnmarshalTo(&v)
}
Output:

Example (HandlePartitionHardOff_NeedRare)
package main

import (
	"bytes"
	"context"
	"time"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	batch, _ := reader.ReadMessageBatch(ctx)
	if len(batch.Messages) == 0 {
		return
	}

	batchContext := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke

	buf := &bytes.Buffer{}
	for _, msg := range batch.Messages {
		if batchContext.Err() != nil {
			// if batch context cancelled - it mean client need to stop process messages from batch
			// next messages will send to other reader
			return
		}
		_, _ = buf.ReadFrom(msg)
		writeBatchToDB(ctx, batch.Messages[0].WrittenAt, buf.Bytes())
	}
}

func writeBatchToDB(ctx context.Context, t time.Time, data []byte) {
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Example (HandlePartitionSoftOff_NeedRare)
ctx := context.TODO()
db := dbConnect()
reader, _ := db.Topic().StartReader("consumer", nil,
	topicoptions.WithBatchReadMinCount(1000),
)

for {
	batch, _ := reader.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000
	processBatch(batch)
	_ = reader.Commit(batch.Context(), batch)
}
Output:

Example (ReadAndCommitEveryMessage)
package main

import (
	"context"
	"io/ioutil"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		msg, _ := reader.ReadMessage(ctx)
		processMessage(msg)
		_ = reader.Commit(msg.Context(), msg)
	}
}

func processMessage(m *topicreader.Message) {
	body, _ := ioutil.ReadAll(m)
	writeToDB(
		m.Context(),
		m.SeqNo, body)
}

func writeToDB(ctx context.Context, id int64, body []byte) {
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Example (ReadBatchWithMessageCommits)
package main

import (
	"context"
	"io/ioutil"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		batch, _ := reader.ReadMessageBatch(ctx)
		for _, msg := range batch.Messages {
			processMessage(msg)
			_ = reader.Commit(msg.Context(), batch)
		}
	}
}

func processMessage(m *topicreader.Message) {
	body, _ := ioutil.ReadAll(m)
	writeToDB(
		m.Context(),
		m.SeqNo, body)
}

func writeToDB(ctx context.Context, id int64, body []byte) {
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Example (ReadBatchesWithBatchCommit)
package main

import (
	"bytes"
	"context"
	"io"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		batch, _ := reader.ReadMessageBatch(ctx)
		processBatch(batch)
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(batch *topicreader.Batch) {
	ctx := batch.Context()
	if len(batch.Messages) == 0 {
		return
	}

	buf := &bytes.Buffer{}
	for _, msg := range batch.Messages {
		buf.Reset()
		_, _ = buf.ReadFrom(msg)
		_, _ = io.Copy(buf, msg)
		writeMessagesToDB(ctx, buf.Bytes())
	}
}

func writeMessagesToDB(ctx context.Context, data []byte) {}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Example (ReadMessagesWithAsyncBufferedCommit)
ctx := context.TODO()
db := dbConnect()
reader, _ := db.Topic().StartReader("consumer", nil,
	topicoptions.WithCommitMode(topicoptions.CommitModeAsync),
	topicoptions.WithCommitCountTrigger(1000),
)
defer func() {
	_ = reader.Close(ctx) // wait until flush buffered commits
}()

for {
	msg, _ := reader.ReadMessage(ctx)
	processMessage(msg)
	_ = reader.Commit(ctx, msg) // will fast - in async mode commit will append to internal buffer only
}
Output:

Example (ReadMessagesWithCustomBatching)
ctx := context.TODO()
db := dbConnect()

reader, _ := db.Topic().StartReader("consumer", nil,
	topicoptions.WithBatchReadMinCount(1000),
)

for {
	batch, _ := reader.ReadMessageBatch(ctx)
	processBatch(batch)
	_ = reader.Commit(batch.Context(), batch)
}
Output:

Example (ReadWithExplicitPartitionStartStopHandler)
ctx := context.TODO()
db := dbConnect()

readContext, stopReader := context.WithCancel(context.Background())
defer stopReader()

reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
	topicoptions.WithTracer(
		trace.Topic{
			OnPartitionReadStart: func(info trace.OnPartitionReadStartInfo) {
				err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
				if err != nil {
					stopReader()
				}
			},
			OnPartitionReadStop: func(info trace.OnPartitionReadStopInfo) {
				if info.Graceful {
					err := externalSystemUnlock(ctx, info.Topic, info.PartitionID)
					if err != nil {
						stopReader()
					}
				}
			},
		},
	),
)

go func() {
	<-readContext.Done()
	_ = reader.Close(ctx)
}()

for {
	batch, _ := reader.ReadMessageBatch(readContext)

	processBatch(batch)
	_ = externalSystemCommit(
		batch.Context(),
		batch.Topic(),
		batch.PartitionID(),
		getEndOffset(batch),
	)
}
Output:

Example (ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage)
ctx := context.TODO()
db := dbConnect()

readContext, stopReader := context.WithCancel(context.Background())
defer stopReader()

readStartPosition := func(
	ctx context.Context,
	req topicoptions.GetPartitionStartOffsetRequest,
) (res topicoptions.GetPartitionStartOffsetResponse, err error) {
	offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
	res.StartFrom(offset)

	// Reader will stop if return err != nil
	return res, err
}

onPartitionStart := func(info trace.OnPartitionReadStartInfo) {
	err := externalSystemLock(info.PartitionContext, info.Topic, info.PartitionID)
	if err != nil {
		stopReader()
	}
}

onPartitionStop := func(info trace.OnPartitionReadStopInfo) {
	if info.Graceful {
		err := externalSystemUnlock(ctx, info.Topic, info.PartitionID)
		if err != nil {
			stopReader()
		}
	}
}

r, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),

	topicoptions.WithGetPartitionStartOffset(readStartPosition),
	topicoptions.WithTracer(
		trace.Topic{
			OnPartitionReadStart: onPartitionStart,
			OnPartitionReadStop:  onPartitionStop,
		},
	),
)
go func() {
	<-readContext.Done()
	_ = r.Close(ctx)
}()

for {
	batch, _ := r.ReadMessageBatch(readContext)

	processBatch(batch)
	_ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), getEndOffset(batch))
}
Output:

Example (ReadWithOwnReadProgressStorage)
ctx := context.TODO()
db := dbConnect()

reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
	topicoptions.WithGetPartitionStartOffset(
		func(
			ctx context.Context,
			req topicoptions.GetPartitionStartOffsetRequest,
		) (
			res topicoptions.GetPartitionStartOffsetResponse,
			err error,
		) {
			offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
			res.StartFrom(offset)

			// Reader will stop if return err != nil
			return res, err
		}),
)

for {
	batch, _ := reader.ReadMessageBatch(ctx)

	processBatch(batch)
	_ = externalSystemCommit(
		batch.Context(),
		batch.Topic(),
		batch.PartitionID(),
		getEndOffset(batch),
	)
}
Output:

Example (ReceiveCommitNotify)
ctx := context.TODO()
db := dbConnect()

reader, _ := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"),
	topicoptions.WithTracer(trace.Topic{
		OnPartitionCommittedNotify: func(info trace.OnPartitionCommittedInfo) {
			// called when receive commit notify from server
			fmt.Println(info.Topic, info.PartitionID, info.CommittedOffset)
		},
	},
	),
)

for {
	msg, _ := reader.ReadMessage(ctx)
	processMessage(msg)
}
Output:

Example (SimplePrintMessageContent)
package main

import (
	"context"
	"fmt"
	"io/ioutil"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		msg, _ := reader.ReadMessage(ctx)
		content, _ := ioutil.ReadAll(msg)
		fmt.Println(string(content))
	}
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Example (SimpleReadMessagesWithErrorHandle)
package main

import (
	"context"
	"io/ioutil"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()
	for {
		msg, _ := reader.ReadMessage(ctx)
		processMessage(msg)
	}
}

func processMessage(m *topicreader.Message) {
	body, _ := ioutil.ReadAll(m)
	writeToDB(
		m.Context(),
		m.SeqNo, body)
}

func writeToDB(ctx context.Context, id int64, body []byte) {
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}
Output:

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrConcurrencyCall = xerrors.Wrap(errors.New("concurrency call"))

ErrConcurrencyCall return if method on reader called in concurrency

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

ErrUnexpectedCodec will return if topicreader receive message with unknown codec. client side must check error with errors.Is

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

Functions

This section is empty.

Types

type Batch

Batch is group of ordered messages from one partition

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type CommitRangeGetter

type CommitRangeGetter = topicreaderinternal.PublicCommitRangeGetter

CommitRangeGetter

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type Message

Message

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type MessageContentUnmarshaler

type MessageContentUnmarshaler = topicreaderinternal.PublicMessageContentUnmarshaler

MessageContentUnmarshaler

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type ReadBatchOption

ReadBatchOption

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader allow to read message from YDB topics reader methods must not call concurrency

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func NewReader

func NewReader(internalReader topicreaderinternal.Reader) *Reader

NewReader

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func (*Reader) Close

func (r *Reader) Close(ctx context.Context) error

Close stop work with reader return when reader complete internal works, flush commit buffer, ets or when ctx cancelled

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func (*Reader) Commit

func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error

Commit receive Message, Batch of single offset

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (*Message, error)

ReadMessage read exactly one message exactly one of message, error is nil

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func (*Reader) ReadMessageBatch

func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)

ReadMessageBatch read batch of messages Batch is ordered message group from one partition exactly one of Batch, err is nil if Batch is not nil - reader guarantee about all Batch.Messages are not nil

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type WithBatchMaxCount

type WithBatchMaxCount int

WithBatchMaxCount

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func (WithBatchMaxCount) Apply

Apply

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

type WithBatchPreferMinCount

type WithBatchPreferMinCount int

WithBatchPreferMinCount set prefer min count for batch size. Sometime result batch can be less then count for example if internal buffer full and can't receive more messages or server stop send messages in partition

count must be 1 or greater it will panic if count < 1

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

func (WithBatchPreferMinCount) Apply

Apply

Experimental

Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL