132 lines
2.9 KiB
PHP
Executable File
132 lines
2.9 KiB
PHP
Executable File
<?php
|
|
|
|
// Loads RpcCommon.php from this directory; RequestSocket below extends
// RpcCommon, which is presumably declared there (verify against that file).
require_once(__DIR__ . '/RpcCommon.php');

// Create the shared ZeroMQ context once and reuse it afterwards.
// $GLOBALS is used instead of a bare variable because a file included from
// inside a function or method runs in that function's local scope: a plain
// `$g_zmqcontext = ...` would then create a local variable, and the
// `global $g_zmqcontext` lookup in RequestSocket::__construct() would see null.
if (!isset($GLOBALS['g_zmqcontext'])) {
    $GLOBALS['g_zmqcontext'] = new ZMQContext();
}
|
|
|
|
/**
 * ZeroMQ REQ-socket helper for RPC tests.
 *
 * Wraps a ZMQ::SOCKET_REQ socket connected to a given URI and reports the
 * outcome of each send/receive to the test object in $this->_test.
 * $this->_test, validateRpc() and sendReply() are not defined in this class;
 * they are presumably provided by RpcCommon (verify against that file).
 */
class RequestSocket extends RpcCommon
{
    /** @var ZMQSocket REQ socket connected to the URI passed to the constructor. */
    private $__socket;

    /**
     * Connect a new REQ socket to $uri.
     *
     * @param object $test test/assertion object, forwarded by reference to the parent constructor
     * @param string $uri  ZeroMQ endpoint to connect to
     */
    public function __construct(&$test, $uri)
    {
        parent::__construct($test);

        global $g_zmqcontext;
        // The context is normally created when this file is loaded, but if the
        // file was included from inside a function that assignment ended up in
        // the function's local scope. Create the context lazily in that case.
        if (!isset($g_zmqcontext)) {
            $g_zmqcontext = new ZMQContext();
        }

        $this->__socket = $g_zmqcontext->getSocket(ZMQ::SOCKET_REQ);
        $this->__socket->connect($uri);
    }

    /**
     * Send a request.
     *
     * Strings are sent verbatim, arrays are JSON-encoded first. Any other type
     * is reported as a test failure (and dumped) and nothing is sent.
     *
     * @param string|array $data message to send
     */
    public function send($data)
    {
        if (is_string($data)) {
            $payload = $data;
        } elseif (is_array($data)) {
            $payload = json_encode($data);
        } else {
            $this->_test->failure("RPC message is not a valid reply");
            var_dump($data);
            // Nothing was sent, so there is no send() result to verify.
            return;
        }

        $ret = $this->__socket->send($payload);
        // A successful send is expected to hand back the socket object itself.
        $this->_test->equal($this->__socket, $ret);
    }

    /**
     * Expect that no message arrives for a given period.
     *
     * Polls the socket without blocking roughly once per millisecond; every
     * poll is asserted to be false, and the first received message is reported
     * as a failure and ends the wait.
     *
     * @param int|float $for number of milliseconds during which nothing may be received
     */
    public function recvNone($for)
    {
        $endTime = (microtime(true) * 1000) + $for;

        while (true) {
            $r = $this->__socket->recv(ZMQ::MODE_NOBLOCK);
            $this->_test->isFalse($r);

            // Compare against false explicitly: an empty or "0" message is
            // falsy but is still a received message.
            if ($r !== false) {
                $this->_test->failure("Got message while expecting none");
                break;
            }

            if ($endTime < (microtime(true) * 1000)) {
                break;
            }

            usleep(1000);
        }
    }

    /**
     * Receive a reply and validate it.
     *
     * @param mixed $classMethod passed to validateRpc() together with the received frame
     * @param array $result      when non-empty, asserted to equal $msg['result']
     * @param bool  $autoReply   when true, sendReply($msg) is called before returning
     *
     * @return mixed the message as returned by validateRpc()
     */
    public function recv($classMethod, $result = [], $autoReply = false)
    {
        $msg = $this->validateRpc($this->recvFinalFrame(), $classMethod);

        $this->_test->isTrue(isset($msg['rep']));

        if (count($result)) {
            $this->_test->equal($result, $msg['result']);
        }

        if ($autoReply) {
            $this->sendReply($msg);
        }

        return $msg;
    }

    /**
     * Send a request and receive its reply.
     *
     * @param array $request request message; $request['req'] is used to validate the reply
     * @param mixed $error   false to expect a normal reply, otherwise the expected error code
     *
     * @return mixed the validated reply (see recv() / recvError())
     */
    public function reqRep($request, $error = false)
    {
        $this->send($request);

        if ($error !== false) {
            return $this->recvError($request['req'], $error);
        }

        return $this->recv($request['req']);
    }

    /**
     * Send a request, retrying while the reply carries error code DNE_BUSY,
     * DNE_TIMEOUT or DNE_AGAIN.
     *
     * @param array     $request    request message; $request['req'] is used to validate the reply
     * @param int       $retries    maximum number of attempts; a negative value retries without limit
     * @param int|float $waitTimeMs pause between attempts, in milliseconds
     *
     * @return mixed the first reply that is not one of the retryable errors,
     *               or null when $retries attempts were used up (or $retries is 0)
     */
    public function reqRepWithRetry($request, $retries = -1, $waitTimeMs = 3000)
    {
        $attemptsLeft = $retries;

        while ($attemptsLeft != 0) {
            $this->send($request);
            $rep = $this->recv($request['req']);

            // Loose comparison on purpose: it matches the `switch` this check
            // replaces, whatever type the DNE_* constants have.
            $retryable = isset($rep['error']['code'])
                && in_array($rep['error']['code'], [DNE_BUSY, DNE_TIMEOUT, DNE_AGAIN]);

            if (!$retryable) {
                return $rep;
            }

            if ($attemptsLeft > 0) {
                $attemptsLeft--;
                if ($attemptsLeft == 0) {
                    // Last attempt used: do not sleep just to give up.
                    break;
                }
            }

            // Always pause before the next attempt, including the unlimited
            // (negative $retries) mode, so a busy peer is not hammered.
            usleep((int) ($waitTimeMs * 1000));
        }

        return null;
    }

    /**
     * Receive a reply that is expected to be an error with a specific code.
     *
     * @param mixed $classMethod   passed to validateRpc() together with the received frame
     * @param mixed $expectedError error code the reply must carry in $msg['error']['code']
     *
     * @return mixed the message as returned by validateRpc()
     */
    public function recvError($classMethod, $expectedError)
    {
        $msg = $this->validateRpc($this->recvFinalFrame(), $classMethod);

        // NOTE(review): these branches report through failed() plus a manual
        // errorCount increment, while send()/recvNone() use failure(). Confirm
        // against the test class whether both methods exist and are intended.
        if (!isset($msg['error'])) {
            $this->_test->errorCount++;
            $this->_test->failed("Error occurred, error member NOT found");
        } elseif ($msg['error']['code'] != $expectedError) {
            $this->_test->errorCount++;
            $this->_test->failed("Error occurred, expected code " . $expectedError . ", got " . $msg['error']['code']);
        }

        return $msg;
    }

    /**
     * Block until a complete message has been read and return its last frame.
     *
     * Frames are read until the socket no longer signals ZMQ::SOCKOPT_RCVMORE;
     * all frames before the last one are discarded.
     *
     * @return mixed the value returned by the final recv() call
     */
    private function recvFinalFrame()
    {
        do {
            $frame = $this->__socket->recv();
        } while ($this->__socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE));

        return $frame;
    }
}
|