kiara

package module
v0.2.0 Latest
Published: Feb 14, 2021 License: MIT Imports: 6 Imported by: 0

README

Kiara

Kiara is a Go equivalent of Phoenix PubSub that makes it easy for Go applications to communicate with each other.

Examples

demo chat application

Basic Usage (with Redis Backend)

package main

import (
	"context"
	"fmt"

	"github.com/genkami/kiara"
	adapter "github.com/genkami/kiara/adapter/redis"
	"github.com/go-redis/redis/v8"
)

type Message struct {
	From string
	Body string
}

func main() {
	var err error
	redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	pubsub := kiara.NewPubSub(adapter.NewAdapter(redisClient))
	defer pubsub.Close()

	channel := make(chan Message, 5)
	sub, err := pubsub.Subscribe("room:123", channel)
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()

	ctx := context.Background()
	msg := &Message{From: "birb", Body: "cock-a-doodle-doo"}
	err = pubsub.Publish(ctx, "room:123", msg)
	if err != nil {
		panic(err)
	}

	sent := <-channel
	fmt.Printf("%s: %s\n", sent.From, sent.Body)
}

Run Test

To run the entire test suite, you need running Redis and NATS servers, and you must pass their addresses to the test cases through environment variables.

We provide a docker-compose.yml to set up these dependencies easily. To run the tests with docker-compose, run the following commands:

$ docker-compose up -d
$ export KIARA_TEST_REDIS_ADDR=localhost:6379
$ export KIARA_TEST_NATS_URL=nats://localhost:4222
$ go test ./...

Codec

By default, messages are marshaled into gob format. You can specify which codec Kiara uses to marshal and unmarshal messages by passing WithCodec() to NewPubSub().

import "github.com/genkami/kiara/codec/msgpack"

pubsub := kiara.NewPubSub(
    adapter.NewAdapter(redisClient),
    kiara.WithCodec(msgpack.Codec),
)

Currently these codecs are officially available: gob, JSON, MessagePack, and Protocol Buffers.

Custom Codec

You can implement your own codec by implementing Marshal and Unmarshal. For example, if you want to encode messages into WATSON, you can implement a WATSON codec like this:

import "github.com/genkami/watson"

type WatsonCodec struct{}

func (*WatsonCodec) Marshal(v interface{}) ([]byte, error) {
	return watson.Marshal(v)
}

func (*WatsonCodec) Unmarshal(src []byte, v interface{}) error {
	return watson.Unmarshal(src, v)
}
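
The same two-method shape can be tried out without any third-party dependency. The sketch below is illustrative only: the `Codec` interface is declared locally so the program compiles on its own (it is assumed to mirror the two methods named above), and `JSONCodec` and `roundTrip` are hypothetical names. For real use, Kiara already ships a JSON codec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a local stand-in with the two methods described above.
// It is an assumption made so this sketch is self-contained.
type Codec interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(src []byte, v interface{}) error
}

// JSONCodec is a hypothetical codec backed by encoding/json.
type JSONCodec struct{}

func (*JSONCodec) Marshal(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (*JSONCodec) Unmarshal(src []byte, v interface{}) error {
	return json.Unmarshal(src, v)
}

type Message struct {
	From string
	Body string
}

// roundTrip encodes msg with the codec and decodes the bytes back.
func roundTrip(c Codec, msg Message) (Message, error) {
	var out Message
	data, err := c.Marshal(&msg)
	if err != nil {
		return out, err
	}
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(&JSONCodec{}, Message{From: "birb", Body: "hello"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", out.From, out.Body)
}
```

A round trip like this is a quick sanity check before handing a new codec to WithCodec().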

Backend-Agnostic

Kiara does not depend on a specific message broker implementation. Currently these message brokers are officially supported: Redis and NATS.

You can change backend message brokers with little effort. Here are examples of connecting to Redis and NATS as Kiara's backend.

Example (Redis):

import (
    "github.com/go-redis/redis/v8"
    adapter "github.com/genkami/kiara/adapter/redis"
)
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
pubsub := kiara.NewPubSub(adapter.NewAdapter(redisClient))

Example (NATS):

import (
    "github.com/nats-io/nats.go"
    adapter "github.com/genkami/kiara/adapter/nats"
)
conn, err := nats.Connect("nats://localhost:4222")
// error handling omitted
pubsub := kiara.NewPubSub(adapter.NewAdapter(conn))

License

Distributed under the MIT License. See LICENSE for more information.

Acknowledgements

This library is highly inspired by phoenixframework/phoenix_pubsub, nats-io/nats.go, and the majestic phoenix Takanashi Kiara.

Documentation

Overview

Package kiara provides a thin pubsub wrapper that allows Go applications to communicate easily.

Variables

var (
	// This error is reported through PubSub.Errors() when a channel through which messages are sent is full.
	ErrSlowConsumer = errors.New("consumer is too slow; message discarded")

	// This error is returned when a given context is cancelled.
	ErrCancelled = errors.New("cancelled")

	// This error is returned when the second argument of PubSub.Subscribe() is not a channel, or is a receive-only channel (<-chan T) that PubSub cannot send to.
	ErrArgumentMustBeChannel = errors.New("argument must be a channel")
)

Types

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option configures PubSub.

func DeliveredChannelSize added in v0.2.0

func DeliveredChannelSize(size int) Option

DeliveredChannelSize sets the size of a channel that contains messages that are sent by the backend and about to be delivered.

func ErrorChannelSize

func ErrorChannelSize(size int) Option

ErrorChannelSize sets the size of the channel through which asynchronous errors are reported.

func PublishChannelSize added in v0.2.0

func PublishChannelSize(size int) Option

PublishChannelSize sets the size of a channel that contains messages that will be sent later.

func WithCodec

func WithCodec(codec types.Codec) Option

WithCodec specifies a codec that PubSub uses to marshal and unmarshal messages.

By default, messages are marshaled into gob format.

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub provides a way to send and receive arbitrary data.

func NewPubSub

func NewPubSub(adapter types.Adapter, options ...Option) *PubSub

NewPubSub creates a new PubSub and starts its underlying adapter.

func (*PubSub) Close

func (p *PubSub) Close()

Close stops the PubSub and releases its resources. It also stops the underlying adapter, so you don't need to stop adapters manually.

func (*PubSub) Errors

func (p *PubSub) Errors() <-chan error

Errors returns a channel through which asynchronous errors are reported. When the channel is full, subsequent errors are discarded.

func (*PubSub) Publish

func (p *PubSub) Publish(ctx context.Context, topic string, data interface{}) error

Publish publishes `data` to the underlying message broker. This means `data` is sent to every channel that is `Subscribe`d to the given topic. It returns an error when it cannot prepare the message for publishing, either because marshaling failed or because `ctx` was cancelled. Any other errors are reported asynchronously via PubSub.Errors().

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(topic string, channel interface{}) (*Subscription, error)

Subscribe binds a channel to the given topic. This means any messages that are `Publish`ed to the same topic are sent to the given channel.

The `channel` must be of type `chan T` or `chan<- T`, where `T` is any type that can be `Unmarshal`ed by the codec of the `PubSub`.

Note that PubSub passes *T to its codec when T is not a pointer. In most cases you don't have to care about this, but it may be confusing when the codec assumes that the data implements certain interfaces.

It's OK to subscribe to one topic more than once. In this case, messages are broadcast to all channels that are subscribed to the topic.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription binds a channel to a specific topic.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe removes the binding between the channel and its associated topic. Once `Unsubscribe` returns, it is guaranteed that no more messages are sent to the channel.

Directories

Path              Synopsis
adapter/inmemory  Package inmemory provides a simple pubsub adapter mainly aimed at testing.
adapter/nats      Package nats provides a NATS adapter for Kiara.
adapter/redis     Package redis provides a Redis adapter for Kiara.
codec/gob         Package gob provides a Codec for gob.
codec/json        Package json provides a Codec for JSON.
codec/msgpack     Package msgpack provides a Codec for MessagePack.
codec/proto       Package proto provides a Codec for Protocol Buffers.
examples
types             Package types provides types and interfaces that are needed to implement backend adapters.