Documentation ¶
Overview ¶
Package streams is a toolkit crafted for data-in-motion ecosystems written in Go.
Index ¶
- Constants
- Variables
- func InjectMessageCausationID(ctx context.Context, messageID string) string
- func InjectMessageCorrelationID(ctx context.Context, messageID string) string
- func Read(message interface{}, opts ...ReaderNodeOption) error
- func ReadByStreamKey(stream string, opts ...ReaderNodeOption)
- func RegisterStream(message interface{}, metadata StreamMetadata)
- func RegisterStreamByString(messageType string, metadata StreamMetadata)
- func Start(ctx context.Context)
- func Write(ctx context.Context, message interface{}) error
- func WriteBatch(ctx context.Context, messages ...interface{}) (uint32, error)
- func WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error
- func WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error)
- func WriteRawMessage(ctx context.Context, message Message) error
- func WriteRawMessageBatch(ctx context.Context, messages ...Message) (uint32, error)
- type AvroMarshaler
- type Event
- type FailingMarshalerNoop
- type Hashing64AlgorithmFactory
- type Hub
- func (h *Hub) GetStreamReaderNodes(stream string) *singlylinkedlist.List
- func (h *Hub) Read(message interface{}, opts ...ReaderNodeOption) error
- func (h *Hub) ReadByStreamKey(stream string, opts ...ReaderNodeOption)
- func (h *Hub) RegisterStream(message interface{}, metadata StreamMetadata)
- func (h *Hub) RegisterStreamByString(messageType string, metadata StreamMetadata)
- func (h *Hub) Start(ctx context.Context)
- func (h *Hub) Write(ctx context.Context, message interface{}) error
- func (h *Hub) WriteBatch(ctx context.Context, messages ...interface{}) (uint32, error)
- func (h *Hub) WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error
- func (h *Hub) WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error)
- func (h *Hub) WriteRawMessage(ctx context.Context, message Message) error
- func (h *Hub) WriteRawMessageBatch(ctx context.Context, messages ...Message) (uint32, error)
- type HubOption
- func WithIDFactory(f IDFactoryFunc) HubOption
- func WithInstanceName(n string) HubOption
- func WithMarshaler(m Marshaler) HubOption
- func WithReader(d Reader) HubOption
- func WithReaderBaseOptions(opts ...ReaderNodeOption) HubOption
- func WithReaderBehaviours(b ...ReaderBehaviour) HubOption
- func WithSchemaRegistry(r SchemaRegistry) HubOption
- func WithWriter(p Writer) HubOption
- type IDFactoryFunc
- type InMemorySchemaRegistry
- type JSONMarshaler
- type Marshaler
- type Message
- type MessageContextKey
- type NewMessageArgs
- type NoopSchemaRegistry
- type ProtocolBuffersMarshaler
- type Reader
- type ReaderBehaviour
- type ReaderHandleFunc
- type ReaderHandler
- type ReaderHandlerNoop
- type ReaderNode
- type ReaderNodeOption
- func WithConcurrencyLevel(n int) ReaderNodeOption
- func WithDriver(d Reader) ReaderNodeOption
- func WithGroup(g string) ReaderNodeOption
- func WithHandler(l ReaderHandler) ReaderNodeOption
- func WithHandlerFunc(l ReaderHandleFunc) ReaderNodeOption
- func WithMaxHandlerPoolSize(n int) ReaderNodeOption
- func WithProviderConfiguration(cfg interface{}) ReaderNodeOption
- func WithRetryInitialInterval(d time.Duration) ReaderNodeOption
- func WithRetryMaxInterval(d time.Duration) ReaderNodeOption
- func WithRetryTimeout(d time.Duration) ReaderNodeOption
- type ReaderTask
- type SchemaRegistry
- type StreamMetadata
- type StreamRegistry
- func (r *StreamRegistry) Get(message interface{}) (StreamMetadata, error)
- func (r *StreamRegistry) GetByStreamName(name string) (StreamMetadata, error)
- func (r *StreamRegistry) GetByString(key string) (StreamMetadata, error)
- func (r *StreamRegistry) Set(message interface{}, metadata StreamMetadata)
- func (r *StreamRegistry) SetByString(key string, metadata StreamMetadata)
- type WriteByMessageKeyBatchItems
- type Writer
Constants ¶
const ( // MarshalerProtoContentType default content-type header for Protocol Buffer marshaller. MarshalerProtoContentType = "application/octet-stream" // MarshalerJSONContentType default content-type header for JSON marshaller. MarshalerJSONContentType = "application/json" // MarshalerAvroContentType default content-type header for Apache Avro marshaller. MarshalerAvroContentType = "application/avro" )
const CloudEventsSpecVersion = "1.0"
CloudEventsSpecVersion the CloudEvents specification version used by streams
Variables ¶
var ( // DefaultConcurrencyLevel default stream-listening jobs to be running concurrently for each ReaderNode. DefaultConcurrencyLevel = 1 // DefaultRetryInitialInterval default initial interval duration between each stream-listening job provisioning on failures. DefaultRetryInitialInterval = time.Second * 3 // DefaultRetryMaxInterval default maximum interval duration between each stream-listening job provisioning on failures. DefaultRetryMaxInterval = time.Second * 15 // DefaultRetryTimeout default duration of each stream-listening job provisioning on failures. DefaultRetryTimeout = time.Second * 15 // DefaultMaxHandlerPoolSize default pool size of goroutines for ReaderNode's Reader(s) / ReaderHandleFunc(s) executions. DefaultMaxHandlerPoolSize = 10 )
var (
// DefaultHubInstanceName default instance names for nameless Hub instances
DefaultHubInstanceName = "com.streams"
)
var ( // ErrInvalidProtocolBufferFormat the given data is not a valid protocol buffer message ErrInvalidProtocolBufferFormat = errors.New("streams: Invalid protocol buffer data") )
var ErrMissingSchemaDefinition = errors.New("streams: Missing stream schema definition in schema registry")
ErrMissingSchemaDefinition the requested stream message definition was not found in the SchemaRegistry
var ErrMissingStream = errors.New("streams: Missing stream entry in stream registry")
ErrMissingStream the requested stream was not found in the StreamRegistry
var ErrMissingWriterDriver = errors.New("streams: Missing writer driver")
ErrMissingWriterDriver no writer driver was found.
var ReaderBaseBehaviours = []ReaderBehaviour{
unmarshalReaderBehaviour,
injectGroupReaderBehaviour,
injectTxIDsReaderBehaviour,
retryReaderBehaviour,
}
ReaderBaseBehaviours default ReaderBehaviours
Behaviours will be executed in descending order
var ReaderBaseBehavioursNoUnmarshal = []ReaderBehaviour{
injectGroupReaderBehaviour,
injectTxIDsReaderBehaviour,
retryReaderBehaviour,
}
ReaderBaseBehavioursNoUnmarshal default ReaderBehaviours without unmarshaling
Behaviours will be executed in descending order
Functions ¶
func InjectMessageCausationID ¶
InjectMessageCausationID injects the causation id from the given context if available. If not, it will use the message id as fallback.
func InjectMessageCorrelationID ¶
InjectMessageCorrelationID injects the correlation id from the given context if available. If not, it will use the message id as fallback.
func Read ¶ added in v0.2.1
func Read(message interface{}, opts ...ReaderNodeOption) error
Read registers a new stream-reading background job.
If reading from a Google Protocol Buffers message pipeline, DO NOT use a pointer as the message schema, to avoid marshaling problems.
func ReadByStreamKey ¶ added in v0.2.1
func ReadByStreamKey(stream string, opts ...ReaderNodeOption)
ReadByStreamKey registers a new stream-reading background job using the raw stream identifier (e.g. topic name).
func RegisterStream ¶ added in v0.2.1
func RegisterStream(message interface{}, metadata StreamMetadata)
RegisterStream creates a relation between a stream message type and metadata.
If registering a Google Protocol Buffers message, DO NOT use a pointer as the message schema, to avoid marshaling problems.
func RegisterStreamByString ¶ added in v0.2.1
func RegisterStreamByString(messageType string, metadata StreamMetadata)
RegisterStreamByString creates a relation between a string key and metadata.
func Write ¶ added in v0.2.1
Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func WriteBatch ¶ added in v0.2.1
WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
If an item from the batch fails, other items will fail too
func WriteByMessageKey ¶ added in v0.2.1
WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func WriteByMessageKeyBatch ¶ added in v0.2.1
func WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error)
WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
If an item from the batch fails, other items will fail too
func WriteRawMessage ¶ added in v0.2.1
WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func WriteRawMessageBatch ¶ added in v0.2.1
WriteRawMessageBatch inserts a set of raw transport messages into a stream in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to deal with batches
Types ¶
type AvroMarshaler ¶
type AvroMarshaler struct { HashingFactory Hashing64AlgorithmFactory // contains filtered or unexported fields }
AvroMarshaler handles data transformation between primitives and Apache Avro format.
Apache Avro REQUIRES a defined SchemaRegistry to decode/encode data.
func NewAvroMarshaler ¶
func NewAvroMarshaler() AvroMarshaler
NewAvroMarshaler allocates a new Apache Avro marshaler with a simple caching system to reduce memory footprint and computational usage when parsing Avro schema definition files.
func (AvroMarshaler) ContentType ¶
func (a AvroMarshaler) ContentType() string
ContentType retrieves the encoding/decoding Apache Avro format using RFC 2046 standard (application/avro).
type Event ¶
type Event interface { // GetSubject This describes the subject of the event in the context of the event producer (identified by source). // In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the // source identifier alone might not be sufficient as a qualifier for any specific event if the source // context has internal sub-structure. // // Identifying the subject of the event in context metadata (opposed to only in the data payload) is particularly // helpful in generic subscription filtering scenarios where middleware is unable to interpret the data content. // In the above example, the subscriber might only be interested in blobs with names ending with '.jpg' or '.jpeg' // and the subject attribute allows for constructing a simple and efficient string-suffix filter for that // subset of events. GetSubject() string }
Event is an abstract message unit used by streams-based systems to publish messages with the `subject` field of a Message populated.
type FailingMarshalerNoop ¶
type FailingMarshalerNoop struct{}
FailingMarshalerNoop the no-operation failing Marshaler
For testing purposes only
func (FailingMarshalerNoop) ContentType ¶
func (f FailingMarshalerNoop) ContentType() string
ContentType the failing content type operation
type Hashing64AlgorithmFactory ¶
Hashing64AlgorithmFactory factory for hash.Hash64 algorithms (used by Apache Avro schema definition caching system)
var DefaultHashing64AlgorithmFactory Hashing64AlgorithmFactory = func() hash.Hash64 { return fnv.New64a() }
DefaultHashing64AlgorithmFactory the default hashing64 algorithm factory for Marshaler schema definition caching layer
type Hub ¶
type Hub struct { InstanceName string StreamRegistry StreamRegistry Writer Writer Marshaler Marshaler IDFactory IDFactoryFunc SchemaRegistry SchemaRegistry Reader Reader ReaderBehaviours []ReaderBehaviour ReaderBaseOptions []ReaderNodeOption // contains filtered or unexported fields }
Hub is the main component which enables interactions between several systems through the usage of streams.
var ( // DefaultHub is the `streams` base instance used by the `streams` simple API. Recommended to use where only one // hub instance is required. // // This variable MUST be allocated manually. DefaultHub *Hub // ErrNilDefaultHub DefaultHub has not been initialized. ErrNilDefaultHub = errors.New("streams: DefaultHub has not been initialized") )
func (*Hub) GetStreamReaderNodes ¶ added in v0.2.4
func (h *Hub) GetStreamReaderNodes(stream string) *singlylinkedlist.List
GetStreamReaderNodes retrieves ReaderNode(s) from a stream.
func (*Hub) Read ¶
func (h *Hub) Read(message interface{}, opts ...ReaderNodeOption) error
Read registers a new stream-listening background job.
If listening to a Google Protocol Buffers message, DO NOT use a pointer as the message schema, to avoid marshaling problems.
func (*Hub) ReadByStreamKey ¶
func (h *Hub) ReadByStreamKey(stream string, opts ...ReaderNodeOption)
ReadByStreamKey registers a new stream-listening background job using the raw stream identifier (e.g. topic name).
func (*Hub) RegisterStream ¶
func (h *Hub) RegisterStream(message interface{}, metadata StreamMetadata)
RegisterStream creates a relation between a stream message type and metadata.
If registering a Google Protocol Buffers message, DO NOT use a pointer as the message schema, to avoid marshaling problems.
func (*Hub) RegisterStreamByString ¶
func (h *Hub) RegisterStreamByString(messageType string, metadata StreamMetadata)
RegisterStreamByString creates a relation between a string key and metadata.
func (*Hub) Write ¶
Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func (*Hub) WriteBatch ¶
WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
If an item from the batch fails, other items will fail too
func (*Hub) WriteByMessageKey ¶
WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func (*Hub) WriteByMessageKeyBatch ¶
func (h *Hub) WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error)
WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
If an item from the batch fails, other items will fail too
func (*Hub) WriteRawMessage ¶
WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func (*Hub) WriteRawMessageBatch ¶
WriteRawMessageBatch inserts a set of raw transport messages into a stream in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to deal with batches
type HubOption ¶
type HubOption interface {
// contains filtered or unexported methods
}
HubOption enables configuration of a Hub instance.
func WithIDFactory ¶
func WithIDFactory(f IDFactoryFunc) HubOption
WithIDFactory sets the default unique identifier factory of a Hub instance.
func WithInstanceName ¶
WithInstanceName sets the name of a Hub instance.
func WithMarshaler ¶
WithMarshaler sets the default marshaler of a Hub instance.
func WithReader ¶
WithReader sets the default reader driver of a Hub instance.
func WithReaderBaseOptions ¶
func WithReaderBaseOptions(opts ...ReaderNodeOption) HubOption
WithReaderBaseOptions sets a list of ReaderNodeOption of a Hub instance used as global options for each reader node.
func WithReaderBehaviours ¶
func WithReaderBehaviours(b ...ReaderBehaviour) HubOption
WithReaderBehaviours sets a list of ReaderBehaviour of a Hub instance ready to be executed by every stream-reading job's ReaderHandleFunc or ReaderHandler component.
func WithSchemaRegistry ¶
func WithSchemaRegistry(r SchemaRegistry) HubOption
WithSchemaRegistry sets the schema registry of a Hub instance for stream message schema definitions.
func WithWriter ¶
WithWriter sets the writer of a Hub instance.
If both Writer and WriterFunc are defined, Writer will override WriterFunc.
type IDFactoryFunc ¶
IDFactoryFunc creates a unique identifier.
var RandInt64Factory IDFactoryFunc = func() (string, error) { i := rand.Int63() return strconv.Itoa(int(i)), nil }
RandInt64Factory creates an identifier using the math/rand built-in package: a non-negative pseudo-random 63-bit integer (rand.Int63) formatted as a decimal string.
var UuidIdFactory IDFactoryFunc = func() (string, error) { id, err := uuid.NewUUID() return id.String(), err }
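UuidIdFactory creates a unique identifier using uuid.NewUUID. Note: if the uuid package is github.com/google/uuid, NewUUID generates a version 1 (time-based) UUID rather than a version 4 (random) one; confirm against the package imports.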
type InMemorySchemaRegistry ¶
InMemorySchemaRegistry is the in memory schema registry, crafted specially for basic and/or testing scenarios.
func (InMemorySchemaRegistry) GetSchemaDefinition ¶
func (i InMemorySchemaRegistry) GetSchemaDefinition(name string, version int) (string, error)
GetSchemaDefinition retrieves a schema definition (in string format) from the registry
func (InMemorySchemaRegistry) RegisterDefinition ¶
func (i InMemorySchemaRegistry) RegisterDefinition(name, def string, version int)
RegisterDefinition stores the given schema definition into the registry
type JSONMarshaler ¶
type JSONMarshaler struct{}
JSONMarshaler handles data transformation between primitives and JSON format.
func (JSONMarshaler) ContentType ¶
func (m JSONMarshaler) ContentType() string
ContentType retrieves the encoding/decoding JSON format using RFC 2046 standard (application/json).
type Marshaler ¶
type Marshaler interface { // Marshal transforms a complex data type into a primitive binary array for data transportation. Marshal(schemaDef string, data interface{}) ([]byte, error) // Unmarshal transforms a primitive binary array to a complex data type for data processing. Unmarshal(schemaDef string, data []byte, ref interface{}) error // ContentType retrieves the encoding/decoding format using RFC 2046 standard (e.g. application/json). ContentType() string }
Marshaler handles data transformation between primitives and specific codecs/formats (e.g. JSON, Apache Avro).
type Message ¶
type Message struct { // Stream name of destination stream (aka. topic) Stream string `json:"stream"` // StreamVersion destination stream major version. Useful when non-backwards compatible schema update is desired. StreamVersion int `json:"stream_version"` ID string `json:"id"` Source string `json:"source"` SpecVersion string `json:"specversion"` Type string `json:"type"` Data []byte `json:"data"` DataContentType string `json:"datacontenttype,omitempty"` DataSchema string `json:"dataschema,omitempty"` DataSchemaVersion int `json:"dataschemaversion,omitempty"` Timestamp string `json:"time,omitempty"` Subject string `json:"subject,omitempty"` // Streamhub fields CorrelationID string `json:"correlation_id"` CausationID string `json:"causation_id"` // DecodedData data decoded using unmarshalling ReaderBehaviour component. This field is ONLY available for usage // from ReaderNode(s). DecodedData interface{} `json:"-"` // GroupName name of the reader group (aka. consumer group). This field is ONLY available for usage // from ReaderNode(s). GroupName string `json:"-"` }
Message is a unit of information which holds the primitive message (data) in binary format along with multiple fields in order to preserve a schema definition within a stream pipeline.
The schema is based on the Cloud Native Computing Foundation (CNCF)'s CloudEvents specification.
For more information, please see: https://github.com/cloudevents/spec
func NewMessage ¶
func NewMessage(args NewMessageArgs) Message
NewMessage allocates an immutable Message ready to be transported in a stream.
type MessageContextKey ¶
type MessageContextKey string
MessageContextKey is the streams context key to inject data into transport messages.
const ( // ContextCorrelationID is the main trace of a stream processing. Once generated, it MUST NOT be generated again // to keep track of the process from the beginning. ContextCorrelationID MessageContextKey = "shub-correlation-id" // ContextCausationID is reference of the last message processed. This helps to know a direct relation between // a new process and the past one. ContextCausationID MessageContextKey = "shub-causation-id" )
type NewMessageArgs ¶
type NewMessageArgs struct { SchemaVersion int Data []byte ID string Source string Stream string StreamVersion int SchemaDefinitionName string ContentType string GroupName string Subject string }
NewMessageArgs arguments required by NewMessage function to operate.
type NoopSchemaRegistry ¶
type NoopSchemaRegistry struct{}
NoopSchemaRegistry is the no-operation implementation of SchemaRegistry
func (NoopSchemaRegistry) GetSchemaDefinition ¶
func (n NoopSchemaRegistry) GetSchemaDefinition(_ string, _ int) (string, error)
GetSchemaDefinition retrieves an empty string and a nil error
type ProtocolBuffersMarshaler ¶
type ProtocolBuffersMarshaler struct{}
ProtocolBuffersMarshaler handles data transformation between primitives and Google Protocol Buffers format
func (ProtocolBuffersMarshaler) ContentType ¶
func (p ProtocolBuffersMarshaler) ContentType() string
ContentType retrieves the encoding/decoding Google Protocol Buffers format using the latest conventions.
More information here: https://github.com/google/protorpc/commit/eb03145a6a7c72ae6cc43867d9635a5b8d8c4545
type Reader ¶
type Reader interface { // ExecuteTask starts a background stream-reading task. ExecuteTask(_ context.Context, _ ReaderTask) error }
Reader defines the underlying implementation of the stream-reading job (driver), which addresses the usage of custom protocols and/or APIs from providers (Apache Kafka, Amazon SQS, ...).
type ReaderBehaviour ¶
type ReaderBehaviour func(node *ReaderNode, hub *Hub, next ReaderHandleFunc) ReaderHandleFunc
ReaderBehaviour is a middleware function with extra functionality which will be executed prior to a ReaderHandleFunc or Reader component for every stream-reading job instance registered into a Hub.
The middleware gets injected the context ReaderNode (the stream-reading job to be executed), the root Hub instance and the parent middleware function.
Moreover, there are built-in behaviours ready to be used with streams:
- Retry backoff
- Correlation and causation ID injection
- Consumer group injection
- Auto-unmarshalling (*only if using reflection-based stream registry or GoType was defined when registering stream)
- Logging*
- Metrics*
- Tracing*
*Manual specification on configuration required
type ReaderHandleFunc ¶
ReaderHandleFunc is the execution process triggered when a message is received from a stream.
Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.
type ReaderHandler ¶
type ReaderHandler interface { // Read starts the execution process triggered when a message is received from a stream. // // Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff. Read(context.Context, Message) error }
ReaderHandler is a wrapping structure of the ReaderHandleFunc handler for complex data processing scenarios.
type ReaderHandlerNoop ¶
type ReaderHandlerNoop struct{}
ReaderHandlerNoop the no-operation implementation of ReaderHandler
type ReaderNode ¶
type ReaderNode struct { Stream string HandlerFunc ReaderHandleFunc Group string ProviderConfiguration interface{} ConcurrencyLevel int RetryInitialInterval time.Duration RetryMaxInterval time.Duration RetryTimeout time.Duration Reader Reader MaxHandlerPoolSize int }
ReaderNode is the worker unit which schedules stream-reading job(s).
Each ReaderNode is independent of other nodes to guarantee resiliency of interleaved processes and avoid cascading failures.
func GetStreamReaderNodes ¶ added in v0.2.4
func GetStreamReaderNodes(stream string) []ReaderNode
GetStreamReaderNodes retrieves ReaderNode(s) from a stream.
type ReaderNodeOption ¶
type ReaderNodeOption interface {
// contains filtered or unexported methods
}
ReaderNodeOption enables configuration of a ReaderNode.
func WithConcurrencyLevel ¶
func WithConcurrencyLevel(n int) ReaderNodeOption
WithConcurrencyLevel sets the concurrency level of a ReaderNode. In other words, jobs to be scheduled by the ReaderNode.
Note: If the level is less than or equal to 0, the ReaderNode will schedule 1 job.
func WithDriver ¶
func WithDriver(d Reader) ReaderNodeOption
WithDriver sets the driver of a ReaderNode (e.g. Apache Kafka, Apache Pulsar, Amazon SQS).
func WithGroup ¶
func WithGroup(g string) ReaderNodeOption
WithGroup sets the consumer group or queue name of a ReaderNode.
Note: It may not be available for some providers.
func WithHandler ¶
func WithHandler(l ReaderHandler) ReaderNodeOption
WithHandler sets the ReaderHandler of a ReaderNode.
func WithHandlerFunc ¶
func WithHandlerFunc(l ReaderHandleFunc) ReaderNodeOption
WithHandlerFunc sets the ReaderHandleFunc of a ReaderNode.
func WithMaxHandlerPoolSize ¶
func WithMaxHandlerPoolSize(n int) ReaderNodeOption
WithMaxHandlerPoolSize sets the maximum number of goroutines executed by a ReaderNode's ReaderHandler or ReaderHandleFunc.
Note: If the size is less than or equal to 0, the ReaderNode internal implementations will allocate a semaphore of 10 goroutines per handler.
func WithProviderConfiguration ¶
func WithProviderConfiguration(cfg interface{}) ReaderNodeOption
WithProviderConfiguration sets the custom provider configuration of a ReaderNode (e.g. aws.Config, sarama.Config).
func WithRetryInitialInterval ¶
func WithRetryInitialInterval(d time.Duration) ReaderNodeOption
WithRetryInitialInterval sets the initial duration interval between retried tasks of a ReaderNode.
func WithRetryMaxInterval ¶
func WithRetryMaxInterval(d time.Duration) ReaderNodeOption
WithRetryMaxInterval sets the maximum duration interval between retried tasks of a ReaderNode.
func WithRetryTimeout ¶
func WithRetryTimeout(d time.Duration) ReaderNodeOption
WithRetryTimeout sets the maximum duration for retrying tasks of a ReaderNode.
type ReaderTask ¶
type ReaderTask struct { Stream string HandlerFunc ReaderHandleFunc Group string Configuration interface{} Timeout time.Duration MaxHandlerPoolSize int }
ReaderTask is the job metadata required for a task to be executed by the Reader driver.
type SchemaRegistry ¶
type SchemaRegistry interface { // GetSchemaDefinition retrieves a schema definition (in string format) from the registry GetSchemaDefinition(name string, version int) (string, error) }
SchemaRegistry is an external storage of stream message schema definitions with proper versioning.
Examples of such schema registries are Amazon Glue Schema Registry and Confluent Schema Registry.
type StreamMetadata ¶
type StreamMetadata struct { // Stream destination stream name (aka. topic) Stream string // StreamVersion destination stream major version. Useful when non-backwards compatible schema update is desired. StreamVersion int // SchemaDefinitionName SchemaDefinitionName string SchemaVersion int GoType reflect2.Type }
StreamMetadata contains information of stream messages.
type StreamRegistry ¶
type StreamRegistry struct {
// contains filtered or unexported fields
}
StreamRegistry is an in-memory storage of streams metadata used by Hub and any external agent to set and retrieve information about a specific stream.
Uses a custom string (or Go's struct type as string) as key.
Note: A message key differs from stream name as the message key COULD be anything the developer sets within the stream registry. Thus, scenarios where multiple data types require publishing messages to the same stream are possible. Moreover, the message key is set by reflection-based registries with the reflect.TypeOf function, so it will differ from the actual stream name.
func NewStreamRegistry ¶ added in v0.2.5
func NewStreamRegistry() StreamRegistry
func (*StreamRegistry) Get ¶
func (r *StreamRegistry) Get(message interface{}) (StreamMetadata, error)
Get retrieves a stream message metadata from a stream message type.
func (*StreamRegistry) GetByStreamName ¶
func (r *StreamRegistry) GetByStreamName(name string) (StreamMetadata, error)
GetByStreamName retrieves a stream message metadata from a stream name.
It contains an optimistic lookup mechanism to keep constant time complexity.
If metadata is not found by the given key, the lookup falls back to an O(log n) search. This increases the time complexity of the fallback path by the GetByString base complexity. Nevertheless, GetByString is always constant, so it only adds a constant term to the overall GetByStreamName complexity. E.g. GetByString = 49.75 ns/op, therefore GetByStreamName = original ns/op + GetByString ns/op.
This optimistic lookup is done in order to keep amortized time complexity when using non-reflection based implementations on the root Hub (using only String methods from this very Stream Registry component). Thus, greater performance is achieved for scenarios when reflection-based stream registration is not required by the program.
func (*StreamRegistry) GetByString ¶
func (r *StreamRegistry) GetByString(key string) (StreamMetadata, error)
GetByString retrieves a stream message metadata from a string key.
func (*StreamRegistry) Set ¶
func (r *StreamRegistry) Set(message interface{}, metadata StreamMetadata)
Set creates a relation between a stream message type and metadata.
func (*StreamRegistry) SetByString ¶
func (r *StreamRegistry) SetByString(key string, metadata StreamMetadata)
SetByString creates a relation between a string key and metadata.
type WriteByMessageKeyBatchItems ¶
type WriteByMessageKeyBatchItems map[string]interface{}
WriteByMessageKeyBatchItems items to be written as batch on the Hub.WriteByMessageKeyBatch() function
type Writer ¶
type Writer interface { // Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the // data to a set of subscribed systems for further processing. Write(ctx context.Context, message Message) error // WriteBatch inserts a set of messages into a stream assigned to the message in the StreamRegistry in order to propagate the // data to a set of subscribed systems for further processing. // // Depending on the underlying Writer driver implementation, this function MIGHT return an error if a single operation failed, // or it MIGHT return an error if the whole operation failed. In addition, this function will return the number of // successful messages written into streams. WriteBatch(ctx context.Context, messages ...Message) (uint32, error) }
Writer inserts messages into streams assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
This type should be provided by a streams Driver (e.g. Apache Pulsar, Apache Kafka, Amazon SNS)
var NoopWriter Writer = noopWriter{}
NoopWriter is the no-operation implementation of Writer