topicwriter

package
v3.93.2
Published: Dec 5, 2024 License: Apache-2.0 Imports: 2 Imported by: 1

Constants

This section is empty.

Variables

var (
	ErrQueueLimitExceed                      = topicwriterinternal.PublicErrQueueIsFull
	ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError
)

Functions

This section is empty.

Types

type PublicInitialInfo added in v3.52.3

type PublicInitialInfo struct {
	LastSeqNum int64
}

PublicInitialInfo holds information about the writer after it has been initialized.
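As a rough illustration of how LastSeqNum might be used, the sketch below continues message numbering from the value reported after initialization. It assumes that LastSeqNum is the last sequence number the server has stored for this writer and that the application assigns sequence numbers itself. The initialInfo and message types and the numberMessages helper are hypothetical local stand-ins, not part of this package.

```go
package main

import "fmt"

// initialInfo mirrors the shape of PublicInitialInfo (local stand-in).
type initialInfo struct {
	LastSeqNum int64
}

// message is a hypothetical stand-in for a topic message that carries
// an explicit sequence number.
type message struct {
	SeqNo int64
	Data  string
}

// numberMessages assigns consecutive sequence numbers that continue
// right after info.LastSeqNum.
func numberMessages(info initialInfo, payloads []string) []message {
	msgs := make([]message, 0, len(payloads))
	next := info.LastSeqNum + 1
	for _, p := range payloads {
		msgs = append(msgs, message{SeqNo: next, Data: p})
		next++
	}
	return msgs
}

func main() {
	info := initialInfo{LastSeqNum: 41}
	for _, m := range numberMessages(info, []string{"a", "b", "c"}) {
		fmt.Println(m.SeqNo, m.Data)
	}
}
```

With LastSeqNum equal to 41, the three payloads receive sequence numbers 42, 43 and 44.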

type TxWriter added in v3.81.0

type TxWriter struct {
	// contains filtered or unexported fields
}

TxWriter is used to send messages to a topic within a transaction.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func NewTxWriterInternal added in v3.81.0

func NewTxWriterInternal(w *topicwriterinternal.WriterWithTransaction) *TxWriter

func (*TxWriter) WaitInit added in v3.81.0

func (w *TxWriter) WaitInit(ctx context.Context) (err error)

WaitInit waits until the writer is initialized or an error occurs. It returns only the error.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*TxWriter) WaitInitInfo added in v3.81.0

func (w *TxWriter) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err error)

WaitInitInfo waits until the writer is initialized or an error occurs, and returns PublicInitialInfo and err.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*TxWriter) Write added in v3.81.0

func (w *TxWriter) Write(ctx context.Context, messages ...Message) error

Write writes messages to the transaction.

It does not retry. If it fails, the whole transaction must be retried, as with any other error in a table transaction.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

type WriteOption added in v3.81.0

type WriteOption interface {
	// contains filtered or unexported methods
}

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer represents a write session to a topic. It handles connection problems, reconnects to the server when needed, and resends buffered messages.

func NewWriter

func NewWriter(writer *topicwriterinternal.WriterReconnector) *Writer

NewWriter creates a new writer from the internal type. Used internally only.

func (*Writer) Close

func (w *Writer) Close(ctx context.Context) error

Close flushes the remaining messages from the buffer and closes the writer. You can't write new messages after calling Close.

func (*Writer) Flush added in v3.66.1

func (w *Writer) Flush(ctx context.Context) error

Flush waits till all in-flight messages are acknowledged.

func (*Writer) WaitInit added in v3.52.3

func (w *Writer) WaitInit(ctx context.Context) (err error)

WaitInit waits until the writer is initialized or an error occurs. It returns only the error.

func (*Writer) WaitInitInfo added in v3.52.3

func (w *Writer) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err error)

WaitInitInfo waits until the writer is initialized or an error occurs, and returns PublicInitialInfo and err.

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, messages ...Message) error

Write sends messages to the topic. In async mode (the default) it returns after the messages are saved into the internal buffer; in sync mode it returns after the server acknowledges them. See topicoptions.WithSyncWrite.

The method waits for the initial connection even in async mode, so the first write may be slower, especially when the connection has problems.

It returns ErrQueueLimitExceed (check it with errors.Is) if ctx is cancelled before the messages are put into the internal buffer, or if you try to add more messages than the queue can hold. If err != nil, you can check errors.Is(err, ErrMessagesPutToInternalQueueBeforeError) to find out whether the messages were put into the buffer before the error occurred. If they were, the messages may still be delivered to the server.

Example
ctx := context.Background()
db, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"))
if err != nil {
	log.Fatalf("failed ydb connection: %v", err)
}

writer, err := db.Topic().StartWriter("topicName")
if err != nil {
	log.Fatalf("failed to create topic writer: %v", err)
}

err = writer.Write(ctx,
	topicwriter.Message{Data: strings.NewReader("1")},
	topicwriter.Message{Data: strings.NewReader("2")},
	topicwriter.Message{Data: strings.NewReader("3")},
)
if err == nil {
	fmt.Println("OK")
} else {
	log.Fatalf("failed write to stream: %v", err)
}