Documentation ¶
Index ¶
- Variables
- type Authenticator
- type Client
- func (c *Client) Close(ctx context.Context) error
- func (c *Client) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error)
- func (c *Client) GetTopic(topic string) (*eventbusv1.TopicInfo, error)
- func (c *Client) Initialize(ctx context.Context, topics []string) error
- func (c *Client) Next(ctx context.Context) (opencdc.Record, error)
- func (c *Client) Publish(ctx context.Context, publishEvent *PublishEvent) error
- func (c *Client) Recv(ctx context.Context, topic string, replayID []byte) ([]ConnectResponseEvent, error)
- func (c *Client) StartCDC(ctx context.Context, replay string, currentPos position.Topics, ...) error
- func (c *Client) Stop(ctx context.Context)
- func (c *Client) Subscribe(ctx context.Context, replayPreset eventbusv1.ReplayPreset, replayID []byte, ...) (eventbusv1.PubSub_SubscribeClient, error)
- func (c *Client) Wait(ctx context.Context) error
- func (c *Client) Write(ctx context.Context, r opencdc.Record) error
- type ConnectResponseEvent
- type Credentials
- type LoginResponse
- type PublishEvent
- type Topic
- type UserInfoResponse
Constants ¶
This section is empty.
Variables ¶
var ( GRPCCallTimeout = 5 * time.Second RetryDelay = 10 * time.Second )
var ErrEndOfRecords = errors.New("end of records from stream")
var OAuthDialTimeout = 5 * time.Second
Functions ¶
This section is empty.
Types ¶
type Authenticator ¶
type Authenticator interface { Login() (*LoginResponse, error) UserInfo(string) (*UserInfoResponse, error) }
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewGRPCClient ¶
Creates a new connection to the gRPC server and returns the wrapper struct.
func (*Client) GetSchema ¶
func (c *Client) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error)
Wrapper function around the GetSchema RPC. This will add the oauth credentials and make a call to fetch data about a specific schema.
func (*Client) Initialize ¶
Initializes the pubsub client by authenticating for source and destination.
func (*Client) Publish ¶
func (c *Client) Publish(ctx context.Context, publishEvent *PublishEvent) error
func (*Client) StartCDC ¶
func (c *Client) StartCDC(ctx context.Context, replay string, currentPos position.Topics, topics []string, fetch time.Duration) error
Start CDC Routine for Source.
func (*Client) Subscribe ¶
func (c *Client) Subscribe( ctx context.Context, replayPreset eventbusv1.ReplayPreset, replayID []byte, topic string, ) (eventbusv1.PubSub_SubscribeClient, error)
Wrapper function around the Subscribe RPC. This will add the oauth credentials and create a separate streaming client that will be used to, fetch data from the topic. This method will continuously consume messages unless an error occurs; if an error does occur then this method will, return the last successfully consumed replayID as well as the error message. If no messages were successfully consumed then this method will return, the same replayID that it originally received as a parameter.
type ConnectResponseEvent ¶
type Credentials ¶
func NewCredentials ¶
func NewCredentials(clientID, secret, endpoint string) (Credentials, error)
type LoginResponse ¶
type LoginResponse struct { AccessToken string `json:"access_token"` InstanceURL string `json:"instance_url"` ID string `json:"id"` TokenType string `json:"token_type"` IssuedAt string `json:"issued_at"` Signature string `json:"signature"` Error string `json:"error"` ErrorDescription string `json:"error_description"` }
func (LoginResponse) Err ¶
func (l LoginResponse) Err() error
type PublishEvent ¶
type PublishEvent struct {
// contains filtered or unexported fields
}
type UserInfoResponse ¶
type UserInfoResponse struct { UserID string `json:"user_id"` OrganizationID string `json:"organization_id"` Error string `json:"error"` ErrorDescription string `json:"error_description"` }
func (UserInfoResponse) Err ¶
func (u UserInfoResponse) Err() error