sg

package
v0.0.0-...-506dd38 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2018 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithAddresses

func WithAddresses(addrs ...string) optionFn

func WithGRPCClientConn

func WithGRPCClientConn(conn *grpc.ClientConn) optionFn

func WithProducerBatchSize

func WithProducerBatchSize(batchSize int) func(*ProducerConfig)

func WithProducerFlushInterval

func WithProducerFlushInterval(interval time.Duration) func(*ProducerConfig)

func WithProducerPartition

func WithProducerPartition(partition string) func(*ProducerConfig)

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

func NewAsyncProducer

func NewAsyncProducer(client *Client, topic, partition string, opts ...ProducerOption) *AsyncProducer

func (*AsyncProducer) Close

func (p *AsyncProducer) Close() error

func (*AsyncProducer) Produce

func (p *AsyncProducer) Produce(ctx context.Context, msg *ProducerMessage) error

func (*AsyncProducer) ProduceMessages

func (p *AsyncProducer) ProduceMessages(ctx context.Context, msgs []*ProducerMessage) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...optionFn) (c *Client, err error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) CreateTopic

func (c *Client) CreateTopic(ctx context.Context, params *sgproto.TopicConfig) error

func (*Client) ListPartitions

func (c *Client) ListPartitions(ctx context.Context, topic string) ([]string, error)

func (*Client) NewConsumer

func (c *Client) NewConsumer(topic, partition, group, name string) *Consumer

func (*Client) ProduceMessage

func (c *Client) ProduceMessage(ctx context.Context, topic, partition string, msg *sgproto.Message) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Acknowledge

func (c *Consumer) Acknowledge(ctx context.Context, msgs ...*sgproto.Message) error

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) (chan *sgproto.Message, error)

func (*Consumer) NotAcknowledge

func (c *Consumer) NotAcknowledge(ctx context.Context, msgs ...*sgproto.Message) error

type Handler

type Handler interface {
	Consume(msg *sgproto.Message) error
}

type HandlerFunc

type HandlerFunc func(msg *sgproto.Message) error

func (HandlerFunc) Consume

func (h HandlerFunc) Consume(msg *sgproto.Message) error

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

func NewMux

func NewMux() *Mux

func (*Mux) Subscribe

func (m *Mux) Subscribe(topic string, h Handler)

func (*Mux) SubscribeFunc

func (m *Mux) SubscribeFunc(topic string, h func(msg *sgproto.Message) error)

type MuxManager

type MuxManager struct {
	Mux                  *Mux
	Client               *Client
	ConsumerGroup        string
	ConsumerName         string
	SynchronousCompution bool
	ReFetchSleepDuration time.Duration
	// contains filtered or unexported fields
}

func (*MuxManager) Shutdown

func (m *MuxManager) Shutdown(ctx context.Context) error

func (*MuxManager) Start

func (m *MuxManager) Start() error

type Producer

type Producer interface {
	Produce(ctx context.Context, msg *ProducerMessage) error
	ProduceMessages(ctx context.Context, msgs []*ProducerMessage) error
	Close() error
}

type ProducerConfig

type ProducerConfig struct {
	// contains filtered or unexported fields
}

type ProducerMessage

type ProducerMessage struct {
	ConsumeIn time.Duration // TODO: waiting for server to handle this without manually setting an offset
	Value     []byte
}

type ProducerOption

type ProducerOption func(*ProducerConfig)

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

func NewSyncProducer

func NewSyncProducer(client *Client, topic string, opts ...ProducerOption) *SyncProducer

func (*SyncProducer) Close

func (p *SyncProducer) Close() error

func (*SyncProducer) Produce

func (p *SyncProducer) Produce(ctx context.Context, msg *ProducerMessage) error

func (*SyncProducer) ProduceMessages

func (p *SyncProducer) ProduceMessages(ctx context.Context, msgs []*ProducerMessage) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL