Documentation
¶
Overview ¶
Example ¶
if os.Getenv("KAFKA_ADDR") == "" { fmt.Println("set KAFKA_ADDR to run this example") return } brokers := strings.Split(os.Getenv("KAFKA_ADDR"), ",") conf := map[string]interface{}{ "log": map[string]interface{}{ "level": "none", }, "kafka": map[string]interface{}{ "default": map[string]interface{}{ "seed_brokers": brokers, "default_produce_topic": "example", "topics": []string{"example"}, "group": "test", }, }, } c := core.Default(core.WithConfigStack(confmap.Provider(conf, "."), nil)) c.Provide(otfranz.Providers()) c.Invoke(func(cli *kgo.Client) { record := &kgo.Record{Value: []byte("bar")} cli.Produce(context.Background(), record, nil) }) c.Invoke(func(cli *kgo.Client) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() fetches := cli.PollFetches(ctx) if errs := fetches.Errors(); len(errs) > 0 { panic(errs) } iter := fetches.RecordIter() if iter.Done() { return } rec := iter.Next() fmt.Println(string(rec.Value)) })
Output: bar
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FranzLogAdapter ¶
FranzLogAdapter returns a log adapter bridging kitlog and kgo.Logger.
func Providers ¶
func Providers(optionFunc ...ProvidersOptionFunc) di.Deps
Providers is a set of dependencies including Factory, Maker and exported configs.
Depends On: opentracing.Tracer, contract.ConfigUnmarshaler, log.Logger, contract.Dispatcher
Provide: Factory, Maker
Types ¶
type Client ¶
Client is a decorator around *kgo.Client that provides tracing capabilities.
func (*Client) ProduceSyncWithTracing ¶
ProduceSyncWithTracing wraps the ProduceSync method with tracing.
type Config ¶
type Config struct { ID string `json:"id" yaml:"id"` // client ID DialFn func(context.Context, string, string) (net.Conn, error) `json:"-" yaml:"-"` RequestTimeoutOverhead time.Duration `json:"request_timeout_overhead" yaml:"request_timeout_overhead"` ConnIdleTimeout time.Duration `json:"conn_idle_timeout" yaml:"conn_idle_timeout"` SoftwareName string `json:"software_name" yaml:"software_name"` // KIP-511 SoftwareVersion string `json:"software_version" yaml:"software_version"` // KIP-511 Logger kgo.Logger `json:"-" yaml:"-"` SeedBrokers []string `json:"seed_brokers" yaml:"seed_brokers" mapstructure:"seed_brokers"` MaxVersions *kversion.Versions `json:"-" yaml:"-"` MinVersions *kversion.Versions `json:"-" yaml:"-"` RetryBackoff func(int) time.Duration `json:"-" yaml:"-"` Retries int `json:"retries" yaml:"retries"` RetryTimeout time.Duration `json:"retry_timeout" yaml:"retry_timeout"` MaxBrokerWriteBytes int32 `json:"max_broker_write_bytes" yaml:"max_broker_write_bytes"` MaxBrokerReadBytes int32 `json:"max_broker_read_bytes" yaml:"max_broker_read_bytes"` AllowAutoTopicCreation bool `json:"allow_auto_topic_creation" yaml:"allow_auto_topic_creation"` MetadataMaxAge time.Duration `json:"metadata_max_age" yaml:"metadata_max_age"` MetadataMinAge time.Duration `json:"metadata_min_age" yaml:"metadata_min_age"` Sasls []sasl.Mechanism `json:"-" yaml:"-"` Hooks []kgo.Hook `json:"-" yaml:"-"` TxnID string `json:"txn_id" yaml:"txn_id"` TxnTimeout time.Duration `json:"txn_timeout" yaml:"txn_timeout"` Acks int16 `json:"acks" yaml:"acks"` DisableIdempotency bool `json:"disable_idempotency" yaml:"disable_idempotency"` Compression []kgo.CompressionCodec `json:"-" yaml:"-"` // order of preference DefaultProduceTopic string `json:"default_produce_topic" yaml:"default_produce_topic"` MaxRecordBatchBytes int32 `json:"max_record_batch_bytes" yaml:"max_record_batch_bytes"` MaxBufferedRecords int `json:"max_buffered_records" yaml:"max_buffered_records"` ProduceTimeout time.Duration 
`json:"produce_timeout" yaml:"produce_timeout"` RecordRetries int `json:"record_retries" yaml:"record_retries"` Linger time.Duration `json:"linger" yaml:"linger"` RecordTimeout time.Duration `json:"record_timeout" yaml:"record_timeout"` ManualFlushing bool `json:"manual_flushing" yaml:"manual_flushing"` Partitioner kgo.Partitioner `json:"-" yaml:"-"` StopOnDataLoss bool `json:"stop_on_data_loss" yaml:"stop_on_data_loss"` OnDataLoss func(string, int32) `json:"-" yaml:"-"` MaxWait time.Duration `json:"max_wait" yaml:"max_wait"` MinBytes int32 `json:"min_bytes" yaml:"min_bytes"` MaxBytes int32 `json:"max_bytes" yaml:"max_bytes"` MaxPartBytes int32 `json:"max_part_bytes" yaml:"max_part_bytes"` ResetOffset struct { At int64 `json:"at" yaml:"at"` Relative int64 `json:"relative" yaml:"relative"` Epoch int32 `json:"epoch" yaml:"epoch"` } `json:"reset_offset" yaml:"reset_offset"` IsolationLevel int8 `json:"isolation_level" yaml:"isolation_level"` KeepControl bool `json:"keep_control" yaml:"keep_control"` Rack string `json:"rack" yaml:"rack"` MaxConcurrentFetches int `json:"max_concurrent_fetches" yaml:"max_concurrent_fetches"` DisableFetchSessions bool `json:"disable_fetch_sessions" yaml:"disable_fetch_sessions"` Topics []string `json:"topics" yaml:"topics"` // topics to consume; if regex is true, values are compiled regular expressions Partitions map[string]map[int32]kgo.Offset `json:"-" yaml:"-"` // partitions to directly consume from Regex bool `json:"regex" yaml:"regex"` Group string `json:"group" yaml:"group"` // group we are in InstanceID string `json:"instance_id" yaml:"instance_id"` // optional group instance ID Balancers []kgo.GroupBalancer `json:"-" yaml:"-"` // balancers we can use Protocol string `json:"protocol" yaml:"protocol"` // "consumer" by default, expected to never be overridden SessionTimeout time.Duration `json:"session_timeout" yaml:"session_timeout"` RebalanceTimeout time.Duration `json:"rebalance_timeout" yaml:"rebalance_timeout"` HeartbeatInterval 
time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"` RequireStable bool `json:"require_stable" yaml:"require_stable"` OnAssigned func(context.Context, *kgo.Client, map[string][]int32) `json:"-" yaml:"-"` OnRevoked func(context.Context, *kgo.Client, map[string][]int32) `json:"-" yaml:"-"` OnLost func(context.Context, *kgo.Client, map[string][]int32) `json:"-" yaml:"-"` AutocommitDisable bool `json:"autocommit_disable" yaml:"autocommit_disable"` // true if autocommit was disabled or we are transactional AutocommitGreedy bool `json:"autocommit_greedy" yaml:"autocommit_greedy"` AutocommitMarks bool `json:"autocommit_marks" yaml:"autocommit_marks"` AutocommitInterval time.Duration `json:"autocommit_interval" yaml:"autocommit_interval"` CommitCallback func(*kgo.Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) `json:"-" yaml:"-"` // Options allows users to directly use the latest options without waiting for otfranz adaptation. Options []kgo.Opt `json:"-" yaml:"-"` }
Config is a configuration object used to create new instances of *kgo.Client. For the detailed configuration checks, refer to the kgo.cfg.validate method.
type Factory ¶
Factory is a *di.Factory that creates *kgo.Client.
Unlike other database providers, the kafka factories don't bundle a default kafka client. It is suggested to use the topic name as the identifier of a kafka config rather than an opaque name such as default.
type Interceptor ¶
Interceptor is an interceptor that makes last-minute changes to a *Config during kgo.Client's creation.
type ProvidersOptionFunc ¶
type ProvidersOptionFunc func(options *providersOption)
ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.
func WithInterceptor ¶
func WithInterceptor(interceptor Interceptor) ProvidersOptionFunc
WithInterceptor instructs the Providers to accept the Interceptor so that users can change config at runtime. This can be useful when some dynamic computations on configs are required.
func WithReload ¶
func WithReload(shouldReload bool) ProvidersOptionFunc
WithReload toggles whether the factory should reload cached instances upon an OnReload event.