Documentation ¶
Index ¶
- Constants
- Variables
- type Body
- type BufferReadErr
- type BufferReader
- type BufferReaderInformation
- type BufferWriteErr
- type BufferWriter
- type BufferWriterInformation
- type Header
- type LagReader
- type Message
- type MessageAckErr
- type MessageID
- type MessageInfo
- type MessageKind
- type MessageMetadata
- type MessageReadErr
- type MessageWriteErr
- type NonRetryableBufferWriteErr
- type Offset
- type ReadMessage
- type ReadWriteMessagePair
- type SimpleIntOffset
- type SimpleStringOffset
- type WriteMessage
Constants ¶
const PendingNotAvailable = int64(math.MinInt64)
Variables ¶
var ( BufferFullMessage = "Buffer full!" DuplicateIDMessage = "Duplicate ID!" )
var DefaultPartitionIdx = int32(0)
DefaultPartitionIdx Default partition index
Functions ¶
This section is empty.
Types ¶
type BufferReadErr ¶
BufferReadErr when we cannot read from the buffer.
func (BufferReadErr) Error ¶
func (e BufferReadErr) Error() string
func (BufferReadErr) IsEmpty ¶
func (e BufferReadErr) IsEmpty() bool
IsEmpty returns true if buffer is empty.
func (BufferReadErr) IsInternalErr ¶
func (e BufferReadErr) IsInternalErr() bool
IsInternalErr returns true if reading is failing due to a buffer internal error.
type BufferReader ¶
type BufferReader interface { BufferReaderInformation io.Closer // Read reads a chunk of messages and returns at the first occurrence of an error. Error does not indicate that the // array of result is empty, the callee should process all the elements in the array even if the error is set. Read // will not mark the message in the buffer as "READ" if the read for that index is erring. // There is a chance that we have read the message and the container got forcefully terminated before processing. To provide // at-least-once semantics for reading, during the restart we will have to reprocess all unacknowledged messages. Read(context.Context, int64) ([]*ReadMessage, error) // Ack acknowledges an array of offset. Ack(context.Context, []Offset) []error // NoAck cancels acknowledgement of an array of offset. NoAck(context.Context, []Offset) // Pending returns the count of pending messages. Pending(context.Context) (int64, error) }
BufferReader is the buffer from which we are reading.
type BufferReaderInformation ¶
type BufferReaderInformation interface { // GetName returns the name. GetName() string // GetPartitionIdx returns the partition ID. GetPartitionIdx() int32 }
BufferReaderInformation has information regarding the buffer we are reading from.
type BufferWriteErr ¶
BufferWriteErr when we cannot write to the buffer because of a full buffer.
func (BufferWriteErr) Error ¶
func (e BufferWriteErr) Error() string
func (BufferWriteErr) IsFull ¶
func (e BufferWriteErr) IsFull() bool
IsFull returns true if buffer is full.
func (BufferWriteErr) IsInternalErr ¶
func (e BufferWriteErr) IsInternalErr() bool
IsInternalErr returns true if writing is failing due to a buffer internal error.
type BufferWriter ¶
type BufferWriter interface { BufferWriterInformation io.Closer Write(context.Context, []Message) ([]Offset, []error) }
BufferWriter is the buffer to which we are writing.
type BufferWriterInformation ¶
type BufferWriterInformation interface { // GetName returns the name. GetName() string // GetPartitionIdx returns the partition ID. GetPartitionIdx() int32 }
BufferWriterInformation has information regarding the buffer we are writing to.
type Header ¶
type Header struct { MessageInfo // Kind indicates the kind of Message Kind MessageKind // ID is used for exactly-once-semantics. ID is a combination of vertex name, offset and index of the message. ID MessageID // Keys is (key,value) in the map-reduce paradigm will be used for reduce operation, last key in the list // will be used for conditional forwarding Keys []string // Headers is the headers of the message which can be used to store and propagate source headers like kafka headers, // http headers and Numaflow platform headers like tracing headers etc. //TODO: can we rename this? Gets confusing for understanding headers under header Headers map[string]string }
Header is the header of the message
func (Header) MarshalBinary ¶
MarshalBinary encodes Header to proto bytes.
func (*Header) UnmarshalBinary ¶
UnmarshalBinary decodes Header from the proto bytes.
type LagReader ¶ added in v0.5.3
type LagReader interface { GetName() string // Pending returns the pending messages number. Pending(context.Context) (int64, error) }
LagReader is the interface that wraps the Pending method and GetName method. will be used by the metrics server to get the pending messages count.
type Message ¶
Message is inter step message
func (Message) MarshalBinary ¶ added in v0.7.2
MarshalBinary encodes Message to proto bytes.
func (*Message) ToReadMessage ¶ added in v0.7.2
func (m *Message) ToReadMessage(ot Offset, wm time.Time) *ReadMessage
ToReadMessage converts Message to a ReadMessage by providing the offset and watermark
func (*Message) UnmarshalBinary ¶ added in v0.7.2
UnmarshalBinary decodes Message from the proto bytes. Even though the header and body are pointers in the proto, we don't need nil checks because while marshalling, we always set the default values.
type MessageAckErr ¶
MessageAckErr is for acknowledgement errors.
func (MessageAckErr) Error ¶
func (e MessageAckErr) Error() string
type MessageID ¶ added in v1.3.0
type MessageID struct { // VertexName is the name of the vertex VertexName string // Offset is the offset of the message // NOTE: should be unique across the replicas of the vertex, that is the // reason we don't have a separate replica field in the MessageID Offset string // Index is the index of a flatmap message, otherwise use 0 Index int32 }
MessageID is the message ID of the message which is used for exactly-once-semantics.
func (MessageID) MarshalBinary ¶ added in v1.3.0
MarshalBinary encodes MessageID to proto bytes.
func (*MessageID) UnmarshalBinary ¶ added in v1.3.0
UnmarshalBinary decodes MessageID from proto bytes.
type MessageInfo ¶ added in v0.7.2
type MessageInfo struct { // EventTime when // MessageKind == Data represents the event time of the message // MessageKind == WMB, value is ignored EventTime time.Time // IsLate when // MessageKind == Data, IsLate is used to indicate if the message is a late data (assignment happens at source) // MessageKind == WMB, value is ignored IsLate bool }
MessageInfo is the message information window of the payload. The contents inside the MessageInfo can be interpreted differently based on the MessageKind.
type MessageKind ¶ added in v0.7.2
type MessageKind int16
MessageKind represents the message type of the payload.
const ( Data MessageKind = iota // Data payload WMB // Watermark Barrier )
func (MessageKind) String ¶ added in v0.7.2
func (mt MessageKind) String() string
type MessageMetadata ¶ added in v0.8.0
type MessageMetadata struct { // NumDelivered is the number of times the message has been delivered. NumDelivered uint64 }
MessageMetadata is the metadata of the message
type MessageReadErr ¶
MessageReadErr is associated with message read errors.
func (MessageReadErr) Error ¶
func (e MessageReadErr) Error() string
type MessageWriteErr ¶
MessageWriteErr is associated with message write errors.
func (MessageWriteErr) Error ¶
func (e MessageWriteErr) Error() string
type NonRetryableBufferWriteErr ¶ added in v1.2.0
NonRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.
func (NonRetryableBufferWriteErr) Error ¶ added in v1.2.0
func (e NonRetryableBufferWriteErr) Error() string
type Offset ¶
type Offset interface { // String returns the offset identifier String() string // Sequence returns a sequence id which can be used to index into the buffer (ISB) Sequence() (int64, error) // AckIt is used to ack the offset // This is often used when the BufferReader can not simply use the offset identifier to ack the message, // then the work can be done in this function, and call it in BufferReader Ack() function implementation. AckIt() error // NoAck to indicate the offset no longer needs to be acknowledged // It is used when error occurs, and we want to reprocess the batch to indicate acknowledgement no // longer needed. NoAck() error // PartitionIdx returns the partition index to which the offset belongs to. PartitionIdx() int32 }
Offset is an interface used in the ReadMessage referencing offset information.
func NewSimpleIntPartitionOffset ¶ added in v0.10.0
func NewSimpleStringPartitionOffset ¶ added in v0.10.0
type ReadMessage ¶
type ReadMessage struct { Message ReadOffset Offset Watermark time.Time // Metadata is the metadata of the message after a message is read from the buffer. Metadata MessageMetadata }
ReadMessage is the message read from the buffer.
type ReadWriteMessagePair ¶ added in v1.3.0
type ReadWriteMessagePair struct { ReadMessage *ReadMessage WriteMessages []*WriteMessage }
ReadWriteMessagePair is a pair of ReadMessage and a list of WriteMessage which will be used to map the read message to a list of write messages that the udf returns.
type SimpleIntOffset ¶ added in v0.6.1
type SimpleIntOffset func() int64
SimpleIntOffset is an Offset convenient function for implementations without needing AckIt() when offset is a int64.
func (SimpleIntOffset) AckIt ¶ added in v0.6.1
func (si SimpleIntOffset) AckIt() error
func (SimpleIntOffset) NoAck ¶ added in v0.7.3
func (si SimpleIntOffset) NoAck() error
func (SimpleIntOffset) PartitionIdx ¶ added in v0.10.0
func (si SimpleIntOffset) PartitionIdx() int32
func (SimpleIntOffset) Sequence ¶ added in v0.6.1
func (si SimpleIntOffset) Sequence() (int64, error)
func (SimpleIntOffset) String ¶ added in v0.6.1
func (si SimpleIntOffset) String() string
type SimpleStringOffset ¶ added in v0.6.1
type SimpleStringOffset func() string
SimpleStringOffset is an Offset convenient function for implementations without needing AckIt() when offset is a string.
func (SimpleStringOffset) AckIt ¶ added in v0.6.1
func (so SimpleStringOffset) AckIt() error
func (SimpleStringOffset) NoAck ¶ added in v0.7.3
func (so SimpleStringOffset) NoAck() error
func (SimpleStringOffset) PartitionIdx ¶ added in v0.10.0
func (so SimpleStringOffset) PartitionIdx() int32
func (SimpleStringOffset) Sequence ¶ added in v0.6.1
func (so SimpleStringOffset) Sequence() (int64, error)
func (SimpleStringOffset) String ¶ added in v0.6.1
func (so SimpleStringOffset) String() string
type WriteMessage ¶ added in v0.8.0
WriteMessage is a wrapper for an isb message with tag information which will be used for conditional forwarding.