utility

package
v0.10.3-0...-00edec2
Warning

This package is not in the latest version of its module.

Published: Nov 16, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ModifyAppendResultExtra

func ModifyAppendResultExtra[M protoreflect.ProtoMessage](ctx context.Context, modifier func(old M) (new M))

ModifyAppendResultExtra modifies the extra in the context by applying modifier to the old value and storing the value it returns.

func ReplaceAppendResultTimeTick

func ReplaceAppendResultTimeTick(ctx context.Context, timeTick uint64)

ReplaceAppendResultTimeTick sets the time tick in the context.

func ReplaceAppendResultTxnContext

func ReplaceAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext)

ReplaceAppendResultTxnContext sets the txn context in the context.

func WithExtraAppendResult

func WithExtraAppendResult(ctx context.Context, r *ExtraAppendResult) context.Context

WithExtraAppendResult sets the extra append result in the context and returns the resulting context.

func WithNotPersisted

func WithNotPersisted(ctx context.Context, hint *NotPersistedHint) context.Context

WithNotPersisted sets the not-persisted message hint in the context and returns the resulting context.
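The context helpers above pair a With... function, which returns a new context, with Replace... functions, which return nothing. One way such an API can work is to store a pointer in the context and mutate the pointed-to struct later. The following is a minimal sketch of that pattern using only the standard library. The names extraAppendResult, ctxKey, withExtraAppendResult, replaceTimeTick, and roundTrip are hypothetical stand-ins, and the package's actual implementation may differ.

```go
package main

import (
	"context"
	"fmt"
)

// extraAppendResult is a hypothetical stand-in for ExtraAppendResult;
// the documented struct also carries TxnCtx and Extra.
type extraAppendResult struct {
	TimeTick uint64
}

// ctxKey is an unexported key type, so other packages cannot collide with it.
type ctxKey struct{}

// withExtraAppendResult stores the pointer r in a derived context.
func withExtraAppendResult(ctx context.Context, r *extraAppendResult) context.Context {
	return context.WithValue(ctx, ctxKey{}, r)
}

// replaceTimeTick mutates the result carried by ctx.
// Assumption: a context without a result is silently ignored in this sketch.
func replaceTimeTick(ctx context.Context, timeTick uint64) {
	if r, ok := ctx.Value(ctxKey{}).(*extraAppendResult); ok && r != nil {
		r.TimeTick = timeTick
	}
}

// roundTrip attaches a result, replaces its time tick through the context,
// and returns the value seen by the original owner of the pointer.
func roundTrip(timeTick uint64) uint64 {
	result := &extraAppendResult{}
	ctx := withExtraAppendResult(context.Background(), result)
	replaceTimeTick(ctx, timeTick)
	return result.TimeTick
}

func main() {
	fmt.Println(roundTrip(42)) // prints 42
}
```

Because the context holds a pointer, the caller that created the result observes updates made further down the call chain without receiving a new context back.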

Types

type ExtraAppendResult

type ExtraAppendResult struct {
	TimeTick uint64
	TxnCtx   *message.TxnContext
	Extra    protoreflect.ProtoMessage
}

ExtraAppendResult is the extra append result.

type NotPersistedHint

type NotPersistedHint struct {
	MessageID message.MessageID // The reused MessageID.
}

NotPersistedHint is the hint attached to a message that is not persisted.

func GetNotPersisted

func GetNotPersisted(ctx context.Context) *NotPersistedHint

GetNotPersisted gets the not-persisted message hint from the context.

type PendingQueue

type PendingQueue struct {
	*typeutil.MultipartQueue[message.ImmutableMessage]
	// contains filtered or unexported fields
}

func NewPendingQueue

func NewPendingQueue() *PendingQueue

func (*PendingQueue) Add

func (q *PendingQueue) Add(msg []message.ImmutableMessage)

func (*PendingQueue) AddOne

func (q *PendingQueue) AddOne(msg message.ImmutableMessage)

func (*PendingQueue) Bytes

func (q *PendingQueue) Bytes() int

func (*PendingQueue) UnsafeAdvance

func (q *PendingQueue) UnsafeAdvance()

type ReOrderByTimeTickBuffer

type ReOrderByTimeTickBuffer struct {
	// contains filtered or unexported fields
}

ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.

func NewReOrderBuffer

func NewReOrderBuffer() *ReOrderByTimeTickBuffer

NewReOrderBuffer creates a new ReOrderByTimeTickBuffer.

func (*ReOrderByTimeTickBuffer) Bytes

func (r *ReOrderByTimeTickBuffer) Bytes() int

func (*ReOrderByTimeTickBuffer) Len

func (r *ReOrderByTimeTickBuffer) Len() int

Len returns the number of messages in the buffer.

func (*ReOrderByTimeTickBuffer) PopUtilTimeTick

func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.ImmutableMessage

PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick. The result is sorted by time tick in ascending order.

func (*ReOrderByTimeTickBuffer) Push

Push pushes a message into the buffer.
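The push and pop behavior described for ReOrderByTimeTickBuffer can be illustrated with a min-heap keyed on time tick. This is a sketch built on container/heap, not the package's implementation. The types msg, msgHeap, and reorderBuffer and the helper popTicks are hypothetical, and msg stands in for message.ImmutableMessage.

```go
package main

import (
	"container/heap"
	"fmt"
)

// msg is a hypothetical stand-in for message.ImmutableMessage.
type msg struct {
	TimeTick uint64
}

// msgHeap implements heap.Interface as a min-heap ordered by TimeTick.
type msgHeap []msg

func (h msgHeap) Len() int           { return len(h) }
func (h msgHeap) Less(i, j int) bool { return h[i].TimeTick < h[j].TimeTick }
func (h msgHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *msgHeap) Push(x any)        { *h = append(*h, x.(msg)) }
func (h *msgHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// reorderBuffer buffers messages and releases them in time tick order.
type reorderBuffer struct{ h msgHeap }

// push adds one message to the buffer.
func (r *reorderBuffer) push(m msg) { heap.Push(&r.h, m) }

// popUntil removes and returns every message whose TimeTick is less than
// or equal to timeTick, in ascending TimeTick order.
func (r *reorderBuffer) popUntil(timeTick uint64) []msg {
	var out []msg
	for r.h.Len() > 0 && r.h[0].TimeTick <= timeTick {
		out = append(out, heap.Pop(&r.h).(msg))
	}
	return out
}

// popTicks pushes the given time ticks, pops up to until, and reports the
// popped ticks plus how many messages remain buffered.
func popTicks(pushed []uint64, until uint64) ([]uint64, int) {
	r := &reorderBuffer{}
	for _, tt := range pushed {
		r.push(msg{TimeTick: tt})
	}
	var ticks []uint64
	for _, m := range r.popUntil(until) {
		ticks = append(ticks, m.TimeTick)
	}
	return ticks, r.h.Len()
}

func main() {
	ticks, remaining := popTicks([]uint64{5, 1, 9, 3}, 5)
	fmt.Println(ticks, remaining) // prints [1 3 5] 1
}
```

A heap keeps each push at O(log n) and lets the pop loop stop at the first message whose time tick exceeds the bound.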

type TxnBuffer

type TxnBuffer struct {
	// contains filtered or unexported fields
}

TxnBuffer is a buffer for txn messages.

func NewTxnBuffer

func NewTxnBuffer(logger *log.MLogger, metrics *metricsutil.ScannerMetrics) *TxnBuffer

NewTxnBuffer creates a new txn buffer.

func (*TxnBuffer) Bytes

func (b *TxnBuffer) Bytes() int

func (*TxnBuffer) HandleImmutableMessages

func (b *TxnBuffer) HandleImmutableMessages(msgs []message.ImmutableMessage, ts uint64) []message.ImmutableMessage

HandleImmutableMessages handles immutable messages. The time ticks of msgs must be in ascending order, and the time tick of every message must be less than or equal to ts. It holds uncommitted txn messages until the matching commit or rollback message arrives, and pops the committed txn messages.
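The hold-until-commit behavior described above can be sketched with a map from txn ID to buffered messages. Everything here is a hypothetical illustration: the kind constants, the msg type, and txnBuffer.handle are stand-ins rather than the package's types, and the sketch ignores the ts argument, logging, and metrics that the real TxnBuffer accepts.

```go
package main

import "fmt"

// kind classifies a message in this sketch (hypothetical, not the real API).
type kind int

const (
	kindPlain    kind = iota // not part of any txn
	kindBegin                // opens a txn
	kindBody                 // belongs to an open txn
	kindCommit               // commits a txn
	kindRollback             // discards a txn
)

// msg is a hypothetical stand-in for message.ImmutableMessage.
type msg struct {
	Kind    kind
	TxnID   int64
	Payload string
}

// txnBuffer holds the body messages of txns that are still open.
type txnBuffer struct {
	pending map[int64][]msg
}

func newTxnBuffer() *txnBuffer {
	return &txnBuffer{pending: make(map[int64][]msg)}
}

// handle returns the messages that are ready for delivery: plain messages
// immediately, txn bodies only once their commit message arrives.
// Assumption: body messages for an unknown txn are dropped in this sketch.
func (b *txnBuffer) handle(msgs []msg) []msg {
	var out []msg
	for _, m := range msgs {
		switch m.Kind {
		case kindPlain:
			out = append(out, m)
		case kindBegin:
			b.pending[m.TxnID] = []msg{}
		case kindBody:
			if _, ok := b.pending[m.TxnID]; ok {
				b.pending[m.TxnID] = append(b.pending[m.TxnID], m)
			}
		case kindCommit:
			out = append(out, b.pending[m.TxnID]...)
			delete(b.pending, m.TxnID)
		case kindRollback:
			delete(b.pending, m.TxnID)
		}
	}
	return out
}

func main() {
	b := newTxnBuffer()
	first := b.handle([]msg{
		{Kind: kindBegin, TxnID: 1},
		{Kind: kindBody, TxnID: 1, Payload: "a"},
		{Kind: kindPlain, Payload: "p"},
	})
	second := b.handle([]msg{
		{Kind: kindBody, TxnID: 1, Payload: "b"},
		{Kind: kindCommit, TxnID: 1},
	})
	fmt.Println(len(first), len(second)) // prints 1 2
}
```

In this sketch a txn may span several handle calls, which is why the pending map lives on the buffer rather than inside the call.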
