__socket = $g_zmqcontext->getSocket(ZMQ::SOCKET_REP);
        $this->__socket->bind($uri);
    }

    /**
     * Expect that no message arrives on the socket for a given period.
     *
     * Polls the socket in non-blocking mode roughly once per millisecond
     * until the period has elapsed. Every poll result is asserted to be
     * false; the first received message is additionally reported as a
     * failure and ends the wait early.
     *
     * @param int|float $for number of milliseconds during which no message
     *                       may be received
     */
    function recvNone($for)
    {
        $endTime = (microtime(true) * 1000) + $for;

        while (true) {
            $r = $this->__socket->recv(ZMQ::MODE_NOBLOCK);
            $this->_test->isFalse($r);

            // Compare strictly: a non-blocking recv yields boolean false when
            // nothing is available, whereas "" and "0" are real (but falsy)
            // messages that must still be reported.
            if ($r !== false) {
                $this->_test->failure("Got message while expecting none");
                break;
            }

            if ($endTime < (microtime(true) * 1000)) {
                break;
            }

            usleep(1000);
        }
    }

    /**
     * Receive one message (blocking) and check that it is an RPC request.
     *
     * When the message is multipart, all frames are read but only the last
     * frame is kept and validated.
     *
     * @param mixed $classMethod passed to validateRpc() together with the raw message
     * @param array $params      when non-empty, asserted to equal the message's 'params'
     * @param bool  $autoReply   when true, the message is handed to sendReply()
     *                           before returning
     *
     * @return mixed the message as returned by validateRpc()
     */
    function recv($classMethod, $params = [], $autoReply = false)
    {
        // Drain every frame of a multipart message; $r ends up holding the last one.
        while (true) {
            $r = $this->__socket->recv();
            if (!$this->__socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE)) {
                break;
            }
        }

        $msg = $this->validateRpc($r, $classMethod);
        $this->_test->isTrue(isset($msg['req']));

        if (count($params)) {
            $this->_test->equal($params, $msg['params']);
        }

        if ($autoReply) {
            $this->sendReply($msg);
        }

        return $msg;
    }

    /**
     * Send a reply. $data can be a request, a pre-crafted reply or a raw string.
     *
     * - string: sent as-is.
     * - 'req' set and 'rep' not set: the request is turned into an empty
     *   reply ('params' is dropped, 'req' is renamed to 'rep', 'time' is set
     *   to the current time in milliseconds) and sent JSON-encoded.
     * - 'rep' set and 'req' not set: the reply is sent JSON-encoded; only its
     *   'time' field is overwritten with the current time in milliseconds.
     * - anything else: a test failure is reported, $data is dumped and
     *   nothing is sent.
     *
     * After a send, the return value of send() is asserted to equal the socket.
     *
     * @param string|array $data   the raw payload, request or reply to send
     * @param mixed        $result currently unused
     */
    function sendReply($data, $result = null)
    {
        if (is_string($data)) {
            $ret = $this->__socket->send($data);
        } elseif (isset($data['req']) && !isset($data['rep'])) {
            unset($data['params']);
            $data['rep'] = $data['req'];
            unset($data['req']);
            $data['time'] = round(microtime(true) * 1000);
            $ret = $this->__socket->send(json_encode($data));
        } elseif (!isset($data['req']) && isset($data['rep'])) {
            $data['time'] = round(microtime(true) * 1000);
            $ret = $this->__socket->send(json_encode($data));
        } else {
            $this->_test->failure("RPC message is not a valid reply");
            var_dump($data);

            // Nothing was sent, so there is no send() result to verify.
            // Returning here avoids reading an undefined $ret below.
            return;
        }

        $this->_test->equal($this->__socket, $ret);
    }
}