Documentation ¶
Index ¶
- Constants
- Variables
- func Register(name string, driver Driver)
- type Bus
- func (b *Bus) GetSchemaMetadata(schema interface{}) (*MessageMetadata, error)
- func (b *Bus) GetSchemaMetadataFromTopic(topic string) *MessageMetadata
- func (b *Bus) ListSubscribersFromTopic(t string) []*Subscriber
- func (b *Bus) ListenAndServe() error
- func (b *Bus) Publish(ctx context.Context, data interface{}) error
- func (b *Bus) PublishBulk(ctx context.Context, data ...interface{}) error
- func (b *Bus) PublishRaw(ctx context.Context, msg *TransportMessage) error
- func (b *Bus) PublishWithSubject(ctx context.Context, data interface{}, subject string) error
- func (b *Bus) PublishWithTopic(ctx context.Context, topic string, data interface{}) error
- func (b *Bus) PublishWithTopicAndSubject(ctx context.Context, topic, subject string, data interface{}) error
- func (b *Bus) PublishWithType(ctx context.Context, msgType string, data interface{}) error
- func (b *Bus) PublishWithTypeAndSubject(ctx context.Context, msgType, subject string, data interface{}) error
- func (b *Bus) RegisterSchema(schema interface{}, opts ...SchemaRegistryOption)
- func (b *Bus) Shutdown(ctx context.Context) error
- func (b *Bus) Subscribe(schema interface{}) *Subscriber
- func (b *Bus) SubscribeTopic(topic string) *Subscriber
- type BusConfiguration
- type Driver
- type Error
- type Factories
- type FactoryUUID
- type Handler
- type HandlerFunc
- type IDFactory
- type InternalMessageHandler
- type LocalSchemaRegistry
- type Marshaler
- type MarshalerAvro
- type MarshalerJSON
- type Message
- func (m Message) GetCausationID() string
- func (m Message) GetConsumerGroup() string
- func (m Message) GetContentType() string
- func (m Message) GetCorrelationID() string
- func (m Message) GetMessageID() string
- func (m Message) GetMessageTime() time.Time
- func (m Message) GetMessageType() string
- func (m Message) GetSchema() string
- func (m Message) GetSource() string
- func (m Message) GetSpecVersion() string
- func (m Message) GetSubject() string
- func (m Message) GetTraceContext() interface{}
- type MessageMetadata
- type MiddlewareHandlerFunc
- type MiddlewarePublisherFunc
- type Option
- func WithBaseContext(ctx context.Context) Option
- func WithCluster(addr ...string) Option
- func WithConsumerGroup(s string) Option
- func WithConsumerMiddleware(f ...MiddlewareHandlerFunc) Option
- func WithDriverConfiguration(cfg interface{}) Option
- func WithIDFactory(f IDFactory) Option
- func WithLogger(l zerolog.Logger) Option
- func WithMajorVersion(v int) Option
- func WithMarshaler(m Marshaler) Option
- func WithPublisherMiddleware(f ...MiddlewarePublisherFunc) Option
- func WithSchemaRegistry(s SchemaRegistry) Option
- type PublisherFunc
- type SchemaRegistry
- type SchemaRegistryOption
- type Subscriber
- func (e *Subscriber) DriverConfiguration(cfg interface{}) *Subscriber
- func (e Subscriber) GetDefaultHandler() HandlerFunc
- func (e *Subscriber) GetDriverConfiguration() interface{}
- func (e Subscriber) GetGroup() string
- func (e Subscriber) GetHandler() Handler
- func (e Subscriber) GetHandlerFunc() HandlerFunc
- func (e Subscriber) GetTopic() string
- func (e *Subscriber) Group(g string) *Subscriber
- func (e *Subscriber) Handler(h Handler) *Subscriber
- func (e *Subscriber) HandlerFunc(h HandlerFunc) *Subscriber
- type TransportMessage
Constants ¶
const CloudEventsSpecVersion = "1.0"
CloudEventsSpecVersion The CloudEvents specification version used by `Gluon` internals.
Variables ¶
var ErrBusClosed = errors.New("gluon: The bus is closed")
ErrBusClosed Cannot perform the action with a closed Bus.
var (
    // ErrMessageNotRegistered The message type was not found on the message registry
    ErrMessageNotRegistered = errors.New("gluon: The specified message type is not present on the message registry")
)
var ErrMissingSchemaDefinition = errors.New("gluon: Missing schema definition")
Functions ¶
Types ¶
type Bus ¶
type Bus struct {
    BaseContext    context.Context
    Marshaler      Marshaler
    Factories      Factories
    SchemaRegistry SchemaRegistry
    Configuration  BusConfiguration
    Logger         zerolog.Logger
    Addresses      []string
    // contains filtered or unexported fields
}
Bus Is a facade component used to interact with foreign systems through streaming messaging mechanisms.
func (*Bus) GetSchemaMetadata ¶ added in v0.2.2
func (b *Bus) GetSchemaMetadata(schema interface{}) (*MessageMetadata, error)
GetSchemaMetadata retrieves metadata from the internal schema registry
func (*Bus) GetSchemaMetadataFromTopic ¶ added in v0.2.4
func (b *Bus) GetSchemaMetadataFromTopic(topic string) *MessageMetadata
GetSchemaMetadataFromTopic retrieves metadata from the internal schema registry using the topic name
func (*Bus) ListSubscribersFromTopic ¶
func (b *Bus) ListSubscribersFromTopic(t string) []*Subscriber
ListSubscribersFromTopic Get the subscription task queue of a registered topic.
func (*Bus) ListenAndServe ¶
func (b *Bus) ListenAndServe() error
ListenAndServe Bootstrap and start a Bus along with its internal components (subscribers).
func (*Bus) Publish ¶
Publish Propagate a message to the ecosystem using the internal topic registry agent to generate the topic.
Note: To propagate correlation and causation IDs, use Subscription's context.
func (*Bus) PublishBulk ¶
PublishBulk Propagate multiple messages to the ecosystem.
func (*Bus) PublishRaw ¶
func (b *Bus) PublishRaw(ctx context.Context, msg *TransportMessage) error
PublishRaw Propagate a raw `Gluon` internal message to the ecosystem.
func (*Bus) PublishWithSubject ¶
PublishWithSubject Propagate a message to the ecosystem using the internal topic registry agent to generate the topic.
This method also accepts a `subject` argument, which sets the CloudEvents attribute of the same name.
func (*Bus) PublishWithTopic ¶ added in v0.2.3
PublishWithTopic Propagate a message to the ecosystem using the internal schema registry to get the topic.
Note: To propagate correlation and causation IDs, use Subscription's context.
func (*Bus) PublishWithTopicAndSubject ¶ added in v0.3.0
func (b *Bus) PublishWithTopicAndSubject(ctx context.Context, topic, subject string, data interface{}) error
PublishWithTopicAndSubject Propagate a message to the ecosystem using the internal topic registry agent to generate the topic.
This method also accepts a `subject` argument, which sets the CloudEvents attribute of the same name.
Note: To propagate correlation and causation IDs, use Subscription's context.
func (*Bus) PublishWithType ¶ added in v0.3.1
PublishWithType Propagate a message to the ecosystem, resolving its schema from the internal schema registry by Go struct type.
Note: To propagate correlation and causation IDs, use Subscription's context.
func (*Bus) PublishWithTypeAndSubject ¶ added in v0.3.1
func (b *Bus) PublishWithTypeAndSubject(ctx context.Context, msgType, subject string, data interface{}) error
PublishWithTypeAndSubject Propagate a message to the ecosystem, resolving its schema from the internal schema registry by Go struct type and setting the given subject.
Note: To propagate correlation and causation IDs, use Subscription's context.
func (*Bus) RegisterSchema ¶
func (b *Bus) RegisterSchema(schema interface{}, opts ...SchemaRegistryOption)
RegisterSchema Link a message schema to specific metadata (MessageMetadata) and store it for further Bus operations.
func (*Bus) Subscribe ¶
func (b *Bus) Subscribe(schema interface{}) *Subscriber
Subscribe Set a subscription task using schema metadata.
It will return nil if no schema was found in the local schema registry.
func (*Bus) SubscribeTopic ¶
func (b *Bus) SubscribeTopic(topic string) *Subscriber
SubscribeTopic Set a subscription task using a raw topic name.
type BusConfiguration ¶
type Driver ¶
type Driver interface {
    SetParentBus(b *Bus)
    SetInternalHandler(h InternalMessageHandler)
    Start(context.Context) error
    Shutdown(context.Context) error
    Subscribe(ctx context.Context, subscriber *Subscriber) error
    // Publish Propagate a low-level message (CloudEvent) to the message bus.
    Publish(ctx context.Context, message *TransportMessage) error
}
Driver Is the transportation vendor (e.g. Apache Kafka, AWS SNS/SQS, Redis Streams) which implements `Gluon` internal mechanisms.
type Error ¶ added in v0.3.5
type Error struct {
// contains filtered or unexported fields
}
func (Error) Description ¶ added in v0.3.5
func (Error) ParentDescription ¶ added in v0.3.6
type Factories ¶
type Factories struct {
IDFactory IDFactory
}
Factories Is a list of factories used by `Gluon` internals (e.g. ID factory) to successfully execute certain operations.
For example, when a publication is requested and Gluon constructs the message, Gluon internals use the specified ID factory, which encapsulates the underlying unique identification algorithm (e.g. UUID), to generate the message ID.
type FactoryUUID ¶
type FactoryUUID struct{}
func (FactoryUUID) NewID ¶
func (f FactoryUUID) NewID() (string, error)
type HandlerFunc ¶
HandlerFunc Is an anonymous function used to subscribe to a topic.
type IDFactory ¶
IDFactory Is the kind of factory for generating unique identifications using a specific algorithm (e.g. UUID, Nano ID).
The default IDFactory is FactoryUUID (uses google/uuid package).
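A custom factory could be sketched as follows. The body of the IDFactory interface is not printed in this reference, so the single-method shape used here (mirroring the documented `FactoryUUID.NewID() (string, error)`) is an assumption, and the interface is redeclared locally to keep the example self-contained. `FactoryHex` is a hypothetical name, not part of the package.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// IDFactory is a local stand-in for gluon.IDFactory. Its method set is an
// assumption inferred from the documented FactoryUUID.NewID signature.
type IDFactory interface {
	NewID() (string, error)
}

// FactoryHex is a hypothetical factory that returns 128 random bits encoded
// as 32 hexadecimal characters.
type FactoryHex struct{}

// NewID reads 16 bytes from the system CSPRNG and hex-encodes them.
func (FactoryHex) NewID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	var f IDFactory = FactoryHex{}
	id, err := f.NewID()
	if err != nil {
		fmt.Println("could not generate ID:", err)
		return
	}
	fmt.Println(id, len(id))
}
```

If the real interface has this shape, such a factory would be handed to the Bus through `WithIDFactory`.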
type InternalMessageHandler ¶
type InternalMessageHandler func(ctx context.Context, subscriber *Subscriber, message *TransportMessage) error
InternalMessageHandler is the message handler used by concrete drivers.
type LocalSchemaRegistry ¶ added in v0.3.0
type LocalSchemaRegistry struct {
BasePath string
}
func (LocalSchemaRegistry) GetBaseLocation ¶ added in v0.3.0
func (l LocalSchemaRegistry) GetBaseLocation() string
func (LocalSchemaRegistry) GetSchemaDefinition ¶ added in v0.3.0
func (l LocalSchemaRegistry) GetSchemaDefinition(schemaName string, _ int) (string, error)
func (LocalSchemaRegistry) IsUsingLatestSchema ¶ added in v0.3.0
func (l LocalSchemaRegistry) IsUsingLatestSchema() bool
type Marshaler ¶
type Marshaler interface {
    GetContentType() string
    Marshal(schemaDef string, v interface{}) ([]byte, error)
    Unmarshal(schemaDef string, data []byte, v interface{}) error
}
Marshaler Is a composable component which encodes given data into, and decodes it from, a specific codec in binary format.
The Marshaler composable component is used by `Gluon` internals to preserve a specific codec for Message(s) which are transported through stream pipelines.
The default Marshaler is MarshalerJSON.
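To illustrate the contract, here is a minimal sketch of a type that satisfies the Marshaler interface using the standard library's `encoding/json`. The interface is redeclared locally so the example compiles on its own. `plainJSON` and `orderPlaced` are hypothetical names, and this is not the package's own MarshalerJSON implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Marshaler mirrors the documented interface, redeclared locally.
type Marshaler interface {
	GetContentType() string
	Marshal(schemaDef string, v interface{}) ([]byte, error)
	Unmarshal(schemaDef string, data []byte, v interface{}) error
}

// plainJSON is a hypothetical Marshaler backed by encoding/json.
type plainJSON struct{}

// GetContentType reports the media type this sketch chose for its output.
func (plainJSON) GetContentType() string { return "application/json" }

// Marshal ignores schemaDef because JSON needs no external schema.
func (plainJSON) Marshal(_ string, v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// Unmarshal decodes data into v, which must be a pointer.
func (plainJSON) Unmarshal(_ string, data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// orderPlaced is a hypothetical message payload.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

func main() {
	var m Marshaler = plainJSON{}
	raw, err := m.Marshal("", orderPlaced{OrderID: "A-1", Total: 42})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	var out orderPlaced
	if err := m.Unmarshal("", raw, &out); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(m.GetContentType(), string(raw), out.Total)
}
```

A schema-driven codec such as Avro would use the `schemaDef` argument instead of discarding it.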
type MarshalerAvro ¶ added in v0.2.1
type MarshalerAvro struct{}
func NewMarshalerAvro ¶ added in v0.2.1
func NewMarshalerAvro() *MarshalerAvro
func (*MarshalerAvro) GetContentType ¶ added in v0.2.1
func (m *MarshalerAvro) GetContentType() string
type MarshalerJSON ¶
type MarshalerJSON struct{}
func (MarshalerJSON) GetContentType ¶
func (m MarshalerJSON) GetContentType() string
type Message ¶
type Message struct {
    Headers map[string]interface{}
    Data    interface{}
}
func (Message) GetCausationID ¶
func (Message) GetConsumerGroup ¶
func (Message) GetContentType ¶
func (Message) GetCorrelationID ¶
func (Message) GetMessageID ¶
func (Message) GetMessageTime ¶
func (Message) GetMessageType ¶
func (Message) GetSpecVersion ¶
func (Message) GetSubject ¶
func (Message) GetTraceContext ¶
func (m Message) GetTraceContext() interface{}
type MessageMetadata ¶
type MessageMetadata struct {
    Topic              string
    Source             string
    SchemaName         string
    SchemaVersion      int
    SchemaInternalType reflect.Type
}
MessageMetadata Is a set of definitions to describe a specific message schema.
type MiddlewareHandlerFunc ¶
type MiddlewareHandlerFunc func(next HandlerFunc) HandlerFunc
MiddlewareHandlerFunc Is an anonymous function used to add behaviour to a consumer process.
This pattern is also known as Chain of Responsibility (CoR).
type MiddlewarePublisherFunc ¶
type MiddlewarePublisherFunc func(next PublisherFunc) PublisherFunc
MiddlewarePublisherFunc Is an anonymous function used to add behaviour to a publishing process.
This pattern is also known as Chain of Responsibility (CoR).
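The chaining idea can be sketched with the documented `PublisherFunc` and `MiddlewarePublisherFunc` signatures, redeclared locally together with a trimmed `TransportMessage`. The `chain` and `trace` helpers are hypothetical, and the order in which Gluon itself applies middleware is not stated in this reference, so the first-listed-runs-first ordering below is an assumption of the sketch.

```go
package main

import (
	"context"
	"fmt"
)

// TransportMessage is a trimmed local stand-in for gluon.TransportMessage.
type TransportMessage struct {
	ID    string
	Topic string
}

// PublisherFunc and MiddlewarePublisherFunc mirror the documented signatures.
type PublisherFunc func(ctx context.Context, message *TransportMessage) error

type MiddlewarePublisherFunc func(next PublisherFunc) PublisherFunc

// chain is a hypothetical helper: it wraps final so that mws[0] runs first.
func chain(final PublisherFunc, mws ...MiddlewarePublisherFunc) PublisherFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		final = mws[i](final)
	}
	return final
}

// trace returns a middleware that records its name before and after
// delegating to the next link in the chain.
func trace(name string, log *[]string) MiddlewarePublisherFunc {
	return func(next PublisherFunc) PublisherFunc {
		return func(ctx context.Context, msg *TransportMessage) error {
			*log = append(*log, name+":before")
			err := next(ctx, msg)
			*log = append(*log, name+":after")
			return err
		}
	}
}

// demo publishes one message through two middlewares and returns the call log.
func demo() ([]string, error) {
	var log []string
	publish := func(_ context.Context, msg *TransportMessage) error {
		log = append(log, "publish:"+msg.ID)
		return nil
	}
	p := chain(publish, trace("outer", &log), trace("inner", &log))
	err := p(context.Background(), &TransportMessage{ID: "1", Topic: "orders"})
	return log, err
}

func main() {
	log, err := demo()
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(log)
}
```

Each middleware decides whether to call `next`, which is what lets a link short-circuit the chain (for example, to reject a message before it reaches the driver).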
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option sets a specific configuration of a resource (e.g. bus).
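An interface with only unexported methods, combined with `With...` constructors, resembles what is commonly called the functional options pattern. The sketch below shows how such a design might work internally. `busConfig`, `optionFunc`, `newConfig` and the default major version of 1 are all hypothetical, and the constructor bodies are guesses that only reuse the documented names and signatures.

```go
package main

import "fmt"

// busConfig is a hypothetical stand-in for the settings a Bus might hold.
type busConfig struct {
	consumerGroup string
	majorVersion  int
	addresses     []string
}

// Option mirrors the documented shape: its only method is unexported, so
// implementations can only live inside the defining package.
type Option interface {
	apply(*busConfig)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*busConfig)

func (f optionFunc) apply(c *busConfig) { f(c) }

// WithConsumerGroup sets the global consumer group in this sketch.
func WithConsumerGroup(s string) Option {
	return optionFunc(func(c *busConfig) { c.consumerGroup = s })
}

// WithMajorVersion sets the global schema major version in this sketch.
func WithMajorVersion(v int) Option {
	return optionFunc(func(c *busConfig) { c.majorVersion = v })
}

// WithCluster sets the node addresses in this sketch.
func WithCluster(addr ...string) Option {
	return optionFunc(func(c *busConfig) { c.addresses = addr })
}

// newConfig starts from (made-up) defaults and applies each option in order,
// so later options override earlier ones.
func newConfig(opts ...Option) busConfig {
	cfg := busConfig{majorVersion: 1}
	for _, o := range opts {
		o.apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithConsumerGroup("billing"), WithCluster("localhost:9092"))
	fmt.Printf("%+v\n", cfg)
}
```

The benefit of this design is that new settings can be added without changing the constructor's signature.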
func WithBaseContext ¶
WithBaseContext Set the Bus base context.
func WithCluster ¶
func WithCluster(addr ...string) Option
WithCluster Set one to N node addresses for the Bus to use.
Not applicable when using a local Bus.
func WithConsumerGroup ¶
WithConsumerGroup Set a global consumer group, useful for microservices.
func WithConsumerMiddleware ¶
func WithConsumerMiddleware(f ...MiddlewareHandlerFunc) Option
WithConsumerMiddleware Attach a chain of behaviour(s) for `Gluon` message consumption operations.
func WithDriverConfiguration ¶
func WithDriverConfiguration(cfg interface{}) Option
WithDriverConfiguration Set a configuration for a specific driver.
func WithIDFactory ¶
WithIDFactory Set a unique identifier factory for `Gluon` operations.
func WithLogger ¶
WithLogger Set a global logger to output Bus internal operations.
func WithMajorVersion ¶
WithMajorVersion Set a global major version for message schemas.
func WithMarshaler ¶
WithMarshaler Set a marshaler strategy to encode/decode in-transit messages.
func WithPublisherMiddleware ¶
func WithPublisherMiddleware(f ...MiddlewarePublisherFunc) Option
WithPublisherMiddleware Attach a chain of behaviour(s) for `Gluon` message production operations.
func WithSchemaRegistry ¶ added in v0.3.0
func WithSchemaRegistry(s SchemaRegistry) Option
WithSchemaRegistry Set a schema registry used by specific codecs (e.g. Apache Avro) to decode/encode in-transit messages.
type PublisherFunc ¶
type PublisherFunc func(ctx context.Context, message *TransportMessage) error
PublisherFunc Is an anonymous function used by `Gluon` to propagate low-level messages.
type SchemaRegistry ¶ added in v0.3.0
type SchemaRegistryOption ¶
type SchemaRegistryOption interface {
// contains filtered or unexported methods
}
SchemaRegistryOption sets a specific configuration for the internal schema registry.
func WithSchemaName ¶ added in v0.3.0
func WithSchemaName(s string) SchemaRegistryOption
WithSchemaName Set the name of the schema stored on the SchemaRegistry.
func WithSchemaVersion ¶
func WithSchemaVersion(v int) SchemaRegistryOption
WithSchemaVersion Set a major version for a message schema.
func WithSource ¶
func WithSource(s string) SchemaRegistryOption
WithSource Set the source of a message schema.
func WithTopic ¶
func WithTopic(s string) SchemaRegistryOption
WithTopic Set the topic name of a message schema.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber Is an entry for the subscriber registry component.
It contains metadata for a specific consumer.
func (*Subscriber) DriverConfiguration ¶
func (e *Subscriber) DriverConfiguration(cfg interface{}) *Subscriber
DriverConfiguration Set configuration for a specific driver.
func (Subscriber) GetDefaultHandler ¶
func (e Subscriber) GetDefaultHandler() HandlerFunc
GetDefaultHandler Retrieve the Subscriber's default HandlerFunc.
It will return the defined HandlerFunc by default. If no HandlerFunc was specified, then the Handler.Handle function is returned.
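The fallback rule described above can be sketched as follows. The `HandlerFunc` and `Handler` shapes are assumptions (this reference does not print them), `subscriber` is a trimmed hypothetical stand-in for Subscriber, and returning nil when neither handler is set is a choice made for this sketch only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message copies the documented struct.
type Message struct {
	Headers map[string]interface{}
	Data    interface{}
}

// HandlerFunc and Handler are assumed shapes, redeclared locally.
type HandlerFunc func(ctx context.Context, msg Message) error

type Handler interface {
	Handle(ctx context.Context, msg Message) error
}

// subscriber is a hypothetical, trimmed version of Subscriber.
type subscriber struct {
	handler     Handler
	handlerFunc HandlerFunc
}

// defaultHandler prefers the HandlerFunc and falls back to Handler.Handle.
func (s subscriber) defaultHandler() HandlerFunc {
	if s.handlerFunc != nil {
		return s.handlerFunc
	}
	if s.handler != nil {
		return s.handler.Handle
	}
	return nil
}

// recorder is a hypothetical Handler that notes that it ran.
type recorder struct{ out *string }

func (r recorder) Handle(_ context.Context, _ Message) error {
	*r.out = "Handler.Handle"
	return nil
}

// recordFunc builds a hypothetical HandlerFunc that notes that it ran.
func recordFunc(out *string) HandlerFunc {
	return func(_ context.Context, _ Message) error {
		*out = "HandlerFunc"
		return nil
	}
}

// dispatch runs whichever handler defaultHandler selects.
func dispatch(s subscriber) error {
	h := s.defaultHandler()
	if h == nil {
		return errors.New("subscriber has no handler")
	}
	return h(context.Background(), Message{Data: "hello"})
}

func main() {
	var ran string
	s := subscriber{handler: recorder{out: &ran}}
	if err := dispatch(s); err == nil {
		fmt.Println(ran) // only a Handler is set
	}
	s.handlerFunc = recordFunc(&ran)
	if err := dispatch(s); err == nil {
		fmt.Println(ran) // both are set, the HandlerFunc wins
	}
}
```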
func (*Subscriber) GetDriverConfiguration ¶
func (e *Subscriber) GetDriverConfiguration() interface{}
GetDriverConfiguration Get the configuration of a specific driver.
func (Subscriber) GetGroup ¶
func (e Subscriber) GetGroup() string
GetGroup Retrieve the Subscriber's consumer group.
func (Subscriber) GetHandler ¶
func (e Subscriber) GetHandler() Handler
GetHandler Retrieve the Subscriber's Handler.
func (Subscriber) GetHandlerFunc ¶
func (e Subscriber) GetHandlerFunc() HandlerFunc
GetHandlerFunc Retrieve the Subscriber's HandlerFunc.
func (Subscriber) GetTopic ¶
func (e Subscriber) GetTopic() string
GetTopic Retrieve the Subscriber's topic name.
func (*Subscriber) Group ¶
func (e *Subscriber) Group(g string) *Subscriber
Group Set a consumer group (if Driver allows them).
A consumer group is a mechanism used to consume messages in parallel by multiple workers as a single unit.
This kind of mechanism is very useful when consuming messages within a microservice environment, where each node of the microservice cluster receives only a share of the messages (depending on the load balancing algorithm).
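As a conceptual model only (it does not use Gluon or any real driver), the following sketch simulates a consumer group with goroutines reading from one shared channel. Each message is handled by exactly one worker, and how the messages are split between workers depends on scheduling, much as it depends on the load balancing algorithm in a real broker. The `consume` function is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// consume simulates one consumer group: the workers share a single stream,
// so every message is handled by exactly one of them.
// It returns how many messages each worker handled.
func consume(messages []string, workers int) []int {
	stream := make(chan string)
	counts := make([]int, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range stream {
				counts[id]++ // each worker only writes to its own slot
			}
		}(w)
	}

	for _, m := range messages {
		stream <- m
	}
	close(stream) // lets the workers' range loops finish
	wg.Wait()
	return counts
}

func main() {
	counts := consume([]string{"a", "b", "c", "d", "e", "f"}, 3)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The per-worker split varies between runs; the total does not.
	fmt.Println(counts, total)
}
```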
func (*Subscriber) Handler ¶
func (e *Subscriber) Handler(h Handler) *Subscriber
Handler Set a Handler component.
In-transit messages will go straight to the Handler.Handle function.
func (*Subscriber) HandlerFunc ¶
func (e *Subscriber) HandlerFunc(h HandlerFunc) *Subscriber
HandlerFunc Set a HandlerFunc component.
In-transit messages will go straight through to the function.
type TransportMessage ¶
type TransportMessage struct {
    ID              string      `json:"id"`
    Source          string      `json:"source"`
    SpecVersion     string      `json:"specversion"`
    Type            string      `json:"type"`
    Data            []byte      `json:"data"`
    DataContentType string      `json:"datacontenttype,omitempty"`
    DataSchema      string      `json:"dataschema,omitempty"`
    Subject         string      `json:"subject,omitempty"`
    Time            string      `json:"time,omitempty"`
    CorrelationID   string      `json:"gluon_correlation_id"`
    CausationID     string      `json:"gluon_causation_id"`
    TraceContext    interface{} `json:"gluon_trace_context"`

    // Internal fields
    Topic         string            `json:"-"`
    DriverHeaders map[string]string `json:"-"`
}
TransportMessage Is the basic unit of data transportation (also known as integration event).
Based on the CloudEvents specification (a project from the CNCF), the payload of this structure was made to comply with most event-driven systems.
For better performance in downstream streaming pipelines, it is recommended to transport small pieces of data (<64 KB). In addition, some infrastructure vendors enforce a fixed maximum message size.
For more information about the fields that compose this structure, check https://github.com/cloudevents/spec/blob/master/spec.md
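To see what the struct tags imply on the wire, the sketch below copies the documented struct and encodes a sample with the standard library's `encoding/json`. Whether Gluon drivers serialize the envelope this way is an assumption. The `sample`, `encode` and `wireFields` helpers are hypothetical, and every field value is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TransportMessage copies the documented struct, tags included.
type TransportMessage struct {
	ID              string      `json:"id"`
	Source          string      `json:"source"`
	SpecVersion     string      `json:"specversion"`
	Type            string      `json:"type"`
	Data            []byte      `json:"data"`
	DataContentType string      `json:"datacontenttype,omitempty"`
	DataSchema      string      `json:"dataschema,omitempty"`
	Subject         string      `json:"subject,omitempty"`
	Time            string      `json:"time,omitempty"`
	CorrelationID   string      `json:"gluon_correlation_id"`
	CausationID     string      `json:"gluon_causation_id"`
	TraceContext    interface{} `json:"gluon_trace_context"`

	Topic         string            `json:"-"`
	DriverHeaders map[string]string `json:"-"`
}

// sample builds a message with made-up values.
func sample() TransportMessage {
	return TransportMessage{
		ID:              "msg-1",
		Source:          "https://example.org/orders",
		SpecVersion:     "1.0",
		Type:            "org.example.order.placed",
		Data:            []byte("hi"),
		DataContentType: "text/plain",
		Time:            "2021-01-02T03:04:05Z",
		Topic:           "orders", // internal: never serialized
	}
}

// encode returns the JSON form of msg.
func encode(msg TransportMessage) ([]byte, error) {
	return json.Marshal(msg)
}

// wireFields encodes msg and decodes it into a generic map, which makes it
// easy to inspect which keys actually end up in the JSON document.
func wireFields(msg TransportMessage) (map[string]interface{}, error) {
	raw, err := encode(msg)
	if err != nil {
		return nil, err
	}
	fields := make(map[string]interface{})
	if err := json.Unmarshal(raw, &fields); err != nil {
		return nil, err
	}
	return fields, nil
}

func main() {
	raw, err := encode(sample())
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(raw))
}
```

Three details follow from `encoding/json` itself: `[]byte` values are written as base64 strings, fields tagged `omitempty` disappear when empty, and fields tagged `-` (Topic, DriverHeaders) are never written.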
Source Files ¶
- bus.go
- bus_configuration.go
- context_headers.go
- driver.go
- error.go
- factory.go
- factory_uuid.go
- handler.go
- headers.go
- internal_message_handler.go
- internal_schema_registry.go
- internal_schema_registry_options.go
- local_schema_registry.go
- marshaler.go
- marshaler_avro.go
- marshaler_json.go
- message.go
- options.go
- publisher.go
- schema_registry.go
- schema_registry_caching_mw.go
- subscriber.go
- subscriber_registry.go
- transport_message.go