Documentation ¶
Index ¶
- Constants
- func Convert2Avro(value []byte, schema string) ([]byte, error)
- func ConvertFromAvro(binary []byte, schema string) (string, error)
- func CreateMessage(message []byte, schemaID int) ([]byte, error)
- func GetMessageAvroID(messageValue []byte) ([]byte, int)
- func New() venom.Executor
- type Executor
- type Message
- type MessageJSON
- type Result
- type SchemaRegistry
Constants ¶
const (
// Name of executor
Name = "kafka"
)
Variables ¶
This section is empty.
Functions ¶
func Convert2Avro ¶ added in v1.0.0
Convert2Avro converts value to Avro-encoded binary using the given schema.
func ConvertFromAvro ¶ added in v1.0.0
ConvertFromAvro converts Avro-encoded binary to a string using the given schema.
func CreateMessage ¶ added in v1.0.0
CreateMessage combines an Avro-encoded message with its schema ID into a payload that can be sent to Kafka.
func GetMessageAvroID ¶ added in v1.0.0
GetMessageAvroID tries to extract the Avro schema ID from an encoded message.
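The pairing of CreateMessage and GetMessageAvroID resembles the Confluent Schema Registry wire format, in which a zero "magic" byte and a 4-byte big-endian schema ID precede the Avro payload. The following is a minimal sketch of that framing under the assumption that this package follows it; `frame` and `unframe` are hypothetical helpers written for illustration, not the package's own functions.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte marks the assumed Confluent wire format (always 0).
const magicByte byte = 0

// headerLen is one magic byte plus a 4-byte schema ID.
const headerLen = 5

// frame prepends the magic byte and a big-endian schema ID to payload.
// Hypothetical helper; it only illustrates the assumed layout.
func frame(payload []byte, schemaID int) ([]byte, error) {
	if schemaID < 0 || int64(schemaID) > 0xFFFFFFFF {
		return nil, errors.New("schema ID does not fit in 4 bytes")
	}
	out := make([]byte, headerLen, headerLen+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:headerLen], uint32(schemaID))
	return append(out, payload...), nil
}

// unframe splits a framed message into its payload and schema ID.
func unframe(msg []byte) ([]byte, int, error) {
	if len(msg) < headerLen || msg[0] != magicByte {
		return nil, 0, errors.New("not a framed Avro message")
	}
	id := int(binary.BigEndian.Uint32(msg[1:headerLen]))
	return msg[headerLen:], id, nil
}

func main() {
	msg, err := frame([]byte("avro-bytes"), 42)
	if err != nil {
		panic(err)
	}
	payload, id, err := unframe(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("id=%d payload=%q header=%v\n", id, payload, msg[:headerLen])
}
```

Rejecting messages that are too short or lack the magic byte lets a consumer tell framed Avro apart from plain payloads before attempting to decode.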
Types ¶
type Executor ¶
type Executor struct {
	Addrs []string `json:"addrs,omitempty" yaml:"addrs,omitempty"`
	// Registry schema address
	SchemaRegistryAddr string `json:"schema_registry_addr,omitempty" yaml:"schemaRegistryAddr,omitempty"`
	WithAVRO           bool   `json:"with_avro,omitempty" yaml:"withAVRO,omitempty"`
	WithTLS            bool   `json:"with_tls,omitempty" yaml:"withTLS,omitempty"`
	WithSASL           bool   `json:"with_sasl,omitempty" yaml:"withSASL,omitempty"`
	WithSASLHandshaked bool   `json:"with_sasl_handshaked,omitempty" yaml:"withSASLHandshaked,omitempty"`
	User               string `json:"user,omitempty" yaml:"user,omitempty"`
	Password           string `json:"password,omitempty" yaml:"password,omitempty"`
	// ClientType must be "consumer" or "producer"
	ClientType string `json:"client_type,omitempty" yaml:"clientType,omitempty"`

	// Used when ClientType is consumer
	GroupID string   `json:"group_id,omitempty" yaml:"groupID,omitempty"`
	Topics  []string `json:"topics,omitempty" yaml:"topics,omitempty"`
	// Timeout for reading messages, in seconds. Default: 5
	Timeout int `json:"timeout,omitempty" yaml:"timeout,omitempty"`
	// WaitFor represents the time for reading messages without marking the test as a failure.
	WaitFor int `json:"wait_for,omitempty" yaml:"waitFor,omitempty"`
	// MessageLimit is the maximum number of messages to read. Once the limit is reached, the consumer stops reading.
	MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"`
	// InitialOffset represents the initial offset for the consumer. Possible values: newest, oldest. Default: newest
	InitialOffset string `json:"initial_offset,omitempty" yaml:"initialOffset,omitempty"`
	// MarkOffset allows marking the offset when consuming a message
	MarkOffset bool `json:"mark_offset,omitempty" yaml:"markOffset,omitempty"`
	// KeyFilter determines the key to filter from
	KeyFilter string `json:"key_filter,omitempty" yaml:"keyFilter,omitempty"`
	// Only one of JSON or Avro is currently supported
	ConsumerEncoding string `json:"consumer_encoding,omitempty" yaml:"consumerEncoding,omitempty"`

	// Used when ClientType is producer
	// Messages represents the messages sent by the producer
	Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"`
	// MessagesFile is a file containing the messages sent by the producer (the messages field is then ignored)
	MessagesFile string `json:"messages_file,omitempty" yaml:"messages_file,omitempty"`

	// Kafka version, default is 0.10.2.0
	KafkaVersion string `json:"kafka_version,omitempty" yaml:"kafka_version,omitempty"`
	// contains filtered or unexported fields
}
Executor represents a Kafka test executor.
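The json tags on Executor determine the key names used when a step is serialized with encoding/json. As a rough sketch, the example below mirrors a handful of consumer-related fields in a local type and marshals it. `consumerStep` and `encodeStep` are hypothetical names introduced here, the broker address and topic are placeholder values, and the real type to use is Executor itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// consumerStep mirrors a subset of Executor's fields and json tags.
// It is a local stand-in for illustration, not the package's type.
type consumerStep struct {
	Addrs         []string `json:"addrs,omitempty"`
	ClientType    string   `json:"client_type,omitempty"`
	GroupID       string   `json:"group_id,omitempty"`
	Topics        []string `json:"topics,omitempty"`
	Timeout       int      `json:"timeout,omitempty"`
	MessageLimit  int      `json:"message_limit,omitempty"`
	InitialOffset string   `json:"initial_offset,omitempty"`
}

// encodeStep renders the step as JSON; zero-valued fields are dropped
// because every tag carries omitempty.
func encodeStep(s consumerStep) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeStep(consumerStep{
		Addrs:         []string{"localhost:9092"}, // placeholder broker address
		ClientType:    "consumer",
		GroupID:       "venom",
		Topics:        []string{"test.topic"}, // placeholder topic
		Timeout:       5,
		InitialOffset: "oldest",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because MessageLimit is left at zero, it does not appear in the output, which is the behaviour omitempty gives every optional field in the struct.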
func (Executor) GetDefaultAssertions ¶
func (Executor) GetDefaultAssertions() *venom.StepAssertions
GetDefaultAssertions returns the default assertions for this executor.
func (Executor) ZeroValueResult ¶
func (Executor) ZeroValueResult() interface{}
ZeroValueResult returns an empty implementation of this executor's result.
type Message ¶
type Message struct {
	Topic          string `json:"topic" yaml:"topic"`
	Key            string `json:"key" yaml:"key"`
	Value          string `json:"value,omitempty" yaml:"value,omitempty"`
	ValueFile      string `json:"valueFile,omitempty" yaml:"valueFile,omitempty"`
	AvroSchemaFile string `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"`
}
Message represents an object sent to or received from Kafka.
type MessageJSON ¶
type MessageJSON struct {
	Topic string
	Key   interface{}
	Value interface{}
}
MessageJSON represents an object sent to or received from Kafka.
type Result ¶
type Result struct {
	TimeSeconds  float64       `json:"timeSeconds,omitempty" yaml:"timeSeconds,omitempty"`
	Messages     []Message     `json:"messages,omitempty" yaml:"messages,omitempty"`
	MessagesJSON []interface{} `json:"messagesJSON,omitempty" yaml:"messagesJSON,omitempty"`
	Err          string        `json:"error" yaml:"error"`
}
Result represents a step result.
type SchemaRegistry ¶ added in v1.0.0
type SchemaRegistry interface {
	GetSchemaByID(id int) (string, error)
	RegisterNewSchema(subject, schema string) (int, error)
}
SchemaRegistry provides an interface to a schema registry implementation.
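Since SchemaRegistry is an interface, tests can substitute an in-memory implementation instead of contacting a real registry. The sketch below copies the interface declaration so that it compiles on its own. `memRegistry` and `newMemRegistry` are hypothetical names, and the sequential ID assignment is an assumption made for simplicity rather than a description of how a real registry behaves.

```go
package main

import (
	"fmt"
	"sync"
)

// SchemaRegistry is copied from the package declaration so this
// sketch is self-contained.
type SchemaRegistry interface {
	GetSchemaByID(id int) (string, error)
	RegisterNewSchema(subject, schema string) (int, error)
}

// memRegistry is a hypothetical in-memory stand-in for tests.
type memRegistry struct {
	mu      sync.Mutex
	schemas map[int]string
	nextID  int
}

func newMemRegistry() *memRegistry {
	return &memRegistry{schemas: map[int]string{}, nextID: 1}
}

// GetSchemaByID returns the stored schema or an error if the ID is unknown.
func (r *memRegistry) GetSchemaByID(id int) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.schemas[id]
	if !ok {
		return "", fmt.Errorf("schema %d not found", id)
	}
	return s, nil
}

// RegisterNewSchema stores the schema under the next sequential ID.
// The subject is ignored here; a real registry tracks subjects and versions.
func (r *memRegistry) RegisterNewSchema(subject, schema string) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.nextID
	r.schemas[id] = schema
	r.nextID++
	return id, nil
}

// Compile-time check that memRegistry satisfies the interface.
var _ SchemaRegistry = (*memRegistry)(nil)

func main() {
	var reg SchemaRegistry = newMemRegistry()
	id, err := reg.RegisterNewSchema("example-value", `"string"`)
	if err != nil {
		panic(err)
	}
	schema, err := reg.GetSchemaByID(id)
	if err != nil {
		panic(err)
	}
	fmt.Println(id, schema)
}
```

Code that accepts a SchemaRegistry value can then be exercised without network access by passing the in-memory version.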
func NewSchemaRegistry ¶ added in v1.0.0
func NewSchemaRegistry(schemaRegistryHost string) (SchemaRegistry, error)
NewSchemaRegistry creates a new SchemaRegistry for the given schema registry host.
func NewWithClient ¶ added in v1.0.0
func NewWithClient(schemaRegistryHost string, httpClient *http.Client) (SchemaRegistry, error)
NewWithClient creates a new SchemaRegistry for the given host that uses the provided HTTP client.