kafka

package
v1.0.0-beta.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2021 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Example
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"
	"github.com/boxgo/box/pkg/client/kafka"
)

const (
	// testTopic is the topic this example both produces to and consumes from.
	testTopic = "wechat_event"
)

// main is a runnable example of the kafka package. It builds a client from
// the "default" configuration, opens a sync producer and a consumer, starts
// consuming partition 0 of testTopic from the newest offset, sends one
// message, waits a second for it to arrive, and prints whether the send
// returned a valid offset/partition and whether at least one message was
// received.
//
// Every setup or teardown failure panics; this is example code, not a
// library entry point.
func main() {
	kfk := kafka.StdConfig("default").Build()

	producer, err := kfk.NewSyncProducer()
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := kfk.NewConsumer()
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition(testTopic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}

	// Registered after the consumer's deferred Close so that it runs first
	// (defers are LIFO): sarama requires every PartitionConsumer to be closed
	// before its parent Consumer is closed.
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// cnt counts received messages; it is written by the goroutine below and
	// read by main, hence the atomic accesses.
	var cnt int32

	go func() {
		for {
			select {
			case _, ok := <-partitionConsumer.Messages():
				if !ok {
					// Channel closed by partitionConsumer.Close: stop instead
					// of spinning on zero-value receives.
					return
				}
				atomic.AddInt32(&cnt, 1)
			case <-signals:
				// A bare break would only leave the select and keep looping;
				// return ends the goroutine on interrupt.
				return
			}
		}
	}()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: testTopic,
		Value: sarama.StringEncoder("hi"),
	})
	if err != nil {
		panic(err)
	}

	// Give the consumer goroutine time to observe the message just sent.
	time.Sleep(time.Second)

	fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0)
}
Output:

true true true

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncSyncProducer

// AsyncSyncProducer is a named type whose underlying type is
// sarama.AsyncProducer; it is the value returned by Kafka.NewAsyncProducer.
type AsyncSyncProducer sarama.AsyncProducer

type Config

// Config is the top-level settings struct of this package. Each exported
// field carries a `config` struct tag naming its key, and the settings are
// grouped into the Net, Metadata, Producer and Consumer sections.
type Config struct {
	// NOTE(review): presumably the Kafka broker addresses — confirm against Build.
	Addrs    []string       `config:"addrs"`
	Net      Net            `config:"net"`
	Metadata Metadata       `config:"metadata"`
	Producer ProducerConfig `config:"producer"`
	Consumer ConsumerConfig `config:"consumer"`
	// contains filtered or unexported fields
}

Config 配置

func DefaultConfig

func DefaultConfig(key string) *Config

DefaultConfig 默认配置

func StdConfig

func StdConfig(key string, optionFunc ...OptionFunc) *Config

StdConfig 标准配置

func (*Config) Build

func (c *Config) Build() *Kafka

Build 构建实例

func (*Config) Path

func (c *Config) Path() string

Path 实例配置目录

type Consumer

// Consumer is a named type whose underlying type is sarama.Consumer; it is
// the value returned by Kafka.NewConsumer.
type Consumer sarama.Consumer

type ConsumerConfig

// ConsumerConfig is the consumer section of Config. Each field carries a
// `config` struct tag naming its key.
//
// RetryBackoffFunc and Interceptors are tagged `config:"-"`.
// NOTE(review): presumably that excludes them from configuration loading, so
// they must be set in code (e.g. via an OptionFunc) — confirm against the
// loader.
//
// NOTE(review): the offset-related fields mix two prefixes, "Offsets"
// (OffsetsCommitInterval, OffsetsInitial, OffsetsRetention) and "Offset"
// (OffsetRetryMax, OffsetAutoCommitEnable, OffsetAutoCommitInterval); the
// config keys follow the same split, so take care when writing config files.
type ConsumerConfig struct {
	GroupSessionTimeout        time.Duration                   `config:"groupSessionTimeout"`
	GroupHeartbeatInterval     time.Duration                   `config:"groupHeartbeatInterval"`
	GroupRebalanceStrategy     sarama.BalanceStrategy          `config:"groupRebalanceStrategy"`
	GroupRebalanceTimeout      time.Duration                   `config:"groupRebalanceTimeout"`
	GroupRebalanceRetryMax     int                             `config:"groupRebalanceRetryMax"`
	GroupRebalanceRetryBackoff time.Duration                   `config:"groupRebalanceRetryBackoff"`
	GroupMemberUserData        []byte                          `config:"groupMemberUserData"`
	RetryBackoff               time.Duration                   `config:"retryBackoff"`
	RetryBackoffFunc           func(retries int) time.Duration `config:"-"`
	FetchMin                   int32                           `config:"fetchMin"`
	FetchMax                   int32                           `config:"fetchMax"`
	FetchDefault               int32                           `config:"fetchDefault"`
	MaxWaitTime                time.Duration                   `config:"maxWaitTime"`
	MaxProcessingTime          time.Duration                   `config:"maxProcessingTime"`
	ReturnErrors               bool                            `config:"returnErrors"`
	OffsetsCommitInterval      time.Duration                   `config:"offsetsCommitInterval"`
	OffsetsInitial             int64                           `config:"offsetsInitial"`
	OffsetsRetention           time.Duration                   `config:"offsetsRetention"`
	OffsetRetryMax             int                             `config:"offsetRetryMax"`
	OffsetAutoCommitEnable     bool                            `config:"offsetAutoCommitEnable"`
	OffsetAutoCommitInterval   time.Duration                   `config:"offsetAutoCommitInterval"`
	IsolationLevel             sarama.IsolationLevel           `config:"isolationLevel"`
	Interceptors               []sarama.ConsumerInterceptor    `config:"-"`
}

