Documentation ¶
Overview ¶
Package topic provides a consistent and unique API for Pub/Sub integrations, allowing them to publish messages to a topic.
Index ¶
- Constants
- type Config
- type Handler
- func (h *Handler) ActivityPublishExecution(ctx context.Context, input InputPublish) (OutputPublish, error)
- func (h *Handler) ActivityPublishValidation(ctx context.Context, input InputPublish) (OutputPublish, error)
- func (h *Handler) Close() error
- func (h *Handler) ConfigMap() map[string]any
- func (h *Handler) Init() error
- func (h *Handler) IsReady() bool
- func (h *Handler) ListActivities() []string
- func (h *Handler) ListWorkflows() []string
- func (h *Handler) Register(w worker.Worker) error
- func (h *Handler) String() string
- func (h *Handler) WorkflowPublish(ctx workflow.Context, input InputPublish) (OutputPublish, error)
- type InputPublish
- type OutputPublish
- type Policies
- type Topic
- type With
Constants ¶
const Specification string = "topic"
Specification is the string representation of the topic specification.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// Topic is the name of the topic to publish to. The format of the name is
	// specific to each integration.
	//
	// Example: "my-topic"
	//
	// Required.
	Topic string `json:"topic"`

	// Policies allows setting activity policies, such as timeouts and retries.
	Policies Policies `json:"policies"`
	// contains filtered or unexported fields
}
Config allows the end-user to configure the specification for an integration.
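Because Config carries JSON tags, an end-user configuration can be decoded with encoding/json. The following is a minimal sketch, not the package's own loading code: the Config type here is a local stand-in that mirrors only the documented Topic field, and decodeConfig is a hypothetical helper introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is a local stand-in mirroring only the documented Topic field.
// It is not the package's real type.
type Config struct {
	Topic string `json:"topic"`
}

// decodeConfig (hypothetical helper) parses a JSON document into the
// stand-in Config.
func decodeConfig(raw []byte) (Config, error) {
	var c Config
	err := json.Unmarshal(raw, &c)
	return c, err
}

func main() {
	c, err := decodeConfig([]byte(`{"topic": "my-topic"}`))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(c.Topic)
}
```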
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles the topic specification for an integration.
func New ¶
func New(ctx context.Context, from integration.Integration, config Config, attachments ...With) (*Handler, error)
New returns a new topic Handler for the given integration. It applies the configuration passed by the end-user (forwarded by the integration). It also applies additional attachments an integration might leverage.
Example:
New(ctx, myintegration, config, WithTopic(topic))
func (*Handler) ActivityPublishExecution ¶
func (h *Handler) ActivityPublishExecution(ctx context.Context, input InputPublish) (OutputPublish, error)
ActivityPublishExecution is the activity publishing the message to the topic.
func (*Handler) ActivityPublishValidation ¶
func (h *Handler) ActivityPublishValidation(ctx context.Context, input InputPublish) (OutputPublish, error)
ActivityPublishValidation is the activity validating the input.
func (*Handler) Close ¶
Close tries to properly close the specification. An error is returned if the Handler has already been closed.
func (*Handler) ConfigMap ¶
ConfigMap transforms the configuration to a map, including a "from" key with the configuration of the overlying integration.
func (*Handler) Init ¶
Init initializes the specification. An error is returned if the Handler has already been initialized.
func (*Handler) IsReady ¶
IsReady indicates if the specification is ready to be consumed by the overlying integration. The specification must be initialized, must not be closed, and must have registered its workflows and activities in the Temporal worker.
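The readiness rule combines three conditions described on this page: initialized, not closed, and registered. The sketch below models that rule with a hypothetical lifecycle type. It is an assumption about how such state could be tracked, not the Handler's actual implementation, and its Register takes no Temporal worker for the sake of being self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// lifecycle is a hypothetical stand-in for the Handler's internal state.
type lifecycle struct {
	initialized bool
	registered  bool
	closed      bool
}

// Init fails if called twice, matching the documented behavior of Init.
func (l *lifecycle) Init() error {
	if l.initialized {
		return errors.New("already initialized")
	}
	l.initialized = true
	return nil
}

// Register fails if the registration has already been made.
func (l *lifecycle) Register() error {
	if l.registered {
		return errors.New("already registered")
	}
	l.registered = true
	return nil
}

// Close fails if the lifecycle has already been closed.
func (l *lifecycle) Close() error {
	if l.closed {
		return errors.New("already closed")
	}
	l.closed = true
	return nil
}

// IsReady is true only when initialized, registered, and not closed.
func (l *lifecycle) IsReady() bool {
	return l.initialized && l.registered && !l.closed
}

func main() {
	l := &lifecycle{}
	_ = l.Init()
	fmt.Println("after Init:", l.IsReady())
	_ = l.Register()
	fmt.Println("after Register:", l.IsReady())
	_ = l.Close()
	fmt.Println("after Close:", l.IsReady())
}
```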
func (*Handler) ListActivities ¶
ListActivities returns a sorted list of the names of the activities registered by the specification for the overlying integration.
func (*Handler) ListWorkflows ¶
ListWorkflows returns a sorted list of the names of the workflows registered by the specification for the overlying integration.
func (*Handler) Register ¶
Register registers the specification's workflows and activities in the given Temporal worker. An error is returned if the registration has already been made.
func (*Handler) WorkflowPublish ¶
func (h *Handler) WorkflowPublish(ctx workflow.Context, input InputPublish) (OutputPublish, error)
WorkflowPublish is a high-level, opinionated Temporal workflow. It executes the following activities:
- ActivityPublishValidation: Validates the input (local activity).
- ActivityPublishExecution: Publishes the message received in input to the topic.
type InputPublish ¶
type InputPublish struct {
	// Policies passed in the input can override the ones set when creating the
	// specification. They only apply if allowed in the integration's Config
	// via AllowPoliciesOverride.
	Policies *Policies `json:"policies"`

	// Context represents the event's context, shared across every specification
	// and integration of this ecosystem.
	Context *event.Context `json:"context,omitempty"`

	// Message is the message to publish to the integration using the topic set in
	// the specification's Config.
	//
	// Required.
	Message []byte `json:"message"`
}
InputPublish is the input for the "Publish" workflow and activities.
func (*InputPublish) Validate ¶
func (input *InputPublish) Validate(config *Config) error
Validate can be used to validate the workflow/activity's input. It's the validation function used in the local activity ActivityPublishValidation.
type OutputPublish ¶
type OutputPublish struct {
	// Status is the status of the workflow or activity. It's one of "success",
	// "failure".
	Status lifecycle.Status `json:"status"`
}
OutputPublish is the output for the "Publish" workflow and activities.
type Policies ¶
type Policies struct {
	// Execution is the policy applied by the activity used to publish a message.
	//
	// Note: Since this is a short-lived policy, the activity's heartbeat is not used.
	// Therefore, Execution.HeartbeatTimeout is not applied. We advise setting a
	// short-lived Execution.SingleAttemptTimeout, such as 3 seconds.
	Execution lifecycle.ActivityPolicy `json:"execution"`
	// contains filtered or unexported fields
}
Policies represents the Temporal activity policies to apply within the workflows exposed by this package and the overlying integration.
type Topic ¶
type Topic interface {
	// RegisterWithTopic allows an end-user to register the topic specification
	// within an integration.
	RegisterWithTopic(worker.Worker, Config) error
}
Topic is the interface used by an overlying integration to leverage the topic specification.
type With ¶
With allows an integration to customize the behavior of the specification, in addition to Config. Whereas Config is designed for, and accessible by, end-users, With is specifically designed for integrations.
For example, an integration must call WithTopic to attach the *pubsub.Topic:
New(ctx, myintegration, config, WithTopic(topic))
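Variadic attachments of this kind are commonly built with Go's functional options pattern. The definition of With is not shown on this page, so the sketch below is only an assumption about its shape: handler, With, withTopicName, and newHandler are hypothetical stand-ins, and a plain string replaces the *pubsub.Topic to keep the example free of external dependencies.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical stand-in for the package's Handler.
type handler struct {
	topicName string
}

// With is assumed here to be a function that mutates the handler.
type With func(*handler) error

// withTopicName (hypothetical) attaches a topic name to the handler.
func withTopicName(name string) With {
	return func(h *handler) error {
		if name == "" {
			return errors.New("topic name must not be empty")
		}
		h.topicName = name
		return nil
	}
}

// newHandler applies each attachment in order and stops at the first error.
func newHandler(attachments ...With) (*handler, error) {
	h := &handler{}
	for _, attach := range attachments {
		if err := attach(h); err != nil {
			return nil, err
		}
	}
	return h, nil
}

func main() {
	h, err := newHandler(withTopicName("my-topic"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(h.topicName)
}
```

The appeal of this pattern is that integrations can pass internal dependencies without widening Config, which stays limited to what end-users are meant to set.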