kafka

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: MIT Imports: 16 Imported by: 0

README

Kafka

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

Kafka的基本概念

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。

  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

Docker部署开发环境

docker pull bitnami/kafka:latest
docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka-exporter:latest

docker run -itd \
    --name zookeeper-test \
    -p 2181:2181 \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest

docker run -itd \
    --name kafka-standalone \
    --link zookeeper-test \
    -p 9092:9092 \
    -v /home/data/kafka:/bitnami/kafka \
    -e KAFKA_BROKER_ID=1 \
    -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper-test:2181 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    --user root \
    bitnami/kafka:latest

管理工具

参考资料

Documentation

Index

Constants

View Source
const (
	KindKafka kratosTransport.Kind = "kafka"
)

Variables

This section is empty.

Functions

func LogDebug

func LogDebug(args ...interface{})

func LogDebugf

func LogDebugf(format string, args ...interface{})

func LogError

func LogError(args ...interface{})

func LogErrorf

func LogErrorf(format string, args ...interface{})

func LogFatal

func LogFatal(args ...interface{})

func LogFatalf

func LogFatalf(format string, args ...interface{})

func LogInfo

func LogInfo(args ...interface{})

func LogInfof

func LogInfof(format string, args ...interface{})

func LogWarn

func LogWarn(args ...interface{})

func LogWarnf

func LogWarnf(format string, args ...interface{})

func RegisterSubscriber

func RegisterSubscriber[T any](srv *Server, ctx context.Context, topic, queue string, disableAutoAck bool, handler func(context.Context, string, broker.Headers, *T) error, opts ...broker.SubscribeOption) error

Types

type Server

type Server struct {
	broker.Broker

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) Endpoint

func (s *Server) Endpoint() (*url.URL, error)

func (*Server) Name

func (s *Server) Name() string

func (*Server) RegisterSubscriber

func (s *Server) RegisterSubscriber(ctx context.Context, topic, queue string, disableAutoAck bool, handler broker.Handler, binder broker.Binder, opts ...broker.SubscribeOption) error

RegisterSubscriber 注册一个订阅者 @param ctx 上下文 @param topic 订阅的主题 @param queue 订阅的分组 @param handler 订阅者的处理函数

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

func (*Server) Stop

func (s *Server) Stop(_ context.Context) error

type ServerOption

type ServerOption func(o *Server)

func WithAddress

func WithAddress(addrs []string) ServerOption

WithAddress MQ代理地址

func WithBrokerOptions

func WithBrokerOptions(opts ...broker.Option) ServerOption

WithBrokerOptions MQ代理配置

func WithCodec

func WithCodec(c string) ServerOption

WithCodec 编解码器

func WithEnableKeepAlive

func WithEnableKeepAlive(enable bool) ServerOption

WithEnableKeepAlive enable keep alive

func WithGlobalPropagator

func WithGlobalPropagator() ServerOption

WithGlobalPropagator 注入全局的链路追踪器的Propagator

func WithGlobalTracerProvider

func WithGlobalTracerProvider() ServerOption

WithGlobalTracerProvider 注入全局的链路追踪器的Provider

func WithMiddleware

func WithMiddleware(m ...broker.MiddlewareFunc) ServerOption

WithMiddleware 注入中间件

func WithPlainMechanism

func WithPlainMechanism(username, password string) ServerOption

WithPlainMechanism PLAIN认证信息

func WithPropagator

func WithPropagator(propagators propagation.TextMapPropagator) ServerOption

WithPropagator 注入链路追踪器的Propagator

func WithScramMechanism

func WithScramMechanism(algo string, username, password string) ServerOption

WithScramMechanism SCRAM认证信息

func WithTLSConfig

func WithTLSConfig(c *tls.Config) ServerOption

WithTLSConfig TLS配置

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider, tracerName string) ServerOption

WithTracerProvider 注入链路追踪器的Provider

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is a Kafka transport.

func (*Transport) Endpoint

func (tr *Transport) Endpoint() string

Endpoint returns the transport endpoint.

func (*Transport) Kind

func (tr *Transport) Kind() kratosTransport.Kind

Kind returns the transport kind.

func (*Transport) NodeFilters

func (tr *Transport) NodeFilters() []selector.NodeFilter

NodeFilters returns the client select filters.

func (*Transport) Operation

func (tr *Transport) Operation() string

Operation returns the transport operation.

func (*Transport) ReplyHeader

func (tr *Transport) ReplyHeader() kratosTransport.Header

ReplyHeader returns the reply header.

func (*Transport) RequestHeader

func (tr *Transport) RequestHeader() kratosTransport.Header

RequestHeader returns the request header.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL