fxgcppubsub

Version: v1.4.1
Published: Jul 10, 2024 License: MIT Imports: 16 Imported by: 0

README

Yokai GCP Pub/Sub Module

Yokai module for GCP Pub/Sub.

Overview

This module lets your Yokai application publish to and subscribe from a GCP Pub/Sub instance.

It also supports Avro and Protobuf schemas.

Installation

First install the module:

go get github.com/ankorstore/yokai-contrib/fxgcppubsub

Then activate it in your application bootstrapper:

// internal/bootstrap.go
package internal

import (
	"github.com/ankorstore/yokai/fxcore"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
)

var Bootstrapper = fxcore.NewBootstrapper().WithOptions(
	// load fxgcppubsub module
	fxgcppubsub.FxGcpPubSubModule,
	// ...
)

Configuration

Configuration reference:

# ./configs/config.yaml
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}  # GCP project id
    healthcheck:
      topics:                # list of topics to check for the topics probe
        - some-topic         # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
      subscriptions:         # list of subscriptions to check for the subscriptions probe
        - some-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
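The comments in the configuration above show how short topic and subscription names expand into fully qualified resource names. The sketch below only illustrates that naming scheme; the helper names topicPath and subscriptionPath are hypothetical and are not part of this module.

```go
package main

import "fmt"

// topicPath expands a short topic ID into a fully qualified resource name.
// Hypothetical helper, shown only to illustrate the naming scheme.
func topicPath(projectID, topicID string) string {
	return fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
}

// subscriptionPath does the same for a short subscription ID.
func subscriptionPath(projectID, subscriptionID string) string {
	return fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
}

func main() {
	fmt.Println(topicPath("my-project", "some-topic"))
	fmt.Println(subscriptionPath("my-project", "some-subscription"))
}
```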

Publish

This module provides a high-level Publisher that you can inject anywhere to publish messages on a topic.

If the topic is associated with an Avro or Protobuf schema, the publisher automatically handles the message encoding.

This module also provides a pubsub.Client that you can use for low-level publishing.

Raw message

To publish a raw message (without associated schema) on a topic:

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", "some message")

Avro message

The publisher accepts any struct and automatically handles the Avro encoding based on the struct's avro tags.

Consider this Avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To publish a message on a topic associated with this Avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", &SimpleRecord{
    StringField:  "some string",
    FloatField:   12.34,
    BooleanField: true,
})
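To make the role of the avro struct tags concrete, here is a minimal sketch of how a tag-driven encoder could discover the Avro field names of a struct through reflection. It is an illustration under stated assumptions, not this module's codec: avroFieldNames is a hypothetical helper, and only the standard library is used.

```go
package main

import (
	"fmt"
	"reflect"
)

// SimpleRecord mirrors the tagged struct from the example above.
type SimpleRecord struct {
	StringField  string  `avro:"StringField" json:"StringField"`
	FloatField   float32 `avro:"FloatField" json:"FloatField"`
	BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// avroFieldNames returns the `avro` tag value of every struct field that has one.
// It accepts a struct or a pointer to a struct and returns nil for anything else.
func avroFieldNames(v any) []string {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	names := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		if name, ok := t.Field(i).Tag.Lookup("avro"); ok {
			names = append(names, name)
		}
	}
	return names
}

func main() {
	// prints [StringField FloatField BooleanField]
	fmt.Println(avroFieldNames(&SimpleRecord{}))
}
```

The tag values must match the field names declared in the Avro schema, which is why the struct above uses StringField, FloatField and BooleanField verbatim.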

Protobuf message

The publisher accepts any proto.Message and automatically handles the Protobuf binary or JSON encoding.

Consider this Protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
    string string_field = 1;
    float float_field = 2;
    bool boolean_field = 3;
}

To publish a message on a topic associated with this Protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", &SimpleRecord{
    StringField:  "test proto",
    FloatField:   56.78,
    BooleanField: false,
})

Subscribe

This module provides a high-level Subscriber that you can inject anywhere to receive messages from a subscription.

If the subscription's topic is associated with an Avro or Protobuf schema, the subscriber hands your handler a message that you can decode into the matching type.

This module also provides a pubsub.Client that you can use for low-level subscribing.

Raw message

To subscribe to a subscription that receives raw messages (without an associated schema):

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    fmt.Printf("%s", m.Data())
    
    m.Ack()
})

Avro message

The message received by the subscriber can be decoded into any struct that carries the matching avro tags.

Consider this Avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To subscribe from a subscription associated with this Avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord
    
    // on decode failure, nack and return so the message is not acked as well
    if err := m.Decode(&rec); err != nil {
        m.Nack()

        return
    }
    
    fmt.Printf("%v", rec)
    
    m.Ack()
})

Protobuf message

The message received by the subscriber can be decoded into any proto.Message, for either Protobuf binary or JSON encoding.

Consider this Protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}

To subscribe from a subscription associated with this Protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord
    
    // on decode failure, nack and return so the message is not acked as well
    if err := m.Decode(&rec); err != nil {
        m.Nack()

        return
    }
    
    fmt.Printf("%v", rec)
    
    m.Ack()
})
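A subscribe handler should settle each message exactly once: nack and return when decoding fails, ack otherwise. The sketch below shows that control flow in isolation. fakeMessage is a hypothetical stand-in for message.Message (it decodes JSON with the standard library rather than Avro or Protobuf), so the example runs without a Pub/Sub dependency.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fakeMessage is a hypothetical stand-in for message.Message.
// It records whether it was acked or nacked and decodes its payload as JSON.
type fakeMessage struct {
	data   []byte
	acked  bool
	nacked bool
}

func (m *fakeMessage) Decode(out any) error { return json.Unmarshal(m.data, out) }
func (m *fakeMessage) Ack()                 { m.acked = true }
func (m *fakeMessage) Nack()                { m.nacked = true }

// SimpleRecord is the payload type expected by the handler.
type SimpleRecord struct {
	StringField  string  `json:"StringField"`
	FloatField   float32 `json:"FloatField"`
	BooleanField bool    `json:"BooleanField"`
}

// handle decodes the message and settles it exactly once.
func handle(m *fakeMessage) {
	var rec SimpleRecord

	if err := m.Decode(&rec); err != nil {
		m.Nack()

		return // without this return the message would be nacked and then acked
	}

	fmt.Printf("%+v\n", rec)

	m.Ack()
}

func main() {
	good := &fakeMessage{data: []byte(`{"StringField":"some string","FloatField":12.34,"BooleanField":true}`)}
	bad := &fakeMessage{data: []byte("not json")}

	handle(good)
	handle(bad)

	fmt.Println("good: acked =", good.acked, "nacked =", good.nacked)
	fmt.Println("bad: acked =", bad.acked, "nacked =", bad.nacked)
}
```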

Health Check

This module provides ready-to-use health check probes for the Health Check module.

Consider the following configuration:

# ./configs/config.yaml
app:
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}   # GCP project id
    healthcheck:
      topics:                 # list of topics to check for the topics probe
        - some-topic          # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
        - other-topic         # refers to projects/${GCP_PROJECT_ID}/topics/other-topic
      subscriptions:          # list of subscriptions to check for the subscriptions probe
        - some-subscription   # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
        - other-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/other-subscription

To activate those probes, you just need to register them:

// internal/services.go
package internal

import (
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck"
	"go.uber.org/fx"
)

func ProvideServices() fx.Option {
	return fx.Options(
		// register the GcpPubSubTopicsProbe for some-topic and other-topic
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubTopicsProbe),
		// register the GcpPubSubSubscriptionsProbe for some-subscription and other-subscription
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubSubscriptionsProbe),
		// ...
	)
}

Notes:

  • if your application is interested only in publishing, activate the GcpPubSubTopicsProbe only
  • if it is interested only in subscribing, activate the GcpPubSubSubscriptionsProbe only
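Conceptually, each probe walks its configured list of names and reports a failure if any of them cannot be found. The sketch below illustrates that idea only; it is not the implementation of GcpPubSubTopicsProbe or GcpPubSubSubscriptionsProbe. The existsFunc type and checkAll function are hypothetical, and a real check would also take a context.Context and query Pub/Sub.

```go
package main

import (
	"errors"
	"fmt"
)

// existsFunc reports whether a named resource exists.
// Hypothetical stand-in for a Pub/Sub existence lookup.
type existsFunc func(name string) (bool, error)

// checkAll returns nil only if every configured name exists.
// All failures are collected so the caller sees every missing resource at once.
func checkAll(names []string, exists existsFunc) error {
	var errs []error
	for _, name := range names {
		ok, err := exists(name)
		switch {
		case err != nil:
			errs = append(errs, fmt.Errorf("%s: %w", name, err))
		case !ok:
			errs = append(errs, fmt.Errorf("%s: not found", name))
		}
	}

	// errors.Join returns nil when there is nothing to join
	return errors.Join(errs...)
}

func main() {
	known := map[string]bool{"some-topic": true}
	exists := func(name string) (bool, error) { return known[name], nil }

	fmt.Println(checkAll([]string{"some-topic"}, exists))
	fmt.Println(checkAll([]string{"some-topic", "other-topic"}, exists))
}
```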

Testing

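In test mode, the pubsub.Client and pubsub.SchemaClient provided by this module, and the Publisher and Subscriber built on top of them, are all configured to work against a pstest.Server. This avoids the need to spin up a real Pub/Sub instance or an emulator, which makes tests more portable.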

This means that you can create topics, subscriptions and schemas locally only for your tests.

For example:

// internal/example/example_test.go
package example_test

import (
	"context"
	"testing"
	"time"

	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/message"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor/ack"
	"github.com/foo/bar/internal"
	"github.com/stretchr/testify/assert"
	"go.uber.org/fx"
)

func TestPubSub(t *testing.T) {
	t.Setenv("APP_ENV", "test")
	t.Setenv("APP_CONFIG_PATH", "testdata/config")
	t.Setenv("GCP_PROJECT_ID", "test-project")

	var publisher fxgcppubsub.Publisher
	var subscriber fxgcppubsub.Subscriber
	var supervisor ack.AckSupervisor

	ctx := context.Background()

	// test app
	internal.RunTest(
		t,
		// prepare test topic and subscription
		fxgcppubsub.PrepareTopicAndSubscription(fxgcppubsub.PrepareTopicAndSubscriptionParams{
			TopicID:        "test-topic",
			SubscriptionID: "test-subscription",
		}),
		fx.Populate(&publisher, &subscriber, &supervisor),
	)
	
	// publish to test-topic
	_, err := publisher.Publish(ctx, "test-topic", "test data")
	assert.NoError(t, err)

	// subscribe from test-subscription
	waiter := supervisor.StartAckWaiter("test-subscription")
	
	go subscriber.Subscribe(ctx, "test-subscription", func(ctx context.Context, m *message.Message) {
		assert.Equal(t, []byte("test data"), m.Data())

		m.Ack()
	})

	// wait for subscriber message ack
	_, err = waiter.WaitMaxDuration(ctx, time.Second)
	assert.NoError(t, err)
}

Notes:

  • you can prepare the test topics, subscriptions and schemas using the provided helpers
  • you can find tests involving avro and protobuf schemas in the module test examples
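The test above waits for an acknowledgement with an upper bound on the waiting time, so a missing ack fails the test instead of hanging it. The sketch below shows one way such a bounded wait can be built from a channel and a timer. It is a simplified, hypothetical stand-in and not the module's ack supervisor: the waiter type, notify, waitMaxDuration, errTimeout and shortWait are all invented for illustration, and the context parameter is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is returned when no ack arrives within the allowed duration.
var errTimeout = errors.New("timed out waiting for ack")

// shortWait is a small bound used to demonstrate the timeout path.
const shortWait = 50 * time.Millisecond

// waiter is a hypothetical stand-in for an ack waiter.
type waiter struct {
	acked chan string
}

func newWaiter() *waiter { return &waiter{acked: make(chan string, 1)} }

// notify records an acked message ID without ever blocking the caller.
func (w *waiter) notify(id string) {
	select {
	case w.acked <- id:
	default: // an ack is already pending, drop the extra notification
	}
}

// waitMaxDuration blocks until an ack is notified or d elapses.
func (w *waiter) waitMaxDuration(d time.Duration) (string, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case id := <-w.acked:
		return id, nil
	case <-timer.C:
		return "", errTimeout
	}
}

func main() {
	w := newWaiter()

	go func() {
		// simulate a subscriber acking shortly after the test starts waiting
		time.Sleep(5 * time.Millisecond)
		w.notify("msg-1")
	}()

	id, err := w.waitMaxDuration(time.Second)
	fmt.Println(id, err)

	// nobody acks on this waiter, so the wait ends with errTimeout
	_, err = newWaiter().waitMaxDuration(shortWait)
	fmt.Println(err)
}
```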

Documentation

Index

Constants

const ModuleName = "gcppubsub"

ModuleName is the module name.

Variables

FxGcpPubSubModule is the Fx GCP pub/sub module.

Functions

func AsPubSubTestServerReactor added in v1.2.0

func AsPubSubTestServerReactor(constructor any) fx.Option

AsPubSubTestServerReactor registers a Reactor into Fx.

func AsPubSubTestServerReactors added in v1.2.0

func AsPubSubTestServerReactors(constructors ...any) fx.Option

AsPubSubTestServerReactors registers a list of Reactors into Fx.

func NewFxGcpPubSubClient

func NewFxGcpPubSubClient(p FxGcpPubSubClientParam) (*pubsub.Client, error)

NewFxGcpPubSubClient returns a pubsub.Client.

func NewFxGcpPubSubSchemaClient added in v1.2.0

func NewFxGcpPubSubSchemaClient(p FxGcpPubSubSchemaClientParam) (*pubsub.SchemaClient, error)

NewFxGcpPubSubSchemaClient returns a pubsub.SchemaClient.

func NewFxGcpPubSubTestServer added in v1.2.0

func NewFxGcpPubSubTestServer(p FxGcpPubSubTestServerParam) *pstest.Server

NewFxGcpPubSubTestServer returns a pstest.Server.

func PrepareSchema added in v1.2.0

func PrepareSchema(params PrepareSchemaParams) fx.Option

PrepareSchema prepares a pub/sub schema.

func PrepareTopic added in v1.2.0

func PrepareTopic(params PrepareTopicParams) fx.Option

PrepareTopic prepares a pub/sub topic.

func PrepareTopicAndSubscription added in v1.2.0

func PrepareTopicAndSubscription(params PrepareTopicAndSubscriptionParams) fx.Option

PrepareTopicAndSubscription prepares a pub/sub topic and an associated subscription.

func PrepareTopicAndSubscriptionWithSchema added in v1.2.0

func PrepareTopicAndSubscriptionWithSchema(params PrepareTopicAndSubscriptionWithSchemaParams) fx.Option

PrepareTopicAndSubscriptionWithSchema prepares a pub/sub topic and an associated subscription with a schema.

func PrepareTopicWithSchema added in v1.2.0

func PrepareTopicWithSchema(params PrepareTopicWithSchemaParams) fx.Option

PrepareTopicWithSchema prepares a pub/sub topic with a schema.

Types

type DefaultPublisher added in v1.2.0

type DefaultPublisher struct {
	// contains filtered or unexported fields
}

DefaultPublisher is the default Publisher implementation.

func NewDefaultPublisher added in v1.2.0

func NewDefaultPublisher(factory topic.TopicFactory, registry topic.TopicRegistry) *DefaultPublisher

NewDefaultPublisher returns a new DefaultPublisher instance.

func NewFxGcpPubSubPublisher added in v1.2.0

func NewFxGcpPubSubPublisher(p FxGcpPubSubPublisherParam) *DefaultPublisher

NewFxGcpPubSubPublisher returns a Publisher.

func (*DefaultPublisher) Publish added in v1.2.0

func (p *DefaultPublisher) Publish(ctx context.Context, topicID string, data any, options ...topic.PublishOption) (*pubsub.PublishResult, error)

Publish publishes data, with options, on a given topicID.

func (*DefaultPublisher) Stop added in v1.2.0

func (p *DefaultPublisher) Stop()

Stop gracefully stops all internal publishers.

type DefaultSubscriber added in v1.2.0

type DefaultSubscriber struct {
	// contains filtered or unexported fields
}

DefaultSubscriber is the default Subscriber implementation.

func NewDefaultSubscriber added in v1.2.0

NewDefaultSubscriber returns a new DefaultSubscriber instance.

func NewFxGcpPubSubSubscriber added in v1.2.0

func NewFxGcpPubSubSubscriber(p FxGcpPubSubSubscriberParam) *DefaultSubscriber

NewFxGcpPubSubSubscriber returns a Subscriber.

func (*DefaultSubscriber) Subscribe added in v1.2.0

func (s *DefaultSubscriber) Subscribe(ctx context.Context, subscriptionID string, f subscription.SubscribeFunc, options ...subscription.SubscribeOption) error

Subscribe handles received data using a subscription.SubscribeFunc, with options, from a given subscriptionID.

type FxGcpPubSubClientParam

type FxGcpPubSubClientParam struct {
	fx.In
	LifeCycle fx.Lifecycle
	Context   context.Context
	Config    *config.Config
	Server    *pstest.Server
}

FxGcpPubSubClientParam allows injection of the required dependencies in NewFxGcpPubSubClient.

type FxGcpPubSubPublisherParam added in v1.2.0

type FxGcpPubSubPublisherParam struct {
	fx.In
	LifeCycle fx.Lifecycle
	Config    *config.Config
	Factory   topic.TopicFactory
	Registry  topic.TopicRegistry
}

FxGcpPubSubPublisherParam allows injection of the required dependencies in NewFxGcpPubSubPublisher.

type FxGcpPubSubSchemaClientParam added in v1.2.0

type FxGcpPubSubSchemaClientParam struct {
	fx.In
	LifeCycle fx.Lifecycle
	Context   context.Context
	Config    *config.Config
	Server    *pstest.Server
}

FxGcpPubSubSchemaClientParam allows injection of the required dependencies in NewFxGcpPubSubSchemaClient.

type FxGcpPubSubSubscriberParam added in v1.2.0

type FxGcpPubSubSubscriberParam struct {
	fx.In
	Factory  subscription.SubscriptionFactory
	Registry subscription.SubscriptionRegistry
}

FxGcpPubSubSubscriberParam allows injection of the required dependencies in NewFxGcpPubSubSubscriber.

type FxGcpPubSubTestServerParam added in v1.2.0

type FxGcpPubSubTestServerParam struct {
	fx.In
	Reactors []Reactor `group:"gcppubsub-reactors"`
}

type PrepareSchemaParams added in v1.2.0

type PrepareSchemaParams struct {
	SchemaID     string
	SchemaConfig pubsub.SchemaConfig
}

PrepareSchemaParams represents the parameters used in PrepareSchema.

type PrepareTopicAndSubscriptionParams added in v1.2.0

type PrepareTopicAndSubscriptionParams struct {
	TopicID            string
	TopicConfig        pubsub.TopicConfig
	SubscriptionID     string
	SubscriptionConfig pubsub.SubscriptionConfig
}

PrepareTopicAndSubscriptionParams represents the parameters used in PrepareTopicAndSubscription.

type PrepareTopicAndSubscriptionWithSchemaParams added in v1.2.0

type PrepareTopicAndSubscriptionWithSchemaParams struct {
	TopicID            string
	TopicConfig        pubsub.TopicConfig
	SubscriptionID     string
	SubscriptionConfig pubsub.SubscriptionConfig
	SchemaID           string
	SchemaConfig       pubsub.SchemaConfig
	SchemaEncoding     pubsub.SchemaEncoding
}

PrepareTopicAndSubscriptionWithSchemaParams represents the parameters used in PrepareTopicAndSubscriptionWithSchema.

type PrepareTopicParams added in v1.2.0

type PrepareTopicParams struct {
	TopicID     string
	TopicConfig pubsub.TopicConfig
}

PrepareTopicParams represents the parameters used in PrepareTopic.

type PrepareTopicWithSchemaParams added in v1.2.0

type PrepareTopicWithSchemaParams struct {
	TopicID        string
	TopicConfig    pubsub.TopicConfig
	SchemaID       string
	SchemaConfig   pubsub.SchemaConfig
	SchemaEncoding pubsub.SchemaEncoding
}

PrepareTopicWithSchemaParams represents the parameters used in PrepareTopicWithSchema.

type Publisher added in v1.2.0

type Publisher interface {
	Publish(ctx context.Context, topicID string, data any, options ...topic.PublishOption) (*pubsub.PublishResult, error)
	Stop()
}

Publisher is the interface for high level publishers.

type Reactor added in v1.2.0

type Reactor interface {
	FuncNames() []string
	pstest.Reactor
}

Reactor is the interface for pub/sub test server reactors.
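As a rough illustration of what implementing this interface involves, the sketch below defines a reactor that counts invocations and lets the test server continue with its normal handling. To keep the example dependency-free, the embedded pstest.Reactor method is written out locally as React(req any) (handled bool, ret any, err error). The countingReactor type, the byFuncName helper and the function names "Publish" and "Acknowledge" are assumptions made for illustration; how the module wires reactors to the test server is not shown here.

```go
package main

import "fmt"

// reactor mirrors the shape of the Reactor interface above, with the
// embedded pstest.Reactor method written out so the sketch has no
// external dependencies.
type reactor interface {
	FuncNames() []string
	React(req any) (handled bool, ret any, err error)
}

// countingReactor counts how often it is invoked.
type countingReactor struct {
	calls int
}

// FuncNames lists the server functions this reactor is interested in.
// The names used here are assumed examples.
func (r *countingReactor) FuncNames() []string {
	return []string{"Publish", "Acknowledge"}
}

// React records the call and returns handled=false, so the server
// falls through to its normal behavior.
func (r *countingReactor) React(req any) (bool, any, error) {
	r.calls++

	return false, nil, nil
}

// byFuncName indexes reactors by the function names they declare.
// Hypothetical helper, shown only to illustrate the role of FuncNames.
func byFuncName(reactors ...reactor) map[string][]reactor {
	index := make(map[string][]reactor)
	for _, r := range reactors {
		for _, name := range r.FuncNames() {
			index[name] = append(index[name], r)
		}
	}

	return index
}

func main() {
	counter := &countingReactor{}
	index := byFuncName(counter)

	for _, r := range index["Publish"] {
		handled, _, err := r.React("fake publish request")
		fmt.Println(handled, err)
	}

	fmt.Println(counter.calls)
}
```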

type Subscriber added in v1.2.0

type Subscriber interface {
	Subscribe(ctx context.Context, subscriptionID string, f subscription.SubscribeFunc, options ...subscription.SubscribeOption) error
}

Subscriber is the interface for high level subscribers.

Directories

Path Synopsis
ack
log
