consumers

package
v0.16.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 11, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

README

Consumers

The consumers package provides an abstraction over the various SuperMQ consumers. A SuperMQ consumer is a generic service that handles (consumes) received messages. The message is not necessarily a SuperMQ message: before it is consumed, a SuperMQ message can be transformed into any valid format that the specific consumer understands. For example, writers are consumers that take a SenML or JSON message and store it.

Consumers are optional services and are treated as plugins. In order to run consumer services, core services must be up and running.

For an in-depth explanation of how to use consumers, as well as a thorough understanding of SuperMQ, please check out the official documentation.

For more information about service capabilities and their usage, please check out the API documentation.

Documentation

Overview

Package consumers contains the domain concept definitions needed to support SuperMQ consumer services functionality.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Start

func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger *slog.Logger) error

Start begins consuming messages received from the message broker. It transforms messages to SenML format before using MessageRepository to store them.

Types

type AsyncConsumer

type AsyncConsumer interface {
	// ConsumeAsync method is used to asynchronously consume received messages.
	ConsumeAsync(ctx context.Context, messages interface{})

	// Errors method returns a channel for reading errors which occur during async writes.
	// Must be called before performing any writes for errors to be collected.
	// The channel is buffered (capacity 1), so only one error can be sent without blocking if the channel is not drained.
	// The channel may receive a nil error to indicate success.
	Errors() <-chan error
}

AsyncConsumer specifies a non-blocking message-consuming API, which can be used for writing data to the DB, publishing messages to a broker, sending notifications, or any other asynchronous job.

type BlockingConsumer

type BlockingConsumer interface {
	// ConsumeBlocking method is used to consume received messages synchronously.
	// A non-nil error is returned to indicate operation failure.
	ConsumeBlocking(ctx context.Context, messages interface{}) error
}

BlockingConsumer specifies a blocking message-consuming API, which can be used for writing data to the DB, publishing messages to a broker, sending notifications, and similar jobs. BlockingConsumer implementations might also support concurrent use; consult the specific implementation for details.

Directories

Path: Synopsis
notifiers: Package notifiers contains the domain concept definitions needed to support SuperMQ notifications functionality.
notifiers/api: Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
notifiers/mocks: Package mocks contains mocks for testing purposes.
notifiers/postgres: Package postgres contains repository implementations using PostgreSQL as the underlying database.
notifiers/tracing: Package tracing provides tracing instrumentation for SuperMQ WebSocket adapter service.
writers: Package writers contains the domain concept definitions needed to support SuperMQ writer services functionality.
writers/api: Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
writers/postgres: Package postgres contains repository implementations using Postgres as the underlying database.
writers/timescale: Package timescale contains repository implementations using Timescale as the underlying database.
