Versions in this module Expand all Collapse all v0 v0.7.0 Apr 28, 2021 v0.6.1 Dec 2, 2019 v0.6.0 Oct 26, 2019 Changes in this version + const NullStatusCode + const NullVersion + var ErrNegativeAcknowledgement = errors.New("negative acknowledgement") + type Close func() + type Consumer interface + Close func() error + Subscribe func(topics ...Topic) (subscription <-chan *Message, err error) + Unsubscribe func(subscription <-chan *Message) error + type Dialect interface + Close func() error + Consumer func() Consumer + Healthy func() bool + Open func(topics []Topic) error + Producer func() Producer + type EOS bool + func (eos *EOS) Parse(value string) EOS + func (eos *EOS) String() string + type Handler interface + Handle func(*Message, Writer) + type HandlerFunc func(*Message, Writer) + type Message struct + Action string + Data []byte + EOS EOS + ID string + Key []byte + Status StatusCode + Timestamp time.Time + Topic Topic + Version Version + func NewMessage(action string, version int8, key []byte, data []byte) *Message + func (message *Message) Ack() bool + func (message *Message) Acked() <-chan struct{} + func (message *Message) Ctx() context.Context + func (message *Message) Finally() error + func (message *Message) Nack() bool + func (message *Message) Nacked() <-chan struct{} + func (message *Message) NewCtx(ctx context.Context) + func (message *Message) NewError(action string, status StatusCode, err error) *Message + func (message *Message) NewMessage(action string, version Version, key metadata.Key, data []byte) *Message + func (message *Message) NewSchema(v interface{}) + func (message *Message) Reset() + func (message *Message) Schema() interface{} + type MessageType int8 + const CommandMessage + const EventMessage + type Next func() + type Producer interface + Close func() error + Publish func(message *Message) error + type Resolved int + const ResolvedAck + const ResolvedNack + const UnkownResolvedStatus + type StatusCode int16 + const StatusBadRequest + const StatusConflict + const StatusForbidden + const StatusImATeapot + const StatusInternalServerError + const StatusNotFound + const StatusOK + const StatusUnauthorized + func (code StatusCode) String() string + type Topic interface + Dialect func() Dialect + HasMode func(TopicMode) bool + Mode func() TopicMode + Name func() string + Type func() MessageType + func NewTopic(name string, dialect Dialect, t MessageType, m TopicMode) Topic + type TopicMode int8 + const ConsumeMode + const DefaultMode + const ProduceMode + type Version int8 + func (version Version) String() string + type Writer interface + Command func(action string, version int8, key []byte, data []byte) (*Message, error) + CommandEOS func(action string, version int8, key []byte, data []byte) (*Message, error) + CommandStream func(action string, version int8, key []byte, data []byte) (*Message, error) + Error func(action string, status StatusCode, err error) (*Message, error) + ErrorEOS func(action string, status StatusCode, err error) (*Message, error) + ErrorStream func(action string, status StatusCode, err error) (*Message, error) + Event func(action string, version int8, key []byte, data []byte) (*Message, error) + EventEOS func(action string, version int8, key []byte, data []byte) (*Message, error) + EventStream func(action string, version int8, key []byte, data []byte) (*Message, error)