Documentation ¶
Index ¶
- Variables
- type AdminClientInterface
- type AdminClientType
- type ConfluentAdminClientInterface
- type EventHubAdminClient
- func (c *EventHubAdminClient) Close()
- func (c *EventHubAdminClient) CreateTopics(ctx context.Context, topicSpecifications []kafka.TopicSpecification, ...) ([]kafka.TopicResult, error)
- func (c *EventHubAdminClient) DeleteTopics(ctx context.Context, topicNames []string, ...) ([]kafka.TopicResult, error)
- func (c *EventHubAdminClient) GetKafkaSecretName(topicName string) string
- type KafkaAdminClient
- func (c KafkaAdminClient) Close()
- func (c KafkaAdminClient) CreateTopics(ctx context.Context, topicSpecifications []kafka.TopicSpecification, ...) ([]kafka.TopicResult, error)
- func (c KafkaAdminClient) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
- func (c KafkaAdminClient) GetKafkaSecretName(topicName string) string
Constants ¶
This section is empty.
Variables ¶
var NewCacheWrapper = func(ctx context.Context, k8sNamespace string) eventhubcache.CacheInterface { return eventhubcache.NewCache(ctx, k8sNamespace) }
EventHub NewCache Wrapper To Facilitate Unit Testing
var NewEventHubAdminClientWrapper = func(ctx context.Context, namespace string) (AdminClientInterface, error) { return NewEventHubAdminClient(ctx, namespace) }
New EventHub AdminClient Wrapper To Facilitate Unit Testing
var NewKafkaAdminClientWrapper = func(ctx context.Context, namespace string) (AdminClientInterface, error) { return NewKafkaAdminClient(ctx, namespace) }
New Kafka AdminClient Wrapper To Facilitate Unit Testing
Functions ¶
This section is empty.
Types ¶
type AdminClientInterface ¶
type AdminClientInterface interface { CreateTopics(context.Context, []kafka.TopicSpecification, ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error) DeleteTopics(context.Context, []string, ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error) Close() GetKafkaSecretName(topicName string) string }
The Confluent Client Doesn't Code To Interfaces Or Provide Mocks, So We're Wrapping Our Usage Of The AdminClient For Testing. The Interface Also Introduces Additional Functionality To Get The Kafka Secret For A Topic.
func CreateAdminClient ¶
func CreateAdminClient(ctx context.Context, adminClientType AdminClientType) (AdminClientInterface, error)
Create A New Kafka AdminClient Of Specified Type - Using Credentials From Kafka Secret(s) In Specified K8S Namespace
The K8S Namespace parameter indicates the Kubernetes Namespace in which the Kafka Credentials secret(s) will be found. The secret(s) must contain the constants.KafkaSecretLabel label indicating it is a "Kafka Secret".
For the normal Kafka use case (Confluent, etc.) there should be only one Secret with the following content...
data:
  brokers: SASL_SSL://<host>.<region>.aws.confluent.cloud:9092
  username: <username>
  password: <password>
For the Azure EventHub use case there will be multiple Secrets (one per Azure Namespace) each with the following content...
data:
  brokers: <azure-namespace>.servicebus.windows.net:9093
  username: $ConnectionString
  password: Endpoint=sb://<azure-namespace>.servicebus.windows.net/;SharedAccessKeyName=<shared-access-key-name>;SharedAccessKey=<shared-access-key-value>
  namespace: <azure-namespace>
* If no authorization is required (local dev instance) then specify username and password as the empty string ""
func NewEventHubAdminClient ¶
func NewEventHubAdminClient(ctx context.Context, namespace string) (AdminClientInterface, error)
Create A New Azure EventHub AdminClient Based On Kafka Secrets In The Specified K8S Namespace
func NewKafkaAdminClient ¶
func NewKafkaAdminClient(ctx context.Context, namespace string) (AdminClientInterface, error)
Create A New Kafka (Confluent, etc...) AdminClient Based On The Kafka Secret In The Specified K8S Namespace
type AdminClientType ¶
type AdminClientType int
AdminClient Type Enumeration
const ( Kafka AdminClientType = iota EventHub )
type ConfluentAdminClientInterface ¶
type ConfluentAdminClientInterface interface { CreateTopics(ctx context.Context, topicSpecifications []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error) Close() }
Confluent AdminClient Interface - Adding Our Own Wrapping Interface To Facilitate Testing
type EventHubAdminClient ¶
type EventHubAdminClient struct {
// contains filtered or unexported fields
}
EventHub AdminClient Definition
func (*EventHubAdminClient) Close ¶
func (c *EventHubAdminClient) Close()
Kafka AdminClient Close Implementation Using Azure EventHub API
func (*EventHubAdminClient) CreateTopics ¶
func (c *EventHubAdminClient) CreateTopics(ctx context.Context, topicSpecifications []kafka.TopicSpecification, createTopicsAdminOptions ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
Kafka AdminClient CreateTopics Implementation Using Azure EventHub API
func (*EventHubAdminClient) DeleteTopics ¶
func (c *EventHubAdminClient) DeleteTopics(ctx context.Context, topicNames []string, deleteOptions ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
Kafka AdminClient DeleteTopics Implementation Using Azure EventHub API
func (*EventHubAdminClient) GetKafkaSecretName ¶
func (c *EventHubAdminClient) GetKafkaSecretName(topicName string) string
Get The Name Of The K8S Secret With Kafka Credentials For The Specified Topic (EventHub)
type KafkaAdminClient ¶
type KafkaAdminClient struct {
// contains filtered or unexported fields
}
Kafka AdminClient Definition
func (KafkaAdminClient) Close ¶
func (c KafkaAdminClient) Close()
Close - Confluent Pass-Through Function
func (KafkaAdminClient) CreateTopics ¶
func (c KafkaAdminClient) CreateTopics(ctx context.Context, topicSpecifications []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
CreateTopics - Confluent Pass-Through Function
func (KafkaAdminClient) DeleteTopics ¶
func (c KafkaAdminClient) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
DeleteTopics - Confluent Pass-Through Function
func (KafkaAdminClient) GetKafkaSecretName ¶
func (c KafkaAdminClient) GetKafkaSecretName(topicName string) string
Get The Name Of The K8S Secret With Kafka Credentials For The Specified Topic