__socket = $g_zmqcontext->getSocket(ZMQ::SOCKET_REQ);
        $this->__socket->connect($uri);
    }

    /**
     * Send a request over the socket.
     *
     * Strings are sent verbatim and arrays are JSON-encoded first. Any other
     * type is reported as a test failure, dumped, and nothing is sent.
     *
     * @param string|array $data Message to send.
     */
    function send($data)
    {
        if (is_string($data)) {
            $ret = $this->__socket->send($data);
        } elseif (is_array($data)) {
            $ret = $this->__socket->send(json_encode($data));
        } else {
            // This branch previously dereferenced `$this->__test`; every other
            // access in this chunk goes through `$this->_test`.
            $this->_test->failure("RPC message is not a valid reply");
            var_dump($data);

            // Nothing was sent, so there is no return value left to verify.
            return;
        }

        // The socket's send() is expected to hand back the socket itself.
        $this->_test->equal($this->__socket, $ret);
    }

    /**
     * Expect no received messages.
     *
     * Polls the socket without blocking, roughly once per millisecond, until
     * the time window has elapsed or a message shows up.
     *
     * @param int|float $for For how many milliseconds no message must be received.
     */
    function recvNone($for)
    {
        $endTime = (microtime(true) * 1000) + $for;

        while (true) {
            $r = $this->__socket->recv(ZMQ::MODE_NOBLOCK);
            $this->_test->isFalse($r);

            // Compare against false explicitly: an empty or "0" payload is
            // falsy in PHP but is still a received message.
            if ($r !== false) {
                $this->_test->failure("Got message while expecting none");
                break;
            }

            if ($endTime < (microtime(true) * 1000)) {
                break;
            }

            usleep(1000);
        }
    }

    /**
     * Receive a reply and check it.
     *
     * Reads message parts until no more are pending; only the last part read
     * is handed to validateRpc(). The decoded message must carry a 'rep'
     * member.
     *
     * @param mixed $classMethod Passed through to validateRpc() together with the raw message.
     * @param array $result      When non-empty, the message's 'result' member must equal this.
     * @param bool  $autoReply   When true, the message is passed to sendReply() before returning.
     *
     * @return mixed The message as returned by validateRpc().
     */
    function recv($classMethod, $result = [], $autoReply = false)
    {
        // Drain every part of a multipart message; $r ends up as the last part.
        do {
            $r = $this->__socket->recv();
        } while ($this->__socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE));

        $msg = $this->validateRpc($r, $classMethod);
        $this->_test->isTrue(isset($msg['rep']));

        if (count($result)) {
            $this->_test->equal($result, $msg['result']);
        }

        if ($autoReply) {
            $this->sendReply($msg);
        }

        return $msg;
    }

    /**
     * Send a request and receive its reply.
     *
     * @param array $request Request to send; its 'req' member is used when receiving the reply.
     * @param mixed $error   Expected error code, or false when a regular reply is expected.
     *
     * @return mixed The message returned by recv() or recvError().
     */
    public function reqRep($request, $error = false)
    {
        $this->send($request);

        if ($error !== false) {
            return $this->recvError($request['req'], $error);
        }

        return $this->recv($request['req']);
    }

    /**
     * Send a request, retrying as long as the reply's error code is
     * DNE_TIMEOUT, DNE_BUSY or DNE_AGAIN.
     *
     * @param array $request    Request to send; its 'req' member is used when receiving the reply.
     * @param int   $retries    Maximum number of attempts; a negative value retries without limit.
     * @param int   $waitTimeMs Pause between attempts, in milliseconds.
     *
     * @return mixed|null The first reply without a retryable error code, or
     *                    null once the attempts are used up.
     */
    public function reqRepWithRetry($request, $retries = -1, $waitTimeMs = 3000)
    {
        while (true) {
            if ($retries == 0) {
                return null;
            }

            $this->send($request);
            $rep = $this->recv($request['req']);

            if (!isset($rep['error']['code'])) {
                return $rep;
            }

            switch ($rep['error']['code']) {
                case DNE_BUSY:
                case DNE_TIMEOUT:
                case DNE_AGAIN:
                    // Retryable: fall out of the switch and try again below.
                    break;
                default:
                    return $rep;
            }

            if ($retries > 0) {
                $retries--;
            }

            // Always back off before the next attempt. The pause used to be
            // skipped when $retries was negative, which made the default
            // unlimited mode resend in a tight loop and ignore $waitTimeMs.
            usleep((int) ($waitTimeMs * 1000));
        }
    }

    /**
     * Receive a reply that is expected to carry a specific error code.
     *
     * Reads message parts until no more are pending; only the last part read
     * is handed to validateRpc(). A failure is recorded when the message has
     * no 'error' member or its code differs from the expected one.
     *
     * @param mixed $classMethod   Passed through to validateRpc() together with the raw message.
     * @param mixed $expectedError Error code the reply must carry (compared loosely).
     *
     * @return mixed The message as returned by validateRpc().
     */
    function recvError($classMethod, $expectedError)
    {
        // Drain every part of a multipart message; $r ends up as the last part.
        do {
            $r = $this->__socket->recv();
        } while ($this->__socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE));

        $msg = $this->validateRpc($r, $classMethod);

        // NOTE(review): this method reports through failed() and bumps
        // errorCount by hand, while send()/recvNone() call failure() - confirm
        // both exist on the test object and that the difference is intended.
        if (!isset($msg['error'])) {
            $this->_test->errorCount++;
            $this->_test->failed("Error occurred, error member NOT found");
        } elseif ($msg['error']['code'] != $expectedError) {
            $this->_test->errorCount++;
            $this->_test->failed("Error occurred, expected code " . $expectedError . ", got " . $msg['error']['code']);
        }

        return $msg;
    }
}