msg

package
v0.77.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: Apache-2.0 Imports: 28 Imported by: 0

README

Msg - Messaging Service

Google Cloud pubsub

The following workflow define simple topic/subscription producing and consuming example.

Example credentials 'gcp-e2e' is name of google secrets placed to ~/.secret/gcp-e2e.json

endly test

@test.yaml

init:
  gcpCredentials: gcp-e2e
pipeline:
  create:
    action: msg:setupResource
    resources:
      - URL: myTopic
        type: topic
        vendor: gcp
        credentials: $gcpCredentials
      - URL: mySubscription
        type: subscription
        vendor: gcp
        credentials: $gcpCredentials
        config:
          topic:
            URL: /projects/${msg.projectID}/topics/myTopic

  setup:
    action: msg:push
    dest:
      URL: /projects/${msg.projectID}/topics/myTopic
      credentials: $gcpCredentials
    messages:
      - data: "this is my 1st message"
        attributes:
          attr1: abc
      - data: "this is my 2nd message"
        attributes:
          attr1: xyz

  validate:
    action: msg:pull
    count: 2
    nack: true
    source:
      URL: /projects/${msg.projectID}/subscriptions/mySubscription
      credentials: $gcpCredentials
    expect:
      - '@indexBy@': 'Attributes.attr1'
      - Data: "this is my 1st message"
        Attributes:
          attr1: abc
      - Data: "this is my 2nd message"
        Attributes:
          attr1: xyz
Amazon Simple Queue Service

The following workflow creates a queue to produce and consume test messages.

Example credentials 'aws-e2e' is name of aws secrets placed to ~/.secret/aws-e2e.json

endly queue.yaml

@queue.yaml

init:
  awsCredentials: aws-e2e
pipeline:
  setup:
    action: msg:setupResource
    credentials: $awsCredentials
    resources:
      - URL: mye2eQueue1
        type: queue
        vendor: aws

  trigger:
    action: msg:push
    credentials: $awsCredentials
    sleepTimeMs: 5000
    dest:
      URL: mye2eQueue1
      type: queue
      vendor: aws
    messages:
      - data: 'Test: this is my 1st message'
      - data: 'Test: this is my 2nd message'

  validate:
    action: msg:pull
    credentials: $awsCredentials
    timeoutMs: 20000
    count: 2
    source:
      URL: mye2eQueue1
      type: queue
      vendor: aws
    expect:
      - '@indexBy@': 'Data'
      - Data: "Test: this is my 1st message"
      - Data: "Test: this is my 2nd message"
  info:
    action: print
    message: $AsJSON($validate)
Amazon Simple Notification Service

The following workflow creates a topic to produce test messages.

Example credentials 'aws-e2e' is name of aws secrets placed to ~/.secret/aws-e2e.json

endly topic.yaml

@topic.yaml

init:
  awsCredentials: aws-e2e
pipeline:
  setup:
    action: msg:setupResource
    credentials: $awsCredentials
    resources:
      - URL: mye2eTopic
          type: topic
          vendor: aws

  trigger:
    action: msg:push
    credentials: $awsCredentials
    sleepTimeMs: 5000
    dest:
      URL: mye2eTopic
      type: topic
      vendor: aws
    messages:
      - data: 'Test: this is my 1st message'
        attributes:
          id: abc1
      - data: 'Test: this is my 2nd message'
        attributes:
          id: abc2

Kafka

The following workflow define kafka topic and produces and consume messages.

endly test

@test.yaml

pipeline:
  startUp:
    action: docker/ssh:composeUp
    comments: setup kafka cluster
    sleepTimeMs: 10000
    runInBackground: true
    source:
      URL: docker-compose.yml

  create:
    sleepTimeMs: 10000
    action: msg:setupResource
    comments: create topic and wait for a leadership election
    resources:
      - URL: myTopic
        type: topic
        replicationFactor: 1
        partitions: 1
        brokers:
          - localhost:9092


  setup:
    action: msg:push
    dest:
      url: tcp://localhost:9092/myTopic
      vendor: kafka

    messages:
      - data: "this is my 1st message"
        attributes:
          key: abc
      - data: "this is my 2nd message"
        attributes:
          key: xyz

  validate:
    action: msg:pull
    count: 2
    nack: true
    source:
      url: tcp://localhost:9092/myTopic
      vendor: kafka
    expect:
      - '@indexBy@': 'Attributes.key'
      - Data: "this is my 1st message"
        Attributes:
          key: abc
      - Data: "this is my 2nd message"
        Attributes:
          key: xyz

  cleanUp:
    action: docker/ssh:composeDown
    source:
      URL: docker-compose.yml

Documentation

Index

Constants

View Source
const (
	ResourceVendorGoogleCloudPlatform = "gcp"
	ResourceVendorAmazonWebService    = "aws"
	ResourceVendorKafka               = "kafka"
)
View Source
const (
	ResourceTypeTopic        = "topic"
	ResourceTypeSubscription = "subscription"
	ResourceTypeQueue        = "queue"
)
View Source
const (
	//ServiceID represents gloud msg  pubsub service id.
	ServiceID = "msg"
)

Variables

This section is empty.

Functions

func New

func New() endly.Service

New creates a new NoOperation service.

Types

type Client

type Client interface {
	Push(ctx context.Context, dest *Resource, message *Message) (Result, error)

	PullN(ctx context.Context, source *Resource, count int, nack bool) ([]*Message, error)

	SetupResource(resource *ResourceSetup) (*Resource, error)

	DeleteResource(resource *Resource) error

	Close() error
}

func NewPubSubClient

func NewPubSubClient(context *endly.Context, dest *Resource, timeout time.Duration) (Client, error)

NewPubSubClient creates a new Client

type Config

type Config struct {
	Topic               *Resource
	Labels              map[string]string
	Attributes          map[string]string
	AckDeadline         time.Duration
	RetentionDuration   time.Duration
	RetainAckedMessages bool
}

Config represent a subscription config

func NewConfig

func NewConfig(topic string) *Config

NewConfig create new config

type CreateRequest

type CreateRequest struct {
	Credentials string
	Resources   []*ResourceSetup
}

CreateRequest represents a create resource request

func (*CreateRequest) Init

func (r *CreateRequest) Init() error

func (*CreateRequest) Validate

func (r *CreateRequest) Validate() error

type CreateResponse

type CreateResponse struct {
	Resources []*Resource
}

CreateResponse represents a create resource response

type DeleteRequest

type DeleteRequest struct {
	Credentials string
	Resources   []*Resource
}

DeleteRequest represents a delete resource request

func (*DeleteRequest) Init

func (r *DeleteRequest) Init() error

type DeleteResponse

type DeleteResponse struct{}

DeleteResponse represents a delete resource response

type Message

type Message struct {
	ID          string
	Subject     string
	Attributes  map[string]interface{}
	Data        interface{}
	Transformed interface{} `description:"udf transformed data"`
}

func (*Message) Expand

func (m *Message) Expand(state data.Map) *Message

type PullRequest

type PullRequest struct {
	Credentials string
	Source      *Resource
	TimeoutMs   int
	Count       int
	Nack        bool `description:"flag indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback."`
	UDF         string
	Expect      interface{}
}

PullRequest represents a pull request

func (*PullRequest) Init

func (r *PullRequest) Init() error

func (*PullRequest) Validate

func (r *PullRequest) Validate() error

type PullResponse

type PullResponse struct {
	Messages []*Message
	Assert   *validator.AssertResponse
}

PullRequest represents a pull response

type PushRequest

type PushRequest struct {
	Credentials string
	Dest        *Resource
	Messages    []*Message
	Source      *location.Resource `` /* 126-byte string literal not displayed */
	TimeoutMs   int
	UDF         string
	// contains filtered or unexported fields
}

PushRequest represents push request

func (*PushRequest) Init

func (r *PushRequest) Init() error

func (*PushRequest) Validate

func (r *PushRequest) Validate() error

type PushResponse

type PushResponse struct {
	Results []Result
}

PushResponse represents a push response

type Resource

type Resource struct {
	URL               string
	Brokers           []string
	Credentials       string
	Offset            int
	GroupID           string
	Partition         int
	ReplicationFactor int
	Partitions        int
	ID                string
	Name              string
	Type              string `description:"resource type: topic, subscription"`
	Vendor            string
	Config            interface{} `description:"vendor client config"`
	// contains filtered or unexported fields
}

func NewResource

func NewResource(resourceType, URL, credentials string) *Resource

NewResource creates a new resource

func (*Resource) Init

func (r *Resource) Init() error

Init initializes resource

type ResourceSetup

type ResourceSetup struct {
	Resource
	Recreate bool
	Config   *Config
}

Resource represents resource setup

func NewResourceSetup

func NewResourceSetup(resourceType, URL, credentials string, recreate bool, config *Config) *ResourceSetup

NewResourceSetup creates a new URL

func (*ResourceSetup) Init

func (r *ResourceSetup) Init() error

Init initializes setup resource

func (*ResourceSetup) Validate

func (r *ResourceSetup) Validate() error

type Result

type Result interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL