PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

延时队列

DelayProducer.php

AmqpBuilder.php

AmqpBuilder.php

  1. <?php
  2. declare(strict_types = 1);
  3. namespace App\Components\Amqp;
  4. use Hyperf\Amqp\Builder\Builder;
  5. use Hyperf\Amqp\Builder\QueueBuilder;
  /**
   * Queue builder that merges queue arguments instead of replacing them and
   * offers a helper for declaring a queue with a message TTL and a dead-letter
   * target (used here as a delayed queue).
   */
  class AmqpBuilder extends QueueBuilder
  {
      /**
       * Merge the given queue arguments into the ones already set.
       *
       * When both the current and the new arguments are plain arrays they are
       * merged with array_merge() (for string keys the new value wins). If either
       * side is not an array (for example an AMQPTable), the new value replaces
       * the current one, because array_merge() only accepts arrays.
       *
       * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
       *
       * @return \Hyperf\Amqp\Builder\Builder
       */
      public function setArguments($arguments) : Builder
      {
          if (is_array($arguments) && is_array($this->arguments)) {
              $this->arguments = array_merge($this->arguments, $arguments);
          } else {
              // Non-array arguments cannot be merged; replace the current value.
              $this->arguments = $arguments;
          }

          return $this;
      }

      /**
       * Configure the parameters of a delayed queue.
       *
       * Sets the x-message-ttl, x-dead-letter-exchange and
       * x-dead-letter-routing-key arguments and the queue name.
       *
       * @param string $queueName             name of the queue to declare
       * @param int    $xMessageTtl           message TTL in seconds (converted to milliseconds)
       * @param string $xDeadLetterExchange   value for x-dead-letter-exchange
       * @param string $xDeadLetterRoutingKey value for x-dead-letter-routing-key
       *
       * @return $this
       */
      public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
      {
          // Each value is a [type tag, value] pair; 'I' / 'S' are presumably the
          // php-amqplib tags for integer and string -- confirm against AMQPTable.
          $this->setArguments([
              'x-message-ttl' => ['I', $xMessageTtl * 1000], // milliseconds
              'x-dead-letter-exchange' => ['S', $xDeadLetterExchange],
              'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
          ]);
          $this->setQueue($queueName);

          return $this;
      }
  }

DelayProducer.php

  1. <?php
  2. declare(strict_types = 1);
  3. namespace App\Components\Amqp;
  4. use Hyperf\Amqp\Annotation\Producer;
  5. use Hyperf\Amqp\Builder;
  6. use Hyperf\Amqp\Message\ProducerMessageInterface;
  7. use Hyperf\Di\Annotation\AnnotationCollector;
  8. use PhpAmqpLib\Message\AMQPMessage;
  9. use Throwable;
  /**
   * Producer that declares the queue, the exchange and their binding itself
   * before publishing, using the queue definition supplied by an AmqpBuilder.
   */
  class DelayProducer extends Builder
  {
      /**
       * Publish a message after declaring the queue described by $queueBuilder.
       *
       * The actual work is done by produceMessage(), invoked through retry().
       *
       * @param ProducerMessageInterface $producerMessage message to publish
       * @param AmqpBuilder              $queueBuilder    definition of the queue to declare and bind
       * @param bool                     $confirm         use a confirm channel and report whether an ack arrived
       * @param int                      $timeout         passed to wait_for_pending_acks_returns()
       *
       * @return bool
       * @throws \Throwable
       */
      public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
      {
          $attempt = function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
              return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
          };

          return retry(1, $attempt);
      }

      /**
       * Declare queue, exchange and binding, then publish the message.
       *
       * On any failure the connection is reconnected and the exception is
       * rethrown; the connection is released in every case.
       *
       * @param ProducerMessageInterface $producerMessage message to publish
       * @param AmqpBuilder              $queueBuilder    definition of the queue to declare and bind
       * @param bool                     $confirm         use a confirm channel and report whether an ack arrived
       * @param int                      $timeout         passed to wait_for_pending_acks_returns()
       *
       * @return bool true when $confirm is false; otherwise whether the ack handler fired
       * @throws \Throwable
       */
      private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
      {
          $acked = false;

          $this->injectMessageProperty($producerMessage);
          $amqpMessage = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());

          /** @var \Hyperf\Amqp\Connection $connection */
          $connection = $this->getConnectionPool($producerMessage->getPoolName())->get();
          $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

          // Flip the flag as soon as the broker acknowledges the publish.
          $channel->set_ack_handler(function () use (&$acked) {
              $acked = true;
          });

          try {
              // Delayed-queue handling: queue, exchange and binding are declared
              // here, right before the publish.
              $exchangeBuilder = $producerMessage->getExchangeBuilder();

              // Queue declaration.
              $channel->queue_declare(
                  $queueBuilder->getQueue(),
                  $queueBuilder->isPassive(),
                  $queueBuilder->isDurable(),
                  $queueBuilder->isExclusive(),
                  $queueBuilder->isAutoDelete(),
                  $queueBuilder->isNowait(),
                  $queueBuilder->getArguments(),
                  $queueBuilder->getTicket()
              );

              // Exchange declaration.
              $channel->exchange_declare(
                  $exchangeBuilder->getExchange(),
                  $exchangeBuilder->getType(),
                  $exchangeBuilder->isPassive(),
                  $exchangeBuilder->isDurable(),
                  $exchangeBuilder->isAutoDelete(),
                  $exchangeBuilder->isInternal(),
                  $exchangeBuilder->isNowait(),
                  $exchangeBuilder->getArguments(),
                  $exchangeBuilder->getTicket()
              );

              // Bind the queue to the exchange.
              $channel->queue_bind(
                  $queueBuilder->getQueue(),
                  $producerMessage->getExchange(),
                  $producerMessage->getRoutingKey()
              );

              // Publish the message.
              $channel->basic_publish(
                  $amqpMessage,
                  $producerMessage->getExchange(),
                  $producerMessage->getRoutingKey()
              );
              $channel->wait_for_pending_acks_returns($timeout);
          } catch (Throwable $exception) {
              // Reconnect the connection before release.
              $connection->reconnect();
              throw $exception;
          } finally {
              $connection->release();
          }

          if (! $confirm) {
              return true;
          }

          return $acked;
      }

      /**
       * Copy the routing key and exchange from the message class's Producer
       * annotation onto the message, when the annotation provides them.
       *
       * @param ProducerMessageInterface $producerMessage
       */
      private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
      {
          if (! class_exists(AnnotationCollector::class)) {
              return;
          }

          /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
          $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
          if (! $annotation) {
              return;
          }

          if ($annotation->routingKey) {
              $producerMessage->setRoutingKey($annotation->routingKey);
          }
          if ($annotation->exchange) {
              $producerMessage->setExchange($annotation->exchange);
          }
      }
  }

处理超时订单

OrderQueueConsumer.php

OrderQueueProducer.php

OrderQueueProducer.php

  1. <?php
  2. declare(strict_types = 1);
  3. namespace App\Amqp\Producer;
  4. use Hyperf\Amqp\Annotation\Producer;
  5. use Hyperf\Amqp\Builder\ExchangeBuilder;
  6. use Hyperf\Amqp\Message\ProducerMessage;
  /**
   * Producer message whose payload is the data handed to the constructor.
   * Exchange and routing key are taken from the annotation below.
   *
   * @Producer(exchange="order_exchange", routingKey="order_exchange")
   */
  class OrderQueueProducer extends ProducerMessage
  {
      /**
       * Store the given data as the message payload.
       *
       * @param mixed $data payload to publish
       */
      public function __construct($data)
      {
          $this->payload = $data;
      }

      /**
       * Return the exchange builder supplied by the parent class, unchanged.
       *
       * @return ExchangeBuilder
       */
      public function getExchangeBuilder() : ExchangeBuilder
      {
          $exchangeBuilder = parent::getExchangeBuilder();

          return $exchangeBuilder;
      }
  }

OrderQueueConsumer.php

  1. <?php
  2. declare(strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\Service\CityTransport\OrderService;
  5. use Hyperf\Amqp\Result;
  6. use Hyperf\Amqp\Annotation\Consumer;
  7. use Hyperf\Amqp\Message\ConsumerMessage;
  /**
   * Consumer bound to the queue, exchange and routing key named in the
   * annotation below.
   *
   * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
   */
  class OrderQueueConsumer extends ConsumerMessage
  {
      /**
       * Handle one message.
       *
       * @param mixed $data message payload
       *
       * @return string a Result constant; Result::ACK is returned once handling is done
       */
      public function consume($data) : string
      {
          // Business logic goes here.

          // The declared string return type requires a value on every path;
          // without a return statement each call would raise a TypeError.
          return Result::ACK;
      }

      /**
       * Whether this consumer is enabled.
       *
       * @return bool always true
       */
      public function isEnable() : bool
      {
          return true;
      }
  }

Demo

  // Describe a queue named 'order_exchange' with a 1-second message TTL whose
  // dead-letter target is exchange 'delay_exchange' with routing key 'delay_route'.
  $builder = new AmqpBuilder();
  $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

  // Resolve the producer from the container and publish a sample order message.
  $producer = ApplicationContext::getContainer()->get(DelayProducer::class);
  $message = new OrderQueueProducer(['order_sn' => (string) mt_rand(10000, 90000)]);
  var_dump($producer->produce($message, $builder));