contube

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// String constants exported by the package:
//   - EndpointKey is the literal "endpoint".
//   - IncludeMetadata is the literal "includeMetadata".
//
// NOTE(review): presumably both are ConfigMap keys read by the HTTP tube
// factory; the implementation is not visible here, so confirm before relying
// on that.
const (
	EndpointKey     = "endpoint"
	IncludeMetadata = "includeMetadata"
)
View Source
// PulsarURLKey is the literal "pulsar_url".
//
// NOTE(review): presumably the ConfigMap key under which the Pulsar service
// URL is stored (compare PulsarTubeFactoryConfig.PulsarURL) — confirm.
const (
	PulsarURLKey = "pulsar_url"
)

Variables

View Source
// Sentinel errors reporting that a tube is not implemented.
//
// ErrTubeNotImplemented is the base error. ErrSinkTubeNotImplemented and
// ErrSourceTubeNotImplemented are built by wrapping it with a sink-specific
// and a source-specific message respectively.
//
// NOTE(review): errors.Wrap is not part of the standard library errors
// package, so this presumably uses github.com/pkg/errors — confirm, and check
// that errors.Is(err, ErrTubeNotImplemented) matches the wrapped values.
var (
	ErrTubeNotImplemented       = errors.New("tube not implemented")
	ErrSinkTubeNotImplemented   = errors.Wrap(ErrTubeNotImplemented, "sink tube not implemented")
	ErrSourceTubeNotImplemented = errors.Wrap(ErrTubeNotImplemented, "source tube not implemented")
)
View Source
// Sentinel errors related to endpoints, each created with errors.New:
//   - ErrEndpointNotFound: "endpoint not found"
//   - ErrEndpointClosed: "endpoint closed"
//   - ErrorEndpointAlreadyExists: "endpoint already exists"
//
// NOTE(review): ErrorEndpointAlreadyExists uses the prefix "Error" while its
// siblings use "Err". Renaming it would break existing callers, so the
// inconsistency is only flagged here.
var (
	ErrEndpointNotFound        = errors.New("endpoint not found")
	ErrEndpointClosed          = errors.New("endpoint closed")
	ErrorEndpointAlreadyExists = errors.New("endpoint already exists")
)

Functions

This section is empty.

Types

type ConfigMap

// ConfigMap is a string-keyed map of arbitrary values. It is the configuration
// type accepted by the tube factory methods (NewSinkTube / NewSourceTube) and
// by MergeConfig.
//
// The element type is spelled any, matching ToConfigMap and ToConfigStruct;
// any is an alias for interface{}, so the type is unchanged for callers.
type ConfigMap map[string]any

func MergeConfig

func MergeConfig(configs ...ConfigMap) ConfigMap

MergeConfig merges multiple ConfigMap values into a single ConfigMap.

func ToConfigMap added in v0.5.0

func ToConfigMap(v any) (ConfigMap, error)

func (ConfigMap) ToConfigStruct added in v0.4.0

func (c ConfigMap) ToConfigStruct(v any) error

type EndpointHandler

// EndpointHandler is a function type that takes a context, an endpoint string
// and a raw payload, and returns an error.
//
// NOTE(review): presumably called once per payload delivered to an endpoint —
// confirm against the implementation.
type EndpointHandler func(ctx context.Context, endpoint string, payload []byte) error

type HttpHandler added in v0.5.0

// HttpHandler is a function type that receives the HTTP response writer, the
// request and the payload bytes, and returns a Record.
//
// NewHttpTubeFactoryWithIntercept accepts a value of this type, and
// DefaultHttpHandler has a matching signature.
// NOTE(review): presumably the hook that turns an incoming HTTP request into
// the Record handed to the tube — confirm.
type HttpHandler func(w http.ResponseWriter, r *http.Request, payload []byte) Record

type HttpTubeFactory

// HttpTubeFactory embeds the TubeFactory interface and carries additional
// unexported state. It is constructed with NewHttpTubeFactory or
// NewHttpTubeFactoryWithIntercept.
type HttpTubeFactory struct {
	TubeFactory
	// contains filtered or unexported fields
}

func NewHttpTubeFactory

func NewHttpTubeFactory(ctx context.Context) *HttpTubeFactory

func NewHttpTubeFactoryWithIntercept added in v0.5.0

func NewHttpTubeFactoryWithIntercept(ctx context.Context, handler HttpHandler) *HttpTubeFactory

func (*HttpTubeFactory) GetHandleFunc

func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error),
	logger *common.Logger) func(http.ResponseWriter, *http.Request)

func (*HttpTubeFactory) Handle

func (f *HttpTubeFactory) Handle(ctx context.Context, endpoint string, record Record) error

func (*HttpTubeFactory) NewSinkTube

func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Record, error)

func (*HttpTubeFactory) NewSourceTube

func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)

type MemoryQueueFactory

// MemoryQueueFactory has only unexported fields and exposes NewSinkTube and
// NewSourceTube on its pointer receiver.
//
// NOTE(review): presumably the concrete type behind the TubeFactory returned
// by NewMemoryQueueFactory — confirm.
type MemoryQueueFactory struct {
	// contains filtered or unexported fields
}

