Documentation ¶
Index ¶
- Constants
- Variables
- func Dynamic(opts *cli.Options) error
- func Lag(opts *cli.Options) error
- func Read(opts *cli.Options, md *desc.MessageDescriptor) error
- func Relay(opts *cli.Options, relayCh chan interface{}, shutdownCtx context.Context) (relay.IRelayBackend, error)
- func Write(opts *cli.Options, md *desc.MessageDescriptor) error
- type Kafka
- type KafkaLag
- func (kLag *KafkaLag) GetLastOffsetPerPartition(topic string, groupId string, part int, opts *cli.Options) (int64, error)
- func (kLag *KafkaLag) LagCalculationForConsumerGroup(groupId string, opts *cli.Options) error
- func (kLag *KafkaLag) LagCalculationPerPartition(topic string, groupId string, part int, opts *cli.Options) (int64, error)
- type KafkaReader
- type KafkaWriter
- type Relayer
Constants ¶
const (
DefaultBatchSize = 1
)
const (
RetryReadInterval = 5 * time.Second
)
Variables ¶
var (
ErrMissingTopic = errors.New("You must specify at least one topic")
)
Functions ¶
func Dynamic ¶ added in v0.28.0
Dynamic starts up a new GRPC client connected to the dProxy service and receives a stream of outbound replay messages which are then written to the message bus.
func Read ¶
func Read(opts *cli.Options, md *desc.MessageDescriptor) error
Read is the entry point function for performing read operations in Kafka.
This is where we verify that the provided arguments and flag combination makes sense/are valid; this is also where we will perform our initial conn.
func Relay ¶ added in v0.4.0
func Relay(opts *cli.Options, relayCh chan interface{}, shutdownCtx context.Context) (relay.IRelayBackend, error)
Relay sets up a new Kafka relayer
func Write ¶
func Write(opts *cli.Options, md *desc.MessageDescriptor) error
Write is the entry point function for performing write operations in Kafka.
This is where we verify that the passed args and flags combo makes sense, attempt to establish a connection, parse protobuf before finally attempting to perform the write.
Types ¶
type Kafka ¶
type Kafka struct { Id string Reader *skafka.Reader Writer *skafka.Writer Options *cli.Options MsgDesc *desc.MessageDescriptor // contains filtered or unexported fields }
Kafka holds all attributes required for performing a write to Kafka. This struct should be instantiated via the kafka.Read(..) func.
type KafkaLag ¶ added in v0.31.0
type KafkaLag struct {
// contains filtered or unexported fields
}
func NewKafkaLagConnection ¶ added in v0.31.0
func (*KafkaLag) GetLastOffsetPerPartition ¶ added in v0.31.0
func (*KafkaLag) LagCalculationForConsumerGroup ¶ added in v0.31.0
calculate lag with a given connection