php 如何判断mqtt phpfwritee失败

在 SegmentFault,学习技能、解决问题
每个月,我们帮助 1000 万的开发者解决各种各样的技术问题。并助力他们在技术能力、职业生涯、影响力上获得提升。
标签:至少1个,最多5个
php实现mqtt发布/发送 消息到主题
mqtt是啥?我的博客有写这个东西:
php想要实现mqtt需要使用到php中的socket函数;
此次使用的是网上开源mqtt案例:其中使用的是 stream_socket_xxxx 系列函数
大概意思是:
正如你所指出的,'stream'是PHP核心(内置的,始终可用),而'套接字'是很少包含的扩展的一部分。除此之外,它们几乎完全相同。您可以同时使用TCP和UDP两种流,也可以使用阻塞和非阻塞模式,这些模式涵盖了所有用例的99%。我能想到的唯一常见的例外是ICMP。例如,'ping'。但是,看起来目前还没有一种安全的方式来从PHP执行ICMP。这种调用需要通过套接字扩展来实现SOCK_RAW,这需要执行“root”权限。此外,并非所有路由器都会在TCP,UDP和ICMP之外路由其他数据包类型。这限制了套接字扩展的实用性。
MQTT类代码:
/* phpMQTT */
class Mqtt {
/* holds the socket
private $msgid = 1;
/* counter for message id */
public $keepalive = 10;
/* default keepalive timmer */
/* host unix time, used to detect disconects */
public $topics = array();
/* used to store currently subscribed topics */
public $debug =
/* should output debug messages */
/* broker address */
/* broker port */
/* client id sent to brocker */
/* stores the will of the client */
/* stores username */
/* stores password */
function __construct($address, $port, $clientid, $cafile = NULL){
$this-&broker($address, $port, $clientid, $cafile);
/* sets the broker details */
function broker($address, $port, $clientid, $cafile = NULL){
$this-&address = $
$this-&port = $
$this-&clientid = $
$this-&cafile = $
function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){
while($this-&connect($clean, $will, $username, $password)==false){
sleep(10);
/* connects to the broker
inputs: $clean: should the client send a clean session flag */
function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){
if($will) $this-&will = $
if($username) $this-&username = $
if($password) $this-&password = $
if ($this-&cafile) {
$socketContext = stream_context_create(["ssl" =& [
"verify_peer_name" =& true,
"cafile" =& $this-&cafile
$this-&socket = stream_socket_client("tls://" . $this-&address . ":" . $this-&port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
$this-&socket = stream_socket_client("tcp://" . $this-&address . ":" . $this-&port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
if (!$this-&socket ) {
if($this-&debug) error_log("stream_socket_create() $errno, $errstr \n");
stream_set_timeout($this-&socket, 5);
stream_set_blocking($this-&socket, 0);
$buffer = "";
$buffer .= chr(0x00); $i++;
$buffer .= chr(0x06); $i++;
$buffer .= chr(0x4d); $i++;
$buffer .= chr(0x51); $i++;
$buffer .= chr(0x49); $i++;
$buffer .= chr(0x73); $i++;
$buffer .= chr(0x64); $i++;
$buffer .= chr(0x70); $i++;
$buffer .= chr(0x03); $i++;
if($clean) $var+=2;
//Add will info to header
if($this-&will != NULL){
$var += 4; // Set will flag
$var += ($this-&will['qos'] && 3); //Set will qos
if($this-&will['retain'])
$var += 32; //Set will retain
if($this-&username != NULL) $var += 128;
//Add username to header
if($this-&password != NULL) $var += 64;
//Add password to header
$buffer .= chr($var); $i++;
//Keep alive
$buffer .= chr($this-&keepalive && 8); $i++;
$buffer .= chr($this-&keepalive & 0xff); $i++;
$buffer .= $this-&strwritestring($this-&clientid,$i);
//Adding will to payload
if($this-&will != NULL){
$buffer .= $this-&strwritestring($this-&will['topic'],$i);
$buffer .= $this-&strwritestring($this-&will['content'],$i);
if($this-&username) $buffer .= $this-&strwritestring($this-&username,$i);
if($this-&password) $buffer .= $this-&strwritestring($this-&password,$i);
$head{0} = chr(0x10);
$head{1} = chr($i);
fwrite($this-&socket, $head, 2);
fwrite($this-&socket,
$string = $this-&read(4);
if(ord($string{0})&&4 == 2 && $string{3} == chr(0)){
if($this-&debug) echo "Connected to Broker\n";
error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
ord($string{0}),ord($string{3})));
$this-&timesinceping = time();
/* read: reads in so many bytes */
function read($int = 8192, $nb = false){
print_r(socket_get_status($this-&socket));
$string="";
return fread($this-&socket, $togo);
while (!feof($this-&socket) && $togo&0) {
$fread = fread($this-&socket, $togo);
$string .= $
$togo = $int - strlen($string);
/* subscribe: subscribes to topics */
function subscribe($topics, $qos = 0){
$buffer = "";
$id = $this-&
$buffer .= chr($id && 8);
$buffer .= chr($id % 256);
foreach($topics as $key =& $topic){
$buffer .= $this-&strwritestring($key,$i);
$buffer .= chr($topic["qos"]);
$this-&topics[$key] = $
$cmd = 0x80;
($qos && 1);
$head = chr($cmd);
$head .= chr($i);
fwrite($this-&socket, $head, 2);
fwrite($this-&socket, $buffer, $i);
$string = $this-&read(2);
$bytes = ord(substr($string,1,1));
$string = $this-&read($bytes);
/* ping: sends a keep alive ping */
function ping(){
$head = " ";
$head = chr(0xc0);
$head .= chr(0x00);
fwrite($this-&socket, $head, 2);
if($this-&debug) echo "ping sent\n";
/* disconnect: sends a proper disconect cmd */
function disconnect(){
$head = " ";
$head{0} = chr(0xe0);
$head{1} = chr(0x00);
fwrite($this-&socket, $head, 2);
/* close: sends a proper disconect, then closes the socket */
function close(){
$this-&disconnect();
stream_socket_shutdown($this-&socket, STREAM_SHUT_WR);
/* publish: publishes $content on a $topic */
function publish($topic, $content, $qos = 0, $retain = 0){
$buffer = "";
$buffer .= $this-&strwritestring($topic,$i);
//$buffer .= $this-&strwritestring($content,$i);
$id = $this-&msgid++;
$buffer .= chr($id && 8);
$buffer .= chr($id % 256);
$buffer .= $
$i+=strlen($content);
$head = " ";
$cmd = 0x30;
if($qos) $cmd += $qos && 1;
if($retain) $cmd += 1;
$head{0} = chr($cmd);
$head .= $this-&setmsglength($i);
fwrite($this-&socket, $head, strlen($head));
fwrite($this-&socket, $buffer, $i);
/* message: processes a recieved topic */
function message($msg){
$tlen = (ord($msg{0})&&8) + ord($msg{1});
$topic = substr($msg,2,$tlen);
$msg = substr($msg,($tlen+2));
$found = 0;
foreach($this-&topics as $key=&$top){
if( preg_match("/^".str_replace("#",".*",
str_replace("+","[^\/]*",
str_replace("/","\/",
str_replace("$",'\$',
$key))))."$/",$topic) ){
if(is_callable($top['function'])){
call_user_func($top['function'],$topic,$msg);
$found = 1;
if($this-&debug && !$found) echo "msg recieved but no match in subscriptions\n";
/* proc: the processing loop for an "allways on" client
set true when you are doing other stuff in the loop good for watching something else at the same time */
function proc( $loop = true){
$sockets = array($this-&socket);
$w = $e = NULL;
//$byte = fgetc($this-&socket);
if(feof($this-&socket)){
if($this-&debug) echo "eof receive going to reconnect for good measure\n";
fclose($this-&socket);
$this-&connect_auto(false);
if(count($this-&topics))
$this-&subscribe($this-&topics);
$byte = $this-&read(1, true);
if(!strlen($byte)){
if($loop){
usleep(100000);
$cmd = (int)(ord($byte)/16);
if($this-&debug) echo "Recevid: $cmd\n";
$multiplier = 1;
$value = 0;
$digit = ord($this-&read(1));
$value += ($digit & 127) * $
$multiplier *= 128;
}while (($digit & 128) != 0);
if($this-&debug) echo "Fetching: $value\n";
if($value)
$string = $this-&read($value);
switch($cmd){
$this-&message($string);
$this-&timesinceping = time();
if($this-&timesinceping & (time() - $this-&keepalive )){
if($this-&debug) echo "not found something so ping\n";
$this-&ping();
if($this-&timesinceping&(time()-($this-&keepalive*2))){
if($this-&debug) echo "not seen a package in a while, disconnecting\n";
fclose($this-&socket);
$this-&connect_auto(false);
if(count($this-&topics))
$this-&subscribe($this-&topics);
/* getmsglength: */
function getmsglength(&$msg, &$i){
$multiplier = 1;
$value = 0 ;
$digit = ord($msg{$i});
$value += ($digit & 127) * $
$multiplier *= 128;
}while (($digit & 128) != 0);
/* setmsglength: */
function setmsglength($len){
$string = "";
$digit = $len % 128;
$len = $len && 7;
// if there are more digits to encode, set the top bit of this digit
if ( $len & 0 )
$digit = ($digit | 0x80);
$string .= chr($digit);
}while ( $len & 0 );
/* strwritestring: writes a string to a buffer */
function strwritestring($str, &$i){
$ret = " ";
$len = strlen($str);
$msb = $len && 8;
$lsb = $len % 256;
$ret = chr($msb);
$ret .= chr($lsb);
$i += ($len+2);
function printstr($string){
$strlen = strlen($string);
for($j=0;$j&$$j++){
$num = ord($string{$j});
if($num & 31)
$chr = $string{$j}; else $chr = " ";
printf("%4d: %08b : 0x%02x : %s \n",$j,$num,$num,$chr);
发送到主题
// 发送给订阅号信息,创建socket,无sam队列
$server = "127.0.0.1";
// 服务代理地址(mqtt服务端地址)
$port = 1883;
// 通信端口
$username = "";
// 用户名(如果需要)
$password = "";
// 密码(如果需要
$client_id = "clientx9293670xxctr"; // 设置你的连接客户端id
$mqtt = new Mqtt($server, $port, $client_id); //实例化MQTT类
if ($mqtt-&connect(true, NULL, $username, $password)) {
//如果创建链接成功
$mqtt-&publish("xxxctr", "setr=3xxxxxxxxx", 0);
// 发送到 xxxctr 的主题 一个信息 内容为 setr=3xxxxxxxxx Qos 为 0
$mqtt-&close();
//发送后关闭链接
echo "Time out!\n";
/*// 订阅信息,接收一个信息后退出
$server = "127.0.0.1";
// 服务代理地址(mqtt服务端地址)
$port = 1883;
// 通信端口
$username = "";
// 用户名(如果需要)
$password = "";
// 密码(如果需要
$client_id = "clientx9293670xxctr"; // 设置你的连接客户端id
$mqtt = new Mqtt($server, $port, $client_id);
if(!$mqtt-&connect(true, NULL, $username, $password)) { //链接不成功再重复执行监听连接
$topics['SN70state'] = array("qos" =& 0, "function" =& "procmsg");
// 订阅主题为 SN70state qos为0
$mqtt-&subscribe($topics, 0);
while($mqtt-&proc()){
//死循环监听
$mqtt-&close();
function procmsg($topic, $msg){ //信息回调函数 打印信息
echo "Msg Recieved: " . date("r") . "\n";
echo "Topic: {$topic}\n\n";
echo "\t$msg\n\n";
$xxx = json_decode($msg);
var_dump($xxxxxx-&aa);
这是php实现方法,如果用php做发送端还是不错的.但是
我被这个图片打击了,区块链应用还真提莫的是js写起来跟简单;
我最终写出的mqtt api 使用的是为什么?
0 收藏&&|&&4
你可能感兴趣的文章
13 收藏,1.8k
2 收藏,347
本作品采用署名-非商业性使用-禁止演绎 4.0 国际许可协议 进行许可
分享到微博?
我要该,理由是:
在 SegmentFault,学习技能、解决问题
每个月,我们帮助 1000 万的开发者解决各种各样的技术问题。并助力他们在技术能力、职业生涯、影响力上获得提升。下次自动登录
现在的位置:
& 综合 & 正文
MQTT协议详解二
下面我们开始一步步执行连接,订阅,发布和接收。
首先就是CONNECT,发送连接申请。
在连接中我们要做的是将客户端ID、消息标识、用户名、密码等能过Socket传送给服务器。
private $msgid=1;//消息id
$keepalive=10;//默认心跳时间
public $//主机时间,有来断开连接
public $topics=array();//订阅主题
public $debug=
public $ //主机名
public $//端口名
public $ //客户机id
public $//是否保存客户机will标识的数组,包括Qos,Retain,Dup。
public $ //保存用户名
public $ //保存密码
public $operations=array(
"MQTT_CONNECT"=&1,//请求连接
"MQTT_CONNACK"=&2,//请求应答
"MQTT_PUBLISH"=&3,//发布消息
"MQTT_PUBACK"=&4,//发布应答
"MQTT_PUBREC"=&5,//发布已接收,保证传递1
"MQTT_PUBREL"=&6,//发布释放,保证传递2
"MQTT_PUBCOMP"=&7,//发布完成,保证传递3
"MQTT_SUBSCRIBE"=&8,//订阅请求
"MQTT_SUBACK"=&9,//订阅应答
"MQTT_UNSUBSCRIBE"=&10,//取消订阅
"MQTT_UNSUBACK"=&11,//取消订阅应答
"MQTT_PINGREQ"=&12,//ping请求
"MQTT_PINGRESP"=&13,//ping响应
"MQTT_DISCONNECT"=&14//断开连接
以上是变量的申明,但不一定全部都用上,比如用户名和密码就可选。
function connect($clean=1,$will=NULL,$userarray=NULL){
$this-&username=$userarray['username'];
$this-&password=$userarray['password'];
$this-&socket=fsockopen($this-&host,$this-&port,$errno,$errstr,60);
if(!$this-&socket){
$this-&debug("socket_error $errno,$errstr &br/&");
stream_set_timeout($this-&socket, 10);//用于读取流时的时间控制
stream_set_blocking($this-&socket, 0);//0表示非阻塞,1表示阻塞
$payload=$this-&create_connect_payload($this-&clientid,$will,$userarray);
$index+=$payload['index'];
$vhead=$this-&create_variable_connect_header($userarray,$will,$clean);
$index+=$vhead['index'];
$conhead=$this-&create_fixed_header('MQTT_CONNECT',0,0,0); //发起连接
$index=$this-&numencoding($index); //因为对于超过每个字节是8位,超过就要多加一位。
$conhead.=$
fwrite($this-&socket,$conhead,strlen($conhead));//传递固定头部长度2字节
fwrite($this-&socket,$vhead['content'].$payload['content']);//传递变长头部和消息体
$resCode =$this-&read_fixed_header($this-&socket); //读取返回的头部
$resRemaing=$this-&read_remaing_length($this-&socket);
if($resRemaing&0){
$body=unpack('Ccomp/Cretcode', fread($this-&socket,$resRemaing));
if($resCode['msgtype'] == $this-&operations['MQTT_CONNACK'] && $body['retcode'] ==0){ //0表示连接成功,大于1-5表示错误。
$this-&debug("Connected to {$this-&host}:{$this-&port}&br/&");
$this-&debug(sprintf("Connection failed! (Error: 0x%02x)",
$body['retcode']));//读取错误,其错误代码见协议
$this-&timesinceping = time();
这个是发起连接的函数,安全是按照协议来组装代码,但是这是用PHP实现的时候要注意一下,因为PHP是弱类型的,不能像C里直接计算,否则会将非INT型的当做0,INT型的直接计算,所以要注意使用chr()和ord(),pack()和unpacka()这两对函数,在字符拼接时,一定要用chr()或pack()把ASCII转成Char型的,而在计算移位时,又要把其转换为ASCII。
二、对于整个推送的架构应该如下图所示:
上面是当Qos为1的推送,即对于推送的内容会保存至服务器中,然后返回PUBACK告知操作成功,然后便会向在线终端推送。当设置Retain值为1时,但终端上线,则会立即推送。
我们现在就按Qos来理解推送的三种机制(Qos仅在PUBLISH消息中有效)
当Qos为0时,是最基本的推送形式,它最大的特点就是无保障性的,因为当你将数据包发送给服务器时,服务器不会有任何的回应,故不知成功与否
但如果成功,刚会发送给相应的订阅者,而且不会保存任何的发送信息,发完就完全删除了,所以成功率为小于等于1。
当Qos为1时,则是有保障性的传输,当发送推送数据给服务器时,其会返回一个PUBACK的确认包给发送者,用于确定成功与否,当把Retain值设为1时,会保存这个状态,对于所有后来上线的都会直接推送最后一次推的内容。所以说至少会推送成功一次。
当Qos为2时,因为是我们最常用的了,就要通过协议的机制既要保证推送的成功性,也要保证推送的准确性。
下面给出其示例图:
可以说要保证推送的成功传送,其认证的过程是比较比杂的,首先在发送的时候将Qos标识为2,然后发送PUBLISH信息,服务器收到后会将信息保存然后发送PUBREC告知
已接收,但此时服务器并不会推送,而是等待下一个客户端的PUBREL
下面贴出代码:
public function publish($topic,$message,$qos=2,$retain=0){
$pubhead=$this-&create_fixed_header('MQTT_PUBLISH',1,$qos,$retain);//生成固定头部标识为PUBLISH,其消息结构也比较简单。
$body.=$this-&strencoding($topic,$index);
if($qos&0){
//如果Qos大于0,即要保证传输则要加上MessageID,自己定义
$body.=$this-&create_message_id(++$this-&msgid,$index);
$index+=strlen($message);
$pubhead.=$this-&numencoding($index);
//同样的要对消息体的长度按照消息规定进行设置
fwrite($this-&socket,$pubhead,strlen($pubhead));
fwrite($this-&socket,$body,$index);
$resCode =$this-&read_fixed_header($this-&socket);
$resRemaing=$this-&read_remaing_length($this-&socket);
//接收返回信息
if($resRemaing&0){
if($resCode['msgtype']==$this-&operations['MQTT_PUBACK']){
//如果Qos为1的话,那么返回的是PUBACK
$logedId=$this-&read_message_id($this-&socket);
if($logedId==$this-&msgid){
$this-&debug(" Qos 1 Message ".$logedId."send successfully&br/&");
}else if($resCode['msgtype']==$this-&operations['MQTT_PUBREC']){ //如果Qos为2的话,那么返回信息就是PUBREC,此时要发送同意推送信息PUBREL
$this-&debug("received message PUBREC");
$logedId=$this-&read_message_id($this-&socket);
if($logedId==$this-&msgid){
$this-&debug("Qos 2 Message ".$this-&msgid."send successfully&br/&");
$relHead=$this-&create_fixed_header('MQTT_PUBREL',0,0,0);
$payload=$this-&create_message_id($logedId,$index);
$relHead.=$this-&numencoding($index);
fwrite($this-&socket, $relHead,strlen($relHead));
fwrite($this-&socket,$payload,strlen($payload));
$resCode=$this-&read_fixed_header($this-&socket);
$resRemaing=$this-&read_remaing_length($this-&socket);
if($resRemaing&0){
if($resCode['msgtype']==$this-&operations['MQTT_PUBCOMP']){ //最后收到的是PUBCOMP信息,表示推送结束。
$clogedId=$this-&read_message_id($this-&socket);
if($clogedId==$logedId){
$this-&debug("the whole flow send successfully ,then the server can delete the message!&br/&");
接下去说说订阅的,订阅者总体来说比发送者处理上要简单,仅仅是四个很容易理解的消息,SUBSCRIBE,SUBACK,UNSUBSCRIBE和UNSUBACK。
分别是对订阅和取消的判断。
下面直接贴代码:
public function subscribe($topics,$qos=0,$dup=0){
$subHead=$this-&create_fixed_header("MQTT_SUBSCRIBE",$dup,$qos,0);//在订阅中Retain标识是元效的
$body=$this-&create_message_id($this-&msgid,$index);
foreach($topics as $key=&$topic){
$body.=$this-&strencoding($topic['topic'],$index);
//这里补充一下,因为此支持一次订阅多个主题,所以topics是一个二维数组。第二维表示主题和qos。
$body.=chr($topic['qos']);
$subHead.=$this-&numencoding($index);
//老样子,第二个字节是下面字符的总长。
fwrite($this-&socket,$subHead,strlen($subHead));
fwrite($this-&socket,$body);
$res=$this-&read_fixed_header($this-&socket);
//获取返回值
$remaing=$this-&read_remaing_length($this-&socket);
if($remaing&0){
if($res['msgtype']==$this-&operations['MQTT_SUBACK']){ //解析SUBACK包的消息
$this-&debug("get suback&br/&");
$logedId=$this-&read_message_id($this-&socket);
if($logedId==$this-&msgid){
$this-&debug("subscribe successful&br/&");
$qosGrantCode=fread($this-&socket,count($topics[0]));
//在SUBACK的消息中我们可以确认我们订阅各个主题的Qos。
$this-&debug(var_dump(unpack("C*",$qosGrantCode)));
取消订阅和订阅的结构和方法其实是一样的
下面贴出取消订阅的代码:
public function unsubscribe($untopics,$dup=0,$qos=1){
$unsHead=$this-&create_fixed_header("MQTT_UNSUBSCRIBE",$dup,$qos,0);
$body=$this-&create_message_id($this-&msgid,$index);
//变长头为MessageID
foreach($untopics as $untopic){
$body.=$this-&strencoding($untopic,$index); //这里的untopics为一维数组,保存退订的主题。
$unsHead.=$this-&numencoding($index);
fwrite($this-&socket,$unsHead,strlen($unsHead));
fwrite($this-&socket,$body);
$res=$this-&read_fixed_header($this-&socket);
//读取头部
$remaing=$this-&read_remaing_length($this-&socket);
if($remaing&0){
if($res['msgtype']==$this-&operations['MQTT_UNSUBACK']){ //处理UNSUBACK信息。
$unlogedId=$this-&read_message_id($this-&socket);
if($unlogedId==$this-&msgid){
//匹配发送前后ID是否相同
$this-&debug("unsubscribe successful");
最后就是PINGREQ消息,即在每个Keep Alive Time中发送Ping请求,来判断主机是否仍然在线。
做为回应则是PINGRESP消息,表示目前仍然在线。
下面贴出Ping请求和回应代码:
public function pingreq(){
$reqHead=$this-&create_fixed_header("MQTT_PINGREQ",0,0,0);//发送PINT REQUEST,其它标识位无意义,全为0
$reqHead.=chr(0);
//只有固定头部,固其只有2个字节的消息长
fwrite($this-&socket,$reqHead,strlen($reqHead));
$res=$this-&read_fixed_header($this-&socket);
if($res!=NULL&&$res['msgtype']==$this-&operations['MQTT_PINGRESP']){ //如果返回了PING RESPONSE则服务器仍在线
$this-&debug("Yes I am alive!");
$this-&debug("Server is outline!");
以上代码是各部分的实现,而要实际使用则要配合异步和事件机制,虽然PHP一直以来在线程方面比较弱,但是以现在PHP所支持的来看,已完全有能力实现异步和事件机制。
这也是我下一篇博文要介绍的,同时还会对MQTT协议有针对性的修改和补充,使之更好的结合数据库以及满足较高并发量的实现。
最后,能力有限,人艰不拆,欢迎指教。
【上篇】【下篇】}

我要回帖

更多关于 php fwrite 失败 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信