event

package
v9.1.81+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2020 License: MIT Imports: 21 Imported by: 11

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDeadlineExceeded = errors.New("error: deadline exceeded on publish")

ErrDeadlineExceeded is an error that's raised when a deadline occurs

View Source
var EventDebug = os.Getenv("PP_EVENT_DEBUG") == "1"

EventDebug allows debug printing without requiring a logger to make it easy to turn on for debugging on the fly with an env variable

View Source
var MaxErrorCount = 50

MaxErrorCount is the number of errors trying to connect to the event-api server before giving up

View Source
var MinCompressionSize = 1024

MinCompressionSize is the minimum size for compression on Publish

Functions

func IsErrorRetryable

func IsErrorRetryable(err error) bool

IsErrorRetryable returns true if the error is retryable

func IsHTTPStatusRetryable

func IsHTTPStatusRetryable(statusCode int) bool

IsHTTPStatusRetryable returns true if the status code is retryable

func Publish

func Publish(ctx context.Context, event PublishEvent, channel string, apiKey string, options ...Option) (err error)

Publish will publish an event to the event api server

Types

type Option

type Option func(config *PublishConfig) error

Option will allow publish to be customized

func WithAsync

func WithAsync(async bool) Option

WithAsync will set the async flag on publishing when using a SubscriptionChannel and is ignored when using the Publish method of the package. If true, is non-blocking and won't wait for an ACK. If false, will wait for the ack before returning from publish

func WithDeadline

func WithDeadline(deadline time.Time) Option

WithDeadline will set a deadline for publishing

func WithDebugOption

func WithDebugOption() Option

WithDebugOption will turn on debugging

func WithHeaders

func WithHeaders(headers map[string]string) Option

WithHeaders will provide an ability to set specific headers on the outgoing HTTP request

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger will provide a logger to use

func WithTimestamp

func WithTimestamp(ts time.Time) Option

WithTimestamp will set the timestamp on the event payload

type PublishConfig

type PublishConfig struct {
	Debug     bool
	Deadline  time.Time
	Logger    log.Logger
	Header    http.Header
	Timestamp time.Time
	Async     bool // only when using SubscriptionChannel
}

PublishConfig is used by Options

type PublishEvent

type PublishEvent struct {
	Object  datamodel.Model
	Headers map[string]string
	Logger  log.Logger `json:"-"`
	// contains filtered or unexported fields
}

PublishEvent is the container for a model event

type Subscription

type Subscription struct {
	GroupID           string              `json:"group_id"`
	Topics            []string            `json:"topics"`
	Headers           map[string]string   `json:"headers,omitempty"`
	IdleDuration      string              `json:"idle_duration,omitempty"` // Deprecated
	Limit             int                 `json:"limit,omitempty"`
	Offset            string              `json:"offset,omitempty"` // Deprecated
	After             int64               `json:"after,omitempty"`
	DisableAutoCommit bool                `json:"disable_autocommit,omitempty"`
	Temporary         bool                `json:"temporary,omitempty"`
	Filter            *SubscriptionFilter `json:"filter,omitempty"`
	Channel           string              `json:"-"`
	APIKey            string              `json:"-"`
	BufferSize        int                 `json:"-"`
	Errors            chan<- error        `json:"-"`
	Logger            log.Logger          `json:"-"`
	HTTPHeaders       map[string]string   `json:"-"`
	CloseTimeout      time.Duration       `json:"-"`
	DispatchTimeout   time.Duration       `json:"-"`
	DisablePing       bool                `json:"-"`
}

Subscription is the information for creating a subscription channel to receive events from the event server

type SubscriptionChannel

type SubscriptionChannel struct {
	// contains filtered or unexported fields
}

SubscriptionChannel is a channel for receiving events

func NewSubscription

func NewSubscription(ctx context.Context, subscription Subscription) (*SubscriptionChannel, error)

NewSubscription will create a subscription to the event server and will continously read events (as they arrive) and send them back to the return channel. once you're done, you must call Close on the channel to stop receiving events

func (*SubscriptionChannel) Channel

func (c *SubscriptionChannel) Channel() <-chan SubscriptionEvent

Channel returns a read-only channel to receive SubscriptionEvent

func (*SubscriptionChannel) Close

func (c *SubscriptionChannel) Close() error

Close will close the event channel and stop receiving them

func (*SubscriptionChannel) Publish

func (c *SubscriptionChannel) Publish(event PublishEvent, options ...Option) error

Publish will send a message to the event api

func (*SubscriptionChannel) WaitForReady

func (c *SubscriptionChannel) WaitForReady()

WaitForReady will block until we have received the subscription ack

type SubscriptionEvent

type SubscriptionEvent struct {
	ID        string            `json:"message_id"`
	Timestamp time.Time         `json:"timestamp"`
	Headers   map[string]string `json:"headers,omitempty"`
	Key       string            `json:"key"`
	Type      string            `json:"type"`
	Model     string            `json:"model"`
	Data      string            `json:"object"`
	// contains filtered or unexported fields
}

SubscriptionEvent is received from the event server

func (SubscriptionEvent) Commit

func (e SubscriptionEvent) Commit()

Commit for committing a message when auto commit is false

type SubscriptionFilter

type SubscriptionFilter struct {
	HeaderExpr string `json:"header,omitempty"`
	ObjectExpr string `json:"object,omitempty"`
}

SubscriptionFilter are subscription related filters

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL