isb

package
v1.4.1
Warning

This package is not in the latest version of its module.

Published: Dec 5, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

Inter Step Buffer (isb)

Documentation


Constants

const PendingNotAvailable = int64(math.MinInt64)

Variables

var (
	BufferFullMessage  = "Buffer full!"
	DuplicateIDMessage = "Duplicate ID!"
)
var DefaultPartitionIdx = int32(0)

DefaultPartitionIdx is the default partition index.

Functions

This section is empty.

Types

type Body

type Body struct {
	Payload []byte
}

Body is the body of the message

type BufferReadErr

type BufferReadErr struct {
	Name        string
	Empty       bool
	InternalErr bool
	Message     string
}

BufferReadErr is returned when we cannot read from the buffer.

func (BufferReadErr) Error

func (e BufferReadErr) Error() string

func (BufferReadErr) IsEmpty

func (e BufferReadErr) IsEmpty() bool

IsEmpty returns true if buffer is empty.

func (BufferReadErr) IsInternalErr

func (e BufferReadErr) IsInternalErr() bool

IsInternalErr returns true if reading is failing due to a buffer internal error.
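
A minimal sketch of how a caller might branch on these two predicates. The BufferReadErr type below is a local stand-in that mirrors the documented fields so the example runs on its own; the Error() output format and the classify helper are assumptions made for illustration, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// BufferReadErr is a local stand-in mirroring the documented fields.
type BufferReadErr struct {
	Name        string
	Empty       bool
	InternalErr bool
	Message     string
}

// Error's output format is an assumption made for this sketch.
func (e BufferReadErr) Error() string       { return fmt.Sprintf("%s: %s", e.Name, e.Message) }
func (e BufferReadErr) IsEmpty() bool       { return e.Empty }
func (e BufferReadErr) IsInternalErr() bool { return e.InternalErr }

// classify is a hypothetical helper that maps a read error to a coarse category.
func classify(err error) string {
	var readErr BufferReadErr
	if !errors.As(err, &readErr) {
		return "unknown"
	}
	switch {
	case readErr.IsEmpty():
		return "empty"
	case readErr.IsInternalErr():
		return "internal"
	default:
		return "other"
	}
}

func main() {
	// errors.As also finds the BufferReadErr when it has been wrapped.
	err := fmt.Errorf("read failed: %w", BufferReadErr{Name: "buf-0", Empty: true, Message: "no messages"})
	fmt.Println(classify(err))
}
```

An empty buffer is usually a reason to back off and poll again, whereas an internal error is more likely worth logging or surfacing; that policy is the caller's choice.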

type BufferReader

type BufferReader interface {
	BufferReaderInformation
	io.Closer
	// Read reads a chunk of messages and returns at the first occurrence of an error. An error does not mean that the
	// result array is empty; the caller should process all the elements in the array even if the error is set. Read
	// will not mark a message in the buffer as "READ" if the read for that index fails.
	// A message may have been read and the container forcefully terminated before it was processed. To provide
	// at-least-once semantics for reading, all unacknowledged messages have to be reprocessed after a restart.
	Read(context.Context, int64) ([]*ReadMessage, error)
	// Ack acknowledges an array of offsets.
	Ack(context.Context, []Offset) []error
	// NoAck cancels acknowledgement of an array of offsets.
	NoAck(context.Context, []Offset)
	// Pending returns the count of pending messages.
	Pending(context.Context) (int64, error)
}

BufferReader is the buffer from which we are reading.
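
The Read contract above (process every returned message even when the error is set, then acknowledge) can be sketched as follows. All types here are trimmed-down local stand-ins, fakeReader is a hypothetical in-memory buffer, and treating the []error returned by Ack as one entry per offset is an assumption of this sketch rather than something the documentation states.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Offset and ReadMessage are trimmed-down stand-ins for the package types.
type Offset interface{ String() string }

type strOffset string

func (o strOffset) String() string { return string(o) }

type ReadMessage struct {
	Payload    []byte
	ReadOffset Offset
}

// reader is the subset of BufferReader used by this sketch.
type reader interface {
	Read(context.Context, int64) ([]*ReadMessage, error)
	Ack(context.Context, []Offset) []error
}

// fakeReader is a hypothetical buffer that returns a partial batch plus an error.
type fakeReader struct{ acked []string }

func (f *fakeReader) Read(_ context.Context, _ int64) ([]*ReadMessage, error) {
	msgs := []*ReadMessage{
		{Payload: []byte("a"), ReadOffset: strOffset("0")},
		{Payload: []byte("b"), ReadOffset: strOffset("1")},
	}
	return msgs, errors.New("read failed at index 2")
}

func (f *fakeReader) Ack(_ context.Context, offsets []Offset) []error {
	for _, o := range offsets {
		f.acked = append(f.acked, o.String())
	}
	return make([]error, len(offsets)) // all nil: every ack succeeded
}

// readOnce processes every returned message, even when Read reports an error,
// acknowledges what it processed, and then surfaces the read error.
func readOnce(ctx context.Context, r reader, count int64) (int, error) {
	msgs, readErr := r.Read(ctx, count)
	offsets := make([]Offset, 0, len(msgs))
	for _, m := range msgs {
		_ = m.Payload // a real caller would process the payload here
		offsets = append(offsets, m.ReadOffset)
	}
	for _, ackErr := range r.Ack(ctx, offsets) {
		if ackErr != nil {
			return len(msgs), ackErr
		}
	}
	return len(msgs), readErr
}

// runDemo wires the fake reader to readOnce.
func runDemo() (int, []string, error) {
	r := &fakeReader{}
	n, err := readOnce(context.Background(), r, 10)
	return n, r.acked, err
}

func main() {
	n, acked, err := runDemo()
	fmt.Println(n, acked, err)
}
```

Because a message is acknowledged only after it has been processed, a crash between Read and Ack leads to redelivery rather than loss, which is the at-least-once behaviour the interface comment describes.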

type BufferReaderInformation

type BufferReaderInformation interface {
	// GetName returns the name.
	GetName() string
	// GetPartitionIdx returns the partition ID.
	GetPartitionIdx() int32
}

BufferReaderInformation has information regarding the buffer we are reading from.

type BufferWriteErr

type BufferWriteErr struct {
	Name        string
	Full        bool
	InternalErr bool
	Message     string
}

BufferWriteErr is returned when we cannot write to the buffer, for example because the buffer is full.

func (BufferWriteErr) Error

func (e BufferWriteErr) Error() string

func (BufferWriteErr) IsFull

func (e BufferWriteErr) IsFull() bool

IsFull returns true if buffer is full.

func (BufferWriteErr) IsInternalErr

func (e BufferWriteErr) IsInternalErr() bool

IsInternalErr returns true if writing is failing due to a buffer internal error.

type BufferWriter

type BufferWriter interface {
	BufferWriterInformation
	io.Closer
	Write(context.Context, []Message) ([]Offset, []error)
}

BufferWriter is the buffer to which we are writing.
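
A hedged sketch of how the []error returned by Write might be triaged using BufferWriteErr. It assumes that the error slice is index-aligned with the messages passed to Write, which the documentation does not state. BufferWriteErr is a local stand-in with an assumed Error() format, and splitByRetry is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// BufferWriteErr is a local stand-in mirroring the documented fields.
type BufferWriteErr struct {
	Name        string
	Full        bool
	InternalErr bool
	Message     string
}

// Error's output format is an assumption made for this sketch.
func (e BufferWriteErr) Error() string       { return e.Name + ": " + e.Message }
func (e BufferWriteErr) IsFull() bool        { return e.Full }
func (e BufferWriteErr) IsInternalErr() bool { return e.InternalErr }

// splitByRetry returns the indexes worth retrying (buffer full) and the
// indexes that failed for any other reason. It assumes errs[i] belongs to
// the i-th message handed to Write.
func splitByRetry(errs []error) (retry, failed []int) {
	for i, err := range errs {
		if err == nil {
			continue
		}
		var writeErr BufferWriteErr
		if errors.As(err, &writeErr) && writeErr.IsFull() {
			retry = append(retry, i)
			continue
		}
		failed = append(failed, i)
	}
	return retry, failed
}

func main() {
	errs := []error{
		nil,
		BufferWriteErr{Name: "buf-0", Full: true, Message: "Buffer full!"},
		BufferWriteErr{Name: "buf-0", InternalErr: true, Message: "broken"},
	}
	retry, failed := splitByRetry(errs)
	fmt.Println(retry, failed)
}
```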

type BufferWriterInformation

type BufferWriterInformation interface {
	// GetName returns the name.
	GetName() string
	// GetPartitionIdx returns the partition ID.
	GetPartitionIdx() int32
}

BufferWriterInformation has information regarding the buffer we are writing to.

type Header

type Header struct {
	MessageInfo
	// Kind indicates the kind of Message
	Kind MessageKind
	// ID is used for exactly-once-semantics. ID is a combination of vertex name, offset and index of the message.
	ID MessageID
	// Keys are the keys of the (key, value) pair in the map-reduce paradigm. They are used for the reduce
	// operation, and the last key in the list is used for conditional forwarding.
	Keys []string
	// Headers is the headers of the message which can be used to store and propagate source headers like kafka headers,
	// http headers and Numaflow platform headers like tracing headers etc.
	//TODO: can we rename this? Gets confusing for understanding headers under header
	Headers map[string]string
}

Header is the header of the message

func (Header) MarshalBinary

func (h Header) MarshalBinary() ([]byte, error)

MarshalBinary encodes Header to proto bytes.

func (*Header) UnmarshalBinary

func (h *Header) UnmarshalBinary(data []byte) error

UnmarshalBinary decodes Header from the proto bytes.
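
The MarshalBinary and UnmarshalBinary signatures match the standard library's encoding.BinaryMarshaler and encoding.BinaryUnmarshaler interfaces. The round trip below illustrates that pattern with a hypothetical, trimmed-down header type. JSON is used purely as a placeholder encoding so the sketch needs no generated code; the real Header encodes to proto bytes.

```go
package main

import (
	"encoding"
	"encoding/json"
	"fmt"
)

// header is a hypothetical, trimmed-down stand-in for Header.
type header struct {
	Keys    []string
	Headers map[string]string
}

// MarshalBinary uses JSON as a placeholder; the real type emits proto bytes.
func (h header) MarshalBinary() ([]byte, error) { return json.Marshal(h) }

// UnmarshalBinary needs a pointer receiver so it can fill in the value.
func (h *header) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, h) }

// Compile-time checks that the method set matches the standard interfaces.
var (
	_ encoding.BinaryMarshaler   = header{}
	_ encoding.BinaryUnmarshaler = (*header)(nil)
)

// roundTrip encodes a header and decodes it into a fresh value.
func roundTrip(in header) (header, error) {
	data, err := in.MarshalBinary()
	if err != nil {
		return header{}, err
	}
	var out header
	if err := out.UnmarshalBinary(data); err != nil {
		return header{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(header{
		Keys:    []string{"user-1"},
		Headers: map[string]string{"trace": "abc"},
	})
	fmt.Println(out.Keys, out.Headers["trace"], err)
}
```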

type LagReader added in v0.5.3

type LagReader interface {
	GetName() string
	// Pending returns the number of pending messages.
	Pending(context.Context) (int64, error)
}

LagReader is the interface that wraps the Pending and GetName methods. It is used by the metrics server to get the pending message count.
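
A sketch of how a metrics collector might aggregate pending counts across several LagReader values. It assumes that Pending reports PendingNotAvailable when the count cannot be determined; the documentation defines the constant but does not spell out that usage. LagReader and the constant are re-declared locally, and staticLag and totalPending are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"math"
)

// PendingNotAvailable mirrors the package constant.
const PendingNotAvailable = int64(math.MinInt64)

// LagReader mirrors the documented interface.
type LagReader interface {
	GetName() string
	Pending(context.Context) (int64, error)
}

// staticLag is a hypothetical LagReader that reports a fixed pending count.
type staticLag struct {
	name    string
	pending int64
}

func (s staticLag) GetName() string                        { return s.name }
func (s staticLag) Pending(context.Context) (int64, error) { return s.pending, nil }

// totalPending sums the pending counts, skipping readers that return an
// error or report PendingNotAvailable (assumed to mean "unknown").
func totalPending(ctx context.Context, readers []LagReader) int64 {
	var total int64
	for _, r := range readers {
		n, err := r.Pending(ctx)
		if err != nil || n == PendingNotAvailable {
			continue
		}
		total += n
	}
	return total
}

// demoTotal runs totalPending over three sample partitions.
func demoTotal() int64 {
	readers := []LagReader{
		staticLag{name: "p0", pending: 5},
		staticLag{name: "p1", pending: PendingNotAvailable},
		staticLag{name: "p2", pending: 3},
	}
	return totalPending(context.Background(), readers)
}

func main() {
	fmt.Println(demoTotal())
}
```

Skipping the sentinel matters because math.MinInt64 would otherwise dominate, and could overflow, any sum it is added to.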

type Message

type Message struct {
	Header
	Body
}

Message is the inter-step message.

func (Message) MarshalBinary added in v0.7.2

func (m Message) MarshalBinary() ([]byte, error)

MarshalBinary encodes Message to proto bytes.

func (*Message) ToReadMessage added in v0.7.2

func (m *Message) ToReadMessage(ot Offset, wm time.Time) *ReadMessage

ToReadMessage converts Message to a ReadMessage by providing the offset and watermark

func (*Message) UnmarshalBinary added in v0.7.2

func (m *Message) UnmarshalBinary(data []byte) error

UnmarshalBinary decodes Message from the proto bytes. Even though the header and body are pointers in the proto, we don't need nil checks because while marshalling, we always set the default values.

type MessageAckErr

type MessageAckErr struct {
	Name    string
	Offset  Offset
	Message string
}

MessageAckErr is for acknowledgement errors.

func (MessageAckErr) Error

func (e MessageAckErr) Error() string

type MessageID added in v1.3.0

type MessageID struct {
	// VertexName is the name of the vertex
	VertexName string
	// Offset is the offset of the message
	// NOTE: should be unique across the replicas of the vertex, that is the
	// reason we don't have a separate replica field in the MessageID
	Offset string
	// Index is the index of a flatmap message, otherwise use 0
	Index int32
}

MessageID is the message ID of the message which is used for exactly-once-semantics.

func (MessageID) MarshalBinary added in v1.3.0

func (id MessageID) MarshalBinary() ([]byte, error)

MarshalBinary encodes MessageID to proto bytes.

func (MessageID) String added in v1.3.0

func (id MessageID) String() string

String returns the string representation of the MessageID

func (*MessageID) UnmarshalBinary added in v1.3.0

func (id *MessageID) UnmarshalBinary(data []byte) error

UnmarshalBinary decodes MessageID from proto bytes.
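
Since MessageID combines the vertex name, offset, and index, and all three fields are comparable, the struct can be used directly as a map key. The following is a minimal sketch of ID-based duplicate detection under that observation. The String() format and the dedup type are assumptions for illustration; the package's actual deduplication mechanism is not described here.

```go
package main

import "fmt"

// MessageID mirrors the documented fields.
type MessageID struct {
	VertexName string
	Offset     string
	Index      int32
}

// String's format is an assumption made for this sketch.
func (id MessageID) String() string {
	return fmt.Sprintf("%s-%s-%d", id.VertexName, id.Offset, id.Index)
}

// dedup is a hypothetical in-memory set of IDs that have already been seen.
type dedup map[MessageID]struct{}

// firstTime reports whether id is new, and records it as seen.
func (d dedup) firstTime(id MessageID) bool {
	if _, seen := d[id]; seen {
		return false
	}
	d[id] = struct{}{}
	return true
}

func main() {
	seen := dedup{}
	id := MessageID{VertexName: "map-vertex", Offset: "42", Index: 0}

	fmt.Println(seen.firstTime(id)) // first delivery
	fmt.Println(seen.firstTime(id)) // redelivery of the same message

	// A second flatmap output of the same input differs only in Index.
	sibling := MessageID{VertexName: "map-vertex", Offset: "42", Index: 1}
	fmt.Println(seen.firstTime(sibling))
}
```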

type MessageInfo added in v0.7.2

type MessageInfo struct {
	// EventTime when
	// MessageKind == Data represents the event time of the message
	// MessageKind == WMB, value is ignored
	EventTime time.Time
	// IsLate when
	// MessageKind == Data, IsLate indicates whether the message is late data (assignment happens at source)
	// MessageKind == WMB, value is ignored
	IsLate bool
}

MessageInfo is the message information window of the payload. The contents inside the MessageInfo can be interpreted differently based on the MessageKind.

type MessageKind added in v0.7.2

type MessageKind int16

MessageKind represents the message type of the payload.

const (
	Data MessageKind = iota // Data payload
	WMB                     // Watermark Barrier
)

func (MessageKind) String added in v0.7.2

func (mt MessageKind) String() string
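
MessageInfo says its fields are meaningful only for Data messages and ignored for WMB. One way a consumer could encode that rule is sketched below. The types are local copies, the String() labels are assumed (the documentation does not show them), and eventTime is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// MessageKind mirrors the documented type and constants.
type MessageKind int16

const (
	Data MessageKind = iota // Data payload
	WMB                     // Watermark Barrier
)

// String's labels are assumptions made for this sketch.
func (mt MessageKind) String() string {
	switch mt {
	case Data:
		return "Data"
	case WMB:
		return "WMB"
	default:
		return "Unknown"
	}
}

// MessageInfo mirrors the documented fields.
type MessageInfo struct {
	EventTime time.Time
	IsLate    bool
}

// eventTime is a hypothetical helper: it returns the event time only for
// Data messages, since the field is ignored for WMB.
func eventTime(kind MessageKind, info MessageInfo) (time.Time, bool) {
	if kind != Data {
		return time.Time{}, false
	}
	return info.EventTime, true
}

func main() {
	info := MessageInfo{EventTime: time.Unix(1700000000, 0).UTC()}
	for _, kind := range []MessageKind{Data, WMB} {
		ts, ok := eventTime(kind, info)
		fmt.Println(kind, ok, ts)
	}
}
```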

type MessageMetadata added in v0.8.0

type MessageMetadata struct {
	// NumDelivered is the number of times the message has been delivered.
	NumDelivered uint64
}

MessageMetadata is the metadata of the message

type MessageReadErr

type MessageReadErr struct {
	Name    string
	Payload []byte
	Message string
}

MessageReadErr is associated with message read errors.

func (MessageReadErr) Error

func (e MessageReadErr) Error() string

type MessageWriteErr

type MessageWriteErr struct {
	Name    string
	Header  Header
	Body    Body
	Message string
}

MessageWriteErr is associated with message write errors.

func (MessageWriteErr) Error

func (e MessageWriteErr) Error() string

type NonRetryableBufferWriteErr added in v1.2.0

type NonRetryableBufferWriteErr struct {
	Name    string
	Message string
}

NonRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.

func (NonRetryableBufferWriteErr) Error added in v1.2.0

func (e NonRetryableBufferWriteErr) Error() string

type Offset

type Offset interface {
	// String returns the offset identifier
	String() string
	// Sequence returns a sequence id which can be used to index into the buffer (ISB)
	Sequence() (int64, error)
	// AckIt is used to ack the offset.
	// It is useful when the BufferReader cannot ack the message using the offset identifier alone:
	// the work can be done in this function, which the BufferReader's Ack() implementation then calls.
	AckIt() error
	// NoAck indicates that the offset no longer needs to be acknowledged.
	// It is used when an error occurs and we want to reprocess the batch.
	NoAck() error
	// PartitionIdx returns the partition index to which the offset belongs.
	PartitionIdx() int32
}

Offset is an interface used in the ReadMessage referencing offset information.

func NewSimpleIntPartitionOffset added in v0.10.0

func NewSimpleIntPartitionOffset(seq int64, partitionIdx int32) Offset

func NewSimpleStringPartitionOffset added in v0.10.0

func NewSimpleStringPartitionOffset(seq string, partitionIdx int32) Offset

type ReadMessage

type ReadMessage struct {
	Message
	ReadOffset Offset
	Watermark  time.Time
	// Metadata is the metadata of the message after a message is read from the buffer.
	Metadata MessageMetadata
}

ReadMessage is the message read from the buffer.

type ReadWriteMessagePair added in v1.3.0

type ReadWriteMessagePair struct {
	ReadMessage   *ReadMessage
	WriteMessages []*WriteMessage
}

ReadWriteMessagePair pairs a ReadMessage with a list of WriteMessage values. It is used to map a read message to the list of write messages that the UDF returns.
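
To illustrate the one-to-many mapping, here is a sketch in which a hypothetical flatmap-style UDF (splitWords) turns one read message into several write messages and numbers them with an index, echoing the Index field of MessageID. The lowercase types are simplified stand-ins; in the real package the index lives inside the header's MessageID.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a simplified stand-in; the real Message embeds Header and Body.
type message struct {
	Index   int32
	Payload []byte
}

type readMessage struct{ message }

type writeMessage struct {
	message
	Tags []string
}

// readWriteMessagePair mirrors the shape of ReadWriteMessagePair.
type readWriteMessagePair struct {
	ReadMessage   *readMessage
	WriteMessages []*writeMessage
}

// splitWords is a hypothetical UDF: it emits one write message per word and
// gives each output its position as the index.
func splitWords(in *readMessage) readWriteMessagePair {
	pair := readWriteMessagePair{ReadMessage: in}
	for i, word := range strings.Fields(string(in.Payload)) {
		pair.WriteMessages = append(pair.WriteMessages, &writeMessage{
			message: message{Index: int32(i), Payload: []byte(word)},
		})
	}
	return pair
}

func main() {
	in := &readMessage{message: message{Payload: []byte("a b c")}}
	pair := splitWords(in)
	for _, w := range pair.WriteMessages {
		fmt.Println(w.Index, string(w.Payload))
	}
}
```

Keeping the read message next to its outputs lets a caller acknowledge the input only once all of its derived messages have been written.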

type SimpleIntOffset added in v0.6.1

type SimpleIntOffset func() int64

SimpleIntOffset is a convenience Offset implementation for cases that do not need AckIt(), where the offset is an int64.

func (SimpleIntOffset) AckIt added in v0.6.1

func (si SimpleIntOffset) AckIt() error

func (SimpleIntOffset) NoAck added in v0.7.3

func (si SimpleIntOffset) NoAck() error

func (SimpleIntOffset) PartitionIdx added in v0.10.0

func (si SimpleIntOffset) PartitionIdx() int32

func (SimpleIntOffset) Sequence added in v0.6.1

func (si SimpleIntOffset) Sequence() (int64, error)

func (SimpleIntOffset) String added in v0.6.1

func (si SimpleIntOffset) String() string
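
SimpleIntOffset is a function type with methods, a Go idiom that lets a plain closure satisfy an interface. The sketch below re-creates the idea with a hypothetical intOffset type against a local copy of the Offset interface. Treating AckIt and NoAck as no-ops and returning partition 0 are assumptions of this sketch; the documentation does not describe what the real methods do.

```go
package main

import (
	"fmt"
	"strconv"
)

// Offset mirrors the documented interface.
type Offset interface {
	String() string
	Sequence() (int64, error)
	AckIt() error
	NoAck() error
	PartitionIdx() int32
}

// intOffset is a hypothetical re-creation of the SimpleIntOffset idea:
// a function returning the sequence number, with methods attached.
type intOffset func() int64

func (o intOffset) String() string           { return strconv.FormatInt(o(), 10) }
func (o intOffset) Sequence() (int64, error) { return o(), nil }
func (o intOffset) AckIt() error             { return nil } // assumed no-op
func (o intOffset) NoAck() error             { return nil } // assumed no-op
func (o intOffset) PartitionIdx() int32      { return 0 }   // assumed default partition

// newIntOffset wraps a sequence number in a closure.
func newIntOffset(seq int64) Offset {
	return intOffset(func() int64 { return seq })
}

func main() {
	off := newIntOffset(7)
	seq, err := off.Sequence()
	fmt.Println(off.String(), seq, err, off.PartitionIdx())
}
```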

type SimpleStringOffset added in v0.6.1

type SimpleStringOffset func() string

SimpleStringOffset is a convenience Offset implementation for cases that do not need AckIt(), where the offset is a string.

func (SimpleStringOffset) AckIt added in v0.6.1

func (so SimpleStringOffset) AckIt() error

func (SimpleStringOffset) NoAck added in v0.7.3

func (so SimpleStringOffset) NoAck() error

func (SimpleStringOffset) PartitionIdx added in v0.10.0

func (so SimpleStringOffset) PartitionIdx() int32

func (SimpleStringOffset) Sequence added in v0.6.1

func (so SimpleStringOffset) Sequence() (int64, error)

func (SimpleStringOffset) String added in v0.6.1

func (so SimpleStringOffset) String() string

type WriteMessage added in v0.8.0

type WriteMessage struct {
	Message
	Tags []string
}

WriteMessage is a wrapper for an isb message with tag information which will be used for conditional forwarding.

Directories

Path Synopsis
stores
