src.dualinventive.com/fw/dncm/libdi/common/ditest-php/rpc/SubscribeSocket.php

55 lines
1.1 KiB
PHP
Executable File

<?php
require_once(__DIR__ . '/RpcCommon.php');
// Create the shared ZeroMQ context once per process. It is written through
// $GLOBALS because an included file runs in the scope of the include statement:
// if this file is required from inside a function or method, a plain assignment
// would only create a local variable there, and the `global $g_zmqcontext`
// lookup in SubscribeSocket's constructor would not see it.
if (!isset($GLOBALS['g_zmqcontext'])) {
    $GLOBALS['g_zmqcontext'] = new ZMQContext();
}
/**
 * ZeroMQ SUB socket wrapper for the RPC tests.
 *
 * Connects a subscriber socket to an endpoint, subscribes to every topic
 * (empty subscription prefix) and hands out received messages decoded from
 * JSON, optionally filtered on the message's 'device:uid' field.
 */
class SubscribeSocket extends RpcCommon {
    /** @var ZMQSocket Subscriber socket connected to the endpoint given to the constructor. */
    private $__socket;

    /**
     * Create the subscriber socket and connect it.
     *
     * @param mixed  $test      Passed by reference to RpcCommon's constructor.
     * @param string $uri       ZeroMQ endpoint to connect to.
     * @param int    $socket_to Not used by this class; kept so existing callers
     *                          that pass a third argument keep working.
     */
    public function __construct(&$test, $uri, $socket_to = 10000) {
        parent::__construct($test);

        global $g_zmqcontext;
        // The file-level initialisation may not have reached the global scope
        // (e.g. when this file was required from inside a function), so make
        // sure a context exists before using it.
        if (!($g_zmqcontext instanceof ZMQContext)) {
            $g_zmqcontext = new ZMQContext();
        }

        $this->__socket = $g_zmqcontext->getSocket(ZMQ::SOCKET_SUB);
        $this->__socket->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
        $this->__socket->connect($uri);
    }

    /**
     * Receive the next published message, optionally only for one device.
     *
     * For a multipart message only the last frame is decoded; earlier frames
     * are discarded. Messages that do not match the device filter are dropped.
     *
     * @param string $deviceUID When non-empty, only a message whose 'device:uid'
     *                          equals this value is returned.
     * @param int    $timeoutMs Overall time budget in milliseconds. A negative
     *                          value waits indefinitely; 0 only drains what is
     *                          already queued and never blocks.
     *
     * @return mixed The JSON-decoded message (associative array for a JSON
     *               object), or null on timeout or when receiving fails with a
     *               ZMQSocketException. Without a device filter an undecodable
     *               payload also yields null, because json_decode() does.
     */
    public function recv($deviceUID = "", $timeoutMs = -1) {
        $timeoutMs = (int)$timeoutMs;
        $filterOnDevice = ($deviceUID !== null && (string)$deviceUID !== "");
        $hasDeadline = ($timeoutMs > 0);
        $deadline = microtime(true) * 1000 + $timeoutMs;

        while (true) {
            // Wait only for what is left of the budget, so a stream of messages
            // for other devices cannot stretch the call beyond $timeoutMs.
            $waitMs = $timeoutMs;
            if ($hasDeadline) {
                $waitMs = (int)ceil($deadline - microtime(true) * 1000);
                if ($waitMs <= 0) {
                    return null;
                }
            }
            $this->__socket->setSockOpt(ZMQ::SOCKOPT_RCVTIMEO, $waitMs);

            // Read all frames of one message, keeping the last one.
            do {
                try {
                    $frame = $this->__socket->recv();
                } catch (ZMQSocketException $e) {
                    return null;
                }
                // php-zmq reports "would block" / receive timeout by returning
                // false instead of throwing.
                if ($frame === false) {
                    return null;
                }
            } while ($this->__socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE));

            $msg = json_decode($frame, true);
            if (!$filterOnDevice) {
                return $msg;
            }

            // Compare as strings: loose == treats numeric-looking UIDs such as
            // "0e1" and "0e2" as equal.
            if (is_array($msg)
                && isset($msg['device:uid'])
                && is_scalar($msg['device:uid'])
                && (string)$msg['device:uid'] === (string)$deviceUID
            ) {
                return $msg;
            }
        }
    }
}