internal

package
v0.0.24 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for internal failures, the event store, and products.
// Each value is created once with errors.New, so it can be matched by
// identity (for example with errors.Is). Which operations return each
// value is not visible from this declaration.
var (
	ErrInternalSystemFailure = errors.New("internal system failure")
	ErrEventStoreNotFound    = errors.New("event store not found")
	ErrProductNotFound       = errors.New("product not found")
	ErrProductExistsAlready  = errors.New("product exists already")
	ErrInvalidProductName    = errors.New("invalid product name")
)
View Source
// Sentinel errors for subscriptions. Each value is created once with
// errors.New, so it can be matched by identity (for example with errors.Is).
// Which operations return each value is not visible from this declaration.
var (
	ErrSubscriptionNotFound      = errors.New("subscription not found")
	ErrSubscriptionExistsAlready = errors.New("subscription exists already")
	ErrInvalidSubscriptionID     = errors.New("invalid subscription ID")
)
View Source
// Sentinel errors for tokens. Each value is created once with errors.New,
// so it can be matched by identity (for example with errors.Is). Which
// operations return each value is not visible from this declaration.
var (
	ErrTokenNotFound      = errors.New("token not found")
	ErrTokenExistsAlready = errors.New("token exists already")
	ErrInvalidTokenName   = errors.New("invalid token name")
)

Functions

func GenerateRandomString

func GenerateRandomString(n int) (string, error)

Types

type ConfigEntry

type ConfigEntry struct {
	// contains filtered or unexported fields
}

func (*ConfigEntry) Auth

func (ce *ConfigEntry) Auth() *ConfigEntryAuth

func (*ConfigEntry) Secret

func (ce *ConfigEntry) Secret() *ConfigEntrySecret

type ConfigEntryAuth

// ConfigEntryAuth is the type returned by (*ConfigEntry).Auth.
// NOTE(review): presumably the authentication settings stored in a
// configuration entry — confirm against the ConfigEntry implementation.
type ConfigEntryAuth struct {
	// Enabled is encoded under the JSON key "enabled".
	Enabled bool `json:"enabled"`
}

type ConfigEntrySecret

// ConfigEntrySecret is the type returned by (*ConfigEntry).Secret.
// NOTE(review): presumably the secret material stored in a configuration
// entry — confirm against the ConfigEntry implementation.
type ConfigEntrySecret struct {
	// Key is encoded under the JSON key "key".
	Key string `json:"key"`
}

Configuration entry interface

type ConfigManager

type ConfigManager struct {
	// contains filtered or unexported fields
}

func NewConfigManager

func NewConfigManager(client *core.Client, domain string) *ConfigManager

func (*ConfigManager) GetEntry

func (cm *ConfigManager) GetEntry(key string) *ConfigEntry

func (*ConfigManager) InitializeEntry

func (cm *ConfigManager) InitializeEntry(key string, initialFn func() []byte) ([]byte, error)

func (*ConfigManager) SetEntry

func (cm *ConfigManager) SetEntry(key string, value []byte) error

type IConfigEntry

// IConfigEntry describes a configuration entry that can be viewed either as
// a secret or as an auth setting. *ConfigEntry declares both methods with
// matching signatures.
type IConfigEntry interface {
	// Secret returns the entry as a *ConfigEntrySecret.
	Secret() *ConfigEntrySecret
	// Auth returns the entry as a *ConfigEntryAuth.
	Auth() *ConfigEntryAuth
}

type ProductManager

type ProductManager struct {
	// contains filtered or unexported fields
}

func NewProductManager

func NewProductManager(client *core.Client, domain string) *ProductManager

func (*ProductManager) CreateProduct

func (pm *ProductManager) CreateProduct(productSetting *product.ProductSetting) (*product.ProductSetting, error)

func (*ProductManager) DeleteConsumer

func (pm *ProductManager) DeleteConsumer(productName string, consumerName string) error

func (*ProductManager) DeleteProduct

func (pm *ProductManager) DeleteProduct(name string) error

func (*ProductManager) GetProduct

func (pm *ProductManager) GetProduct(name string) (*product.ProductSetting, error)

func (*ProductManager) GetProductState

func (pm *ProductManager) GetProductState(setting *product.ProductSetting) (*product.ProductState, error)

func (*ProductManager) InitConsumer

func (pm *ProductManager) InitConsumer(productName string, consumerName string, partitions []int, startSeq uint64) error

// PrepareSubscription makes sure that a durable consumer named durable exists
// on the event stream of the product productName, creating it when missing.
//
// The stream name is built from productEventStream with the manager's domain
// and productName. The stream itself must already exist; if looking it up
// fails, that error is returned and nothing is created.
//
// If a consumer named durable is already present on the stream the call
// returns nil without modifying it. Otherwise a consumer is added that is
// filtered to the product's event subjects and uses explicit acknowledgement.
// When startSeq is greater than zero the consumer starts delivering at that
// stream sequence; otherwise DeliverPolicy is left at its zero value.
//
// Any error from the JetStream lookups or from adding the consumer is
// returned unchanged.
func (pm *ProductManager) PrepareSubscription(productName string, durable string, startSeq uint64) error {

	js, err := pm.client.GetJetStream()
	if err != nil {
		return err
	}

	// The product's stream must exist before a consumer can be attached to it
	streamName := fmt.Sprintf(productEventStream, pm.domain, productName)
	if _, err = js.StreamInfo(streamName); err != nil {
		return err
	}

	// Check whether the durable consumer exists or not. The lookup has to use
	// the durable name: that is the name the consumer is created under below.
	_, err = js.ConsumerInfo(streamName, durable)
	if err == nil {
		// The consumer exists already
		return nil
	}

	// Anything other than "not found" is a real failure
	if err != nats.ErrConsumerNotFound {
		return err
	}

	// Preparing pull consumer
	subject := fmt.Sprintf(productEventSubject, pm.domain, productName, "*")
	cfg := &nats.ConsumerConfig{
		Durable:       durable,
		Description:   "Product Subscription",
		FilterSubject: subject,
		AckPolicy:     nats.AckExplicitPolicy,
		MaxAckPending: 2000,
		MaxDeliver:    -1,
		ReplayPolicy:  nats.ReplayInstantPolicy,
		//DeliverSubject: nats.NewInbox(),
	}

	// Only override the deliver policy when the caller asked for a specific
	// starting sequence
	if startSeq > 0 {
		cfg.DeliverPolicy = nats.DeliverByStartSequencePolicy
		cfg.OptStartSeq = startSeq
	}

	// Create consumer on data product stream
	if _, err = js.AddConsumer(streamName, cfg); err != nil {
		return err
	}

	return nil
}

func (*ProductManager) ListProducts

func (pm *ProductManager) ListProducts() ([]*product.ProductSetting, error)

func (*ProductManager) PurgeProduct

func (pm *ProductManager) PurgeProduct(name string) error

func (*ProductManager) UpdateProduct

func (pm *ProductManager) UpdateProduct(name string, productSetting *product.ProductSetting) (*product.ProductSetting, error)

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

func NewSubscriptionManager

func NewSubscriptionManager(client *core.Client, domain string) *SubscriptionManager

func (*SubscriptionManager) CreateSubscription

func (sm *SubscriptionManager) CreateSubscription(id string, subscriptionSetting *subscription.SubscriptionSetting) (*subscription.SubscriptionSetting, error)

func (*SubscriptionManager) DeleteSubscription

func (sm *SubscriptionManager) DeleteSubscription(id string) error

func (*SubscriptionManager) GetSubscription

func (sm *SubscriptionManager) GetSubscription(id string) (*subscription.SubscriptionSetting, error)

func (*SubscriptionManager) ListSubscriptions

func (sm *SubscriptionManager) ListSubscriptions() ([]*subscription.SubscriptionSetting, error)

func (*SubscriptionManager) UpdateSubscription

func (sm *SubscriptionManager) UpdateSubscription(id string, subscriptionSetting *subscription.SubscriptionSetting) (*subscription.SubscriptionSetting, error)

type TokenManager

type TokenManager struct {
	// contains filtered or unexported fields
}

func NewTokenManager

func NewTokenManager(client *core.Client, domain string) *TokenManager

func (*TokenManager) CreateToken

func (tm *TokenManager) CreateToken(t string, tokenSetting *token.TokenSetting) (*token.TokenSetting, error)

func (*TokenManager) DeleteToken

func (tm *TokenManager) DeleteToken(t string) error

func (*TokenManager) GetToken

func (tm *TokenManager) GetToken(t string) (*token.TokenSetting, error)

func (*TokenManager) ListTokens

func (tm *TokenManager) ListTokens() ([]*token.TokenSetting, error)

func (*TokenManager) UpdateToken

func (tm *TokenManager) UpdateToken(t string, tokenSetting *token.TokenSetting) (*token.TokenSetting, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL