Documentation ¶
Index ¶
- type IKafkaMessageListener
- type KafkaConnection
- func (c *KafkaConnection) Close(correlationId string) error
- func (c *KafkaConnection) Configure(config *cconf.ConfigParams)
- func (c *KafkaConnection) CreateQueue() error
- func (c *KafkaConnection) DeleteQueue() error
- func (c *KafkaConnection) GetConnection() kafka.SyncProducer
- func (c *KafkaConnection) IsOpen() bool
- func (c *KafkaConnection) Open(correlationId string) error
- func (c *KafkaConnection) Publish(topic string, messages []*kafka.ProducerMessage, config *kafka.Config) error
- func (c *KafkaConnection) ReadQueueNames() ([]string, error)
- func (c *KafkaConnection) SetReferences(references cref.IReferences)
- func (c *KafkaConnection) Subscribe(topic string, groupId string, config *kafka.Config, ...) error
- func (c *KafkaConnection) Unsubscribe(topic string, groupId string, listener IKafkaMessageListener) error
- type KafkaConnectionResolver
- func (c *KafkaConnectionResolver) Compose(correlationId string, connections []*ccon.ConnectionParams, ...) (*cconf.ConfigParams, error)
- func (c *KafkaConnectionResolver) Configure(config *cconf.ConfigParams)
- func (c *KafkaConnectionResolver) Resolve(correlationId string) (*cconf.ConfigParams, error)
- func (c *KafkaConnectionResolver) SetReferences(references cref.IReferences)
- type KafkaMessage
- type KafkaSubscription
Types ¶
type IKafkaMessageListener ¶
type IKafkaMessageListener interface { kafka.ConsumerGroupHandler }
type KafkaConnection ¶
type KafkaConnection struct {
    // The logger.
    Logger *clog.CompositeLogger
    // The connection resolver.
    ConnectionResolver *KafkaConnectionResolver
    // The configuration options.
    Options *cconf.ConfigParams
    // contains filtered or unexported fields
}
KafkaConnection is a Kafka connection that uses the plain driver. Defining one connection and sharing it across multiple message queues reduces the number of connections in use.
### Configuration parameters ###
- client_id: (optional) client identifier
- connection(s):
  - discovery_key: (optional) a key to retrieve the connection from IDiscovery
  - host: host name or IP address
  - port: port number (default: 27017; Kafka brokers conventionally listen on 9092, so set it explicitly)
  - uri: resource URI or connection string with all parameters in it
- credential(s):
  - store_key: (optional) a key to retrieve the credentials from ICredentialStore
  - username: user name
  - password: user password
- options:
  - log_level: (optional) log level: 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
  - connect_timeout: (optional) number of milliseconds to wait when connecting to the broker (default: 1000)
  - max_retries: (optional) maximum number of retry attempts (default: 5)
  - retry_timeout: (optional) number of milliseconds to wait before each reconnection attempt (default: 30000)
  - request_timeout: (optional) number of milliseconds to wait when flushing messages (default: 30000)
### References ###
- *:logger:*:*:1.0 (optional) ILogger components to pass log messages
- *:discovery:*:*:1.0 (optional) IDiscovery services
- *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
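The option defaults listed above can be illustrated with a small stand-alone sketch. It uses a plain `map[string]string` with dot-separated keys as a stand-in for `cconf.ConfigParams`; the `intOption` helper and the sample host and port values are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
)

// intOption reads an integer setting from a flat, dot-separated config map
// and falls back to def when the key is missing or is not a valid integer.
// It is a hypothetical helper, not part of the package API.
func intOption(cfg map[string]string, key string, def int) int {
	raw, ok := cfg[key]
	if !ok {
		return def
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return n
}

func main() {
	// Sample settings; the host and port values are placeholders.
	cfg := map[string]string{
		"connection.host":         "localhost",
		"connection.port":         "9092",
		"options.connect_timeout": "2000",
	}

	// The fallback values are the defaults documented for the options section.
	fmt.Println(intOption(cfg, "options.connect_timeout", 1000)) // explicitly configured
	fmt.Println(intOption(cfg, "options.max_retries", 5))        // falls back to the default
	fmt.Println(intOption(cfg, "options.retry_timeout", 30000))  // falls back to the default
}
```

Falling back to the default on a malformed value is a design choice of this sketch; a stricter variant could return an error instead.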
func NewKafkaConnection ¶
func NewKafkaConnection() *KafkaConnection
NewKafkaConnection creates a new instance of the connection component.
func (*KafkaConnection) Close ¶
func (c *KafkaConnection) Close(correlationId string) error
Closes the component and frees the resources it uses.
- correlationId (optional) transaction id to trace execution through the call chain.
Returns an error, or nil if no errors occurred.
func (*KafkaConnection) Configure ¶
func (c *KafkaConnection) Configure(config *cconf.ConfigParams)
Configures the component by passing configuration parameters.
- config configuration parameters to be set.
func (*KafkaConnection) CreateQueue ¶
func (c *KafkaConnection) CreateQueue() error
func (*KafkaConnection) DeleteQueue ¶
func (c *KafkaConnection) DeleteQueue() error
func (*KafkaConnection) GetConnection ¶
func (c *KafkaConnection) GetConnection() kafka.SyncProducer
func (*KafkaConnection) IsOpen ¶
func (c *KafkaConnection) IsOpen() bool
Checks if the component is opened. Returns true if the component has been opened and false otherwise.
func (*KafkaConnection) Open ¶
func (c *KafkaConnection) Open(correlationId string) error
Opens the component.
- correlationId (optional) transaction id to trace execution through the call chain.
Returns an error, or nil if no errors occurred.
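The Open, IsOpen and Close methods suggest a simple lifecycle: open the component, do the work, and always close it afterwards. The following is a minimal sketch of that pattern. The `lifecycle` interface only mirrors the documented method signatures, and `fakeConnection` and `withOpen` are hypothetical names used so the example runs without the package or a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// lifecycle mirrors the Open/IsOpen/Close method set documented above.
// It is a local stand-in so the sketch compiles without the package.
type lifecycle interface {
	Open(correlationId string) error
	IsOpen() bool
	Close(correlationId string) error
}

// fakeConnection is a hypothetical in-memory implementation for illustration.
type fakeConnection struct {
	opened bool
}

func (f *fakeConnection) Open(correlationId string) error {
	f.opened = true
	return nil
}

func (f *fakeConnection) IsOpen() bool { return f.opened }

func (f *fakeConnection) Close(correlationId string) error {
	f.opened = false
	return nil
}

// withOpen opens the component, runs work, and closes the component again.
// An error from work takes precedence over an error from Close.
func withOpen(c lifecycle, correlationId string, work func() error) (err error) {
	if err = c.Open(correlationId); err != nil {
		return err
	}
	defer func() {
		if cerr := c.Close(correlationId); err == nil {
			err = cerr
		}
	}()
	if !c.IsOpen() {
		return errors.New("component did not report an open state")
	}
	return work()
}

func main() {
	conn := &fakeConnection{}
	err := withOpen(conn, "example-trace-id", func() error {
		fmt.Println("open during work:", conn.IsOpen())
		return nil
	})
	fmt.Println("error:", err, "open after work:", conn.IsOpen())
}
```

Closing in a deferred function keeps the connection from leaking when the work function fails.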
func (*KafkaConnection) Publish ¶
func (c *KafkaConnection) Publish(topic string, messages []*kafka.ProducerMessage, config *kafka.Config) error
Publishes messages to the specified topic.
Parameters:
- topic a topic name
- messages messages to be published
- config producer configuration parameters
Returns: an error, or nil on success
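Because Publish takes a topic and a slice of messages, callers typically build the whole batch before the call. The sketch below shows one way to do that. `producerMessage` is a simplified stand-in for `kafka.ProducerMessage`, `prepareBatch` is a hypothetical helper, and the topic and payload values are placeholders.

```go
package main

import "fmt"

// producerMessage is a simplified stand-in for kafka.ProducerMessage;
// the real type carries encoded keys, values, headers and more.
type producerMessage struct {
	Topic string
	Value string
}

// prepareBatch builds one message per payload and stamps each with the
// target topic. Hypothetical helper; whether Publish itself overrides the
// topic on each message is an assumption this sketch does not rely on.
func prepareBatch(topic string, payloads []string) []*producerMessage {
	batch := make([]*producerMessage, 0, len(payloads))
	for _, p := range payloads {
		batch = append(batch, &producerMessage{Topic: topic, Value: p})
	}
	return batch
}

func main() {
	batch := prepareBatch("orders", []string{"created", "paid"})
	for _, m := range batch {
		fmt.Printf("%s <- %s\n", m.Topic, m.Value)
	}
}
```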
func (*KafkaConnection) ReadQueueNames ¶
func (c *KafkaConnection) ReadQueueNames() ([]string, error)
func (*KafkaConnection) SetReferences ¶
func (c *KafkaConnection) SetReferences(references cref.IReferences)
Sets references to dependent components.
- references references to locate the component dependencies.
func (*KafkaConnection) Subscribe ¶
func (c *KafkaConnection) Subscribe(topic string, groupId string, config *kafka.Config, listener IKafkaMessageListener) error
Subscribes a listener to a topic.
Parameters:
- topic a topic name
- groupId (optional) a consumer group id
- config consumer configuration parameters
- listener a message listener
Returns: an error, or nil on success
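A listener passed to Subscribe has to satisfy `kafka.ConsumerGroupHandler`, which consists of Setup, Cleanup and ConsumeClaim. The sketch below shows the usual shape of such a handler: drain the claim's message channel and mark each message once it has been handled. The `message`, `session` and `claim` types are simplified local stand-ins for the driver's types, and `printListener` is a hypothetical name, so the example runs without a broker.

```go
package main

import "fmt"

// message, session and claim are simplified local stand-ins for the
// consumer message, group session and group claim types of the Kafka driver.
type message struct {
	Topic string
	Value string
}

type session interface {
	MarkMessage(msg *message, metadata string)
}

type claim interface {
	Messages() <-chan *message
}

// countingSession records how many messages were marked as processed.
type countingSession struct{ marked int }

func (s *countingSession) MarkMessage(msg *message, metadata string) { s.marked++ }

// chanClaim exposes a pre-filled channel as a claim.
type chanClaim struct{ ch chan *message }

func (c *chanClaim) Messages() <-chan *message { return c.ch }

// printListener follows the Setup / Cleanup / ConsumeClaim shape of a
// consumer group handler. Hypothetical type for illustration only.
type printListener struct{ seen []string }

func (l *printListener) Setup(session) error   { return nil }
func (l *printListener) Cleanup(session) error { return nil }

// ConsumeClaim drains the claim until its channel is closed and marks
// every message after handling it.
func (l *printListener) ConsumeClaim(s session, c claim) error {
	for msg := range c.Messages() {
		l.seen = append(l.seen, msg.Value)
		s.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	ch := make(chan *message, 2)
	ch <- &message{Topic: "orders", Value: "created"}
	ch <- &message{Topic: "orders", Value: "paid"}
	close(ch)

	sess := &countingSession{}
	l := &printListener{}
	if err := l.ConsumeClaim(sess, &chanClaim{ch: ch}); err != nil {
		fmt.Println("consume failed:", err)
		return
	}
	fmt.Println(l.seen, sess.marked)
}
```

Marking a message only after it has been handled means an interrupted handler sees the message again on redelivery rather than losing it.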
func (*KafkaConnection) Unsubscribe ¶
func (c *KafkaConnection) Unsubscribe(topic string, groupId string, listener IKafkaMessageListener) error
Unsubscribes a listener from a previously subscribed topic.
Parameters:
- topic a topic name
- groupId (optional) a consumer group id
- listener a message listener
Returns: an error, or nil on success
type KafkaConnectionResolver ¶
type KafkaConnectionResolver struct {
    // The connections resolver.
    ConnectionResolver *ccon.ConnectionResolver
    // The credentials resolver.
    CredentialResolver *cauth.CredentialResolver
}
KafkaConnectionResolver is a helper class that resolves Kafka connection and credential parameters, validates them, and generates connection options.
Configuration parameters:
- connection(s):
  - discovery_key: (optional) a key to retrieve the connection from IDiscovery
  - host: host name or IP address
  - port: port number
  - uri: resource URI or connection string with all parameters in it
- credential(s):
  - store_key: (optional) a key to retrieve the credentials from ICredentialStore
  - username: user name
  - password: user password
References:
- *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
- *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
func NewKafkaConnectionResolver ¶
func NewKafkaConnectionResolver() *KafkaConnectionResolver
func (*KafkaConnectionResolver) Compose ¶
func (c *KafkaConnectionResolver) Compose(correlationId string, connections []*ccon.ConnectionParams, credential *cauth.CredentialParams) (*cconf.ConfigParams, error)
Compose composes Kafka connection options from connection and credential parameters.
Parameters:
- correlationId string (optional) transaction id to trace execution through the call chain.
- connections []*ccon.ConnectionParams connection parameters
- credential *cauth.CredentialParams credential parameters
Returns: options *cconf.ConfigParams, err error the composed options or an error.
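To make the idea of composing options from several connections concrete, here is a minimal sketch that turns a list of connections into a single comma-separated server list. The `connection` struct is a simplified stand-in for `ccon.ConnectionParams`, `composeServers` is a hypothetical helper, and the rule that a URI takes precedence over host and port is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// connection is a simplified stand-in for ccon.ConnectionParams.
type connection struct {
	URI  string
	Host string
	Port int
}

// composeServers joins connections into a comma-separated server list.
// The first non-empty URI wins; otherwise each host[:port] is appended.
// This precedence rule is an assumption of the sketch.
func composeServers(conns []connection) (string, error) {
	parts := make([]string, 0, len(conns))
	for _, c := range conns {
		if c.URI != "" {
			return c.URI, nil
		}
		if c.Host == "" {
			return "", errors.New("connection host is not set")
		}
		addr := c.Host
		if c.Port != 0 {
			addr += ":" + strconv.Itoa(c.Port)
		}
		parts = append(parts, addr)
	}
	if len(parts) == 0 {
		return "", errors.New("no connections provided")
	}
	return strings.Join(parts, ","), nil
}

func main() {
	// Host names and ports are placeholders.
	servers, err := composeServers([]connection{
		{Host: "broker1", Port: 9092},
		{Host: "broker2", Port: 9093},
	})
	fmt.Println(servers, err)
}
```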
func (*KafkaConnectionResolver) Configure ¶
func (c *KafkaConnectionResolver) Configure(config *cconf.ConfigParams)
Configure configures the component by passing configuration parameters.
Parameters:
- config *cconf.ConfigParams configuration parameters to be set.
func (*KafkaConnectionResolver) Resolve ¶
func (c *KafkaConnectionResolver) Resolve(correlationId string) (*cconf.ConfigParams, error)
Resolve resolves Kafka connection options from connection and credential parameters.
Parameters:
- correlationId string (optional) transaction id to trace execution through the call chain.
Returns: options *cconf.ConfigParams, err error the resolved options or an error.
func (*KafkaConnectionResolver) SetReferences ¶
func (c *KafkaConnectionResolver) SetReferences(references cref.IReferences)
SetReferences sets references to dependent components.
Parameters:
- references cref.IReferences references to locate the component dependencies.
type KafkaMessage ¶
type KafkaMessage struct {
    Message *kafka.ConsumerMessage
    Session kafka.ConsumerGroupSession
}
type KafkaSubscription ¶
type KafkaSubscription struct {
    Topic    string
    GroupId  string
    Listener IKafkaMessageListener
    Handler  *kafka.ConsumerGroup
}