pubsub

package
v0.5.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package pubsub implements functionality to interact with DxHub using PubSub semantics.

Examples

Following examples explain how to use the pubsub package at the high level.

Connection

config := &pubsub.Config{App: "test-app", Domain: "test-domain", APIKeyProvider: func() string { return "APIKey"}}
conn, err := pubsub.NewConnection(config)
err = conn.Connect(context.Background())
go func() {
    err = <- conn.Error  // Monitor connection for any errors
}
...
conn.Disconnect()        // Disconnect from the server

Subscribe

conn.Subscribe("stream", func(err error, id string, headers map[string]string, payload []byte){
    fmt.Printf("Received new message: %s, %v, %s", id, headers, payload)
})

Publish

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
headers := map[string]string{
    "key1": "value1",
    "key2": "value2",
}
payload := []byte("message payload")
resp, err := conn.Publish(ctx, "stream", headers, payload)
fmt.Printf("Message publish result: %v", resp)

PublishAsync

headers := map[string]string{
    "key1": "value1",
    "key2": "value2",
}
payload := []byte("message payload")
respCh := make(chan *PublishResult)
id, cancel, err := conn.PublishAsync("stream", headers, payload, respCh)
defer cancel()
resp := <- respCh
fmt.Printf("Message publish result: %v", resp)

Index

Constants

This section is empty.

Variables

View Source
var (
	WebSocketScheme = "wss"
	HttpScheme      = "https"
)

Functions

func NewHandlerMap added in v0.5.9

func NewHandlerMap(expireDuration time.Duration) *handlerMap

Types

type Config

type Config struct {
	// GroupID specifies the group ID of the PubSub client
	GroupID string

	// Domain should be set to the cloud domain of the region where Application wants to connect to.
	Domain string

	// APIKeyProvider returns the API Key for the Application. Either APIKeyProvider or the
	// AuthTokenProvider must be set in the config. APIKeyProvider takes precedence over the
	// AuthTokenProvider.
	APIKeyProvider func() ([]byte, error)

	// AuthTokenProvider returns the generated Auth Token for the Application. Either APIKeyProvider
	// or the AuthTokenProvider must be set in the config.
	AuthTokenProvider func() ([]byte, error)

	// PollInterval defines the interval between consecutive read requests to the server.
	// Default is 1 second.
	PollInterval time.Duration

	Transport *http.Transport
}

Config represents configuration required to open the pubsub connection.

type Connection

type Connection struct {
	Error chan error
	// contains filtered or unexported fields
}

Connection represents a connection to the DxHub PubSub server.

func NewConnection

func NewConnection(config Config) (*Connection, error)

NewConnection creates a new connection object based on the supplied configuration.

func (*Connection) Connect

func (c *Connection) Connect(connectCtx context.Context) error

Connect establishes a connection to the DxHub PubSub server.

func (*Connection) Disconnect

func (c *Connection) Disconnect()

Disconnect disconnects the connection to the DxHub PubSub server.

func (*Connection) IsDisconnected

func (c *Connection) IsDisconnected() bool

IsDisconnected returns true if c is disconnected from the server.

func (*Connection) Publish

func (c *Connection) Publish(ctx context.Context, stream string, headers map[string]string, payload []byte) (*PublishResult, error)

Publish publishes a message to the stream asynchronously.

func (*Connection) PublishAsync

func (c *Connection) PublishAsync(stream string, headers map[string]string, payload []byte, result chan *PublishResult) (msgID string, cancel func(), err error)

PublishAsync publishes a message to the stream asynchronously. Response can be monitored on the supplied channel. The cancel function must be invoked before closing the channel.

func (*Connection) String

func (c *Connection) String() string

func (*Connection) Subscribe

func (c *Connection) Subscribe(stream string, handler SubscriptionCallback) error

Subscribe subscribes to a DxHub Pubsub Stream

func (*Connection) Unsubscribe

func (c *Connection) Unsubscribe(stream string) error

Unsubscribe unsubscribes from a DxHub Pubsub Stream

type PublishResult

type PublishResult struct {
	ID    string // Message ID
	Error error  // Error shall be non-nil in case of an error
}

PublishResult represents the result of a publish request from the server

func (*PublishResult) String

func (p *PublishResult) String() string

type SubscriptionCallback

type SubscriptionCallback func(err error, id string, headers map[string]string, payload []byte)

SubscriptionCallback is the callback that's invoked when a message/error is received for the subscription request.

err is set to a non-nil error in case of an error id represents the message id headers are the headers associated with the message payload contains the message payload

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL