gluon

v0.3.7
Published: Dec 31, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

README ¶

🎇 Gluon

A composable message bus for Event-Driven systems written in Go.

Documentation ¶

Constants ¶

const CloudEventsSpecVersion = "1.0"

CloudEventsSpecVersion is the CloudEvents specification version used by `Gluon` internals.

Variables ¶

var ErrBusClosed = errors.New("gluon: The bus is closed")

ErrBusClosed is returned when an action is attempted on a closed Bus.

var (
	// ErrMessageNotRegistered is returned when the message type is not found in the message registry.
	ErrMessageNotRegistered = errors.New("gluon: The specified message type is not present on the message registry")
)
var ErrMissingSchemaDefinition = errors.New("gluon: Missing schema definition")

Functions ¶

func Register ¶

func Register(name string, driver Driver)

Register makes a message broker driver available for the Bus.

Register panics if driver is nil.
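The registration contract resembles the one used by `database/sql`. The following is a minimal, self-contained sketch of such a registry and is not Gluon's actual implementation: the `register`, `lookup` and `registerPanics` helpers, the `memoryDriver` type and the reduced `Driver` interface are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Driver is a reduced stand-in for gluon.Driver (assumption: the real
// interface has different methods, listed under "type Driver").
type Driver interface {
	Name() string
}

// memoryDriver is a hypothetical driver used only in this example.
type memoryDriver struct{}

func (memoryDriver) Name() string { return "memory" }

var (
	mu      sync.RWMutex
	drivers = map[string]Driver{}
)

// register stores a driver under a name and panics on a nil driver,
// mirroring the documented behaviour of Register.
func register(name string, d Driver) {
	if d == nil {
		panic("registry: driver is nil")
	}
	mu.Lock()
	defer mu.Unlock()
	drivers[name] = d
}

// lookup returns the driver registered under name, if any.
func lookup(name string) (Driver, bool) {
	mu.RLock()
	defer mu.RUnlock()
	d, ok := drivers[name]
	return d, ok
}

// registerPanics reports whether register panics for the given driver.
func registerPanics(name string, d Driver) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	register(name, d)
	return false
}

func main() {
	register("memory", memoryDriver{})
	d, ok := lookup("memory")
	fmt.Println(d.Name(), ok)
	fmt.Println(registerPanics("broken", nil))
}
```

Drivers are usually registered from an `init` function of the driver package, so that importing the package is enough to make the driver available by name.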

Types ¶

type Bus ¶

type Bus struct {
	BaseContext    context.Context
	Marshaler      Marshaler
	Factories      Factories
	SchemaRegistry SchemaRegistry
	Configuration  BusConfiguration
	Logger         zerolog.Logger
	Addresses      []string
	// contains filtered or unexported fields
}

Bus is a facade component used to interact with external systems through streaming messaging mechanisms.

func NewBus ¶

func NewBus(driver string, opts ...Option) *Bus

NewBus allocates a new Bus with the default configuration.

func (*Bus) GetSchemaMetadata ¶ added in v0.2.2

func (b *Bus) GetSchemaMetadata(schema interface{}) (*MessageMetadata, error)

GetSchemaMetadata retrieves metadata from the internal schema registry.

func (*Bus) GetSchemaMetadataFromTopic ¶ added in v0.2.4

func (b *Bus) GetSchemaMetadataFromTopic(topic string) *MessageMetadata

GetSchemaMetadataFromTopic retrieves metadata from the internal schema registry using the topic name.

func (*Bus) ListSubscribersFromTopic ¶

func (b *Bus) ListSubscribersFromTopic(t string) []*Subscriber

ListSubscribersFromTopic returns the subscription task queue of a registered topic.

func (*Bus) ListenAndServe ¶

func (b *Bus) ListenAndServe() error

ListenAndServe bootstraps and starts a Bus along with its internal components (subscribers).

func (*Bus) Publish ¶

func (b *Bus) Publish(ctx context.Context, data interface{}) error

Publish propagates a message to the ecosystem, using the internal topic registry agent to generate the topic.

Note: To propagate correlation and causation IDs, use the Subscription's context.
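How Gluon stores these IDs in the context is not documented here. The sketch below illustrates the common convention only, under stated assumptions: the correlation ID is shared by every message in a flow, and the causation ID is the ID of the message being handled when the new one is published. The `envelope` type, the context keys and both helper functions are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported key type, which avoids collisions with other packages.
type ctxKey string

const (
	correlationKey ctxKey = "correlation_id"
	causationKey   ctxKey = "causation_id"
)

// envelope is a hypothetical, reduced message used only in this sketch.
type envelope struct {
	ID            string
	CorrelationID string
	CausationID   string
}

// contextFor builds the context handed to a handler for an incoming message:
// the correlation ID is carried over and the incoming message ID becomes the
// causation ID of anything published from that handler.
func contextFor(parent context.Context, in envelope) context.Context {
	ctx := context.WithValue(parent, correlationKey, in.CorrelationID)
	return context.WithValue(ctx, causationKey, in.ID)
}

// newEnvelope creates an outgoing message, reading both IDs from ctx.
// Without them the message starts a new flow and refers to itself
// (a choice made for this sketch).
func newEnvelope(ctx context.Context, id string) envelope {
	out := envelope{ID: id, CorrelationID: id, CausationID: id}
	if v, ok := ctx.Value(correlationKey).(string); ok && v != "" {
		out.CorrelationID = v
	}
	if v, ok := ctx.Value(causationKey).(string); ok && v != "" {
		out.CausationID = v
	}
	return out
}

// demo publishes msg-1 outside any handler, then msg-2 and msg-3 from the
// handlers of the previous message.
func demo() (first, second, third envelope) {
	first = newEnvelope(context.Background(), "msg-1")
	second = newEnvelope(contextFor(context.Background(), first), "msg-2")
	third = newEnvelope(contextFor(context.Background(), second), "msg-3")
	return first, second, third
}

func main() {
	first, second, third := demo()
	fmt.Printf("%+v\n%+v\n%+v\n", first, second, third)
}
```

Publishing with a fresh `context.Background()` inside a handler would lose both IDs, which is why the handler's own context should be passed along.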

func (*Bus) PublishBulk ¶

func (b *Bus) PublishBulk(ctx context.Context, data ...interface{}) error

PublishBulk propagates multiple messages to the ecosystem.

func (*Bus) PublishRaw ¶

func (b *Bus) PublishRaw(ctx context.Context, msg *TransportMessage) error

PublishRaw propagates a raw `Gluon` internal message to the ecosystem.

func (*Bus) PublishWithSubject ¶

func (b *Bus) PublishWithSubject(ctx context.Context, data interface{}, subject string) error

PublishWithSubject propagates a message to the ecosystem, using the internal topic registry agent to generate the topic.

This method also accepts a `Subject` argument, which sets the CloudEvent attribute of the same name.

func (*Bus) PublishWithTopic ¶ added in v0.2.3

func (b *Bus) PublishWithTopic(ctx context.Context, topic string, data interface{}) error

PublishWithTopic propagates a message to the ecosystem, using the internal schema registry to get the topic.

Note: To propagate correlation and causation IDs, use the Subscription's context.

func (*Bus) PublishWithTopicAndSubject ¶ added in v0.3.0

func (b *Bus) PublishWithTopicAndSubject(ctx context.Context, topic, subject string, data interface{}) error

PublishWithTopicAndSubject propagates a message to the ecosystem, using the internal topic registry agent to generate the topic.

This method also accepts a `Subject` argument, which sets the CloudEvent attribute of the same name.

Note: To propagate correlation and causation IDs, use the Subscription's context.

func (*Bus) PublishWithType ¶ added in v0.3.1

func (b *Bus) PublishWithType(ctx context.Context, msgType string, data interface{}) error

PublishWithType propagates a message to the ecosystem, using the Go struct type registered in the internal schema registry.

Note: To propagate correlation and causation IDs, use the Subscription's context.

func (*Bus) PublishWithTypeAndSubject ¶ added in v0.3.1

func (b *Bus) PublishWithTypeAndSubject(ctx context.Context, msgType, subject string, data interface{}) error

PublishWithTypeAndSubject propagates a message to the ecosystem, using the Go struct type registered in the internal schema registry and the given subject.

Note: To propagate correlation and causation IDs, use the Subscription's context.

func (*Bus) RegisterSchema ¶

func (b *Bus) RegisterSchema(schema interface{}, opts ...SchemaRegistryOption)

RegisterSchema links a message schema to specific metadata (MessageMetadata) and stores it for further Bus operations.

func (*Bus) Shutdown ¶

func (b *Bus) Shutdown(ctx context.Context) error

Shutdown gracefully closes a Bus and its internal resources.
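The pair ListenAndServe and Shutdown suggests a lifecycle similar to `net/http`'s server. The sketch below shows that pattern with a stand-in type that performs no I/O. It assumes that ListenAndServe blocks until the bus is shut down, which this page does not state; `fakeBus`, `errClosed` and `run` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errClosed plays the role of ErrBusClosed in this sketch.
var errClosed = errors.New("fake bus: closed")

// fakeBus is a hypothetical stand-in exposing the same lifecycle methods
// as Bus (ListenAndServe and Shutdown).
type fakeBus struct {
	done chan struct{}
	once sync.Once
}

func newFakeBus() *fakeBus { return &fakeBus{done: make(chan struct{})} }

// ListenAndServe blocks until Shutdown is called (assumed behaviour).
func (b *fakeBus) ListenAndServe() error {
	<-b.done
	return errClosed
}

// Shutdown stops the bus unless ctx has already expired.
// It is safe to call more than once.
func (b *fakeBus) Shutdown(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	b.once.Do(func() { close(b.done) })
	return nil
}

// run starts the bus in the background, stops it after the given delay and
// returns the errors produced by ListenAndServe and Shutdown.
func run(stopAfter time.Duration) (serveErr, shutdownErr error) {
	bus := newFakeBus()
	errCh := make(chan error, 1)
	go func() { errCh <- bus.ListenAndServe() }()

	time.Sleep(stopAfter) // a real program would wait for an OS signal here

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	shutdownErr = bus.Shutdown(ctx)
	return <-errCh, shutdownErr
}

func main() {
	serveErr, shutdownErr := run(10 * time.Millisecond)
	fmt.Println(serveErr, shutdownErr)
}
```

Bounding Shutdown with a timeout context keeps a stuck subscriber from blocking process exit indefinitely.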

func (*Bus) Subscribe ¶

func (b *Bus) Subscribe(schema interface{}) *Subscriber

Subscribe sets up a subscription task using schema metadata.

It returns nil if the schema is not found in the local schema registry.

func (*Bus) SubscribeTopic ¶

func (b *Bus) SubscribeTopic(topic string) *Subscriber

SubscribeTopic sets up a subscription task using a raw topic name.

type BusConfiguration ¶

type BusConfiguration struct {
	ConsumerGroup string
	MajorVersion  int
	// Driver Custom driver configuration(s)
	Driver interface{}
}

type Driver ¶

type Driver interface {
	SetParentBus(b *Bus)
	SetInternalHandler(h InternalMessageHandler)
	Start(context.Context) error
	Shutdown(context.Context) error
	Subscribe(ctx context.Context, subscriber *Subscriber) error
	// Publish Propagate a low-level message (CloudEvent) to the message bus.
	Publish(ctx context.Context, message *TransportMessage) error
}

Driver is the transport vendor (e.g. Apache Kafka, AWS SNS/SQS, Redis Streams) that implements `Gluon` internal mechanisms.

type Error ¶ added in v0.3.5

type Error struct {
	// contains filtered or unexported fields
}

func NewError ¶ added in v0.3.5

func NewError(kind, description string, parent error) Error

func (Error) Description ¶ added in v0.3.5

func (e Error) Description() string

func (Error) Error ¶ added in v0.3.5

func (e Error) Error() string

func (Error) Kind ¶ added in v0.3.5

func (e Error) Kind() string

func (Error) Parent ¶ added in v0.3.5

func (e Error) Parent() error

func (Error) ParentDescription ¶ added in v0.3.6

func (e Error) ParentDescription() string

type Factories ¶

type Factories struct {
	IDFactory IDFactory
}

Factories is a set of factories used by `Gluon` internals (e.g. ID factory) to execute certain operations.

For example, when a publication is requested and Gluon constructs the message, Gluon internals use the specified ID factory, which encapsulates the underlying unique identification algorithm (e.g. UUID), to generate the message ID.

type FactoryUUID ¶

type FactoryUUID struct{}

func (FactoryUUID) NewID ¶

func (f FactoryUUID) NewID() (string, error)

type Handler ¶

type Handler interface {
	Handle(context.Context, *Message) error
}

Handler is a component used to subscribe to a topic.

type HandlerFunc ¶

type HandlerFunc func(context.Context, *Message) error

HandlerFunc is an anonymous function used to subscribe to a topic.

type IDFactory ¶

type IDFactory interface {
	NewID() (string, error)
}

IDFactory is a factory that generates unique identifiers using a specific algorithm (e.g. UUID, Nano ID).

The default IDFactory is FactoryUUID (which uses the google/uuid package).
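Any type with a `NewID() (string, error)` method satisfies this interface. As an illustration, here is a minimal sketch of a custom factory; `sequenceFactory` is a hypothetical type, and the interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// IDFactory mirrors the interface shown above.
type IDFactory interface {
	NewID() (string, error)
}

// sequenceFactory is a hypothetical factory producing "<prefix>-<n>" IDs.
// It is deterministic, which is convenient in tests but unsuitable for
// distributed producers, where IDs must be unique across nodes.
type sequenceFactory struct {
	counter uint64 // kept first so it stays 64-bit aligned on 32-bit platforms
	prefix  string
}

// NewID returns the next ID in the sequence; it is safe for concurrent use.
func (f *sequenceFactory) NewID() (string, error) {
	n := atomic.AddUint64(&f.counter, 1)
	return f.prefix + "-" + strconv.FormatUint(n, 10), nil
}

// Compile-time check that the type satisfies the interface.
var _ IDFactory = (*sequenceFactory)(nil)

func main() {
	f := &sequenceFactory{prefix: "order"}
	a, _ := f.NewID()
	b, _ := f.NewID()
	fmt.Println(a, b) // prints "order-1 order-2"
}
```

A factory like this would be handed to the Bus through the WithIDFactory option.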

type InternalMessageHandler ¶

type InternalMessageHandler func(ctx context.Context, subscriber *Subscriber, message *TransportMessage) error

InternalMessageHandler is the message handler used by concrete drivers.

type LocalSchemaRegistry ¶ added in v0.3.0

type LocalSchemaRegistry struct {
	BasePath string
}

func (LocalSchemaRegistry) GetBaseLocation ¶ added in v0.3.0

func (l LocalSchemaRegistry) GetBaseLocation() string

func (LocalSchemaRegistry) GetSchemaDefinition ¶ added in v0.3.0

func (l LocalSchemaRegistry) GetSchemaDefinition(schemaName string, _ int) (string, error)

func (LocalSchemaRegistry) IsUsingLatestSchema ¶ added in v0.3.0

func (l LocalSchemaRegistry) IsUsingLatestSchema() bool

type Marshaler ¶

type Marshaler interface {
	GetContentType() string
	Marshal(schemaDef string, v interface{}) ([]byte, error)
	Unmarshal(schemaDef string, data []byte, v interface{}) error
}

Marshaler is a composable component which encodes given data into binary format using a specific codec.

`Gluon` internals use the Marshaler to keep a consistent codec for the Messages transported through stream pipelines.

The default Marshaler is MarshalerJSON.
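To make the contract concrete, here is a minimal sketch of a JSON-based implementation built on `encoding/json`. It is not the source of MarshalerJSON: the `jsonMarshaler` and `orderPlaced` types and the `application/json` content type are assumptions, and the interface is redeclared locally so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Marshaler mirrors the interface shown above.
type Marshaler interface {
	GetContentType() string
	Marshal(schemaDef string, v interface{}) ([]byte, error)
	Unmarshal(schemaDef string, data []byte, v interface{}) error
}

// jsonMarshaler is a hypothetical implementation. It ignores schemaDef,
// since JSON needs no schema to encode or decode a Go value.
type jsonMarshaler struct{}

// GetContentType returns an assumed content type for this sketch.
func (jsonMarshaler) GetContentType() string { return "application/json" }

func (jsonMarshaler) Marshal(_ string, v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (jsonMarshaler) Unmarshal(_ string, data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// orderPlaced is a hypothetical message schema.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// roundTrip encodes and decodes a message, returning the decoded value
// and the encoded form.
func roundTrip(m Marshaler, in orderPlaced) (orderPlaced, string, error) {
	raw, err := m.Marshal("", in)
	if err != nil {
		return orderPlaced{}, "", err
	}
	var out orderPlaced
	if err := m.Unmarshal("", raw, &out); err != nil {
		return orderPlaced{}, "", err
	}
	return out, string(raw), nil
}

func main() {
	out, raw, err := roundTrip(jsonMarshaler{}, orderPlaced{OrderID: "o-1", Total: 42})
	fmt.Println(out, raw, err)
}
```

A schema-based codec such as Avro would instead use the `schemaDef` argument to drive encoding and decoding.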

type MarshalerAvro ¶ added in v0.2.1

type MarshalerAvro struct{}

func NewMarshalerAvro ¶ added in v0.2.1

func NewMarshalerAvro() *MarshalerAvro

func (*MarshalerAvro) GetContentType ¶ added in v0.2.1

func (m *MarshalerAvro) GetContentType() string

func (*MarshalerAvro) Marshal ¶ added in v0.2.1

func (m *MarshalerAvro) Marshal(schemaDef string, v interface{}) ([]byte, error)

func (*MarshalerAvro) Unmarshal ¶ added in v0.2.1

func (m *MarshalerAvro) Unmarshal(schemaDef string, data []byte, v interface{}) error

type MarshalerJSON ¶

type MarshalerJSON struct{}

func (MarshalerJSON) GetContentType ¶

func (m MarshalerJSON) GetContentType() string

func (MarshalerJSON) Marshal ¶

func (m MarshalerJSON) Marshal(_ string, v interface{}) ([]byte, error)

func (MarshalerJSON) Unmarshal ¶

func (m MarshalerJSON) Unmarshal(_ string, data []byte, v interface{}) error

type Message ¶

type Message struct {
	Headers map[string]interface{}
	Data    interface{}
}

func (Message) GetCausationID ¶

func (m Message) GetCausationID() string

func (Message) GetConsumerGroup ¶

func (m Message) GetConsumerGroup() string

func (Message) GetContentType ¶

func (m Message) GetContentType() string

func (Message) GetCorrelationID ¶

func (m Message) GetCorrelationID() string

func (Message) GetMessageID ¶

func (m Message) GetMessageID() string

func (Message) GetMessageTime ¶

func (m Message) GetMessageTime() time.Time

func (Message) GetMessageType ¶

func (m Message) GetMessageType() string

func (Message) GetSchema ¶

func (m Message) GetSchema() string

func (Message) GetSource ¶

func (m Message) GetSource() string

func (Message) GetSpecVersion ¶

func (m Message) GetSpecVersion() string

func (Message) GetSubject ¶

func (m Message) GetSubject() string

func (Message) GetTraceContext ¶

func (m Message) GetTraceContext() interface{}

type MessageMetadata ¶

type MessageMetadata struct {
	Topic         string
	Source        string
	SchemaName    string
	SchemaVersion int

	SchemaInternalType reflect.Type
}

MessageMetadata is a set of definitions that describe a specific message schema.

type MiddlewareHandlerFunc ¶

type MiddlewareHandlerFunc func(next HandlerFunc) HandlerFunc

MiddlewareHandlerFunc is an anonymous function used to add behaviour to a consumer process.

This pattern is also known as Chain of Responsibility (CoR).
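The following sketch shows how such a chain can be composed. The order in which Gluon applies its middlewares is not documented here, so the `chain` helper (first middleware outermost) is an assumption, as are the `trace` middleware and the reduced `Message` type.

```go
package main

import (
	"context"
	"fmt"
)

// Reduced local copies of the types shown in this document.
type Message struct {
	Headers map[string]interface{}
	Data    interface{}
}

type HandlerFunc func(context.Context, *Message) error

type MiddlewareHandlerFunc func(next HandlerFunc) HandlerFunc

// chain wraps h so that the first middleware in mws runs outermost.
func chain(h HandlerFunc, mws ...MiddlewareHandlerFunc) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace returns a hypothetical middleware that records its name in log
// before and after calling the next handler.
func trace(name string, log *[]string) MiddlewareHandlerFunc {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, msg *Message) error {
			*log = append(*log, name+":before")
			err := next(ctx, msg)
			*log = append(*log, name+":after")
			return err
		}
	}
}

// demo runs a two-middleware chain and returns the recorded call order.
func demo() []string {
	var log []string
	h := chain(func(context.Context, *Message) error {
		log = append(log, "handler")
		return nil
	}, trace("a", &log), trace("b", &log))
	_ = h(context.Background(), &Message{Data: "payload"})
	return log
}

func main() {
	fmt.Println(demo()) // prints "[a:before b:before handler b:after a:after]"
}
```

Each middleware decides whether to call `next`, so a link in the chain can also short-circuit processing, for example by returning an error early.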

type MiddlewarePublisherFunc ¶

type MiddlewarePublisherFunc func(next PublisherFunc) PublisherFunc

MiddlewarePublisherFunc is an anonymous function used to add behaviour to a publishing process.

This pattern is also known as Chain of Responsibility (CoR).
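On the publishing side, a middleware receives the outgoing TransportMessage and may modify it before handing it on. A minimal sketch under stated assumptions: `withHeader`, the header name and the reduced `TransportMessage` type are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// TransportMessage is reduced to the fields this sketch needs.
type TransportMessage struct {
	ID            string
	DriverHeaders map[string]string
}

type PublisherFunc func(ctx context.Context, message *TransportMessage) error

type MiddlewarePublisherFunc func(next PublisherFunc) PublisherFunc

// withHeader returns a hypothetical middleware that sets a driver header
// on every outgoing message before passing it to the next publisher.
func withHeader(key, value string) MiddlewarePublisherFunc {
	return func(next PublisherFunc) PublisherFunc {
		return func(ctx context.Context, msg *TransportMessage) error {
			if msg.DriverHeaders == nil {
				msg.DriverHeaders = make(map[string]string)
			}
			msg.DriverHeaders[key] = value
			return next(ctx, msg)
		}
	}
}

// demo publishes one message through the middleware and returns the header
// value observed by the final publisher.
func demo() (string, error) {
	var seen string
	final := func(_ context.Context, msg *TransportMessage) error {
		seen = msg.DriverHeaders["x-origin"]
		return nil
	}
	publish := withHeader("x-origin", "billing")(final)
	err := publish(context.Background(), &TransportMessage{ID: "msg-1"})
	return seen, err
}

func main() {
	fmt.Println(demo())
}
```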

type Option ¶

type Option interface {
	// contains filtered or unexported methods
}

Option sets a specific configuration of a resource (e.g. bus).
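An interface with only unexported methods is a common way to implement functional options that other packages can use but not implement. The sketch below shows one way to build it; it is an illustration rather than Gluon's source, and every name in it (`config`, `option`, `optionFunc`, the `with...` helpers and the default major version) is hypothetical.

```go
package main

import "fmt"

// config is a hypothetical resource being configured.
type config struct {
	consumerGroup string
	majorVersion  int
	addresses     []string
}

// option has an unexported method, so only this package can implement it.
type option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

func withConsumerGroup(s string) option {
	return optionFunc(func(c *config) { c.consumerGroup = s })
}

func withMajorVersion(v int) option {
	return optionFunc(func(c *config) { c.majorVersion = v })
}

func withCluster(addr ...string) option {
	return optionFunc(func(c *config) {
		// Copy the slice so later changes by the caller do not leak in.
		c.addresses = append([]string(nil), addr...)
	})
}

// newConfig starts from defaults and applies the options in order.
func newConfig(opts ...option) config {
	c := config{majorVersion: 1} // default chosen for this example only
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	c := newConfig(withConsumerGroup("billing"), withCluster("a:9092", "b:9092"))
	fmt.Printf("%+v\n", c)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.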

func WithBaseContext ¶

func WithBaseContext(ctx context.Context) Option

WithBaseContext sets the Bus base context.

func WithCluster ¶

func WithCluster(addr ...string) Option

WithCluster sets one or more node addresses for the Bus to use.

Not applicable when using a local Bus.

func WithConsumerGroup ¶

func WithConsumerGroup(s string) Option

WithConsumerGroup sets a global consumer group, useful for microservices.

func WithConsumerMiddleware ¶

func WithConsumerMiddleware(f ...MiddlewareHandlerFunc) Option

WithConsumerMiddleware attaches a chain of behaviours to `Gluon` message consumption operations.

func WithDriverConfiguration ¶

func WithDriverConfiguration(cfg interface{}) Option

WithDriverConfiguration sets a configuration for a specific driver.

func WithIDFactory ¶

func WithIDFactory(f IDFactory) Option

WithIDFactory sets a unique identifier factory for `Gluon` operations.

func WithLogger ¶

func WithLogger(l zerolog.Logger) Option

WithLogger sets a global logger to output Bus internal operations.

func WithMajorVersion ¶

func WithMajorVersion(v int) Option

WithMajorVersion sets a global major version for message schemas.

func WithMarshaler ¶

func WithMarshaler(m Marshaler) Option

WithMarshaler sets a marshaler strategy to encode/decode in-transit messages.

func WithPublisherMiddleware ¶

func WithPublisherMiddleware(f ...MiddlewarePublisherFunc) Option

WithPublisherMiddleware attaches a chain of behaviours to `Gluon` message production operations.

func WithSchemaRegistry ¶ added in v0.3.0

func WithSchemaRegistry(s SchemaRegistry) Option

WithSchemaRegistry sets the schema registry used by specific codecs (e.g. Apache Avro) to decode/encode in-transit messages.

type PublisherFunc ¶

type PublisherFunc func(ctx context.Context, message *TransportMessage) error

PublisherFunc is an anonymous function used by `Gluon` to propagate low-level messages.

type SchemaRegistry ¶ added in v0.3.0

type SchemaRegistry interface {
	GetBaseLocation() string
	IsUsingLatestSchema() bool
	GetSchemaDefinition(schemaName string, version int) (string, error)
}
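The interface carries no documentation on this page, so the following in-memory implementation is only a sketch of its shape. The return values of `GetBaseLocation` and `IsUsingLatestSchema` are placeholders, because their intended meaning is not stated here; `memoryRegistry`, `schemaKey` and `errMissingSchema` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// SchemaRegistry mirrors the interface shown above.
type SchemaRegistry interface {
	GetBaseLocation() string
	IsUsingLatestSchema() bool
	GetSchemaDefinition(schemaName string, version int) (string, error)
}

// errMissingSchema plays the role of ErrMissingSchemaDefinition here.
var errMissingSchema = errors.New("memory registry: missing schema definition")

// schemaKey identifies a schema definition by name and version.
type schemaKey struct {
	name    string
	version int
}

// memoryRegistry is a hypothetical registry backed by a map.
type memoryRegistry struct {
	schemas map[schemaKey]string
}

// GetBaseLocation returns a placeholder location.
func (r memoryRegistry) GetBaseLocation() string { return "memory://" }

// IsUsingLatestSchema returns a placeholder value.
func (r memoryRegistry) IsUsingLatestSchema() bool { return false }

// GetSchemaDefinition looks up a definition by exact name and version.
func (r memoryRegistry) GetSchemaDefinition(schemaName string, version int) (string, error) {
	def, ok := r.schemas[schemaKey{name: schemaName, version: version}]
	if !ok {
		return "", errMissingSchema
	}
	return def, nil
}

// Compile-time check that the type satisfies the interface.
var _ SchemaRegistry = memoryRegistry{}

// newDemoRegistry returns a registry holding a single Avro-style definition.
func newDemoRegistry() memoryRegistry {
	return memoryRegistry{schemas: map[schemaKey]string{
		{name: "order_placed", version: 1}: `{"type":"record","name":"OrderPlaced","fields":[]}`,
	}}
}

func main() {
	reg := newDemoRegistry()
	def, err := reg.GetSchemaDefinition("order_placed", 1)
	fmt.Println(def, err)
	_, err = reg.GetSchemaDefinition("order_placed", 2)
	fmt.Println(err)
}
```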

type SchemaRegistryOption ¶

type SchemaRegistryOption interface {
	// contains filtered or unexported methods
}

SchemaRegistryOption sets a specific configuration for the internal schema registry.

func WithSchemaName ¶ added in v0.3.0

func WithSchemaName(s string) SchemaRegistryOption

WithSchemaName sets the name of the schema stored on the SchemaRegistry.

func WithSchemaVersion ¶

func WithSchemaVersion(v int) SchemaRegistryOption

WithSchemaVersion sets a major version for a message schema.

func WithSource ¶

func WithSource(s string) SchemaRegistryOption

WithSource sets the source of a message schema.

func WithTopic ¶

func WithTopic(s string) SchemaRegistryOption

WithTopic sets the topic name of a message schema.

type Subscriber ¶

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber is an entry for the subscriber registry component.

It contains metadata for a specific consumer.

func (*Subscriber) DriverConfiguration ¶

func (e *Subscriber) DriverConfiguration(cfg interface{}) *Subscriber

DriverConfiguration sets the configuration for a specific driver.

func (Subscriber) GetDefaultHandler ¶

func (e Subscriber) GetDefaultHandler() HandlerFunc

GetDefaultHandler retrieves the Subscriber's default HandlerFunc.

It returns the defined HandlerFunc by default. If no HandlerFunc was specified, the Handler.Handle function is returned instead.
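The fallback rule can be sketched as follows. This is an illustration of the documented behaviour and not the library's source: the `subscriber` struct, its field names and the `recorder` handler are hypothetical, and returning nil when neither handler is set is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Reduced local copies of the types shown in this document.
type Message struct {
	Data interface{}
}

type Handler interface {
	Handle(context.Context, *Message) error
}

type HandlerFunc func(context.Context, *Message) error

// subscriber is a hypothetical stand-in for Subscriber.
type subscriber struct {
	handler     Handler
	handlerFunc HandlerFunc
}

// defaultHandler prefers the HandlerFunc and falls back to Handler.Handle.
// Returning nil when neither is set is an assumption of this sketch.
func (s subscriber) defaultHandler() HandlerFunc {
	if s.handlerFunc != nil {
		return s.handlerFunc
	}
	if s.handler != nil {
		return s.handler.Handle
	}
	return nil
}

// recorder is a Handler that notes each call in a shared slice.
type recorder struct {
	calls *[]string
}

func (r recorder) Handle(context.Context, *Message) error {
	*r.calls = append(*r.calls, "handler")
	return nil
}

// demo invokes the default handler of two subscribers and returns which
// implementation ran for each.
func demo() []string {
	var calls []string
	fn := func(context.Context, *Message) error {
		calls = append(calls, "func")
		return nil
	}
	both := subscriber{handler: recorder{&calls}, handlerFunc: fn}
	onlyHandler := subscriber{handler: recorder{&calls}}

	msg := &Message{Data: "payload"}
	_ = both.defaultHandler()(context.Background(), msg)
	_ = onlyHandler.defaultHandler()(context.Background(), msg)
	return calls
}

func main() {
	fmt.Println(demo()) // prints "[func handler]"
}
```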

func (*Subscriber) GetDriverConfiguration ¶

func (e *Subscriber) GetDriverConfiguration() interface{}

GetDriverConfiguration returns the configuration of a specific driver.

func (Subscriber) GetGroup ¶

func (e Subscriber) GetGroup() string

GetGroup retrieves the Subscriber's consumer group.

func (Subscriber) GetHandler ¶

func (e Subscriber) GetHandler() Handler

GetHandler retrieves the Subscriber's Handler.

func (Subscriber) GetHandlerFunc ¶

func (e Subscriber) GetHandlerFunc() HandlerFunc

GetHandlerFunc retrieves the Subscriber's HandlerFunc.

func (Subscriber) GetTopic ¶

func (e Subscriber) GetTopic() string

GetTopic retrieves the Subscriber's topic name.

func (*Subscriber) Group ¶

func (e *Subscriber) Group(g string) *Subscriber

Group sets a consumer group (if the Driver allows them).

A consumer group is a mechanism that lets multiple workers consume messages in parallel as a single unit.

This kind of mechanism is very useful when consuming messages in a microservice environment, where each node of the microservice cluster receives only a share of the messages (depending on the load balancing algorithm).
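The defining property of a consumer group, each message being handled by exactly one member, can be simulated in-process with goroutines reading from a shared channel. This is a conceptual sketch only and involves no broker; `consumeAsGroup` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAsGroup delivers every message to exactly one of the workers and
// returns how many messages each worker handled. Which worker receives a
// given message is decided by the Go scheduler, much like a broker's load
// balancing algorithm decides it for a real consumer group.
func consumeAsGroup(messages []string, workers int) []int {
	queue := make(chan string)
	counts := make([]int, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				counts[id]++ // each worker writes only to its own slot
			}
		}(w)
	}

	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return counts
}

func main() {
	counts := consumeAsGroup([]string{"a", "b", "c", "d", "e", "f"}, 3)
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println(counts, total) // the split varies between runs; the total is always 6
}
```

Subscribers that use different groups would each receive their own copy of every message, which is what distinguishes a group from a plain broadcast.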

func (*Subscriber) Handler ¶

func (e *Subscriber) Handler(h Handler) *Subscriber

Handler sets a Handler component.

In-transit messages go straight to the Handler.Handle function.

func (*Subscriber) HandlerFunc ¶

func (e *Subscriber) HandlerFunc(h HandlerFunc) *Subscriber

HandlerFunc sets a HandlerFunc component.

In-transit messages go straight to the function.

type TransportMessage ¶

type TransportMessage struct {
	ID          string `json:"id"`
	Source      string `json:"source"`
	SpecVersion string `json:"specversion"`
	Type        string `json:"type"`
	Data        []byte `json:"data"`

	DataContentType string `json:"datacontenttype,omitempty"`
	DataSchema      string `json:"dataschema,omitempty"`
	Subject         string `json:"subject,omitempty"`
	Time            string `json:"time,omitempty"`

	CorrelationID string      `json:"gluon_correlation_id"`
	CausationID   string      `json:"gluon_causation_id"`
	TraceContext  interface{} `json:"gluon_trace_context"`

	// Internal fields
	Topic         string            `json:"-"`
	DriverHeaders map[string]string `json:"-"`
}

TransportMessage is the basic unit of data transportation (also known as an integration event).

Based on the CloudEvents specification (a CNCF project), the payload of this structure is designed to be compatible with most event-driven systems.

For better performance in downstream streaming pipelines, it is recommended to transport small payloads (<64 KB). In addition, some infrastructure vendors only accept messages up to a fixed size.

For more information about the fields that compose this structure, check https://github.com/cloudevents/spec/blob/master/spec.md
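To show what the struct tags imply on the wire, the sketch below encodes a local copy of the struct with `encoding/json` and decodes the result into a map. The field values are made up, and whether a given driver actually sends this JSON form is not stated on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// transportMessage is a local copy of the struct shown above.
type transportMessage struct {
	ID          string `json:"id"`
	Source      string `json:"source"`
	SpecVersion string `json:"specversion"`
	Type        string `json:"type"`
	Data        []byte `json:"data"`

	DataContentType string `json:"datacontenttype,omitempty"`
	DataSchema      string `json:"dataschema,omitempty"`
	Subject         string `json:"subject,omitempty"`
	Time            string `json:"time,omitempty"`

	CorrelationID string      `json:"gluon_correlation_id"`
	CausationID   string      `json:"gluon_causation_id"`
	TraceContext  interface{} `json:"gluon_trace_context"`

	Topic         string            `json:"-"`
	DriverHeaders map[string]string `json:"-"`
}

// encodedFields marshals an example message and returns its JSON fields.
func encodedFields() (map[string]interface{}, error) {
	msg := transportMessage{
		ID:              "msg-1",
		Source:          "https://example.org/orders", // made-up source
		SpecVersion:     "1.0",
		Type:            "org.example.order.placed", // made-up type
		Data:            []byte(`{"order_id":"o-1"}`),
		DataContentType: "application/json",
		// CloudEvents timestamps follow RFC 3339.
		Time:  time.Date(2021, time.December, 31, 0, 0, 0, 0, time.UTC).Format(time.RFC3339),
		Topic: "orders", // tagged `json:"-"`, so it never reaches the payload
	}
	raw, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	fields := make(map[string]interface{})
	if err := json.Unmarshal(raw, &fields); err != nil {
		return nil, err
	}
	return fields, nil
}

func main() {
	fields, err := encodedFields()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	for name, value := range fields {
		fmt.Printf("%s = %v\n", name, value)
	}
}
```

Two details follow from the tags and from `encoding/json` itself: empty optional attributes such as `subject` are omitted, while the `gluon_*` extension fields are always present, and a `[]byte` payload is emitted as a base64 string.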
