phprabbitmq(在PHP中如何使用RabbitMQ来实现消息的订阅和发布?)

wufei123 发布于 2023-10-22 阅读(757)

菜鸟教程html下载

本文将介绍在PHP中如何使用RabbitMQ来实现消息的订阅和发布我使用的系统依然是Centos7,为了方便,应用服务器我使用Docker进行部署,容器环境:centos7+ngi资源nx+php5.6运行环境,安装AMQP扩展:

phprabbitmq(在PHP中如何使用RabbitMQ来实现消息的订阅和发布?)

如何安装Docker我就不说了,网上很多教程非常简单,如果有现成的php环境可以直接使用Docker中我使用的镜像名为webdevops/php-ngin资源x,tag为:centos-7-php56下载镜像:。

(国际带宽出口不稳定,可能会下载失败,重试记次就好了)docker pull webdevops/php-nginx:centos-7-php56资源 //下载镜像 docker run -d -p 80:80 --name rabbitmq webdevops/php-nginx:centos-7-php56 //运行容器 资源 docker exec -ti rabbitmq /bin/bash //进入容器

进入到容器后检测下环境是否有相应扩展cd app vi index.php 刚刚我们在运行容资源器的时候使用80端口,在浏览器中输入http://ip

搜索下没有amqp相关的信息下面开始安装amqp扩展yum install gcc librabbitmq-devel.x86_64 php56w资源-devel -y wget http://pecl.php.net/get/amqp-1.4.0.tgz tar -zxvf amqp-1.4.0.t资源gz cd amqp-1.4.0 phpize ./configure --with-amqp make && make install资源

在php.ini中开启extension=amqp.so 接着重启php-fpm 或 Web服务器vi /etc/php.ini extension=amqp.so我这里就直接重启容资源器了,如果是宿主机直接安装php环境直接重启环境。

exit //退出容器 docker restart rabbitmq //重启容器再查看phpinfo,amqp扩展已经安装好了:

p资源ublish发布消息在/app路径下新建一个publish.php的文件touch publish.php vi publish.php以下是PHP代码,我们先定义好用来发消息的交换机资源、队列、RoutingKey、消息等变量。

$queueName = superrd; $exchangeName = superrd; $routeKey = supe资源rrd; $message = Hello World!;按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array(host 资源=> 10.99.121.137, port => 5672, vhost => /, login => superrd, password => superrd)); $connec资源tion->connect() or die("Cannot connect to the broker!\n");

新建一个信道$channel = new AMQPChannel($connecti资源on);新建一个交换机Exchange,并定义属性,第二章我们讲过有四种类型的交换机,这里使用直连型DIRECTAMQP_DURABLE代表这是一个持久化的交换机,不会以为服务器异常等因素丢失。

$ex资源change = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->s资源etType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareEx资源change();

新建一个队列Queue,前面也讲过生产者将消息发送到Exchange中,Exchange会根据绑定关系投递到队列,也就是如果生产者在生产消息时没有队列与之绑定消息就会丢失为了保证系统资源更加健硕,一般无论是消息的生产者还是消费者都会新建一遍Exchange和Queue,新建后属性不会改变。

同样AMQP_DURABLE代表这是一个持久化的队列,队列会被写入磁盘需要注意的是虽然消息是缓存资源在队列中,但是并不是队列是持久化的队列队列中的消息就是持久化的,消息的持久化需要单独设置$queue = new AMQPQueue($channel); $queue->setNam资源e($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue();

通过routeKey绑定交换资源机和队列$queue->bind($exchangeName, $routeKey);好了,下面可以发送消息了$exchange->publish($message,$routeKey);。

如果你希望资源消息也是持久化的可以使用如下的代码,实际测试结果在持久化消息后消息发布的性能下降一倍,我的磁盘是pcie的固态硬盘,如果你是机械磁盘这个性能下降估计会更明显,24核心CPU,48GB内存,pcie固态资源硬盘,单线程的情况下每秒可以发布2.5万左右的非持久化消息,持久化之后变为变为1.2万左右。

$exchange->publish($message,$routeKey,AMQP_NOPARAM, ar资源ray(delivery_mode=>2));断开连接$connection->disconnect();。

同样在发布消息之后可以通过WEB工具来查看是否发布成功,查看交换机多了一个superid交换资源机。

查看交换机已经有superrd队列。

点击队列查看队列详情。Bindings标签可以看到交换机和队列的绑定关系。

点击Get messages标签Get message(s)按钮可以看到队列中的消息。资源

到此说明我们已经将一个消息发布到了消息队列中完整的PHP代码如下10.99.121.137, port => 5672, vhost => /, login => superrd, password 资源=> superrd)); $connection->connect() or die("Cannot connect to the broker!\n"); try 资源{ $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); 资源 $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $资源exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); $queue = new AMQPQue资源ue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $资源queue->declareQueue(); $queue->bind($exchangeName, $routeKey); $exchange->publish($m资源essage,$routeKey); var_dump("[x] Sent Hello World!"); } catch (AMQPConnectionExcepti资源on $e) { var_dump($e); exit(); } $connection->disconnect();。

Subscrib资源e订阅消息在/app路径下新建一个subscribe.php的文件touch subscribe.php vi subscribe.php以下是PHP代码,和发布消息一样我们先定义好用资源交换机、队列、RoutingKey等变量。

$queueName = superrd; $exchangeName = superrd; $routeKey = supe资源rrd;按照我们第二章讲到的首先建立一个连接$connection = new AMQPConnection(array(host => 10.99.121.137, port => 5672, vh资源ost => /, login => superrd, password => superrd)); $connection->connect() or die("Cannot con资源nect to the broker!\n");。

新建一个信道$channel = new AMQPChannel($connection);与发布消息一样新建交换机$exchange = new A资源MQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX资源_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange();。

新建一个资源队列Queue$queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFla资源gs(AMQP_DURABLE); $queue->declareQueue();。

通过routeKey绑定交换机和队列$queue->bind($exchangeName, $rou资源teKey);重点来了,阻塞订阅消息//阻塞模式接收消息 echo "Message:\n"; while(True){ $queue->consume资源(processMessage); //自动ACK应答 //$queue->consume(processMessage, AMQP_AUTOACK); 资源 } $conn->disconnect(); /* * 消费回调函数 * 处理消息 */ functi资源on processMessage($envelope, $q) { $msg = $envelope->getBody(); echo $msg."\n"; //处理资源消息 $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }。

注意因为是阻塞监听,因为输出缓冲区的原因用浏览器访问该文件资源是看不到输出的。使用脚本访问。php /app/subscribe.php

通过WEB工具查看队列。superrd队列中的消息数已经为0。

完整的PHP代码如下10.99.121.137, port =>资源 5672, vhost => /, login => superrd, password => superrd)); $connection->connect() or die("C资源annot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange资源 = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType资源(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange资源(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->s资源etFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind($exchangeName, $routeKey资源); //阻塞模式接收消息 echo "Message:\n"; while(True){ $queue->consume(proces资源sMessage); //自动ACK应答 //$queue->consume(processMessage, AMQP_AUTOACK); } 资源 $conn->disconnect(); /* * 消费回调函数 * 处理消息 */ function proc资源essMessage($envelope, $q) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 资源 $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }。

亲爱的读者们,感谢您花时间阅读本文。如果您对本文有任何疑问或建议,请随时资源联系我。我非常乐意与您交流。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

大众 新闻19671