Documentation ¶
Index ¶
- Variables
- type AdminClientInterface
- func CreateAdminClient(ctx context.Context, saramaConfig *sarama.Config, clientId string, ...) (AdminClientInterface, error)
- func NewCustomAdminClient(ctx context.Context, namespace string) (AdminClientInterface, error)
- func NewEventHubAdminClient(ctx context.Context, namespace string) (AdminClientInterface, error)
- func NewKafkaAdminClient(ctx context.Context, saramaConfig *sarama.Config, clientId string, ...) (AdminClientInterface, error)
- type AdminClientType
- type CustomAdminClient
- func (c *CustomAdminClient) Close() error
- func (c *CustomAdminClient) CreateTopic(_ context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
- func (c *CustomAdminClient) DeleteTopic(_ context.Context, topicName string) *sarama.TopicError
- func (c *CustomAdminClient) GetKafkaSecretName(_ string) string
- type EventHubAdminClient
- func (c *EventHubAdminClient) Close() error
- func (c *EventHubAdminClient) CreateTopic(ctx context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
- func (c *EventHubAdminClient) DeleteTopic(ctx context.Context, topicName string) *sarama.TopicError
- func (c *EventHubAdminClient) GetKafkaSecretName(topicName string) string
- type KafkaAdminClient
- func (k KafkaAdminClient) Close() error
- func (k KafkaAdminClient) CreateTopic(_ context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
- func (k KafkaAdminClient) DeleteTopic(_ context.Context, topicName string) *sarama.TopicError
- func (k KafkaAdminClient) GetKafkaSecretName(_ string) string
Variables ¶
var NewCacheWrapper = func(ctx context.Context, k8sNamespace string) eventhubcache.CacheInterface { return eventhubcache.NewCache(ctx, k8sNamespace) }
EventHub NewCache Wrapper To Facilitate Unit Testing
var NewClusterAdminWrapper = func(brokers []string, config *sarama.Config) (sarama.ClusterAdmin, error) { return sarama.NewClusterAdmin(brokers, config) }
Sarama NewClusterAdmin() Wrapper Function Variable To Facilitate Unit Testing
var NewCustomAdminClientWrapper = func(ctx context.Context, namespace string) (AdminClientInterface, error) { return NewCustomAdminClient(ctx, namespace) }
New Custom AdminClient Wrapper To Facilitate Unit Testing
var NewEventHubAdminClientWrapper = func(ctx context.Context, namespace string) (AdminClientInterface, error) { return NewEventHubAdminClient(ctx, namespace) }
New EventHub AdminClient Wrapper To Facilitate Unit Testing
var NewKafkaAdminClientWrapper = func(ctx context.Context, saramaConfig *sarama.Config, clientId string, namespace string) (AdminClientInterface, error) { return NewKafkaAdminClient(ctx, saramaConfig, clientId, namespace) }
New Kafka AdminClient Wrapper To Facilitate Unit Testing
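The wrapper variables above all follow one pattern: production code calls a package-level function variable rather than the constructor itself, so a test can substitute a fake. The following is a minimal, dependency-free sketch of that pattern. The ClusterAdmin interface here is a reduced, hypothetical stand-in for sarama.ClusterAdmin, and Connect, realAdmin and fakeAdmin are illustrative names that do not exist in this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ClusterAdmin is a hypothetical stand-in for sarama.ClusterAdmin,
// reduced to a single method to keep the sketch self-contained.
type ClusterAdmin interface {
	Close() error
}

// realAdmin stands in for the client a real constructor would return.
type realAdmin struct{}

func (realAdmin) Close() error { return nil }

// NewClusterAdminWrapper mirrors the pattern above: callers go through the
// variable, never the constructor directly, so a test can replace it.
var NewClusterAdminWrapper = func(brokers []string) (ClusterAdmin, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	return realAdmin{}, nil
}

// fakeAdmin records whether Close was called.
type fakeAdmin struct{ closed bool }

func (f *fakeAdmin) Close() error {
	f.closed = true
	return nil
}

// Connect is hypothetical production code that uses the wrapper.
func Connect(brokers []string) error {
	admin, err := NewClusterAdminWrapper(brokers)
	if err != nil {
		return err
	}
	return admin.Close()
}

func main() {
	// In a test: save the original, install a fake, restore afterwards.
	original := NewClusterAdminWrapper
	defer func() { NewClusterAdminWrapper = original }()

	fake := &fakeAdmin{}
	NewClusterAdminWrapper = func([]string) (ClusterAdmin, error) { return fake, nil }

	err := Connect(nil) // the fake ignores the empty broker list
	fmt.Println("error:", err, "closed:", fake.closed)
}
```

Restoring the original value (here with defer) matters because the variable is shared package state; a test that forgets to restore it can affect later tests.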
Types ¶
type AdminClientInterface ¶
type AdminClientInterface interface {
	CreateTopic(context.Context, string, *sarama.TopicDetail) *sarama.TopicError
	DeleteTopic(context.Context, string) *sarama.TopicError
	Close() error
	GetKafkaSecretName(topicName string) string
}
Sarama ClusterAdmin Wrapping Interface To Facilitate Other Implementations (e.g. Azure EventHubs)
func CreateAdminClient ¶
func CreateAdminClient(ctx context.Context, saramaConfig *sarama.Config, clientId string, adminClientType AdminClientType) (AdminClientInterface, error)
Create A New Kafka AdminClient Of Specified Type - Using Credentials From Kafka Secret(s) In Specified K8S Namespace
The K8S namespace is the Kubernetes namespace that holds the Kafka credentials Secret(s); the New...AdminClient constructors take it as their namespace argument. Each Secret must carry the constants.KafkaSecretLabel label, which marks it as a "Kafka Secret".
For the normal Kafka use case (Confluent, etc.) there should be only one Secret with the following content...
    data:
      brokers: SASL_SSL://<host>.<region>.aws.confluent.cloud:9092
      username: <username>
      password: <password>
For the Azure EventHub use case there will be multiple Secrets (one per Azure Namespace) each with the following content...
    data:
      brokers: <azure-namespace>.servicebus.windows.net:9093
      username: $ConnectionString
      password: Endpoint=sb://<azure-namespace>.servicebus.windows.net/;SharedAccessKeyName=<shared-access-key-name>;SharedAccessKey=<shared-access-key-value>
      namespace: <azure-namespace>
* If no authorization is required (local dev instance) then specify username and password as the empty string ""
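The EventHub Secret fields are all derived from three inputs: the Azure namespace, the shared access key name, and the shared access key value. As an illustration only, the sketch below assembles the four fields in the format shown above. The helper eventHubSecretData is hypothetical and not part of this package, and the sample values in main are placeholders.

```go
package main

import "fmt"

// eventHubSecretData is a hypothetical helper that builds the plain-text
// field values for one EventHub Secret, following the format documented above.
func eventHubSecretData(azureNamespace, keyName, keyValue string) map[string]string {
	host := azureNamespace + ".servicebus.windows.net"
	return map[string]string{
		"brokers":  host + ":9093",
		"username": "$ConnectionString", // literal value, not a variable
		"password": fmt.Sprintf(
			"Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s",
			host, keyName, keyValue),
		"namespace": azureNamespace,
	}
}

func main() {
	// Placeholder inputs for illustration.
	data := eventHubSecretData("my-namespace", "my-key-name", "my-key-value")
	for _, field := range []string{"brokers", "username", "password", "namespace"} {
		fmt.Printf("%s: %s\n", field, data[field])
	}
}
```

In an actual Kubernetes manifest, values under data are base64-encoded; plain-text values like these would go under stringData instead.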
func NewCustomAdminClient ¶
func NewCustomAdminClient(ctx context.Context, namespace string) (AdminClientInterface, error)
Create A New Custom Kafka AdminClient Based On The Kafka Secret In The Specified K8S Namespace
func NewEventHubAdminClient ¶
func NewEventHubAdminClient(ctx context.Context, namespace string) (AdminClientInterface, error)
Create A New Azure EventHub AdminClient Based On Kafka Secrets In The Specified K8S Namespace
func NewKafkaAdminClient ¶
func NewKafkaAdminClient(ctx context.Context, saramaConfig *sarama.Config, clientId string, namespace string) (AdminClientInterface, error)
Create A New Kafka AdminClient Based On The Kafka Secret In The Specified K8S Namespace
type AdminClientType ¶
type AdminClientType int
AdminClient Type Enumeration
const (
	Kafka AdminClientType = iota
	EventHub
	Custom
	Unknown
)
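Since the enumeration starts at iota, Kafka is 0, EventHub is 1, Custom is 2 and Unknown is 3. The sketch below redeclares the type locally and adds a String method so the values print by name in logs. The String method is a hypothetical addition for illustration; the documentation above does not list one for this type.

```go
package main

import "fmt"

// AdminClientType and its constants are redeclared here so the sketch
// compiles on its own; they match the declaration above.
type AdminClientType int

const (
	Kafka AdminClientType = iota
	EventHub
	Custom
	Unknown
)

// String is a hypothetical addition that maps each value to its name.
// Values outside the declared range fall through to a numeric form.
func (t AdminClientType) String() string {
	switch t {
	case Kafka:
		return "Kafka"
	case EventHub:
		return "EventHub"
	case Custom:
		return "Custom"
	case Unknown:
		return "Unknown"
	default:
		return fmt.Sprintf("AdminClientType(%d)", int(t))
	}
}

func main() {
	for _, t := range []AdminClientType{Kafka, EventHub, Custom, Unknown} {
		fmt.Printf("%d = %s\n", int(t), t)
	}
}
```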
type CustomAdminClient ¶
type CustomAdminClient struct {
// contains filtered or unexported fields
}
Custom AdminClient Definition
func (*CustomAdminClient) Close ¶
func (c *CustomAdminClient) Close() error
Custom REST Pass-Through Function For Closing The Admin Client
func (*CustomAdminClient) CreateTopic ¶
func (c *CustomAdminClient) CreateTopic(_ context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
Custom REST Pass-Through Function For Creating Topics
func (*CustomAdminClient) DeleteTopic ¶
func (c *CustomAdminClient) DeleteTopic(_ context.Context, topicName string) *sarama.TopicError
Custom REST Pass-Through Function For Deleting Topics
func (*CustomAdminClient) GetKafkaSecretName ¶
func (c *CustomAdminClient) GetKafkaSecretName(_ string) string
Get The Name Of The K8S Secret With Kafka Credentials For The Specified Topic Name
type EventHubAdminClient ¶
type EventHubAdminClient struct {
// contains filtered or unexported fields
}
EventHub AdminClient Definition
func (*EventHubAdminClient) Close ¶
func (c *EventHubAdminClient) Close() error
Kafka AdminClient Close Implementation Using Azure EventHub API
func (*EventHubAdminClient) CreateTopic ¶
func (c *EventHubAdminClient) CreateTopic(ctx context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
Kafka AdminClient CreateTopic Implementation Using Azure EventHub API
func (*EventHubAdminClient) DeleteTopic ¶
func (c *EventHubAdminClient) DeleteTopic(ctx context.Context, topicName string) *sarama.TopicError
Delete A Single Topic (EventHub) Via The Azure EventHub API
func (*EventHubAdminClient) GetKafkaSecretName ¶
func (c *EventHubAdminClient) GetKafkaSecretName(topicName string) string
Get The Name Of The K8S Secret With Kafka Credentials For The Specified Topic (EventHub)
type KafkaAdminClient ¶
type KafkaAdminClient struct {
// contains filtered or unexported fields
}
Kafka AdminClient Definition
func (KafkaAdminClient) Close ¶
func (k KafkaAdminClient) Close() error
Sarama Pass-Through Function For Closing ClusterAdmin
func (KafkaAdminClient) CreateTopic ¶
func (k KafkaAdminClient) CreateTopic(_ context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
Sarama Pass-Through Function For Creating Topics
func (KafkaAdminClient) DeleteTopic ¶
func (k KafkaAdminClient) DeleteTopic(_ context.Context, topicName string) *sarama.TopicError
Sarama Pass-Through Function For Deleting Topics
func (KafkaAdminClient) GetKafkaSecretName ¶
func (k KafkaAdminClient) GetKafkaSecretName(_ string) string
Get The Name Of The K8S Secret With Kafka Credentials For The Specified Topic Name