Documentation ¶
Index ¶
- func MakeSaramaKafkaEndpointConsumer[T any](stream runtime.TypedInputStream[T], handler SaramaKafkaEndpointHandler[T]) runtime.Consumer[T]
- type HandlerData
- type SaramaKafkaDataSource
- type SaramaKafkaEndpoint
- type SaramaKafkaEndpointConsumer
- type SaramaKafkaEndpointHandler
- type SaramaKafkaInputDataSource
- type SaramaKafkaInputEndpoint
- type TypedSaramaKafkaEndpointConsumer
- func (ec *TypedSaramaKafkaEndpointConsumer[T]) Cleanup(session kafka.ConsumerGroupSession) error
- func (ec *TypedSaramaKafkaEndpointConsumer[T]) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error
- func (ec *TypedSaramaKafkaEndpointConsumer[T]) Out(value T)
- func (ec *TypedSaramaKafkaEndpointConsumer[T]) Setup(session kafka.ConsumerGroupSession) error
- func (ec *TypedSaramaKafkaEndpointConsumer[T]) Start(ctx context.Context) error
- func (ec *TypedSaramaKafkaEndpointConsumer[T]) Stop(ctx context.Context)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MakeSaramaKafkaEndpointConsumer ¶
func MakeSaramaKafkaEndpointConsumer[T any](stream runtime.TypedInputStream[T], handler SaramaKafkaEndpointHandler[T]) runtime.Consumer[T]
Types ¶
type HandlerData ¶
type HandlerData struct { Message *kafka.ConsumerMessage Session kafka.ConsumerGroupSession Claim kafka.ConsumerGroupClaim }
type SaramaKafkaDataSource ¶
type SaramaKafkaDataSource struct { *runtime.InputDataSource // contains filtered or unexported fields }
func (*SaramaKafkaDataSource) Start ¶
func (ds *SaramaKafkaDataSource) Start(ctx context.Context) error
func (*SaramaKafkaDataSource) Stop ¶
func (ds *SaramaKafkaDataSource) Stop(ctx context.Context)
func (*SaramaKafkaDataSource) WaitGroup ¶
func (ds *SaramaKafkaDataSource) WaitGroup() *sync.WaitGroup
type SaramaKafkaEndpoint ¶
type SaramaKafkaEndpoint struct {
*runtime.DataSourceEndpoint
}
func (*SaramaKafkaEndpoint) Start ¶
func (ep *SaramaKafkaEndpoint) Start(ctx context.Context, admin kafka.ClusterAdmin) error
func (*SaramaKafkaEndpoint) Stop ¶
func (ep *SaramaKafkaEndpoint) Stop(ctx context.Context)
type SaramaKafkaEndpointHandler ¶
type SaramaKafkaEndpointHandler[T any] interface { Handler(*HandlerData, runtime.Collect[T]) }
type SaramaKafkaInputDataSource ¶
type SaramaKafkaInputDataSource interface { runtime.DataSource WaitGroup() *sync.WaitGroup }
type SaramaKafkaInputEndpoint ¶
type SaramaKafkaInputEndpoint interface { runtime.InputEndpoint Start(context.Context, kafka.ClusterAdmin) error Stop(context.Context) }
type TypedSaramaKafkaEndpointConsumer ¶
type TypedSaramaKafkaEndpointConsumer[T any] struct { *runtime.DataSourceEndpointConsumer[T] // contains filtered or unexported fields }
func (*TypedSaramaKafkaEndpointConsumer[T]) Cleanup ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T]) Cleanup(session kafka.ConsumerGroupSession) error
func (*TypedSaramaKafkaEndpointConsumer[T]) ConsumeClaim ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T]) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error
func (*TypedSaramaKafkaEndpointConsumer[T]) Out ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T]) Out(value T)
func (*TypedSaramaKafkaEndpointConsumer[T]) Setup ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T]) Setup(session kafka.ConsumerGroupSession) error
func (*TypedSaramaKafkaEndpointConsumer[T]) Start ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T]) Start(ctx context.Context) error
func (*TypedSaramaKafkaEndpointConsumer[T]) Stop ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T]) Stop(ctx context.Context)
Click to show internal directories.
Click to hide internal directories.