messaging

package
v0.0.0-...-9a998da Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2023 License: BSD-3-Clause Imports: 9 Imported by: 1

Documentation

Index

Examples

Constants

View Source
// Event names and the host sender name used when building a Message.
const (
	// StartupEvent is the event name for a startup message.
	StartupEvent  = "event:startup"
	// ShutdownEvent is the event name for a shutdown message.
	ShutdownEvent = "event:shutdown"
	// PingEvent is the event name for a ping message.
	PingEvent     = "event:ping"
	// StatusEvent is the event name for a status message.
	StatusEvent   = "event:status"
	// FailoverEvent is the event name for a failover message.
	FailoverEvent = "event:failover"
	// HostName is the sender name; the EntryDirectory.Send example uses it as
	// the Message.From value.
	HostName      = "host"
)

Variables

This section is empty.

Functions

func NewStatusCode

func NewStatusCode(status **runtime.Status) func() int

func Ping

func Ping[E runtime.ErrorHandler](ctx context.Context, uri string) (status *runtime.Status)

Ping - templated function to "ping" a resource

Example
package main

import (
	"errors"
	"fmt"
	"github.com/idiomatic-go/motif/runtime"
	"time"
)

// main exercises Ping against four registered resources, each served by a
// different responder goroutine: pingGood replies at once, pingBad replies
// after 4 seconds, pingError replies with an error status after 1 second, and
// pingDelay replies OK after 1 second.
func main() {
	uri1 := "urn:ping:good"
	uri2 := "urn:ping:bad"
	uri3 := "urn:ping:error"
	uri4 := "urn:ping:delay"

	// start is a package-level variable (declared outside this example) that
	// the responders read through time.Since(start).
	start = time.Now()
	// presumably clears previously registered resources — TODO confirm
	directory.Empty()

	// NOTE(review): the error returned by RegisterResource is discarded in
	// every call below.
	// NOTE(review): Ping is called with a nil context throughout; confirm that
	// Ping tolerates a nil ctx.
	c := make(chan Message, 16)
	RegisterResource(uri1, c)
	go pingGood(c)
	status := Ping[runtime.DebugError](nil, uri1)
	// NOTE(review): duration is read only here, from the first status, and the
	// same value is printed by all later Printf calls.
	duration := status.Duration()
	fmt.Printf("test: Ping(good) -> [%v] [duration:%v]\n", status, duration)

	// pingBad sleeps 4 seconds before replying; the Output block shows this
	// call ending in DeadlineExceeded with a "ping response time out" error.
	c = make(chan Message, 16)
	RegisterResource(uri2, c)
	go pingBad(c)
	status = Ping[runtime.DebugError](nil, uri2)
	fmt.Printf("test: Ping(bad) -> [%v] [duration:%v]\n", status, duration)

	// pingError replies with an error status built from the supplied error.
	c = make(chan Message, 16)
	RegisterResource(uri3, c)
	go pingError(c, errors.New("ping depends error message"))
	status = Ping[runtime.DebugError](nil, uri3)
	fmt.Printf("test: Ping(error) -> [%v] [duration:%v]\n", status, duration)

	// pingDelay sleeps 1 second before replying; the Output block shows OK, so
	// a 1 second delay is within whatever timeout Ping applies.
	c = make(chan Message, 16)
	RegisterResource(uri4, c)
	go pingDelay(c)
	status = Ping[runtime.DebugError](nil, uri4)
	fmt.Printf("test: Ping(delay) -> [%v] [duration:%v]\n", status, duration)

}

// pingGood serves the messages arriving on c, answering each one immediately
// with an OK status whose duration is the time elapsed since the package-level
// start timestamp. It returns when c is closed.
func pingGood(c chan Message) {
	// Ranging over the channel blocks until a message is available and ends
	// when c is closed, so the goroutine does not busy-poll an empty channel.
	for msg := range c {
		ReplyTo(msg, runtime.NewStatusOK().SetDuration(time.Since(start)))
	}
}

// pingBad serves the messages arriving on c, waiting 4 seconds before
// answering each one with an OK status whose duration is the time elapsed
// since the package-level start timestamp. It returns when c is closed.
func pingBad(c chan Message) {
	// Ranging over the channel blocks until a message is available and ends
	// when c is closed, so the goroutine does not busy-poll an empty channel.
	for msg := range c {
		// Deliberately slow reply; the Ping example reports a timeout for it.
		time.Sleep(time.Second * 4)
		ReplyTo(msg, runtime.NewStatusOK().SetDuration(time.Since(start)))
	}
}

// pingError serves the messages arriving on c. When err is non-nil it waits
// 1 second and then answers with an error status built from pingLocation and
// err; when err is nil the message is consumed and no reply is sent.
// It returns when c is closed.
func pingError(c chan Message, err error) {
	// Ranging over the channel blocks until a message is available and ends
	// when c is closed, so the goroutine does not busy-poll an empty channel.
	for msg := range c {
		if err == nil {
			// Nothing to report: drop the message without replying.
			continue
		}
		time.Sleep(time.Second)
		ReplyTo(msg, runtime.NewStatusError(pingLocation, err).SetDuration(time.Since(start)))
	}
}

// pingDelay serves the messages arriving on c, waiting 1 second before
// answering each one with an OK status whose duration is the time elapsed
// since the package-level start timestamp. It returns when c is closed.
func pingDelay(c chan Message) {
	// Ranging over the channel blocks until a message is available and ends
	// when c is closed, so the goroutine does not busy-poll an empty channel.
	for msg := range c {
		time.Sleep(time.Second)
		ReplyTo(msg, runtime.NewStatusOK().SetDuration(time.Since(start)))
	}
}
Output:

test: Ping(good) -> [OK] [duration:0s]
[[] github.com/idiomatic-go/motif/messaging/ping [ping response time out: [urn:ping:bad]]]
test: Ping(bad) -> [DeadlineExceeded] [duration:0s]
test: Ping(error) -> [Internal [ping depends error message]] [duration:0s]
test: Ping(delay) -> [OK] [duration:0s]

func RegisterResource

func RegisterResource(uri string, c chan Message) error

RegisterResource - function to register a resource uri

func ReplyTo

func ReplyTo(msg Message, status *runtime.Status)

ReplyTo - function used by message recipient to reply with a runtime.Status

func Shutdown

func Shutdown()

Shutdown - resource shutdown

func Startup

func Startup[E runtime.ErrorHandler, O runtime.OutputHandler](duration time.Duration, content ContentMap) (status *runtime.Status)

Startup - templated function to start all registered resources.

Types

type ContentMap

type ContentMap map[string][]any

ContentMap - map from a string key to a slice of any content to be included in a message

type ControllerApply

type ControllerApply func(ctx context.Context, statusCode func() int, uri, requestId, method string) (fn func(), newCtx context.Context, rateLimited bool)

ControllerApply - type for applying a controller

func AccessControllerApply

func AccessControllerApply(msg *Message) ControllerApply

AccessControllerApply - access function for ControllerApply in a message

Example
// Checks whether AccessControllerApply yields a non-nil function for a nil
// message, a message with no content, and msgTest (a fixture declared outside
// this example). Per the Output below, only msgTest yields one.
fmt.Printf("test: AccessControllerApply(nil) -> [valid:%v]\n", AccessControllerApply(nil) != nil)
fmt.Printf("test: AccessControllerApply(msg) -> [valid:%v]\n", AccessControllerApply(&Message{To: "to-uri"}) != nil)
fmt.Printf("test: AccessControllerApply(msg) -> [valid:%v]\n", AccessControllerApply(&msgTest) != nil)
Output:

test: AccessControllerApply(nil) -> [valid:false]
test: AccessControllerApply(msg) -> [valid:false]
test: AccessControllerApply(msg) -> [valid:true]

type Credentials

type Credentials func() (username string, password string, err error)

Credentials - type for a credentials function

func AccessCredentials

func AccessCredentials(msg *Message) Credentials

AccessCredentials - access function for Credentials in a message

Example
// Checks whether AccessCredentials yields a non-nil function for a nil
// message, a message with no content, and msgTest (a fixture declared outside
// this example). Per the Output below, only msgTest yields one.
fmt.Printf("test: AccessCredentials(nil) -> %v\n", AccessCredentials(nil) != nil)
fmt.Printf("test: AccessCredentials(msg) -> %v\n", AccessCredentials(&Message{To: "to-uri"}) != nil)
fmt.Printf("test: AccessCredentials(msg) -> %v\n", AccessCredentials(&msgTest) != nil)
Output:

test: AccessCredentials(nil) -> false
test: AccessCredentials(msg) -> false
test: AccessCredentials(msg) -> true

type DatabaseUrl

type DatabaseUrl struct {
	// Url is the database connection string (the AccessDatabaseUrl example
	// shows a postgres:// URL).
	Url string
}

DatabaseUrl - struct for a database connection string

func AccessDatabaseUrl

func AccessDatabaseUrl(msg *Message) DatabaseUrl

AccessDatabaseUrl - access function for a DatabaseUrl in a message

Example
// Prints the DatabaseUrl extracted from a nil message, a message with no
// content, and msgTest (a fixture declared outside this example). Per the
// Output below, the first two yield the zero value.
fmt.Printf("test: AccessDatabaseUrl(nil) -> %v\n", AccessDatabaseUrl(nil))
fmt.Printf("test: AccessDatabaseUrl(msg) -> %v\n", AccessDatabaseUrl(&Message{To: "to-uri"}))
fmt.Printf("test: AccessDatabaseUrl(msg) -> %v\n", AccessDatabaseUrl(&msgTest))
Output:

test: AccessDatabaseUrl(nil) -> {}
test: AccessDatabaseUrl(msg) -> {}
test: AccessDatabaseUrl(msg) -> {postgres://username:password@database.cloud.timescale.com/database?sslmode=require}

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Entry - an entry in an EntryDirectory

type EntryDirectory

type EntryDirectory struct {
	// contains filtered or unexported fields
}

EntryDirectory - collection of Entry

func NewEntryDirectory

func NewEntryDirectory() *EntryDirectory

NewEntryDirectory - create a new directory

func (*EntryDirectory) Add

func (d *EntryDirectory) Add(uri string, c chan Message)
Example
// Walks an EntryDirectory through empty -> one entry -> two entries, printing
// Count and Get after each step. directoryTest is a fixture declared outside
// this example.
uri := "urn:test"
uri2 := "urn:test:two"

// Start from an empty directory; the Output below shows a count of 0.
directoryTest.Empty()

fmt.Printf("test: count() -> : %v\n", directoryTest.Count())
// Get on a uri that has not been added prints <nil> in the Output below.
d2 := directoryTest.Get(uri)
fmt.Printf("test: get(%v) -> : %v\n", uri, d2)

// Entries are added with a nil channel; only the uri bookkeeping is exercised.
directoryTest.Add(uri, nil)
fmt.Printf("test: add(%v) -> : ok\n", uri)
fmt.Printf("test: count() -> : %v\n", directoryTest.Count())
d2 = directoryTest.Get(uri)
fmt.Printf("test: get(%v) -> : %v\n", uri, d2)

directoryTest.Add(uri2, nil)
fmt.Printf("test: add(%v) -> : ok\n", uri2)
fmt.Printf("test: count() -> : %v\n", directoryTest.Count())
d2 = directoryTest.Get(uri2)
fmt.Printf("test: get(%v) -> : %v\n", uri2, d2)

// Uri lists the uris of all entries currently in the directory.
fmt.Printf("test: uri() -> : %v\n", directoryTest.Uri())
Output:

test: count() -> : 0
test: get(urn:test) -> : <nil>
test: add(urn:test) -> : ok
test: count() -> : 1
test: get(urn:test) -> : &{urn:test <nil>}
test: add(urn:test:two) -> : ok
test: count() -> : 2
test: get(urn:test:two) -> : &{urn:test:two <nil>}
test: uri() -> : [urn:test urn:test:two]

func (*EntryDirectory) Count

func (d *EntryDirectory) Count() int

func (*EntryDirectory) Empty

func (d *EntryDirectory) Empty()

func (*EntryDirectory) Get

func (d *EntryDirectory) Get(uri string) *Entry

func (*EntryDirectory) Send

func (d *EntryDirectory) Send(msg Message) error
Example
// Registers three uris that all share one buffered channel, sends one startup
// message to each uri, and reads the three messages back from the channel.
// directoryTest is a fixture declared outside this example.
uri1 := "urn:test-1"
uri2 := "urn:test-2"
uri3 := "urn:test-3"
c := make(chan Message, 16)
directoryTest.Empty()

directoryTest.Add(uri1, c)
directoryTest.Add(uri2, c)
directoryTest.Add(uri3, c)

// NOTE(review): Send returns an error, which is discarded here.
directoryTest.Send(Message{To: uri1, From: HostName, Event: StartupEvent})
directoryTest.Send(Message{To: uri2, From: HostName, Event: StartupEvent})
directoryTest.Send(Message{To: uri3, From: HostName, Event: StartupEvent})

// presumably allows time for delivery before reading — TODO confirm whether
// Send delivers synchronously, in which case the sleep is unnecessary
time.Sleep(time.Second * 1)
resp1 := <-c
resp2 := <-c
resp3 := <-c
fmt.Printf("test: <- c -> : [%v] [%v] [%v]\n", resp1.To, resp2.To, resp3.To)
close(c)
Output:

test: <- c -> : [urn:test-1] [urn:test-2] [urn:test-3]

func (*EntryDirectory) Shutdown

func (d *EntryDirectory) Shutdown()

func (*EntryDirectory) Uri

func (d *EntryDirectory) Uri() []string

type Message

type Message struct {
	// To is the recipient; the EntryDirectory.Send example sets it to a
	// registered resource uri.
	To      string
	// From is the sender; the MessageCache example retrieves messages by
	// this value.
	From    string
	// Event is the event name, e.g. StartupEvent or PingEvent.
	Event   string
	// Status is the runtime status attached to the message; may be nil.
	Status  *runtime.Status
	// Content is arbitrary content carried with the message.
	Content []any
	// ReplyTo is a handler for replies — presumably invoked by the
	// package-level ReplyTo function; verify against its implementation.
	ReplyTo MessageHandler
}

Message - message data

type MessageCache

type MessageCache struct {
	// contains filtered or unexported fields
}

MessageCache - message cache by resource uri

func NewMessageCache

func NewMessageCache() *MessageCache

NewMessageCache - create a message cache

func (*MessageCache) Add

func (r *MessageCache) Add(msg Message) error
Example
// Fills a MessageCache with five messages that share the same To value but
// have distinct From values, then queries it with Get, Include and Exclude.
resp := NewMessageCache()

// NOTE(review): Add returns an error, which is discarded here.
resp.Add(Message{To: "to-uri", From: "from-uri-0", Event: StartupEvent, Status: runtime.NewStatusCode(runtime.StatusNotProvided)})
resp.Add(Message{To: "to-uri", From: "from-uri-1", Event: StartupEvent, Status: runtime.NewStatusOK()})
resp.Add(Message{To: "to-uri", From: "from-uri-2", Event: PingEvent, Status: runtime.NewStatusCode(runtime.StatusNotProvided)})
resp.Add(Message{To: "to-uri", From: "from-uri-3", Event: PingEvent, Status: runtime.NewStatusCode(runtime.StatusNotProvided)})
resp.Add(Message{To: "to-uri", From: "from-uri-4", Event: PingEvent, Status: runtime.NewHttpStatusCode(http.StatusOK)})

fmt.Printf("test: count() -> : %v\n", resp.Count())

// Get with a uri that was never added reports an error and a zero Message
// (see the Output below).
m, err := resp.Get("invalid")
fmt.Printf("test: Get(%v) -> : [error:%v] [msg:%v]\n", "invalid", err, m)

// Get is given a From value and returns the message with that From.
m, err = resp.Get("from-uri-3")
fmt.Printf("test: Get(%v) -> : [error:%v] [msg:%v]\n", "from-uri-3", err, m)

// Per the Output below, Include lists the From values of messages matching
// both the event and the status code, and Exclude lists all the others.
fmt.Printf("test: include(%v,%v) -> : %v\n", ShutdownEvent, runtime.StatusNotProvided, resp.Include(ShutdownEvent, runtime.StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", ShutdownEvent, runtime.StatusNotProvided, resp.Exclude(ShutdownEvent, runtime.StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", StartupEvent, runtime.StatusNotProvided, resp.Include(StartupEvent, runtime.StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", StartupEvent, runtime.StatusNotProvided, resp.Exclude(StartupEvent, runtime.StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", PingEvent, runtime.StatusNotProvided, resp.Include(PingEvent, runtime.StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", PingEvent, runtime.StatusNotProvided, resp.Exclude(PingEvent, runtime.StatusNotProvided))
Output:

test: count() -> : 5
test: Get(invalid) -> : [error:invalid argument: uri not found [invalid]] [msg:{   <nil> [] <nil>}]
test: Get(from-uri-3) -> : [error:<nil>] [msg:{to-uri from-uri-3 event:ping Not provided [] <nil>}]
test: include(event:shutdown,Code(93)) -> : []
test: exclude(event:shutdown,Code(93)) -> : [from-uri-0 from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:startup,Code(93)) -> : [from-uri-0]
test: exclude(event:startup,Code(93)) -> : [from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:ping,Code(93)) -> : [from-uri-2 from-uri-3]
test: exclude(event:ping,Code(93)) -> : [from-uri-0 from-uri-1 from-uri-4]

func (*MessageCache) Count

func (r *MessageCache) Count() int

func (*MessageCache) Exclude

func (r *MessageCache) Exclude(event string, status codes.Code) []string

func (*MessageCache) Filter

func (r *MessageCache) Filter(event string, code codes.Code, include bool) []string

func (*MessageCache) Get

func (r *MessageCache) Get(uri string) (Message, error)

func (*MessageCache) Include

func (r *MessageCache) Include(event string, status codes.Code) []string

func (*MessageCache) Uri

func (r *MessageCache) Uri() []string
Example
// Fills a MessageCache with five messages that share the same To value but
// have distinct From values, then queries it with Get, Include and Exclude.
// NOTE(review): although listed under Uri, this snippet never calls Uri.
resp := NewMessageCache()

// NOTE(review): Add returns an error, which is discarded here.
resp.Add(Message{To: "to-uri", From: "from-uri-0", Event: StartupEvent, Status: runtime.NewStatusCode(runtime.StatusNotProvided)})
resp.Add(Message{To: "to-uri", From: "from-uri-1", Event: StartupEvent, Status: runtime.NewStatusOK()})
resp.Add(Message{To: "to-uri", From: "from-uri-2", Event: PingEvent, Status: runtime.NewStatusCode(runtime.StatusNotProvided)})
resp.Add(Message{To: "to-uri", From: "from-uri-3", Event: PingEvent, Status: runtime.NewStatusCode(runtime.StatusNotProvided)})
resp.Add(Message{To: "to-uri", From: "from-uri-4", Event: PingEvent, Status: runtime.NewHttpStatusCode(http.StatusOK)})

fmt.Printf("test: count() -> : %v\n", resp.Count())

// Get with a uri that was never added reports an error and a zero Message
// (see the Output below).
m, err := resp.Get("invalid")
fmt.Printf("test: Get(%v) -> : [error:%v] [msg:%v]\n", "invalid", err, m)

// Get is given a From value and returns the message with that From.
m, err = resp.Get("from-uri-3")
fmt.Printf("test: Get(%v) -> : [error:%v] [msg:%v]\n", "from-uri-3", err, m)

// Per the Output below, Include lists the From values of messages matching
// both the event and the status code, and Exclude lists all the others.
fmt.Printf("test: include(%v,%v) -> : %v\n", ShutdownEvent, runtime.StatusNotProvided, resp.Include(ShutdownEvent, runtime.StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", ShutdownEvent, runtime.StatusNotProvided, resp.Exclude(ShutdownEvent, runtime.StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", StartupEvent, runtime.StatusNotProvided, resp.Include(StartupEvent, runtime.StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", StartupEvent, runtime.StatusNotProvided, resp.Exclude(StartupEvent, runtime.StatusNotProvided))

fmt.Printf("test: include(%v,%v) -> : %v\n", PingEvent, runtime.StatusNotProvided, resp.Include(PingEvent, runtime.StatusNotProvided))
fmt.Printf("test: exclude(%v,%v) -> : %v\n", PingEvent, runtime.StatusNotProvided, resp.Exclude(PingEvent, runtime.StatusNotProvided))
Output:

test: count() -> : 5
test: Get(invalid) -> : [error:invalid argument: uri not found [invalid]] [msg:{   <nil> [] <nil>}]
test: Get(from-uri-3) -> : [error:<nil>] [msg:{to-uri from-uri-3 event:ping Not provided [] <nil>}]
test: include(event:shutdown,Code(93)) -> : []
test: exclude(event:shutdown,Code(93)) -> : [from-uri-0 from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:startup,Code(93)) -> : [from-uri-0]
test: exclude(event:startup,Code(93)) -> : [from-uri-1 from-uri-2 from-uri-3 from-uri-4]
test: include(event:ping,Code(93)) -> : [from-uri-2 from-uri-3]
test: exclude(event:ping,Code(93)) -> : [from-uri-0 from-uri-1 from-uri-4]

type MessageHandler

type MessageHandler func(msg Message)

MessageHandler - function type to process a Message

func NewMessageCacheHandler

func NewMessageCacheHandler(cache *MessageCache) MessageHandler

NewMessageCacheHandler - handler to receive messages into a cache.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL