kafka_datasource

package
Version: v1.60.4
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2022 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Isolation level names, exposed as untyped string constants.
// NOTE(review): presumably the accepted values of the IsolationLevel field on
// the option structs — confirm against Validate.
const (
	IsolationLevelReadUncommitted = "ReadUncommitted"
	IsolationLevelReadCommitted   = "ReadCommitted"
)
View Source
// Balance strategy names, exposed as untyped string constants. Each value is
// spelled exactly like its own identifier.
// NOTE(review): presumably the accepted values of the BalanceStrategy field on
// the option structs — confirm against Validate.
const (
	BalanceStrategyRange      = "BalanceStrategyRange"
	BalanceStrategySticky     = "BalanceStrategySticky"
	BalanceStrategyRoundRobin = "BalanceStrategyRoundRobin"
)
View Source
// DefaultBalanceStrategy is set to BalanceStrategyRange.
// NOTE(review): presumably applied when no balance strategy is configured —
// confirm where it is consumed.
const DefaultBalanceStrategy = BalanceStrategyRange
View Source
// DefaultIsolationLevel is set to IsolationLevelReadUncommitted.
// NOTE(review): presumably applied when no isolation level is configured —
// confirm where it is consumed.
const DefaultIsolationLevel = IsolationLevelReadUncommitted

Variables

View Source
// DefaultKafkaVersion holds the version name "V1_0_0_0", which is one of the
// keys of SaramaSupportedKafkaVersions.
//
// SaramaSupportedKafkaVersions maps a version name to the sarama.KafkaVersion
// value whose identifier has the same spelling. It lists 18 versions, from
// V0_10_2_0 to V2_8_0_0.
// NOTE(review): presumably the KafkaVersion option field is looked up in this
// map — confirm against Validate/Sanitize.
var (
	DefaultKafkaVersion          = "V1_0_0_0"
	SaramaSupportedKafkaVersions = map[string]sarama.KafkaVersion{
		"V0_10_2_0": sarama.V0_10_2_0,
		"V0_10_2_1": sarama.V0_10_2_1,
		"V0_11_0_0": sarama.V0_11_0_0,
		"V0_11_0_1": sarama.V0_11_0_1,
		"V0_11_0_2": sarama.V0_11_0_2,
		"V1_0_0_0":  sarama.V1_0_0_0,
		"V1_1_0_0":  sarama.V1_1_0_0,
		"V1_1_1_0":  sarama.V1_1_1_0,
		"V2_0_0_0":  sarama.V2_0_0_0,
		"V2_0_1_0":  sarama.V2_0_1_0,
		"V2_1_0_0":  sarama.V2_1_0_0,
		"V2_2_0_0":  sarama.V2_2_0_0,
		"V2_3_0_0":  sarama.V2_3_0_0,
		"V2_4_0_0":  sarama.V2_4_0_0,
		"V2_5_0_0":  sarama.V2_5_0_0,
		"V2_6_0_0":  sarama.V2_6_0_0,
		"V2_7_0_0":  sarama.V2_7_0_0,
		"V2_8_0_0":  sarama.V2_8_0_0,
	}
)

Functions

func ConfigJSON

func ConfigJSON(config Configuration) json.RawMessage

Types

type Configuration

// Configuration is the data source configuration. It wraps a single
// SubscriptionConfiguration and is the argument type of ConfigJSON, which
// returns a json.RawMessage.
type Configuration struct {
	// Subscription holds the Kafka subscription settings.
	Subscription SubscriptionConfiguration
}

type Factory

// Factory is an empty struct whose Planner method returns a
// plan.DataSourcePlanner.
type Factory struct{}

func (*Factory) Planner

func (f *Factory) Planner(ctx context.Context) plan.DataSourcePlanner

type GraphQLSubscriptionClient

// GraphQLSubscriptionClient is a single-method subscription interface.
// KafkaConsumerGroupBridge.Subscribe has the same signature.
type GraphQLSubscriptionClient interface {
	// Subscribe takes the subscription options and a send-only channel of
	// byte slices, and returns an error.
	// NOTE(review): presumably messages are delivered on next until ctx is
	// done — confirm against the implementation.
	Subscribe(ctx context.Context, options GraphQLSubscriptionOptions, next chan<- []byte) error
}

type GraphQLSubscriptionOptions

// GraphQLSubscriptionOptions is the options value passed to
// GraphQLSubscriptionClient.Subscribe. Its exported fields and JSON keys are
// the same as those of SubscriptionConfiguration; it also carries unexported
// fields. It has Sanitize and Validate methods.
// NOTE(review): presumably KafkaVersion, BalanceStrategy and IsolationLevel
// take the names defined by SaramaSupportedKafkaVersions and the
// BalanceStrategy*/IsolationLevel* constants — confirm against Validate.
type GraphQLSubscriptionOptions struct {
	BrokerAddresses      []string `json:"broker_addresses"`
	Topics               []string `json:"topics"`
	GroupID              string   `json:"group_id"`
	ClientID             string   `json:"client_id"`
	KafkaVersion         string   `json:"kafka_version"`
	StartConsumingLatest bool     `json:"start_consuming_latest"`
	BalanceStrategy      string   `json:"balance_strategy"`
	IsolationLevel       string   `json:"isolation_level"`
	SASL                 SASL     `json:"sasl"`
	// contains filtered or unexported fields
}

