龙行博客

走路看风景,经历看人生,岁月留痕迹,人生留轨迹,17的历史,18的豪情,时间的匆忙,人生的风景,放开心胸往前走,成功再远行,放开理想往前走,梦想再行动。
现在位置:首页 > 编程语言 > PHP > Hyperf自定义进程-mqtt使用做个记录

Hyperf自定义进程-mqtt使用做个记录

龙行    PHP    2022-11-29    123    0评论    

Hyperf自定义进程-mqtt使用做个记录

composer require php-mqtt/client:*

<?php
declare(strict_types=1);
/**
* Notes:
* Author: lt
* Date: 2022/11/15 0015
* Time:  9:40
* Version: 1
*/

namespace App\Process;

use App\Constants\BigEnergyType;
use App\Constants\BigType;
use App\Service\BigScreenDataFormatService;
use App\Service\EnergyBigScreenDataFormatService;
use App\utils\EmqxUtils;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Utils\Codec\Json;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
use Hyperf\Utils\Coroutine;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\Exceptions\InvalidMessageException;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;

/**
* @Process(name="diggings_mqtt_process")
*/
class DiggingsMqttProcess extends  AbstractProcess
{

   /**
    * 进程数量
    * @var int
    */
   public $nums = 1;

   /**
    * 进程名称
    * @var string
    */
   public $name = 'diggings-process';

   /**
    * 重定向自定义进程的标准输入和输出
    * @var bool
    */
   public $redirectStdinStdout = false;

   /**
    * 管道类型
    * @var int
    */
   public $pipeType = 2;

   /**
    * 是否启用协程
    * @var bool
    */
   public $enableCoroutine = true;

   /**
    * @throws ConnectingToBrokerFailedException
    * @throws MqttClientException
    * @throws RepositoryException
    * @throws ConfigurationInvalidException
    * @throws ProtocolViolationException
    * @throws InvalidMessageException
    * @throws ProtocolNotSupportedException
    * @throws DataTransferException
    */
   public function handle(): void
   {
       $logger = di()->get(StdoutLoggerInterface::class);
       $server   = env('MQTT_SERVER');
       $port     = env('MQTT_PORT');
       $clientId = "saas_bigscreen_". uniqid() . '_' . rand(0, 99999) . "_" . Coroutine::id();
       $username = env('MQTT_USERNAME');
       $password = env('MQTT_PASSWORD');
       $clean_session = env('CLEAN_SESSION');

       try{
           $mqtt = new MqttClient((string) $server, (int) $port, $clientId);
           $connectionSettings = (new ConnectionSettings);
//            ->setUsername($username)
//            ->setPassword($password);
           $mqtt->connect($connectionSettings, $clean_session);
           $id = 1;

           $roll = '发生异常或故障请及时处理!'; //滚动推送
           $logger->notice("DiggingsMqttProcess MQTT Server Connect...");
           $mqtt->subscribe("saas/{$id}/#", function ($topic, $message) use ($id, $roll, $mqtt) {
               $data = Json::decode($message);
               $res = di()->get(BigScreenDataFormatService::class)->dataFormat((array)$data, $topic);
               $topic = "saas_big_screen/{$id}/" . BigType::getMessage($res['type']);

               $payload = Json::encode($res);

               $mqtt->publish($topic, $payload, 1, true);

               if ($data['status'] && $data['status'] !== 1) {
                   $mqtt->publish($topic. "msg", Json::encode(
                       $roll
                   ), 1, false);

               }

           }, 1);

           $mqtt->loop(true);
           $mqtt->disconnect();
       } catch (\Throwable $ex) {
           $logger->error($ex->getMessage());
       }



   }

   public function isEnable($server): bool
   {
       // 跟随服务启动一同启动
       return true;
   }
}

客户端工具推荐:

https://github.com/thomasnordquist/MQTT-Explorer/releases


评论一下 分享本文 赞助站长

赞助站长X

扫码赞助站长
联系站长
龙行博客
  • 版权申明:此文如未标注转载均为本站原创,自由转载请表明出处《龙行博客》。
  • 本文网址:https://www.liaotaoo.cn/422.html
  • 上篇文章:没有了
  • 下篇文章:Hyperf漏斗计数器限流
  • hyperf
快捷导航
联系博主
在线壁纸
给我留言
四四五五
音乐欣赏
返回顶部