Source of file ServerEndpoint.php
Size: 5,114 Bytes - Last Modified: 2021-01-12T22:04:13+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/ServerEndpoint.php
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 
                                Covered by 3 test(s):
                            54 
 
                                Covered by 3 test(s):
                            5556 
 
                                Covered by 3 test(s):
                            57 
 
                                Covered by 2 test(s):
                            5859 
 
                                Covered by 2 test(s):
                            60 
 
                                Covered by 2 test(s):
                            61 
 
                                Covered by 2 test(s):
                            62 
 
                                Covered by 2 test(s):
                            63 
 
                                Covered by 2 test(s):
                            64 
 
                                Covered by 2 test(s):
                            656667 
 
                                Covered by 2 test(s):
                            68 
 
                                Covered by 2 test(s):
                            69 
 
                                Covered by 2 test(s):
                            70 
 
                                Covered by 2 test(s):
                            717273 
 
                                Covered by 2 test(s):
                            74 
 
                                Covered by 2 test(s):
                            75 
 
                                Covered by 2 test(s):
                            76 
 
                                Covered by 2 test(s):
                            77 
 
                                Covered by 2 test(s):
                            78 
 
                                Covered by 2 test(s):
                            79 
 
                                Covered by 2 test(s):
                            8081 
 
                                Covered by 2 test(s):
                            82 
 
                                Covered by 2 test(s):
                            838485 
 
                                Covered by 2 test(s):
                            86 
 
                                Covered by 2 test(s):
                            878889 
 
                                Covered by 1 test(s):
                            909192 
 
                                Covered by 1 test(s):
                            93949596979899100101102103104105 
 
                                Covered by 1 test(s):
                            106107108109110111112113114115116 
 
                                Covered by 2 test(s):
                            117118 
 
                                Covered by 2 test(s):
                            119 
 
                                Covered by 2 test(s):
                            120 
 
                                Covered by 2 test(s):
                            121 
 
                                Covered by 2 test(s):
                            122123 
 
                                Covered by 2 test(s):
                            124 
 
                                Covered by 1 test(s):
                            125 
 
                                Covered by 1 test(s):
                            126 
 
                                Covered by 1 test(s):
                            127 
 
                                Covered by 1 test(s):
                            128129130131132 
 
                                Covered by 1 test(s):
                            133 
 
                                Covered by 1 test(s):
                            134 
 
                                Covered by 1 test(s):
                            135136 
 
                                Covered by 1 test(s):
                            137138 
 
                                Covered by 1 test(s):
                            139 
 
                                Covered by 1 test(s):
                            140 
 
                                Covered by 1 test(s):
                            141 
 
                                Covered by 1 test(s):
                            142143144 
 
                                Covered by 1 test(s):
                            145146 
 
                                Covered by 1 test(s):
                            147 
 
                                Covered by 1 test(s):
                            148149150151152153154155156 
 
                                Covered by 1 test(s):
                            157158159 
 | <?php/** * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com> * @copyright Marwan Al-Soltany 2020 * For the full copyright and license information, please view * the LICENSE file that was distributed with this source code. */declare(strict_types=1); namespace MAKS\AmqpAgent\RPC; use PhpAmqpLib\Message\AMQPMessage; use MAKS\AmqpAgent\Helper\ClassProxy; use MAKS\AmqpAgent\RPC\AbstractEndpoint; use MAKS\AmqpAgent\RPC\ServerEndpointInterface; use MAKS\AmqpAgent\Exception\RPCEndpointException; /** * A class specialized in responding. Implementing only the methods needed for a server. * * Example: * ``` * $serverEndpoint = new ServerEndpoint(); * $serverEndpoint->on('some.event', function () { ... }); * $serverEndpoint->connect(); * $serverEndpoint->respond('Namespace\SomeClass::someMethod', 'queue.name'); * $serverEndpoint->disconnect(); * ``` * * @since 2.0.0 * @api */class ServerEndpoint extends AbstractEndpoint implements ServerEndpointInterface {/**      * The callback to use when processing the requests.     * @var callable     */protected $callback; /**      * Listens on requests coming via the passed queue and processes them with the passed callback.     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.     * @param string|null $queueName [optional] The name of the queue to listen on.     * @return string The last processed request.     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.     */public function respond(?callable $callback = null, ?string $queueName = null): string { $this->callback = $callback ?? [$this, 'callback']; $this->queueName = $queueName ?? $this->queueName; if ($this->isConnected()) { $this->requestQueue = $this->queueName; $this->channel->queue_declare( $this->requestQueue, false, false, false, false ); $this->channel->basic_qos( null, 1, null ); $this->channel->basic_consume( $this->requestQueue, null, false, false, false, false, function ($message) { ClassProxy::call($this, 'onRequest', $message); } ); while ($this->channel->is_consuming()) { $this->channel->wait(); } return $this->requestBody; } throw new RPCEndpointException('Server is not connected yet!'); } /**      * Listens on requests coming via the passed queue and processes them with the passed callback.     * Alias for `self::respond()`.     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.     * @param string|null $queueName [optional] The queue to listen on.     * @return string The last processed request.     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.     */public function serve(?callable $callback = null, ?string $queueName = null): string { return $this->respond($callback, $queueName); } /**      * Replies to the client.     * @param AMQPMessage $request     * @return void     * @throws RPCEndpointException     */protected function onRequest(AMQPMessage $request): void { $this->trigger('request.on.get', [$request]); $this->requestBody = $request->body; $this->responseBody = call_user_func($this->callback, $request); $this->responseQueue = (string)$request->get('reply_to'); $this->correlationId = (string)$request->get('correlation_id'); if (!is_string($this->responseBody)) { throw new RPCEndpointException( sprintf( 'The passed processing callback must return a string, instead it returned (data-type: %s)!', gettype($this->responseBody) ) ); } $message = new AMQPMessage($this->responseBody); $message->set('correlation_id', $this->correlationId); $message->set('timestamp', time()); $this->trigger('response.before.send', [$message]); $request->getChannel()->basic_publish( $message, null, $this->responseQueue ); $request->ack(); $this->trigger('response.after.send', [$message]); } /**      * Returns the final request body. This method will be ignored if a callback in `self::respond()` is specified.     * @param AMQPMessage $message     * @return string     */protected function callback(AMQPMessage $message): string { return $message->body; } } |