structs

package
v0.4.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrStreamNotOnline is a package-level sentinel error carrying the message
// "Stream is not Online". Compare against it with errors.Is.
// NOTE(review): presumably returned when a stream is used while it is not in
// the StreamOnline state — confirm against the call sites. The capitalised
// message departs from Go error-string convention but is left as is because
// callers may match on the text.
var ErrStreamNotOnline = errors.New("Stream is not Online")

Functions

This section is empty.

Types

type Await

// Await bundles the bookkeeping for one group of awaited task responses.
//
// The embedded sync.RWMutex promotes Lock/Unlock/RLock/RUnlock onto Await
// itself; because of that mutex an Await must be shared by pointer and not
// copied after first use.
//
// NOTE(review): the field descriptions below are inferred from names and from
// the NewAwait(sendIDs []uuid.UUID) signature — confirm against the
// implementation.
type Await struct {
	sync.RWMutex
	// Created is a timestamp; presumably the construction time — TODO confirm.
	Created        time.Time
	// State is a StreamState value associated with this Await (see SetState).
	State          StreamState
	// Resp is the channel on which *TaskResponse values are exchanged;
	// presumably fed by Send — TODO confirm who reads and who closes it.
	Resp           chan *TaskResponse
	// ReceivedFinals is an integer counter; presumably the number of responses
	// seen with Final set — TODO confirm.
	ReceivedFinals int
	// Uids is a list of UUIDs; presumably the request IDs this Await is
	// waiting on — TODO confirm.
	Uids           []uuid.UUID
}

func NewAwait

func NewAwait(sendIDs []uuid.UUID) (aw *Await)

func (*Await) Close

func (aw *Await) Close()

func (*Await) Send

func (aw *Await) Send(tr *TaskResponse) (bool, error)

func (*Await) SetState added in v0.3.11

func (aw *Await) SetState(ss StreamState)

type ClientControl

// ClientControl is a control message that pairs a string Type with a reply
// channel of ClientResponse values.
// NOTE(review): the set of valid Type values is not visible here — confirm
// against the code that consumes StreamAccess.ClientControl.
type ClientControl struct {
	// Type names the kind of control message.
	Type string
	// Resp is the channel on which the ClientResponse for this message is
	// delivered; presumably written exactly once by the handler — TODO confirm.
	Resp chan ClientResponse
}

type ClientResponse

// ClientResponse is the value sent back on ClientControl.Resp.
// NOTE(review): field semantics are inferred from names — confirm.
type ClientResponse struct {
	// OK is a boolean outcome flag; presumably true on success.
	OK    bool
	// Error is an error description carried as a plain string rather than an
	// error value; presumably empty when OK is true — TODO confirm.
	Error string
	// Time is a duration; presumably the measured round-trip time (compare the
	// time.Duration returned by StreamAccess.Ping) — TODO confirm.
	Time  time.Duration
}

type ConnTransport

// ConnTransport is the transport abstraction held in StreamAccess.Transport.
type ConnTransport interface {
	// Run is handed a context, a zap logger and the StreamAccess it serves. It
	// returns nothing, so any failure has to be reported through other means.
	// NOTE(review): presumably blocks until ctx is cancelled or the connection
	// ends — confirm against an implementation.
	Run(ctx context.Context, logger *zap.Logger, stream *StreamAccess)
	// Type returns a string identifying the transport.
	// NOTE(review): the set of possible values is not visible here.
	Type() string
}

type IndexerClienter

// IndexerClienter is a single-method interface implemented by anything that
// can register a StreamAccess.
type IndexerClienter interface {
	// RegisterStream registers the given stream and returns a non-nil error on
	// failure.
	// NOTE(review): what registration entails is not visible here — confirm
	// against an implementation.
	RegisterStream(ctx context.Context, stream *StreamAccess) error
}

type SimpleWorkerInfo

// SimpleWorkerInfo is a flat, all-string description of a worker. It carries
// the same Network/ChainID/Version triple as WorkerCompositeKey plus an ID.
// NOTE(review): formats of the individual strings are not visible here.
type SimpleWorkerInfo struct {
	// Network is the network name.
	Network string
	// Version is the version string.
	Version string
	// ChainID is the chain identifier.
	ChainID string
	// ID is the worker identifier.
	ID      string
}

type StreamAccess

// StreamAccess holds the state of one stream: its identity, state, pending
// responses, request channel, control channel, transport and worker info.
// Create it with NewStreamAccess.
//
// It contains a sync.RWMutex (MapLock), so it must be shared by pointer and
// not copied after first use.
//
// NOTE(review): the field descriptions below are inferred from names and
// types — confirm against the implementation.
type StreamAccess struct {
	// State is the stream's StreamState.
	State           StreamState
	// StreamID is the UUID identifying this stream.
	StreamID        uuid.UUID
	// ResponseMap maps a UUID to its *Await; presumably keyed by request ID and
	// guarded by MapLock — TODO confirm.
	ResponseMap     map[uuid.UUID]*Await
	// RequestListener is a channel of TaskRequest values; presumably consumed
	// by the transport — TODO confirm direction and buffering.
	RequestListener chan TaskRequest

	// ManagerID is the manager identifier passed to NewStreamAccess.
	ManagerID string

	// ClientControl is a channel of ClientControl messages.
	ClientControl chan ClientControl

	// CancelConnection is a context.CancelFunc; presumably cancels the context
	// of the active connection — TODO confirm. May be nil before a connection
	// exists; verify before calling.
	CancelConnection context.CancelFunc

	// WorkerInfo points to the worker description associated with this stream.
	WorkerInfo *WorkerInfo
	// Transport is the ConnTransport used by this stream.
	Transport  ConnTransport

	// MapLock is a read/write mutex; presumably protects ResponseMap — TODO
	// confirm.
	MapLock sync.RWMutex
	// contains filtered or unexported fields
}

func NewStreamAccess

func NewStreamAccess(transport ConnTransport, managerID string, conn *WorkerInfo) *StreamAccess

func (*StreamAccess) Close

func (sa *StreamAccess) Close() error

func (*StreamAccess) Ping

func (sa *StreamAccess) Ping(ctx context.Context) (time.Duration, error)

func (*StreamAccess) Reconnect

func (sa *StreamAccess) Reconnect(ctx context.Context, logger *zap.Logger) error

func (*StreamAccess) Recv

func (sa *StreamAccess) Recv(tr *TaskResponse) error

func (*StreamAccess) Req

func (sa *StreamAccess) Req(tr TaskRequest, aw *Await) error

func (*StreamAccess) Run

func (sa *StreamAccess) Run(ctx context.Context, logger *zap.Logger) error

type StreamState

// StreamState is an int-backed enumeration of stream states.
type StreamState int
// The StreamState values are assigned by iota in declaration order, starting
// at 0, so StreamUnknown is the zero value of StreamState. The numeric values
// appear in JSON wherever a StreamState field is encoded with the default
// integer encoding, so reordering these constants would change that output.
// NOTE(review): the precise meaning of each state and the allowed transitions
// are not visible here.
const (
	// StreamUnknown is 0, the zero value.
	StreamUnknown StreamState = iota
	// StreamOnline is 1.
	StreamOnline
	// StreamError is 2.
	StreamError
	// StreamReconnecting is 3.
	StreamReconnecting
	// StreamClosing is 4.
	StreamClosing
	// StreamOffline is 5.
	StreamOffline
)

type TaskError

// TaskError describes an error attached to a TaskResponse as a message plus a
// TaskErrorType classification. No Error() method is visible in this view, so
// it may not satisfy the error interface.
// NOTE(review): presumably the zero value means "no error" — confirm.
type TaskError struct {
	// Msg is the error message text.
	Msg  string
	// Type classifies the error.
	Type TaskErrorType
}

type TaskErrorType

// TaskErrorType is a string-backed classification used by TaskError.Type.
// NOTE(review): no predefined values are visible in this view.
type TaskErrorType string

type TaskRequest

// TaskRequest is a request identified by a UUID. It carries the
// Network/ChainID/Version triple, a string Type and a raw JSON payload.
// The fields have no json tags, so encoding/json uses the Go field names.
type TaskRequest struct {
	// ID is the UUID identifying this request.
	ID      uuid.UUID
	// Network is the network name.
	Network string
	// ChainID is the chain identifier.
	ChainID string
	// Version is the version string.
	Version string

	// Type names the kind of task.
	// NOTE(review): valid values are not visible here.
	Type    string
	// Payload is kept as raw JSON bytes, so its decoding is deferred; its
	// schema presumably depends on Type — TODO confirm.
	Payload json.RawMessage
}

type TaskResponse

// TaskResponse is one response message identified by a UUID.
// The fields have no json tags, so encoding/json uses the Go field names.
// NOTE(review): the field descriptions below are inferred from names —
// confirm against the implementation.
type TaskResponse struct {
	// ID is a UUID; presumably the ID of the TaskRequest being answered.
	ID      uuid.UUID
	// Version is the version string.
	Version string
	// Type names the kind of response.
	Type    string
	// Order is a 64-bit integer; presumably the position of this message in a
	// multi-part response — TODO confirm.
	Order   int64
	// Final is a boolean; presumably marks the last message for ID — TODO
	// confirm.
	Final   bool
	// Error is held by value, so it is always present; presumably its zero
	// value means no error — TODO confirm.
	Error   TaskError
	// Payload is kept as raw JSON bytes, so its decoding is deferred.
	Payload json.RawMessage
}

type TaskWorkerInfo added in v0.4.0

// TaskWorkerInfo is a JSON-tagged, lock-free counterpart of TaskWorkerRecord
// and StreamAccess state: a worker ID, a last-send time, and the stream's
// state, ID and response map.
// NOTE(review): presumably a point-in-time snapshot for reporting — confirm.
type TaskWorkerInfo struct {
	// WorkerID is encoded as "worker_id".
	WorkerID string    `json:"worker_id"`
	// LastSend is encoded as "last_send".
	LastSend time.Time `json:"last_send"`

	// StreamState is encoded as "stream_state".
	StreamState StreamState `json:"stream_state"`
	// StreamID is encoded as "stream_id".
	StreamID    uuid.UUID   `json:"stream_id"`
	// ResponseMap is the only field here without a json tag, so encoding/json
	// uses the Go field name "ResponseMap" for it.
	// NOTE(review): the missing tag may be an oversight — confirm before
	// changing, since adding one alters the emitted JSON.
	ResponseMap map[uuid.UUID]TaskWorkerInfoAwait
}

type TaskWorkerInfoAwait added in v0.4.0

// TaskWorkerInfoAwait mirrors the Created, State, ReceivedFinals and Uids
// fields of Await with JSON tags, omitting Await's mutex and Resp channel.
// NOTE(review): presumably a serialisable snapshot of an Await — confirm.
type TaskWorkerInfoAwait struct {
	// Created is encoded as "created".
	Created        time.Time   `json:"created"`
	// State is encoded as "state".
	State          StreamState `json:"state"`
	// ReceivedFinals is encoded as "received_finals".
	ReceivedFinals int         `json:"received_finals"`
	// Uids is encoded under the key "expected", not "uids".
	Uids           []uuid.UUID `json:"expected"`
}

type TaskWorkerRecord added in v0.4.0

// TaskWorkerRecord ties a worker ID to its StreamAccess and a last-send time.
// It contains a sync.RWMutex (L), so it must be shared by pointer and not
// copied after first use.
type TaskWorkerRecord struct {
	// WorkerID is the worker identifier.
	WorkerID string
	// Stream points to the worker's StreamAccess.
	Stream   *StreamAccess
	// LastSend is a timestamp; presumably when a request was last sent to this
	// worker — TODO confirm.
	LastSend time.Time
	// L is a read/write mutex; presumably guards the fields above — TODO
	// confirm which ones.
	L        sync.RWMutex
}

type TaskWorkerRecordInfo added in v0.4.0

// TaskWorkerRecordInfo is a JSON-serialisable list of TaskWorkerInfo entries
// together with two integer totals.
// NOTE(review): how All and Active are computed is not visible here.
type TaskWorkerRecordInfo struct {
	// Workers is encoded as "workers". A nil slice encodes as JSON null.
	Workers []TaskWorkerInfo `json:"workers"`
	// All is encoded as "all"; presumably the total number of workers.
	All     int              `json:"all"`
	// Active is encoded as "active"; presumably the number of workers counted
	// as active — TODO confirm the criterion.
	Active  int              `json:"active"`
}

type WorkerAddress

// WorkerAddress is one address entry of a WorkerConnection, expressed as a
// net.IP and/or a string address.
// NOTE(review): whether both fields are set together, and the format of
// Address (host, host:port, URL), is not visible here.
type WorkerAddress struct {
	// IP is encoded as "ip".
	IP      net.IP `json:"ip"`
	// Address is encoded as "address".
	Address string `json:"address"`
}

type WorkerCompositeKey

// WorkerCompositeKey groups Network, ChainID and Version. All three fields
// are strings, so the struct is comparable and can be used as a map key.
// NOTE(review): presumably used to index workers by this triple — confirm.
type WorkerCompositeKey struct {
	// Network is the network name.
	Network string
	// ChainID is the chain identifier.
	ChainID string
	// Version is the version string.
	Version string
}

type WorkerConnection

// WorkerConnection describes one way to connect to a worker: a version, a
// connection type and a list of addresses.
// NOTE(review): valid Type values are not visible here; presumably they
// correspond to ConnTransport.Type() — confirm.
type WorkerConnection struct {
	// Version is encoded as "version".
	Version   string          `json:"version"`
	// Type is encoded as "type".
	Type      string          `json:"type"`
	// Addresses is encoded as "addresses". A nil slice encodes as JSON null.
	Addresses []WorkerAddress `json:"addresses"`
}

type WorkerInfo

// WorkerInfo is the JSON-serialisable description of a worker node. Its
// pointer-receiver methods are Clone, LastChecked and SetState.
//
// It contains a sync.RWMutex (L), so it must be shared by pointer and not
// copied after first use.
type WorkerInfo struct {
	// NodeSelfID is encoded as "node_id".
	NodeSelfID     string             `json:"node_id"`
	// Type is encoded as "type".
	Type           string             `json:"type"`
	// ChainID is encoded as "chain_id".
	ChainID        string             `json:"chain_id"`
	// State is encoded as "state"; see SetState.
	State          StreamState        `json:"state"`
	// ConnectionInfo is encoded under the singular key "connection".
	ConnectionInfo []WorkerConnection `json:"connection"`
	// LastCheck is encoded as "last_check"; presumably updated by LastChecked
	// — TODO confirm.
	LastCheck      time.Time          `json:"last_check"`

	// L is excluded from JSON by its `json:"-"` tag.
	// NOTE(review): presumably guards State and LastCheck — confirm.
	L sync.RWMutex `json:"-"`
}

func (*WorkerInfo) Clone added in v0.4.0

func (wi *WorkerInfo) Clone() *WorkerInfo

func (*WorkerInfo) LastChecked added in v0.4.0

func (wi *WorkerInfo) LastChecked()

func (*WorkerInfo) SetState added in v0.4.0

func (wi *WorkerInfo) SetState(ss StreamState)

type WorkerInfoStatic added in v0.4.0

// WorkerInfoStatic combines a worker description with its task information.
// Embedding *WorkerInfo promotes that type's fields and methods onto
// WorkerInfoStatic. A nil embedded pointer makes the promoted fields panic on
// access, so it must be set before they are used.
// NOTE(review): presumably the pointer refers to a Clone of the live
// WorkerInfo rather than the live value — confirm.
type WorkerInfoStatic struct {
	*WorkerInfo
	// TaskWorkerInfo is encoded as "tasks".
	TaskWorkerInfo TaskWorkerInfo `json:"tasks"`
}

type WorkerNetworkStatic added in v0.4.0

// WorkerNetworkStatic is a JSON-serialisable map of WorkerInfoStatic entries
// together with two integer totals.
// NOTE(review): what the map key represents and how All and Active are
// computed is not visible here.
type WorkerNetworkStatic struct {
	// Workers is encoded as "workers"; presumably keyed by worker ID — TODO
	// confirm. A nil map encodes as JSON null.
	Workers map[string]WorkerInfoStatic `json:"workers"`
	// All is encoded as "all"; presumably the total number of workers.
	All     int                         `json:"all"`
	// Active is encoded as "active"; presumably the number of workers counted
	// as active — TODO confirm the criterion.
	Active  int                         `json:"active"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL