Documentation ¶
Index ¶
- Constants
- Variables
- func ConfigJSON(config Configuration) json.RawMessage
- type Configuration
- type Factory
- type GraphQLSubscriptionClient
- type GraphQLSubscriptionOptions
- type KafkaConsumerGroup
- type KafkaConsumerGroupBridge
- type Planner
- func (p *Planner) ConfigureFetch() plan.FetchConfiguration
- func (p *Planner) ConfigureSubscription() plan.SubscriptionConfiguration
- func (p *Planner) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior
- func (p *Planner) DownstreamResponseFieldAlias(_ int) (alias string, exists bool)
- func (p *Planner) Register(_ *plan.Visitor, configuration plan.DataSourceConfiguration, _ bool) error
- type SASL
- type SubscriptionConfiguration
- type SubscriptionSource
Constants ¶
View Source
const ( IsolationLevelReadUncommitted = "ReadUncommitted" IsolationLevelReadCommitted = "ReadCommitted" )
View Source
const ( BalanceStrategyRange = "BalanceStrategyRange" BalanceStrategySticky = "BalanceStrategySticky" BalanceStrategyRoundRobin = "BalanceStrategyRoundRobin" )
View Source
const DefaultBalanceStrategy = BalanceStrategyRange
View Source
const DefaultIsolationLevel = IsolationLevelReadUncommitted
Variables ¶
View Source
var ( DefaultKafkaVersion = "V1_0_0_0" SaramaSupportedKafkaVersions = map[string]sarama.KafkaVersion{ "V0_10_2_0": sarama.V0_10_2_0, "V0_10_2_1": sarama.V0_10_2_1, "V0_11_0_0": sarama.V0_11_0_0, "V0_11_0_1": sarama.V0_11_0_1, "V0_11_0_2": sarama.V0_11_0_2, "V1_0_0_0": sarama.V1_0_0_0, "V1_1_0_0": sarama.V1_1_0_0, "V1_1_1_0": sarama.V1_1_1_0, "V2_0_0_0": sarama.V2_0_0_0, "V2_0_1_0": sarama.V2_0_1_0, "V2_1_0_0": sarama.V2_1_0_0, "V2_2_0_0": sarama.V2_2_0_0, "V2_3_0_0": sarama.V2_3_0_0, "V2_4_0_0": sarama.V2_4_0_0, "V2_5_0_0": sarama.V2_5_0_0, "V2_6_0_0": sarama.V2_6_0_0, "V2_7_0_0": sarama.V2_7_0_0, "V2_8_0_0": sarama.V2_8_0_0, } )
Functions ¶
func ConfigJSON ¶
func ConfigJSON(config Configuration) json.RawMessage
Types ¶
type Configuration ¶
type Configuration struct {
Subscription SubscriptionConfiguration
}
type GraphQLSubscriptionClient ¶
type GraphQLSubscriptionClient interface {
Subscribe(ctx context.Context, options GraphQLSubscriptionOptions, next chan<- []byte) error
}
type GraphQLSubscriptionOptions ¶
type GraphQLSubscriptionOptions struct { BrokerAddr string `json:"broker_addr"` Topic string `json:"topic"` GroupID string `json:"group_id"` ClientID string `json:"client_id"` KafkaVersion string `json:"kafka_version"` StartConsumingLatest bool `json:"start_consuming_latest"` BalanceStrategy string `json:"balance_strategy"` IsolationLevel string `json:"isolation_level"` SASL SASL `json:"sasl"` // contains filtered or unexported fields }
func (*GraphQLSubscriptionOptions) Sanitize ¶
func (g *GraphQLSubscriptionOptions) Sanitize()
func (*GraphQLSubscriptionOptions) Validate ¶
func (g *GraphQLSubscriptionOptions) Validate() error
type KafkaConsumerGroup ¶
type KafkaConsumerGroup struct {
// contains filtered or unexported fields
}
func NewKafkaConsumerGroup ¶
func NewKafkaConsumerGroup(log log.Logger, saramaConfig *sarama.Config, options *GraphQLSubscriptionOptions) (*KafkaConsumerGroup, error)
NewKafkaConsumerGroup creates a new sarama.ConsumerGroup and returns a new *KafkaConsumerGroup instance.
func (*KafkaConsumerGroup) Close ¶
func (k *KafkaConsumerGroup) Close() error
Close stops background goroutines and closes the underlying ConsumerGroup instance.
func (*KafkaConsumerGroup) StartConsuming ¶
func (k *KafkaConsumerGroup) StartConsuming(messages chan *sarama.ConsumerMessage)
StartConsuming initializes a new consumer group handler and starts consuming at background.
func (*KafkaConsumerGroup) WaitUntilConsumerStop ¶
func (k *KafkaConsumerGroup) WaitUntilConsumerStop()
WaitUntilConsumerStop waits until ConsumerGroup.Consume function stops.
type KafkaConsumerGroupBridge ¶
type KafkaConsumerGroupBridge struct {
// contains filtered or unexported fields
}
func NewKafkaConsumerGroupBridge ¶
func NewKafkaConsumerGroupBridge(ctx context.Context, logger log.Logger) *KafkaConsumerGroupBridge
func (*KafkaConsumerGroupBridge) Subscribe ¶
func (c *KafkaConsumerGroupBridge) Subscribe(ctx context.Context, options GraphQLSubscriptionOptions, next chan<- []byte) error
Subscribe creates a new consumer group with given config and streams messages via next channel.
type Planner ¶
type Planner struct {
// contains filtered or unexported fields
}
func (*Planner) ConfigureFetch ¶
func (p *Planner) ConfigureFetch() plan.FetchConfiguration
func (*Planner) ConfigureSubscription ¶
func (p *Planner) ConfigureSubscription() plan.SubscriptionConfiguration
func (*Planner) DataSourcePlanningBehavior ¶
func (p *Planner) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior
func (*Planner) DownstreamResponseFieldAlias ¶
type SASL ¶
type SASL struct { // Whether or not to use SASL authentication when connecting to the broker // (defaults to false). Enable bool `json:"enable"` // User is the authentication identity (authcid) to present for // SASL/PLAIN or SASL/SCRAM authentication User string `json:"user"` // Password for SASL/PLAIN authentication Password string `json:"password"` }
type SubscriptionConfiguration ¶
type SubscriptionConfiguration struct { BrokerAddr string `json:"broker_addr"` Topic string `json:"topic"` GroupID string `json:"group_id"` ClientID string `json:"client_id"` KafkaVersion string `json:"kafka_version"` StartConsumingLatest bool `json:"start_consuming_latest"` BalanceStrategy string `json:"balance_strategy"` IsolationLevel string `json:"isolation_level"` SASL SASL `json:"sasl"` }
type SubscriptionSource ¶
type SubscriptionSource struct {
// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.