Documentation ¶
Overview ¶
Package google implements the elMagician pubsub interfaces for the GCP Pub/Sub provider.
Index ¶
- Variables
- func NewPubsub(ctx context.Context, config Config, opts ...option.ClientOption) (pubsub.Pubsub, error)
- type Config
- type Listener
- type Publisher
- type Pubsub
- type Receiver
- func (r Receiver) OnError(callback func(ctx context.Context))
- func (r Receiver) OnMessage(envelop pubsub.Envelop, callback pubsub.MessageCallback)
- func (r Receiver) OnUnmatched(callback pubsub.MessageCallback)
- func (r Receiver) Receive(ctx context.Context) error
- func (r Receiver) Start(ctx context.Context)
- func (r Receiver) Stop()
- type Registry
- func (l *Registry) AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error
- func (l *Registry) AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error
- func (l *Registry) Clear()
- func (l *Registry) MustAddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) pubsub.Registry
- func (l *Registry) MustAddTopic(key string, publishSettings *googlePubSub.PublishSettings) pubsub.Registry
- func (l *Registry) StopTopic(key string)
- func (l *Registry) StopTopics(topics ...string)
- type SendResults
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrPublisherDestroyed = errors.New("publisher instance was destroyed")
Functions ¶
func NewPubsub ¶
func NewPubsub(ctx context.Context, config Config, opts ...option.ClientOption) (pubsub.Pubsub, error)
NewPubsub initializes a GCP implementation for pubsub.
Example (WithOption) ¶
conf := google.Config{ ProjectID: "aSuperCoolProject", CredentialsPath: "path/to/credentials.json", } _, err := google.NewPubsub( context.Background(), conf, option.WithCredentialsFile("some/other/credentials.json"), option.WithEndpoint("dont.evil/know/where"), ) if err != nil { panic(err) } // in this example, client uses credentials path from Config. Passing an option will not override // credentials values.
Output:
Example (WithoutOption) ¶
conf := google.Config{ ProjectID: "aSuperCoolProject", CredentialsPath: "path/to/credentials.json", Timeout: 10 * time.Second, Concurrency: 10, } _, err := google.NewPubsub(context.Background(), conf) if err != nil { panic(err) }
Output:
Types ¶
type Config ¶
type Config struct { // ProjectID in GCP ProjectID string `yaml:"projectId" json:"projectId"` // CredentialsPath to your JSON credential provided by GCP. CredentialsPath string `yaml:"credentialsPath" json:"credentialsPath"` // Concurrency is the default concurrency for listening process. Concurrency int `yaml:"concurrency" json:"concurrency"` // Timeout is the default timeout for GCP calls Timeout time.Duration `yaml:"timeout" json:"timeout"` }
Config holds the configuration for a pubsub instance.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher implements pubsub.Publisher interface for GCP.
func (*Publisher) Send ¶
Send sends the message to the registered topics. Any new topic will be saved to the registry if Destroy was not called. If Destroy was called, all topics will have their connections stopped and known topics will be kept in the registry.
Send will return a single string ID if sending to a single topic, otherwise a list of strings.
Example ¶
var conn *grpc.ClientConn var yourEnvelop *Envelop // we are using a mock implementation here // This setup a test instance for Google Pubsub. // You don't need it outside of unit testing. { srv, cli := initTestClient("aSuperCoolProject") defer func() { if err := srv.Close(); err != nil { panic(err) } }() _, err := cli.CreateTopic(context.Background(), "mine") if err != nil { panic(err) } _, err = cli.CreateTopic(context.Background(), "tropical") if err != nil { panic(err) } _, err = cli.CreateTopic(context.Background(), "someTopic") if err != nil { panic(err) } conn, err = grpc.Dial(srv.Addr, grpc.WithInsecure()) if err != nil { panic(err) } yourEnvelop = &Envelop{} yourEnvelop.message = &message{ Message: &googlePubSub.Message{ ID: "test", Data: []byte("someData"), Attributes: map[string]string{"version": "v1", "type": "test", "new": "false"}, }, } } ctx := context.Background() conf := google.Config{ ProjectID: "aSuperCoolProject", CredentialsPath: "path/to/credentials.json", Timeout: 10 * time.Second, Concurrency: 0, } // Initialize pubsub instance. ps, err := google.NewPubsub(ctx, conf, option.WithGRPCConn(conn)) if err != nil { // TODO manage error panic(err) return } // Register topics that will be used from the instance. ps.Registry(). MustAddTopic("topicAnna", nil). // Will not fail if topic does not exists using MustAddTopic. Use AddTopic to check existence on add. 
MustAddTopic("tropical", nil) if err := ps.Registry().AddTopic("mine", nil); err != nil { // TODO manager error panic(err) return } // Send message to registered topics results, err := ps.Publish().To("mine").Send(ctx, yourEnvelop) if err != nil { // TODO manage error fmt.Println(err.Error()) return } idsStr := "" for topic, res := range results.Results(context.Background()) { idsStr += topic + ": " + res.ID } idsStr = strings.TrimSuffix(idsStr, ",") fmt.Println("Msg:", idsStr) // Send message to registered topics results, err = ps.Publish().To("tropical", "mine").Send(ctx, yourEnvelop) if err != nil { // TODO manage error fmt.Println(err.Error()) return } idsStr = "" for topic, res := range results.Results(context.Background()) { if res.Error != nil { fmt.Println("got unexpected error", res.Error) } if topic == "tropical" { idsStr = topic + ", " + idsStr } else { idsStr += topic } } idsStr = strings.TrimSuffix(idsStr, ",") fmt.Println(idsStr) // Stop topics connection to server without discarding them.s ps.Registry().StopTopics("mine", "someTopic") // Reset registry after stopping all topics. ps.Registry().Clear()
Output: Msg: mine: m0 tropical, mine
func (*Publisher) To ¶
To adds topics to send the message to.
If Destroy is called, unknown topics will not be saved in the registry and https://pkg.go.dev/cloud.google.com/go/pubsub#Topic.Stop will be called.
This method applies the last registered send configuration for the topic. If no configuration was registered, it uses the default configuration.
func (*Publisher) WithOption ¶
WithOption provides send settings to apply to the call.
They will be applied to all topics added to the send process before the WithOption call.
type Pubsub ¶
type Pubsub struct { // Client is the gcp instance used to send requests. // It is passed as a private parameter to all structures // derived from Pubsub Client *googlePubSub.Client // Config for running instance. // It is passed as a private parameter to all structures // derived from Pubsub Config Config // contains filtered or unexported fields }
Pubsub implements pubsub.Pubsub interface for GCP.
type Receiver ¶
type Receiver struct{}
func (Receiver) OnMessage ¶
func (r Receiver) OnMessage(envelop pubsub.Envelop, callback pubsub.MessageCallback)
func (Receiver) OnUnmatched ¶
func (r Receiver) OnUnmatched(callback pubsub.MessageCallback)
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
func (*Registry) AddSubscription ¶
func (l *Registry) AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error
nolint: dupl
func (*Registry) AddTopic ¶
func (l *Registry) AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error
nolint: dupl
func (*Registry) MustAddSubscription ¶
func (l *Registry) MustAddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) pubsub.Registry
nolint: dupl
func (*Registry) MustAddTopic ¶
func (l *Registry) MustAddTopic(key string, publishSettings *googlePubSub.PublishSettings) pubsub.Registry
nolint: dupl
func (*Registry) StopTopics ¶
type SendResults ¶
type SendResults struct {
// contains filtered or unexported fields
}