Documentation ¶
Index ¶
- Constants
- Variables
- type Body
- type BufferReadErr
- type BufferReader
- type BufferReaderInformation
- type BufferWriteErr
- type BufferWriter
- type BufferWriterInformation
- type Header
- type LagReader
- type Message
- type MessageAckErr
- type MessageInfo
- type MessageKind
- type MessageMetadata
- type MessageReadErr
- type MessageWriteErr
- type NonRetryableBufferWriteErr
- type Offset
- type ReadMessage
- type SimpleIntOffset
- type SimpleStringOffset
- type WriteMessage
Constants ¶
const PendingNotAvailable = int64(math.MinInt64)
Variables ¶
var ( BufferFullMessage = "Buffer full!" DuplicateIDMessage = "Duplicate ID!" )
var DefaultPartitionIdx = int32(0)
DefaultPartitionIdx Default partition index
Functions ¶
This section is empty.
Types ¶
type Body ¶
type Body struct {
Payload []byte
}
Body is the body of the message
func (Body) MarshalBinary ¶
MarshalBinary encodes Body to a binary format
func (*Body) UnmarshalBinary ¶
UnmarshalBinary decodes Body from the binary format
type BufferReadErr ¶
BufferReadErr when we cannot read from the buffer.
func (BufferReadErr) Error ¶
func (e BufferReadErr) Error() string
func (BufferReadErr) IsEmpty ¶
func (e BufferReadErr) IsEmpty() bool
IsEmpty returns true if buffer is empty.
func (BufferReadErr) IsInternalErr ¶
func (e BufferReadErr) IsInternalErr() bool
IsInternalErr returns true if reading is failing due to a buffer internal error.
type BufferReader ¶
type BufferReader interface { BufferReaderInformation io.Closer // Read reads a chunk of messages and returns at the first occurrence of an error. Error does not indicate that the // array of result is empty, the callee should process all the elements in the array even if the error is set. Read // will not mark the message in the buffer as "READ" if the read for that index is erring. // There is a chance that we have read the message and the container got forcefully terminated before processing. To provide // at-least-once semantics for reading, during the restart we will have to reprocess all unacknowledged messages. Read(context.Context, int64) ([]*ReadMessage, error) // Ack acknowledges an array of offset. Ack(context.Context, []Offset) []error // NoAck cancels acknowledgement of an array of offset. NoAck(context.Context, []Offset) // Pending returns the count of pending messages. Pending(context.Context) (int64, error) }
BufferReader is the buffer from which we are reading.
type BufferReaderInformation ¶
type BufferReaderInformation interface { // GetName returns the name. GetName() string // GetPartitionIdx returns the partition ID. GetPartitionIdx() int32 }
BufferReaderInformation has information regarding the buffer we are reading from.
type BufferWriteErr ¶
BufferWriteErr when we cannot write to the buffer because of a full buffer.
func (BufferWriteErr) Error ¶
func (e BufferWriteErr) Error() string
func (BufferWriteErr) IsFull ¶
func (e BufferWriteErr) IsFull() bool
IsFull returns true if buffer is full.
func (BufferWriteErr) IsInternalErr ¶
func (e BufferWriteErr) IsInternalErr() bool
IsInternalErr returns true if writing is failing due to a buffer internal error.
type BufferWriter ¶
type BufferWriter interface { BufferWriterInformation io.Closer Write(context.Context, []Message) ([]Offset, []error) }
BufferWriter is the buffer to which we are writing.
type BufferWriterInformation ¶
type BufferWriterInformation interface { // GetName returns the name. GetName() string // GetPartitionIdx returns the partition ID. GetPartitionIdx() int32 }
BufferWriterInformation has information regarding the buffer we are writing to.
type Header ¶
type Header struct { MessageInfo // Kind indicates the kind of Message Kind MessageKind // ID is used for exactly-once-semantics. ID is usually populated from the offset, if offset is available. ID string // Keys is (key,value) in the map-reduce paradigm will be used for reduce operation, last key in the list // will be used for conditional forwarding Keys []string // Headers is the headers of the message which can be used to store and propagate source headers like kafka headers, // http headers and Numaflow platform headers like tracing headers etc. Headers map[string]string }
Header is the header of the message
func (Header) MarshalBinary ¶
MarshalBinary encodes Header to a binary format
func (*Header) UnmarshalBinary ¶
UnmarshalBinary decodes Header from the binary format
type LagReader ¶ added in v0.5.3
type LagReader interface { GetName() string // Pending returns the pending messages number. Pending(context.Context) (int64, error) }
LagReader is the interface that wraps the Pending method and GetName method. will be used by the metrics server to get the pending messages count.
type Message ¶
Message is inter step message
func (Message) MarshalBinary ¶ added in v0.7.2
MarshalBinary encodes Message to the binary format
func (*Message) ToReadMessage ¶ added in v0.7.2
func (m *Message) ToReadMessage(ot Offset, wm time.Time) *ReadMessage
ToReadMessage converts Message to a ReadMessage by providing the offset and watermark
func (*Message) UnmarshalBinary ¶ added in v0.7.2
UnmarshalBinary decodes Message from the binary format
type MessageAckErr ¶
MessageAckErr is for acknowledgement errors.
func (MessageAckErr) Error ¶
func (e MessageAckErr) Error() string
type MessageInfo ¶ added in v0.7.2
type MessageInfo struct { // EventTime when // MessageKind == Data represents the event time of the message // MessageKind == WMB, value is ignored EventTime time.Time // IsLate when // MessageKind == Data, IsLate is used to indicate if the message is a late data (assignment happens at source) // MessageKind == WMB, value is ignored IsLate bool }
MessageInfo is the message information window of the payload. The contents inside the MessageInfo can be interpreted differently based on the MessageKind.
func (MessageInfo) MarshalBinary ¶ added in v0.7.2
func (p MessageInfo) MarshalBinary() (data []byte, err error)
MarshalBinary encodes MessageInfo to the binary format
func (*MessageInfo) UnmarshalBinary ¶ added in v0.7.2
func (p *MessageInfo) UnmarshalBinary(data []byte) (err error)
UnmarshalBinary decodes MessageInfo from the binary format
type MessageKind ¶ added in v0.7.2
type MessageKind int16
MessageKind represents the message type of the payload.
const ( Data MessageKind = iota // Data payload WMB // Watermark Barrier )
func (MessageKind) String ¶ added in v0.7.2
func (mt MessageKind) String() string
type MessageMetadata ¶ added in v0.8.0
type MessageMetadata struct { // NumDelivered is the number of times the message has been delivered. NumDelivered uint64 }
MessageMetadata is the metadata of the message
type MessageReadErr ¶
MessageReadErr is associated with message read errors.
func (MessageReadErr) Error ¶
func (e MessageReadErr) Error() string
type MessageWriteErr ¶
MessageWriteErr is associated with message write errors.
func (MessageWriteErr) Error ¶
func (e MessageWriteErr) Error() string
type NonRetryableBufferWriteErr ¶ added in v1.2.0
NonRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.
func (NonRetryableBufferWriteErr) Error ¶ added in v1.2.0
func (e NonRetryableBufferWriteErr) Error() string
type Offset ¶
type Offset interface { // String returns the offset identifier String() string // Sequence returns a sequence id which can be used to index into the buffer (ISB) Sequence() (int64, error) // AckIt is used to ack the offset // This is often used when the BufferReader can not simply use the offset identifier to ack the message, // then the work can be done in this function, and call it in BufferReader Ack() function implementation. AckIt() error // NoAck to indicate the offset no longer needs to be acknowledged // It is used when error occurs, and we want to reprocess the batch to indicate acknowledgement no // longer needed. NoAck() error // PartitionIdx returns the partition index to which the offset belongs to. PartitionIdx() int32 }
Offset is an interface used in the ReadMessage referencing offset information.
func NewSimpleIntPartitionOffset ¶ added in v0.10.0
func NewSimpleStringPartitionOffset ¶ added in v0.10.0
type ReadMessage ¶
type ReadMessage struct { Message ReadOffset Offset Watermark time.Time // Metadata is the metadata of the message after a message is read from the buffer. Metadata MessageMetadata }
ReadMessage is the message read from the buffer.
func (ReadMessage) MarshalBinary ¶ added in v0.7.2
func (rm ReadMessage) MarshalBinary() (data []byte, err error)
MarshalBinary encodes ReadMessage to the binary format
func (*ReadMessage) UnmarshalBinary ¶ added in v0.7.2
func (rm *ReadMessage) UnmarshalBinary(data []byte) (err error)
UnmarshalBinary decodes ReadMessage from the binary format
type SimpleIntOffset ¶ added in v0.6.1
type SimpleIntOffset func() int64
SimpleIntOffset is an Offset convenient function for implementations without needing AckIt() when offset is a int64.
func (SimpleIntOffset) AckIt ¶ added in v0.6.1
func (si SimpleIntOffset) AckIt() error
func (SimpleIntOffset) NoAck ¶ added in v0.7.3
func (si SimpleIntOffset) NoAck() error
func (SimpleIntOffset) PartitionIdx ¶ added in v0.10.0
func (si SimpleIntOffset) PartitionIdx() int32
func (SimpleIntOffset) Sequence ¶ added in v0.6.1
func (si SimpleIntOffset) Sequence() (int64, error)
func (SimpleIntOffset) String ¶ added in v0.6.1
func (si SimpleIntOffset) String() string
type SimpleStringOffset ¶ added in v0.6.1
type SimpleStringOffset func() string
SimpleStringOffset is an Offset convenient function for implementations without needing AckIt() when offset is a string.
func (SimpleStringOffset) AckIt ¶ added in v0.6.1
func (so SimpleStringOffset) AckIt() error
func (SimpleStringOffset) NoAck ¶ added in v0.7.3
func (so SimpleStringOffset) NoAck() error
func (SimpleStringOffset) PartitionIdx ¶ added in v0.10.0
func (so SimpleStringOffset) PartitionIdx() int32
func (SimpleStringOffset) Sequence ¶ added in v0.6.1
func (so SimpleStringOffset) Sequence() (int64, error)
func (SimpleStringOffset) String ¶ added in v0.6.1
func (so SimpleStringOffset) String() string
type WriteMessage ¶ added in v0.8.0
WriteMessage is a wrapper for an isb message with tag information which will be used for conditional forwarding.