func (*GraphQLSubscriptionOptions) Sanitize

func (g *GraphQLSubscriptionOptions) Sanitize()

func (*GraphQLSubscriptionOptions) Validate

func (g *GraphQLSubscriptionOptions) Validate() error

type KafkaConsumerGroup

// KafkaConsumerGroup is the type returned by NewKafkaConsumerGroup. Its
// methods are StartConsuming, WaitUntilConsumerStop and Close, and all of its
// fields are unexported.
type KafkaConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumerGroup

func NewKafkaConsumerGroup(log log.Logger, saramaConfig *sarama.Config, options *GraphQLSubscriptionOptions) (*KafkaConsumerGroup, error)

NewKafkaConsumerGroup creates a new sarama.ConsumerGroup and returns a new *KafkaConsumerGroup instance.

func (*KafkaConsumerGroup) Close

func (k *KafkaConsumerGroup) Close() error

Close stops background goroutines and closes the underlying ConsumerGroup instance.

func (*KafkaConsumerGroup) StartConsuming

func (k *KafkaConsumerGroup) StartConsuming(messages chan *sarama.ConsumerMessage)

StartConsuming initializes a new consumer group handler and starts consuming in the background.

func (*KafkaConsumerGroup) WaitUntilConsumerStop

func (k *KafkaConsumerGroup) WaitUntilConsumerStop()

WaitUntilConsumerStop waits until the ConsumerGroup.Consume function stops.

type KafkaConsumerGroupBridge

// KafkaConsumerGroupBridge is the type returned by
// NewKafkaConsumerGroupBridge. Its Subscribe method has the signature
// declared by GraphQLSubscriptionClient. All of its fields are unexported.
type KafkaConsumerGroupBridge struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumerGroupBridge

func NewKafkaConsumerGroupBridge(ctx context.Context, logger log.Logger) *KafkaConsumerGroupBridge

func (*KafkaConsumerGroupBridge) Subscribe

func (c *KafkaConsumerGroupBridge) Subscribe(ctx context.Context, options GraphQLSubscriptionOptions, next chan<- []byte) error

Subscribe creates a new consumer group with the given config and streams messages via the next channel.

type Planner

// Planner has only unexported fields. Its methods are ConfigureFetch,
// ConfigureSubscription, DataSourcePlanningBehavior,
// DownstreamResponseFieldAlias and Register.
// NOTE(review): presumably this is the plan.DataSourcePlanner returned by
// Factory.Planner — confirm.
type Planner struct {
	// contains filtered or unexported fields
}

func (*Planner) ConfigureFetch

func (p *Planner) ConfigureFetch() plan.FetchConfiguration

func (*Planner) ConfigureSubscription

func (p *Planner) ConfigureSubscription() plan.SubscriptionConfiguration

func (*Planner) DataSourcePlanningBehavior

func (p *Planner) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior

func (*Planner) DownstreamResponseFieldAlias

func (p *Planner) DownstreamResponseFieldAlias(_ int) (alias string, exists bool)

func (*Planner) Register

func (p *Planner) Register(_ *plan.Visitor, configuration plan.DataSourceConfiguration, _ bool) error

type SASL

// SASL holds SASL authentication settings. It is embedded as the SASL field
// (JSON key "sasl") of both GraphQLSubscriptionOptions and
// SubscriptionConfiguration.
type SASL struct {
	// Whether or not to use SASL authentication when connecting to the broker
	// (defaults to false).
	Enable bool `json:"enable"`
	// User is the authentication identity (authcid) to present for
	// SASL/PLAIN or SASL/SCRAM authentication
	User string `json:"user"`
	// Password for SASL/PLAIN authentication
	Password string `json:"password"`
}

type SubscriptionConfiguration

// SubscriptionConfiguration is the type of Configuration.Subscription. Its
// fields and JSON keys are the same as the exported fields of
// GraphQLSubscriptionOptions.
// NOTE(review): presumably KafkaVersion, BalanceStrategy and IsolationLevel
// take the names defined by SaramaSupportedKafkaVersions and the
// BalanceStrategy*/IsolationLevel* constants — confirm against Validate.
type SubscriptionConfiguration struct {
	BrokerAddresses      []string `json:"broker_addresses"`
	Topics               []string `json:"topics"`
	GroupID              string   `json:"group_id"`
	ClientID             string   `json:"client_id"`
	KafkaVersion         string   `json:"kafka_version"`
	StartConsumingLatest bool     `json:"start_consuming_latest"`
	BalanceStrategy      string   `json:"balance_strategy"`
	IsolationLevel       string   `json:"isolation_level"`
	SASL                 SASL     `json:"sasl"`
}

type SubscriptionSource

// SubscriptionSource has only unexported fields. Its Start method takes a
// context, an input byte slice and a send-only channel of byte slices, and
// returns an error.
// NOTE(review): presumably input is the JSON-encoded subscription options —
// confirm against Start.
type SubscriptionSource struct {
	// contains filtered or unexported fields
}

func (*SubscriptionSource) Start

func (s *SubscriptionSource) Start(ctx context.Context, input []byte, next chan<- []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL