Documentation ¶
Index ¶
- Constants
- func ConsumeEndpointRequest(consumer *ConsumerEndpoint, topic string, format Format) (*http.Request, error)
- func ConsumeRequest(baseURL string, topic string, partition int32, offset int64, count int, ...) (*http.Request, error)
- func CreateConsumerRequest(baseURL string, group string, request *ConsumerRequest) (*http.Request, error)
- func DeleteConsumerEndpoint(client HTTPClient, endpoint *ConsumerEndpoint) error
- func GetFunctionalTestTopic(t *testing.T) string
- func GetFunctionalTestURL(t *testing.T) string
- func HandleFunctionalTestError(t testing.TB, err error)
- func IsFunctionalTestRequired() bool
- func ProduceRequest(baseURL string, topic string, format Format, message *ProducerMessage) (*http.Request, error)
- type ConsumerEndpoint
- type ConsumerRequest
- type Format
- type HTTPClient
- type Message
- func Consume(client HTTPClient, baseURL string, topic string, partition int32, offset int64, ...) ([]*Message, error)
- func ConsumeEndpoint(client HTTPClient, endpoint *ConsumerEndpoint, topic string, format Format) ([]*Message, error)
- func WaitFor(client HTTPClient, endpoint *ConsumerEndpoint, topic string, format Format, ...) ([]*Message, error)
- type Offset
- type ProducerMessage
- type ProducerOffsets
- type ProducerRecord
- type ProducerResponse
Constants ¶
const ( //JSON formatted consumer JSON = Format("json") //Binary formatted consumer Binary = Format("binary") //Avro formatted consumer Avro = Format("avro") //Smallest is the offset that is oldest Smallest = "smallest" //Largest is the offset that is newest Largest = "largest" )
const ( //TestURLEnvVar is the url to run functional tests against TestURLEnvVar = "KAFKA_PROXY_TEST_URL" //TestTopicEnvVar is the topic to run functional tests against TestTopicEnvVar = "KAFKA_PROXY_TEST_TOPIC" //TestRequiredEnvVar if set to true will make tests fail TestRequiredEnvVar = "KAFKA_PROXY_TEST_REQUIRED" )
Variables ¶
This section is empty.
Functions ¶
func ConsumeEndpointRequest ¶
func ConsumeEndpointRequest(consumer *ConsumerEndpoint, topic string, format Format) (*http.Request, error)
ConsumeEndpointRequest will create a request for consumerURL/topics/<topic>
func ConsumeRequest ¶
func ConsumeRequest(baseURL string, topic string, partition int32, offset int64, count int, format Format) (*http.Request, error)
ConsumeRequest builds the request for the /topics/<topic>/partitions/<partition>/messages route
func CreateConsumerRequest ¶
func CreateConsumerRequest(baseURL string, group string, request *ConsumerRequest) (*http.Request, error)
CreateConsumerRequest will create a request for the /consumers/<group> endpoint
func DeleteConsumerEndpoint ¶
func DeleteConsumerEndpoint(client HTTPClient, endpoint *ConsumerEndpoint) error
DeleteConsumerEndpoint will delete a previously created consumer endpoint
func GetFunctionalTestTopic ¶
GetFunctionalTestTopic returns the configured test topic
func GetFunctionalTestURL ¶
GetFunctionalTestURL skips, fails, or returns the configured test URL
func HandleFunctionalTestError ¶
HandleFunctionalTestError will skip or fail based on whether KAFKA_PROXY_TEST_REQUIRED is set
func IsFunctionalTestRequired ¶
func IsFunctionalTestRequired() bool
IsFunctionalTestRequired returns whether KAFKA_PROXY_TEST_REQUIRED is set
func ProduceRequest ¶
func ProduceRequest(baseURL string, topic string, format Format, message *ProducerMessage) (*http.Request, error)
ProduceRequest creates the request for POST /topics/<topic> endpoint
Types ¶
type ConsumerEndpoint ¶
type ConsumerEndpoint struct { InstanceID string `json:"instance_id"` BaseURI string `json:"base_uri"` }
ConsumerEndpoint is returned on a create
func CreateConsumer ¶
func CreateConsumer(client HTTPClient, baseURL string, group string, request *ConsumerRequest) (resp *ConsumerEndpoint, err error)
CreateConsumer will create a consumer on the proxy for later use
type ConsumerRequest ¶
type ConsumerRequest struct { Format Format `json:"format"` Offset Offset `json:"auto.offset.reset"` AutoCommit string `json:"auto.commit.enable"` Name string `json:"name,omitempty"` }
ConsumerRequest is the meta information needed to create a consumer
func NewConsumerRequest ¶
func NewConsumerRequest(format Format, offset Offset) *ConsumerRequest
NewConsumerRequest creates a consumer request for the provided format and offset
type HTTPClient ¶
HTTPClient is any client that can do a http request
type Message ¶
type Message struct { Key json.RawMessage `json:"key"` Value json.RawMessage `json:"value"` Partition int32 `json:"partition"` Offset int64 `json:"offset"` }
Message is a single kafka message
func Consume ¶
func Consume(client HTTPClient, baseURL string, topic string, partition int32, offset int64, count int, format Format) ([]*Message, error)
Consume takes a kafka location and consumes messages for it
func ConsumeEndpoint ¶
func ConsumeEndpoint(client HTTPClient, endpoint *ConsumerEndpoint, topic string, format Format) ([]*Message, error)
ConsumeEndpoint will get the next messages off of the previously created consumer endpoint. Format must match the format the consumer was created with.
type ProducerMessage ¶
type ProducerMessage struct { KeySchema string `json:"key_schema,omitempty"` KeySchemaID int `json:"key_schema_id,omitempty"` ValueSchema string `json:"value_schema,omitempty"` //either value schema or value schema id must be provided for avro messages ValueSchemaID int `json:"value_schema_id,omitempty"` Records []*ProducerRecord `json:"records"` }
ProducerMessage is the wrapper for the data to the kafka rest proxy producer endpoint
type ProducerOffsets ¶
type ProducerOffsets struct { Partition int32 `json:"partition"` Offset int64 `json:"offset"` ErrorCode int64 `json:"error_code"` Error string `json:"error"` }
ProducerOffsets are the resulting offsets for a produced set of messages
type ProducerRecord ¶
type ProducerRecord struct { Key json.RawMessage `json:"key,omitempty"` Value json.RawMessage `json:"value"` Partition int32 `json:"partition,omitempty"` }
ProducerRecord is an individual message to be produced on kafka
type ProducerResponse ¶
type ProducerResponse struct { KeySchemaID int `json:"key_schema_id"` ValueSchemaID int `json:"value_schema_id"` Offsets []*ProducerOffsets `json:"offsets"` }
ProducerResponse is the response the kafka rest proxy returns on message production
func Produce ¶
func Produce(client HTTPClient, baseURL string, topic string, message *ProducerMessage, format Format) (*ProducerResponse, error)
Produce will publish the message to the topic