messaging

package
v0.0.0-...-81dd437 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

README

Messaging

The messaging package defines the Publisher and Subscriber interfaces, along with an aggregate PubSub interface.

The Subscriber interface defines the methods used to subscribe to a message broker such as MQTT, NATS, or RabbitMQ.

The Publisher interface defines the methods used to publish messages to a message broker such as MQTT, NATS, or RabbitMQ.

The PubSub interface is composed of the Publisher and Subscriber interfaces and can be used both to send messages to and to receive messages from a message broker.

Documentation

Index

Constants

View Source
const (
	SenMLContentType = "application/senml+json"
	CBORContentType  = "application/senml+cbor"
	JSONContentType  = "application/json"
	SenMLFormat      = "senml"
	JSONFormat       = "json"
	CBORFormat       = "cbor"
)

Variables

View Source
var (
	ErrInvalidLengthMessage        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowMessage          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupMessage = fmt.Errorf("proto: unexpected end of group")
)
View Source
var (
	// ErrConnect indicates that the connection to the MQTT broker failed.
	ErrConnect = errors.New("failed to connect to MQTT broker")

	// ErrPublishTimeout indicates that the publishing failed due to timeout.
	ErrPublishTimeout = errors.New("failed to publish due to timeout reached")

	// ErrSubscribeTimeout indicates that the subscription failed due to timeout.
	ErrSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")

	// ErrUnsubscribeTimeout indicates that unsubscribe failed due to timeout.
	ErrUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")

	// ErrUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted.
	ErrUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")

	// ErrNotSubscribed indicates that the topic is not subscribed to.
	ErrNotSubscribed = errors.New("not subscribed")

	// ErrEmptyTopic indicates the absence of topic.
	ErrEmptyTopic = errors.New("empty topic")

	// ErrMalformedSubtopic indicates that the subtopic is malformed.
	ErrMalformedSubtopic = errors.New("malformed subtopic")

	// ErrEmptyID indicates the absence of ID.
	ErrEmptyID = errors.New("empty ID")

	// ErrUnknownContent indicates that the content type is unknown.
	ErrUnknownContent = errors.New("unknown content type")
)

Functions

func CreateSubject

func CreateSubject(subtopic string) (string, error)

func ExtractSubtopic

func ExtractSubtopic(path string) (string, error)

Types

type Message

type Message struct {
	Channel              string   `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
	Subtopic             string   `protobuf:"bytes,2,opt,name=subtopic,proto3" json:"subtopic,omitempty"`
	Publisher            string   `protobuf:"bytes,3,opt,name=publisher,proto3" json:"publisher,omitempty"`
	Protocol             string   `protobuf:"bytes,4,opt,name=protocol,proto3" json:"protocol,omitempty"`
	Payload              []byte   `protobuf:"bytes,5,opt,name=payload,proto3" json:"payload,omitempty"`
	Created              int64    `protobuf:"varint,6,opt,name=created,proto3" json:"created,omitempty"`
	Profile              *Profile `protobuf:"bytes,7,opt,name=profile,proto3" json:"profile,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Message represents a message emitted by the Mainflux adapters layer.

func CreateMessage

func CreateMessage(conn *mainflux.ConnByKeyRes, protocol, subject string, payload *[]byte) Message

func (*Message) Descriptor

func (*Message) Descriptor() ([]byte, []int)

func (*Message) GetChannel

func (m *Message) GetChannel() string

func (*Message) GetCreated

func (m *Message) GetCreated() int64

func (*Message) GetPayload

func (m *Message) GetPayload() []byte

func (*Message) GetProfile

func (m *Message) GetProfile() *Profile

func (*Message) GetProtocol

func (m *Message) GetProtocol() string

func (*Message) GetPublisher

func (m *Message) GetPublisher() string

func (*Message) GetSubtopic

func (m *Message) GetSubtopic() string

func (*Message) Marshal

func (m *Message) Marshal() (dAtA []byte, err error)

func (*Message) MarshalTo

func (m *Message) MarshalTo(dAtA []byte) (int, error)

func (*Message) MarshalToSizedBuffer

func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) Size

func (m *Message) Size() (n int)

func (*Message) String

func (m *Message) String() string

