source

package
v0.19.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

README

Apache Kafka - Source

The Apache Kafka Event source enables Knative Eventing integration with Apache Kafka. When a message is produced to Apache Kafka, the Apache Kafka Event Source will consume the produced message and post that message to the corresponding event sink.

Deployment steps

  1. Setup Knative Eventing

  2. If not done already, install an Apache Kafka cluster!

    • For Kubernetes a simple installation is done using the Strimzi Kafka Operator. Its installation guides provide content for Kubernetes and Openshift.

    Note: The KafkaSource is not limited to Apache Kafka installations on Kubernetes. It is also possible to use an off-cluster Apache Kafka installation.

  3. Now that Apache Kafka is installed, apply the KafkaSource config:

    ko apply -f config/
    
  4. Create the KafkaSource custom objects, by configuring the required consumerGroup, bootstrapServers and topics values on the CR file of your source. Below is an example:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: optional-consumer-group
      # Broker URL. Replace this with the URLs for your kafka cluster,
      # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
      bootstrapServers:
        - REPLACE_WITH_CLUSTER_URL
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    

Example

A more detailed example of the KafkaSource can be found in the Knative documentation.

Documentation

Overview

Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }

SHA256 hash generator function for SCRAM conversation

View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

SHA512 hash generator function for SCRAM conversation

Functions

func MakeAdminClient added in v0.19.1

func MakeAdminClient(clientID string, kafkaAuthCfg *utils.KafkaAuthConfig, bootstrapServers []string) (sarama.ClusterAdmin, error)

func NewConfig

func NewConfig(ctx context.Context) ([]string, *sarama.Config, error)

NewConfig extracts the Kafka configuration from the environment.

func NewProducer

func NewProducer(ctx context.Context) (sarama.Client, error)

NewProducer is a helper method for constructing a client for producing kafka methods.

func NewTLSConfig added in v0.19.1

func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error)

NewTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key, and CA certificate. If none are appropriate, a nil *tls.Config is returned.

func UpdateSaramaConfigWithKafkaAuthConfig added in v0.19.1

func UpdateSaramaConfigWithKafkaAuthConfig(saramaConf *sarama.Config, kafkaAuthCfg *utils.KafkaAuthConfig) error

Types

type AdapterNet

type AdapterNet struct {
	SASL AdapterSASL
	TLS  AdapterTLS
}

type AdapterSASL

type AdapterSASL struct {
	Enable   bool   `envconfig:"KAFKA_NET_SASL_ENABLE" required:"false"`
	User     string `envconfig:"KAFKA_NET_SASL_USER" required:"false"`
	Password string `envconfig:"KAFKA_NET_SASL_PASSWORD" required:"false"`
	Type     string `envconfig:"KAFKA_NET_SASL_TYPE" required:"false"`
}

type AdapterTLS

type AdapterTLS struct {
	Enable bool   `envconfig:"KAFKA_NET_TLS_ENABLE" required:"false"`
	Cert   string `envconfig:"KAFKA_NET_TLS_CERT" required:"false"`
	Key    string `envconfig:"KAFKA_NET_TLS_KEY" required:"false"`
	CACert string `envconfig:"KAFKA_NET_TLS_CA_CERT" required:"false"`
}

type XDGSCRAMClient added in v0.19.1

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

XDGSCRAMClient struct to perform SCRAM conversation

func (*XDGSCRAMClient) Begin added in v0.19.1

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin starts SCRAM conversation

func (*XDGSCRAMClient) Done added in v0.19.1

func (x *XDGSCRAMClient) Done() bool

Done completes SCRAM conversation

func (*XDGSCRAMClient) Step added in v0.19.1

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step performs step in SCRAM conversation

Directories

Path Synopsis
reconciler
source
Package source implements the KafkaSource controller.
Package source implements the KafkaSource controller.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL