Documentation ¶
Overview ¶
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
*Copyright (c) 2023, kaydxh * *Permission is hereby granted, free of charge, to any person obtaining a copy *of this software and associated documentation files (the "Software"), to deal *in the Software without restriction, including without limitation the rights *to use, copy, modify, merge, publish, distribute, sublicense, and/or sell *copies of the Software, and to permit persons to whom the Software is *furnished to do so, subject to the following conditions: * *The above copyright notice and this permission notice shall be included in all *copies or substantial portions of the Software. * *THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, *OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *SOFTWARE.
Index ¶
- Variables
- type CompletedConfig
- type Config
- type ConfigOption
- type ConfigOptionFunc
- type Consumer
- type ConsumerOptions
- type EmptyConfigOption
- type EmptyMQOption
- type Kafka
- func (*Kafka) Descriptor() ([]byte, []int) deprecated
- func (x *Kafka) GetApiVersionRequest() bool
- func (x *Kafka) GetBrokers() []string
- func (x *Kafka) GetEnabled() bool
- func (x *Kafka) GetFailAfterDuration() *duration.Duration
- func (x *Kafka) GetMaxWaitDuration() *duration.Duration
- func (x *Kafka) GetReconnectBackoff() *duration.Duration
- func (x *Kafka) GetReconnectBackoffMax() *duration.Duration
- func (*Kafka) ProtoMessage()
- func (x *Kafka) ProtoReflect() protoreflect.Message
- func (x *Kafka) Reset()
- func (x *Kafka) String() string
- type KafkaMessage
- type Kafka_Consumer
- type Kafka_Producer
- type MQ
- func (o *MQ) ApplyOptions(options ...MQOption) *MQ
- func (q *MQ) AsConsumers(ctx context.Context, topics ...string) (consumers []*Consumer, err error)
- func (q *MQ) AsProducers(ctx context.Context, topics ...string) (producers []*Producer, err error)
- func (q *MQ) Close()
- func (q *MQ) GetConsumer(topic string) (*Consumer, error)
- func (q *MQ) GetProducer(topic string) (*Producer, error)
- func (q *MQ) InstallMQ(ctx context.Context, maxWaitInterval time.Duration, failAfter time.Duration) (*MQ, error)
- func (q *MQ) ReadStream(ctx context.Context, topic string) <-chan mq_.Message
- func (q *MQ) Send(ctx context.Context, topic string, msgs ...kafka.Message) error
- type MQConfig
- type MQOption
- func WithConsumerGroupID(groupID string) MQOption
- func WithConsumerMaxBytes(maxBytes int) MQOption
- func WithConsumerMaxWait(maxWait time.Duration) MQOption
- func WithConsumerMinBytes(minBytes int) MQOption
- func WithConsumerPartition(partition int) MQOption
- func WithDialTimeout(dialTimeout time.Duration) MQOption
- func WithProducerBatchBytes(batchBytes int) MQOption
- func WithProducerBatchSize(batchSize int) MQOption
- func WithProducerBatchTimeout(batchTimeout time.Duration) MQOption
- type MQOptionFunc
- type MQOptions
- type Producer
- type ProducerOptions
Constants ¶
This section is empty.
Variables ¶
var File_pkg_mq_kafka_kafka_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type CompletedConfig ¶
type CompletedConfig struct {
// contains filtered or unexported fields
}
type Config ¶
type Config struct { Proto Kafka // contains filtered or unexported fields }
func NewConfig ¶
func NewConfig(options ...ConfigOption) *Config
func (*Config) ApplyOptions ¶
func (o *Config) ApplyOptions(options ...ConfigOption) *Config
func (*Config) Complete ¶
func (c *Config) Complete() CompletedConfig
Complete sets default ServerRunOptions.
type ConfigOption ¶
type ConfigOption interface {
// contains filtered or unexported methods
}
A ConfigOption sets options.
func WithViper ¶
func WithViper(v *viper.Viper) ConfigOption
type ConfigOptionFunc ¶
type ConfigOptionFunc func(*Config)
ConfigOptionFunc wraps a function that modifies Config into an implementation of the ConfigOption interface.
type Consumer ¶
func NewConsumer ¶
func NewConsumer(config kafka.ReaderConfig) (*Consumer, error)
type ConsumerOptions ¶
type ConsumerOptions struct {
// contains filtered or unexported fields
}
type EmptyConfigOption ¶
type EmptyConfigOption struct{}
EmptyConfigOption does not alter the configuration. It can be embedded in another structure to build custom options.
This API is EXPERIMENTAL.
type EmptyMQOption ¶
type EmptyMQOption struct{}
EmptyMQOption does not alter the MQ configuration. It can be embedded in another structure to build custom options.
This API is EXPERIMENTAL.
type Kafka ¶
type Kafka struct { Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` Brokers []string `protobuf:"bytes,2,rep,name=brokers,proto3" json:"brokers,omitempty"` ApiVersionRequest bool `protobuf:"varint,3,opt,name=api_version_request,json=apiVersionRequest,proto3" json:"api_version_request,omitempty"` ReconnectBackoff *duration.Duration `protobuf:"bytes,4,opt,name=reconnect_backoff,json=reconnectBackoff,proto3" json:"reconnect_backoff,omitempty"` ReconnectBackoffMax *duration.Duration `protobuf:"bytes,5,opt,name=reconnect_backoff_max,json=reconnectBackoffMax,proto3" json:"reconnect_backoff_max,omitempty"` MaxWaitDuration *duration.Duration `protobuf:"bytes,6,opt,name=max_wait_duration,json=maxWaitDuration,proto3" json:"max_wait_duration,omitempty"` FailAfterDuration *duration.Duration `protobuf:"bytes,7,opt,name=fail_after_duration,json=failAfterDuration,proto3" json:"fail_after_duration,omitempty"` // contains filtered or unexported fields }
See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
func (*Kafka) Descriptor
deprecated
func (*Kafka) GetApiVersionRequest ¶
func (*Kafka) GetBrokers ¶
func (*Kafka) GetEnabled ¶
func (*Kafka) GetFailAfterDuration ¶
func (*Kafka) GetMaxWaitDuration ¶
func (*Kafka) GetReconnectBackoff ¶
func (*Kafka) GetReconnectBackoffMax ¶
func (*Kafka) ProtoMessage ¶
func (*Kafka) ProtoMessage()
func (*Kafka) ProtoReflect ¶
func (x *Kafka) ProtoReflect() protoreflect.Message
type KafkaMessage ¶
type KafkaMessage struct { Err error Msg *kafka.Message }
func (KafkaMessage) Error ¶ added in v0.0.121
func (m KafkaMessage) Error() error
func (KafkaMessage) Key ¶ added in v0.0.121
func (m KafkaMessage) Key() []byte
func (KafkaMessage) Value ¶ added in v0.0.121
func (m KafkaMessage) Value() []byte
type Kafka_Consumer ¶
type Kafka_Consumer struct {
// contains filtered or unexported fields
}
func (*Kafka_Consumer) Descriptor
deprecated
func (*Kafka_Consumer) Descriptor() ([]byte, []int)
Deprecated: Use Kafka_Consumer.ProtoReflect.Descriptor instead.
func (*Kafka_Consumer) ProtoMessage ¶
func (*Kafka_Consumer) ProtoMessage()
func (*Kafka_Consumer) ProtoReflect ¶
func (x *Kafka_Consumer) ProtoReflect() protoreflect.Message
func (*Kafka_Consumer) Reset ¶
func (x *Kafka_Consumer) Reset()
func (*Kafka_Consumer) String ¶
func (x *Kafka_Consumer) String() string
type Kafka_Producer ¶
type Kafka_Producer struct {
// contains filtered or unexported fields
}
func (*Kafka_Producer) Descriptor
deprecated
func (*Kafka_Producer) Descriptor() ([]byte, []int)
Deprecated: Use Kafka_Producer.ProtoReflect.Descriptor instead.
func (*Kafka_Producer) ProtoMessage ¶
func (*Kafka_Producer) ProtoMessage()
func (*Kafka_Producer) ProtoReflect ¶
func (x *Kafka_Producer) ProtoReflect() protoreflect.Message
func (*Kafka_Producer) Reset ¶
func (x *Kafka_Producer) Reset()
func (*Kafka_Producer) String ¶
func (x *Kafka_Producer) String() string
type MQ ¶
func (*MQ) ApplyOptions ¶
func (*MQ) AsConsumers ¶
func (*MQ) AsProducers ¶
func (*MQ) ReadStream ¶
type MQOption ¶
type MQOption interface {
// contains filtered or unexported methods
}
An MQOption sets options.
func WithConsumerMaxBytes ¶
func WithConsumerMaxWait ¶
func WithConsumerMinBytes ¶
func WithConsumerPartition ¶
func WithProducerBatchBytes ¶
type MQOptionFunc ¶
type MQOptionFunc func(*MQ)
MQOptionFunc wraps a function that modifies MQ into an implementation of the MQOption interface.
type Producer ¶
func NewProducer ¶
func NewProducer(config kafka.WriterConfig) (*Producer, error)
type ProducerOptions ¶
type ProducerOptions struct {
// contains filtered or unexported fields
}