func (*MemoryQueueFactory) NewSinkTube

func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error)

func (*MemoryQueueFactory) NewSourceTube

func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error)

type PulsarEventQueueFactory

// PulsarEventQueueFactory has only unexported fields and exposes NewSinkTube
// and NewSourceTube on its pointer receiver.
//
// NOTE(review): presumably the concrete type behind the TubeFactory returned
// by NewPulsarEventQueueFactory — confirm.
type PulsarEventQueueFactory struct {
	// contains filtered or unexported fields
}

func (*PulsarEventQueueFactory) NewSinkTube

func (f *PulsarEventQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error)

func (*PulsarEventQueueFactory) NewSourceTube

func (f *PulsarEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error)

type PulsarTubeFactoryConfig

// PulsarTubeFactoryConfig is a configuration struct with a single string
// field, PulsarURL. It is built from a ConfigMap by NewPulsarTubeFactoryConfig
// and converted back with ToConfigMap.
//
// NOTE(review): PulsarURL is presumably the Pulsar service URL stored under
// PulsarURLKey — confirm.
type PulsarTubeFactoryConfig struct {
	PulsarURL string
}

func NewPulsarTubeFactoryConfig

func NewPulsarTubeFactoryConfig(configMap ConfigMap) (*PulsarTubeFactoryConfig, error)

func (*PulsarTubeFactoryConfig) ToConfigMap

func (c *PulsarTubeFactoryConfig) ToConfigMap() ConfigMap

type Record

// Record is the value type carried by the sink and source tube channels.
type Record interface {
	// GetPayload returns the record's payload bytes.
	GetPayload() []byte
	// GetSchema returns the record's schema as a string.
	// NOTE(review): the schema format is not shown here — confirm.
	GetSchema() string
	// Commit takes no arguments and returns nothing. According to the
	// SinkTubeFactory comment it is invoked after the record has been sunk
	// successfully.
	Commit()
}

func DefaultHttpHandler added in v0.5.0

func DefaultHttpHandler(_ http.ResponseWriter, _ *http.Request, payload []byte) Record

type RecordImpl

// RecordImpl has only unexported fields. It is created by NewRecordImpl or
// NewSchemaRecordImpl, and its pointer receiver provides GetPayload, GetSchema
// and Commit, i.e. the method set of the Record interface.
type RecordImpl struct {
	// contains filtered or unexported fields
}

func NewRecordImpl

func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl

func NewSchemaRecordImpl added in v0.5.0

func NewSchemaRecordImpl(payload []byte, schema string, ackFunc func()) *RecordImpl

func (*RecordImpl) Commit

func (e *RecordImpl) Commit()

func (*RecordImpl) GetPayload

func (e *RecordImpl) GetPayload() []byte

func (*RecordImpl) GetSchema added in v0.5.0

func (e *RecordImpl) GetSchema() string

type SinkQueueConfig

// SinkQueueConfig is a configuration struct with a single field, Topic.
// Topic uses the JSON name "output" and carries a validate:"required" tag.
//
// NOTE(review): presumably the topic a sink tube writes to — confirm.
type SinkQueueConfig struct {
	Topic string `json:"output" validate:"required"`
}

func (*SinkQueueConfig) ToConfigMap

func (c *SinkQueueConfig) ToConfigMap() ConfigMap

type SinkTubeFactory

// SinkTubeFactory creates sink tubes, exposed as send-only channels of Record.
type SinkTubeFactory interface {
	// NewSinkTube returns a new send-only channel that can be used to sink
	// records. Commit() is invoked on a record after it has been sunk
	// successfully. The caller should close the channel when it is done.
	NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error)
}

type SourceQueueConfig

// SourceQueueConfig is a configuration struct with two fields, both tagged
// validate:"required":
//   - Topics, a string slice with the JSON name "inputs".
//   - SubName, a string with the JSON name "subscription-name".
//
// NOTE(review): presumably the topics a source tube reads from and the
// subscription name it uses — confirm.
type SourceQueueConfig struct {
	Topics  []string `json:"inputs" validate:"required"`
	SubName string   `json:"subscription-name" validate:"required"`
}

func (*SourceQueueConfig) ToConfigMap

func (c *SourceQueueConfig) ToConfigMap() ConfigMap

type SourceTubeFactory

// SourceTubeFactory creates source tubes, exposed as receive-only channels of
// Record.
type SourceTubeFactory interface {
	// NewSourceTube returns a new receive-only channel that can be used to
	// receive records. The channel is closed when the context is done.
	NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)
}

type TubeFactory

// TubeFactory combines SourceTubeFactory and SinkTubeFactory: an implementation
// provides both NewSourceTube and NewSinkTube.
type TubeFactory interface {
	SourceTubeFactory
	SinkTubeFactory
}

func NewMemoryQueueFactory

func NewMemoryQueueFactory(ctx context.Context) TubeFactory

func NewPulsarEventQueueFactory

func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL