Versions in this module Expand all Collapse all v0 v0.14.2 Sep 13, 2022 Changes in this version + const Bytes + const ConcurrentRequests + const Key + const MagicPrefixSize + const RecordNameStrategy + const String + const Timeout + const TopicNameStrategy + const TopicRecordNameStrategy + const Value + var Balancers map[string]kafkago.Balancer + var CompressionCodecs map[string]compress.Compression + var ErrFailedTypeCast = NewXk6KafkaError(failedTypeCast, "Failed to cast type", nil) + var ErrForbiddenInInitContext = NewXk6KafkaError(kafkaForbiddenInInitContext, ...) + var ErrInvalidDataType = NewXk6KafkaError(invalidDataType, "Invalid data type provided for serializer/deserializer", nil) + var ErrInvalidPEMData = errors.New("tls: failed to find any PEM data in certificate input") + var ErrInvalidSchema = NewXk6KafkaError(failedUnmarshalSchema, "Failed to unmarshal schema", nil) + var ErrNoSchemaRegistryClient = NewXk6KafkaError(failedConfigureSchemaRegistryClient, ...) + var ErrNotEnoughArguments = errors.New("not enough arguments") + var ErrUnknownSerdesType = NewXk6KafkaError(invalidSerdeType, "Unknown serdes type", nil) + var ErrUnsupportedOperation = NewXk6KafkaError(unsupportedOperation, "Operation not supported", nil) + var GroupBalancers map[string]kafkago.GroupBalancer + var IsolationLevels map[string]kafkago.IsolationLevel + var MaxWait = time.Millisecond * 200 + var RebalanceTimeout = time.Second * 5 + var TLSVersions map[string]uint16 + var TypesRegistry map[string]Serdes = map[string]Serdes + func GetSerdes(schemaType string) (Serdes, *Xk6KafkaError) + type AvroSerde struct + func (*AvroSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + func (*AvroSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type BasicAuth struct + Password string + Username string + type ByteArraySerde struct + func (*ByteArraySerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + func (*ByteArraySerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type ConnectionConfig struct + Address string + SASL SASLConfig + TLS TLSConfig + type ConsumeConfig struct + Limit int64 + type Container struct + Data interface{} + Schema *Schema + SchemaType string + type Element string + type JSONSerde struct + func (*JSONSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + func (*JSONSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type Kafka struct + type Message struct + Headers map[string]interface{} + HighWaterMark int64 + Key []byte + Offset int64 + Partition int + Time time.Time + Topic string + Value []byte + type Module struct + func (m *Module) Exports() modules.Exports + type ProduceConfig struct + Messages []Message + type ReaderConfig struct + Brokers []string + CommitInterval time.Duration + ConnectLogger bool + GroupBalancers []string + GroupID string + GroupTopics []string + HeartbeatInterval time.Duration + IsolationLevel string + JoinGroupBackoff time.Duration + MaxAttempts int + MaxBytes int + MaxWait time.Duration + MinBytes int + Offset int64 + Partition int + PartitionWatchInterval time.Duration + QueueCapacity int + ReadBackoffMax time.Duration + ReadBackoffMin time.Duration + ReadLagInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SASL SASLConfig + SessionTimeout time.Duration + StartOffset int64 + TLS TLSConfig + Topic string + WatchPartitionChanges bool + type RootModule struct + func New() *RootModule + func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance + type SASLConfig struct + Algorithm string + Password string + Username string + type Schema struct + EnableCaching bool + ID int + References []srclient.Reference + Schema string + SchemaType *srclient.SchemaType + Subject string + Version int + func (s *Schema) Codec() *goavro.Codec + func (s *Schema) JsonSchema() *jsonschema.Schema + type SchemaRegistryConfig struct + BasicAuth BasicAuth + EnableCaching bool + TLS TLSConfig + URL string + type Serdes interface + Deserialize func(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + Serialize func(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type StringSerde struct + func (*StringSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + func (*StringSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type SubjectNameConfig struct + Element Element + Schema string + SubjectNameStrategy string + Topic string + type TLSConfig struct + ClientCertPem string + ClientKeyPem string + EnableTLS bool + InsecureSkipTLSVerify bool + MinVersion string + ServerCaPem string + type WireFormat struct + Data []byte + SchemaID int + type WriterConfig struct + AutoCreateTopic bool + Balancer string + BatchBytes int + BatchSize int + BatchTimeout time.Duration + Brokers []string + Compression string + ConnectLogger bool + MaxAttempts int + ReadTimeout time.Duration + RequiredAcks int + SASL SASLConfig + TLS TLSConfig + Topic string + WriteTimeout time.Duration + type Xk6KafkaError struct + Code errCode + Message string + OriginalError error + func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError) + func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError) + func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError) + func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError + func (e Xk6KafkaError) Error() string + func (e Xk6KafkaError) Unwrap() error v0.0.2 Jul 26, 2020 v0.0.1 Jul 24, 2020