Documentation ¶
Overview ¶
Package kafka wraps github.com/IBM/sarama: messages are produced through trpc.Client, and consumer logic is implemented through trpc.Service.
It also provides the SASL/SCRAM security verification helpers.
Index ¶
- Constants
- Variables
- func BatchConsumerHandle(svr interface{}, ctx context.Context, f server.FilterFunc) (interface{}, error)
- func IsCWCError(err error) bool
- func KafkaConsumerHandle(svr interface{}, ctx context.Context, f server.FilterFunc) (interface{}, error)
- func NewClientTransport(opt ...transport.ClientTransportOption) transport.ClientTransport
- func NewServerTransport(opt ...transport.ServerTransportOption) transport.ServerTransport
- func RegisterAddrConfig(address string, cfg *UserConfig)
- func RegisterBatchHandlerService(s server.Service, ...)
- func RegisterKafkaConsumerService(s server.Service, svr KafkaConsumer)
- func RegisterKafkaHandlerService(s server.Service, ...)
- type BatchConsumer
- type Client
- type ClientCodec
- type ClientConversation
- type ClientTransport
- func (ct *ClientTransport) AsyncProduce(producer sarama.AsyncProducer)
- func (ct *ClientTransport) GetAsyncProducer(address string, timeout time.Duration) (*Producer, error)
- func (ct *ClientTransport) GetProducer(address string, timeout time.Duration) (*Producer, error)
- func (ct *ClientTransport) RoundTrip(ctx context.Context, _ []byte, callOpts ...transport.RoundTripOption) ([]byte, error)
- type Config
- type KafkaConsumer
- type LSCRAMClient
- type LogReWriter
- type Plugin
- type Producer
- type RateLimitConfig
- type RawSaramaContext
- type Request
- type Response
- type ServerCodec
- type ServerTransport
- type UserConfig
Constants ¶
const DefaultClientID = "trpcgo"
DefaultClientID is the default client ID.
const SASLTypeSSL = "SASL_SSL"
SASLTypeSSL represents the SASL_SSL security protocol.
Variables ¶
var (
	DefaultServerCodec = &ServerCodec{}
	DefaultClientCodec = &ClientCodec{}
)
The default server and client codecs.
var AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
	log.Errorf("asyncProduce failed. topic:%s, key:%s, value:%s. err:%v", topic, key, value, err)
}
AsyncProducerErrorCallback is the callback invoked when asynchronous production fails. The default implementation only logs the error; users can override the callback to capture send errors.
var AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {}
AsyncProducerSuccCallback is the callback invoked when asynchronous production succeeds. It does nothing by default; users can override the callback to capture successful sends.
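For example, a service can capture failed asynchronous sends by replacing the callback before producing anything. A minimal sketch, assuming the module paths trpc.group/trpc-go/trpc-database/kafka and trpc.group/trpc-go/trpc-go:

package main

import (
	"github.com/IBM/sarama"
	kafka "trpc.group/trpc-go/trpc-database/kafka"
	"trpc.group/trpc-go/trpc-go/log"
)

func main() {
	// Override the default error callback before any messages are produced.
	kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte,
		headers []sarama.RecordHeader) {
		log.Errorf("async produce failed: topic=%s key=%s err=%v", topic, key, err)
		// e.g. report the failure to a metrics or alerting system here
	}
}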
var BatchConsumerServiceDesc = server.ServiceDesc{
	ServiceName: "trpc.kafka.consumer.service",
	HandlerType: ((*BatchConsumer)(nil)),
	Methods: []server.Method{
		{
			Name: "/trpc.kafka.consumer.service/handle",
			Func: BatchConsumerHandle,
		},
	},
}
BatchConsumerServiceDesc descriptor for server.RegisterService
var ContinueWithoutCommitError = &errs.Error{
	Type: errs.ErrorTypeBusiness,
	Code: errs.RetUnknown,
	Msg:  "Error:Continue to consume message without committing ack",
}
ContinueWithoutCommitError signals that consumption should continue without committing the ack. Scenario: when producing, a message body may exceed Kafka's size limit, so the original body is split into multiple packets, each encapsulated in a Kafka message and delivered. When consuming, the business logic must wait until all sub-packets have been consumed before processing can start. When the consumer's Handle method or msg.ServerRspErr returns this error, it means the consumer wants to continue consuming without committing the ack, and the return is not treated as an error.
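For example, a handler that accumulates such sub-packets might look like the sketch below; the last-packet detection is hypothetical, and the module path trpc.group/trpc-go/trpc-database/kafka is assumed:

package demo

import (
	"context"

	"github.com/IBM/sarama"
	kafka "trpc.group/trpc-go/trpc-database/kafka"
)

var pending [][]byte // accumulated sub-packets (illustrative only)

// isLastPacket is a hypothetical check for the final sub-packet, e.g. based on
// a header the producer set when splitting the oversized message body.
func isLastPacket(msg *sarama.ConsumerMessage) bool {
	for _, h := range msg.Headers {
		if string(h.Key) == "last" {
			return true
		}
	}
	return false
}

// Handle keeps consuming without committing until the last sub-packet arrives.
func Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	pending = append(pending, msg.Value)
	if !isLastPacket(msg) {
		// Not treated as an error: continue consuming without committing the ack.
		return kafka.ContinueWithoutCommitError
	}
	// All sub-packets received: run the business logic, then commit by returning nil.
	pending = nil
	return nil
}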
var DefaultClientTransport = NewClientTransport()
DefaultClientTransport is the default client-side Kafka transport.
var DefaultServerTransport = NewServerTransport()
DefaultServerTransport is the default ServerTransport implementation.
var KafkaConsumerServiceDesc = server.ServiceDesc{
	ServiceName: "trpc.kafka.consumer.service",
	HandlerType: ((*KafkaConsumer)(nil)),
	Methods: []server.Method{{
		Name: "/trpc.kafka.consumer.service/handle",
		Func: KafkaConsumerHandle,
	}},
}
KafkaConsumerServiceDesc is the descriptor for server.RegisterService.
var NewClientProxy = func(name string, opts ...client.Option) Client {
	c := &kafkaCli{
		ServiceName: name,
		Client:      client.DefaultClient,
	}
	c.opts = make([]client.Option, 0, len(opts)+2)
	c.opts = append(c.opts, client.WithProtocol("kafka"), client.WithDisableServiceRouter())
	c.opts = append(c.opts, opts...)
	return c
}
NewClientProxy creates a new Kafka backend request proxy. The required parameter is the Kafka service name, e.g. trpc.kafka.producer.service.
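A minimal producer sketch, assuming the module paths trpc.group/trpc-go/trpc-database/kafka and trpc.group/trpc-go/trpc-go, and a trpc_go.yaml client entry named trpc.kafka.producer.service whose target carries the broker address and topic:

package main

import (
	"context"

	kafka "trpc.group/trpc-go/trpc-database/kafka"
	trpc "trpc.group/trpc-go/trpc-go"
)

func main() {
	trpc.NewServer() // parses trpc_go.yaml, including the kafka client configuration

	proxy := kafka.NewClientProxy("trpc.kafka.producer.service")
	// Produce sends key/value to the topic configured on the target address.
	if err := proxy.Produce(context.Background(), []byte("key"), []byte("value")); err != nil {
		panic(err)
	}
}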
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
SHA256 is the SHA-256 hash generator function used by SCRAM.
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
SHA512 is the SHA-512 hash generator function used by SCRAM.
var Timeout = 2 * time.Second
Timeout is the global Kafka timeout configuration. It defaults to 2s; users can modify it if necessary.
Functions ¶
func BatchConsumerHandle ¶
func BatchConsumerHandle(svr interface{}, ctx context.Context, f server.FilterFunc) (interface{}, error)
BatchConsumerHandle batch consumer service handler wrapper
func IsCWCError ¶
func IsCWCError(err error) bool
IsCWCError checks whether err is a ContinueWithoutCommitError (CWC: Continue Without Commit).
func KafkaConsumerHandle ¶
func KafkaConsumerHandle(svr interface{}, ctx context.Context, f server.FilterFunc) (interface{}, error)
KafkaConsumerHandle consumer service handler wrapper
func NewClientTransport ¶
func NewClientTransport(opt ...transport.ClientTransportOption) transport.ClientTransport
NewClientTransport builds a Kafka client transport.
func NewServerTransport ¶
func NewServerTransport(opt ...transport.ServerTransportOption) transport.ServerTransport
NewServerTransport builds a server transport.
func RegisterAddrConfig ¶
func RegisterAddrConfig(address string, cfg *UserConfig)
RegisterAddrConfig registers user-defined configuration; address is the corresponding address in the configuration file.
func RegisterBatchHandlerService ¶
func RegisterBatchHandlerService(
	s server.Service,
	handle func(ctx context.Context, msgArray []*sarama.ConsumerMessage) error,
)
RegisterBatchHandlerService registers a batch consumer function.
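A batch consumer sketch under the same module-path assumptions; the batch size and flush interval correspond to the BatchConsumeCount and BatchFlush fields of UserConfig, parsed from the consumer address:

package main

import (
	"context"

	"github.com/IBM/sarama"
	kafka "trpc.group/trpc-go/trpc-database/kafka"
	trpc "trpc.group/trpc-go/trpc-go"
)

func main() {
	s := trpc.NewServer()
	kafka.RegisterBatchHandlerService(s, func(ctx context.Context, msgs []*sarama.ConsumerMessage) error {
		for _, msg := range msgs {
			_ = msg // process each message in the batch
		}
		return nil // returning nil commits the whole batch
	})
	_ = s.Serve()
}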
func RegisterKafkaConsumerService ¶
func RegisterKafkaConsumerService(s server.Service, svr KafkaConsumer)
RegisterKafkaConsumerService registers a consumer service.
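A consumer sketch implementing the KafkaConsumer interface, under the same module-path assumptions; the consumer service name and address come from trpc_go.yaml:

package main

import (
	"context"

	"github.com/IBM/sarama"
	kafka "trpc.group/trpc-go/trpc-database/kafka"
	trpc "trpc.group/trpc-go/trpc-go"
)

// Consumer implements the KafkaConsumer interface.
type Consumer struct{}

// Handle is called for every received message; returning nil commits the ack.
func (c *Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	_ = msg // process the message
	return nil
}

func main() {
	s := trpc.NewServer()
	kafka.RegisterKafkaConsumerService(s, &Consumer{})
	_ = s.Serve()
}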
func RegisterKafkaHandlerService ¶
func RegisterKafkaHandlerService(
	s server.Service,
	handle func(ctx context.Context, msg *sarama.ConsumerMessage) error,
)
RegisterKafkaHandlerService registers a handler function.
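This is the function-based alternative to implementing KafkaConsumer; in the consumer sketch above, the registration line would instead read:

kafka.RegisterKafkaHandlerService(s, func(ctx context.Context, msg *sarama.ConsumerMessage) error {
	// Returning nil commits the ack; a non-nil error leaves it uncommitted.
	return nil
})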
Types ¶
type BatchConsumer ¶
type BatchConsumer interface {
	// Handle is the callback invoked when a batch of messages is received.
	Handle(ctx context.Context, msgArray []*sarama.ConsumerMessage) error
}
BatchConsumer batch consumer
type Client ¶
type Client interface {
	Produce(ctx context.Context, key, value []byte, headers ...sarama.RecordHeader) error
	SendMessage(ctx context.Context, topic string, key, value []byte,
		headers ...sarama.RecordHeader) (partition int32, offset int64, err error)
	AsyncSendMessage(ctx context.Context, topic string, key, value []byte,
		headers ...sarama.RecordHeader) (err error)
	// SendSaramaMessage produces sarama native messages directly.
	SendSaramaMessage(ctx context.Context, sMsg sarama.ProducerMessage) (partition int32, offset int64, err error)
}
Client is the Kafka client interface.
type ClientCodec ¶
type ClientCodec struct{}
ClientCodec decodes Kafka client requests.
type ClientConversation ¶
type ClientConversation interface {
	Step(challenge string) (response string, err error)
	Done() bool
}
ClientConversation implements the client side of an authentication conversation with a server.
go:generate mockgen -destination=./mockkafka/scram_mock.go -package=mockkafka . ClientConversation
type ClientTransport ¶
type ClientTransport struct {
// contains filtered or unexported fields
}
ClientTransport implements the trpc.ClientTransport interface and encapsulates the producer.
func (*ClientTransport) AsyncProduce ¶
func (ct *ClientTransport) AsyncProduce(producer sarama.AsyncProducer)
AsyncProduce produces messages asynchronously and processes the captured results.
func (*ClientTransport) GetAsyncProducer ¶
func (ct *ClientTransport) GetAsyncProducer(address string, timeout time.Duration) (*Producer, error)
GetAsyncProducer gets an asynchronous producer and starts a goroutine to process produced data and messages.
func (*ClientTransport) GetProducer ¶
func (ct *ClientTransport) GetProducer(address string, timeout time.Duration) (*Producer, error)
GetProducer gets the producer for the given address.
func (*ClientTransport) RoundTrip ¶
func (ct *ClientTransport) RoundTrip(
	ctx context.Context,
	_ []byte,
	callOpts ...transport.RoundTripOption,
) ([]byte, error)
RoundTrip sends and receives Kafka packets. The Kafka response is placed into ctx, so there is no need to return a response buffer.
type Config ¶
type Config struct {
	MaxRequestSize  int32 `yaml:"max_request_size"`  // global maximum request body size
	MaxResponseSize int32 `yaml:"max_response_size"` // global maximum response body size
	RewriteLog      bool  `yaml:"rewrite_log"`       // whether to redirect sarama logs to the trpc log
}
Config is kafka proxy configuration
type KafkaConsumer ¶
type KafkaConsumer interface {
Handle(ctx context.Context, msg *sarama.ConsumerMessage) error
}
KafkaConsumer consumer interface
type LSCRAMClient ¶
type LSCRAMClient struct {
	*scram.Client          // client
	ClientConversation     // client session layer
	scram.HashGeneratorFcn // hash value generating function
	User      string       // user name
	Password  string       // password
	Mechanism string       // encryption mechanism type
	Protocol  string       // security protocol (e.g. SASL_SSL)
}
LSCRAMClient scram authentication client configuration
func (*LSCRAMClient) Begin ¶
func (s *LSCRAMClient) Begin(userName, password, authzID string) (err error)
Begin starts SCRAM authentication.
func (*LSCRAMClient) Done ¶
func (s *LSCRAMClient) Done() bool
Done reports whether SCRAM authentication has finished.
type LogReWriter ¶
type LogReWriter struct{}
LogReWriter redirects sarama logs.
func (LogReWriter) Printf ¶
func (LogReWriter) Printf(format string, v ...interface{})
Printf implements the sarama.Logger interface.
func (LogReWriter) Println ¶
func (LogReWriter) Println(v ...interface{})
Println implements the sarama.Logger interface.
type Plugin ¶
type Plugin struct{}
Plugin implements the default plugin initialization, used to load the kafka proxy connection parameter configuration.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer encapsulates producer information.
type RateLimitConfig ¶
type RateLimitConfig struct {
	Rate  float64 // token production rate
	Burst int     // token bucket capacity
}
RateLimitConfig is the rate limit configuration.
type RawSaramaContext ¶
type RawSaramaContext struct {
	Session sarama.ConsumerGroupSession
	Claim   sarama.ConsumerGroupClaim
}
RawSaramaContext carries the raw sarama ConsumerGroupSession and ConsumerGroupClaim. The structure is exported so that users can implement monitoring; its contents are provided for reading only, and calling any write method is undefined behavior.
func GetRawSaramaContext ¶
func GetRawSaramaContext(ctx context.Context) (*RawSaramaContext, bool)
GetRawSaramaContext gets the raw sarama context information, including the ConsumerGroupSession and ConsumerGroupClaim. Only read methods should be used on the retrieved context; calling any write method is undefined behavior.
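A read-only monitoring sketch inside a consumer handler, assuming the module paths trpc.group/trpc-go/trpc-database/kafka and trpc.group/trpc-go/trpc-go:

package demo

import (
	"context"

	"github.com/IBM/sarama"
	kafka "trpc.group/trpc-go/trpc-database/kafka"
	"trpc.group/trpc-go/trpc-go/log"
)

// Handle reports consumer-group details for monitoring, using read methods only.
func Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	if raw, ok := kafka.GetRawSaramaContext(ctx); ok {
		// Write methods on Session/Claim are undefined behavior.
		log.Infof("member=%s topic=%s partition=%d",
			raw.Session.MemberID(), raw.Claim.Topic(), raw.Claim.Partition())
	}
	return nil
}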
type Request ¶
type Request struct {
	Topic     string
	Key       []byte
	Value     []byte
	Async     bool // whether to produce asynchronously or not
	Partition int32
	// Deprecated: use Message.Headers instead
	Headers []sarama.RecordHeader
	Message sarama.ProducerMessage
}
Request kafka request body
type ServerCodec ¶
type ServerCodec struct{}
ServerCodec Server codec
type ServerTransport ¶
type ServerTransport struct {
// contains filtered or unexported fields
}
ServerTransport kafka consumer transport
func (*ServerTransport) ListenAndServe ¶
func (s *ServerTransport) ListenAndServe(ctx context.Context, opts ...transport.ListenServeOption) (err error)
ListenAndServe starts the listener and returns an error if listening fails.
type UserConfig ¶
type UserConfig struct {
	Brokers     []string // cluster address
	Topics      []string // for consumers
	Topic       string   // for producers
	Group       string   // consumer group
	Async       int      // whether to produce asynchronously: 0 is synchronous, 1 is asynchronous
	ClientID    string   // client ID
	Compression sarama.CompressionCodec
	Version     sarama.KafkaVersion
	Strategy    sarama.BalanceStrategy
	Partitioner func(topic string) sarama.Partitioner
	Initial     int64 // the initial offset used when a new consumer group first connects to the cluster

	FetchDefault int
	FetchMax     int
	MaxWaitTime  time.Duration

	RequiredAcks    sarama.RequiredAcks
	ReturnSuccesses bool
	Timeout         time.Duration

	// When producing asynchronously:
	MaxMessageBytes  int           // the maximum number of bytes in the local cache queue
	FlushMessages    int           // the maximum number of messages the local cache sends to the broker
	FlushMaxMessages int           // the maximum number of messages in the local cache queue
	FlushBytes       int           // the maximum number of bytes the local cache sends to the broker
	FlushFrequency   time.Duration // the maximum delay before the local cache is flushed to the broker

	BatchConsumeCount int           // maximum number of messages in a consumed batch
	BatchFlush        time.Duration // batch consumption flush interval
	ScramClient       *LSCRAMClient // SCRAM security authentication client

	// MaxRetry is the maximum number of retries on failure.
	// The default 0 means retry forever; a value < 0 means no retry.
	MaxRetry           int
	NetMaxOpenRequests int // maximum number of in-flight requests
	MaxProcessingTime  time.Duration
	NetDailTimeout     time.Duration
	NetReadTimeout     time.Duration
	NetWriteTimeout    time.Duration

	GroupSessionTimeout    time.Duration
	GroupRebalanceTimeout  time.Duration
	GroupRebalanceRetryMax int

	MetadataRetryMax               int
	MetadataRetryBackoff           time.Duration
	MetadataRefreshFrequency       time.Duration
	MetadataFull                   bool
	MetadataAllowAutoTopicCreation bool

	IsolationLevel sarama.IsolationLevel
	RetryInterval  time.Duration // retry interval, works together with MaxRetry
	ProducerRetry  struct {
		Max           int           // maximum number of retries
		RetryInterval time.Duration // retry interval
	}
	TrpcMeta        bool
	Idempotent      bool             // if enabled, the producer ensures that exactly one copy of each message is written
	RateLimitConfig *RateLimitConfig // token bucket rate limit configuration
	// contains filtered or unexported fields
}
UserConfig configuration parsed from address
func GetDefaultConfig ¶
func GetDefaultConfig() *UserConfig
GetDefaultConfig gets the default configuration.
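Together with RegisterAddrConfig, the defaults can be adjusted in code and bound to an address key referenced from the configuration file. A sketch; custom_addr and the broker address are placeholders, and the module path trpc.group/trpc-go/trpc-database/kafka is assumed:

package main

import kafka "trpc.group/trpc-go/trpc-database/kafka"

func main() {
	cfg := kafka.GetDefaultConfig()
	cfg.Brokers = []string{"127.0.0.1:9092"}
	cfg.Topic = "demo_topic"
	// A client whose configured address is custom_addr will now use cfg.
	kafka.RegisterAddrConfig("custom_addr", cfg)
}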
func ParseAddress ¶
func ParseAddress(address string) (*UserConfig, error)
ParseAddress parses an address of the form ip1:port1,ip2:port2?clientid=xx&topics=topic1,topic2&group=xxx&compression=gzip.
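For instance, the address format shown above parses as follows (a sketch using the assumed module path):

package main

import (
	"fmt"

	kafka "trpc.group/trpc-go/trpc-database/kafka"
)

func main() {
	cfg, err := kafka.ParseAddress("127.0.0.1:9092?clientid=demo&topics=topic1,topic2&group=g1&compression=gzip")
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Brokers, cfg.Topics, cfg.Group) // [127.0.0.1:9092] [topic1 topic2] g1
}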
Source Files ¶
Directories ¶
Path | Synopsis
---|---
examples |
examples/batchconsumer | Package main is the main package.
examples/consumer | Package main is the main package.
examples/consumer_with_mulit_service | Package main is the main package.
examples/producer | Package main is the main package.
examples/producer_ex | Package main is the main package.
mockkafka | Package mockkafka is a generated GoMock package.