Documentation ¶
Index ¶
- Variables
- func AttachPositionIndex(p sdk.Position, index uint32) sdk.Position
- func NewDestination() sdk.Destination
- func NewDestinationWithDialer(dialer func(ctx context.Context, _ string) (net.Conn, error)) sdk.Destination
- func Specification() sdk.Specification
- type AckManager
- type Config
- type DestConfig
- type Destination
- func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error
- func (d *Destination) Open(ctx context.Context) error
- func (d *Destination) Parameters() map[string]sdk.Parameter
- func (d *Destination) Teardown(ctx context.Context) error
- func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error)
- type MTLSConfig
- type Position
- type StreamManager
Constants ¶
This section is empty.
Variables ¶
var Connector = sdk.Connector{ NewSpecification: Specification, NewSource: nil, NewDestination: NewDestination, }
Connector combines all constructors for each plugin in one struct.
Functions ¶
func NewDestination ¶
func NewDestination() sdk.Destination
func NewDestinationWithDialer ¶
func NewDestinationWithDialer(dialer func(ctx context.Context, _ string) (net.Conn, error)) sdk.Destination
NewDestinationWithDialer returns a destination that uses the given dialer; it is intended for testing purposes.
func Specification ¶
func Specification() sdk.Specification
Specification returns the connector's specification.
Types ¶
type AckManager ¶
type AckManager struct {
// contains filtered or unexported fields
}
func NewAckManager ¶
func NewAckManager(sm *StreamManager) *AckManager
func (*AckManager) Expect ¶
func (am *AckManager) Expect(expected []sdk.Position) error
Expect lets the ack manager know which acks to expect in the next batch. If there are still open acks to be received from the previous batch, Expect returns an error. It has to be called after Run and before Wait.
func (*AckManager) Got ¶
func (am *AckManager) Got() int
Got returns the number of deduplicated acknowledgments received so far.
func (*AckManager) Run ¶
func (am *AckManager) Run(ctx context.Context) error
Run is a blocking method that listens to acks and validates them. It returns an error if the context is cancelled or if an unrecoverable error happens.
func (*AckManager) Wait ¶
func (am *AckManager) Wait(ctx context.Context) (int, error)
Wait blocks until all acks are received, the connection drops while waiting for acks, or the context is canceled. If the connection drops it returns io.EOF; if the context gets closed it returns the context error; otherwise it returns nil.
type Config ¶
type Config struct { // url to gRPC server URL string `json:"url" validate:"required"` // the bandwidth limit in bytes/second, use "0" to disable rate limiting. RateLimit int `json:"rateLimit" default:"0" validate:"gt=-1"` // delay between each gRPC request retry. ReconnectDelay time.Duration `json:"reconnectDelay" default:"5s"` // max downtime accepted for the server to be off. MaxDowntime time.Duration `json:"maxDowntime" default:"10m"` // mTLS configurations. MTLS MTLSConfig `json:"mtls"` }
Config has the generic parameters needed for a gRPC client.
type DestConfig ¶
type DestConfig struct {
Config
}
func (DestConfig) Parameters ¶
func (DestConfig) Parameters() map[string]sdk.Parameter
type Destination ¶
type Destination struct { sdk.UnimplementedDestination // contains filtered or unexported fields }
func (*Destination) Parameters ¶
func (d *Destination) Parameters() map[string]sdk.Parameter
type MTLSConfig ¶
type MTLSConfig struct { // the client certificate path. ClientCertPath string `json:"client.certPath"` // the client private key path. ClientKeyPath string `json:"client.keyPath"` // the root CA certificate path. CACertPath string `json:"ca.certPath"` // option to disable mTLS secure connection, set it to `true` for an insecure connection. Disabled bool `json:"disabled" default:"false"` }
func (*MTLSConfig) ParseMTLSFiles ¶
func (mc *MTLSConfig) ParseMTLSFiles() (tls.Certificate, *x509.CertPool, error)
ParseMTLSFiles parses and validates the mTLS parameter values. It returns the parsed client certificate and the CA certificate pool, or an error if the parsing fails.
type Position ¶
func ToRecordPosition ¶
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
func NewStreamManager ¶
func NewStreamManager(ctx context.Context, conn *grpc.ClientConn, reconnectDelay, maxDowntime time.Duration) (*StreamManager, error)
func (*StreamManager) Get ¶
func (sm *StreamManager) Get(ctx context.Context) (pb.SourceService_StreamClient, error)
Get blocks until a stream is available. If the context gets closed it returns the context error.
func (*StreamManager) Run ¶
func (sm *StreamManager) Run(ctx context.Context) (err error)
Run is a blocking method that monitors the status of the stream connection. If the stream gets closed, it waits for the connection to come up again and reestablishes the stream.
func (*StreamManager) StreamDone ¶
func (sm *StreamManager) StreamDone(ctx context.Context) (chan struct{}, error)
StreamDone returns a channel that will be closed when the connection to the server is lost.