gohub

Version: v0.0.0-...-cad6240
Published: Sep 4, 2017 License: MIT Imports: 15 Imported by: 0

README

gohub

Azure Event Hub client for Go.

This repository is under active development. Not ready for PRODUCTION.

Installation

go get github.com/recobe182/gohub

Documentation

See Go Doc.

Getting started

Send
c, err := gohub.New(`a_namespace`, `a_hub`, `a_key_name`, `a_key`)
if err != nil {
	panic(err)
}
s, err := c.CreateSender()
if err != nil {
	panic(err)
}
if err := s.SendSync([]byte("55")); err != nil {
	panic(err)
}
Receive
c, err := gohub.New(`a_namespace`, `a_hub`, `a_key_name`, `a_key`)
if err != nil {
	panic(err)
}
ss := gohub.StorageSetting{Name: `an_account`, Key: `a_key`}
r, err := c.CreateReceiver(0, gohub.FromLastOffset(ss), gohub.ConsumerGroup(`$Default`), gohub.PrefetchCount(1))
if err != nil {
	panic(err)
}
o := make(chan gohub.ReceiveMessage)
go r.Receive(o)
for outcome := range o {
	if outcome.Error != nil {
		// Stop reading on the first error; do not close a channel the receiver still writes to.
		fmt.Println("receive error:", outcome.Error)
		break
	}
	fmt.Println("Partition 0: " + string(outcome.Body))
}

Running application

Because this library uses qpid.apache.org/electron as its AMQP 1.0 library, you have to install proton-c in your development environment. Unfortunately, proton-c is not available for Mac OS X; as a workaround, use the pre-built Docker image to build and run your application.

To build an application, pull the build image and start a container from it:

docker pull recobe/ubuntu:16.10-proton-go1.8
docker run --rm -it -v "$GOPATH":/go recobe/ubuntu:16.10-proton-go1.8 bash

The -v option maps your $GOPATH to /go inside the container.

If you only want to run your application inside a Docker container, put the built binary inside a container started from this image:

docker pull recobe/ubuntu:16.10-proton

Patching qpid.apache.org/electron and proton

To use this library, you need to patch qpid.apache.org/electron and proton so that they support marshaling and unmarshaling the timestamp data type. Apply timestamp.patch to both.

Documentation

Overview

Package gohub provides basic functionality to send/receive message to/from Azure Event Hub.

Functions

func New

func New(ns, hub, sasN, sasK string) (*evhConnection, error)

New creates a new Azure Event Hub connection instance and also tries to connect to Azure Event Hub. The parameters are the Service Bus namespace, the event hub name, the SAS key name, and the SAS key.

func NewWithConnectionString

func NewWithConnectionString(connStr string) (*evhConnection, error)

NewWithConnectionString creates a new Azure Event Hub connection instance from a connection string.

Types

type EVHConnection

type EVHConnection interface {
	// CreateSender creates a new sender on the DefaultSession.
	CreateSender() (EVHSender, error)

	// CreateReceiver creates a new receiver on the DefaultSession.
	CreateReceiver(p int, opts ...ReceiverOption) (EVHReceiver, error)

	// Close the connection.
	Close() error
}

EVHConnection is the interface for a connection to Azure Event Hub.

type EVHReceiver

type EVHReceiver interface {
	// Receive delivers received messages through the out channel.
	Receive(out chan<- ReceiveMessage) PartitionContext
}

EVHReceiver is the receiver interface used to receive messages from Azure Event Hub.

type EVHSender

type EVHSender interface {
	// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
	// Returns an error or nil in case of success.
	SendSync(body []byte) error

	// SendSyncWithKey acts like SendSync. In addition, you can specify a partition key. Messages with the same partition key are always sent to the same partition.
	SendSyncWithKey(body []byte, pk string) error

	// SendSyncTimeout sends a message and blocks until the message is acknowledged by the remote receiver.
	// If sending exceeds the timeout t, an error is returned.
	// Returns an error or nil in case of success.
	SendSyncTimeout(body []byte, t time.Duration) error

	// SendSyncTimeoutWithKey acts like SendSyncTimeout. In addition, you can specify a partition key. Messages with the same partition key are always sent to the same partition.
	SendSyncTimeoutWithKey(body []byte, t time.Duration, pk string) error

	// SendAsync puts a message in the send buffer and returns immediately.
	// If error occurs, an error object will be sent to out channel.
	// Note: can block if there is no space to buffer the message.
	SendAsync(body []byte, out chan<- error)

	// SendAsyncWithKey acts like SendAsync. In addition, you can specify a partition key. Messages with the same partition key are always sent to the same partition.
	SendAsyncWithKey(body []byte, out chan<- error, pk string)

	// SendAsyncTimeout acts like SendAsync, with a timeout t applied to the send.
	SendAsyncTimeout(body []byte, out chan<- error, t time.Duration)

	// SendAsyncTimeoutWithKey acts like SendAsyncTimeout. In addition, you can specify a partition key. Messages with the same partition key are always sent to the same partition.
	SendAsyncTimeoutWithKey(body []byte, out chan<- error, t time.Duration, pk string)
}

EVHSender is the sender interface used to send messages to Azure Event Hub.

type PartitionContext

type PartitionContext interface {
	Checkpoint(offset string, seqNo int64) error
	GetId() string
}

PartitionContext is used for checkpointing. If the receiver mode is FromNow, the checkpoint operation does nothing. If the receiver mode is FromLastOffset, you must provide a storage account setting for checkpointing to work.

type ReceiveMessage

type ReceiveMessage struct {
	// Body is the message body.
	Body []byte

	// Offset is a partition offset.
	Offset string

	// SeqNo is a message sequence number.
	SeqNo int64

	// PartitionId is the ID of the partition the message was received from.
	PartitionId string

	// Error object is set in case of failure.
	Error error
}

ReceiveMessage is the message struct delivered to the receive channel.

type ReceiveMode

type ReceiveMode string

type ReceiverOption

type ReceiverOption func(*receiverSetting)

ReceiverOption can be passed when creating a receiver to set optional configuration.

func ConsumerGroup

func ConsumerGroup(s string) ReceiverOption

ConsumerGroup returns a ReceiverOption that sets consumer group.

func FromLastOffset

func FromLastOffset(ss StorageSetting) ReceiverOption

FromLastOffset returns a ReceiverOption that makes the receiver start receiving messages from the last offset.

func FromTime

func FromTime(epochTimeInMillisec int64) ReceiverOption

FromTime returns a ReceiverOption that makes the receiver start receiving messages from a specific time, given in milliseconds since the Unix epoch.

func PrefetchCount

func PrefetchCount(i int) ReceiverOption

PrefetchCount returns a ReceiverOption that sets prefetch count.

type StorageSetting

type StorageSetting struct {
	Name string
	Key  string
}
