broker

package
v0.1.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2017 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ConsumerOffsetTopicName = "consumer_offsets"
)

Variables

View Source
var (
	ErrTopicAlreadyExist      = errors.New("ErrTopicAlreadyExist")
	ErrInvalidTopicName       = errors.New("ErrInvalidTopicName")
	ErrUnableToSelectReplicas = errors.New("ErrUnableToSelectReplicas")
	ErrTopicNotFound          = errors.New("ErrTopicNotFound")
	ErrPartitionNotFound      = errors.New("ErrPartitionNotFound")
	ErrNoPartitionSet         = errors.New("ErrNoPartitionSet")
	ErrNoControllerSet        = errors.New("ErrNoControllerSet")
	ErrNoLeaderFound          = errors.New("ErrNoLeaderFound")
	ErrNoConsumerFound        = errors.New("ErrNoConsumerFound")
)
View Source
var DefaultStateCheckInterval = 1 * time.Second
View Source
var (
	ErrNoKeySet = errors.New("ErrNoKeySet")
)
View Source
var (
	RedeliveryTimeout = 10 * time.Second
)

Functions

This section is empty.

Types

type Broker

type Broker struct {
	logy.Logger

	ShutdownCh chan struct{}
	// contains filtered or unexported fields
}

func New

func New(conf *Config) (*Broker, error)

func (*Broker) Acknowledge

func (b *Broker) Acknowledge(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offset sandflake.ID) (bool, error)

func (*Broker) AcknowledgeMessages

func (b *Broker) AcknowledgeMessages(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offsets []sandflake.ID) error

FIXME: share the same code between AcknowledgeMessages and Acknowledge.

func (*Broker) Bootstrap

func (b *Broker) Bootstrap() error

func (*Broker) Commit

func (b *Broker) Commit(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offset sandflake.ID) (bool, error)

func (*Broker) Conf

func (b *Broker) Conf() *Config

func (*Broker) Consume

func (b *Broker) Consume(ctx context.Context, topicName, partition, consumerGroup, consumerName string, fn func(msg *sgproto.Message) error) error

func (*Broker) CreateTopic

func (b *Broker) CreateTopic(ctx context.Context, params *sgproto.CreateTopicParams) error

func (*Broker) FetchFromSync

func (b *Broker) FetchFromSync(topicName, partition string, from []byte, fn func(msg *sgproto.Message) error) error

func (*Broker) FetchRange

func (b *Broker) FetchRange(ctx context.Context, req *sgproto.FetchRangeRequest, fn func(msg *sgproto.Message) error) error

func (*Broker) Get

func (b *Broker) Get(ctx context.Context, topicName string, partition string, key []byte) (*sgproto.Message, error)

func (*Broker) GetController

func (b *Broker) GetController() *sandglass.Node

func (*Broker) GetMarkStateMessage added in v0.1.5

func (b *Broker) GetMarkStateMessage(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offset sandflake.ID) (*sgproto.Message, error)

func (*Broker) GetTopic

func (b *Broker) GetTopic(name string) *topic.Topic

func (*Broker) HasKey

func (b *Broker) HasKey(ctx context.Context, topicName string, partition string, key, clusterKey []byte) (bool, error)

func (*Broker) IsController

func (b *Broker) IsController() bool

func (*Broker) Join

func (b *Broker) Join(clusterAddrs ...string) (err error)

func (*Broker) LastOffset

func (b *Broker) LastOffset(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, kind sgproto.MarkKind) (sandflake.ID, error)

func (*Broker) LaunchWatchers

func (b *Broker) LaunchWatchers() error

func (*Broker) MarkConsumed added in v0.1.5

func (b *Broker) MarkConsumed(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offset sandflake.ID) (bool, error)

func (*Broker) Members

func (b *Broker) Members() []*sandglass.Node

func (*Broker) Name

func (b *Broker) Name() string

func (*Broker) NotAcknowledge added in v0.1.7

func (b *Broker) NotAcknowledge(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offset sandflake.ID) (bool, error)

func (*Broker) Produce added in v0.1.7

func (*Broker) Stop

func (b *Broker) Stop(ctx context.Context) error

func (*Broker) Topics

func (b *Broker) Topics() []*topic.Topic

func (*Broker) TriggerSyncRequest

func (b *Broker) TriggerSyncRequest() error

func (*Broker) WaitForIt

func (b *Broker) WaitForIt() error

type Config

type Config struct {
	Name                    string      `yaml:"name,omitempty"`
	DCName                  string      `yaml:"dc_name,omitempty"`
	BindAddr                string      `yaml:"bind_addr,omitempty"`
	AdvertiseAddr           string      `yaml:"advertise_addr,omitempty"`
	DBPath                  string      `yaml:"db_path,omitempty"`
	GossipPort              string      `yaml:"gossip_port,omitempty"`
	HTTPPort                string      `yaml:"http_port,omitempty"`
	GRPCPort                string      `yaml:"grpc_port,omitempty"`
	RaftPort                string      `yaml:"raft_port,omitempty"`
	InitialPeers            []string    `yaml:"initial_peers,omitempty"`
	BootstrapRaft           bool        `yaml:"bootstrap_raft,omitempty"`
	LoggingLevel            *logy.Level `yaml:"-"`
	OffsetReplicationFactor int         `yaml:"-"`
}

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(b *Broker, topic, partition, name string) *ConsumerGroup

func (*ConsumerGroup) Consume

func (c *ConsumerGroup) Consume(consumerName string) (<-chan *sgproto.Message, chan<- struct{}, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL