Documentation ¶
Overview ¶
Package msk provides an MskCluster which is compatible with the github.com/aws/go-kafka-event-source/streams.Cluster interface. GKES is a non-proprietary library and using MSK is not required. This package is provided as a convenience for those who are using MSK.
Disclaimer: github.com/aws/go-kafka-event-source/msk is not maintained or endorsed by the MSK development team. It is maintained by the developers of GKES. If you have issues with GKES->MSK connectivity, or would like new GKES->MSK features, https://github.com/aws/go-kafka-event-source is the place to ask first.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultClientConfig ¶
Returns the default AWS client config, using `region` as the default region. DefaultClientConfig panics on errors.
Types ¶
type MskClient ¶
type MskClient interface {
	ListClusters(context.Context, *kafka.ListClustersInput, ...func(*kafka.Options)) (*kafka.ListClustersOutput, error)
	GetBootstrapBrokers(context.Context, *kafka.GetBootstrapBrokersInput, ...func(*kafka.Options)) (*kafka.GetBootstrapBrokersOutput, error)
}
type MskCluster ¶
type MskCluster struct {
// contains filtered or unexported fields
}
An implementation of github.com/aws/go-kafka-event-source/streams.Cluster.
func NewMskCluster ¶
func NewMskCluster(clusterName string, authType AuthType, region string, optFns ...func(*kafka.Options)) *MskCluster
Creates a new MskCluster using DefaultClientConfig. If your application is running in EC2/ECS Task or Lambda, this is likely the initializer you need. See Sasl IAM support if using the SaslIam/PublicSaslIam AuthType. Look here to see how to customize client SDK options, such as a custom Retry mechanism. Note: your application's IAM role will need access to the 'ListClusters' and 'GetBootstrapBrokers' calls for your MSK Cluster.
func NewMskClusterWithClientConfig ¶
func NewMskClusterWithClientConfig(clusterName string, authType AuthType, awsConfig aws.Config, optFns ...func(*kafka.Options)) *MskCluster
Creates a new MskCluster using the specified awsConfig. If you are using STS for authentication, you will likely need to create your own AWS config. If you are running on some sort of managed container like EC2/ECS Task or Lambda, you can likely use NewMskCluster instead. Note: your application's IAM role will need access to the 'ListClusters' and 'GetBootstrapBrokers' calls for your MSK Cluster.
func (*MskCluster) Config ¶
func (c *MskCluster) Config() (opts []kgo.Opt, err error)
Called by GKES when initializing Kafka clients. The MskCluster will call ListClusters with a ClusterNameFilter (using cluster.clusterName) to retrieve the ARN for your specific cluster. Once the ARN is retrieved, GetBootstrapBrokers will be called and the appropriate broker addresses for the specified authType will be used to seed the underlying kgo.Client.
func (*MskCluster) WithClientOptions ¶ added in v1.0.2
func (c *MskCluster) WithClientOptions(opts ...kgo.Opt) *MskCluster
Used to supply additional kgo client options. Caution: Options supplied here will override any set by MskCluster. This call replaces any client options previously set. Usage:
cluster := msk.NewMskCluster("MyCluster", msk.MutualTLS, "us-east-1").WithClientOptions(
	kgo.Dialer((&tls.Dialer{
		Config:    tlsConfig,
		NetDialer: &net.Dialer{KeepAlive: 20 * time.Minute},
	}).DialContext))
func (*MskCluster) WithScramUserPass ¶
func (c *MskCluster) WithScramUserPass(user, pass string) *MskCluster
WithScramUserPass is used to set user/password info for SaslScram/PublicSaslScram auth types. This package does not provide for Scram credential rotation. Usage:
cluster := msk.NewMskCluster("MyCluster", msk.SaslScram, "us-east-1").WithScramUserPass("super", "secret")
func (*MskCluster) WithTlsConfig ¶
func (c *MskCluster) WithTlsConfig(tlsConfig *tls.Config) *MskCluster
Used primarily for MutualTLS authentication. If you need any configuration beyond the certificate itself, or simply want to switch on TLS, use WithClientOptions instead. See WithClientOptions for an example. Usage:
cluster := msk.NewMskCluster("MyCluster", msk.MutualTLS, "us-east-1").WithTlsConfig(myMutualTlsConfig)
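One way a value like myMutualTlsConfig might be built is from a PEM-encoded client certificate and key, using only the standard library. This is a sketch under assumptions: `loadMutualTLSConfig` is a hypothetical helper, the file names are placeholders, and the TLS 1.2 minimum is a choice made for the example, not something this package requires.

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// loadMutualTLSConfig is a hypothetical helper: it loads a PEM-encoded client
// certificate and private key and returns a tls.Config that presents them.
func loadMutualTLSConfig(certFile, keyFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}
	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12, // assumption for this sketch
	}, nil
}

func main() {
	// Placeholder paths; replace with the location of your own certificate and key.
	cfg, err := loadMutualTLSConfig("client.crt", "client.key")
	if err != nil {
		fmt.Println("could not build TLS config:", err)
		return
	}
	fmt.Println("client certificates loaded:", len(cfg.Certificates))
}
```

The resulting config could then be passed to WithTlsConfig in place of myMutualTlsConfig.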