Documentation ¶
Index ¶
- Variables
- func GenerateRandomString(n int) (string, error)
- type ConfigEntry
- type ConfigEntryAuth
- type ConfigEntrySecret
- type ConfigManager
- type IConfigEntry
- type ProductManager
- func (pm *ProductManager) CreateProduct(productSetting *product.ProductSetting) (*product.ProductSetting, error)
- func (pm *ProductManager) DeleteConsumer(productName string, consumerName string) error
- func (pm *ProductManager) DeleteProduct(name string) error
- func (pm *ProductManager) GetProduct(name string) (*product.ProductSetting, error)
- func (pm *ProductManager) GetProductState(setting *product.ProductSetting) (*product.ProductState, error)
- func (pm *ProductManager) InitConsumer(productName string, consumerName string, partitions []int, startSeq uint64) error
- func (pm *ProductManager) ListProducts() ([]*product.ProductSetting, error)
- func (pm *ProductManager) PurgeProduct(name string) error
- func (pm *ProductManager) UpdateProduct(name string, productSetting *product.ProductSetting) (*product.ProductSetting, error)
- type SubscriptionManager
- func (sm *SubscriptionManager) CreateSubscription(id string, subscriptionSetting *subscription.SubscriptionSetting) (*subscription.SubscriptionSetting, error)
- func (sm *SubscriptionManager) DeleteSubscription(id string) error
- func (sm *SubscriptionManager) GetSubscription(id string) (*subscription.SubscriptionSetting, error)
- func (sm *SubscriptionManager) ListSubscriptions() ([]*subscription.SubscriptionSetting, error)
- func (sm *SubscriptionManager) UpdateSubscription(id string, subscriptionSetting *subscription.SubscriptionSetting) (*subscription.SubscriptionSetting, error)
- type TokenManager
- func (tm *TokenManager) CreateToken(t string, tokenSetting *token.TokenSetting) (*token.TokenSetting, error)
- func (tm *TokenManager) DeleteToken(t string) error
- func (tm *TokenManager) GetToken(t string) (*token.TokenSetting, error)
- func (tm *TokenManager) ListTokens() ([]*token.TokenSetting, error)
- func (tm *TokenManager) UpdateToken(t string, tokenSetting *token.TokenSetting) (*token.TokenSetting, error)
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrInternalSystemFailure = errors.New("internal system failure") ErrEventStoreNotFound = errors.New("event store not found") ErrProductNotFound = errors.New("product not found") ErrProductExistsAlready = errors.New("product exists already") ErrInvalidProductName = errors.New("invalid product name") )
Functions ¶
func GenerateRandomString ¶
Types ¶
type ConfigEntry ¶
type ConfigEntry struct {
// contains filtered or unexported fields
}
func (*ConfigEntry) Auth ¶
func (ce *ConfigEntry) Auth() *ConfigEntryAuth
func (*ConfigEntry) Secret ¶
func (ce *ConfigEntry) Secret() *ConfigEntrySecret
type ConfigEntryAuth ¶
type ConfigEntryAuth struct {
Enabled bool `json:"enabled"`
}
type ConfigEntrySecret ¶
type ConfigEntrySecret struct {
Key string `json:"key"`
}
Configuration entry interface
type ConfigManager ¶
type ConfigManager struct {
// contains filtered or unexported fields
}
func NewConfigManager ¶
func NewConfigManager(client *core.Client, domain string) *ConfigManager
func (*ConfigManager) GetEntry ¶
func (cm *ConfigManager) GetEntry(key string) *ConfigEntry
func (*ConfigManager) InitializeEntry ¶
func (cm *ConfigManager) InitializeEntry(key string, initialFn func() []byte) ([]byte, error)
type IConfigEntry ¶
type IConfigEntry interface { Secret() *ConfigEntrySecret Auth() *ConfigEntryAuth }
type ProductManager ¶
type ProductManager struct {
// contains filtered or unexported fields
}
func NewProductManager ¶
func NewProductManager(client *core.Client, domain string) *ProductManager
func (*ProductManager) CreateProduct ¶
func (pm *ProductManager) CreateProduct(productSetting *product.ProductSetting) (*product.ProductSetting, error)
func (*ProductManager) DeleteConsumer ¶
func (pm *ProductManager) DeleteConsumer(productName string, consumerName string) error
func (*ProductManager) DeleteProduct ¶
func (pm *ProductManager) DeleteProduct(name string) error
func (*ProductManager) GetProduct ¶
func (pm *ProductManager) GetProduct(name string) (*product.ProductSetting, error)
func (*ProductManager) GetProductState ¶
func (pm *ProductManager) GetProductState(setting *product.ProductSetting) (*product.ProductState, error)
func (*ProductManager) InitConsumer ¶
func (pm *ProductManager) InitConsumer(productName string, consumerName string, partitions []int, startSeq uint64) error
func (pm *ProductManager) PrepareSubscription(productName string, durable string, startSeq uint64) error {
js, err := pm.client.GetJetStream() if err != nil { return err } // Check if the stream already exists streamName := fmt.Sprintf(productEventStream, pm.domain, productName) _, err = js.StreamInfo(streamName) if err != nil { return err } // Check wheter consumer exist or not _, err = js.ConsumerInfo(streamName, streamName) if err != nats.ErrConsumerNotFound { return err } // The consumer exists already if err == nil { return nil } // Preparing pull consumer subject := fmt.Sprintf(productEventSubject, pm.domain, productName, "*") cfg := &nats.ConsumerConfig{ Durable: durable, Description: "Product Subscription", FilterSubject: subject, AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 2000, MaxDeliver: -1, ReplayPolicy: nats.ReplayInstantPolicy, //DeliverSubject: nats.NewInbox(), } if startSeq > 0 { cfg.DeliverPolicy = nats.DeliverByStartSequencePolicy cfg.OptStartSeq = startSeq } // Create consumer on data product stream _, err = js.AddConsumer(streamName, cfg) if err != nil { return err } return nil }
func (*ProductManager) ListProducts ¶
func (pm *ProductManager) ListProducts() ([]*product.ProductSetting, error)
func (*ProductManager) PurgeProduct ¶
func (pm *ProductManager) PurgeProduct(name string) error
func (*ProductManager) UpdateProduct ¶
func (pm *ProductManager) UpdateProduct(name string, productSetting *product.ProductSetting) (*product.ProductSetting, error)
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
func NewSubscriptionManager ¶
func NewSubscriptionManager(client *core.Client, domain string) *SubscriptionManager
func (*SubscriptionManager) CreateSubscription ¶
func (sm *SubscriptionManager) CreateSubscription(id string, subscriptionSetting *subscription.SubscriptionSetting) (*subscription.SubscriptionSetting, error)
func (*SubscriptionManager) DeleteSubscription ¶
func (sm *SubscriptionManager) DeleteSubscription(id string) error
func (*SubscriptionManager) GetSubscription ¶
func (sm *SubscriptionManager) GetSubscription(id string) (*subscription.SubscriptionSetting, error)
func (*SubscriptionManager) ListSubscriptions ¶
func (sm *SubscriptionManager) ListSubscriptions() ([]*subscription.SubscriptionSetting, error)
func (*SubscriptionManager) UpdateSubscription ¶
func (sm *SubscriptionManager) UpdateSubscription(id string, subscriptionSetting *subscription.SubscriptionSetting) (*subscription.SubscriptionSetting, error)
type TokenManager ¶
type TokenManager struct {
// contains filtered or unexported fields
}
func NewTokenManager ¶
func NewTokenManager(client *core.Client, domain string) *TokenManager
func (*TokenManager) CreateToken ¶
func (tm *TokenManager) CreateToken(t string, tokenSetting *token.TokenSetting) (*token.TokenSetting, error)
func (*TokenManager) DeleteToken ¶
func (tm *TokenManager) DeleteToken(t string) error
func (*TokenManager) GetToken ¶
func (tm *TokenManager) GetToken(t string) (*token.TokenSetting, error)
func (*TokenManager) ListTokens ¶
func (tm *TokenManager) ListTokens() ([]*token.TokenSetting, error)
func (*TokenManager) UpdateToken ¶
func (tm *TokenManager) UpdateToken(t string, tokenSetting *token.TokenSetting) (*token.TokenSetting, error)
Click to show internal directories.
Click to hide internal directories.