pubsub

package
v0.5.4
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2024 | License: Apache-2.0 | Imports: 24 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	GRPCCallTimeout = 5 * time.Second
	RetryDelay      = 10 * time.Second
)
var ErrEndOfRecords = errors.New("end of records from stream")
var OAuthDialTimeout = 5 * time.Second
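
ErrEndOfRecords is an exported sentinel, so callers would normally test for it with errors.Is instead of comparing error strings. The sketch below is self-contained and makes assumptions: errEndOfRecords is a local stand-in with the same message, and sliceStream is a hypothetical in-memory stream. This page does not say which method returns the sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// errEndOfRecords is a local stand-in for pubsub.ErrEndOfRecords.
var errEndOfRecords = errors.New("end of records from stream")

// sliceStream is a hypothetical in-memory stream used only for illustration.
type sliceStream struct {
	items []string
	pos   int
}

// next returns the next item, or a wrapped errEndOfRecords once drained.
func (s *sliceStream) next() (string, error) {
	if s.pos >= len(s.items) {
		return "", fmt.Errorf("read item %d: %w", s.pos, errEndOfRecords)
	}
	item := s.items[s.pos]
	s.pos++
	return item, nil
}

// drain reads until the sentinel is seen and returns the number of items read.
// Any other error is passed back to the caller.
func drain(s *sliceStream) (int, error) {
	n := 0
	for {
		_, err := s.next()
		if errors.Is(err, errEndOfRecords) {
			return n, nil // normal end of stream, even though the error is wrapped
		}
		if err != nil {
			return n, err
		}
		n++
	}
}

func main() {
	n, err := drain(&sliceStream{items: []string{"a", "b", "c"}})
	fmt.Println(n, err)
}
```

errors.Is matches the sentinel even when it has been wrapped with %w, which a direct == comparison would miss.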

Functions

This section is empty.

Types

type Authenticator

type Authenticator interface {
	Login() (*LoginResponse, error)
	UserInfo(string) (*UserInfoResponse, error)
}
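
Because Authenticator is an interface, a test double can stand in for the real OAuth flow. The following is a minimal sketch under stated assumptions: the response types are trimmed local copies, stubAuth and authenticate are hypothetical names, and treating the UserInfo argument as an access token is a guess, since the interface does not name that parameter.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the response types, trimmed to the fields used here.
type LoginResponse struct {
	AccessToken string
	InstanceURL string
}

type UserInfoResponse struct {
	UserID         string
	OrganizationID string
}

// Authenticator mirrors the interface documented above.
type Authenticator interface {
	Login() (*LoginResponse, error)
	UserInfo(string) (*UserInfoResponse, error)
}

// stubAuth is a hypothetical test double; it never contacts a server.
type stubAuth struct {
	token string
}

func (s stubAuth) Login() (*LoginResponse, error) {
	return &LoginResponse{AccessToken: s.token, InstanceURL: "https://example.invalid"}, nil
}

// UserInfo treats its argument as an access token (an assumption).
func (s stubAuth) UserInfo(accessToken string) (*UserInfoResponse, error) {
	if accessToken != s.token {
		return nil, errors.New("unknown access token")
	}
	return &UserInfoResponse{UserID: "user-1", OrganizationID: "org-1"}, nil
}

// authenticate runs the two calls in sequence and returns the organization ID.
func authenticate(a Authenticator) (string, error) {
	login, err := a.Login()
	if err != nil {
		return "", fmt.Errorf("login: %w", err)
	}
	info, err := a.UserInfo(login.AccessToken)
	if err != nil {
		return "", fmt.Errorf("user info: %w", err)
	}
	return info.OrganizationID, nil
}

func main() {
	org, err := authenticate(stubAuth{token: "t0k3n"})
	fmt.Println(org, err)
}
```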

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewGRPCClient

func NewGRPCClient(ctx context.Context, config config.Config, action string) (*Client, error)

NewGRPCClient creates a new connection to the gRPC server and returns the Client that wraps it.

func (*Client) Close

func (c *Client) Close(ctx context.Context) error

Close closes the underlying connection to the gRPC server.

func (*Client) GetSchema

func (c *Client) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error)

GetSchema wraps the GetSchema RPC. It adds the OAuth credentials to the request and fetches data about the schema identified by schemaID.

func (*Client) GetTopic

func (c *Client) GetTopic(topic string) (*eventbusv1.TopicInfo, error)

func (*Client) Initialize

func (c *Client) Initialize(ctx context.Context, topics []string) error

Initialize initializes the pubsub client by authenticating, for both source and destination.

func (*Client) Next

func (c *Client) Next(ctx context.Context) (opencdc.Record, error)

Next returns the next record from the buffer.

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, publishEvent *PublishEvent) error

func (*Client) Recv

func (c *Client) Recv(ctx context.Context, topic string, replayID []byte) ([]ConnectResponseEvent, error)

func (*Client) StartCDC

func (c *Client) StartCDC(ctx context.Context, replay string, currentPos position.Topics, topics []string, fetch time.Duration) error

StartCDC starts the CDC routine for the source.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context)

Stop ends CDC processing.

func (*Client) Subscribe

func (c *Client) Subscribe(
	ctx context.Context,
	replayPreset eventbusv1.ReplayPreset,
	replayID []byte,
	topic string,
) (eventbusv1.PubSub_SubscribeClient, error)

Subscribe wraps the Subscribe RPC. It adds the OAuth credentials and creates a separate streaming client that is used to fetch data from the topic. This method continuously consumes messages unless an error occurs; if an error does occur, it returns the last successfully consumed replayID as well as the error. If no messages were successfully consumed, it returns the same replayID that it originally received as a parameter.
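
The replay ID bookkeeping described above can be sketched as follows. This is an illustration only: event, consume, and streamOf are hypothetical names, and the listed Subscribe signature returns a streaming client, not a replay ID, so the sketch covers just the "last successfully consumed" rule, not the package's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for a message received from the stream.
type event struct {
	replayID []byte
}

// consume calls recv until it fails. It returns the replay ID of the last
// event that was received successfully, or the replay ID passed in when
// nothing was consumed, together with the error that ended the loop.
func consume(recv func() (event, error), replayID []byte) ([]byte, error) {
	last := replayID
	for {
		ev, err := recv()
		if err != nil {
			return last, err
		}
		last = ev.replayID
	}
}

// streamOf builds a fake stream that yields one event per ID and then fails.
func streamOf(ids ...byte) func() (event, error) {
	i := 0
	return func() (event, error) {
		if i >= len(ids) {
			return event{}, errors.New("stream closed")
		}
		ev := event{replayID: []byte{ids[i]}}
		i++
		return ev, nil
	}
}

func main() {
	last, err := consume(streamOf(1, 2, 3), []byte{0})
	fmt.Println(last, err)
}
```

A caller that persists the returned replay ID can pass it to the next subscription attempt and resume where the previous stream stopped.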

func (*Client) Wait

func (c *Client) Wait(ctx context.Context) error

func (*Client) Write

func (c *Client) Write(ctx context.Context, r opencdc.Record) error

Write attempts to publish the event, retrying on any pubsub connection errors. TODO: refactor this to allow for multi-topic support.
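
A publish-with-retry loop of the kind described for Write might look like the sketch below. Everything in it is an assumption made for illustration: errConnection, writeWithRetry, and demo are hypothetical, and the attempt limit is invented. The package exports RetryDelay (10 seconds), but this page does not state whether Write uses it, so the delay is a parameter here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errConnection is a hypothetical marker for retryable connection failures.
var errConnection = errors.New("pubsub connection error")

// writeWithRetry calls publish until it succeeds, fails with a non-connection
// error, reaches maxAttempts, or the context is cancelled. It returns the
// number of attempts made.
func writeWithRetry(ctx context.Context, publish func() error, delay time.Duration, maxAttempts int) (int, error) {
	for attempt := 1; ; attempt++ {
		err := publish()
		if err == nil {
			return attempt, nil
		}
		// Only connection errors are retried; everything else is final.
		if !errors.Is(err, errConnection) || attempt >= maxAttempts {
			return attempt, err
		}
		select {
		case <-ctx.Done():
			return attempt, ctx.Err()
		case <-time.After(delay):
		}
	}
}

// demo publishes through a fake that fails twice before succeeding.
func demo() (int, error) {
	calls := 0
	publish := func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("attempt %d: %w", calls, errConnection)
		}
		return nil
	}
	return writeWithRetry(context.Background(), publish, time.Millisecond, 5)
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err)
}
```

Waiting in a select on ctx.Done() keeps the retry loop responsive to cancellation, which a plain time.Sleep would not.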

type ConnectResponseEvent

type ConnectResponseEvent struct {
	Data            map[string]interface{}
	EventID         string
	ReplayID        []byte
	Topic           string
	ReceivedAt      time.Time
	CurrentPosition opencdc.Position
}

type Credentials

type Credentials struct {
	ClientID, ClientSecret string
	OAuthEndpoint          *url.URL
}

func NewCredentials

func NewCredentials(clientID, secret, endpoint string) (Credentials, error)
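
Since Credentials stores the endpoint as a *url.URL while NewCredentials accepts a string, the constructor presumably parses it. The sketch below shows one way that could work. It is not the package's implementation: the lowercase names are local stand-ins, and the validation rules (non-empty ID and secret, absolute URL) are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// credentials is a local copy of the documented Credentials struct.
type credentials struct {
	ClientID, ClientSecret string
	OAuthEndpoint          *url.URL
}

// newCredentials validates its inputs and parses the endpoint string.
// The specific checks are assumptions made for this sketch.
func newCredentials(clientID, secret, endpoint string) (credentials, error) {
	if clientID == "" || secret == "" {
		return credentials{}, errors.New("client ID and secret are required")
	}
	u, err := url.Parse(endpoint)
	if err != nil {
		return credentials{}, fmt.Errorf("parse oauth endpoint %q: %w", endpoint, err)
	}
	if u.Scheme == "" || u.Host == "" {
		return credentials{}, fmt.Errorf("oauth endpoint %q must be an absolute URL", endpoint)
	}
	return credentials{ClientID: clientID, ClientSecret: secret, OAuthEndpoint: u}, nil
}

func main() {
	creds, err := newCredentials("id", "secret", "https://login.example.invalid")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(creds.OAuthEndpoint.Host)
}
```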

type LoginResponse

type LoginResponse struct {
	AccessToken      string `json:"access_token"`
	InstanceURL      string `json:"instance_url"`
	ID               string `json:"id"`
	TokenType        string `json:"token_type"`
	IssuedAt         string `json:"issued_at"`
	Signature        string `json:"signature"`
	Error            string `json:"error"`
	ErrorDescription string `json:"error_description"`
}

func (LoginResponse) Err

func (l LoginResponse) Err() error
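
Err is undocumented here. A plausible reading, given the Error and ErrorDescription fields, is that it converts an error payload into a Go error. The sketch below illustrates that reading with a trimmed local type; the behaviour and the message format are assumptions, and parseLogin is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// loginResponse is a trimmed local copy of the documented LoginResponse.
type loginResponse struct {
	AccessToken      string `json:"access_token"`
	Error            string `json:"error"`
	ErrorDescription string `json:"error_description"`
}

// Err returns a non-nil error when the response carries an error code.
// This is assumed behaviour, not the package's documented contract.
func (l loginResponse) Err() error {
	if l.Error == "" {
		return nil
	}
	return fmt.Errorf("%s: %s", l.Error, l.ErrorDescription)
}

// parseLogin decodes a response body and surfaces any embedded error.
func parseLogin(body []byte) (loginResponse, error) {
	var resp loginResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return resp, fmt.Errorf("decode login response: %w", err)
	}
	return resp, resp.Err()
}

func main() {
	_, err := parseLogin([]byte(`{"error":"invalid_grant","error_description":"authentication failure"}`))
	fmt.Println(err)

	resp, err := parseLogin([]byte(`{"access_token":"abc"}`))
	fmt.Println(resp.AccessToken, err)
}
```

Checking Err() right after decoding catches responses that are valid JSON but report a failure in the body.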

type PublishEvent

type PublishEvent struct {
	// contains filtered or unexported fields
}

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

type UserInfoResponse

type UserInfoResponse struct {
	UserID           string `json:"user_id"`
	OrganizationID   string `json:"organization_id"`
	Error            string `json:"error"`
	ErrorDescription string `json:"error_description"`
}

func (UserInfoResponse) Err

func (u UserInfoResponse) Err() error
