Documentation ¶
Index ¶
- Variables
- func NewManualHashPartitioner(topic string) sarama.Partitioner
- func NewPlugin() (*pkgplugin.Plugin, error)
- func NewPluginVersion(name string, version string, commitID string) (*pkgplugin.Plugin, error)
- func NewReceiver(tid tenant.Id, plugin string, name string, config interface{}, ...) (receiver.Receiver, error)
- func NewSender(tid tenant.Id, plugin string, name string, config interface{}, ...) (sender.Sender, error)
- type DynamicMetricLabel
- type DynamicMetricValue
- type ManualHashPartitioner
- type Producer
- type Receiver
- func (r *Receiver) Cleanup(session sarama.ConsumerGroupSession) error
- func (r *Receiver) Close()
- func (r *Receiver) Config() interface{}
- func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (r *Receiver) Count() int
- func (r *Receiver) Name() string
- func (r *Receiver) Plugin() string
- func (r *Receiver) Receive(next receiver.NextFn) error
- func (r *Receiver) Setup(session sarama.ConsumerGroupSession) error
- func (r *Receiver) Start(handler func(*sarama.ConsumerMessage) bool)
- func (r *Receiver) StopReceiving(ctx context.Context) error
- func (r *Receiver) Tenant() tenant.Id
- func (r *Receiver) Trigger(e event.Event)
- type ReceiverConfig
- type Sender
- func (s *Sender) Config() interface{}
- func (s *Sender) Count() int
- func (s *Sender) Name() string
- func (s *Sender) NewProducer(count int) (*Producer, error)
- func (s *Sender) NewSyncProducers(count int) ([]sarama.SyncProducer, sarama.Client, error)
- func (s *Sender) Plugin() string
- func (s *Sender) Send(e event.Event)
- func (s *Sender) StopSending(ctx context.Context)
- func (s *Sender) Tenant() tenant.Id
- func (s *Sender) Unwrap() sender.Sender
- type SenderConfig
- type SenderMetrics
Constants ¶
This section is empty.
Variables ¶
View Source
var ( Name = "kafka" Version = "v0.0.0" CommitID = "" )
View Source
var DefaultReceiverConfig = ReceiverConfig{ Brokers: "localhost:9092", Topic: "quickstart-events", GroupId: "", Username: "", Password: "", CACert: "", AccessCert: "", AccessKey: "", Version: "", CommitInterval: pointer.Int(1), ChannelBufferSize: pointer.Int(0), TracePayloadOnNack: pointer.Bool(false), }
View Source
var DefaultSenderConfig = SenderConfig{ Brokers: "localhost:9092", Topic: "quickstart-events", Partition: pointer.Int(-1), ChannelBufferSize: pointer.Int(0), Username: "", Password: "", CACert: "", AccessCert: "", AccessKey: "", Version: "", SenderPoolSize: pointer.Int(1), PartitionPath: "", DynamicMetricLabels: make([]DynamicMetricLabel, 0), }
Functions ¶
func NewManualHashPartitioner ¶
func NewManualHashPartitioner(topic string) sarama.Partitioner
func NewPluginVersion ¶
func NewReceiver ¶
Types ¶
type DynamicMetricLabel ¶ added in v0.4.0
type DynamicMetricValue ¶ added in v0.4.0
type ManualHashPartitioner ¶
type ManualHashPartitioner struct {
sarama.Partitioner
}
func (*ManualHashPartitioner) Partition ¶
func (mp *ManualHashPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error)
type Receiver ¶
type Receiver struct { sync.Mutex sarama.ConsumerGroupSession // contains filtered or unexported fields }
func (*Receiver) Cleanup ¶
func (r *Receiver) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*Receiver) ConsumeClaim ¶
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type ReceiverConfig ¶
type ReceiverConfig struct { Brokers string `json:"brokers,omitempty"` Topic string `json:"topic,omitempty"` GroupId string `json:"groupId,omitempty"` Username string `json:"username,omitempty"` // yaml Password string `json:"password,omitempty"` CACert string `json:"caCert,omitempty"` AccessCert string `json:"accessCert,omitempty"` AccessKey string `json:"accessKey,omitempty"` Version string `json:"version,omitempty"` CommitInterval *int `json:"commitInterval,omitempty"` ChannelBufferSize *int `json:"channelBufferSize,omitempty"` ConsumeByPartitions bool `json:"consumeByPartitions,omitempty"` TLSEnable bool `json:"tlsEnable,omitempty"` TracePayloadOnNack *bool `json:"tracePayloadOnNack,omitempty"` }
func (*ReceiverConfig) Validate ¶
func (rc *ReceiverConfig) Validate() error
Validate returns an error upon validation failure.
func (*ReceiverConfig) WithDefaults ¶
func (rc *ReceiverConfig) WithDefaults() ReceiverConfig
WithDefaults returns a new config object that has all of the unset (nil) values filled in.
type SenderConfig ¶
type SenderConfig struct { Brokers string `json:"brokers,omitempty"` Topic string `json:"topic,omitempty"` Partition *int `json:"partition,omitempty"` PartitionPath string `json:"partitionPath,omitempty"` // if path is set, look up partition from event rather than using the hard coded partition id Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` CACert string `json:"caCert,omitempty"` AccessCert string `json:"accessCert,omitempty"` AccessKey string `json:"accessKey,omitempty"` Version string `json:"version,omitempty"` ChannelBufferSize *int `json:"channelBufferSize,omitempty"` TLSEnable bool `json:"tlsEnable,omitempty"` SenderPoolSize *int `json:"senderPoolSize,omitempty"` DynamicMetricLabels []DynamicMetricLabel `json:"dynamicMetricLabel,omitempty"` CompressionMethod string `json:"compressionMethod,omitempty"` CompressionLevel *int `json:"compressionLevel,omitempty"` }
SenderConfig can be passed into NewSender() in order to configure the behavior of the sender.
type SenderMetrics ¶ added in v0.4.0
type SenderMetrics struct {
// contains filtered or unexported fields
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.