README ¶
Go channels for distributed systems
Benefits:
- Use Go channels transparently over a messaging technology of your choice
- Write idiomatic Go code instead of using vendor specific APIs
- Write simple, independent unit tests (no need to mock interfaces or run the queue technology)
Usage
The following code receives added-wishlist items from one queue and sends recommendations to another queue.
wishlistItems, _ := transport.Receive("added-wishlist-items")
recommendations, _ := transport.Send("recommendations")
for received := range wishlistItems {
	// Channel to subscribe to the result of sending the event
	sendResult := make(chan error, 1)
	recommendations <- goevents.EventEnvelop{
		Result: sendResult,
		Event:  goevents.JsonEvent(received.Context(), getRecommendation(received.Event)),
	}
	// Acknowledge the received event with the outcome of the send:
	// nil confirms processing, an error asks for redelivery
	received.Result <- <-sendResult
}
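Because the loop above depends only on channels, it can be exercised in a unit test with plain in-memory channels. The following is a minimal sketch under stated assumptions: `Event` is a stand-in type (the library's real `Event` definition is not shown in this README), and `process`, `runOnce`, and `errPublishFailed` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a stand-in for the library's event type (assumption: the real
// definition is not shown in this README).
type Event struct {
	Payload string
}

// EventEnvelop mirrors the documented struct: an event plus a result channel.
type EventEnvelop struct {
	Result chan<- error
	Event
}

// errPublishFailed is a hypothetical error used to simulate a failed publish.
var errPublishFailed = errors.New("publish failed")

// process is a hypothetical wrapper around the usage loop: it forwards a
// transformed event and acknowledges the input with the result of the send.
func process(in <-chan EventEnvelop, out chan<- EventEnvelop) {
	for received := range in {
		sendResult := make(chan error, 1)
		out <- EventEnvelop{
			Result: sendResult,
			Event:  Event{Payload: "recommendation-for-" + received.Payload},
		}
		received.Result <- <-sendResult
	}
	close(out)
}

// runOnce feeds one event through process using in-memory channels only.
// It returns the forwarded payload and the acknowledgement seen by the sender.
func runOnce(payload string, publishErr error) (string, error) {
	in := make(chan EventEnvelop, 1)
	out := make(chan EventEnvelop, 1)
	ack := make(chan error, 1)

	go process(in, out)

	in <- EventEnvelop{Result: ack, Event: Event{Payload: payload}}
	close(in)

	sent := <-out
	sent.Result <- publishErr // play the role of the queue reporting the publish result
	return sent.Payload, <-ack
}

func main() {
	payload, err := runOnce("book-42", nil)
	fmt.Println(payload, err)

	_, err = runOnce("book-42", errPublishFailed)
	fmt.Println(err)
}
```

No queue technology and no mocked interface is involved: the test plays both the upstream queue (by sending on `in`) and the downstream queue (by answering on `sent.Result`).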
Features
- Producers can
  - explicitly receive and handle the message's publishing result, enabling an at-least-once delivery guarantee
  - or fire-and-forget (removing the latency related to message publishing)
- Consumers must acknowledge the processing of each message to enable message redelivery (at-least-once processing guarantee).
- Support for graceful shutdown to prevent message loss by ensuring all messages are flushed to the queue
- Middlewares to customize the processing of events, e.g. error handling, logging, retry
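The two producer modes listed above can be sketched with plain channels. This is an illustration under assumptions, not the library's implementation: `Event` is a stand-in type, `fakePublisher` is a hypothetical goroutine that simulates the transport's publishing side, and leaving `Result` nil for fire-and-forget is inferred from `Result` being documented as optional.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a stand-in for the library's event type (assumption).
type Event struct{ Payload string }

// EventEnvelop mirrors the documented struct.
type EventEnvelop struct {
	Result chan<- error
	Event
}

// fakePublisher is a hypothetical stand-in for the transport's publishing
// goroutines. It rejects empty payloads, counts successful publishes, and
// reports each outcome on Result only when the producer provided one.
func fakePublisher(in <-chan EventEnvelop, done chan<- int) {
	published := 0
	for env := range in {
		var err error
		if env.Payload == "" {
			err = errors.New("empty payload")
		} else {
			published++
		}
		if env.Result != nil {
			env.Result <- err
		}
	}
	done <- published
}

// sendAndWait publishes one event and blocks until its result is known,
// which lets the producer retry or fail its own unit of work.
func sendAndWait(out chan<- EventEnvelop, payload string) error {
	result := make(chan error, 1)
	out <- EventEnvelop{Result: result, Event: Event{Payload: payload}}
	return <-result
}

// fireAndForget leaves Result nil, so the producer never waits on publishing.
func fireAndForget(out chan<- EventEnvelop, payload string) {
	out <- EventEnvelop{Event: Event{Payload: payload}}
}

// demo runs both modes against the fake publisher.
func demo() (okErr, badErr error, published int) {
	out := make(chan EventEnvelop, 4)
	done := make(chan int, 1)
	go fakePublisher(out, done)

	okErr = sendAndWait(out, "a")
	badErr = sendAndWait(out, "")
	fireAndForget(out, "b")

	close(out)
	published = <-done
	return okErr, badErr, published
}

func main() {
	okErr, badErr, published := demo()
	fmt.Println(okErr, badErr, published)
}
```

Waiting on `Result` trades latency for the ability to react to a failed publish; fire-and-forget removes that wait but gives the producer no signal when publishing fails.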
Supported queue technologies
- Amazon SQS
- Amazon SNS (only send of events)
- Kafka
- RabbitMQ
- Redis
- Postgres database queue (supports an at-least-once delivery guarantee for producers with a transaction)
Documentation ¶
Index ¶
Constants ¶
const (
	DefaultPublisherCount        = 16
	DefaultSendChannelBufferSize = 1024
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Event ¶
type EventEnvelop ¶
type EventEnvelop struct {
	// For sending a message:
	// Result is an optional channel to receive the result of the message publishing, which is
	//   + nil if the message was sent successfully
	//   + an error if the message couldn't be sent
	//
	// For receiving a message, the Result channel is used to acknowledge the processing of the message by the receiver.
	//   + Receiver sends nil if the message is processed successfully
	//   + Receiver sends an error if the message cannot be processed
	Result chan<- error
	Event
}
type MiddleWare ¶
type Option ¶
type Option = func(c *writeGroupConfig)
func WithBufferSize ¶
func WithMiddlewares ¶
func WithMiddlewares(middlewares ...MiddleWare) Option
func WithPublisherCount ¶
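`Option` follows Go's functional options pattern. The sketch below shows how such options are typically applied. It rests on assumptions: the fields of `writeGroupConfig` are hypothetical (the real struct is unexported and not shown here), `WithBufferSize` and `WithPublisherCount` are assumed to take an `int` because their signatures are not listed, `newConfig` is a hypothetical helper, and using the two constants as defaults is inferred from their names.

```go
package main

import "fmt"

const (
	DefaultPublisherCount        = 16
	DefaultSendChannelBufferSize = 1024
)

// writeGroupConfig's fields are hypothetical; the real struct is not documented.
type writeGroupConfig struct {
	publisherCount int
	bufferSize     int
}

// Option matches the documented alias.
type Option = func(c *writeGroupConfig)

// WithPublisherCount is assumed to take an int (signature not shown in the docs).
func WithPublisherCount(n int) Option {
	return func(c *writeGroupConfig) { c.publisherCount = n }
}

// WithBufferSize is assumed to take an int (signature not shown in the docs).
func WithBufferSize(n int) Option {
	return func(c *writeGroupConfig) { c.bufferSize = n }
}

// newConfig is a hypothetical helper: it starts from the defaults and applies
// each option in order, so later options override earlier ones.
func newConfig(options ...Option) writeGroupConfig {
	c := writeGroupConfig{
		publisherCount: DefaultPublisherCount,
		bufferSize:     DefaultSendChannelBufferSize,
	}
	for _, opt := range options {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(WithPublisherCount(4), WithBufferSize(64)))
}
```

A call such as `NewTransport(queueProvider, WithPublisherCount(4))` would then override only the publisher count and keep the remaining defaults, assuming the library applies options in this way.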
type QueueProvider ¶
type Transport ¶
type Transport interface {
	Send(topic string) (chan<- EventEnvelop, error)
	Receive(topic string) (<-chan EventEnvelop, error)
	Shutdown(ctx context.Context)
}
func NewTransport ¶
func NewTransport(queueProvider QueueProvider, options ...Option) Transport
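Since `Transport` is a small interface, code written against it can also run on an in-memory implementation. The sketch below is hypothetical and is not how `NewTransport` works: `memTransport`, `newMemTransport`, and `roundTrip` are illustrative names, `Event` is a stand-in type, each topic is a single buffered channel, and `Shutdown` ignores its context.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Event is a stand-in for the library's event type (assumption).
type Event struct{ Payload string }

// EventEnvelop mirrors the documented struct.
type EventEnvelop struct {
	Result chan<- error
	Event
}

// Transport mirrors the documented interface.
type Transport interface {
	Send(topic string) (chan<- EventEnvelop, error)
	Receive(topic string) (<-chan EventEnvelop, error)
	Shutdown(ctx context.Context)
}

// memTransport is a hypothetical in-memory Transport: one buffered channel per topic.
type memTransport struct {
	mu     sync.Mutex
	topics map[string]chan EventEnvelop
}

// Compile-time check that memTransport satisfies Transport.
var _ Transport = (*memTransport)(nil)

func newMemTransport() *memTransport {
	return &memTransport{topics: make(map[string]chan EventEnvelop)}
}

// topic returns the channel for name, creating it on first use.
func (t *memTransport) topic(name string) chan EventEnvelop {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch, ok := t.topics[name]
	if !ok {
		ch = make(chan EventEnvelop, 16)
		t.topics[name] = ch
	}
	return ch
}

func (t *memTransport) Send(topic string) (chan<- EventEnvelop, error) {
	return t.topic(topic), nil
}

func (t *memTransport) Receive(topic string) (<-chan EventEnvelop, error) {
	return t.topic(topic), nil
}

// Shutdown closes every topic so receivers drain buffered events and their
// range loops end. Callers must stop sending first; ctx is unused in this sketch.
func (t *memTransport) Shutdown(ctx context.Context) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for name, ch := range t.topics {
		close(ch)
		delete(t.topics, name)
	}
}

// roundTrip sends payloads (at most 16, the buffer size), shuts down, and
// returns what a receiver still gets from the drained topic.
func roundTrip(tr Transport, payloads ...string) []string {
	out, _ := tr.Send("demo")
	in, _ := tr.Receive("demo")
	for _, p := range payloads {
		out <- EventEnvelop{Event: Event{Payload: p}}
	}
	tr.Shutdown(context.Background())

	var got []string
	for env := range in {
		got = append(got, env.Payload)
	}
	return got
}

func main() {
	fmt.Println(roundTrip(newMemTransport(), "a", "b", "c"))
}
```

Closing the channels only after senders have stopped mirrors the idea behind graceful shutdown: events already handed to the transport are still delivered before the receive loops end.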