receiver

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2020 License: MIT Imports: 6 Imported by: 0

README

Receiver usage (consumer)

Go Modules:

require github.com/azure-open-tools/event-hubs/receiver v1.0.1

Import

import "github.com/azure-open-tools/event-hubs/receiver"

The connection string must allow the 'LISTEN' action and must have the topic specified.

Simple usage:

builder := receiver.NewReceiverBuilder()

if builder != nil {
    builder.SetConnectionString("Endpoint://") //required field
    builder.SetReceiverHandler(OnReceiverHandler)

    rcv, err := builder.GetReceiver()
    if err == nil {
        err = rcv.StartListener(context.Background())
    }
}
  • Recommended usage: the consumer group is an optional field, but setting it is recommended (you must first create it in the Azure portal or with the Azure CLI). If it is not set, the $Default consumer group is used. Against a development environment this is not a problem; however, against a production environment other clients could be disconnected, depending on your Azure Event Hubs settings.
builder := receiver.NewReceiverBuilder()

if builder != nil {
    builder.SetConnectionString("Endpoint://") //required field
    builder.SetConsumerGroup("debug") //recommended
    builder.SetReceiverHandler(func(ctx context.Context, event *eventhub.Event) error { })

    rcv, err := builder.GetReceiver()
    if err == nil {
        err = rcv.StartListener(context.Background())
    }
}
  • ReceiverHandler is the event handler you must set to handle the events arriving from Event Hubs. Whenever an event arrives, this handler is executed.
builder.SetReceiverHandler(func(ctx context.Context, event *eventhub.Event) error {
    fmt.Println(event.Data)
})
  • Special fields:
    • DataFilters (event.Data field): you can set any string; if the string matches the event data, the event will be delivered to the ReceiverHandler function.
    • PropertyFilters (event.Properties field): same as above, but applied to the event's property fields.
    • ListenerPartitionIds: here you can specify which partition IDs you want to listen to. If you leave it out, the library will listen to all available partition IDs.
builder := receiver.NewReceiverBuilder()

if builder != nil {
    builder.SetConnectionString("Endpoint://") //required field
    builder.SetConsumerGroup("debug") //recommended
    builder.AddDataFilters("any content to be filtered")
    builder.AddPropertyFilters([]string{"propertyKey1:value1", "propertyKey2:value2"})
    builder.AddListenerPartitionIds([]string{"0", "1", "12", "21"}) 
    builder.SetReceiverHandler(func(ctx context.Context, event *eventhub.Event) error {
                                   fmt.Println(event.Data)
                               })

    rcv, err := builder.GetReceiver()
    if err == nil {
        err = rcv.StartListener(context.Background())
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Builder

type Builder struct {
	DataFilter     []string
	PropertyFilter []string
	PartitionIds   []string
	ConnString     string
	ConsumerGroup  string

	OnReceiveHandler func(ctx context.Context, event *eventhub.Event) error
}

func NewReceiverBuilder

func NewReceiverBuilder() *Builder

func (*Builder) AddDataFilter

func (builder *Builder) AddDataFilter(filter string) IReceiveBuilder

func (*Builder) AddDataFilters

func (builder *Builder) AddDataFilters(filters []string) IReceiveBuilder

func (*Builder) AddListenerPartitionId

func (builder *Builder) AddListenerPartitionId(partitionId string) IReceiveBuilder

func (*Builder) AddListenerPartitionIds

func (builder *Builder) AddListenerPartitionIds(partitionIds []string) IReceiveBuilder

func (*Builder) AddPropertyFilter

func (builder *Builder) AddPropertyFilter(filter string) IReceiveBuilder

func (*Builder) AddPropertyFilters

func (builder *Builder) AddPropertyFilters(filters []string) IReceiveBuilder

func (*Builder) GetReceiver

func (builder *Builder) GetReceiver() (*Receiver, error)

func (*Builder) SetConnectionString

func (builder *Builder) SetConnectionString(connStr string) IReceiveBuilder

func (*Builder) SetConsumerGroup

func (builder *Builder) SetConsumerGroup(consumerGroup string) IReceiveBuilder

func (*Builder) SetReceiverHandler

func (builder *Builder) SetReceiverHandler(handler func(ctx context.Context, event *eventhub.Event) error) IReceiveBuilder

type IReceiveBuilder

type IReceiveBuilder interface {
	AddDataFilter(filter string) IReceiveBuilder
	AddDataFilters(filters []string) IReceiveBuilder
	AddPropertyFilter(filter string) IReceiveBuilder
	AddPropertyFilters(filters []string) IReceiveBuilder
	AddListenerPartitionId(partitionId string) IReceiveBuilder
	AddListenerPartitionIds(partitionIds []string) IReceiveBuilder
	SetConnectionString(connStr string) IReceiveBuilder
	SetConsumerGroup(consumerGroup string) IReceiveBuilder
	SetReceiverHandler(handler func(ctx context.Context, event *eventhub.Event) error) IReceiveBuilder
	GetReceiver() (*Receiver, error)
}

type IReceiver

type IReceiver interface {
	StartListener(ctx context.Context) error
	StopListener(ctx context.Context) error
}

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func (*Receiver) StartListener

func (receiver *Receiver) StartListener(ctx context.Context) error

func (*Receiver) StopListener

func (receiver *Receiver) StopListener(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL