kafka

package
v1.5.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 11, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

README

Kubemq Kafka Target Connector

The Kubemq Kafka target connector allows services that use a kubemq server to store messages on specific Kafka topics.

Prerequisites

The following are required to run the kafka target connector:

  • kubemq cluster
  • kafka server
  • kubemq-targets deployment

Configuration

Kafka target connector configuration properties:

| Properties Key     | Required | Description                               | Example                                                      |
|:-------------------|:---------|:------------------------------------------|:-------------------------------------------------------------|
| brokers            | yes      | kafka brokers connection, comma separated | "localhost:9092"                                             |
| topic              | yes      | kafka stored topic                        | "TestTopic"                                                  |
| sasl_username      | no       | SASL username for broker authentication   | "user"                                                       |
| sasl_password      | no       | SASL password for broker authentication   | "pass"                                                       |
| sasl_mechanism     | no       | SASL mechanism                            | SCRAM-SHA-256, SCRAM-SHA-512, plain, OAuth bearer, or GSS-API |
| security_protocol  | no       | Set connection security protocol          | plaintext, SASL-plaintext, SASL-SSL, SSL                     |
| ca_cert            | no       | SSL CA certificate                        | pem certificate value                                        |
| client_certificate | no       | SSL client certificate (mTLS)             | pem certificate value                                        |
| client_key         | no       | SSL client key (mTLS)                     | pem key value                                                |
| insecure           | no       | SSL insecure (self-signed)                | true / false                                                 |

Example:
bindings:
  - name: kubemq-query-kafka
    source:
      kind: kubemq.query
      name: kubemq-query
      properties:
        address: "kubemq-cluster:50000"
        client_id: "kubemq-query-kafka-connector"
        auth_token: ""
        channel: "query.kafka"
        group:   ""
        auto_reconnect: "true"
        reconnect_interval_seconds: "1"
        max_reconnects: "0"
    target:
      kind: messaging.kafka
      name: kafka-stream
      properties:
        brokers: "localhost:9092"
        topic: "TestTopic"
        sasl_username: "test"
        sasl_password: "pass"
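
The required and optional keys in the target `properties` block can be checked before a binding is deployed. The following is a minimal sketch, assuming the properties arrive as a flat string map; `kafkaTargetProps` and `parseProps` are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// kafkaTargetProps is a hypothetical holder for a few of the documented
// properties. It is NOT the connector's own options type.
type kafkaTargetProps struct {
	Brokers      []string
	Topic        string
	SASLUsername string
	SASLPassword string
}

// parseProps checks the two required keys ("brokers" and "topic") and
// splits the comma-separated broker list into individual addresses.
func parseProps(props map[string]string) (kafkaTargetProps, error) {
	var out kafkaTargetProps

	for _, b := range strings.Split(props["brokers"], ",") {
		if b = strings.TrimSpace(b); b != "" {
			out.Brokers = append(out.Brokers, b)
		}
	}
	if len(out.Brokers) == 0 {
		return out, errors.New("brokers is required")
	}

	out.Topic = strings.TrimSpace(props["topic"])
	if out.Topic == "" {
		return out, errors.New("topic is required")
	}

	// Optional keys are copied as-is; empty means "not set".
	out.SASLUsername = props["sasl_username"]
	out.SASLPassword = props["sasl_password"]
	return out, nil
}

func main() {
	p, err := parseProps(map[string]string{
		"brokers":       "localhost:9092",
		"topic":         "TestTopic",
		"sasl_username": "test",
		"sasl_password": "pass",
	})
	if err != nil {
		fmt.Println("invalid properties:", err)
		return
	}
	fmt.Printf("brokers=%v topic=%s\n", p.Brokers, p.Topic)
}
```

A check like this only covers presence and basic shape; whether the brokers are reachable or the SASL credentials are accepted is only known once the connector connects.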

Usage

Get Request

Get request metadata setting:

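| Metadata Key | Required | Description                            | Possible values                          |
|:-------------|:---------|:---------------------------------------|:-----------------------------------------|
| key          | yes      | kafka message key base64               | "a2V5"                                   |
| headers      | no       | kafka message headers Key Value base64 | [{"Key": "ZG9n","Value": "bWV0YTE="}]    |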

Example:

{
  "metadata": {
    "key": "a2V5",
    "headers": [{"Key": "ZG9n","Value": "bWV0YTE="}]
  },
  "data": null
}
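
The key and header values in the example above are base64-encoded strings. The following is a minimal sketch of producing that request body in Go with the standard library, assuming the JSON shape shown in the example; the `request`, `metadata`, and `header` types and the `buildRequest` helper are hypothetical and are not the `types.Request` used by `Do`.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// header mirrors one entry of the "headers" array in the example.
type header struct {
	Key   string `json:"Key"`
	Value string `json:"Value"`
}

// metadata and request are hypothetical types that mirror the JSON
// example; they are not the connector's own request type.
type metadata struct {
	Key     string   `json:"key"`
	Headers []header `json:"headers,omitempty"`
}

type request struct {
	Metadata metadata `json:"metadata"`
	Data     []byte   `json:"data"` // nil marshals to null
}

// b64 encodes a plain string with standard base64.
func b64(s string) string {
	return base64.StdEncoding.EncodeToString([]byte(s))
}

// buildRequest base64-encodes the message key and each header pair,
// then marshals the request to JSON. Headers are passed as ordered
// {key, value} pairs so the output order is deterministic.
func buildRequest(key string, headers [][2]string, data []byte) ([]byte, error) {
	req := request{Metadata: metadata{Key: b64(key)}, Data: data}
	for _, h := range headers {
		req.Metadata.Headers = append(req.Metadata.Headers, header{
			Key:   b64(h[0]),
			Value: b64(h[1]),
		})
	}
	return json.Marshal(req)
}

func main() {
	out, err := buildRequest("key", [][2]string{{"dog", "meta1"}}, nil)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

With the plain values `key`, `dog`, and `meta1`, this yields the same encoded strings as the example: `a2V5`, `ZG9n`, and `bWV0YTE=`.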

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Connector

func Connector() *common.Connector

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New() *Client

func (*Client) Connector

func (c *Client) Connector() *common.Connector

func (*Client) Do

func (c *Client) Do(ctx context.Context, request *types.Request) (*types.Response, error)

func (*Client) Init

func (c *Client) Init(ctx context.Context, cfg config.Spec, log *logger.Logger) error

func (*Client) Stop

func (c *Client) Stop() error
