Documentation ¶
Index ¶
- Variables
- type Close
- type Consumer
- type Dialect
- type Handler
- type HandlerFunc
- type Message
- func (message *Message) Ack() bool
- func (message *Message) Acked() <-chan struct{}
- func (message *Message) Ctx() context.Context
- func (message *Message) Finally() error
- func (message *Message) Nack() bool
- func (message *Message) Nacked() <-chan struct{}
- func (message *Message) NewCtx(ctx context.Context)
- func (message *Message) NewError(action string, err error) *Message
- func (message *Message) NewMessage(action string, version Version, key metadata.Key, data []byte) *Message
- func (message *Message) Reset()
- func (message *Message) Schema() interface{}
- type MessageType
- type Next
- type Producer
- type Resolved
- type Topic
- type TopicMode
- type Version
- type Writer
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNegativeAcknowledgement is an error representing a negative message acknowledgement
	ErrNegativeAcknowledgement = errors.New("negative acknowledgement")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer interface {
	// Subscribe creates a new topic subscription that receives the
	// messages consumed from the given topics. It returns a message
	// channel and an error. Once a message has been processed
	// successfully, the next message should be requested.
	Subscribe(topics ...Topic) (subscription <-chan *Message, err error)
	// Unsubscribe unsubscribes the given channel subscription.
	// An error is returned if the channel could not be unsubscribed.
	Unsubscribe(subscription <-chan *Message) error
	// Close closes the kafka consumer, all topic subscriptions and event channels.
	Close() error
}
Consumer is a message consumer.
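The subscribe, receive, close cycle of a consumer can be sketched with an in-memory stand-in. Everything below is an assumption made for illustration: Message and Topic are reduced local types (the real Topic is an interface), and memoryConsumer with its emit method is hypothetical, not one of the package's dialects.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message and Topic are simplified stand-ins for the package types.
type Message struct{ Action string }
type Topic string

// memoryConsumer is a hypothetical in-memory consumer with the same
// method set as the Consumer interface.
type memoryConsumer struct {
	mu            sync.Mutex
	subscriptions []chan *Message
	closed        bool
}

// Subscribe registers a new buffered channel that receives emitted messages.
func (c *memoryConsumer) Subscribe(topics ...Topic) (<-chan *Message, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return nil, errors.New("consumer closed")
	}
	sub := make(chan *Message, 8)
	c.subscriptions = append(c.subscriptions, sub)
	return sub, nil
}

// Unsubscribe closes and removes the given subscription channel.
func (c *memoryConsumer) Unsubscribe(subscription <-chan *Message) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for i, sub := range c.subscriptions {
		if (<-chan *Message)(sub) == subscription {
			close(sub)
			c.subscriptions = append(c.subscriptions[:i], c.subscriptions[i+1:]...)
			return nil
		}
	}
	return errors.New("subscription not found")
}

// Close closes every remaining subscription channel.
func (c *memoryConsumer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, sub := range c.subscriptions {
		close(sub)
	}
	c.subscriptions = nil
	c.closed = true
	return nil
}

// emit delivers a message to all subscriptions (test helper, hypothetical).
func (c *memoryConsumer) emit(m *Message) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, sub := range c.subscriptions {
		sub <- m
	}
}

func main() {
	consumer := &memoryConsumer{}
	subscription, err := consumer.Subscribe("orders")
	if err != nil {
		fmt.Println("subscribe failed:", err)
		return
	}
	consumer.emit(&Message{Action: "created"})
	consumer.emit(&Message{Action: "shipped"})
	consumer.Close() // closes the subscription so the loop below terminates
	for message := range subscription {
		fmt.Println("received:", message.Action)
	}
}
```

Ranging over the subscription channel ends once the consumer closes it, which keeps the receiving goroutine from blocking forever after shutdown.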
type Dialect ¶
type Dialect interface {
	// Consumer returns the dialect consumer
	Consumer() Consumer
	// Producer returns the dialect producer
	Producer() Producer
	// Healthy checks whether the dialect's consumer and producer are healthy and
	// up and running. This method can be called to check if the service is up and running.
	// The user should implement the health check.
	Healthy() bool
	// Open opens the given dialect to start accepting incoming and outgoing connections.
	Open(topics []Topic) error
	// Close waits until the consumer(s) and producer(s) of the given dialect are closed.
	// If an error is returned, closing is aborted and the error is returned to the user.
	Close() error
}
Dialect represents a commander dialect. A dialect is responsible for the consumption/production of the targeted protocol.
type HandlerFunc ¶
HandlerFunc is a message handler implementation that receives a message and a writer.
type Message ¶
type Message struct {
	ID        string    `json:"id"`
	Topic     Topic     `json:"topic"`
	Action    string    `json:"action"`
	Version   Version   `json:"version"`
	Data      []byte    `json:"data"`
	Key       []byte    `json:"key"`
	Timestamp time.Time `json:"timestamp"`
	// contains filtered or unexported fields
}
Message representation
func NewMessage ¶
NewMessage constructs a new message
func (*Message) Acked ¶
func (message *Message) Acked() <-chan struct{}
Acked returns a channel that is closed once an acknowledgement signal has been sent.
func (*Message) Ctx ¶
Ctx returns the message context. This method can safely be called concurrently.
func (*Message) Finally ¶
Finally returns once the message is resolved. An ErrNegativeAcknowledgement error is returned if the message was negatively acknowledged.
func (*Message) Nacked ¶
func (message *Message) Nacked() <-chan struct{}
Nacked returns a channel that is closed once a negative acknowledgement signal has been sent.
func (*Message) NewCtx ¶
NewCtx updates the message context. This method can safely be called concurrently.
func (*Message) NewMessage ¶
func (message *Message) NewMessage(action string, version Version, key metadata.Key, data []byte) *Message
NewMessage constructs a new event message with the given message as parent.
type MessageType ¶
type MessageType int8
MessageType represents a message type
const (
	EventMessage MessageType = iota + 1
	CommandMessage
)
Available message types
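Since the constants start at iota + 1, EventMessage is 1 and CommandMessage is 2, which leaves the zero value of MessageType without a matching constant. The sketch below redeclares the type locally to show this; the describe helper is hypothetical.

```go
package main

import "fmt"

// MessageType and its constants are redeclared so the sketch is self-contained.
type MessageType int8

const (
	EventMessage MessageType = iota + 1
	CommandMessage
)

// describe is a hypothetical helper that names a message type.
func describe(t MessageType) string {
	switch t {
	case EventMessage:
		return "event"
	case CommandMessage:
		return "command"
	default:
		return "unknown"
	}
}

func main() {
	var unset MessageType // zero value, matches neither constant
	fmt.Println(EventMessage, CommandMessage, describe(unset)) // prints "1 2 unknown"
}
```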
type Producer ¶
type Producer interface {
	// Publish produces a message to the given topic
	Publish(message *Message) error
	// Close closes the producer
	Close() error
}
Producer is a message producer.
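A producer that records messages in memory can be useful in tests. The following is a minimal sketch, assuming a reduced local Message type; memoryProducer and its error messages are hypothetical and not provided by this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a reduced stand-in for the package's Message type.
type Message struct {
	Action string
	Data   []byte
}

// Producer mirrors the interface documented above.
type Producer interface {
	Publish(message *Message) error
	Close() error
}

// memoryProducer is a hypothetical producer that keeps published messages in a slice.
type memoryProducer struct {
	published []*Message
	closed    bool
}

// Compile-time check that memoryProducer satisfies Producer.
var _ Producer = (*memoryProducer)(nil)

func (p *memoryProducer) Publish(message *Message) error {
	if p.closed {
		return errors.New("producer closed")
	}
	if message == nil {
		return errors.New("nil message")
	}
	p.published = append(p.published, message)
	return nil
}

func (p *memoryProducer) Close() error {
	p.closed = true
	return nil
}

func main() {
	producer := &memoryProducer{}
	if err := producer.Publish(&Message{Action: "created"}); err != nil {
		fmt.Println("publish failed:", err)
	}
	producer.Close()
	// Publishing after Close is rejected in this sketch.
	fmt.Println(producer.Publish(&Message{Action: "late"}))
}
```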
type Topic ¶
type Topic interface {
	// Dialect returns the topic Dialect
	Dialect() Dialect
	// Type returns the topic type
	Type() MessageType
	// Mode returns the topic mode
	Mode() TopicMode
	// HasMode checks if the topic represents the given topic mode
	HasMode(TopicMode) bool
	// Name returns the topic name
	Name() string
}
Topic represents a subject for a dialect, including its consumer/producer mode.
type TopicMode ¶
type TopicMode int8
TopicMode represents the mode of the given topic. The mode describes if the given topic is marked for consumption/production/streaming...
const (
	ConsumeMode TopicMode = 1 << iota
	ProduceMode
	DefaultMode = ConsumeMode | ProduceMode
)
Available topic modes
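The modes are bit flags: ConsumeMode is 1, ProduceMode is 2, and DefaultMode combines both as 3. A mode check then reduces to a bitwise AND. The sketch below redeclares the constants locally; the hasMode function is a hypothetical illustration of how a HasMode check might work, not the package's implementation.

```go
package main

import "fmt"

// TopicMode and its constants are redeclared so the sketch is self-contained.
type TopicMode int8

const (
	ConsumeMode TopicMode = 1 << iota
	ProduceMode
	DefaultMode = ConsumeMode | ProduceMode
)

// hasMode reports whether every bit of flag is set in mode (hypothetical helper).
func hasMode(mode, flag TopicMode) bool {
	return mode&flag == flag
}

func main() {
	fmt.Println(ConsumeMode, ProduceMode, DefaultMode) // prints "1 2 3"
	fmt.Println(hasMode(DefaultMode, ProduceMode))     // prints "true"
	fmt.Println(hasMode(ConsumeMode, ProduceMode))     // prints "false"
}
```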
type Version ¶
type Version int8
Version represents a message version.
const (
NullVersion Version = 0
)
NullVersion is the null value of the Version type.
type Writer ¶
type Writer interface {
	// Event creates and produces a new event to the assigned group.
	Event(action string, version int8, key []byte, data []byte) (*Message, error)
	// Error produces a new error event to the assigned group.
	Error(action string, err error) (*Message, error)
	// Command produces a new command to the assigned group.
	Command(action string, version int8, key []byte, data []byte) (*Message, error)
}
Writer is a handle implementation for a given group and message.