Documentation ¶
Index ¶
- func ModifyAppendResultExtra[M protoreflect.ProtoMessage](ctx context.Context, modifier func(old M) (new M))
- func ReplaceAppendResultTimeTick(ctx context.Context, timeTick uint64)
- func ReplaceAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext)
- func WithExtraAppendResult(ctx context.Context, r *ExtraAppendResult) context.Context
- func WithNotPersisted(ctx context.Context, hint *NotPersistedHint) context.Context
- type ExtraAppendResult
- type NotPersistedHint
- type PendingQueue
- type ReOrderByTimeTickBuffer
- type TxnBuffer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ModifyAppendResultExtra ¶
func ModifyAppendResultExtra[M protoreflect.ProtoMessage](ctx context.Context, modifier func(old M) (new M))
ModifyAppendResultExtra modify extra in context
func ReplaceAppendResultTimeTick ¶
ReplaceAppendResultTimeTick set time tick to context
func ReplaceAppendResultTxnContext ¶
func ReplaceAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext)
ReplaceAppendResultTxnContext set txn context to context
func WithExtraAppendResult ¶
func WithExtraAppendResult(ctx context.Context, r *ExtraAppendResult) context.Context
WithExtraAppendResult set extra to context
func WithNotPersisted ¶
func WithNotPersisted(ctx context.Context, hint *NotPersistedHint) context.Context
WithNotPersisted set not persisted message to context
Types ¶
type ExtraAppendResult ¶
type ExtraAppendResult struct { TimeTick uint64 TxnCtx *message.TxnContext Extra protoreflect.ProtoMessage }
ExtraAppendResult is the extra append result.
type NotPersistedHint ¶
NotPersistedHint is the hint of not persisted message.
func GetNotPersisted ¶
func GetNotPersisted(ctx context.Context) *NotPersistedHint
GetNotPersisted get not persisted message from context
type PendingQueue ¶
type PendingQueue struct { *typeutil.MultipartQueue[message.ImmutableMessage] // contains filtered or unexported fields }
func NewPendingQueue ¶
func NewPendingQueue() *PendingQueue
func (*PendingQueue) Add ¶
func (q *PendingQueue) Add(msg []message.ImmutableMessage)
func (*PendingQueue) AddOne ¶
func (q *PendingQueue) AddOne(msg message.ImmutableMessage)
func (*PendingQueue) Bytes ¶
func (q *PendingQueue) Bytes() int
func (*PendingQueue) UnsafeAdvance ¶
func (q *PendingQueue) UnsafeAdvance()
type ReOrderByTimeTickBuffer ¶
type ReOrderByTimeTickBuffer struct {
// contains filtered or unexported fields
}
ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.
func NewReOrderBuffer ¶
func NewReOrderBuffer() *ReOrderByTimeTickBuffer
NewReOrderBuffer creates a new ReOrderBuffer.
func (*ReOrderByTimeTickBuffer) Bytes ¶
func (r *ReOrderByTimeTickBuffer) Bytes() int
func (*ReOrderByTimeTickBuffer) Len ¶
func (r *ReOrderByTimeTickBuffer) Len() int
Len returns the number of messages in the buffer.
func (*ReOrderByTimeTickBuffer) PopUtilTimeTick ¶
func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.ImmutableMessage
PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick. The result is sorted by time tick in ascending order.
func (*ReOrderByTimeTickBuffer) Push ¶
func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error
Push pushes a message into the buffer.
type TxnBuffer ¶
type TxnBuffer struct {
// contains filtered or unexported fields
}
TxnBuffer is a buffer for txn messages.
func NewTxnBuffer ¶
func NewTxnBuffer(logger *log.MLogger, metrics *metricsutil.ScannerMetrics) *TxnBuffer
NewTxnBuffer creates a new txn buffer.
func (*TxnBuffer) HandleImmutableMessages ¶
func (b *TxnBuffer) HandleImmutableMessages(msgs []message.ImmutableMessage, ts uint64) []message.ImmutableMessage
HandleImmutableMessages handles immutable messages. The timetick of msgs should be in ascending order, and the timetick of all messages is less than or equal to ts. Hold the uncommitted txn messages until the commit or rollback message comes and pop the committed txn messages.