func (*Message) Unmarshal

func (m *Message) Unmarshal(dAtA []byte) error

func (*Message) XXX_DiscardUnknown

func (m *Message) XXX_DiscardUnknown()

func (*Message) XXX_Marshal

func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Message) XXX_Merge

func (m *Message) XXX_Merge(src proto.Message)

func (*Message) XXX_Size

func (m *Message) XXX_Size() int

func (*Message) XXX_Unmarshal

func (m *Message) XXX_Unmarshal(b []byte) error

type MessageHandler

type MessageHandler interface {
	// Handle handles messages passed by underlying implementation.
	Handle(msg Message) error

	// Cancel is used for cleanup during unsubscribing and it's optional.
	Cancel() error
}

MessageHandler represents Message handler for Subscriber.

type Notifier

type Notifier struct {
	Protocol             string   `protobuf:"bytes,1,opt,name=protocol,proto3" json:"protocol,omitempty"`
	Subtopics            []string `protobuf:"bytes,2,rep,name=subtopics,proto3" json:"subtopics,omitempty"`
	Contacts             []string `protobuf:"bytes,3,rep,name=contacts,proto3" json:"contacts,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*Notifier) Descriptor

func (*Notifier) Descriptor() ([]byte, []int)

func (*Notifier) GetContacts

func (m *Notifier) GetContacts() []string

func (*Notifier) GetProtocol

func (m *Notifier) GetProtocol() string

func (*Notifier) GetSubtopics

func (m *Notifier) GetSubtopics() []string

func (*Notifier) Marshal

func (m *Notifier) Marshal() (dAtA []byte, err error)

func (*Notifier) MarshalTo

func (m *Notifier) MarshalTo(dAtA []byte) (int, error)

func (*Notifier) MarshalToSizedBuffer

func (m *Notifier) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Notifier) ProtoMessage

func (*Notifier) ProtoMessage()

func (*Notifier) Reset

func (m *Notifier) Reset()

func (*Notifier) Size

func (m *Notifier) Size() (n int)

func (*Notifier) String

func (m *Notifier) String() string

func (*Notifier) Unmarshal

func (m *Notifier) Unmarshal(dAtA []byte) error

func (*Notifier) XXX_DiscardUnknown

func (m *Notifier) XXX_DiscardUnknown()

func (*Notifier) XXX_Marshal

func (m *Notifier) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Notifier) XXX_Merge

func (m *Notifier) XXX_Merge(src proto.Message)

func (*Notifier) XXX_Size

func (m *Notifier) XXX_Size() int

func (*Notifier) XXX_Unmarshal

func (m *Notifier) XXX_Unmarshal(b []byte) error

type Profile

type Profile struct {
	ContentType          string       `protobuf:"bytes,1,opt,name=contentType,proto3" json:"contentType,omitempty"`
	Write                bool         `protobuf:"varint,2,opt,name=write,proto3" json:"write,omitempty"`
	Notify               bool         `protobuf:"varint,3,opt,name=notify,proto3" json:"notify,omitempty"`
	Webhook              bool         `protobuf:"varint,4,opt,name=webhook,proto3" json:"webhook,omitempty"`
	Transformer          *Transformer `protobuf:"bytes,5,opt,name=transformer,proto3" json:"transformer,omitempty"`
	Notifier             *Notifier    `protobuf:"bytes,6,opt,name=notifier,proto3" json:"notifier,omitempty"`
	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
	XXX_unrecognized     []byte       `json:"-"`
	XXX_sizecache        int32        `json:"-"`
}

func (*Profile) Descriptor

func (*Profile) Descriptor() ([]byte, []int)

func (*Profile) GetContentType

func (m *Profile) GetContentType() string

func (*Profile) GetNotifier

func (m *Profile) GetNotifier() *Notifier

func (*Profile) GetNotify

func (m *Profile) GetNotify() bool

func (*Profile) GetTransformer

func (m *Profile) GetTransformer() *Transformer

func (*Profile) GetWebhook

func (m *Profile) GetWebhook() bool

func (*Profile) GetWrite

func (m *Profile) GetWrite() bool

func (*Profile) Marshal

func (m *Profile) Marshal() (dAtA []byte, err error)

func (*Profile) MarshalTo

func (m *Profile) MarshalTo(dAtA []byte) (int, error)

func (*Profile) MarshalToSizedBuffer

func (m *Profile) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Profile) ProtoMessage

func (*Profile) ProtoMessage()

func (*Profile) Reset

func (m *Profile) Reset()

func (*Profile) Size

func (m *Profile) Size() (n int)

func (*Profile) String

func (m *Profile) String() string

func (*Profile) Unmarshal

func (m *Profile) Unmarshal(dAtA []byte) error

func (*Profile) XXX_DiscardUnknown

func (m *Profile) XXX_DiscardUnknown()

func (*Profile) XXX_Marshal

func (m *Profile) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Profile) XXX_Merge

func (m *Profile) XXX_Merge(src proto.Message)

func (*Profile) XXX_Size

func (m *Profile) XXX_Size() int

func (*Profile) XXX_Unmarshal

func (m *Profile) XXX_Unmarshal(b []byte) error

type PubSub

type PubSub interface {
	Publisher
	Subscriber
}

PubSub represents aggregation interface for publisher and subscriber.

type Publisher

type Publisher interface {
	// Publish publishes message to the message broker.
	Publish(msg Message) error

	// Close gracefully closes message publisher's connection.
	Close() error
}

Publisher specifies message publishing API.

type Subscriber

type Subscriber interface {
	// Subscribe subscribes to the message stream and consumes messages.
	Subscribe(id, topic string, handler MessageHandler) error

	// Unsubscribe unsubscribes from the message stream and
	// stops consuming messages.
	Unsubscribe(id, topic string) error

	// Close gracefully closes message subscriber's connection.
	Close() error
}

Subscriber specifies message subscription API.

type Transformer

type Transformer struct {
	ValueFields          []string `protobuf:"bytes,1,rep,name=valueFields,proto3" json:"valueFields,omitempty"`
	TimeField            string   `protobuf:"bytes,2,opt,name=timeField,proto3" json:"timeField,omitempty"`
	TimeFormat           string   `protobuf:"bytes,3,opt,name=timeFormat,proto3" json:"timeFormat,omitempty"`
	TimeLocation         string   `protobuf:"bytes,4,opt,name=timeLocation,proto3" json:"timeLocation,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*Transformer) Descriptor

func (*Transformer) Descriptor() ([]byte, []int)

func (*Transformer) GetTimeField

func (m *Transformer) GetTimeField() string

func (*Transformer) GetTimeFormat

func (m *Transformer) GetTimeFormat() string

func (*Transformer) GetTimeLocation

func (m *Transformer) GetTimeLocation() string

func (*Transformer) GetValueFields

func (m *Transformer) GetValueFields() []string

func (*Transformer) Marshal

func (m *Transformer) Marshal() (dAtA []byte, err error)

func (*Transformer) MarshalTo

func (m *Transformer) MarshalTo(dAtA []byte) (int, error)

func (*Transformer) MarshalToSizedBuffer

func (m *Transformer) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Transformer) ProtoMessage

func (*Transformer) ProtoMessage()

func (*Transformer) Reset

func (m *Transformer) Reset()

func (*Transformer) Size

func (m *Transformer) Size() (n int)

func (*Transformer) String

func (m *Transformer) String() string

func (*Transformer) Unmarshal

func (m *Transformer) Unmarshal(dAtA []byte) error

func (*Transformer) XXX_DiscardUnknown

func (m *Transformer) XXX_DiscardUnknown()

func (*Transformer) XXX_Marshal

func (m *Transformer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Transformer) XXX_Merge

func (m *Transformer) XXX_Merge(src proto.Message)

func (*Transformer) XXX_Size

func (m *Transformer) XXX_Size() int

func (*Transformer) XXX_Unmarshal

func (m *Transformer) XXX_Unmarshal(b []byte) error

Directories

Path Synopsis
Package mqtt holds the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the Mainflux IoT platform.
Package mqtt holds the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the Mainflux IoT platform.
Package nats holds the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the Mainflux IoT platform.
Package nats holds the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the Mainflux IoT platform.
Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the Mainflux IoT platform.
Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the Mainflux IoT platform.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL