connect

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type IKafkaMessageListener

type IKafkaMessageListener interface {
	kafka.ConsumerGroupHandler
}

type KafkaConnection

type KafkaConnection struct {

	// The logger.
	Logger *clog.CompositeLogger
	// The connection resolver.
	ConnectionResolver *KafkaConnectionResolver
	// The configuration options.
	Options *cconf.ConfigParams
	// contains filtered or unexported fields
}

*

Kafka connection using plain driver.
By defining a connection and sharing it through multiple message queues
you can reduce number of used connections.

### Configuration parameters ###

- client_id: (optional) name of the client id - connection(s):

  • discovery_key: (optional) a key to retrieve the connection from IDiscovery
  • host: host name or IP address
  • port: port number (default: 27017)
  • uri: resource URI or connection string with all parameters in it

- credential(s):

  • store_key: (optional) a key to retrieve the credentials from ICredentialStore
  • username: user name
  • password: user password

- options:

  • log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
  • connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
  • max_retries: (optional) maximum retry attempts (default: 5)
  • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
  • request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)

### References ###

  • \*:logger:\*:\*:1.0 (optional) ILogger components to pass log messages
  • \*:discovery:\*:\*:1.0 (optional) IDiscovery services
  • \*:credential-store:\*:\*:1.0 (optional) Credential stores to resolve credentials

func NewKafkaConnection

func NewKafkaConnection() *KafkaConnection

NewKafkaConnection creates a new instance of the connection component.

func (*KafkaConnection) Close

func (c *KafkaConnection) Close(correlationId string) error

Closes component and frees used resources.

  • correlationId (optional) transaction id to trace execution through call chain.

Return error or nil no errors occured

func (*KafkaConnection) Configure

func (c *KafkaConnection) Configure(config *cconf.ConfigParams)

Configures component by passing configuration parameters.

  • config configuration parameters to be set.

func (*KafkaConnection) CreateQueue

func (c *KafkaConnection) CreateQueue() error

func (*KafkaConnection) DeleteQueue

func (c *KafkaConnection) DeleteQueue() error

func (*KafkaConnection) GetConnection

func (c *KafkaConnection) GetConnection() kafka.SyncProducer

func (*KafkaConnection) IsOpen

func (c *KafkaConnection) IsOpen() bool

Checks if the component is opened. Returns true if the component has been opened and false otherwise.

func (*KafkaConnection) Open

func (c *KafkaConnection) Open(correlationId string) error

Opens the component.

  • correlationId (optional) transaction id to trace execution through call chain.
  • Return error or nil no errors occured.

func (*KafkaConnection) Publish

func (c *KafkaConnection) Publish(topic string, messages []*kafka.ProducerMessage, config *kafka.Config) error

Publish a message to a specified topic

Parameters:

  • topic a topic name
  • messages messages to be published
  • config a producer config parameters

Returns: error or nil for success

func (*KafkaConnection) ReadQueueNames

func (c *KafkaConnection) ReadQueueNames() ([]string, error)

func (*KafkaConnection) SetReferences

func (c *KafkaConnection) SetReferences(references cref.IReferences)

Sets references to dependent components.

  • references references to locate the component dependencies.

func (*KafkaConnection) Subscribe

func (c *KafkaConnection) Subscribe(topic string, groupId string, config *kafka.Config, listener IKafkaMessageListener) error

Subscribe to a topic

Parameters:

  • topic a subject (topic) name
  • groupId (optional) a consumer group id
  • config consumer configuration parameters
  • listener a message listener

Returns: err or nil for success

func (*KafkaConnection) Unsubscribe

func (c *KafkaConnection) Unsubscribe(topic string, groupId string, listener IKafkaMessageListener) error

Unsubscribe from a previously subscribed topic topic

Parameters:

  • topic a topic name
  • groupId (optional) a consumer group id
  • listener a message listener

Returns: err or nil for success

type KafkaConnectionResolver

type KafkaConnectionResolver struct {
	// The connections resolver.
	ConnectionResolver *ccon.ConnectionResolver
	//The credentials resolver.
	CredentialResolver *cauth.CredentialResolver
}

KafkaConnectionResolver helper class that resolves Kafka connection and credential parameters, validates them and generates connection options.

Configuration parameters:

- connection(s):

  • discovery_key: (optional) a key to retrieve the connection from IDiscovery
  • host: host name or IP address
  • port: port number
  • uri: resource URI or connection string with all parameters in it

- credential(s):

  • store_key: (optional) a key to retrieve the credentials from ICredentialStore

  • username: user name

  • password: user password

    References:

- *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections - *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

func NewKafkaConnectionResolver

func NewKafkaConnectionResolver() *KafkaConnectionResolver

func (*KafkaConnectionResolver) Compose

func (c *KafkaConnectionResolver) Compose(correlationId string, connections []*ccon.ConnectionParams, credential *cauth.CredentialParams) (*cconf.ConfigParams, error)

Compose method are composes Kafka connection options from connection and credential parameters. Parameters:

  • correlationId string (optional) transaction id to trace execution through call chain.
  • connection *ccon.ConnectionParams connection parameters
  • credential *cauth.CredentialParams credential parameters

Returns: options *cconf.ConfigParams, err error resolved options or error.

func (*KafkaConnectionResolver) Configure

func (c *KafkaConnectionResolver) Configure(config *cconf.ConfigParams)

Configure are configures component by passing configuration parameters. Parameters:

  • config *cconf.ConfigParams

configuration parameters to be set.

func (*KafkaConnectionResolver) Resolve

func (c *KafkaConnectionResolver) Resolve(correlationId string) (*cconf.ConfigParams, error)

Resolves Kafka connection options from connection and credential parameters. Parameters:

  • correlationId string (optional) transaction id to trace execution through call chain.

Returns options *cconf.ConfigParams, err error receives resolved options or error.

func (*KafkaConnectionResolver) SetReferences

func (c *KafkaConnectionResolver) SetReferences(references cref.IReferences)

SetReferences are sets references to dependent components. Parameters:

  • references cref.IReferences references to locate the component dependencies.

type KafkaMessage

type KafkaMessage struct {
	Message *kafka.ConsumerMessage
	Session kafka.ConsumerGroupSession
}

type KafkaSubscription

type KafkaSubscription struct {
	Topic    string
	GroupId  string
	Listener IKafkaMessageListener
	Handler  *kafka.ConsumerGroup
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL