tdbg

package
v1.26.2-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: MIT Imports: 64 Imported by: 0

Documentation

Index

Examples

Constants

View Source
// Default connection settings for tdbg.
const (
	// DefaultFrontendAddress is the frontend address used when FlagAddress is
	// not set or is empty (see DefaultFrontendAddressProvider).
	DefaultFrontendAddress = "127.0.0.1:7233"
)

Variables

View Source
// Flags used to specify cli command line arguments.
//
// The string variables hold flag names. The []string variables whose names end
// in "Alias" hold alternative names for the flag declared directly above them
// (presumably registered as urfave/cli flag aliases — confirm at the flag
// definitions).
var (
	FlagAddress                    = "address"
	FlagHistoryAddress             = "history-address"
	FlagNamespaceID                = "namespace-id"
	FlagNamespace                  = "namespace"
	FlagNamespaceAlias             = []string{"n"}
	FlagShardID                    = "shard-id"
	FlagWorkflowID                 = "workflow-id"
	FlagWorkflowIDAlias            = []string{"wid"}
	FlagRunID                      = "run-id"
	FlagRunIDAlias                 = []string{"rid"}
	FlagNumberOfShards             = "number-of-shards"
	FlagMinEventID                 = "min-event-id"
	FlagMaxEventID                 = "max-event-id"
	FlagTaskQueue                  = "task-queue"
	FlagTaskQueueType              = "task-queue-type"
	FlagContextTimeout             = "context-timeout"
	FlagContextTimeoutAlias        = []string{"ct"}
	FlagCluster                    = "cluster"
	FlagTargetCluster              = "target-cluster"
	FlagPageSize                   = "pagesize"
	FlagFrom                       = "from"
	FlagPrintFullyDetail           = "print-full"
	FlagPrintJSON                  = "print-json"
	FlagHeartbeatedWithin          = "heartbeated-within"
	FlagInputFilename              = "input-filename"
	FlagOutputFilename             = "output-filename"
	FlagClusterMembershipRole      = "role"
	FlagSkipErrorMode              = "skip-errors"
	FlagTaskID                     = "task-id"
	FlagTaskCategory               = "task-category"
	FlagTaskVisibilityTimestamp    = "task-timestamp"
	FlagMinVisibilityTimestamp     = "min-visibility-ts"
	FlagMaxVisibilityTimestamp     = "max-visibility-ts"
	FlagEnableTLS                  = "tls"
	FlagTLSCertPath                = "tls-cert-path"
	FlagTLSKeyPath                 = "tls-key-path"
	FlagTLSCaPath                  = "tls-ca-path"
	FlagTLSDisableHostVerification = "tls-disable-host-verification"
	FlagTLSServerName              = "tls-server-name"
	FlagLastMessageID              = "last-message-id"
	FlagJobToken                   = "job-token"
	FlagReason                     = "reason"
	FlagYes                        = "yes"
	FlagMore                       = "more"
	FlagMinEventVersion            = "min-event-version"
	FlagMaxEventVersion            = "max-event-version"
	FlagMinTaskID                  = "min-task-id"
	FlagMaxTaskID                  = "max-task-id"
	FlagDLQType                    = "dlq-type"
	FlagQueueType                  = "queue-type"
	FlagDLQVersion                 = "dlq-version"
	FlagMaxMessageCount            = "max-message-count"
	FlagProtoType                  = "type"
	FlagHexData                    = "hex-data"
	FlagHexFile                    = "hex-file"
	FlagBinaryFile                 = "binary-file"
	FlagBase64Data                 = "base64-data"
	FlagBase64File                 = "base64-file"
	FlagTaskCategoryID             = "task-category-id"
	FlagEncoding                   = "encoding"
	FlagPartitionID                = "partition-id"
	FlagStickyName                 = "sticky-name"
	FlagBuildIDs                   = "select-build-id"
	FlagUnversioned                = "select-unversioned"
	FlagAllActive                  = "select-all-active"
)

Flags used to specify cli command line arguments

Functions

func AdminDecodeBase64

func AdminDecodeBase64(c *cli.Context) error

func AdminDecodeProto

func AdminDecodeProto(c *cli.Context) error

func AdminDeleteWorkflow

func AdminDeleteWorkflow(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error

AdminDeleteWorkflow force deletes a workflow's mutable state (both concrete and current), history, and visibility records, as far as that is possible. It should only be used as a troubleshooting tool, since no additional checks are done before the deletion (e.g. whether a child workflow has recorded its result in the parent workflow). Please use the normal workflow delete command to gracefully delete a workflow execution.

func AdminDescribeHistoryHost

func AdminDescribeHistoryHost(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeHistoryHost describes history host

func AdminDescribeShard

func AdminDescribeShard(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeShard describes shard by shard id

func AdminDescribeTaskQueuePartition added in v1.26.0

func AdminDescribeTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeTaskQueuePartition displays task queue partition information

func AdminDescribeWorkflow

func AdminDescribeWorkflow(c *cli.Context, clientFactory ClientFactory) error

AdminDescribeWorkflow describes a workflow execution for admin

func AdminForceUnloadTaskQueuePartition added in v1.26.2

func AdminForceUnloadTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error

AdminForceUnloadTaskQueuePartition forcefully unloads a task queue partition

func AdminGetShardID

func AdminGetShardID(c *cli.Context) error

AdminGetShardID gets the shard ID

func AdminImportWorkflow added in v1.23.0

func AdminImportWorkflow(c *cli.Context, clientFactory ClientFactory) error

AdminImportWorkflow imports history

func AdminListClusterMembers

func AdminListClusterMembers(c *cli.Context, clientFactory ClientFactory) error

AdminListClusterMembers outputs a list of cluster members

func AdminListGossipMembers

func AdminListGossipMembers(c *cli.Context, clientFactory ClientFactory) error

AdminListGossipMembers outputs a list of gossip members

func AdminListShardTasks

func AdminListShardTasks(c *cli.Context, clientFactory ClientFactory, registry tasks.TaskCategoryRegistry) error

AdminListShardTasks outputs a list of tasks for the given shard and task category

func AdminListTaskQueueTasks

func AdminListTaskQueueTasks(c *cli.Context, clientFactory ClientFactory) error

AdminListTaskQueueTasks displays task information

func AdminRebuildMutableState

func AdminRebuildMutableState(c *cli.Context, clientFactory ClientFactory) error

AdminRebuildMutableState rebuilds a workflow's mutable state using persisted history events

func AdminRefreshWorkflowTasks

func AdminRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory) error

AdminRefreshWorkflowTasks refreshes all the tasks of a workflow

func AdminRemoveTask

func AdminRemoveTask(
	c *cli.Context,
	clientFactory ClientFactory,
	taskCategoryRegistry tasks.TaskCategoryRegistry,
) error

AdminRemoveTask removes a task

func AdminReplicateWorkflow added in v1.26.0

func AdminReplicateWorkflow(
	c *cli.Context,
	clientFactory ClientFactory,
) error

AdminReplicateWorkflow force replicates a workflow by generating replication tasks

func AdminShardManagement

func AdminShardManagement(c *cli.Context, clientFactory ClientFactory) error

AdminShardManagement performs shard management operations

func AdminShowWorkflow

func AdminShowWorkflow(c *cli.Context, clientFactory ClientFactory) error

AdminShowWorkflow shows history

func NewCliApp

func NewCliApp(opts ...Option) *cli.App

NewCliApp instantiates a new instance of the CLI application.

func StringToEnum added in v1.23.0

func StringToEnum(search string, candidates map[string]int32) (int32, error)

Types

type BoolFlagLookup added in v1.23.0

// BoolFlagLookup looks up the value of a boolean flag by name. It can be
// satisfied by github.com/urfave/cli/v2.Context.
type BoolFlagLookup interface {
	// Bool returns the boolean value associated with the flag called name.
	Bool(name string) bool
}

BoolFlagLookup can be satisfied by github.com/urfave/cli/v2.Context.

type ClientFactory

// ClientFactory is used to construct rpc clients.
type ClientFactory interface {
	// AdminClient returns an admin service client for the given CLI context.
	AdminClient(c *cli.Context) adminservice.AdminServiceClient
	// WorkflowClient returns a workflow service client for the given CLI context.
	WorkflowClient(c *cli.Context) workflowservice.WorkflowServiceClient
}

ClientFactory is used to construct rpc clients

func NewClientFactory

func NewClientFactory(opts ...ClientFactoryOption) ClientFactory

NewClientFactory creates a new ClientFactory

type ClientFactoryOption added in v1.23.0

// ClientFactoryOption is used to configure the ClientFactory via
// NewClientFactory. Each option receives the factory parameters and may
// modify them in place.
type ClientFactoryOption func(params *clientFactoryParams)

ClientFactoryOption is used to configure the ClientFactory via NewClientFactory.

func WithFrontendAddress added in v1.23.0

func WithFrontendAddress(address string) ClientFactoryOption

WithFrontendAddress ensures that admin clients created by the factory will connect to the specified address.

type DLQJobService added in v1.23.0

type DLQJobService struct {
	// contains filtered or unexported fields
}

func (*DLQJobService) CancelJob added in v1.23.0

func (ac *DLQJobService) CancelJob(c *cli.Context) error

func (*DLQJobService) DescribeJob added in v1.23.0

func (ac *DLQJobService) DescribeJob(c *cli.Context) error

type DLQMessage added in v1.23.0

// DLQMessage is used primarily to form the JSON output of the `read` command.
// It's only used for v2.
type DLQMessage struct {
	// MessageID is the ID of the message within the DLQ. You can use this ID as an input to the `--last-message-id`
	// flag (FlagLastMessageID) for the `purge` and `merge` commands.
	MessageID int64 `json:"message_id"`
	// ShardID is only used for non-namespace replication tasks.
	ShardID int32 `json:"shard_id"`
	// Payload contains the parsed task metadata from the server.
	Payload *TaskPayload `json:"payload"`
}

DLQMessage is used primarily to form the JSON output of the `read` command. It's only used for v2.

type DLQService added in v1.23.0

// DLQService is the set of DLQ commands exposed by tdbg. Both *DLQV1Service
// and *DLQV2Service provide these four methods; DLQServiceProvider.GetDLQService
// returns one of the implementations based on FlagDLQVersion.
type DLQService interface {
	// ReadMessages handles the `read` command.
	ReadMessages(c *cli.Context) error
	// PurgeMessages handles the `purge` command.
	PurgeMessages(c *cli.Context) error
	// MergeMessages handles the `merge` command.
	MergeMessages(c *cli.Context) error
	// ListQueues lists queues (presumably the available DLQs — confirm
	// against the implementations).
	ListQueues(c *cli.Context) error
}

type DLQServiceProvider added in v1.23.0

type DLQServiceProvider struct {
	// contains filtered or unexported fields
}

func NewDLQServiceProvider added in v1.23.0

func NewDLQServiceProvider(
	clientFactory ClientFactory,
	taskBlobEncoder TaskBlobEncoder,
	taskCategoryRegistry tasks.TaskCategoryRegistry,
	writer io.Writer,
	prompterFactory PrompterFactory,
) *DLQServiceProvider

func (*DLQServiceProvider) GetDLQJobService added in v1.23.0

func (p *DLQServiceProvider) GetDLQJobService() DLQJobService

GetDLQJobService returns a DLQJobService.

func (*DLQServiceProvider) GetDLQService added in v1.23.0

func (p *DLQServiceProvider) GetDLQService(
	c *cli.Context,
) (DLQService, error)

GetDLQService returns a DLQService based on FlagDLQVersion.

type DLQV1Service added in v1.23.0

type DLQV1Service struct {
	// contains filtered or unexported fields
}

func NewDLQV1Service added in v1.23.0

func NewDLQV1Service(clientFactory ClientFactory, prompter *Prompter, writer io.Writer) *DLQV1Service

func (*DLQV1Service) ListQueues added in v1.23.0

func (ac *DLQV1Service) ListQueues(c *cli.Context) error

func (*DLQV1Service) MergeMessages added in v1.23.0

func (ac *DLQV1Service) MergeMessages(c *cli.Context) error

func (*DLQV1Service) PurgeMessages added in v1.23.0

func (ac *DLQV1Service) PurgeMessages(c *cli.Context) error

func (*DLQV1Service) ReadMessages added in v1.23.0

func (ac *DLQV1Service) ReadMessages(c *cli.Context) (err error)

type DLQV2Service added in v1.23.0

type DLQV2Service struct {
	// contains filtered or unexported fields
}

DLQV2Service implements DLQService for persistence.QueueV2.

func NewDLQJobService added in v1.23.0

func NewDLQJobService(
	clientFactory ClientFactory,
	writer io.Writer,
) *DLQV2Service

func NewDLQV2Service added in v1.23.0

func NewDLQV2Service(
	category tasks.Category,
	sourceCluster string,
	targetCluster string,
	clientFactory ClientFactory,
	writer io.Writer,
	prompter *Prompter,
	taskBlobEncoder TaskBlobEncoder,
) *DLQV2Service

func (*DLQV2Service) ListQueues added in v1.23.0

func (ac *DLQV2Service) ListQueues(c *cli.Context) (err error)

func (*DLQV2Service) MergeMessages added in v1.23.0

func (ac *DLQV2Service) MergeMessages(c *cli.Context) error

func (*DLQV2Service) PurgeMessages added in v1.23.0

func (ac *DLQV2Service) PurgeMessages(c *cli.Context) error

func (*DLQV2Service) ReadMessages added in v1.23.0

func (ac *DLQV2Service) ReadMessages(c *cli.Context) (err error)

type DefaultFrontendAddressProvider added in v1.23.0

// DefaultFrontendAddressProvider uses FlagAddress to determine the frontend
// address, defaulting to DefaultFrontendAddress if FlagAddress is not set or
// is empty. It carries no state, so the zero value is ready to use.
type DefaultFrontendAddressProvider struct{}

DefaultFrontendAddressProvider uses FlagAddress to determine the frontend address, defaulting to DefaultFrontendAddress if FlagAddress is not set or is empty.

func (DefaultFrontendAddressProvider) GetFrontendAddress added in v1.23.0

func (d DefaultFrontendAddressProvider) GetFrontendAddress(c *cli.Context) string

type HttpGetter

// HttpGetter defines http.Client.Get(...) as an interface so we can mock it.
type HttpGetter interface {
	// Get has the same signature as http.Client.Get.
	Get(url string) (resp *http.Response, err error)
}

HttpGetter defines http.Client.Get(...) as an interface so we can mock it

type Option added in v1.23.0

// Option modifies the Params for tdbg. Options are passed to NewCliApp.
type Option func(params *Params)

Option modifies the Params for tdbg.

type Params added in v1.23.0

// Params holds the settings which are customizable for the CLI application.
// Option functions passed to NewCliApp receive a *Params and may override any
// of its fields.
type Params struct {
	// ClientFactory creates Temporal service clients for tdbg to use.
	ClientFactory ClientFactory
	// TaskCategoryRegistry is used to determine which task categories are available for tdbg to use.
	TaskCategoryRegistry tasks.TaskCategoryRegistry
	// Writer is used to write output from tdbg. The default is os.Stdout.
	Writer io.Writer
	// ErrWriter is used to write errors from tdbg. The default is os.Stderr.
	ErrWriter io.Writer
	// TaskBlobEncoder is needed for custom task serialization. The default uses PredefinedTaskBlobDeserializer.
	TaskBlobEncoder TaskBlobEncoder
}

Params which are customizable for the CLI application.

type PredefinedTaskBlobDeserializer added in v1.23.0

// PredefinedTaskBlobDeserializer is a TaskBlobProtoDeserializer for the
// predefined task categories that are used by Temporal: it deserializes task
// blobs of those categories into proto messages. If your server has custom
// categories, you'll want to build something on top of this.
type PredefinedTaskBlobDeserializer struct{}

PredefinedTaskBlobDeserializer is a TaskBlobProtoDeserializer that deserializes task blobs into the predefined task categories that are used by Temporal. If your server has custom categories, you'll want to build something on top of this.

func NewPredefinedTaskBlobDeserializer added in v1.23.0

func NewPredefinedTaskBlobDeserializer() PredefinedTaskBlobDeserializer

NewPredefinedTaskBlobDeserializer returns a TaskBlobProtoDeserializer that works for the stock task categories of the server. You need to extend this if you have custom task categories.

func (PredefinedTaskBlobDeserializer) Deserialize added in v1.23.0

func (d PredefinedTaskBlobDeserializer) Deserialize(categoryID int, blob *commonpb.DataBlob) (proto.Message, error)

Deserialize a task blob from one of the server's predefined task categories into a proto message.

type Prompter added in v1.23.0

type Prompter struct {
	// contains filtered or unexported fields
}

Prompter is a helper for prompting the user for confirmation.

func NewPrompter added in v1.23.0

func NewPrompter(c BoolFlagLookup, opts ...PrompterOption) *Prompter

NewPrompter creates a new Prompter. In most cases, the first argument should be github.com/urfave/cli/v2.Context.

func (*Prompter) Prompt added in v1.23.0

func (p *Prompter) Prompt(msg string)

Prompt the user for confirmation. If the user does not respond with "y" or "yes" (case-insensitive and without leading or trailing space), the process will exit with code 1.

type PrompterFactory added in v1.23.0

// PrompterFactory creates a Prompter from a BoolFlagLookup, which in most
// cases is a github.com/urfave/cli/v2.Context. Use NewPrompterFactory to build
// one from a set of PrompterOption values.
type PrompterFactory func(c BoolFlagLookup) *Prompter

func NewPrompterFactory added in v1.23.0

func NewPrompterFactory(opts ...PrompterOption) PrompterFactory

type PrompterOption added in v1.23.0

// PrompterOption is used to override default PrompterParams. Each option
// receives the parameters and may modify them in place.
type PrompterOption func(*PrompterParams)

PrompterOption is used to override default PrompterParams.

type PrompterParams added in v1.23.0

// PrompterParams is used to configure a new Prompter. Every field has a
// default that a PrompterOption may override.
type PrompterParams struct {
	// Reader defaults to os.Stdin.
	Reader io.Reader
	// Writer defaults to os.Stdout.
	Writer io.Writer
	// Exiter defaults to [os.Exit].
	Exiter func(code int)
}

PrompterParams is used to configure a new Prompter.

type ProtoTaskBlobEncoder added in v1.23.0

type ProtoTaskBlobEncoder struct {
	// contains filtered or unexported fields
}

ProtoTaskBlobEncoder is a TaskBlobEncoder that uses a TaskBlobProtoDeserializer to deserialize the blob into a proto message, and then uses protojson to marshal the proto message into a human-readable format.

func NewProtoTaskBlobEncoder added in v1.23.0

func NewProtoTaskBlobEncoder(deserializer TaskBlobProtoDeserializer) *ProtoTaskBlobEncoder

NewProtoTaskBlobEncoder returns a TaskBlobEncoder that uses a TaskBlobProtoDeserializer to deserialize the blob.

func (*ProtoTaskBlobEncoder) Encode added in v1.23.0

func (e *ProtoTaskBlobEncoder) Encode(writer io.Writer, categoryID int, blob *commonpb.DataBlob) error

Encode a blob for a given task category to a human-readable format by deserializing the blob into a proto message and then pretty-printing it using protojson.

type TaskBlobEncoder added in v1.23.0

// TaskBlobEncoder takes a blob for a given task category and encodes it to a
// human-readable format.
type TaskBlobEncoder interface {
	// Encode writes a human-readable form of blob, which belongs to the task
	// category identified by taskCategoryID, to writer.
	Encode(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error
}

TaskBlobEncoder takes a blob for a given task category and encodes it to a human-readable format. Here's a breakdown of the relationship between all the related types needed to implement a custom encoder:

- NewCliApp accepts a list of Option objects.
- Option objects modify Params.
- Params contain a TaskBlobEncoder.
- TaskBlobEncoder is implemented by ProtoTaskBlobEncoder.
- ProtoTaskBlobEncoder uses protojson to marshal proto.Message objects from a TaskBlobProtoDeserializer.
- TaskBlobProtoDeserializer is implemented by the stock PredefinedTaskBlobDeserializer.
- PredefinedTaskBlobDeserializer deserializes commonpb.DataBlob objects into proto.Message objects.

Example
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strconv"

	commonpb "go.temporal.io/api/common/v1"

	enumspb "go.temporal.io/api/enums/v1"
	"go.temporal.io/server/service/history/tasks"
	"go.temporal.io/server/tools/tdbg"
)

// customCategory stands in for a server-specific task category. The zero value
// of tasks.Category is used here purely for illustration.
var customCategory = tasks.Category{}

// main demonstrates plugging a custom TaskBlobEncoder into tdbg. Blobs that
// belong to customCategory are rendered as "hello, <blob data>"; every other
// category is delegated to the encoder that was already present in Params.
// The example writes a small JSON payload to a temporary file, runs
// `tdbg decode task` against it, and prints the captured output.
func main() {
	var output bytes.Buffer
	app := tdbg.NewCliApp(func(params *tdbg.Params) {
		// Capture the CLI output so it can be printed once the command is done.
		params.Writer = &output
		// Remember the encoder already configured in params so that tasks of
		// any other category keep their existing rendering.
		stockEncoder := params.TaskBlobEncoder
		params.TaskBlobEncoder = tdbg.TaskBlobEncoderFn(func(
			writer io.Writer,
			taskCategoryID int,
			blob *commonpb.DataBlob,
		) error {
			if taskCategoryID == customCategory.ID() {
				_, err := writer.Write(append([]byte("hello, "), blob.Data...))
				return err
			}
			return stockEncoder.Encode(writer, taskCategoryID, blob)
		})
	})

	file, err := os.CreateTemp("", "*")
	if err != nil {
		panic(err)
	}
	// Deferred calls run after everything below, so by then the handle has
	// already been closed.
	defer func() {
		if err := os.Remove(file.Name()); err != nil {
			panic(err)
		}
	}()
	_, err = file.Write([]byte("\"world\""))
	if err != nil {
		panic(err)
	}
	// Close the handle before the CLI opens the file by name: this releases
	// the descriptor and lets the deferred os.Remove succeed on platforms
	// that refuse to delete a file that is still open (e.g. Windows).
	err = file.Close()
	if err != nil {
		panic(err)
	}

	err = app.Run([]string{
		"tdbg", "decode", "task",
		"--" + tdbg.FlagEncoding, enumspb.ENCODING_TYPE_JSON.String(),
		"--" + tdbg.FlagTaskCategoryID, strconv.Itoa(customCategory.ID()),
		"--" + tdbg.FlagBinaryFile, file.Name(),
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(output.String())
}
Output:

hello, "world"

type TaskBlobEncoderFn added in v1.23.0

// TaskBlobEncoderFn implements TaskBlobEncoder by calling a function. It lets
// a plain function with the Encode signature be used wherever a
// TaskBlobEncoder is expected.
type TaskBlobEncoderFn func(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error

TaskBlobEncoderFn implements TaskBlobEncoder by calling a function.

func (TaskBlobEncoderFn) Encode added in v1.23.0

func (e TaskBlobEncoderFn) Encode(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error

Encode the task by calling the function.

type TaskBlobProtoDeserializer added in v1.23.0

// TaskBlobProtoDeserializer is used to deserialize task blobs into proto
// messages. It is kept separate from the encoder so that the encoder is not
// tied to protos as the wire format.
type TaskBlobProtoDeserializer interface {
	// Deserialize converts blob, which belongs to the task category identified
	// by taskCategoryID, into a proto message.
	Deserialize(taskCategoryID int, blob *commonpb.DataBlob) (proto.Message, error)
}

TaskBlobProtoDeserializer is used to deserialize task blobs into proto messages. This makes it easier to create an encoder if your tasks are all backed by protos. We separate this from the encoder because we don't want the encoder to be tied to protos as the wire format.

type TaskPayload added in v1.23.0

type TaskPayload struct {
	// contains filtered or unexported fields
}

TaskPayload implements both json.Marshaler and json.Unmarshaler. This allows us to pretty-print tasks using jsonpb when serializing and then store the raw bytes of the task payload for later use when deserializing. We need to store the raw bytes instead of immediately decoding to a concrete type because that logic is dynamic and can't depend solely on the task category ID in case there are additional task categories in use.

func (*TaskPayload) Bytes added in v1.23.0

func (p *TaskPayload) Bytes() []byte

Bytes returns the raw bytes of the deserialized TaskPayload. This will return nil if the payload has not been deserialized yet.

func (*TaskPayload) MarshalJSON added in v1.23.0

func (p *TaskPayload) MarshalJSON() ([]byte, error)

func (*TaskPayload) UnmarshalJSON added in v1.23.0

func (p *TaskPayload) UnmarshalJSON(b []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL