Documentation ¶
Index ¶
- Constants
- Variables
- func GetSerdes(schemaType srclient.SchemaType) (Serdes, *Xk6KafkaError)
- type AvroSerde
- type BasicAuth
- type ByteArraySerde
- type ConnectionConfig
- type ConsumeConfig
- type Container
- type Duration
- type Element
- type JKS
- type JKSConfig
- type JSONSerde
- type Kafka
- type Message
- type Module
- type ProduceConfig
- type ReaderConfig
- type RootModule
- type SASLConfig
- type Schema
- type SchemaRegistryConfig
- type Serdes
- type StringSerde
- type SubjectNameConfig
- type TLSConfig
- type WireFormat
- type WriterConfig
- type Xk6KafkaError
- func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)
- func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)
- func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)
- func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError
Constants ¶
const ( Key Element = "key" Value Element = "value" MagicPrefixSize int = 5 ConcurrentRequests int = 16 )
const ( TopicNameStrategy string = "TopicNameStrategy" RecordNameStrategy string = "RecordNameStrategy" TopicRecordNameStrategy string = "TopicRecordNameStrategy" )
const (
Bytes srclient.SchemaType = "BYTES"
)
const (
String srclient.SchemaType = "STRING"
)
const (
Timeout = time.Second * 10
)
Variables ¶
var ( // ErrUnsupported is the error returned when the operation is not supported. ErrUnsupportedOperation = NewXk6KafkaError(unsupportedOperation, "Operation not supported", nil) // ErrForbiddenInInitContext is used when a Kafka producer was used in the init context. ErrForbiddenInInitContext = NewXk6KafkaError( kafkaForbiddenInInitContext, "Producing Kafka messages in the init context is not supported", nil) // ErrInvalidDataType is used when a data type is not supported. ErrInvalidDataType = NewXk6KafkaError( invalidDataType, "Invalid data type provided for serializer/deserializer", nil) // ErrInvalidSchema is used when a schema is not supported or is malformed. ErrInvalidSchema = NewXk6KafkaError(failedUnmarshalSchema, "Failed to unmarshal schema", nil) // ErrFailedTypeCast is used when a type cast failed. ErrFailedTypeCast = NewXk6KafkaError(failedTypeCast, "Failed to cast type", nil) // ErrUnknownSerdesType is used when a serdes type is not supported. ErrUnknownSerdesType = NewXk6KafkaError(invalidSerdeType, "Unknown serdes type", nil) ErrPartitionAndGroupID = NewXk6KafkaError( partitionAndGroupID, "Partition and groupID cannot be set at the same time", nil) ErrTopicAndGroupID = NewXk6KafkaError( topicAndGroupID, "When you specifiy groupID, you must set groupTopics instead of topic", nil) // ErrNotEnoughArguments is used when a function is called with too few arguments. ErrNotEnoughArguments = errors.New("not enough arguments") // ErrNoSchemaRegistryClient is used when a schema registry client is not configured correctly. ErrNoSchemaRegistryClient = NewXk6KafkaError( failedConfigureSchemaRegistryClient, "Failed to configure the schema registry client", nil) // ErrNoJKSConfig is used when a JKS config is not configured correctly. ErrNoJKSConfig = NewXk6KafkaError(failedConfigureJKS, "Failed to configure JKS", nil) ErrInvalidPEMData = errors.New("tls: failed to find any PEM data in certificate input") )
var ( GroupBalancers map[string]kafkago.GroupBalancer IsolationLevels map[string]kafkago.IsolationLevel // StartOffset determines from whence the consumer group should begin // consuming when it finds a partition without a committed offset. If // non-zero, it must be set to one of FirstOffset or LastOffset. // // Default: FirstOffset // // Only used when GroupID is set // Ref: https://github.com/segmentio/kafka-go/blob/a8e5eabf4a90025a4ad2c28e929324d18db21103/reader.go#L481-L488 StartOffsets map[string]int64 RebalanceTimeout = time.Second * 5 HeartbeatInterval = time.Second * 3 SessionTimeout = time.Second * 30 PartitionWatchInterval = time.Second * 5 JoinGroupBackoff = time.Second * 5 RetentionTime = time.Hour * 24 )
var ( // CompressionCodecs is a map of compression codec names to their respective codecs. CompressionCodecs map[string]compress.Compression // Balancers is a map of balancer names to their respective balancers. Balancers map[string]kafkago.Balancer )
var TLSVersions map[string]uint16
TLSVersions is a map of TLS versions to their numeric values.
var TypesRegistry map[srclient.SchemaType]Serdes = map[srclient.SchemaType]Serdes{ String: &StringSerde{}, Bytes: &ByteArraySerde{}, srclient.Json: &JSONSerde{}, srclient.Avro: &AvroSerde{}, }
Functions ¶
func GetSerdes ¶
func GetSerdes(schemaType srclient.SchemaType) (Serdes, *Xk6KafkaError)
Types ¶
type AvroSerde ¶
type AvroSerde struct {
Serdes
}
func (*AvroSerde) Deserialize ¶
func (*AvroSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError)
Deserialize deserializes a Avro binary into a JSON object.
type ByteArraySerde ¶
type ByteArraySerde struct {
Serdes
}
func (*ByteArraySerde) Deserialize ¶
func (*ByteArraySerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError)
DeserializeByteArray returns the data as-is, because it is already a byte array.
func (*ByteArraySerde) Serialize ¶
func (*ByteArraySerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError)
Serialize serializes the given data into a byte array.
type ConnectionConfig ¶
type ConnectionConfig struct { Address string `json:"address"` SASL SASLConfig `json:"sasl"` TLS TLSConfig `json:"tls"` }
type ConsumeConfig ¶
type Container ¶
type Container struct { Data interface{} `json:"data"` Schema *Schema `json:"schema"` SchemaType srclient.SchemaType `json:"schemaType"` }
type JSONSerde ¶
type JSONSerde struct {
Serdes
}
func (*JSONSerde) Deserialize ¶
func (*JSONSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError)
Deserialize deserializes a map from bytes to be exported as object to JS.
type Message ¶
type Message struct { Topic string `json:"topic"` // Setting Partition has no effect when writing messages. Partition int `json:"partition"` Offset int64 `json:"offset"` HighWaterMark int64 `json:"highWaterMark"` Key []byte `json:"key"` Value []byte `json:"value"` Headers map[string]interface{} `json:"headers"` // If not set at the creation, Time will be automatically set when // writing the message. Time time.Time `json:"time"` }
type ProduceConfig ¶
type ProduceConfig struct {
Messages []Message `json:"messages"`
}
type ReaderConfig ¶
type ReaderConfig struct { WatchPartitionChanges bool `json:"watchPartitionChanges"` ConnectLogger bool `json:"connectLogger"` Partition int `json:"partition"` QueueCapacity int `json:"queueCapacity"` MinBytes int `json:"minBytes"` MaxBytes int `json:"maxBytes"` MaxAttempts int `json:"maxAttempts"` GroupID string `json:"groupId"` Topic string `json:"topic"` IsolationLevel string `json:"isolationLevel"` StartOffset string `json:"startOffset"` Offset int64 `json:"offset"` Brokers []string `json:"brokers"` GroupTopics []string `json:"groupTopics"` GroupBalancers []string `json:"groupBalancers"` MaxWait Duration `json:"maxWait"` ReadBatchTimeout time.Duration `json:"readBatchTimeout"` ReadLagInterval time.Duration `json:"readLagInterval"` HeartbeatInterval time.Duration `json:"heartbeatInterval"` CommitInterval time.Duration `json:"commitInterval"` PartitionWatchInterval time.Duration `json:"partitionWatchInterval"` SessionTimeout time.Duration `json:"sessionTimeout"` RebalanceTimeout time.Duration `json:"rebalanceTimeout"` JoinGroupBackoff time.Duration `json:"joinGroupBackoff"` RetentionTime time.Duration `json:"retentionTime"` ReadBackoffMin time.Duration `json:"readBackoffMin"` ReadBackoffMax time.Duration `json:"readBackoffMax"` OffsetOutOfRangeError bool `json:"offsetOutOfRangeError"` // deprecated, do not use SASL SASLConfig `json:"sasl"` TLS TLSConfig `json:"tls"` }
type RootModule ¶
type RootModule struct{}
func (*RootModule) NewModuleInstance ¶
func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance
NewModuleInstance creates a new instance of the Kafka module.
type SASLConfig ¶
type Schema ¶
type Schema struct { EnableCaching bool `json:"enableCaching"` ID int `json:"id"` Schema string `json:"schema"` SchemaType *srclient.SchemaType `json:"schemaType"` Version int `json:"version"` References []srclient.Reference `json:"references"` Subject string `json:"subject"` // contains filtered or unexported fields }
Schema is a wrapper around the schema registry schema. The Codec() and JsonSchema() methods will return the respective codecs (duck-typing).
func (*Schema) Codec ¶
func (s *Schema) Codec() *goavro.Codec
Codec ensures access to Codec Will try to initialize a new one if it hasn't been initialized before Will return nil if it can't initialize a codec from the schema
func (*Schema) JsonSchema ¶
func (s *Schema) JsonSchema() *jsonschema.Schema
JsonSchema ensures access to JsonSchema Will try to initialize a new one if it hasn't been initialized before Will return nil if it can't initialize a json schema from the schema
type SchemaRegistryConfig ¶
type Serdes ¶
type Serdes interface { Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) }
type StringSerde ¶
type StringSerde struct {
Serdes
}
func (*StringSerde) Deserialize ¶
func (*StringSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError)
Deserialize deserializes a string from bytes.
func (*StringSerde) Serialize ¶
func (*StringSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError)
Serialize serializes a string to bytes.
type SubjectNameConfig ¶
type WireFormat ¶
type WriterConfig ¶
type WriterConfig struct { AutoCreateTopic bool `json:"autoCreateTopic"` ConnectLogger bool `json:"connectLogger"` MaxAttempts int `json:"maxAttempts"` BatchSize int `json:"batchSize"` BatchBytes int `json:"batchBytes"` RequiredAcks int `json:"requiredAcks"` Topic string `json:"topic"` Balancer string `json:"balancer"` Compression string `json:"compression"` Brokers []string `json:"brokers"` BatchTimeout time.Duration `json:"batchTimeout"` ReadTimeout time.Duration `json:"readTimeout"` WriteTimeout time.Duration `json:"writeTimeout"` SASL SASLConfig `json:"sasl"` TLS TLSConfig `json:"tls"` }
type Xk6KafkaError ¶
func GetDialer ¶
func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError)
GetDialer creates a kafka dialer from the given auth string or an unauthenticated dialer if the auth string is empty.
func GetSASLMechanism ¶
func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError)
GetSASLMechanism returns a kafka SASL config from the given credentials.
func GetTLSConfig ¶
func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError)
GetTLSConfig creates a TLS config from the given TLS config struct and checks for errors. nolint: funlen
func NewXk6KafkaError ¶
func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError
NewXk6KafkaError is the constructor for Xk6KafkaError.
func (Xk6KafkaError) Error ¶
func (e Xk6KafkaError) Error() string
Error implements the `error` interface, so Xk6KafkaError are normal Go errors.
func (Xk6KafkaError) Unwrap ¶
func (e Xk6KafkaError) Unwrap() error
Unwrap implements the `xerrors.Wrapper` interface, so Xk6KafkaError are a bit future-proof Go 2 errors.