type ConsumerGroup

// ConsumerGroup is a named type whose underlying type is
// sarama.ConsumerGroup; it is the value returned by Kafka.NewConsumerGroup.
type ConsumerGroup sarama.ConsumerGroup

type Kafka

// Kafka is the client handle returned by Config.Build. Its methods
// (NewSyncProducer, NewAsyncProducer, NewConsumer, NewConsumerGroup) create
// the producers and consumers; all of its fields are unexported.
type Kafka struct {
	// contains filtered or unexported fields
}

func (Kafka) NewAsyncProducer

func (kfk Kafka) NewAsyncProducer() (AsyncSyncProducer, error)

func (Kafka) NewConsumer

func (kfk Kafka) NewConsumer() (Consumer, error)

func (Kafka) NewConsumerGroup

func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error)

func (Kafka) NewSyncProducer

func (kfk Kafka) NewSyncProducer() (SyncProducer, error)

type Metadata

// Metadata is the metadata section of Config. Each field carries a `config`
// struct tag naming its key.
//
// RetryBackoffFunc is tagged `config:"-"`.
// NOTE(review): presumably that excludes it from configuration loading, so it
// must be set in code — confirm against the loader.
type Metadata struct {
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	RefreshFrequency time.Duration                               `config:"refreshFrequency"`
	Full             bool                                        `config:"full"`
	Timeout          time.Duration                               `config:"timeout"`
}

type Net

// Net is the network section of Config: a request limit plus dial, read,
// write and keep-alive durations. Each field carries a `config` struct tag
// naming its key.
type Net struct {
	MaxOpenRequests int           `config:"maxOpenRequests"`
	DialTimeout     time.Duration `config:"dialTimeout"`
	ReadTimeout     time.Duration `config:"readTimeout"`
	WriteTimeout    time.Duration `config:"writeTimeout"`
	KeepAlive       time.Duration `config:"keepAlive"`
}

type OptionFunc

// OptionFunc is a function that receives a *Config; StdConfig accepts any
// number of them as variadic arguments.
type OptionFunc func(*Config)

OptionFunc 选项信息

type ProducerConfig

// ProducerConfig is the producer section of Config. Each field carries a
// `config` struct tag naming its key.
//
// Partitioner, RetryBackoffFunc and Interceptors are tagged `config:"-"`.
// NOTE(review): presumably that excludes them from configuration loading, so
// they must be set in code — confirm against the loader.
type ProducerConfig struct {
	MaxMessageBytes  int                                         `config:"maxMessageBytes"`
	RequiredAcks     sarama.RequiredAcks                         `config:"requiredAcks"`
	Timeout          time.Duration                               `config:"timeout"`
	Compression      sarama.CompressionCodec                     `config:"compression"`
	CompressionLevel int                                         `config:"compressionLevel"`
	Partitioner      sarama.PartitionerConstructor               `config:"-"`
	Idempotent       bool                                        `config:"idempotent"`
	ReturnSuccesses  bool                                        `config:"returnSuccesses"`
	ReturnErrors     bool                                        `config:"returnErrors"`
	FlushBytes       int                                         `config:"flushBytes"`
	FlushMessages    int                                         `config:"flushMessages"`
	FlushFrequency   time.Duration                               `config:"flushFrequency"`
	// NOTE(review): this is the only key in the struct that starts with an
	// upper-case letter ("FlushMaxMessages"); every other key is
	// lowerCamelCase. It looks like a typo for "flushMaxMessages", but renaming
	// it would change the key existing configuration files must use — confirm
	// before changing.
	FlushMaxMessages int                                         `config:"FlushMaxMessages"`
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	Interceptors     []sarama.ProducerInterceptor                `config:"-"`
}

type SyncProducer

// SyncProducer is a named type whose underlying type is sarama.SyncProducer;
// it is the value returned by Kafka.NewSyncProducer.
type SyncProducer sarama.SyncProducer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL