xctrl

command module
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2023 License: MIT Imports: 6 Imported by: 0

README

xctrl - XSwitch XCC API Go 语言 SDK

小樱桃在用的Go语言SDK。

目录结构:

  • ctrl:节点管理
  • proto:Google Protocol Buffer协议描述
  • tboy 是一个冒牌的的FreeSWITCH,用于测试
  • xctrl:xctrl Go语言SDK生成器,参考自Go Micro框架

需要注意,xctrl并不使用Protocol Buffer以及gRPC,而是使用了Protocol Buffer的协议描述,方便生成跨语言的客户端户。xctrl与XSwitch之间使用JSON数据格式,并使用JSON-RPC封装。

SDK使用

SDK依赖google.golang.org/protobuf/包,本SDK有两种使用方式:

  • 直接使用ctrlxctrl包,集成了协议结构体(proto/xctrl/xctrl.pb.go)及NATS消息收发(proto/xctrl/xctrl.pb.xctrl.go)。
  • 仅使用生成的Go结构体,如仅使用xctrl.pb.go,而不使用xctrl.pb.xctrl.go,可以直接将这两个文件复制到你的项目中。

ctrl

ctrl是XSwitch控制器,用于控制XSwitch。它提供了一些函数方便与XSwitch交互。

ctrl.Init

初始化

func Init(h Handler, trace bool, addrs string) error
  • h是一个ctrl.Handler类型的结构,必须实现它定义的几个函数,下面会有详细描述。
  • trace:是否开启内部Trace日志。
  • addrs是NATS地址,可能可以支持多个以逗号分隔的地址,但未测试。

初始化后,内部会生成一个全局的globalCtrl单例,用于存储内部状态。

Handler是一个interface,必须实现如下几个函数(可以是空函数)。

type Handler interface {
	// ctx , topic, reply,Params
	Request(context.Context, string, string, *Request)
	// ctx , topic ,reply  Params
	App(context.Context, string, string, *Message)
	// ctx , topic , Params
	Event(context.Context, string, *Request)
	// ctx , topic , Params
	Result(context.Context, string, *Result)
}
ctrl.EnableRequest
func EnableRequest(topic string) error

订阅Request请求消息。主要用于处理FreeSWITCH的请求,如dialplandirectoryconfig等。这种订阅总是异步处理的。

ctrl.EnableApp
func EnableApp(topic string) error

订阅一个Topic,是一个全能的订阅方式,包括接收Node的事件、返回结果等。

对于Event.Channel事件,回调函数里它将以当前的channel的uuid为topic和queue启用一个bus消息总线进行订阅处理,一方面避免nats回调端的阻塞,另一方面, 使channel在bus中成为一个串行的订阅。因而,对于同一个Channel UUID来说,回调是串行的,保证channel的START,RING,ANSWER,DESTROY等事件处理的有序性。

对于其它事件,它将使用新的Go Routine进行回调,因而,无法保证顺序。

EnableEvent
func EnableEvent(topic string, queue string) error

订阅事件相应的Topic,如cn.xswitch.ctrl.cdr。目前,除cn.xswitch.ctrl.cdr是在NATS中串行回调外,其它均为在新的Go Routine中回调。

Subscribe
func Subscribe(topic string, cb nats.EventCallback, queue string) (nats.Subscriber, error)

调用底层的NATS发起一个订阅。所有回调在同一个NATS Go Routine中回调。需要避免阻塞。

对Node中Channel的处理

Node侧为FreeSWITCH侧,订阅cn.xswitch.node以及cn.xswitch.node.$node-uuid

Ctrl侧为控制侧,订阅cn.xswitch.ctrlcn.xswitch.ctrl.$ctrl-uuid

对于呼入,FreeSWITCH会发送Event.Channel消息,第一个消息是state = START,最后一个是state = DESTROY

对于呼出,第一个消息是state = CALLING、最后一个是state = DESTROY

只要Channel产生,都会产生Event.CDR事件。

同步处理机制

同步处理机制简单。系统通过client包,直接进行NATS同步调用。

收到state = START后,执行

result, err := ctrl.Service().Accept(...)
ctrl.Service().Answer(...)
ctrl.Service().Play(...)
ctrl.Service().Hangup(...)

由于这些操作都是阻塞的,因而,要保证在一个新的Go Routine中运行,以避免阻塞消息的接收。

同步调用使用简单,但有个明显的不足,比如,Play是阻塞的,无法在当前的Go Routine中终止。如果需要提前终止一个长的Play操作,可以在其它的Go Routine中执行Stop,这通常需要需要外部的触发机制(如API),或提前启动一个Go Routine专门用于定时发Stop

如果Play正常结束,会返回code = 200,如果被中止,通常会返回code = 410。有时候,对端主动挂机,也会导致Play提前终止。

可以通过检查Play的返回码,或者根据是否接收到state = DESTROY消息,或者主动发XNode.GetState接口向Node查询Channel的生存状态。

关于err的处理:

上述接口返回的err是一个*errors.Error类型(在stack/中实现),可以按如下方式处理:

if err != nil {
	err1 := err.(*errors.Error)
	if err1.code == 500 {
	}
}
基于Context的同步处理机制

上述同步处理机制中,如果对端没有响应,则在超时前无法取消。可以使用Context进行超时设置或中途取消。

ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
resullt, err := ctrl.AService().Play(ctx, ...)
defer cancel()
if err != nil {
		err1 := err.(*errors.Error)
	if err1.code == 408 {// timeout
	}
}
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
resullt, err := ctrl.AService().Play(ctx, ...)
go func() {
	// 100ms后cancel
	time.Sleep(100 * time.MilliSecond)
	cancel()
}
if err != nil {
	err1 := err.(*errors.Error)
	if err1.code == 499 {// canceled ...
	}
}
异步处理机制

有一个ctrl.AsyncService()可以发送异步的命令。如:

ctrl.AsyncService().Play(...)

异步命令调用nats.Publish发送消息,会立即返回。除非NATS连接失败,结果永远会返回code = 201

异步命令无法获取执行结果。

虽然有一个ctrl.EnableResult可用,但它独占一个订阅主题,实际上用处不大。

ACall接口

该接口是一个试验接口。

另一种处理方式是不使用上述机制,通过独立的订阅支持ACall接口。

ctrl.Subscribe("cn.xswitch.ctrl."+ctrl.UUID(), EventCallback, "ctrl")

订阅后,以在EventCallback回调中再调用ctrl.DoResultCallback处理结果:

func EventCallback(ctx context.Context, ev nats.Event) error {
	xlog.Info(ev.Topic(), string(ev.Message().Body))

	var msg ctrl.Message
	err := json.Unmarshal(ev.Message().Body, &msg)

	if err != nil {
		xlog.Error("parse error", ev)
		return err
	}

	if msg.Method == "" { // maybe a result
		go ctrl.DoResultCallback(&msg)
		return nil
	}

	xlog.Error(msg.Method)

	switch msg.Method {
	case "Event.Channel":
    ...

由于该EventCallback是调用者自己实现的,因而可以自己选择是否在Go Routine中进行回调。

在调用时,可以通过ctrl.ACallOption().WithCallback()传入要回调的函数。

	err := ctrl.ACall(node, "Dial",
		&xctrl.DialRequest{
			CtrlUuid: channel.CtrlUuid,
			Destination: &xctrl.Destination{
				GlobalParams: map[string]string{
					"ignore_early_media": "true",
				},
				CallParams: []*xctrl.CallParam{
					{
						Uuid:       channel.Uuid,
						CidName:    "TEST",
						CidNumber:  "test",
						DestNumber: "1008",
						DialString: "sofia/public/10000210@rts.xswitch.cn:20003",
						Params: map[string]string{
							"absolute_codec_string": "PCMA,PCMU",
						},
					},
				},
			},
		},
		ctrl.ACallOption().WithCallback(func(msg *ctrl.Message, data interface{}) {
			xlog.Info(string(*msg.Result))
			r := &xctrl.DialResponse{}
			err := json.Unmarshal(*msg.Result, &r)

			if err != nil {
				xlog.Error(err)
			}

			xlog.Info(r.Cause)
		}),
	)

实际使用时,建议使用上面介绍的bus队列机制对同一个Channel UUID相关的消息分流到独立的Go Routine中,这样,可以更好的控制生命周期。

Channel的生命周期

呼入

digraph G {
	START -> DESTROY
	START -> RINGING -> ANSWERED -> DESTROY[color=green]
	START -> ANSWERED -> DESTROY[color=blue]
	START -> ANSWERED -> BRIDGE -> UNBRIDGE -> DESTROY[color=red]
}

呼出,其中,M代表有媒体,N代表ignore_early_media=true的情况。

digraph G {
	CALLING -> RINGING -> DESTROY
	CALLING -> RINGING -> ANSWERED -> DESTROY[color=green]
	CALLING -> ANSWER -> DESTROY[style=dashed color=grey]
	CALLING -> RINGING -> MEDIA -> READY -> ANSWERED -> DESTROY[color=blue label="M"]
	CALLING -> RINGING -> ANSWERED -> MEDIA -> READY-> DESTROY[color=red label="N"]
	CALLING -> RINGING -> MEDIA -> BRIDGE -> ANSWERED -> UNBRIDGE -> DESTROY[color=purple label="M"]
	CALLING -> RINGING -> ANSWERED -> MEDIA -> BRIDGE -> UNBRIDGE -> DESTROY[color=pink label=N]
}

在调用XNode.Dial外呼的时候,在ignore_early_media=false(默认)的情况下,收到MEDIA就会触发READY事件。如果为true,则需要等到ANSWERED以后才会触发READY状态。不管什么情况,都需要在收到READY状态后才可以对Channel进行控制。

在执行XNode.Bridge时,没有READY事件,这时可以根据ANSWEREDBRIDGE事件处理业务逻辑。

在XNode中,一个Channel从创建开始(state = STARTstate = CALLING),到销毁(state = DESTROY),是一个完整的生命周期。销毁前,会发送Event.CDR事件,通常会在单独的Topic上发出(可配置)。

由于Event.Channel并不包含完整的数据(通道变量等),因而建议在Ctrl侧对Channel数据进行缓存。简单的缓存办法是直接根据Channel UUID放到一个Map中。由于Channel更新相对频繁,因而sync.Map可能不大适用,直接用Map + sync.Mutex可能更直接一些。

一般来说,只要Channel被创建,总会有对应的DESTROY消息。但是,在XNode发生崩溃的情况下,需要准备超时垃圾回收机制。

这样Ctrl的总体实现就会很复杂。

另一种实现思路是将Channel相关的状态都在XNode侧用通道变量保存。每次事件都带上全量的通道变量,这样Ctrl侧的逻辑实现就会简单一些,代价是会增加NATS消息吞吐量,因为大多数情况下,绝大部分的通道变量是无用的。全量的通道变量暂时还没有实现。

一种优化方案是根据实际的业务场景选择是否启用和传递哪些通道变量。暂时还没有实现。

Context

Ctrl中的Context使用了标准的Go Context包,目前没有太大用处,大部分可以直接传入context.Background()context.TODO()

queueBufferSize

在订阅事件的时候会使用这个变量大小进行channel的初始化,1024容量足够事件使用,太小会导致程序阻塞卡顿,影响运行效率。

protobuf 扩展

在NativeJSAPI中,请求和返回的对象是多种多样的,因此定义一个单一的函数比较困难。我们在xctrl包中扩展了XNativeJSRequestXNativeJSResponse以代替原来的NatvieJSRequestNativeJSResponse。用法如下:

req := &xctrl.XNativeJSRequest{
	CtrlUuid: CtrlUUID,
	Data: &xctrl.XNativeJSRequestData{
		Command: "sofia.status",
		Data: *ctrl.ToRawMessage(map[string]string{
			"profile": profile_name,
		}),
	},
}
response, err := ctrl.Service().NativeJSAPI(context.Background(), req, ctrl.WithAddress(""))

bus

bus是一个消息总线,相当于一个内部消息队列,支持Pub/Sub模式。

bus.Subscribe("topic", "queue", func(ev *Event) error {
})

ev := NewEvent("Flag", "test-topic", "message", "data")
bus.Publish(ev)

Publish用于异步地往消息队列中发送一个消息。消息会发送到一个chan缓冲队列中,如果队列中未消费的消息达到最大值,Publish操作将会被阻塞。默认的最大值为:inboundBufferSize = 10240000

Subscribe用于订阅一个主题(toipc),收到消息后会回调一个回调函数。如果queue参数为空字符串,则回调函数会在一个新的Go Routine中回调,因此可能无法保证顺序。

如果queue非空,则为对于每一个订阅者而言,每一个queue生成一个Go Routine,所有发送到该queue的消息将会被顺序调用,因此应该保证queue的粒度,在回调函数中不要过度阻塞。

queue的典型应用是针对在FreeSWITCH中的一路通话,每一个Channel UUID都可以作为一个独立的queue进行订阅,这样,即使消息回调函数发生阻塞,也只影响这一路通话。

如果Event的Flag参数为DESTROY,则Go Routine将会终止,并自动取消订阅。

过期

在异常情况下,可能由于收不到DESTROY相关的消息,导致Go Routine无法正常终止,相关的资源也无法释放。使用SubscribeWithExpire,可以在极端情况下保证资源释放。需要检查回调中的Flag是否为TIMEOUT,如:

bus.SubscribeWithExpire("topic", "queue", time.Hour, func(ev *Event) error {
	if ev.Flag == "TIMEOUT" {
		bus.Unsubscribe("topic", "queue")
	}
})
多次订阅相同的topic和相同的queue

在实际生产中会有很多个订阅者同时订阅相同的topic和相同的queue,多个订阅者是竞争关系,即对于一个特定的消息,有且只有一个订阅者能接收到消息。这一点跟NATS的Queue订阅类似。

多次订阅相同的topic和不同的queue

多个订阅者订阅相同的topic和不同的queue,对于一条特定的消息,多个订阅者都能收到。跟NATS类似。

多次订阅不同的topic和相同的queue

在实际生产中会有很多个订阅者订阅不同的topic和相同的queuequeue之间没有必然的联系,因为订阅者首先是以Topic区分的。

其他

目前,EnableXXX之类的都是在Queue方式订阅的,没有考虑到多Ctrl的情况。有待进一步设计。

更多文档参见proto/doc

使用

  1. 克隆该项目到本地:
git clone https://git.xswitch.cn/xswitch/xctrl.git
cd xctrl
  1. Protocol Buffers编译器(protoc)
brew install protobuf
  1. 安装protoc-gen-doc依赖:
  • 推荐方式:
make setup  
  • 手动安装:
go install github.com/chuanlinzhang/protoc-gen-doc/cmd/protoc-gen-doc@v0.0.2
  1. 根据需要生成相应语言的代码:
  • 生成Go代码
make proto

  • 生成Java代码
make java

  • 生成HTML文档
make doc-html

  • 生成Markdown文档
make doc-md

测试

go run main.go
make test

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis
bus
nats
Package nats provides a NATS Conn
Package nats provides a NATS Conn
example
inbound Module
proto
Package snowflake provides a very simple Twitter snowflake generator and parser.
Package snowflake provides a very simple Twitter snowflake generator and parser.
Package micro is a pluggable framework for microservices
Package micro is a pluggable framework for microservices
api
api/handler
Package handler provides http handlers
Package handler provides http handlers
api/handler/event
Package event provides a handler which publishes an event
Package event provides a handler which publishes an event
api/handler/web
Package web contains the web handler including websocket support
Package web contains the web handler including websocket support
api/resolver
Package resolver resolves a http request to an endpoint
Package resolver resolves a http request to an endpoint
api/resolver/grpc
Package grpc resolves a grpc service like /greeter.Say/Hello to greeter service
Package grpc resolves a grpc service like /greeter.Say/Hello to greeter service
api/resolver/host
Package host resolves using http host
Package host resolves using http host
api/resolver/path
Package path resolves using http path
Package path resolves using http path
api/resolver/vpath
Package vpath resolves using http path and recognised versioned urls
Package vpath resolves using http path and recognised versioned urls
api/router
Package router provides api service routing
Package router provides api service routing
api/router/registry
Package registry provides a dynamic api service router
Package registry provides a dynamic api service router
api/server
Package server provides an API gateway server which handles inbound requests
Package server provides an API gateway server which handles inbound requests
client
Package client is an interface for an RPC client
Package client is an interface for an RPC client
codec
Package codec is an interface for encoding messages
Package codec is an interface for encoding messages
codec/bytes
Package bytes provides a bytes codec which does not encode or decode anything
Package bytes provides a bytes codec which does not encode or decode anything
codec/grpc
Package grpc provides a grpc codec
Package grpc provides a grpc codec
codec/json
Package json provides a json codec
Package json provides a json codec
codec/jsonrpc
Package jsonrpc provides a json-rpc 1.0 codec
Package jsonrpc provides a json-rpc 1.0 codec
codec/proto
Package proto provides a proto codec
Package proto provides a proto codec
codec/protorpc
Package proto is a generated protocol buffer package.
Package proto is a generated protocol buffer package.
codec/text
Package text reads any text/* content-type
Package text reads any text/* content-type
debug/handler
Package handler implements service debug handler embedded in go-micro services
Package handler implements service debug handler embedded in go-micro services
debug/log
Package log provides debug logging
Package log provides debug logging
debug/log/memory
Package memory provides an in memory log buffer
Package memory provides an in memory log buffer
debug/profile
Package profile is for profilers
Package profile is for profilers
debug/profile/http
Package http enables the http profiler
Package http enables the http profiler
debug/profile/pprof
Package pprof provides a pprof profiler
Package pprof provides a pprof profiler
debug/stats
Package stats provides runtime stats
Package stats provides runtime stats
debug/trace
Package trace provides an interface for distributed tracing
Package trace provides an interface for distributed tracing
errors
Package errors provides a way to return detailed information for an RPC request error.
Package errors provides a way to return detailed information for an RPC request error.
logger
Package log provides a log interface
Package log provides a log interface
metadata
Package metadata is a way of defining message headers
Package metadata is a way of defining message headers
server
Package server is an interface for a micro server
Package server is an interface for a micro server
store
Package store is an interface for distribute data storage.
Package store is an interface for distribute data storage.
store/cache
Package cache implements a faulting style read cache on top of multiple micro stores
Package cache implements a faulting style read cache on top of multiple micro stores
store/file
Package local is a file system backed store
Package local is a file system backed store
store/memory
Package memory is a in-memory store store
Package memory is a in-memory store store
sync
Package sync is an interface for distributed synchronization
Package sync is an interface for distributed synchronization
sync/memory
Package memory provides a sync.Mutex implementation of the lock for local use
Package memory provides a sync.Mutex implementation of the lock for local use
util/backoff
Package backoff provides backoff functionality
Package backoff provides backoff functionality
util/log
Package log is a global internal logger
Package log is a global internal logger
util/qson
Package qson implmenets decoding of URL query params into JSON and Go values (using JSON struct tags).
Package qson implmenets decoding of URL query params into JSON and Go values (using JSON struct tags).
util/ring
Package ring provides a simple ring buffer for storing local data
Package ring provides a simple ring buffer for storing local data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL