## 目录 1. [FreeSWITCH技术架构](#第一部分-freeswitch技术架构) 2. [核心模块详解](#第二部分-核心模块详解) 3. [ESL开发指南](#第三部分-esl开发指南) 4. [呼叫中心场景开发实践](#第四部分-呼叫中心场景开发实践) 5. [高可用部署方案](#第五部分-高可用部署方案) 6. [性能优化指南](#第六部分-性能优化指南) --- ## 第一部分 FreeSWITCH技术架构 ### 1.1 整体架构概览 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 外部系统/应用层 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ CTI中间件 │ │ Web应用 │ │ 业务系统 │ │ 监控系统 │ │ 第三方App│ │ │ └─────┬────┘ └─────┬────┘ └─────┬────┘ └─────┬────┘ └─────┬────┘ │ │ │ │ │ │ │ │ │ └─────────────┴─────────────┴──────┬──────┴─────────────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ ESL接口 │ │ │ │ (TCP:8021) │ │ │ └──────┬──────┘ │ └───────────────────────────────────────────┼─────────────────────────────────┘ │ ┌───────────────────────────────────────────▼─────────────────────────────────┐ │ FreeSWITCH 核心层 │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ Core(核心引擎) │ │ │ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │ │ │ │ 状态机管理 │ │ 内存管理 │ │ 线程池管理 │ │ │ │ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │ │ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │ │ │ │ 事件系统 │ │ 数据库抽象 │ │ 日志系统 │ │ │ │ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ Module Interface(模块接口层) │ │ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ Endpoint │ │ Application │ │ Codec │ │ Dialplan │ │ │ │ │ │ 终端模块 │ │ 应用模块 │ │ 编解码 │ │ 拨号计划 │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ Event │ │ Format │ │ Logger │ │ XML │ │ │ │ │ │ 事件模块 │ │ 文件格式 │ │ 日志模块 │ │ XML处理 │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ │ ┌───────────────────────────────────────────▼─────────────────────────────────┐ │ 协议/媒体层 │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ mod_sofia │ │ mod_verto │ │ mod_rtmp │ │ mod_skinny │ │ │ │ (SIP协议) │ │ (WebRTC) │ │ (RTMP) │ │ (Cisco) │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 媒体处理引擎 │ │ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ │ │ RTP处理 │ │ 编解码转换 │ │ 录音处理 │ │ DTMF检测 │ │ │ │ │ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ │ ┌───────────────────────────────────────────▼─────────────────────────────────┐ │ 网络层 │ │ SIP/UDP/TCP RTP/SRTP WebSocket HTTP │ │ :5060 :16384-32768 :8081 :8080 │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### 1.2 核心概念 #### 1.2.1 Channel(通道) Channel是FreeSWITCH中最核心的概念,代表一路通话的端点。 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 一通电话 │ │ │ │ ┌─────────────┐ Bridge ┌─────────────┐ │ │ │ Channel A │◄─────────────────────────►│ Channel B │ │ │ │ (主叫方) │ │ (被叫方) │ │ │ │ │ │ │ │ │ │ UUID: xxx-a │ │ UUID: xxx-b │ │ │ │ State: CS_ │ │ State: CS_ │ │ │ │ Variables │ │ Variables │ │ │ └─────────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` **Channel状态机:** ``` ┌──────────────────────────────────────────┐ │ │ ▼ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──┴──────┐ │ NEW │───►│ INIT │───►│ ROUTING │───►│ EXECUTE │───►│ HANGUP │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ ┌────▼────┐ ┌────▼────┐ │ CONSUME │ │ EXCHANGE│ │ MEDIA │ │ MEDIA │ └─────────┘ └─────────┘ Channel状态说明: ├── CS_NEW : 新建通道 ├── CS_INIT : 初始化 ├── CS_ROUTING : 路由中(执行dialplan) ├── CS_EXECUTE : 执行应用中 ├── CS_EXCHANGE_MEDIA : 媒体交换中(bridged) ├── CS_CONSUME_MEDIA : 媒体消费中 ├── CS_HIBERNATE : 休眠状态 ├── CS_HANGUP : 挂断中 └── CS_DESTROY : 销毁 ``` #### 1.2.2 Session(会话) Session包含Channel及其相关上下文信息: ```c // Session结构概念图 Session { ├── Channel // 通道信息 │ ├── UUID // 唯一标识 │ ├── State // 当前状态 │ ├── Direction // 呼叫方向(inbound/outbound) │ └── Variables // 通道变量 │ ├── Caller Profile // 主叫信息 │ ├── caller_id_name │ ├── caller_id_number │ ├── destination_number │ └── ... │ ├── Codec // 编解码信息 ├── RTP Session // RTP会话 └── Bridge Session // 桥接会话(如果有) } ``` #### 1.2.3 Event(事件) FreeSWITCH基于事件驱动架构,所有状态变化都产生事件: ``` ┌─────────────────────────────────────────────────────────────────┐ │ FreeSWITCH Event System │ │ │ │ ┌───────────────┐ │ │ │ Event Producer│ ──────┐ │ │ │ (模块/核心) │ │ │ │ └───────────────┘ │ ┌─────────────────────────┐ │ │ ├─────►│ Event Dispatcher │ │ │ ┌───────────────┐ │ │ (事件分发器) │ │ │ │ Event Producer│ ──────┤ └───────────┬─────────────┘ │ │ │ (模块/核心) │ │ │ │ │ └───────────────┘ │ ┌───────────┴─────────────┐ │ │ │ │ │ │ │ │ ┌───────────────┐ │ ▼ ▼ ▼ │ │ │ Event Producer│ ──────┘ ┌───────┐ ┌───────┐ ┌───────┐│ │ │ (模块/核心) │ │ ESL │ │Internal│ │ Lua/ ││ │ └───────────────┘ │Consumer│ │Handler │ │ JS ││ │ └───────┘ └───────┘ └───────┘│ └─────────────────────────────────────────────────────────────────┘ ``` **常用事件类型:** | 事件名称 | 触发时机 | 用途 | |---------|---------|------| | CHANNEL_CREATE | 通道创建 | 新呼叫进入 | | CHANNEL_ANSWER | 通话接听 | 通话建立 | | CHANNEL_BRIDGE | 通道桥接 | 双方接通 | | CHANNEL_UNBRIDGE | 取消桥接 | 桥接断开 | | CHANNEL_HANGUP | 挂断 | 通话结束 | | CHANNEL_HANGUP_COMPLETE | 挂断完成 | 通话完全结束 | | CHANNEL_STATE | 状态变化 | 状态监控 | | DTMF | 按键检测 | IVR交互 | | RECORD_START | 开始录音 | 录音控制 | | RECORD_STOP | 停止录音 | 录音控制 | | CUSTOM | 自定义事件 | 业务扩展 | ### 1.3 目录结构 ``` /usr/local/freeswitch/ ├── bin/ # 可执行文件 │ └── freeswitch # 主程序 │ ├── conf/ # 配置文件目录 │ ├── freeswitch.xml # 主配置文件 │ ├── vars.xml # 全局变量 │ ├── sip_profiles/ # SIP配置 │ │ ├── internal.xml # 内部Profile(坐席注册) │ │ └── external.xml # 外部Profile(中继对接) │ ├── dialplan/ # 拨号计划 │ │ ├── default.xml # 默认拨号计划 │ │ └── public.xml # 公共拨号计划(呼入) │ ├── directory/ # 用户目录(分机注册) │ │ └── default/ │ │ └── 1000.xml # 分机1000配置 │ ├── autoload_configs/ # 模块自动加载配置 │ │ ├── modules.conf.xml # 模块加载列表 │ │ ├── sofia.conf.xml # SIP模块配置 │ │ ├── event_socket.conf.xml# ESL配置 │ │ └── ... │ └── lang/ # 语音提示 │ ├── lib/ # 库文件 ├── mod/ # 模块文件(.so) ├── log/ # 日志目录 ├── db/ # 数据库文件 ├── recordings/ # 录音文件 ├── sounds/ # 语音文件 ├── scripts/ # 脚本目录 └── storage/ # 存储目录 ``` --- ## 第二部分 核心模块详解 ### 2.1 mod_sofia(SIP模块) mod_sofia是FreeSWITCH的SIP协议栈实现,基于Sofia-SIP库。 #### 2.1.1 架构 ``` ┌─────────────────────────────────────────────────────────────────────┐ │ mod_sofia │ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Sofia-SIP Stack │ │ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ │ │ NUA │ │ NTA │ │ TPORT │ │ SDP │ │ │ │ │ │ User Agent│ │Transaction│ │ Transport │ │ Parser │ │ │ │ │ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Profile Management │ │ │ │ │ │ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ │ │ Internal Profile │ │ External Profile │ │ │ │ │ │ (坐席/分机注册) │ │ (中继/网关) │ │ │ │ │ │ UDP:5060 │ │ UDP:5080 │ │ │ │ │ └─────────────────────┘ └─────────────────────┘ │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ Gateway Management │ │ │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │ │ │Gateway A│ │Gateway B│ │Gateway C│ ... │ │ │ │ │ │ │(运营商1) │ │(运营商2) │ │(备用) │ │ │ │ │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ``` #### 2.1.2 配置示例 **Internal Profile(坐席注册):** ```xml ``` **External Profile(中继对接):** ```xml ``` ### 2.2 Dialplan(拨号计划) 拨号计划定义了呼叫的路由和处理逻辑。 #### 2.2.1 执行流程 ``` ┌──────────────────────────────────────────────────────────────────┐ │ Dialplan 执行流程 │ │ │ │ 来电进入 │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐│ │ │ Context 选择 ││ │ │ (根据Profile配置的context) ││ │ │ 例如: public (呼入) / default (内部) ││ │ └──────────────────────────┬──────────────────────────────────┘│ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐│ │ │ Extension 匹配 ││ │ │ (按顺序匹配condition) ││ │ │ ││ │ │ Extension 1: ──┐ ││ │ │ Condition ───┼─► 不匹配 ───► 下一个Extension ││ │ │ │ ││ │ │ └─► 匹配 ───► 执行Actions ││ │ └──────────────────────────┬──────────────────────────────────┘│ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐│ │ │ Action 执行 ││ │ │ (按顺序执行应用) ││ │ │ ││ │ │ action: answer ││ │ │ action: playback ││ │ │ action: bridge ││ │ │ ... ││ │ └─────────────────────────────────────────────────────────────┘│ │ │ └──────────────────────────────────────────────────────────────────┘ ``` #### 2.2.2 配置示例 **呼入路由(public.xml):** ```xml ``` **IVR和呼叫处理(default.xml):** ```xml ``` ### 2.3 mod_event_socket(ESL模块) ESL是FreeSWITCH对外提供的控制接口,是开发呼叫中心的核心。 #### 2.3.1 工作模式 ``` ┌─────────────────────────────────────────────────────────────────────┐ │ ESL 工作模式 │ │ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ Inbound 模式 │ │ │ │ (客户端主动连接) │ │ │ │ │ │ │ │ ┌───────────┐ TCP:8021 ┌──────────────────┐ │ │ │ │ │ CTI中间件 │ ──────────────────────► │ FreeSWITCH │ │ │ │ │ │ (Client) │ ◄────────────────────── │ (Server) │ │ │ │ │ └───────────┘ Event Stream └──────────────────┘ │ │ │ │ │ │ │ │ 特点: │ │ │ │ - 客户端主动连接FreeSWITCH │ │ │ │ - 可以订阅事件、发送命令 │ │ │ │ - 适合集中控制、监控场景 │ │ │ │ - 推荐呼叫中心使用此模式 │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ Outbound 模式 │ │ │ │ (FreeSWITCH主动连接) │ │ │ │ │ │ │ │ ┌──────────────────┐ TCP ┌───────────────┐ │ │ │ │ │ FreeSWITCH │ ──────────────────►│ 应用服务器 │ │ │ │ │ │ (每个呼叫) │ ◄────────────────── │ (Server) │ │ │ │ │ └──────────────────┘ └───────────────┘ │ │ │ │ │ │ │ │ 特点: │ │ │ │ - 每个呼叫FreeSWITCH主动连接应用 │ │ │ │ - 在dialplan中使用socket应用触发 │ │ │ │ - 适合简单IVR场景 │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ ``` #### 2.3.2 ESL配置 ```xml ``` --- ## 第三部分 ESL开发指南 ### 3.1 ESL协议详解 #### 3.1.1 协议格式 ESL使用类HTTP的文本协议: ``` ┌─────────────────────────────────────────────────────────────────────┐ │ ESL 消息格式 │ │ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ 请求格式: │ │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ │ │ command [arguments] │ │ │ │ │ │ Header-Name: header-value │ │ │ │ │ │ Header-Name: header-value │ │ │ │ │ │ │ │ │ │ │ │ [body if Content-Length > 0] │ │ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ 响应格式: │ │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ │ │ Content-Type: text/event-plain │ │ │ │ │ │ Content-Length: 1234 │ │ │ │ │ │ Reply-Text: +OK [message] │ │ │ │ │ │ │ │ │ │ │ │ [body] │ │ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘ ``` #### 3.1.2 常用命令 | 命令类型 | 命令 | 说明 | |---------|------|------| | **认证** | `auth ` | 连接认证 | | **事件订阅** | `event plain ` | 订阅事件 | | | `event plain ALL` | 订阅所有事件 | | | `event plain CHANNEL_CREATE CHANNEL_ANSWER CHANNEL_HANGUP` | 订阅指定事件 | | | `nixevent ` | 取消订阅 | | | `noevents` | 取消所有订阅 | | **API命令** | `api [args]` | 同步执行API | | | `bgapi [args]` | 异步执行API | | **过滤器** | `filter
` | 过滤特定事件 | | | `filter Unique-ID ` | 只接收指定UUID事件 | | **呼叫控制** | `sendmsg ` | 向指定通道发送消息 | | | `execute` | 在通道上执行应用 | | **其他** | `log ` | 设置日志级别 | | | `linger` | 保持连接 | | | `nolinger` | 取消保持 | | | `exit` | 断开连接 | ### 3.2 Java ESL开发 #### 3.2.1 项目结构 ``` callcenter-cti/ ├── pom.xml ├── src/main/java/ │ └── com/callcenter/ │ ├── Application.java │ ├── esl/ │ │ ├── EslConnectionManager.java # ESL连接管理 │ │ ├── EslEventHandler.java # 事件处理 │ │ ├── EslCommandExecutor.java # 命令执行 │ │ └── InboundClient.java # Inbound客户端 │ ├── service/ │ │ ├── AgentService.java # 坐席服务 │ │ ├── CallService.java # 呼叫服务 │ │ ├── QueueService.java # 排队服务 │ │ └── AcdService.java # 话务分配服务 │ ├── model/ │ │ ├── Agent.java # 坐席模型 │ │ ├── Call.java # 呼叫模型 │ │ └── Queue.java # 队列模型 │ ├── api/ │ │ ├── AgentController.java # 坐席API │ │ └── CallController.java # 呼叫API │ └── websocket/ │ └── EventPushHandler.java # 事件推送 └── src/main/resources/ └── application.yml ``` #### 3.2.2 Maven依赖 ```xml org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-websocket org.freeswitch.esl.client org.freeswitch.esl.client 0.9.2 io.netty netty-all 4.1.86.Final org.springframework.boot spring-boot-starter-data-redis com.alibaba fastjson 2.0.23 org.apache.commons commons-lang3 org.projectlombok lombok ``` #### 3.2.3 ESL连接管理 ```java package com.callcenter.esl; import lombok.extern.slf4j.Slf4j; import org.freeswitch.esl.client.inbound.Client; import org.freeswitch.esl.client.inbound.InboundConnectionFailure; import org.freeswitch.esl.client.transport.event.EslEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** * ESL连接管理器 * 负责与FreeSWITCH建立和维护连接 */ @Slf4j @Component public class EslConnectionManager { @Value("${freeswitch.esl.host:127.0.0.1}") private String eslHost; @Value("${freeswitch.esl.port:8021}") private int eslPort; @Value("${freeswitch.esl.password:ClueCon}") private String eslPassword; @Value("${freeswitch.esl.timeout:30}") private int timeout; private Client eslClient; private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean reconnecting = new AtomicBoolean(false); private ScheduledExecutorService scheduler; private ExecutorService eventExecutor; private EslEventHandler eventHandler; @PostConstruct public void init() { // 初始化线程池 scheduler = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, "esl-reconnect")); eventExecutor = new ThreadPoolExecutor( 4, 16, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), r -> new Thread(r, "esl-event-handler"), new ThreadPoolExecutor.CallerRunsPolicy() ); // 建立连接 connect(); // 启动心跳检测 startHeartbeat(); } /** * 建立ESL连接 */ public synchronized void connect() { if (connected.get()) { log.info("ESL已连接,无需重复连接"); return; } try { log.info("正在连接FreeSWITCH ESL: {}:{}", eslHost, eslPort); eslClient = new Client(); eslClient.connect(eslHost, eslPort, eslPassword, timeout); // 订阅事件 subscribeEvents(); // 设置事件监听器 eslClient.setEventSubscriptions("plain", "all"); eslClient.addEventListener((ctx, event) -> { handleEvent(event); }); connected.set(true); reconnecting.set(false); log.info("ESL连接成功"); } catch (InboundConnectionFailure e) { log.error("ESL连接失败: {}", e.getMessage()); connected.set(false); scheduleReconnect(); } } /** * 订阅所需事件 */ private void subscribeEvents() { String events = String.join(" ", "CHANNEL_CREATE", "CHANNEL_ANSWER", "CHANNEL_BRIDGE", "CHANNEL_UNBRIDGE", "CHANNEL_HANGUP", "CHANNEL_HANGUP_COMPLETE", "CHANNEL_STATE", "DTMF", "RECORD_START", "RECORD_STOP", "CUSTOM" ); eslClient.setEventSubscriptions("plain", events); eslClient.addEventFilter("Event-Name", "CHANNEL_CREATE"); log.info("已订阅事件: {}", events); } /** * 处理事件 */ private void handleEvent(EslEvent event) { eventExecutor.submit(() -> { try { if (eventHandler != null) { eventHandler.handle(event); } } catch (Exception e) { log.error("事件处理异常: {}", e.getMessage(), e); } }); } /** * 启动心跳检测 */ private void startHeartbeat() { scheduler.scheduleAtFixedRate(() -> { if (connected.get()) { try { // 发送status命令检测连接 eslClient.sendSyncApiCommand("status", ""); } catch (Exception e) { log.warn("心跳检测失败,连接可能已断开"); connected.set(false); scheduleReconnect(); } } }, 30, 30, TimeUnit.SECONDS); } /** * 计划重连 */ private void scheduleReconnect() { if (reconnecting.compareAndSet(false, true)) { scheduler.schedule(() -> { log.info("尝试重新连接ESL..."); connect(); }, 5, TimeUnit.SECONDS); } } /** * 同步执行API命令 */ public String sendSyncApiCommand(String command, String args) { checkConnection(); try { return eslClient.sendSyncApiCommand(command, args).getBodyLines().get(0); } catch (Exception e) { log.error("执行API命令失败: {} {}", command, args, e); throw new RuntimeException("ESL命令执行失败", e); } } /** * 异步执行API命令 */ public CompletableFuture sendAsyncApiCommand(String command, String args) { return CompletableFuture.supplyAsync(() -> sendSyncApiCommand(command, args), eventExecutor); } /** * 向指定UUID发送消息执行应用 */ public void sendExecute(String uuid, String application, String args) { checkConnection(); try { eslClient.sendSyncApiCommand("uuid_broadcast", uuid + " " + application + "::" + args + " both"); } catch (Exception e) { log.error("发送Execute失败: uuid={}, app={}", uuid, application, e); } } /** * 检查连接状态 */ private void checkConnection() { if (!connected.get()) { throw new RuntimeException("ESL未连接"); } } /** * 获取连接状态 */ public boolean isConnected() { return connected.get(); } /** * 设置事件处理器 */ public void setEventHandler(EslEventHandler handler) { this.eventHandler = handler; } @PreDestroy public void destroy() { try { connected.set(false); if (eslClient != null) { eslClient.close(); } if (scheduler != null) { scheduler.shutdown(); } if (eventExecutor != null) { eventExecutor.shutdown(); } log.info("ESL连接已关闭"); } catch (Exception e) { log.error("关闭ESL连接异常", e); } } } ``` #### 3.2.4 事件处理器 ```java package com.callcenter.esl; import com.callcenter.model.Call; import com.callcenter.model.CallDirection; import com.callcenter.model.CallState; import com.callcenter.service.AcdService; import com.callcenter.service.CallService; import com.callcenter.websocket.EventPushService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.freeswitch.esl.client.transport.event.EslEvent; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.Map; /** * ESL事件处理器 */ @Slf4j @Component @RequiredArgsConstructor public class EslEventHandler { private final CallService callService; private final AcdService acdService; private final EventPushService eventPushService; /** * 处理ESL事件 */ public void handle(EslEvent event) { String eventName = event.getEventName(); Map headers = event.getEventHeaders(); log.debug("收到事件: {}, UUID: {}", eventName, headers.get("Unique-ID")); switch (eventName) { case "CHANNEL_CREATE": handleChannelCreate(headers); break; case "CHANNEL_ANSWER": handleChannelAnswer(headers); break; case "CHANNEL_BRIDGE": handleChannelBridge(headers); break; case "CHANNEL_UNBRIDGE": handleChannelUnbridge(headers); break; case "CHANNEL_HANGUP": handleChannelHangup(headers); break; case "CHANNEL_HANGUP_COMPLETE": handleChannelHangupComplete(headers); break; case "DTMF": handleDtmf(headers); break; case "RECORD_START": handleRecordStart(headers); break; case "RECORD_STOP": handleRecordStop(headers); break; case "CUSTOM": handleCustomEvent(headers); break; default: log.trace("未处理的事件类型: {}", eventName); } } /** * 处理通道创建事件(新呼叫) */ private void handleChannelCreate(Map headers) { String uuid = headers.get("Unique-ID"); String direction = headers.get("Call-Direction"); String callerNumber = headers.get("Caller-Caller-ID-Number"); String calleeNumber = headers.get("Caller-Destination-Number"); log.info("新呼叫: uuid={}, direction={}, caller={}, callee={}", uuid, direction, callerNumber, calleeNumber); // 创建呼叫记录 Call call = new Call(); call.setUuid(uuid); call.setDirection("inbound".equals(direction) ? CallDirection.INBOUND : CallDirection.OUTBOUND); call.setCallerNumber(callerNumber); call.setCalleeNumber(calleeNumber); call.setState(CallState.RINGING); call.setCreateTime(LocalDateTime.now()); // 获取业务标识(从通道变量) String businessType = headers.get("variable_business_type"); String skillGroup = headers.get("variable_skill_group"); call.setBusinessType(businessType); call.setSkillGroup(skillGroup); callService.saveCall(call); // 如果是呼入,触发ACD排队 if ("inbound".equals(direction)) { acdService.enqueue(call); } // 推送事件到前端 eventPushService.pushCallEvent("CALL_RINGING", call); } /** * 处理通话接听事件 */ private void handleChannelAnswer(Map headers) { String uuid = headers.get("Unique-ID"); log.info("通话接听: uuid={}", uuid); Call call = callService.getCallByUuid(uuid); if (call != null) { call.setState(CallState.ANSWERED); call.setAnswerTime(LocalDateTime.now()); callService.updateCall(call); // 推送事件 eventPushService.pushCallEvent("CALL_ANSWERED", call); } } /** * 处理通道桥接事件(双方接通) */ private void handleChannelBridge(Map headers) { String uuid = headers.get("Unique-ID"); String otherUuid = headers.get("Other-Leg-Unique-ID"); log.info("通道桥接: uuid={}, otherUuid={}", uuid, otherUuid); Call call = callService.getCallByUuid(uuid); if (call != null) { call.setState(CallState.BRIDGED); call.setBridgeUuid(otherUuid); call.setBridgeTime(LocalDateTime.now()); callService.updateCall(call); // 推送事件 eventPushService.pushCallEvent("CALL_BRIDGED", call); } } /** * 处理取消桥接事件 */ private void handleChannelUnbridge(Map headers) { String uuid = headers.get("Unique-ID"); log.info("取消桥接: uuid={}", uuid); Call call = callService.getCallByUuid(uuid); if (call != null) { call.setState(CallState.UNBRIDGED); callService.updateCall(call); eventPushService.pushCallEvent("CALL_UNBRIDGED", call); } } /** * 处理挂断事件 */ private void handleChannelHangup(Map headers) { String uuid = headers.get("Unique-ID"); String hangupCause = headers.get("Hangup-Cause"); log.info("通话挂断: uuid={}, cause={}", uuid, hangupCause); Call call = callService.getCallByUuid(uuid); if (call != null) { call.setState(CallState.HANGUP); call.setHangupCause(hangupCause); call.setHangupTime(LocalDateTime.now()); // 计算通话时长 if (call.getAnswerTime() != null) { long duration = java.time.Duration.between( call.getAnswerTime(), call.getHangupTime()).getSeconds(); call.setDuration(duration); } callService.updateCall(call); // 推送事件 eventPushService.pushCallEvent("CALL_HANGUP", call); // 更新坐席状态 if (call.getAgentId() != null) { acdService.onCallEnd(call.getAgentId(), call); } } } /** * 处理挂断完成事件(获取完整话单) */ private void handleChannelHangupComplete(Map headers) { String uuid = headers.get("Unique-ID"); String billsec = headers.get("variable_billsec"); String duration = headers.get("variable_duration"); String recordingPath = headers.get("variable_recording_path"); log.info("通话结束: uuid={}, billsec={}, recording={}", uuid, billsec, recordingPath); Call call = callService.getCallByUuid(uuid); if (call != null) { call.setBillsec(billsec != null ? Long.parseLong(billsec) : 0); call.setRecordingPath(recordingPath); callService.updateCall(call); // 推送话单 eventPushService.pushCdr(call); } } /** * 处理DTMF按键事件 */ private void handleDtmf(Map headers) { String uuid = headers.get("Unique-ID"); String digit = headers.get("DTMF-Digit"); log.info("DTMF按键: uuid={}, digit={}", uuid, digit); // 推送按键事件 eventPushService.pushDtmf(uuid, digit); } /** * 处理录音开始事件 */ private void handleRecordStart(Map headers) { String uuid = headers.get("Unique-ID"); String recordPath = headers.get("Record-File-Path"); log.info("开始录音: uuid={}, path={}", uuid, recordPath); Call call = callService.getCallByUuid(uuid); if (call != null) { call.setRecordingPath(recordPath); callService.updateCall(call); } } /** * 处理录音停止事件 */ private void handleRecordStop(Map headers) { String uuid = headers.get("Unique-ID"); String recordPath = headers.get("Record-File-Path"); log.info("停止录音: uuid={}, path={}", uuid, recordPath); } /** * 处理自定义事件 */ private void handleCustomEvent(Map headers) { String subclass = headers.get("Event-Subclass"); log.debug("自定义事件: subclass={}", subclass); // 处理特定的自定义事件 if ("callcenter::info".equals(subclass)) { // 处理呼叫中心相关事件 } } } ``` #### 3.2.5 呼叫服务 ```java package com.callcenter.service; import com.callcenter.esl.EslConnectionManager; import com.callcenter.model.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; /** * 呼叫服务 * 提供呼叫控制相关功能 */ @Slf4j @Service @RequiredArgsConstructor public class CallService { private final EslConnectionManager eslManager; private final RedisTemplate redisTemplate; private static final String CALL_KEY_PREFIX = "call:"; private static final int CALL_EXPIRE_HOURS = 24; /** * 发起外呼 * @param agentId 坐席ID * @param callerNumber 主叫号码(外显号码) * @param calleeNumber 被叫号码 * @return 呼叫UUID */ public String originate(String agentId, String callerNumber, String calleeNumber) { log.info("发起外呼: agent={}, caller={}, callee={}", agentId, callerNumber, calleeNumber); // 获取坐席分机号 Agent agent = getAgent(agentId); if (agent == null || agent.getState() != AgentState.READY) { throw new RuntimeException("坐席不可用"); } // 构建originate命令 // originate {变量}呼叫字符串 &应用(参数) StringBuilder cmd = new StringBuilder(); cmd.append("{"); cmd.append("origination_caller_id_number=").append(callerNumber).append(","); cmd.append("origination_caller_id_name=公司名称").append(","); cmd.append("agent_id=").append(agentId).append(","); cmd.append("direction=outbound").append(","); cmd.append("RECORD_STEREO=true"); cmd.append("}"); // A-leg: 先呼叫坐席分机 cmd.append("user/").append(agent.getExtension()); // B-leg: 坐席接听后呼叫客户 cmd.append(" &bridge({origination_caller_id_number=") .append(callerNumber) .append("}sofia/gateway/carrier_primary/") .append(calleeNumber) .append(")"); String result = eslManager.sendSyncApiCommand("originate", cmd.toString()); if (result.startsWith("+OK")) { String uuid = result.substring(4).trim(); log.info("外呼成功: uuid={}", uuid); return uuid; } else { log.error("外呼失败: {}", result); throw new RuntimeException("外呼失败: " + result); } } /** * 接听来电 */ public void answer(String uuid) { log.info("接听来电: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_answer", uuid); } /** * 挂断通话 */ public void hangup(String uuid) { log.info("挂断通话: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_kill", uuid); } /** * 挂断通话(带原因) */ public void hangup(String uuid, String cause) { log.info("挂断通话: uuid={}, cause={}", uuid, cause); eslManager.sendSyncApiCommand("uuid_kill", uuid + " " + cause); } /** * 保持通话 */ public void hold(String uuid) { log.info("保持通话: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_hold", uuid); } /** * 取回保持 */ public void unhold(String uuid) { log.info("取回通话: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_hold", "off " + uuid); } /** * 静音 */ public void mute(String uuid) { log.info("静音: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_audio", uuid + " start mute"); } /** * 取消静音 */ public void unmute(String uuid) { log.info("取消静音: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_audio", uuid + " stop"); } /** * 盲转(直接转接) */ public void blindTransfer(String uuid, String destination) { log.info("盲转: uuid={}, dest={}", uuid, destination); eslManager.sendSyncApiCommand("uuid_transfer", uuid + " " + destination); } /** * 咨询转(先咨询再转接) */ public String consultTransfer(String uuid, String destination) { log.info("咨询转: uuid={}, dest={}", uuid, destination); // 先保持当前通话 hold(uuid); // 发起咨询呼叫 String consultCmd = "{origination_uuid=consult_" + uuid + "}user/" + destination; String result = eslManager.sendSyncApiCommand("originate", consultCmd + " &park()"); if (result.startsWith("+OK")) { return result.substring(4).trim(); } throw new RuntimeException("咨询转失败"); } /** * 完成咨询转接 */ public void completeTransfer(String originalUuid, String consultUuid) { log.info("完成咨询转: original={}, consult={}", originalUuid, consultUuid); eslManager.sendSyncApiCommand("uuid_bridge", originalUuid + " " + consultUuid); } /** * 三方通话 */ public void conference(String uuid1, String uuid2, String conferenceName) { log.info("三方通话: uuid1={}, uuid2={}, conf={}", uuid1, uuid2, conferenceName); // 将两个通道加入会议 eslManager.sendSyncApiCommand("uuid_transfer", uuid1 + " conference:" + conferenceName + " inline"); eslManager.sendSyncApiCommand("uuid_transfer", uuid2 + " conference:" + conferenceName + " inline"); } /** * 播放语音 */ public void playback(String uuid, String filePath) { log.info("播放语音: uuid={}, file={}", uuid, filePath); eslManager.sendSyncApiCommand("uuid_broadcast", uuid + " " + filePath + " both"); } /** * 停止播放 */ public void stopPlayback(String uuid) { log.info("停止播放: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_break", uuid); } /** * 开始录音 */ public void startRecording(String uuid, String filePath) { log.info("开始录音: uuid={}, file={}", uuid, filePath); eslManager.sendSyncApiCommand("uuid_record", uuid + " start " + filePath); } /** * 停止录音 */ public void stopRecording(String uuid, String filePath) { log.info("停止录音: uuid={}", uuid); eslManager.sendSyncApiCommand("uuid_record", uuid + " stop " + filePath); } /** * 桥接两个通道 */ public void bridge(String uuid1, String uuid2) { log.info("桥接通道: uuid1={}, uuid2={}", uuid1, uuid2); eslManager.sendSyncApiCommand("uuid_bridge", uuid1 + " " + uuid2); } /** * 设置通道变量 */ public void setVariable(String uuid, String name, String value) { eslManager.sendSyncApiCommand("uuid_setvar", uuid + " " + name + " " + value); } /** * 获取通道变量 */ public String getVariable(String uuid, String name) { return eslManager.sendSyncApiCommand("uuid_getvar", uuid + " " + name); } // ============ 呼叫记录管理 ============ /** * 保存呼叫记录 */ public void saveCall(Call call) { String key = CALL_KEY_PREFIX + call.getUuid(); redisTemplate.opsForValue().set(key, call, CALL_EXPIRE_HOURS, TimeUnit.HOURS); } /** * 获取呼叫记录 */ public Call getCallByUuid(String uuid) { String key = CALL_KEY_PREFIX + uuid; return (Call) redisTemplate.opsForValue().get(key); } /** * 更新呼叫记录 */ public void updateCall(Call call) { saveCall(call); } /** * 获取坐席信息 */ private Agent getAgent(String agentId) { // TODO: 从坐席服务获取 return null; } } ``` #### 3.2.6 ACD话务分配服务 ```java package com.callcenter.service; import com.callcenter.esl.EslConnectionManager; import com.callcenter.model.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * ACD话务分配服务 * 实现多种分配策略:历史服务优先、最长空闲优先、轮询 */ @Slf4j @Service @RequiredArgsConstructor public class AcdService { private final EslConnectionManager eslManager; private final RedisTemplate redisTemplate; private final AgentService agentService; private final CallService callService; // 队列Key前缀 private static final String QUEUE_KEY_PREFIX = "queue:"; // 坐席最后服务客户映射 private static final String AGENT_CUSTOMER_MAP_KEY = "agent:customer:map"; // 坐席最后空闲时间 private static final String AGENT_IDLE_TIME_KEY = "agent:idle:time:"; // 轮询索引 private final Map roundRobinIndex = new ConcurrentHashMap<>(); /** * 呼叫入队 */ public void enqueue(Call call) { String skillGroup = call.getSkillGroup(); String queueKey = QUEUE_KEY_PREFIX + skillGroup; QueueItem item = new QueueItem(); item.setCallUuid(call.getUuid()); item.setCallerNumber(call.getCallerNumber()); item.setSkillGroup(skillGroup); item.setEnqueueTime(LocalDateTime.now()); item.setPriority(calculatePriority(call)); // 使用SortedSet,按优先级和入队时间排序 double score = item.getPriority() * 1000000000L + System.currentTimeMillis() / 1000; redisTemplate.opsForZSet().add(queueKey, item, score); log.info("呼叫入队: uuid={}, queue={}, priority={}", call.getUuid(), skillGroup, item.getPriority()); // 尝试立即分配 tryAssign(skillGroup); } /** * 计算优先级(数字越小优先级越高) */ private int calculatePriority(Call call) { // VIP客户优先级高 if ("VIP".equals(call.getCustomerLevel())) { return 1; } // 投诉优先 if ("complaint".equals(call.getBusinessType())) { return 2; } // 普通客户 return 5; } /** * 尝试分配队列中的呼叫 */ public void tryAssign(String skillGroup) { String queueKey = QUEUE_KEY_PREFIX + skillGroup; // 获取可用坐席 List availableAgents = agentService.getAvailableAgents(skillGroup); if (availableAgents.isEmpty()) { log.debug("技能组{}无可用坐席", skillGroup); return; } // 获取队列中的呼叫 Set queueItems = redisTemplate.opsForZSet().range(queueKey, 0, 0); if (queueItems == null || queueItems.isEmpty()) { log.debug("技能组{}队列为空", skillGroup); return; } QueueItem item = (QueueItem) queueItems.iterator().next(); Call call = callService.getCallByUuid(item.getCallUuid()); if (call == null || call.getState() == CallState.HANGUP) { // 呼叫已结束,移除队列 redisTemplate.opsForZSet().remove(queueKey, item); tryAssign(skillGroup); // 继续处理下一个 return; } // 选择坐席 Agent selectedAgent = selectAgent(availableAgents, call); if (selectedAgent != null) { // 从队列移除 redisTemplate.opsForZSet().remove(queueKey, item); // 分配呼叫 assignCallToAgent(call, selectedAgent); } } /** * 选择坐席(多策略) */ private Agent selectAgent(List agents, Call call) { if (agents.isEmpty()) { return null; } // 策略1: 历史服务优先 Agent lastAgent = findLastServingAgent(call.getCallerNumber(), agents); if (lastAgent != null) { log.info("历史服务优先: 客户{}分配给上次服务坐席{}", call.getCallerNumber(), lastAgent.getId()); return lastAgent; } // 策略2: 最长空闲优先 Agent longestIdleAgent = findLongestIdleAgent(agents); if (longestIdleAgent != null) { log.info("最长空闲优先: 选择坐席{}", longestIdleAgent.getId()); return longestIdleAgent; } // 策略3: 轮询 Agent roundRobinAgent = selectByRoundRobin(agents, call.getSkillGroup()); log.info("轮询分配: 选择坐席{}", roundRobinAgent.getId()); return roundRobinAgent; } /** * 查找上次服务该客户的坐席 */ private Agent findLastServingAgent(String customerNumber, List agents) { String lastAgentId = (String) redisTemplate.opsForHash() .get(AGENT_CUSTOMER_MAP_KEY, customerNumber); if (lastAgentId != null) { return agents.stream() .filter(a -> a.getId().equals(lastAgentId)) .findFirst() .orElse(null); } return null; } /** * 查找空闲时间最长的坐席 */ private Agent findLongestIdleAgent(List agents) { return agents.stream() .max(Comparator.comparing(agent -> { String key = AGENT_IDLE_TIME_KEY + agent.getId(); Long idleTime = (Long) redisTemplate.opsForValue().get(key); return idleTime != null ? idleTime : 0L; })) .orElse(null); } /** * 轮询选择坐席 */ private Agent selectByRoundRobin(List agents, String skillGroup) { int currentIndex = roundRobinIndex.getOrDefault(skillGroup, 0); int nextIndex = (currentIndex + 1) % agents.size(); roundRobinIndex.put(skillGroup, nextIndex); return agents.get(currentIndex); } /** * 分配呼叫给坐席 */ private void assignCallToAgent(Call call, Agent agent) { log.info("分配呼叫: uuid={} -> agent={}", call.getUuid(), agent.getId()); // 更新坐席状态 agentService.updateAgentState(agent.getId(), AgentState.RINGING); // 更新呼叫信息 call.setAgentId(agent.getId()); call.setAgentExtension(agent.getExtension()); callService.updateCall(call); // 将呼叫转接到坐席分机 // 方式1: 使用uuid_transfer eslManager.sendSyncApiCommand("uuid_transfer", call.getUuid() + " " + agent.getExtension() + " XML default"); // 方式2: 使用originate + bridge(更灵活) // String bridgeCmd = String.format( // "{origination_uuid=%s,agent_id=%s}user/%s", // UUID.randomUUID(), agent.getId(), agent.getExtension()); // eslManager.sendSyncApiCommand("uuid_broadcast", // call.getUuid() + " bridge::" + bridgeCmd + " both"); // 记录坐席-客户关系(用于历史服务优先) redisTemplate.opsForHash().put(AGENT_CUSTOMER_MAP_KEY, call.getCallerNumber(), agent.getId()); } /** * 通话结束处理 */ public void onCallEnd(String agentId, Call call) { log.info("通话结束: agent={}, call={}", agentId, call.getUuid()); // 更新坐席状态为后处理 agentService.updateAgentState(agentId, AgentState.WRAP_UP); // 设置后处理超时(30秒后自动置闲) // 由定时任务处理 } /** * 坐席置闲时记录空闲时间 */ public void onAgentReady(String agentId, String skillGroup) { String key = AGENT_IDLE_TIME_KEY + agentId; redisTemplate.opsForValue().set(key, System.currentTimeMillis()); // 尝试分配队列中的呼叫 tryAssign(skillGroup); } /** * 定时处理队列 */ @Scheduled(fixedDelay = 1000) public void processQueues() { // 获取所有技能组 Set skillGroups = agentService.getAllSkillGroups(); for (String skillGroup : skillGroups) { tryAssign(skillGroup); } } /** * 获取队列状态 */ public QueueStatus getQueueStatus(String skillGroup) { String queueKey = QUEUE_KEY_PREFIX + skillGroup; QueueStatus status = new QueueStatus(); status.setSkillGroup(skillGroup); status.setWaitingCount(redisTemplate.opsForZSet().size(queueKey)); status.setAvailableAgents(agentService.getAvailableAgents(skillGroup).size()); // 计算最长等待时间 Set items = redisTemplate.opsForZSet().range(queueKey, 0, 0); if (items != null && !items.isEmpty()) { QueueItem firstItem = (QueueItem) items.iterator().next(); long waitSeconds = java.time.Duration.between( firstItem.getEnqueueTime(), LocalDateTime.now()).getSeconds(); status.setLongestWaitTime(waitSeconds); } return status; } /** * 溢出处理 * 当技能组无可用坐席且等待时间超过阈值时,溢出到其他技能组 */ @Scheduled(fixedDelay = 5000) public void handleOverflow() { // 获取溢出配置 Map overflowConfig = getOverflowConfig(); for (Map.Entry entry : overflowConfig.entrySet()) { String sourceGroup = entry.getKey(); String targetGroup = entry.getValue(); String queueKey = QUEUE_KEY_PREFIX + sourceGroup; Set items = redisTemplate.opsForZSet().range(queueKey, 0, -1); if (items == null || items.isEmpty()) { continue; } // 检查是否需要溢出 List sourceAgents = agentService.getAvailableAgents(sourceGroup); List targetAgents = agentService.getAvailableAgents(targetGroup); if (sourceAgents.isEmpty() && !targetAgents.isEmpty()) { for (Object obj : items) { QueueItem item = (QueueItem) obj; long waitSeconds = java.time.Duration.between( item.getEnqueueTime(), LocalDateTime.now()).getSeconds(); // 等待超过60秒,溢出 if (waitSeconds > 60) { log.info("溢出处理: uuid={}, {}->{}", item.getCallUuid(), sourceGroup, targetGroup); // 移出原队列 redisTemplate.opsForZSet().remove(queueKey, item); // 加入目标队列 item.setSkillGroup(targetGroup); String targetQueueKey = QUEUE_KEY_PREFIX + targetGroup; double score = item.getPriority() * 1000000000L + System.currentTimeMillis() / 1000; redisTemplate.opsForZSet().add(targetQueueKey, item, score); // 尝试分配 tryAssign(targetGroup); } } } } } private Map getOverflowConfig() { // 配置:源技能组 -> 目标技能组 Map config = new HashMap<>(); config.put("rescue_group", "default_group"); config.put("housekeeping_group", "default_group"); config.put("driver_group", "default_group"); return config; } } ``` #### 3.2.7 坐席服务 ```java package com.callcenter.service; import com.callcenter.esl.EslConnectionManager; import com.callcenter.model.*; import com.callcenter.websocket.EventPushService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; /** * 坐席服务 */ @Slf4j @Service @RequiredArgsConstructor public class AgentService { private final RedisTemplate redisTemplate; private final EslConnectionManager eslManager; private final EventPushService eventPushService; private final AcdService acdService; private static final String AGENT_KEY_PREFIX = "agent:"; private static final String AGENT_SET_KEY = "agents:all"; private static final String SKILL_GROUP_AGENTS_KEY = "skillgroup:agents:"; /** * 坐席签入 */ public void signIn(String agentId, String extension, List skillGroups) { log.info("坐席签入: agentId={}, extension={}, skills={}", agentId, extension, skillGroups); // 检查分机是否注册 String regStatus = eslManager.sendSyncApiCommand("sofia_contact", "internal/" + extension + "@${domain}"); if (regStatus.contains("error")) { throw new RuntimeException("分机未注册: " + extension); } Agent agent = new Agent(); agent.setId(agentId); agent.setExtension(extension); agent.setSkillGroups(skillGroups); agent.setState(AgentState.NOT_READY); agent.setSignInTime(LocalDateTime.now()); // 保存坐席信息 String key = AGENT_KEY_PREFIX + agentId; redisTemplate.opsForValue().set(key, agent); // 加入坐席集合 redisTemplate.opsForSet().add(AGENT_SET_KEY, agentId); // 加入技能组 for (String skillGroup : skillGroups) { redisTemplate.opsForSet().add(SKILL_GROUP_AGENTS_KEY + skillGroup, agentId); } // 推送状态变更 eventPushService.pushAgentState(agent); } /** * 坐席签出 */ public void signOut(String agentId) { log.info("坐席签出: agentId={}", agentId); Agent agent = getAgent(agentId); if (agent == null) { return; } // 检查是否在通话中 if (agent.getState() == AgentState.BUSY || agent.getState() == AgentState.RINGING) { throw new RuntimeException("坐席正在通话中,无法签出"); } // 从技能组移除 for (String skillGroup : agent.getSkillGroups()) { redisTemplate.opsForSet().remove(SKILL_GROUP_AGENTS_KEY + skillGroup, agentId); } // 从坐席集合移除 redisTemplate.opsForSet().remove(AGENT_SET_KEY, agentId); // 删除坐席信息 String key = AGENT_KEY_PREFIX + agentId; redisTemplate.delete(key); // 推送签出事件 agent.setState(AgentState.OFFLINE); eventPushService.pushAgentState(agent); } /** * 坐席置闲 */ public void ready(String agentId) { log.info("坐席置闲: agentId={}", agentId); Agent agent = getAgent(agentId); if (agent == null) { throw new RuntimeException("坐席不存在"); } if (agent.getState() == AgentState.BUSY) { throw new RuntimeException("坐席正在通话中"); } updateAgentState(agentId, AgentState.READY); // 通知ACD有坐席可用 for (String skillGroup : agent.getSkillGroups()) { acdService.onAgentReady(agentId, skillGroup); } } /** * 坐席置忙 */ public void notReady(String agentId, String reason) { log.info("坐席置忙: agentId={}, reason={}", agentId, reason); Agent agent = getAgent(agentId); if (agent == null) { throw new RuntimeException("坐席不存在"); } if (agent.getState() == AgentState.BUSY) { throw new RuntimeException("坐席正在通话中"); } agent.setNotReadyReason(reason); updateAgentState(agentId, AgentState.NOT_READY); } /** * 坐席小休 */ public void pause(String agentId, String reason) { log.info("坐席小休: agentId={}, reason={}", agentId, reason); Agent agent = getAgent(agentId); if (agent == null) { throw new RuntimeException("坐席不存在"); } if (agent.getState() == AgentState.BUSY) { throw new RuntimeException("坐席正在通话中"); } agent.setPauseReason(reason); updateAgentState(agentId, AgentState.PAUSED); } /** * 更新坐席状态 */ public void updateAgentState(String agentId, AgentState newState) { Agent agent = getAgent(agentId); if (agent == null) { return; } AgentState oldState = agent.getState(); agent.setState(newState); agent.setStateTime(LocalDateTime.now()); // 保存 String key = AGENT_KEY_PREFIX + agentId; redisTemplate.opsForValue().set(key, agent); log.info("坐席状态变更: agent={}, {} -> {}", agentId, oldState, newState); // 推送状态变更 eventPushService.pushAgentState(agent); } /** * 获取坐席信息 */ public Agent getAgent(String agentId) { String key = AGENT_KEY_PREFIX + agentId; return (Agent) redisTemplate.opsForValue().get(key); } /** * 获取技能组可用坐席 */ public List getAvailableAgents(String skillGroup) { Set agentIds = redisTemplate.opsForSet() .members(SKILL_GROUP_AGENTS_KEY + skillGroup); if (agentIds == null || agentIds.isEmpty()) { return Collections.emptyList(); } return agentIds.stream() .map(id -> getAgent((String) id)) .filter(agent -> agent != null && agent.getState() == AgentState.READY) .collect(Collectors.toList()); } /** * 获取所有技能组 */ public Set getAllSkillGroups() { Set keys = redisTemplate.keys(SKILL_GROUP_AGENTS_KEY + "*"); if (keys == null) { return Collections.emptySet(); } return keys.stream() .map(key -> key.replace(SKILL_GROUP_AGENTS_KEY, "")) .collect(Collectors.toSet()); } /** * 获取所有坐席状态(监控大屏) */ public List getAllAgents() { Set agentIds = redisTemplate.opsForSet().members(AGENT_SET_KEY); if (agentIds == null) { return Collections.emptyList(); } return agentIds.stream() .map(id -> getAgent((String) id)) .filter(Objects::nonNull) .collect(Collectors.toList()); } /** * 获取坐席状态统计 */ public Map getAgentStateStats() { return getAllAgents().stream() .collect(Collectors.groupingBy(Agent::getState, Collectors.counting())); } } ``` ### 3.3 Go ESL开发 如果您的团队更熟悉Go语言,以下是Go版本的ESL开发示例: #### 3.3.1 项目结构 ``` callcenter-cti-go/ ├── go.mod ├── go.sum ├── main.go ├── config/ │ └── config.go ├── esl/ │ ├── client.go # ESL客户端 │ ├── event.go # 事件定义 │ └── handler.go # 事件处理器 ├── service/ │ ├── agent.go # 坐席服务 │ ├── call.go # 呼叫服务 │ └── acd.go # ACD服务 ├── model/ │ └── models.go # 数据模型 ├── api/ │ ├── router.go # 路由 │ ├── agent_handler.go # 坐席API │ └── call_handler.go # 呼叫API └── websocket/ └── hub.go # WebSocket推送 ``` #### 3.3.2 ESL客户端 ```go // esl/client.go package esl import ( "bufio" "fmt" "log" "net" "net/textproto" "strconv" "strings" "sync" "time" ) // ESL客户端 type Client struct { conn net.Conn reader *textproto.Reader writer *bufio.Writer mu sync.Mutex connected bool password string eventChan chan *Event handlers []EventHandler reconnect bool } // 事件处理器接口 type EventHandler interface { Handle(event *Event) } // 创建ESL客户端 func NewClient(host string, port int, password string) (*Client, error) { client := &Client{ password: password, eventChan: make(chan *Event, 1000), handlers: make([]EventHandler, 0), reconnect: true, } err := client.connect(host, port) if err != nil { return nil, err } // 启动事件读取协程 go client.readLoop() // 启动事件分发协程 go client.dispatchLoop() return client, nil } // 连接FreeSWITCH func (c *Client) connect(host string, port int) error { addr := fmt.Sprintf("%s:%d", host, port) conn, err := net.DialTimeout("tcp", addr, 10*time.Second) if err != nil { return fmt.Errorf("连接失败: %v", err) } c.conn = conn c.reader = textproto.NewReader(bufio.NewReader(conn)) c.writer = bufio.NewWriter(conn) // 读取欢迎消息 _, err = c.readMessage() if err != nil { return err } // 认证 err = c.auth() if err != nil { return err } c.connected = true log.Println("ESL连接成功") return nil } // 认证 func (c *Client) auth() error { _, err := c.sendCommand(fmt.Sprintf("auth %s", c.password)) if err != nil { return fmt.Errorf("认证失败: %v", err) } return nil } // 发送命令 func (c *Client) sendCommand(command string) (*Message, error) { c.mu.Lock() defer c.mu.Unlock() _, err := c.writer.WriteString(command + "\n\n") if err != nil { return nil, err } c.writer.Flush() return c.readMessage() } // 发送API命令 func (c *Client) API(command string, args string) (string, error) { cmd := fmt.Sprintf("api %s %s", command, args) msg, err := c.sendCommand(cmd) if err != nil { return "", err } return msg.Body, nil } // 发送后台API命令 func (c *Client) BgAPI(command string, args string) (string, error) { cmd := fmt.Sprintf("bgapi %s %s", command, args) msg, err := c.sendCommand(cmd) if err != nil { return "", err } return msg.GetHeader("Job-UUID"), nil } // 订阅事件 func (c *Client) Subscribe(events ...string) error { eventStr := strings.Join(events, " ") _, err := c.sendCommand(fmt.Sprintf("event plain %s", eventStr)) return err } // 订阅所有事件 func (c *Client) SubscribeAll() error { _, err := c.sendCommand("event plain ALL") return err } // 读取消息 func (c *Client) readMessage() (*Message, error) { msg := &Message{ Headers: make(map[string]string), } // 读取headers for { line, err := c.reader.ReadLine() if err != nil { return nil, err } if line == "" { break } parts := strings.SplitN(line, ": ", 2) if len(parts) == 2 { msg.Headers[parts[0]] = parts[1] } } // 读取body if lenStr := msg.GetHeader("Content-Length"); lenStr != "" { length, _ := strconv.Atoi(lenStr) if length > 0 { body := make([]byte, length) _, err := c.reader.R.Read(body) if err != nil { return nil, err } msg.Body = string(body) } } return msg, nil } // 事件读取循环 func (c *Client) readLoop() { for c.connected { msg, err := c.readMessage() if err != nil { log.Printf("读取消息错误: %v", err) c.connected = false if c.reconnect { c.tryReconnect() } return } contentType := msg.GetHeader("Content-Type") if contentType == "text/event-plain" { event := parseEvent(msg) c.eventChan <- event } } } // 事件分发循环 func (c *Client) dispatchLoop() { for event := range c.eventChan { for _, handler := range c.handlers { go handler.Handle(event) } } } // 尝试重连 func (c *Client) tryReconnect() { for i := 0; i < 10; i++ { time.Sleep(5 * time.Second) log.Printf("尝试重连ESL (第%d次)...", i+1) // TODO: 重新连接 } } // 添加事件处理器 func (c *Client) AddHandler(handler EventHandler) { c.handlers = append(c.handlers, handler) } // 关闭连接 func (c *Client) Close() { c.connected = false c.reconnect = false if c.conn != nil { c.conn.Close() } close(c.eventChan) } // 消息结构 type Message struct { Headers map[string]string Body string } func (m *Message) GetHeader(name string) string { if v, ok := m.Headers[name]; ok { return v } return "" } // 解析事件 func parseEvent(msg *Message) *Event { event := &Event{ Headers: make(map[string]string), } // 解析body中的headers lines := strings.Split(msg.Body, "\n") for _, line := range lines { parts := strings.SplitN(line, ": ", 2) if len(parts) == 2 { // URL解码 value := strings.ReplaceAll(parts[1], "%20", " ") event.Headers[parts[0]] = value } } event.Name = event.GetHeader("Event-Name") event.UUID = event.GetHeader("Unique-ID") return event } ``` #### 3.3.3 事件定义 ```go // esl/event.go package esl // 事件结构 type Event struct { Name string UUID string Headers map[string]string } func (e *Event) GetHeader(name string) string { if v, ok := e.Headers[name]; ok { return v } return "" } // 事件名称常量 const ( EventChannelCreate = "CHANNEL_CREATE" EventChannelAnswer = "CHANNEL_ANSWER" EventChannelBridge = "CHANNEL_BRIDGE" EventChannelUnbridge = "CHANNEL_UNBRIDGE" EventChannelHangup = "CHANNEL_HANGUP" EventChannelHangupComplete = "CHANNEL_HANGUP_COMPLETE" EventChannelState = "CHANNEL_STATE" EventDTMF = "DTMF" EventRecordStart = "RECORD_START" EventRecordStop = "RECORD_STOP" EventCustom = "CUSTOM" ) ``` #### 3.3.4 事件处理器 ```go // esl/handler.go package esl import ( "callcenter/service" "log" ) // 呼叫中心事件处理器 type CallCenterHandler struct { callService *service.CallService agentService *service.AgentService acdService *service.AcdService } func NewCallCenterHandler( callService *service.CallService, agentService *service.AgentService, acdService *service.AcdService, ) *CallCenterHandler { return &CallCenterHandler{ callService: callService, agentService: agentService, acdService: acdService, } } func (h *CallCenterHandler) Handle(event *Event) { switch event.Name { case EventChannelCreate: h.handleChannelCreate(event) case EventChannelAnswer: h.handleChannelAnswer(event) case EventChannelBridge: h.handleChannelBridge(event) case EventChannelHangup: h.handleChannelHangup(event) case EventChannelHangupComplete: h.handleChannelHangupComplete(event) case EventDTMF: h.handleDTMF(event) } } func (h *CallCenterHandler) handleChannelCreate(event *Event) { uuid := event.UUID direction := event.GetHeader("Call-Direction") callerNumber := event.GetHeader("Caller-Caller-ID-Number") calleeNumber := event.GetHeader("Caller-Destination-Number") log.Printf("新呼叫: uuid=%s, direction=%s, caller=%s, callee=%s", uuid, direction, callerNumber, calleeNumber) // 创建呼叫记录 call := &service.Call{ UUID: uuid, Direction: direction, CallerNumber: callerNumber, CalleeNumber: calleeNumber, State: "ringing", } h.callService.SaveCall(call) // 呼入排队 if direction == "inbound" { skillGroup := event.GetHeader("variable_skill_group") call.SkillGroup = skillGroup h.acdService.Enqueue(call) } } func (h *CallCenterHandler) handleChannelAnswer(event *Event) { uuid := event.UUID log.Printf("通话接听: uuid=%s", uuid) call := h.callService.GetCall(uuid) if call != nil { call.State = "answered" h.callService.UpdateCall(call) } } func (h *CallCenterHandler) handleChannelBridge(event *Event) { uuid := event.UUID otherUUID := event.GetHeader("Other-Leg-Unique-ID") log.Printf("通道桥接: uuid=%s, other=%s", uuid, otherUUID) call := h.callService.GetCall(uuid) if call != nil { call.State = "bridged" call.BridgeUUID = otherUUID h.callService.UpdateCall(call) } } func (h *CallCenterHandler) handleChannelHangup(event *Event) { uuid := event.UUID hangupCause := event.GetHeader("Hangup-Cause") log.Printf("通话挂断: uuid=%s, cause=%s", uuid, hangupCause) call := h.callService.GetCall(uuid) if call != nil { call.State = "hangup" call.HangupCause = hangupCause h.callService.UpdateCall(call) // 更新坐席状态 if call.AgentID != "" { h.acdService.OnCallEnd(call.AgentID, call) } } } func (h *CallCenterHandler) handleChannelHangupComplete(event *Event) { uuid := event.UUID billsec := event.GetHeader("variable_billsec") recordingPath := event.GetHeader("variable_recording_path") log.Printf("通话结束: uuid=%s, billsec=%s, recording=%s", uuid, billsec, recordingPath) call := h.callService.GetCall(uuid) if call != nil { call.Billsec = billsec call.RecordingPath = recordingPath h.callService.UpdateCall(call) } } func (h *CallCenterHandler) handleDTMF(event *Event) { uuid := event.UUID digit := event.GetHeader("DTMF-Digit") log.Printf("DTMF按键: uuid=%s, digit=%s", uuid, digit) } ``` --- ## 第四部分 呼叫中心场景开发实践 ### 4.1 预测式外呼实现 ```java package com.callcenter.service; import com.callcenter.esl.EslConnectionManager; import com.callcenter.model.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 预测式外呼服务 */ @Slf4j @Service @RequiredArgsConstructor public class PredictiveDialerService { private final EslConnectionManager eslManager; private final AgentService agentService; private final CallService callService; // 外呼任务 private final Map activeTasks = new ConcurrentHashMap<>(); // 外呼线程池 private final ExecutorService dialerExecutor = Executors.newFixedThreadPool(10); /** * 创建外呼任务 */ public String createTask(DialTaskRequest request) { String taskId = UUID.randomUUID().toString(); DialTask task = new DialTask(); task.setId(taskId); task.setName(request.getName()); task.setSkillGroup(request.getSkillGroup()); task.setCallerIdNumber(request.getCallerIdNumber()); task.setNumbers(new LinkedBlockingQueue<>(request.getNumbers())); task.setMaxConcurrent(request.getMaxConcurrent()); task.setState(TaskState.CREATED); task.setStats(new DialStats()); activeTasks.put(taskId, task); log.info("创建外呼任务: taskId={}, name={}, numbers={}", taskId, request.getName(), request.getNumbers().size()); return taskId; } /** * 启动任务 */ public void startTask(String taskId) { DialTask task = activeTasks.get(taskId); if (task == null) { throw new RuntimeException("任务不存在"); } task.setState(TaskState.RUNNING); log.info("启动外呼任务: {}", taskId); // 启动外呼循环 dialerExecutor.submit(() -> dialLoop(task)); } /** * 暂停任务 */ public void pauseTask(String taskId) { DialTask task = activeTasks.get(taskId); if (task != null) { task.setState(TaskState.PAUSED); log.info("暂停外呼任务: {}", taskId); } } /** * 恢复任务 */ public void resumeTask(String taskId) { DialTask task = activeTasks.get(taskId); if (task != null && task.getState() == TaskState.PAUSED) { task.setState(TaskState.RUNNING); dialerExecutor.submit(() -> dialLoop(task)); log.info("恢复外呼任务: {}", taskId); } } /** * 停止任务 */ public void stopTask(String taskId) { DialTask task = activeTasks.get(taskId); if (task != null) { task.setState(TaskState.STOPPED); log.info("停止外呼任务: {}", taskId); } } /** * 外呼循环 */ private void dialLoop(DialTask task) { log.info("开始外呼循环: taskId={}", task.getId()); while (task.getState() == TaskState.RUNNING && !task.getNumbers().isEmpty()) { try { // 计算应该发起的呼叫数 int dialCount = calculateDialCount(task); if (dialCount <= 0) { // 没有空闲坐席,等待 Thread.sleep(1000); continue; } // 发起外呼 for (int i = 0; i < dialCount; i++) { String number = task.getNumbers().poll(); if (number == null) { break; } dial(task, number); } // 短暂等待,控制拨打速率 Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { log.error("外呼异常: {}", e.getMessage(), e); } } // 任务完成 if (task.getNumbers().isEmpty()) { task.setState(TaskState.COMPLETED); } log.info("外呼循环结束: taskId={}, state={}", task.getId(), task.getState()); } /** * 计算应该发起的呼叫数(预测算法) */ private int calculateDialCount(DialTask task) { // 获取可用坐席数 List availableAgents = agentService.getAvailableAgents(task.getSkillGroup()); int availableAgentCount = availableAgents.size(); if (availableAgentCount == 0) { return 0; } // 当前正在拨打的数量 int currentDialing = task.getStats().getCurrentDialing().get(); // 历史接通率(动态计算) double connectRate = calculateConnectRate(task); // 预测式算法: // 目标:让坐席接到电话时,刚好有客户接通 // 预测需要拨打数 = 空闲坐席数 / 接通率 * 系数 double factor = 1.2; // 超拨系数,避免坐席空闲 int predictedDial = (int) Math.ceil(availableAgentCount / connectRate * factor); // 限制最大并发 int maxAllowed = task.getMaxConcurrent() - currentDialing; return Math.min(predictedDial, maxAllowed); } /** * 计算接通率 */ private double calculateConnectRate(DialTask task) { DialStats stats = task.getStats(); int total = stats.getTotalDialed().get(); int connected = stats.getConnected().get(); if (total < 10) { // 样本太少,使用默认值 return 0.3; } double rate = (double) connected / total; // 限制范围 return Math.max(0.1, Math.min(0.8, rate)); } /** * 发起单个外呼 */ private void dial(DialTask task, String number) { log.info("外呼: taskId={}, number={}", task.getId(), number); task.getStats().getTotalDialed().incrementAndGet(); task.getStats().getCurrentDialing().incrementAndGet(); try { // 构建originate命令 // 先拨打客户,客户接听后转到队列等待坐席 StringBuilder cmd = new StringBuilder(); cmd.append("{"); cmd.append("origination_caller_id_number=").append(task.getCallerIdNumber()).append(","); cmd.append("task_id=").append(task.getId()).append(","); cmd.append("dial_number=").append(number).append(","); cmd.append("dialer_mode=predictive"); cmd.append("}"); // 通过网关外呼 cmd.append("sofia/gateway/carrier_primary/").append(number); // 客户接听后的处理 cmd.append(" &lua(predictive_handler.lua)"); String result = eslManager.sendSyncApiCommand("originate", cmd.toString()); if (!result.startsWith("+OK")) { log.warn("外呼失败: number={}, result={}", number, result); task.getStats().getFailed().incrementAndGet(); } } catch (Exception e) { log.error("外呼异常: number={}", number, e); task.getStats().getFailed().incrementAndGet(); } finally { task.getStats().getCurrentDialing().decrementAndGet(); } } /** * 客户接听回调(由事件触发) */ public void onCustomerAnswer(String uuid, String taskId) { DialTask task = activeTasks.get(taskId); if (task == null) { eslManager.sendSyncApiCommand("uuid_kill", uuid + " NO_ROUTE_DESTINATION"); return; } task.getStats().getConnected().incrementAndGet(); // 查找可用坐席 List agents = agentService.getAvailableAgents(task.getSkillGroup()); if (agents.isEmpty()) { // 无可用坐席,播放等待音乐 eslManager.sendSyncApiCommand("uuid_broadcast", uuid + " ivr/please_wait.wav both"); // 放入等待队列 // TODO: 实现等待逻辑 // 超时处理 task.getStats().getAbandoned().incrementAndGet(); } else { // 转接坐席 Agent agent = agents.get(0); agentService.updateAgentState(agent.getId(), AgentState.RINGING); eslManager.sendSyncApiCommand("uuid_transfer", uuid + " " + agent.getExtension() + " XML default"); task.getStats().getTransferred().incrementAndGet(); } } /** * 获取任务统计 */ public DialStats getTaskStats(String taskId) { DialTask task = activeTasks.get(taskId); return task != null ? task.getStats() : null; } } /** * 外呼任务 */ @Data class DialTask { private String id; private String name; private String skillGroup; private String callerIdNumber; private BlockingQueue numbers; private int maxConcurrent; private TaskState state; private DialStats stats; } /** * 任务状态 */ enum TaskState { CREATED, RUNNING, PAUSED, STOPPED, COMPLETED } /** * 外呼统计 */ @Data class DialStats { private AtomicInteger totalDialed = new AtomicInteger(0); private AtomicInteger currentDialing = new AtomicInteger(0); private AtomicInteger connected = new AtomicInteger(0); private AtomicInteger transferred = new AtomicInteger(0); private AtomicInteger failed = new AtomicInteger(0); private AtomicInteger abandoned = new AtomicInteger(0); } ``` ### 4.2 机器人外呼实现 ```java package com.callcenter.service; import com.callcenter.esl.EslConnectionManager; import com.callcenter.model.*; import com.callcenter.tts.TtsService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.*; /** * 机器人外呼服务 * 用于批量通知、回访、满意度调查 */ @Slf4j @Service @RequiredArgsConstructor public class RobotDialerService { private final EslConnectionManager eslManager; private final TtsService ttsService; private final Map activeTasks = new ConcurrentHashMap<>(); private final ExecutorService executor = Executors.newFixedThreadPool(20); /** * 创建机器人外呼任务 */ public String createTask(RobotTaskRequest request) { String taskId = UUID.randomUUID().toString(); RobotTask task = new RobotTask(); task.setId(taskId); task.setType(request.getType()); // NOTIFICATION, SURVEY, CALLBACK task.setNumbers(new LinkedBlockingQueue<>(request.getNumbers())); task.setTtsText(request.getTtsText()); task.setVariables(request.getVariables()); // 每个号码的变量 task.setMaxConcurrent(request.getMaxConcurrent()); task.setRetryTimes(request.getRetryTimes()); task.setCollectDtmf(request.isCollectDtmf()); task.setState(TaskState.CREATED); task.setResults(new ConcurrentHashMap<>()); activeTasks.put(taskId, task); return taskId; } /** * 启动任务 */ public void startTask(String taskId) { RobotTask task = activeTasks.get(taskId); if (task == null) { throw new RuntimeException("任务不存在"); } task.setState(TaskState.RUNNING); executor.submit(() -> robotDialLoop(task)); } /** * 机器人外呼循环 */ private void robotDialLoop(RobotTask task) { log.info("开始机器人外呼: taskId={}", task.getId()); Semaphore semaphore = new Semaphore(task.getMaxConcurrent()); while (task.getState() == TaskState.RUNNING) { String number = task.getNumbers().poll(); if (number == null) { break; } try { semaphore.acquire(); executor.submit(() -> { try { robotCall(task, number); } finally { semaphore.release(); } }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } // 等待所有呼叫完成 try { semaphore.acquire(task.getMaxConcurrent()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } task.setState(TaskState.COMPLETED); log.info("机器人外呼完成: taskId={}", task.getId()); } /** * 单个机器人呼叫 */ private void robotCall(RobotTask task, String number) { log.info("机器人呼叫: taskId={}, number={}", task.getId(), number); RobotCallResult result = new RobotCallResult(); result.setNumber(number); result.setStartTime(System.currentTimeMillis()); try { // 替换TTS文本中的变量 String ttsText = replaceTtsVariables(task.getTtsText(), task.getVariables().get(number)); // 生成语音文件 String audioFile = ttsService.synthesize(ttsText); // 构建外呼命令 StringBuilder cmd = new StringBuilder(); cmd.append("{"); cmd.append("task_id=").append(task.getId()).append(","); cmd.append("dial_number=").append(number).append(","); cmd.append("robot_mode=true").append(","); cmd.append("audio_file=").append(audioFile); cmd.append("}"); cmd.append("sofia/gateway/carrier_primary/").append(number); if (task.isCollectDtmf()) { // 需要收集按键 cmd.append(" &lua(robot_survey.lua)"); } else { // 纯播报 cmd.append(" &playback(").append(audioFile).append(")"); } String apiResult = eslManager.sendSyncApiCommand("originate", cmd.toString()); if (apiResult.startsWith("+OK")) { result.setStatus("SUCCESS"); result.setCallUuid(apiResult.substring(4).trim()); } else { result.setStatus("FAILED"); result.setError(apiResult); // 重试逻辑 handleRetry(task, number); } } catch (Exception e) { log.error("机器人呼叫异常: number={}", number, e); result.setStatus("ERROR"); result.setError(e.getMessage()); handleRetry(task, number); } result.setEndTime(System.currentTimeMillis()); task.getResults().put(number, result); } /** * 替换TTS变量 */ private String replaceTtsVariables(String template, Map vars) { if (vars == null || vars.isEmpty()) { return template; } String result = template; for (Map.Entry entry : vars.entrySet()) { result = result.replace("${" + entry.getKey() + "}", entry.getValue()); } return result; } /** * 处理重试 */ private void handleRetry(RobotTask task, String number) { int retryCount = task.getRetryCount().getOrDefault(number, 0); if (retryCount < task.getRetryTimes()) { task.getRetryCount().put(number, retryCount + 1); task.getNumbers().offer(number); log.info("加入重试队列: number={}, retry={}", number, retryCount + 1); } } /** * 处理DTMF按键结果(由事件触发) */ public void onDtmfResult(String uuid, String taskId, String number, String digit) { RobotTask task = activeTasks.get(taskId); if (task == null) { return; } RobotCallResult result = task.getResults().get(number); if (result != null) { result.setDtmfDigit(digit); log.info("收到按键: taskId={}, number={}, digit={}", taskId, number, digit); } } /** * 获取任务结果 */ public List getTaskResults(String taskId) { RobotTask task = activeTasks.get(taskId); if (task == null) { return Collections.emptyList(); } return new ArrayList<>(task.getResults().values()); } } ``` ### 4.3 班长监控实现 ```java package com.callcenter.service; import com.callcenter.esl.EslConnectionManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * 班长监控服务 */ @Slf4j @Service @RequiredArgsConstructor public class SupervisorService { private final EslConnectionManager eslManager; /** * 监听坐席通话 * 班长可以听到双方说话,但双方听不到班长 */ public void spy(String supervisorExtension, String targetUuid) { log.info("开始监听: supervisor={}, target={}", supervisorExtension, targetUuid); // 使用eavesdrop应用 // originate user/supervisor &eavesdrop(target_uuid) String cmd = String.format( "{origination_caller_id_name=监听}user/%s &eavesdrop(%s)", supervisorExtension, targetUuid ); String result = eslManager.sendSyncApiCommand("originate", cmd); if (!result.startsWith("+OK")) { throw new RuntimeException("监听失败: " + result); } } /** * 强插通话 * 班长加入通话,三方都可以听到 */ public void barge(String supervisorExtension, String targetUuid) { log.info("强插通话: supervisor={}, target={}", supervisorExtension, targetUuid); // 使用三方会议实现 String confName = "barge_" + targetUuid; // 1. 将目标通话转入会议 eslManager.sendSyncApiCommand("uuid_transfer", targetUuid + " conference:" + confName + "+flags{mute} inline"); // 2. 呼叫班长加入会议 String cmd = String.format( "{origination_caller_id_name=强插}user/%s &conference(%s)", supervisorExtension, confName ); eslManager.sendSyncApiCommand("originate", cmd); } /** * 强拆通话 * 强制挂断坐席通话 */ public void hangup(String targetUuid, String reason) { log.info("强拆通话: target={}, reason={}", targetUuid, reason); eslManager.sendSyncApiCommand("uuid_kill", targetUuid + " " + reason); } /** * 密语(耳语) * 班长对坐席说话,客户听不到 */ public void whisper(String supervisorExtension, String agentUuid) { log.info("密语: supervisor={}, agent={}", supervisorExtension, agentUuid); // 使用eavesdrop的whisper模式 String cmd = String.format( "{origination_caller_id_name=密语}user/%s &eavesdrop(%s whisper)", supervisorExtension, agentUuid ); eslManager.sendSyncApiCommand("originate", cmd); } /** * 获取实时通话列表 */ public String getActiveCalls() { return eslManager.sendSyncApiCommand("show", "calls"); } /** * 获取坐席通道信息 */ public String getChannelInfo(String uuid) { return eslManager.sendSyncApiCommand("uuid_dump", uuid); } } ``` --- ## 第五部分 高可用部署方案 ### 5.1 主备架构 ``` ┌─────────────────┐ │ Keepalived │ │ VIP漂移管理 │ └────────┬────────┘ │ ┌───────────────────────────┴───────────────────────────┐ │ VIP: 10.10.1.100 │ │ (SIP/ESL统一入口) │ └───────────────────────────┬───────────────────────────┘ │ ┌─────────────────────┴─────────────────────┐ │ │ ▼ ▼ ┌────────────────────┐ ┌────────────────────┐ │ Master节点 │ 心跳检测 │ Backup节点 │ │ 10.10.1.10 │◄──────────────────►│ 10.10.1.11 │ ├────────────────────┤ ├────────────────────┤ │ FreeSWITCH │ │ FreeSWITCH │ │ CTI中间件 │ │ CTI中间件 │ │ API服务 │ │ API服务 │ └────────────────────┘ └────────────────────┘ │ │ │ 共享存储 / 数据同步 │ │ │ └─────────────────────┬─────────────────────┘ │ ┌──────────────────────────┼──────────────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ MySQL 主从 │ │ Redis Sentinel │ │ NAS存储 │ │ 10.10.2.10/11 │ │ 10.10.2.20-22 │ │ 10.10.2.30 │ └─────────────────┘ └─────────────────┘ └─────────────────┘ ``` ### 5.2 Keepalived配置 **Master节点配置:** ```bash # /etc/keepalived/keepalived.conf (Master) global_defs { router_id FS_MASTER } # 检测FreeSWITCH状态的脚本 vrrp_script check_freeswitch { script "/etc/keepalived/check_freeswitch.sh" interval 2 weight -20 fall 3 rise 2 } vrrp_instance VI_FS { state MASTER interface eth0 virtual_router_id 51 priority 100 advert_int 1 authentication { auth_type PASS auth_pass your_password } virtual_ipaddress { 10.10.1.100/24 dev eth0 } track_script { check_freeswitch } notify_master "/etc/keepalived/notify_master.sh" notify_backup "/etc/keepalived/notify_backup.sh" notify_fault "/etc/keepalived/notify_fault.sh" } ``` **检测脚本:** ```bash #!/bin/bash # /etc/keepalived/check_freeswitch.sh # 检测FreeSWITCH进程 if ! pgrep -x "freeswitch" > /dev/null; then exit 1 fi # 检测ESL端口 if ! nc -z localhost 8021; then exit 1 fi # 检测SIP端口 if ! nc -z localhost 5060; then exit 1 fi # 执行简单API命令测试 result=$(fs_cli -x "status" 2>/dev/null) if [[ $? -ne 0 ]]; then exit 1 fi exit 0 ``` **主节点切换通知脚本:** ```bash #!/bin/bash # /etc/keepalived/notify_master.sh # 记录日志 echo "$(date): Becoming MASTER" >> /var/log/keepalived-state.log # 启动FreeSWITCH(如果未运行) systemctl start freeswitch # 启动CTI服务 systemctl start callcenter-cti # 发送告警通知 curl -X POST "http://your-alert-api/notify" \ -d "message=FreeSWITCH切换到Master节点" ``` ### 5.3 配置同步方案 ```bash #!/bin/bash # sync_config.sh - 配置同步脚本 MASTER_HOST="10.10.1.10" BACKUP_HOST="10.10.1.11" FS_CONF_DIR="/usr/local/freeswitch/conf" # 使用rsync同步配置 sync_config() { rsync -avz --delete \ ${FS_CONF_DIR}/ \ ${BACKUP_HOST}:${FS_CONF_DIR}/ # 通知备机重载配置 ssh ${BACKUP_HOST} "fs_cli -x 'reloadxml'" } # 监控配置变化 inotifywait -m -r -e modify,create,delete ${FS_CONF_DIR} | while read path action file; do echo "配置变化: $path$file ($action)" sync_config done ``` ### 5.4 FreeSWITCH集群配置 对于需要更高并发的场景,可以部署FreeSWITCH集群: ```xml ``` --- ## 第六部分 性能优化指南 ### 6.1 FreeSWITCH优化 **系统级优化:** ```bash # /etc/security/limits.conf freeswitch soft nofile 65536 freeswitch hard nofile 65536 freeswitch soft core unlimited freeswitch hard core unlimited # /etc/sysctl.conf net.core.rmem_max = 16777216 net.core.wmem_max = 16777216 net.core.rmem_default = 1048576 net.core.wmem_default = 1048576 net.core.netdev_max_backlog = 30000 net.ipv4.tcp_max_syn_backlog = 10240 net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_fin_timeout = 15 ``` **FreeSWITCH配置优化:** ```xml ``` ### 6.2 CTI中间件优化 ```java // 连接池配置 @Configuration public class ConnectionPoolConfig { @Bean public ThreadPoolTaskExecutor eslEventExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(8); executor.setMaxPoolSize(32); executor.setQueueCapacity(10000); executor.setThreadNamePrefix("esl-event-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://10.10.2.20:6379") .addNodeAddress("redis://10.10.2.21:6379") .addNodeAddress("redis://10.10.2.22:6379") .setMasterConnectionPoolSize(64) .setSlaveConnectionPoolSize(64); return Redisson.create(config); } } ``` ### 6.3 监控指标 ```yaml # prometheus配置 scrape_configs: - job_name: 'freeswitch' static_configs: - targets: ['10.10.1.10:9282', '10.10.1.11:9282'] - job_name: 'callcenter-cti' static_configs: - targets: ['10.10.1.10:8080', '10.10.1.11:8080'] metrics_path: '/actuator/prometheus' ``` **关键监控指标:** | 指标 | 说明 | 阈值 | |------|------|------| | freeswitch_sessions_active | 当前活跃会话数 | <80%最大值 | | freeswitch_sessions_per_second | 每秒新建会话数 | <100 | | freeswitch_cpu_usage | CPU使用率 | <70% | | freeswitch_memory_usage | 内存使用率 | <80% | | esl_event_queue_size | 事件队列大小 | <5000 | | acd_queue_wait_time | 平均等待时间 | <30s | | agent_utilization | 坐席利用率 | 目标>70% | --- ## 总结 本文档详细介绍了FreeSWITCH的技术架构和ESL开发实践,涵盖了: 1. **核心概念**:Channel、Session、Event等基础概念 2. **模块详解**:mod_sofia、Dialplan、ESL等核心模块 3. **ESL开发**:Java和Go两种语言的开发示例 4. **场景实践**:ACD、预测式外呼、机器人外呼、班长监控 5. **高可用部署**:主备架构、Keepalived配置 6. **性能优化**:系统优化、应用优化、监控指标 建议开发团队在正式开发前: - 搭建FreeSWITCH测试环境熟悉操作 - 仔细研读FreeSWITCH官方文档 - 从简单场景(如点击拨号)开始,逐步实现复杂功能 - 重视测试,特别是高可用切换测试和压力测试