1.安装 Workerman
安装GatewayWorker内核文件(不包含start_gateway.php start_businessworker.php等启动入口文件),直接上composer
composer require workerman/gateway-worker
2.创建 Workerman 启动文件
创建一个自定义命令类文件来启动 Socket 服务端,新建
application/push/command/Workerman.php
namespace app\push\command;
use Workerman\Worker;
use GatewayWorker\Register;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
class Workerman extends Command
{
protected function configure()
{
$this->setName('workerman')
->addArgument('action', Argument::OPTIONAL, "action start|stop|restart")
->addArgument('type', Argument::OPTIONAL, "d -d")
->setDescription('workerman chat');
}
protected function execute(Input $input, Output $output)
{
global $argv;
$action = trim($input->getArgument('action'));
$type = trim($input->getArgument('type')) ? '-d' : '';
$argv[0] = 'chat';
$argv[1] = $action;
$argv[2] = $type ? '-d' : '';
$this->start();
}
private function start()
{
$this->startGateWay();
$this->startBusinessWorker();
$this->startRegister();
Worker::runAll();
}
private function startBusinessWorker()
{
// bussinessWorker 进程
$worker = new BusinessWorker();
// worker名称
$worker->name = 'YourAppBusinessWorker';
// bussinessWorker进程数量
$worker->count = 4;
//设置处理业务的类,此处制定Events的命名空间
$worker->eventHandler= \app\push\controller\Events::class;
// 服务注册地址
$worker->registerAddress = '127.0.0.1:1238';
}
private function startGateWay()
{
// gateway 进程,这里使用Text协议,可以用telnet测试
$gateway = new Gateway("websocket://0.0.0.0:8282");
// gateway名称,status方便查看
$gateway->name = 'YourAppGateway';
// gateway进程数
$gateway->count = 4;
// 本机ip,分布式部署时使用内网ip
$gateway->lanIp = '127.0.0.1';
// 内部通讯起始端口,假如$gateway->count=4,起始端口为4000
// 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口
$gateway->startPort = 20003;
// 服务注册地址
$gateway->registerAddress = '127.0.0.1:1238';
// 心跳间隔
$gateway->pingInterval = 55;
$gateway->pingNotResponseLimit = 1;
// 心跳数据
$gateway->pingData = '';
}
private function startRegister()
{
new Register('text://0.0.0.0:1238');
}
}
配置 application/command.php 文件
return [
'app\common\command\Workerman',
];
3.创建事件监听文件
创建 application/push/controller/Events.php 文件来监听处理 workerman 的各种事件。
<?php
namespace app\push\controller;
use GatewayWorker\Lib\Gateway;
use think\Hook;
use Workerman\Lib\Timer;
class Events
{
//定时器间隔
protected static $interval = 2;
//定时器
protected static $timer = null;
//事件处理类
protected static $evevtRunClass = \app\push\controller\EvevtRun::class;
/*
* 消息事件回调 class
*
* */
protected static $eventClassName = \app\push\controller\Push::class;
/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
$message_data = json_decode($message,true);
if (!$message_data) return ;
try{
if(!isset($message_data['type'])) throw new \Exception('缺少消息参数类型');
//消息回調处理
$evevtName = self::$eventClassName.'::instance';
if(is_callable($evevtName))
$evevtName()->start($message_data['type'],$client_id,$message_data);
else
throw new \Exception('消息处理回调不存在。['+$evevtName+']');
}catch (\Exception $e){
var_dump([
'file'=>$e->getFile(),
'code'=>$e->getCode(),
'msg'=>$e->getMessage(),
'line'=>$e->getLine()
]);
}
}
/**
* 当用户连接时触发的方法
* @param integer $client_id 连接的客户端
* @return void
*/
public static function onConnect($client_id)
{
Gateway::sendToClient($client_id, json_encode(array(
'type' => 'init',
'client_id' => $client_id
)));
}
/**
* 当用户断开连接时触发的方法
* @param integer $client_id 断开连接的客户端
* @return void
*/
public static function onClose($client_id)
{
Gateway::sendToClient($client_id,json_encode([
'type'=>'logout',
'message'=>"client[$client_id]"
]));
}
/**
* 当进程启动时
* @param integer $businessWorker 进程实例
*/
public static function onWorkerStart($worker)
{
//在进程1上开启定时器 每self::$interval秒执行
if($worker->id === 0){
$last = time();
$task = [6 => $last, 10 => $last, 30 => $last, 60 => $last, 180 => $last, 300 => $last];
self::$timer = Timer::add(self::$interval, function() use(&$task) {
try {
$now = time();
Hook::exec(self::$evevtRunClass);
foreach ($task as $sec => &$time) {
if (($now - $time) >= $sec) {
$time = $now;
Hook::exec(self::$evevtRunClass,'task_'.$sec);
}
}
} catch (\Throwable $e) {}
});
}
}
/**
* 当进程关闭时
* @param integer $businessWorker 进程实例
*/
public static function onWorkerStop($worker)
{
if($worker->id === 0) Timer::del(self::$timer);
}
}
消息事件回调 class 方法里的处理根据自身情况编写
<?php
namespace app\push\controller;
use app\wap\model\live\LiveUser;
use GatewayWorker\Lib\Gateway;
use app\wap\model\live\LiveHonouredGuest;
use app\wap\model\user\User;
use app\wap\model\live\LiveBarrage;
class Push
{
/*
* @var array 消息内容
* */
protected $message_data = [
'type' => '',
'message'=>'',
];
/*
* @var string 消息类型
* */
protected $message_type = '';
/*
* @var string $client_id
* */
protected $client_id = '';
/*
* @var int 当前登陆用户
* */
protected $uid = null;
/*
* @var null 本类实例化结果
* */
protected static $instance = null;
/*
*
* */
protected function __construct($message_data = [])
{
}
/*
* 实例化本类
* */
public static function instance()
{
if(is_null(self::$instance)) self::$instance = new static();
return self::$instance;
}
/*
* 检测参数并返回
* @param array || string $keyValue 需要提取的键值
* @param null || bool $value
* @return array;
* */
protected function checkValue($keyValue = null,$value = null)
{
if(is_null($keyValue))
$message_data = $this->message_data;
if(is_string($keyValue))
$message_data = isset($this->message_data[$keyValue]) ? $this->message_data[$keyValue] : (is_null($value) ? '': $value);
if(is_array($keyValue))
$message_data = array_merge($keyValue,$this->message_data);
if(is_bool($value) && $value === true && is_array($message_data) && is_array($keyValue)){
$newData = [];
foreach ($keyValue as $key => $item){
$newData [] = $message_data[$key];
}
return $newData;
}
return $message_data;
}
/*
* 开始设置回调
* @param string $typeFnName 回调函数名
* @param string $client_id
* @param array $message_data
*
* */
public function start($typeFnName,$client_id,$message_data)
{
$this->message_type = $typeFnName;
$this->message_data = $message_data;
$this->client_id = $client_id;
$this->uid = Gateway::getUidByClientId($client_id);
//记录用户上线
if($this->uid && Gateway::isOnline($client_id) && ($live_id = $this->checkValue('room')))
{
LiveUser::setLiveUserOnline($live_id,$this->uid,1);
}
if(method_exists($this,$typeFnName))
call_user_func([$this,$typeFnName]);
else
throw new \Exception('缺少回调方法');
}
/*
* 心跳检测
*
* */
protected function ping()
{
return ;
}
/*
* 绑定用户相应客户端
* @param string $client_id
* @param array $message_data
* @return
* */
protected function handshake()
{
$message_data = $this->checkValue(['uid'=>0,'room'=>0]);
if(!$message_data['uid']) throw new \Exception("缺少用户uid,无法绑定用户");
$new_message = [
'type' => $this->message_type,
'client_id' => $this->client_id,
'time' => date('H:i:s'),
'msg' => '绑定成功!'
];
Gateway::bindUid($this->client_id,$message_data['uid']);
//如果有群组id加入群组
if($message_data['room']){
// 加入某个群组(可调用多次加入多个群组) 将clientid加入roomid分组中
Gateway::joinGroup($this->client_id, $message_data['room']);
}
Gateway::sendToClient($this->client_id, json_encode($new_message));
}
/*
* 接受客户端发送的消息
* @param string $client_id 客户端client_id
* @param array $message_data 发送的数据
* @return
*
* */
protected function send()
{
list($toUid,$message,$room,$type) = $this->checkValue(['uid'=>0,'content'=>'','room'=>false,'ms_type' => 0],true);
$client_id = $this->client_id;
if(!$this->uid) {
//认证用户信息失败,关闭用户链接
Gateway::closeClient($client_id);
throw new \Exception("缺少用户uid");
}
$userInfo = User::get($this->uid);
if(!$userInfo){
//认证用户信息失败,关闭用户链接
Gateway::closeClient($client_id);
throw new \Exception("用户信息缺少");
}
if($room && Gateway::getClientIdCountByGroup($room)){
$user_type = LiveHonouredGuest::where(['uid'=>$this->uid,'live_id'=>$room])->value('type');
if(is_null($user_type)) $user_type = 2;
$res = LiveBarrage::set([
'live_id'=>$room,
'uid'=>$this->uid,
'type'=>$type,
'barrage'=>$message,
'add_time'=>time(),
'is_show'=>1
]);
if(!$res) throw new \Exception("写入历史记录失败");
Gateway::sendToGroup($room,json_encode([
'message'=>$message,
'm_type'=>$type,
'type'=>'message',
'user_type'=>$user_type,
'userInfo'=>$userInfo,
'id'=>$res['id']
]));
}else{
$new_message = [
'type' => 'reception',
'content' => $message,
'time' => date('H:i:s'),
'timestamp' => time(),
];
if(Gateway::isUidOnline($toUid)) return Gateway::sendToUid($toUid, json_encode($new_message));
}
}
/*
* 消息撤回
* @param string $client_id
* @param array $message_data
* */
protected function recall()
{
list($id,$room) = $this->checkValue(['id'=>0,'room'=>''],true);
if(!$id)
throw new \Exception('缺少撤回消息的id');
if(!$room)
throw new \Exception('缺少房间号');
if(LiveBarrage::del($id)){
Gateway::sendToGroup($room,json_encode([
'type'=>'recall',
'id'=>$id
]),Gateway::getClientIdByUid($this->uid));
}
}
}
定时任务事件处理类 按照自身情况编写方法内逻辑
<?php
namespace app\push\controller;
use GatewayWorker\Lib\Gateway;
/*
* 定时任务
*
* */
class EvevtRun
{
/*
* 默认定时器执行事件
* */
public function run()
{
}
/*
* 每隔6秒执行
* */
public function task_6()
{
}
/*
* 每隔10秒执行
* */
public function task_10()
{
}
/*
* 每隔30秒执行
* */
public function task_30()
{
}
/*
* 每隔60秒执行
* */
public function task_60()
{
}
/*
* 每隔180秒执行
* */
public function task_180()
{
}
/*
* 每隔300秒执行
* */
public function task_300()
{
}
}
4.启动 Workerman 服务端
以debug(调试)方式启动
以debug(调试)方式启动
php think workerman start
//以daemon(守护进程)方式启动
php think workerman start d
//停止
php think workerman stop
//重启
php think workerman restart
//平滑重启
php think workerman reload
//查看状态
php think workerman status
//当你看到如下结果的时候,workerman已经启动成功了。
Workerman[chat] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.11 PHP version:7.0.29
------------------------ WORKERS -------------------------------
user worker listen processes status
tegic Gateway websocket://0.0.0.0:8282 4 [OK]
tegic BusinessWorker none 1 [OK]
tegic Register text://0.0.0.0:1236 4 [OK]
----------------------------------------------------------------
Press Ctrl+C to stop. Start success.
5.客户端连接使用
socket.ws.send()调用可发送消息,socket.onmessage 内是处理消息类型,即可实现长链接
(function (global) {
var socketDebug = window.socketDebug == undefined ? false : window.socketDebug;
var socket = {
ws:null,
connect:function () {
var that= this;
that.ws = new WebSocket("ws://"+document.domain+":"+window.workermanConfig.port);//这里如果使用127.0.0.1或者localhost会出现连接失败。当时为了方便以后的维护,这里在php的全局文件里定义了一个常量来定义ip,后来本地开发完提交到linux服务器环境之后发现链接失败!按照此行代码会有效连接~
that.ws.onopen = this.onopen;
that.ws.onmessage = this.onmessage;
that.ws.onclose = function(e) {
socketDebug && console.log("连接关闭,定时重连");
that.connect();
};
that.ws.onerror = function(e) {
socketDebug && console.log("出现错误");
};
},
onopen:function () {
var joint = '{"type":"handshake","role":"user","uid":'+window.uid+',"room":'+window.room+'}';
socket.ws.send(joint);
socket.heartCheck.start();
},
sendMsg:function(content,type,id){
socket.ws.send("{content:'"+content+"',m_type:'"+type+"',room:"+id+",type:'send'}")
},
onmessage:function (e) {
try {
var data = JSON.parse(e.data);
socketDebug && console.log(data)
switch(data.type){
case 'init':
break;
// 服务端ping客户端
case 'ping':
break;
// 登录 更新用户列表
case 'handshake':
break;
// 提醒
case 'reception':
break;
//直播进行中
case 'live_ing':
break;
//直播结束
case 'live_end':
break;
//消息提醒
case 'message':
break;
//消息撤回
case 'recall':
break;
case 'ban':
break;
}
}catch (e) {
socketDebug && console.info(e);
}
},
heartCheck:{
timeout: 3000,
timeoutObj: null,
start: function(){
this.timeoutObj = setInterval(function(){
socket.ws.send("{'type':'ping'}");
}, this.timeout);
}
}
};
window.onload=function () {
socket.connect();
};
global.socket = socket;
return socket
}(this));
windows 版本无法启动,已经在商城项目中使用