Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrSinkAlreadyClosed is an error returned when user tries to publish a message or // close a sink after it was already closed. ErrSinkAlreadyClosed = errors.New("sink was closed already") // ErrSinkClosedOrFailedDuringSend is an error returned when a sink is closed or fails while sending a message. ErrSinkClosedOrFailedDuringSend = errors.New("sink was closed or failed while sending the message") )
Functions ¶
This section is empty.
Types ¶
type AsyncMessageSink ¶
type AsyncMessageSink interface { // PublishMessages publishes any messages found on the `messages` // channel and returns them on the `acks` channel once they have // been published. This function will block until `ctx` is done, // or until an error occurs. Messages will always be processed // and acknowledged in order. // Normal termination is achieved when the passed Context is done, // and will return the associated Context error. PublishMessages(ctx context.Context, acks chan<- Message, messages <-chan Message) error // Close permanently closes the AsyncMessageSink and frees underlying resources Close() error Statuser }
AsyncMessageSink represents a message sink that allows publishing messages, and multiple messages can be in flight before any acks are recieved, depending upon the configuration of the underlying message sink.
type AsyncMessageSource ¶
type AsyncMessageSource interface { // ConsumeMessages provides messages to the caller on the `messages` // channel and expects them to be sent back to the `acks` channel once // that have been handled properly. This function will block until // `ctx` is done, or until an error occurs. // Normal termination is achieved when the passed Context is done, // and will return the associated Context error. ConsumeMessages(ctx context.Context, messages chan<- Message, acks <-chan Message) error // Close permanently closes the AsyncMessageSource and frees underlying resources Close() error Statuser }
AsyncMessageSource represents a message source that allows consuming messages, and multiple messages can be in flight before any acks are sent, depending upon the configuration of the underlying message source.
type ConsumerMessageHandler ¶
ConsumerMessageHandler is the callback function type that synchronous message consumers must implement.
type DiscardableMessage ¶
type DiscardableMessage interface { Message // DiscardPayload discards the payload of the message. After calling this, // Calls to Data() will panic. // Calling this a second or subsequent time has no effect. DiscardPayload() }
DiscardableMessage allows a consumer to discard the payload after use (but before acking) in order to release memory earlier. This can be useful in cases where a consumer reads a very large number of messages before acking any of them. Since not all backends implement this, a checked type assertion is recommended.
type InvalidAckError ¶
InvalidAckError means that a message acknowledgement was not as expected. This is possilbly from mis-use of the asynchronous APIs, for example acking out of order.
func (InvalidAckError) Error ¶
func (e InvalidAckError) Error() string
type KeyedMessage ¶
Some brokers have the notion of keyed messages. Callers may optionally implement this interface in their message types for the benefiy of those brokers.
type Message ¶
type Message interface {
Data() []byte
}
Message is the single type that represents all messages in substrate.
type Status ¶
type Status struct { // Working indicates whether the source or sink is in a working state Working bool // Problems indicates and problems with the source or sink, whether or not they prevent it working. Problems []string }
Status represents a snapshot of the state of a source or sink.
type SynchronousMessageSink ¶
type SynchronousMessageSink interface { // Close closed the SynchronousMessageSink, freeing underlying // resources. Close() error // PublishMessage publishes messages to the broker, waiting for // confirmation from the broker before returning. PublishMessage(context.Context, Message) error Statuser }
SynchronousMessageSink represents a message source that allows "message at a time" publishing and relieves the consumer from having to deal with acknowledgements themselves.
func NewSynchronousMessageSink ¶
func NewSynchronousMessageSink(ams AsyncMessageSink) SynchronousMessageSink
NewSynchronousMessageSink returns a new synchronous message sink, given an AsyncMessageSink. When Close is called on the SynchronousMessageSink, this is also propogated to the underlying SynchronousMessageSink
type SynchronousMessageSource ¶
type SynchronousMessageSource interface { // Close closed the SynchronousMessageSource, freeing underlying // resources. Close() error // ConsumeMessages calls the `handler` function for each messages // available to consume. If the handler returns no error, an // acknowledgement will be sent to the broker. If an error is returned // by the handler, it will be propogated and returned from this // function. This function will block until `ctx` is done or until an // error occurs. ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error Statuser }
SynchronousMessageSource represents a message source that allows "message at a time" consumption and relieves the consumer from having to deal with acknowledgements themselves.
func NewSynchronousMessageSource ¶
func NewSynchronousMessageSource(ams AsyncMessageSource) SynchronousMessageSource
NewSynchronousMessageSource returns a new synchronous message source, given an AsyncMessageSource. When Close is called on the SynchronousMessageSource, this is also propogated to the underlying SynchronousMessageSource.
Directories ¶
Path | Synopsis |
---|---|
Package freezer provides freezer support for substrate
|
Package freezer provides freezer support for substrate |
internal
|
|
Package kafka provides kafka support for substrate
|
Package kafka provides kafka support for substrate |
Package natsstreaming provides kafka support for substrate
|
Package natsstreaming provides kafka support for substrate |
Package proximo provides proximo support for substrate
|
Package proximo provides proximo support for substrate |
Package suburl provides a generic URL based interface for obtaining substrate source and sink objects.
|
Package suburl provides a generic URL based interface for obtaining substrate source and sink objects. |