Documentation ¶
Index ¶
- Variables
- type ConsumerDetails
- type MessageParser
- type RunnableTarget
- type Target
- type TargetDiscoverer
- type TargetManager
- type TargetSyncer
- func (ts *TargetSyncer) ActiveTargets() []target.Target
- func (c *TargetSyncer) Cleanup(sarama.ConsumerGroupSession) error
- func (c *TargetSyncer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (ts *TargetSyncer) DroppedTargets() []target.Target
- func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error)
- func (c *TargetSyncer) Setup(_ sarama.ConsumerGroupSession) error
- func (ts *TargetSyncer) Stop() error
- type TargetSyncerConfig
- type TopicManager
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
var TopicPollInterval = 30 * time.Second
Functions ¶
This section is empty.
Types ¶
type ConsumerDetails ¶
type ConsumerDetails struct {
	// MemberID returns the cluster member ID.
	MemberID string
	// GenerationID returns the current generation ID.
	GenerationID int32
	Topic string
	Partition int32
	InitialOffset int64
}
func (ConsumerDetails) String ¶
func (c ConsumerDetails) String() string
type MessageParser ¶
type MessageParser interface {
Parse(message *sarama.ConsumerMessage, labels model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]api.Entry, error)
}
MessageParser defines parsing for each incoming message
type RunnableTarget ¶
type Target ¶
type Target struct {
// contains filtered or unexported fields
}
func NewTarget ¶
func NewTarget( logger log.Logger, session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, discoveredLabels, lbs model.LabelSet, relabelConfig []*relabel.Config, client api.EntryHandler, useIncomingTimestamp bool, messageParser MessageParser, ) *Target
func (*Target) Details ¶
func (t *Target) Details() interface{}
Details returns target-specific details.
func (*Target) DiscoveredLabels ¶
func (*Target) Type ¶
func (t *Target) Type() target.TargetType
type TargetDiscoverer ¶
type TargetDiscoverer interface {
NewTarget(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)
}
type TargetManager ¶
type TargetManager struct {
// contains filtered or unexported fields
}
TargetManager manages a series of kafka targets.
func NewTargetManager ¶
func NewTargetManager( reg prometheus.Registerer, logger log.Logger, pushClient api.EntryHandler, scrapeConfigs []scrapeconfig.Config, ) (*TargetManager, error)
NewTargetManager creates a new Kafka target manager.
func (*TargetManager) ActiveTargets ¶
func (tm *TargetManager) ActiveTargets() map[string][]target.Target
func (*TargetManager) AllTargets ¶
func (tm *TargetManager) AllTargets() map[string][]target.Target
func (*TargetManager) Ready ¶
func (tm *TargetManager) Ready() bool
Ready returns true if at least one Kafka target is active.
func (*TargetManager) Stop ¶
func (tm *TargetManager) Stop()
type TargetSyncer ¶
type TargetSyncer struct {
// contains filtered or unexported fields
}
func NewSyncer ¶
func NewSyncer(ctx context.Context, reg prometheus.Registerer, logger log.Logger, pushClient api.EntryHandler, pipeline *stages.Pipeline, group sarama.ConsumerGroup, client sarama.Client, messageParser MessageParser, topics []string, cfg *TargetSyncerConfig, ) (*TargetSyncer, error)
NewSyncer creates a TargetSyncer.
func NewSyncerFromScrapeConfig ¶
func NewSyncerFromScrapeConfig( reg prometheus.Registerer, logger log.Logger, cfg scrapeconfig.Config, pushClient api.EntryHandler, ) (*TargetSyncer, error)
NewSyncerFromScrapeConfig creates a TargetSyncer from a scrape config.
func (*TargetSyncer) ActiveTargets ¶
func (ts *TargetSyncer) ActiveTargets() []target.Target
ActiveTargets returns active targets from its consumer
func (*TargetSyncer) Cleanup ¶
func (c *TargetSyncer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*TargetSyncer) ConsumeClaim ¶
func (c *TargetSyncer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim creates a target for the given received claim and starts reading messages from it.
func (*TargetSyncer) DroppedTargets ¶
func (ts *TargetSyncer) DroppedTargets() []target.Target
DroppedTargets returns dropped targets from its consumer
func (*TargetSyncer) NewTarget ¶
func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error)
NewTarget creates a new target based on the current Kafka claim and group session.
func (*TargetSyncer) Setup ¶
func (c *TargetSyncer) Setup(_ sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*TargetSyncer) Stop ¶
func (ts *TargetSyncer) Stop() error
type TargetSyncerConfig ¶
type TargetSyncerConfig struct { RelabelConfigs []*relabel.Config UseIncomingTimestamp bool Labels model.LabelSet GroupID string }
TargetSyncerConfig contains specific TargetSyncer configuration. It makes TargetSyncer creation independent of the scrape config structure.
type TopicManager ¶
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
XDGSCRAMClient implements sarama.SCRAMClient
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool