messagebroker

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2023 License: Apache-2.0 Imports: 1 Imported by: 0

README

messagebroker package

create new client example

  1. google pubsub
package main

import (
	"clodeo.tech/platform/go-universe/pkg/messagebroker"
	"clodeo.tech/platform/go-universe/pkg/messagebroker/googlepubsub"
	"clodeo.tech/platform/go-universe/pkg/redis"
	"fmt"
)

type msg = messagebroker.PublishMessage
type msgOpts = messagebroker.PublishOptions
type gb = googlepubsub.GooglePubSub
type ga = googlepubsub.GooglePubSubAdapter
type gc = googlepubsub.Config
type gaOpts = googlepubsub.AdapterOptions

func main() {
	var sl []googlepubsub.Strategy
	sl = append(sl, googlepubsub.Strategy{
		TopicName:         "test",
		SubscriptionNames: []string{"test-sub"},
	})

	cfg := gc{
		AuthJsonPath: "YOUR_CREDENTIAL_PATH",
		ProjectId:    "YOUR_PROJECT_ID",
		Strategy:     sl,
	}

	broker := &gb{
		Cfg: cfg,
	}

	rc := redis.NewRedisClient("localhost", 6379, "", 0)

	msgBroker := &ga{
		Broker: broker,
		Options: &gaOpts{
			RedisClient: rc,
		},
	}

	client := messagebroker.NewClient(msgBroker)
}
  2. rabbitmq
package main

import (
	"clodeo.tech/platform/go-universe/pkg/messagebroker"
	"clodeo.tech/platform/go-universe/pkg/messagebroker/googlepubsub"
	"clodeo.tech/platform/go-universe/pkg/redis"
	"fmt"
)

type msg = messagebroker.PublishMessage
type msgOpts = messagebroker.PublishOptions
type rb = rabbitmq.RabbitMQ
type ra = rabbitmq.RabbitMQAdapter
type rc = rabbitmq.Config

func main() {
	cfg := rc{
		Username: "guest",
		Password: "guest",
		Server:   "localhost:5672",
	}
	broker := &rb{
		Cfg: cfg,
	}
	msgBroker := &ra{
		Broker: broker,
	}

	client := messagebroker.NewClient(msgBroker)
}
  3. kafka
package main

import (
	"clodeo.tech/platform/go-universe/pkg/messagebroker"
	"clodeo.tech/platform/go-universe/pkg/messagebroker/googlepubsub"
	"clodeo.tech/platform/go-universe/pkg/redis"
	"fmt"
)

type msg = messagebroker.PublishMessage
type msgOpts = messagebroker.PublishOptions
type kb = rabbitmq.Kafka
type ka = rabbitmq.KafkaAdapter
type kc = rabbitmq.Config

func main() {
	cfg := kc{}
	broker := &kb{
		Cfg: cfg,
	}
	msgBroker := &ka{
		Broker: broker,
	}

	client := messagebroker.NewClient(msgBroker)
}
publish message example
    ctx := context.Background()

    // NewClient returns (*Client, error); both results must be received.
    client, err := messagebroker.NewClient(msgBroker)
    if err != nil {
        fmt.Println(err)
        return
    }

    // Publish reports failure through its error result, so check it.
    err = client.Publish(ctx, msg{
        Name:    "test",
        Message: "halloword",
        Options: msgOpts{
            EnableOrdering: true,
            OrderingKey:    "test-1001:asdf",
        },
    })
    if err != nil {
        fmt.Println(err)
    }
subscribe message example
// hdl is a short alias for the handler interface that Subscribe accepts.
type hdl = messagebroker.SubscribeMessageHandler

// testSubHdl is a field-less example handler; its OnProcess and OnError
// methods provide the SubscribeMessageHandler method set.
type testSubHdl struct{}

// OnProcess writes the received payload to standard output as text,
// followed by a newline.
func (h *testSubHdl) OnProcess(ctx context.Context, msg []byte) {
	fmt.Printf("%s\n", msg)
}

// OnError prints the error it is given. The method has no result, matching
// SubscribeMessageHandler.OnError, so it must not return a value.
func (t *testSubHdl) OnError(ctx context.Context, err error) {
	fmt.Println(err)
}

// main subscribes the example handler to "test-sub". msgBroker is expected
// to be an adapter built as in the client-creation examples.
func main() {
	ctx := context.Background()

	// NewClient returns (*Client, error); both results must be received.
	client, err := messagebroker.NewClient(msgBroker)
	if err != nil {
		fmt.Println(err)
		return
	}

	thdl := &testSubHdl{}
	client.Subscribe(ctx, "test-sub", thdl)
}

Note

RabbitMQ and Kafka support is coming soon.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

// Client is the type returned by NewClient; it exposes Publish and Subscribe.
type Client struct{}

func NewClient

func NewClient(msgb Messagebroker) (*Client, error)

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, m PublishMessage) error

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, name string, handler SubscribeMessageHandler)

type Messagebroker

// Messagebroker is the backend interface that NewClient accepts. Its Publish
// and Subscribe methods have the same signatures as the Client methods of
// the same names.
type Messagebroker interface {
	// Init reports failure through its error result.
	// NOTE(review): presumably invoked by NewClient during setup — confirm.
	Init() error
	// Publish sends m and reports failure through its error result.
	Publish(ctx context.Context, m PublishMessage) error
	// Subscribe registers handler under name; it returns no value.
	Subscribe(ctx context.Context, name string, handler SubscribeMessageHandler)
}

type PublishMessage

// PublishMessage is the value passed to Publish.
type PublishMessage struct {
	// Name identifies where the message goes.
	// NOTE(review): presumably the topic name — confirm against an adapter.
	Name string
	// Message is the payload, carried as a string.
	Message string
	// Options holds the per-message publish settings.
	Options PublishOptions
}

type PublishOptions

// PublishOptions holds the per-message settings carried by PublishMessage.
// NOTE(review): the fields presumably map to broker-side message ordering
// (e.g. Pub/Sub ordering keys) — confirm against the adapter implementation.
type PublishOptions struct {
	// EnableOrdering toggles ordering for the message.
	EnableOrdering bool
	// OrderingKey is the key used when ordering is enabled.
	OrderingKey string
}

type SubscribeMessageHandler

// SubscribeMessageHandler is the callback interface passed to Subscribe.
// Neither method returns a value.
type SubscribeMessageHandler interface {
	// OnProcess receives a message payload as raw bytes.
	OnProcess(ctx context.Context, msg []byte)
	// OnError receives an error.
	// NOTE(review): presumably called on receive/processing failures — confirm.
	OnError(ctx context.Context, err error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL