isb

package
v0.6.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

README

Inter Step Buffer (isb)

Documentation

Index

Constants

View Source
const (
	PendingNotAvailable = int64(math.MinInt64)
	RateNotAvailable    = float64(math.MinInt)
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Body

type Body struct {
	Payload []byte
}

Body is the body of the message

func (Body) MarshalBinary

func (b Body) MarshalBinary() (data []byte, err error)

MarshalBinary encodes header to a binary format

func (*Body) UnmarshalBinary

func (b *Body) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes header from the binary format

type BufferReadErr

type BufferReadErr struct {
	Name        string
	Empty       bool
	InternalErr bool
	Message     string
}

BufferReadErr when we cannot read from the buffer.

func (BufferReadErr) Error

func (e BufferReadErr) Error() string

func (BufferReadErr) IsEmpty

func (e BufferReadErr) IsEmpty() bool

IsEmpty returns true if buffer is empty.

func (BufferReadErr) IsInternalErr

func (e BufferReadErr) IsInternalErr() bool

IsInternalErr returns true if reading is failing due to a buffer internal error.

type BufferReader

type BufferReader interface {
	BufferReaderInformation
	io.Closer
	// Read reads a chunk of messages and returns at the first occurrence of an error. Error does not indicate that the
	// array of result is empty, the callee should process all the elements in the array even if the error is set. Read
	// will not mark the message in the buffer as "READ" if the read for that index is erring.
	// There is a chance that we have read the message and the container got forcefully terminated before processing. To provide
	// at-least-once semantics for reading, during restart we will have to reprocess all unacknowledged messages.
	Read(context.Context, int64) ([]*ReadMessage, error)
	// Ack acknowledges an array of offset.
	Ack(context.Context, []Offset) []error
}

BufferReader is the buffer from which we are reading.

type BufferReaderInformation

type BufferReaderInformation interface {
	GetName() string
}

BufferReaderInformation has information regarding the buffer we are reading from.

type BufferWriteErr

type BufferWriteErr struct {
	Name        string
	Full        bool
	InternalErr bool
	Message     string
}

BufferWriteErr when we cannot write to the buffer because of a full buffer.

func (BufferWriteErr) Error

func (e BufferWriteErr) Error() string

func (BufferWriteErr) IsFull

func (e BufferWriteErr) IsFull() bool

IsFull returns true if buffer is full.

func (BufferWriteErr) IsInternalErr

func (e BufferWriteErr) IsInternalErr() bool

IsInternalErr returns true if writing is failing due to a buffer internal error.

type BufferWriter

type BufferWriter interface {
	BufferWriterInformation
	io.Closer
	Write(context.Context, []Message) ([]Offset, []error)
}

BufferWriter is the buffer to which we are writing.

type BufferWriterInformation

type BufferWriterInformation interface {
	GetName() string
}

BufferWriterInformation has information regarding the buffer we are writing to.

type Header struct {
	PaneInfo
	// ID is used for exactly-once-semantics. ID is usually populated from the offset, if offset is available.
	ID string
	// Key is (key,value) in the map-reduce paradigm which will be used for conditional forwarding.
	Key string
}

Header is the header of the message

func (Header) MarshalBinary

func (h Header) MarshalBinary() (data []byte, err error)

MarshalBinary encodes header to a binary format

func (*Header) UnmarshalBinary

func (h *Header) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary decodes header from the binary format

type LagReader added in v0.5.3

type LagReader interface {
	// Pending returns the pending messages number.
	Pending(context.Context) (int64, error)
}

LagReader is the interface that wraps the Pending method.

type Message

type Message struct {
	Header
	Body
}

Message is inter step message

type MessageAckErr

type MessageAckErr struct {
	Name    string
	Offset  Offset
	Message string
}

MessageAckErr is for acknowledgement errors.

func (MessageAckErr) Error

func (e MessageAckErr) Error() string

type MessageReadErr

type MessageReadErr struct {
	Name    string
	Header  []byte
	Body    []byte
	Message string
}

MessageReadErr is associated with message read errors.

func (MessageReadErr) Error

func (e MessageReadErr) Error() string

type MessageWriteErr

type MessageWriteErr struct {
	Name    string
	Header  Header
	Body    Body
	Message string
}

MessageWriteErr is associated with message write errors.

func (MessageWriteErr) Error

func (e MessageWriteErr) Error() string

type Offset

type Offset interface {
	// String return the offset identifier
	String() string
	// Sequence returns a sequence id which can be used to index into the buffer (ISB)
	Sequence() (int64, error)
	// AckIt is used to ack the offset
	// This is often used when the BufferReader can not simply use the offset identifier to ack the message,
	// then the work can be done in this function, and call it in BufferReader Ack() function implementation.
	AckIt() error
}

Offset is an interface used in the ReadMessage referencing offset information.

type PaneInfo

type PaneInfo struct {
	EventTime time.Time
	StartTime time.Time
	EndTime   time.Time
	// IsLate is used to indicate if it's a late data .
	IsLate bool
}

PaneInfo is the time window of the payload.

type Ratable added in v0.5.3

type Ratable interface {
	// Rate returns the rough rate (messages/second) in the past seconds, this can be used for autoscaling calculation
	Rate(ctx context.Context, seconds int64) (float64, error)
}

Ratable is the interface that wraps the Rate method.

type ReadMessage

type ReadMessage struct {
	Message
	ReadOffset Offset
	Watermark  time.Time
}

ReadMessage is the message read from the buffer.

type SimpleIntOffset added in v0.6.1

type SimpleIntOffset func() int64

SimpleIntOffset is an Offset convenient function for implementations without needing AckIt() when offset is a int64.

func (SimpleIntOffset) AckIt added in v0.6.1

func (si SimpleIntOffset) AckIt() error

func (SimpleIntOffset) Sequence added in v0.6.1

func (si SimpleIntOffset) Sequence() (int64, error)

func (SimpleIntOffset) String added in v0.6.1

func (si SimpleIntOffset) String() string

type SimpleStringOffset added in v0.6.1

type SimpleStringOffset func() string

SimpleStringOffset is an Offset convenient function for implementations without needing AckIt() when offset is a string.

func (SimpleStringOffset) AckIt added in v0.6.1

func (so SimpleStringOffset) AckIt() error

func (SimpleStringOffset) Sequence added in v0.6.1

func (so SimpleStringOffset) Sequence() (int64, error)

func (SimpleStringOffset) String added in v0.6.1

func (so SimpleStringOffset) String() string

Directories

Path Synopsis
Package forward does the Read (fromBuffer) -> Process (UDF) -> Forward (toBuffers) -> Ack (fromBuffer) loop.
Package forward does the Read (fromBuffer) -> Process (UDF) -> Forward (toBuffers) -> Ack (fromBuffer) loop.
stores

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL