Documentation ¶
Index ¶
- type IKafkaMessageListener
- type KafkaConnection
- func (c *KafkaConnection) Close(ctx context.Context, correlationId string) error
- func (c *KafkaConnection) Configure(ctx context.Context, config *cconf.ConfigParams)
- func (c *KafkaConnection) CreateQueue(name string) error
- func (c *KafkaConnection) DeleteQueue(name string) error
- func (c *KafkaConnection) GetConnection() kafka.SyncProducer
- func (c *KafkaConnection) IsOpen() bool
- func (c *KafkaConnection) Open(ctx context.Context, correlationId string) error
- func (c *KafkaConnection) Publish(ctx context.Context, topic string, messages []*kafka.ProducerMessage) error
- func (c *KafkaConnection) ReadQueueNames() ([]string, error)
- func (c *KafkaConnection) SetReferences(ctx context.Context, references cref.IReferences)
- func (c *KafkaConnection) Subscribe(ctx context.Context, topic string, groupId string, config *kafka.Config, ...) error
- func (c *KafkaConnection) Unsubscribe(ctx context.Context, topic string, groupId string, ...) error
- type KafkaConnectionResolver
- func (c *KafkaConnectionResolver) Compose(correlationId string, connections []*ccon.ConnectionParams, ...) (*cconf.ConfigParams, error)
- func (c *KafkaConnectionResolver) Configure(ctx context.Context, config *cconf.ConfigParams)
- func (c *KafkaConnectionResolver) Resolve(correlationId string) (*cconf.ConfigParams, error)
- func (c *KafkaConnectionResolver) SetReferences(ctx context.Context, references cref.IReferences)
- type KafkaMessage
- type KafkaSubscription
Types ¶
type IKafkaMessageListener ¶
type IKafkaMessageListener interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(kafka.ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(kafka.ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(kafka.ConsumerGroupSession, kafka.ConsumerGroupClaim) error

	// Ready returns the channel that receives a signal once the consumer has started.
	Ready() chan bool

	// SetReady sets a new channel for sending the ready signal.
	SetReady(chFlag chan bool)
}
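The listener contract above can be illustrated with a small, self-contained sketch. It is not the package's implementation: `session`, `claim` and `sliceClaim` are hypothetical stand-ins for kafka.ConsumerGroupSession and kafka.ConsumerGroupClaim so that the example runs with the standard library only, and closing the ready channel in Setup is just one possible way to send the ready signal.

```go
package main

import "fmt"

// session and claim are simplified, hypothetical stand-ins for
// kafka.ConsumerGroupSession and kafka.ConsumerGroupClaim.
type session interface{}

type claim interface {
	Messages() <-chan string
}

// sliceClaim is a test double that serves a fixed set of messages.
type sliceClaim struct{ ch chan string }

func newSliceClaim(msgs ...string) *sliceClaim {
	ch := make(chan string, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch) // a closed channel tells ConsumeClaim to leave its loop
	return &sliceClaim{ch: ch}
}

func (c *sliceClaim) Messages() <-chan string { return c.ch }

// collector follows the shape of IKafkaMessageListener and stores
// every message it receives.
type collector struct {
	ready    chan bool
	received []string
}

// Setup signals readiness by closing the ready channel (an assumption
// of this sketch; a listener could also send a value instead).
func (l *collector) Setup(session) error {
	close(l.ready)
	return nil
}

// Cleanup has nothing to release in this sketch.
func (l *collector) Cleanup(session) error { return nil }

// ConsumeClaim drains Messages() and returns once the channel is closed.
func (l *collector) ConsumeClaim(_ session, c claim) error {
	for msg := range c.Messages() {
		l.received = append(l.received, msg)
	}
	return nil
}

func (l *collector) Ready() chan bool          { return l.ready }
func (l *collector) SetReady(chFlag chan bool) { l.ready = chFlag }

func main() {
	l := &collector{}
	l.SetReady(make(chan bool))

	_ = l.Setup(nil)
	<-l.Ready() // does not block because Setup closed the channel

	_ = l.ConsumeClaim(nil, newSliceClaim("a", "b"))
	_ = l.Cleanup(nil)
	fmt.Println(l.received)
}
```

Because a closed channel cannot be closed again, a caller that reuses the listener for another session would first hand it a fresh channel through SetReady.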
type KafkaConnection ¶
type KafkaConnection struct {
	// The logger.
	Logger *clog.CompositeLogger
	// The connection resolver.
	ConnectionResolver *KafkaConnectionResolver
	// The configuration options.
	Options *cconf.ConfigParams
	// contains filtered or unexported fields
}
KafkaConnection is a Kafka connection that uses the plain driver. Defining a single connection and sharing it among multiple message queues reduces the number of connections in use.
### Configuration parameters ###
- client_id: (optional) name of the client id
- connection(s):
  - discovery_key: (optional) a key to retrieve the connection from IDiscovery
  - host: host name or IP address
  - port: port number
  - uri: resource URI or connection string with all parameters in it
- credential(s):
  - store_key: (optional) a key to retrieve the credentials from ICredentialStore
  - username: user name
  - password: user password
- options:
  - acks: (optional) number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
  - num_partitions: (optional) number of partitions of the created topic (default: 1)
  - replication_factor: (optional) Kafka replication factor of the topic (default: 1)
  - log_level: (optional) log level: 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
  - connect_timeout: (optional) number of milliseconds to wait when connecting to a broker (default: 1000)
  - max_retries: (optional) maximum number of retry attempts (default: 5)
  - retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
  - request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
### References ###
- *:logger:*:*:1.0 (optional) ILogger components to pass log messages
- *:discovery:*:*:1.0 (optional) IDiscovery services
- *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
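As a rough illustration of how the parameters above fit together, the following sketch stores them as flat, dot-separated keys and falls back to the documented defaults when an option is missing. The key layout (`connection.host`, `options.acks`), the `defaults` table, the `option` helper and the sample values are assumptions made for this example; the component itself receives its settings as a *cconf.ConfigParams.

```go
package main

import (
	"fmt"
	"strconv"
)

// defaults mirrors the documented default values of the "options" section.
var defaults = map[string]int{
	"options.acks":               -1,
	"options.num_partitions":     1,
	"options.replication_factor": 1,
	"options.log_level":          1,
	"options.connect_timeout":    1000,
	"options.max_retries":        5,
	"options.retry_timeout":      30000,
	"options.request_timeout":    30000,
}

// option returns the integer stored under key, or the documented default
// when the key is absent or does not hold a valid integer.
func option(cfg map[string]string, key string) int {
	if raw, ok := cfg[key]; ok {
		if v, err := strconv.Atoi(raw); err == nil {
			return v
		}
	}
	return defaults[key]
}

func main() {
	// Hypothetical configuration; only acks overrides a default.
	cfg := map[string]string{
		"client_id":           "orders-service",
		"connection.host":     "localhost",
		"connection.port":     "9092",
		"credential.username": "user",
		"credential.password": "secret",
		"options.acks":        "1",
	}

	fmt.Println("acks:", option(cfg, "options.acks"))
	fmt.Println("max_retries:", option(cfg, "options.max_retries"))
	fmt.Println("request_timeout:", option(cfg, "options.request_timeout"))
}
```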
func NewKafkaConnection ¶
func NewKafkaConnection() *KafkaConnection
NewKafkaConnection creates a new instance of the connection component.
func (*KafkaConnection) Close ¶
func (c *KafkaConnection) Close(ctx context.Context, correlationId string) error
Closes the component and frees used resources.
Parameters:
- ctx context.Context operation context
- correlationId (optional) transaction id to trace execution through the call chain.
Returns: an error, or nil if no errors occurred.
func (*KafkaConnection) Configure ¶
func (c *KafkaConnection) Configure(ctx context.Context, config *cconf.ConfigParams)
Configures the component by passing configuration parameters.
Parameters:
- ctx context.Context operation context
- config configuration parameters to be set.
func (*KafkaConnection) CreateQueue ¶
func (c *KafkaConnection) CreateQueue(name string) error
Creates a message queue. If the connection doesn't support this function, it exits without error.
Parameters:
- name string the name of the queue to be created.
Returns: an error, or nil on success.
func (*KafkaConnection) DeleteQueue ¶
func (c *KafkaConnection) DeleteQueue(name string) error
Deletes a message queue. If the connection doesn't support this function, it exits without error. Parameters:
- name string the name of the queue to be deleted.
func (*KafkaConnection) GetConnection ¶
func (c *KafkaConnection) GetConnection() kafka.SyncProducer
Returns the underlying connection object (a kafka.SyncProducer).
func (*KafkaConnection) IsOpen ¶
func (c *KafkaConnection) IsOpen() bool
Checks if the component is opened. Returns: true if the component has been opened and false otherwise.
func (*KafkaConnection) Open ¶
func (c *KafkaConnection) Open(ctx context.Context, correlationId string) error
Opens the component.
Parameters:
- ctx context.Context operation context
- correlationId (optional) transaction id to trace execution through the call chain.
Returns: an error, or nil if no errors occurred.
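The Open, IsOpen and Close methods form a simple lifecycle. The sketch below shows one way a caller might wrap work between Open and Close. The `lifecycle` interface copies the documented signatures, while `fakeConnection`, `withConnection` and `demo` are hypothetical helpers written for this example so that it runs without a broker; they are not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// lifecycle lists the documented Open, IsOpen and Close signatures.
type lifecycle interface {
	Open(ctx context.Context, correlationId string) error
	IsOpen() bool
	Close(ctx context.Context, correlationId string) error
}

// fakeConnection is a hypothetical in-memory stand-in for *KafkaConnection.
type fakeConnection struct{ opened bool }

func (c *fakeConnection) Open(ctx context.Context, correlationId string) error {
	c.opened = true
	return nil
}

func (c *fakeConnection) IsOpen() bool { return c.opened }

func (c *fakeConnection) Close(ctx context.Context, correlationId string) error {
	c.opened = false
	return nil
}

// withConnection opens conn, runs work, and always closes conn afterwards.
// It returns the first error encountered.
func withConnection(ctx context.Context, conn lifecycle, correlationId string, work func() error) (err error) {
	if err = conn.Open(ctx, correlationId); err != nil {
		return err
	}
	defer func() {
		// Keep the error from work, otherwise report the Close error.
		if cerr := conn.Close(ctx, correlationId); err == nil {
			err = cerr
		}
	}()
	if !conn.IsOpen() {
		return errors.New("connection is not open")
	}
	return work()
}

// demo reports whether the connection was open during and after the work.
func demo() (duringWork, afterWork bool, err error) {
	conn := &fakeConnection{}
	err = withConnection(context.Background(), conn, "123", func() error {
		duringWork = conn.IsOpen()
		return nil
	})
	return duringWork, conn.IsOpen(), err
}

func main() {
	during, after, err := demo()
	fmt.Println("open during work:", during)
	fmt.Println("open after work:", after)
	fmt.Println("error:", err)
}
```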
func (*KafkaConnection) Publish ¶
func (c *KafkaConnection) Publish(ctx context.Context, topic string, messages []*kafka.ProducerMessage) error
Publishes messages to the specified topic.
Parameters:
- ctx context.Context operation context
- topic a topic name
- messages messages to be published
Returns: error or nil for success
func (*KafkaConnection) ReadQueueNames ¶
func (c *KafkaConnection) ReadQueueNames() ([]string, error)
Reads a list of registered queue names. If the connection doesn't support this function, it returns an empty list. Returns: queue names.
func (*KafkaConnection) SetReferences ¶
func (c *KafkaConnection) SetReferences(ctx context.Context, references cref.IReferences)
Sets references to dependent components.
Parameters:
- ctx context.Context operation context
- references references to locate the component dependencies.
func (*KafkaConnection) Subscribe ¶
func (c *KafkaConnection) Subscribe(ctx context.Context, topic string, groupId string, config *kafka.Config, listener IKafkaMessageListener) error
Subscribe to a topic
Parameters:
- ctx context.Context operation context
- topic a subject (topic) name
- groupId (optional) a consumer group id
- config consumer configuration parameters
- listener a message listener
Returns: err or nil for success
func (*KafkaConnection) Unsubscribe ¶
func (c *KafkaConnection) Unsubscribe(ctx context.Context, topic string, groupId string, listener IKafkaMessageListener) error
Unsubscribes from a previously subscribed topic.
Parameters:
- ctx context.Context operation context
- topic a topic name
- groupId (optional) a consumer group id
- listener a message listener
Returns: err or nil for success
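Subscribe and Unsubscribe both identify a subscription by topic, consumer group id and listener. The sketch below shows one way such records could be tracked; it is an assumption about the bookkeeping, not the package's actual code. `listener`, `namedListener`, `subscription` and `registry` are hypothetical names, and the Kafka consumer group handler is left out entirely.

```go
package main

import "fmt"

// listener is a hypothetical stand-in for IKafkaMessageListener.
type listener interface{ Name() string }

type namedListener string

func (n namedListener) Name() string { return string(n) }

// subscription mirrors the Topic, GroupId and Listener fields of
// KafkaSubscription.
type subscription struct {
	Topic    string
	GroupId  string
	Listener listener
}

// registry keeps the active subscriptions.
type registry struct{ items []subscription }

// subscribe records a new subscription.
func (r *registry) subscribe(topic, groupId string, l listener) {
	r.items = append(r.items, subscription{Topic: topic, GroupId: groupId, Listener: l})
}

// unsubscribe removes the first subscription that matches all three
// values and reports whether one was found.
func (r *registry) unsubscribe(topic, groupId string, l listener) bool {
	for i, s := range r.items {
		if s.Topic == topic && s.GroupId == groupId && s.Listener == l {
			r.items = append(r.items[:i], r.items[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	r := &registry{}
	a, b := namedListener("a"), namedListener("b")

	r.subscribe("orders", "group-1", a)
	r.subscribe("orders", "group-1", b)

	fmt.Println("removed:", r.unsubscribe("orders", "group-1", a))
	fmt.Println("remaining:", len(r.items))
}
```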
type KafkaConnectionResolver ¶
type KafkaConnectionResolver struct { // The connections resolver. ConnectionResolver *ccon.ConnectionResolver // The credentials resolver. CredentialResolver *cauth.CredentialResolver }
KafkaConnectionResolver is a helper class that resolves Kafka connection and credential parameters, validates them, and generates connection options.
Configuration parameters:
- connection(s):
  - discovery_key: (optional) a key to retrieve the connection from IDiscovery
  - host: host name or IP address
  - port: port number
  - uri: resource URI or connection string with all parameters in it
- credential(s):
  - store_key: (optional) a key to retrieve the credentials from ICredentialStore
  - username: user name
  - password: user password
References:
- *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
- *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
func NewKafkaConnectionResolver ¶
func NewKafkaConnectionResolver() *KafkaConnectionResolver
func (*KafkaConnectionResolver) Compose ¶
func (c *KafkaConnectionResolver) Compose(correlationId string, connections []*ccon.ConnectionParams, credential *cauth.CredentialParams) (*cconf.ConfigParams, error)
Compose composes Kafka connection options from connection and credential parameters. Parameters:
- correlationId string (optional) transaction id to trace execution through the call chain.
- connections []*ccon.ConnectionParams connection parameters
- credential *cauth.CredentialParams credential parameters
Returns: options *cconf.ConfigParams, err error resolved options or error.
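To make the composition step more concrete, here is a minimal sketch of validating several connections and merging them with a credential into one set of options. It is an illustration under stated assumptions, not the resolver's code: `connectionParams` and `credentialParams` are simplified stand-ins for *ccon.ConnectionParams and *cauth.CredentialParams, a plain map replaces *cconf.ConfigParams, and the `brokers`, `username` and `password` keys as well as the validation rules are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// connectionParams and credentialParams are simplified stand-ins for
// *ccon.ConnectionParams and *cauth.CredentialParams.
type connectionParams struct {
	Host string
	Port int
}

type credentialParams struct {
	Username string
	Password string
}

// compose validates the connections and merges them with the credential
// into a flat option map. The option keys are assumptions of this sketch.
func compose(connections []connectionParams, credential *credentialParams) (map[string]string, error) {
	if len(connections) == 0 {
		return nil, errors.New("no connections configured")
	}

	brokers := make([]string, 0, len(connections))
	for _, c := range connections {
		if c.Host == "" {
			return nil, errors.New("connection host is not set")
		}
		if c.Port == 0 {
			return nil, errors.New("connection port is not set")
		}
		brokers = append(brokers, c.Host+":"+strconv.Itoa(c.Port))
	}

	// All brokers end up in one comma-separated list.
	options := map[string]string{"brokers": strings.Join(brokers, ",")}
	if credential != nil {
		options["username"] = credential.Username
		options["password"] = credential.Password
	}
	return options, nil
}

func main() {
	options, err := compose(
		[]connectionParams{{Host: "broker1", Port: 9092}, {Host: "broker2", Port: 9092}},
		&credentialParams{Username: "user", Password: "secret"},
	)
	fmt.Println(options["brokers"], err)
}
```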
func (*KafkaConnectionResolver) Configure ¶
func (c *KafkaConnectionResolver) Configure(ctx context.Context, config *cconf.ConfigParams)
Configure configures the component by passing configuration parameters. Parameters:
- ctx context.Context operation context
- config *cconf.ConfigParams configuration parameters to be set.
func (*KafkaConnectionResolver) Resolve ¶
func (c *KafkaConnectionResolver) Resolve(correlationId string) (*cconf.ConfigParams, error)
Resolve resolves Kafka connection options from connection and credential parameters. Parameters:
- correlationId string (optional) transaction id to trace execution through the call chain.
Returns: options *cconf.ConfigParams, err error resolved options or error.
func (*KafkaConnectionResolver) SetReferences ¶
func (c *KafkaConnectionResolver) SetReferences(ctx context.Context, references cref.IReferences)
SetReferences sets references to dependent components.
Parameters:
- ctx context.Context operation context
- references cref.IReferences references to locate the component dependencies.
type KafkaMessage ¶
type KafkaMessage struct {
	// Kafka consumer message
	Message *kafka.ConsumerMessage
	// Consumer session
	Session kafka.ConsumerGroupSession
}
Kafka message structure
type KafkaSubscription ¶
type KafkaSubscription struct {
	Topic    string
	GroupId  string
	Listener IKafkaMessageListener
	Handler  *kafka.ConsumerGroup
}
Subscription structure