Documentation
¶
Index ¶
- func Ack(ack func())
- func Metrics(b bool) func(*Options)
- func Parallelism(n int) func(*Options)
- func Tracing(b bool) func(*Options)
- func TransformUnmarshalJSON[T any](bs []byte) (T, error)
- type Attributes
- type ByteSource
- type Config
- type DeserFunc
- type DeserializationSource
- type Destination
- type DestinationFunc
- type Handler
- type HandlerFunc
- type Message
- type MsgAck
- type Option
- type Options
- type Processor
- type Source
- type SourceFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Ack ¶ added in v0.0.4
func Ack(ack func())
Ack is a convenience function for calling the ack function after checking if it's nil.
func Parallelism ¶
func TransformUnmarshalJSON ¶
Types ¶
type Attributes ¶
type Attributes interface {
Unwrap() Attributes
}
type ByteSource ¶
type Config ¶
type Config[T1, T2 any] struct { Source Source[T1] Destination Destination[T2] Handler Handler[T1, T2] }
type DeserializationSource ¶
type DeserializationSource[T any] struct { // contains filtered or unexported fields }
func NewDeserSource ¶
func NewDeserSource[T any](src ByteSource, deser DeserFunc[T]) DeserializationSource[T]
type Destination ¶
type Destination[T any] interface { // Send sends the passed in messages to the Destination. Implementations // _must_ listen on <-ctx.Done() and return ctx.Err() if the context finishes // while waiting to send messages. // // *Send need not be blocking*. In the case of a non-blocking call to send, // it's expected that ack will be called _only after_ the message has been // successfully written to the Destination. // // All errors which are retryable must be handled inside the Send func, or // otherwise handled internally. Any errors returned from Send indicate a // fatal error to the processor, and the processor will terminate. If you // want to be able to delegate the responsibility of deciding retryable // errors to the user of the Destination, then allow the user to register a // callback, e.g. `IsRetryable(err error) bool`, when instantiating a // Destination. // // The second argument value is the acknowlegement function. Ack is called // when the message has been successfully written to the Destination. It // should not be called twice. Sources may panic if ack is called twice as // it indicates a logical flaw for delivery guarantees within the program. // // In the case of sending to multiple destinations, or teeing the data stream // inside a processor's handler function, then the programmer must decide // themselves how to properly acknowledge the event, and recognize that // destinations will probably be acknowledging the message as well. Send(context.Context, func(), ...Message[T]) error }
Destination defines the abstraction for writing messages to an external entity. Most notable implementations are queues (Kafka, RabbitMQ, Redis), but anything which is message oriented could be made into a Destination (e.g. a newline-delimited-JSON file could conceivably be a Destination).
type DestinationFunc ¶
type Handler ¶
Handler defines a function which operates on a single event of type T1 and returns a list of events of type T2. T1 and T2 may be equivalent types. Returning an empty slice and a nil error indicates that the message passed in was processed successfully, no output was necessary, and therefore should be acknowledged by the processor as having been processed successfully.
type HandlerFunc ¶
type Message ¶
type Message[T any] struct { // Key represents the key of this message. This field is intended to be used // primarily as an input into sharding functions to determine how a message // should be routed within a topic. Key string // Value is the embedded value of this message. It is the object of interest // to the users of this library. It can be any serializable type so long as // the sources and destinations know how to serialize it. Value T // Topic indicates which topic this message came from (if applicable). It // should not be used as a means to set the output topic for destinations. Topic string // Attributes are inspired by context.Context and are used as a means to pass // metadata from a source implementation through to a consumer. See examples // for details. Attributes Attributes }
Message is the data wrapper which accepts any serializable type as it's embedded Value as well as some other metadata.
type MsgAck ¶ added in v0.0.4
MsgAck is a utility type which is used to pass a message and it's corresponding ack function through a channel internal to a source or destination
type Processor ¶
type Processor[T1, T2 any] struct { // contains filtered or unexported fields }
func New ¶
New instantiates a new Processor. `Processor.Run` must be called after calling `New` before events will be processed.
func (*Processor[T1, T2]) Run ¶
Run is a blocking call, and runs until either the ctx is canceled, or an unrecoverable error is encountered. If any error is returned from a source, destination or the handler func, then it's wrapped and returned. If the passed-in context is canceled, this will not return the context.Canceled error to indicate a clean shutdown was successful. Run will return ctx.Err() in other cases where context termination leads to shutdown of the processor.
type Source ¶
type Source[T any] interface { // Recv should block until Message is available to be returned from the // source. Implementations _must_ listen on <-ctx.Done() and return // ctx.Err() if the context finishes while waiting for new messages. // // All errors which are retryable must be handled inside the Recv func, or // otherwise handled internally. Any errors returned from Recv indicate a // fatal error to the processor, and the processor will terminate. If you // want to be able to delegate the responsibility of deciding retryable // errors to the user of the Source, then allow the user to register a // callback, e.g. `IsRetryable(err error) bool`, on source instantiation. // // The second return value is the acknowlegement function. Ack is called when // the message returned from Recv has been successfully written to it's // destination. It should not be called twice. Sources may panic in that // scenario as it indicates a logical flaw for delivery guarantees within the // program. // // In the case of sending to multiple destinations, or teeing the data stream // inside a processor's handler function, then the programmer must decide // themselves how to properly acknowledge the event, and recognize that // destinations will probably be acknowledging the message as well. Recv(context.Context) (Message[T], func(), error) }
Source defines the abstraction for which kawa consumes or receives messages from an external entity. Most notable implementations are queues (Kafka, RabbitMQ, Redis), but anything which is message oriented could be made into a source (e.g. a newline-delimited-JSON file could conceivably be a source).