isb

package
v1.2.0-rc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

Inter Step Buffer (isb)

Documentation

Index

Constants

View Source
const PendingNotAvailable = int64(math.MinInt64)

Variables

View Source
var (
	BufferFullMessage  = "Buffer full!"
	DuplicateIDMessage = "Duplicate ID!"
)
View Source
var DefaultPartitionIdx = int32(0)

DefaultPartitionIdx Default partition index

Functions

This section is empty.

Types

type Body

type Body struct {
	Payload []byte
}

Body is the body of the message

func (Body) MarshalBinary

func (b Body) MarshalBinary() (data []byte, err error)

MarshalBinary encodes Body to a binary format

func (*Body) UnmarshalBinary

func (b *Body) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes Body from the binary format

type BufferReadErr

type BufferReadErr struct {
	Name        string
	Empty       bool
	InternalErr bool
	Message     string
}

BufferReadErr when we cannot read from the buffer.

func (BufferReadErr) Error

func (e BufferReadErr) Error() string

func (BufferReadErr) IsEmpty

func (e BufferReadErr) IsEmpty() bool

IsEmpty returns true if buffer is empty.

func (BufferReadErr) IsInternalErr

func (e BufferReadErr) IsInternalErr() bool

IsInternalErr returns true if reading is failing due to a buffer internal error.

type BufferReader

type BufferReader interface {
	BufferReaderInformation
	io.Closer
	// Read reads a chunk of messages and returns at the first occurrence of an error. Error does not indicate that the
	// array of result is empty, the callee should process all the elements in the array even if the error is set. Read
	// will not mark the message in the buffer as "READ" if the read for that index is erring.
	// There is a chance that we have read the message and the container got forcefully terminated before processing. To provide
	// at-least-once semantics for reading, during the restart we will have to reprocess all unacknowledged messages.
	Read(context.Context, int64) ([]*ReadMessage, error)
	// Ack acknowledges an array of offset.
	Ack(context.Context, []Offset) []error
	// NoAck cancels acknowledgement of an array of offset.
	NoAck(context.Context, []Offset)
	// Pending returns the count of pending messages.
	Pending(context.Context) (int64, error)
}

BufferReader is the buffer from which we are reading.

type BufferReaderInformation

type BufferReaderInformation interface {
	// GetName returns the name.
	GetName() string
	// GetPartitionIdx returns the partition ID.
	GetPartitionIdx() int32
}

BufferReaderInformation has information regarding the buffer we are reading from.

type BufferWriteErr

type BufferWriteErr struct {
	Name        string
	Full        bool
	InternalErr bool
	Message     string
}

BufferWriteErr when we cannot write to the buffer because of a full buffer.

func (BufferWriteErr) Error

func (e BufferWriteErr) Error() string

func (BufferWriteErr) IsFull

func (e BufferWriteErr) IsFull() bool

IsFull returns true if buffer is full.

func (BufferWriteErr) IsInternalErr

func (e BufferWriteErr) IsInternalErr() bool

IsInternalErr returns true if writing is failing due to a buffer internal error.

type BufferWriter

type BufferWriter interface {
	BufferWriterInformation
	io.Closer
	Write(context.Context, []Message) ([]Offset, []error)
}

BufferWriter is the buffer to which we are writing.

type BufferWriterInformation

type BufferWriterInformation interface {
	// GetName returns the name.
	GetName() string
	// GetPartitionIdx returns the partition ID.
	GetPartitionIdx() int32
}

BufferWriterInformation has information regarding the buffer we are writing to.

type Header struct {
	MessageInfo
	// Kind indicates the kind of Message
	Kind MessageKind
	// ID is used for exactly-once-semantics. ID is usually populated from the offset, if offset is available.
	ID string
	// Keys is (key,value) in the map-reduce paradigm will be used for reduce operation, last key in the list
	// will be used for conditional forwarding
	Keys []string
	// Headers is the headers of the message which can be used to store and propagate source headers like kafka headers,
	// http headers and Numaflow platform headers like tracing headers etc.
	Headers map[string]string
}

Header is the header of the message

func (Header) MarshalBinary

func (h Header) MarshalBinary() (data []byte, err error)

MarshalBinary encodes Header to a binary format

func (*Header) UnmarshalBinary

func (h *Header) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes Header from the binary format

type LagReader added in v0.5.3

type LagReader interface {
	GetName() string
	// Pending returns the pending messages number.
	Pending(context.Context) (int64, error)
}

LagReader is the interface that wraps the Pending method and GetName method. will be used by the metrics server to get the pending messages count.

type Message

type Message struct {
	Header
	Body
}

Message is inter step message

func (Message) MarshalBinary added in v0.7.2

func (m Message) MarshalBinary() (data []byte, err error)

MarshalBinary encodes Message to the binary format

func (*Message) ToReadMessage added in v0.7.2

func (m *Message) ToReadMessage(ot Offset, wm time.Time) *ReadMessage

ToReadMessage converts Message to a ReadMessage by providing the offset and watermark

func (*Message) UnmarshalBinary added in v0.7.2

func (m *Message) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes Message from the binary format

type MessageAckErr

type MessageAckErr struct {
	Name    string
	Offset  Offset
	Message string
}

MessageAckErr is for acknowledgement errors.

func (MessageAckErr) Error

func (e MessageAckErr) Error() string

type MessageInfo added in v0.7.2

type MessageInfo struct {
	// EventTime when
	// MessageKind == Data represents the event time of the message
	// MessageKind == WMB, value is ignored
	EventTime time.Time
	// IsLate when
	// MessageKind == Data, IsLate is used to indicate if the message is a late data (assignment happens at source)
	// MessageKind == WMB, value is ignored
	IsLate bool
}

MessageInfo is the message information window of the payload. The contents inside the MessageInfo can be interpreted differently based on the MessageKind.

func (MessageInfo) MarshalBinary added in v0.7.2

func (p MessageInfo) MarshalBinary() (data []byte, err error)

MarshalBinary encodes MessageInfo to the binary format

func (*MessageInfo) UnmarshalBinary added in v0.7.2

func (p *MessageInfo) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes MessageInfo from the binary format

type MessageKind added in v0.7.2

type MessageKind int16

MessageKind represents the message type of the payload.

const (
	Data MessageKind = iota // Data payload
	WMB                     // Watermark Barrier
)

func (MessageKind) String added in v0.7.2

func (mt MessageKind) String() string

type MessageMetadata added in v0.8.0

type MessageMetadata struct {
	// NumDelivered is the number of times the message has been delivered.
	NumDelivered uint64
}

MessageMetadata is the metadata of the message

type MessageReadErr

type MessageReadErr struct {
	Name    string
	Header  []byte
	Body    []byte
	Message string
}

MessageReadErr is associated with message read errors.

func (MessageReadErr) Error

func (e MessageReadErr) Error() string

type MessageWriteErr

type MessageWriteErr struct {
	Name    string
	Header  Header
	Body    Body
	Message string
}

MessageWriteErr is associated with message write errors.

func (MessageWriteErr) Error

func (e MessageWriteErr) Error() string

type NonRetryableBufferWriteErr added in v1.2.0

type NonRetryableBufferWriteErr struct {
	Name    string
	Message string
}

NonRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.

func (NonRetryableBufferWriteErr) Error added in v1.2.0

type Offset

type Offset interface {
	// String returns the offset identifier
	String() string
	// Sequence returns a sequence id which can be used to index into the buffer (ISB)
	Sequence() (int64, error)
	// AckIt is used to ack the offset
	// This is often used when the BufferReader can not simply use the offset identifier to ack the message,
	// then the work can be done in this function, and call it in BufferReader Ack() function implementation.
	AckIt() error
	// NoAck to indicate the offset no longer needs to be acknowledged
	// It is used when error occurs, and we want to reprocess the batch to indicate acknowledgement no
	// longer needed.
	NoAck() error
	// PartitionIdx returns the partition index to which the offset belongs to.
	PartitionIdx() int32
}

Offset is an interface used in the ReadMessage referencing offset information.

func NewSimpleIntPartitionOffset added in v0.10.0

func NewSimpleIntPartitionOffset(seq int64, partitionIdx int32) Offset

func NewSimpleStringPartitionOffset added in v0.10.0

func NewSimpleStringPartitionOffset(seq string, partitionIdx int32) Offset

type ReadMessage

type ReadMessage struct {
	Message
	ReadOffset Offset
	Watermark  time.Time
	// Metadata is the metadata of the message after a message is read from the buffer.
	Metadata MessageMetadata
}

ReadMessage is the message read from the buffer.

func (ReadMessage) MarshalBinary added in v0.7.2

func (rm ReadMessage) MarshalBinary() (data []byte, err error)

MarshalBinary encodes ReadMessage to the binary format

func (*ReadMessage) UnmarshalBinary added in v0.7.2

func (rm *ReadMessage) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes ReadMessage from the binary format

type SimpleIntOffset added in v0.6.1

type SimpleIntOffset func() int64

SimpleIntOffset is an Offset convenient function for implementations without needing AckIt() when offset is a int64.

func (SimpleIntOffset) AckIt added in v0.6.1

func (si SimpleIntOffset) AckIt() error

func (SimpleIntOffset) NoAck added in v0.7.3

func (si SimpleIntOffset) NoAck() error

func (SimpleIntOffset) PartitionIdx added in v0.10.0

func (si SimpleIntOffset) PartitionIdx() int32

func (SimpleIntOffset) Sequence added in v0.6.1

func (si SimpleIntOffset) Sequence() (int64, error)

func (SimpleIntOffset) String added in v0.6.1

func (si SimpleIntOffset) String() string

type SimpleStringOffset added in v0.6.1

type SimpleStringOffset func() string

SimpleStringOffset is an Offset convenient function for implementations without needing AckIt() when offset is a string.

func (SimpleStringOffset) AckIt added in v0.6.1

func (so SimpleStringOffset) AckIt() error

func (SimpleStringOffset) NoAck added in v0.7.3

func (so SimpleStringOffset) NoAck() error

func (SimpleStringOffset) PartitionIdx added in v0.10.0

func (so SimpleStringOffset) PartitionIdx() int32

func (SimpleStringOffset) Sequence added in v0.6.1

func (so SimpleStringOffset) Sequence() (int64, error)

func (SimpleStringOffset) String added in v0.6.1

func (so SimpleStringOffset) String() string

type WriteMessage added in v0.8.0

type WriteMessage struct {
	Message
	Tags []string
}

WriteMessage is a wrapper for an isb message with tag information which will be used for conditional forwarding.

Directories

Path Synopsis
stores

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL