kafka

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2020 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package kafka https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go

Index

Constants

This section is empty.

Variables

View Source
var (
	// SHA256 todo
	SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
	// SHA512 todo
	SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
)
View Source
var (
	// ErrNoBroker todo
	ErrNoBroker = errors.New("connect kafka error, no broker here")
)

Functions

This section is empty.

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher kafka broker

func NewPublisher

func NewPublisher(conf *PublisherConfig) (*Publisher, error)

NewPublisher kafka broker

func (*Publisher) Connect

func (b *Publisher) Connect() error

Connect 连接

func (*Publisher) Debug

func (b *Publisher) Debug(l logger.Logger)

Debug 日志

func (*Publisher) Disconnect

func (b *Publisher) Disconnect() error

Disconnect 端口连接

func (*Publisher) Pub

func (b *Publisher) Pub(topic string, e *event.Event) error

Pub 发布事件

type PublisherConfig

type PublisherConfig struct {
	BulkMaxSize        int           `json:"bulk_max_size" yaml:"bulk_max_size" toml:"bulk_max_size" env:"BUS_KAFKA_PUBLISHER_BULK_MAX_SIZE"`
	PublishTimeout     time.Duration `json:"publish_timeout" yaml:"publish_timeout" toml:"publish_timeout" env:"BUS_KAFKA_PUBLISHER_TIMEOUT"`
	BulkFlushFrequency time.Duration `` /* 130-byte string literal not displayed */
	Partitioner        string        `json:"partitioner" yaml:"partitioner" toml:"partitioner" env:"BUS_KAFKA_PUBLISHER_PARTITIONER"`
	Compression        string        `json:"compression" yaml:"compression" toml:"compression" env:"BUS_KAFKA_PUBLISHER_COMPRESSION"`
	CompressionLevel   int           `json:"compression_level" yaml:"compression_level" toml:"compression_level" env:"BUS_KAFKA_PUBLISHER_COMPRESSION_LEVEL"`
	MaxRetries         int           `json:"max_retries" yaml:"max_retries" toml:"max_retries" env:"BUS_KAFKA_PUBLISHER_MAX_RETRIES"`
	MaxMessageBytes    *int          `json:"max_message_bytes" yaml:"max_message_bytes" toml:"max_message_bytes" env:"BUS_KAFKA_PUBLISHER_MAX_MESSAGE_BYTES"`
	RequiredACKs       *int          `json:"required_acks" yaml:"required_acks" toml:"required_acks" env:"BUS_KAFKA_PUBLISHER_REQUIRED_ACKS"`
	// contains filtered or unexported fields
}

PublisherConfig todo

func DefaultPublisherConfig

func DefaultPublisherConfig() *PublisherConfig

DefaultPublisherConfig 默认配置

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

Validate 校验配置

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber kafka broker

func NewSubscriber

func NewSubscriber(conf *SubscriberConfig) (*Subscriber, error)

NewSubscriber kafka broker

func (*Subscriber) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Subscriber) Connect

func (s *Subscriber) Connect() error

Connect 连接

func (*Subscriber) ConsumeClaim

func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Subscriber) Disconnect

func (s *Subscriber) Disconnect() error

Disconnect 端口连接

func (*Subscriber) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*Subscriber) Sub

func (s *Subscriber) Sub(topic string, h bus.EventHandler) error

Sub 订阅事件

type SubscriberConfig

type SubscriberConfig struct {
	GroupID         string `json:"group_id" yaml:"group_id" toml:"group_id" env:"BUS_KAFKA_SUBSCRIBER_GROUP_ID"`
	Offset          string `json:"offset" yaml:"offset" toml:"offset" env:"BUS_KAFKA_SUBSCRIBER_OFFSET"`
	BalanceStrategy string `json:"balance_strategy" yaml:"balance_strategy" toml:"balance_strategy" env:"BUS_KAFKA_SUBSCRIBER_BALANCE_STRATEGY"`
	// contains filtered or unexported fields
}

SubscriberConfig todo

func DefaultSubscriberConfig

func DefaultSubscriberConfig() *SubscriberConfig

DefaultSubscriberConfig 默认配置

func (*SubscriberConfig) Validate

func (s *SubscriberConfig) Validate() error

Validate 校验配置

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

XDGSCRAMClient todo

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin todo

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done todo

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step todo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL