Skip to content

easyswoole 使用php-amqplib/php-amqplib 进行自定义消费会出现mysql连接断开 #534

@645263

Description

@645263

软件版本:
easyswoole: 3.5.1
php-amqplib/php-amqplib:3.1.2
swoole: 4.4.23

问题出现时机:
1、项目运行1天样子基本就会出现 SQLSTATE[HY000] [2006] MySQL server has gone away ,一旦自定义进程出现这个报错,必须重启进程才可以解决,
2、在项目消费者里面已经尝试加了 \Co::sleep(0.01);,并未解决mysql 链接断开的问题
一旦自定义进程报 MySQL server has gone away,后面的消费 依然是继续报:
Connection reset by peer or Transport endpoint is not connected

群里网名“ 那就这样吧” 的开发者也遇到同样的问题

目前的解决办法:
1、捕获异常,重新入列,然后重启进程

希望官方解决一下这个问题,感谢

自定义进程代码:

<?php
namespace App\Process\RabbitMqConsumer;
use App\HttpController\ShopApi\Service\ClientService;
use App\Models\VipPayOrderModel;
use App\Service\LogService;
use App\Service\MqFanoutService;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\RabbitMq\MqJob;
use EasySwoole\RabbitMq\MqQueue;
use EasySwoole\RabbitMq\RabbitMqQueueDriver;

/**
 * mq广播消费进程
 * Class ClientRegisterFinishProcess
 * @package App\Process
 */
class RabbitMqConsumerProcess extends AbstractProcess
{
    protected function run($arg)
    {
        $queueName  = 'WORK_QUEUE';
        $config     = \EasySwoole\EasySwoole\Config::getInstance()->getConf("rabbitmq");
        $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
        $channel    = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $callback = function ($msg) use ($channel){
            $headersObject = $msg->get_properties()['application_headers'];
            $headersArray  = $headersObject->getNativeData();
            $body = json_decode($msg->body, true);
            $tag = $body['desc'];
            $msgId = $body['msgId'];
            $listenerKey = $body['listenerKey'];
            try {
                if ($listenerKey){
                    \Co::sleep(0.01);
                    \EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function () use ($body){
                        WorkQueueService::handler($body);
                    });
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
            }catch (\Throwable $e){
                LogService::updateRabbitMqLog($msgId,'RabbitMqQueueConsumerProcess 消费异常:'.$e->getMessage(),2);
                if($headersArray['retry'] > $headersArray['maxRetry']){
                    Logger::getInstance()->error("{$tag}-达到最大重试次数!,停止重试,参数={$msg->body},errMsg=".$e->getMessage());
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    //todo 这里需要写db
                    return;
                }
                Logger::getInstance()->error("{$tag}-工作队列处理异常,开始重试!,参数={$msg->body},errMsg=".$e->getMessage());
                $headersArray['retry']++;
                Logger::getInstance()->waring("{$tag}-消息ID:".$body['msgId'].' 第'.$headersArray['retry'].'次失败,消息重新入队');
                $exchange      = $msg->getExchange();
                $routingKey    = $msg->getRoutingKey();
                $body          = $msg->getBody();
                \Co::sleep(5);
                $msg->delivery_info['channel']->basic_publish(
                    new AMQPMessage($body,['application_headers'=>new AMQPTable($headersArray)]),
                    $exchange,
                    $routingKey
                );
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

    protected function onException(\Throwable $throwable, ...$args)
    {
        Logger::getInstance()->error("广播队列异常:".$throwable->getMessage());
        Logger::getInstance()->error("广播队列异常trace:".$throwable->getTraceAsString());
        parent::onException($throwable, $args);
    }

    protected function onShutDown()
    {
        Logger::getInstance()->waring("RabbitMqConsumerProcess 进程退出了");
        parent::onShutDown(); // TODO: Change the autogenerated stub
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions