Hyperf自定义进程-mqtt使用做个记录
composer require php-mqtt/client:*
<?php
declare(strict_types=1);
/**
* Notes:
* Author: lt
* Date: 2022/11/15 0015
* Time: 9:40
* Version: 1
*/
namespace App\Process;
use App\Constants\BigEnergyType;
use App\Constants\BigType;
use App\Service\BigScreenDataFormatService;
use App\Service\EnergyBigScreenDataFormatService;
use App\utils\EmqxUtils;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Utils\Codec\Json;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
use Hyperf\Utils\Coroutine;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\Exceptions\InvalidMessageException;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;
/**
* @Process(name="diggings_mqtt_process")
*/
class DiggingsMqttProcess extends AbstractProcess
{
    /**
     * Number of processes to start.
     * @var int
     */
    public $nums = 1;

    /**
     * Process name.
     * @var string
     */
    public $name = 'diggings-process';

    /**
     * Whether to redirect the custom process's standard input and output.
     * @var bool
     */
    public $redirectStdinStdout = false;

    /**
     * Pipe type.
     * @var int
     */
    public $pipeType = 2;

    /**
     * Whether coroutines are enabled for this process.
     * @var bool
     */
    public $enableCoroutine = true;

    /**
     * Connects to the MQTT broker, subscribes to "saas/{id}/#" and republishes
     * every received message, reformatted, under "saas_big_screen/{id}/...".
     *
     * Blocks inside the client loop for as long as the connection is alive.
     * Every failure is logged rather than thrown, so this method itself does
     * not propagate exceptions.
     */
    public function handle(): void
    {
        $logger = di()->get(StdoutLoggerInterface::class);

        $server = (string) env('MQTT_SERVER');
        // Fall back to the standard MQTT port when MQTT_PORT is not configured.
        $port = (int) env('MQTT_PORT', 1883);
        $clientId = "saas_bigscreen_" . uniqid() . '_' . rand(0, 99999) . "_" . Coroutine::id();
        // connect() requires a real bool under strict_types; an unset variable or
        // a value such as "1" would otherwise raise a TypeError.
        $cleanSession = filter_var(env('CLEAN_SESSION', false), FILTER_VALIDATE_BOOLEAN);

        $mqtt = null;
        try {
            $mqtt = new MqttClient($server, $port, $clientId);
            // NOTE(review): no username/password is applied to the connection
            // settings (MQTT_USERNAME / MQTT_PASSWORD are not used) - confirm
            // that the broker is meant to accept anonymous clients.
            $mqtt->connect(new ConnectionSettings(), $cleanSession);

            $id = 1;
            $roll = '发生异常或故障请及时处理!'; // scrolling alert payload

            $logger->notice("DiggingsMqttProcess MQTT Server Connect...");

            $mqtt->subscribe(
                "saas/{$id}/#",
                function ($topic, $message) use ($id, $roll, $mqtt, $logger) {
                    try {
                        $data = (array) Json::decode($message);
                        $res = di()->get(BigScreenDataFormatService::class)->dataFormat($data, $topic);

                        $outTopic = "saas_big_screen/{$id}/" . BigType::getMessage($res['type']);
                        $mqtt->publish($outTopic, Json::encode($res), 1, true);

                        // A missing "status" key is treated the same as a falsy status.
                        $status = $data['status'] ?? null;
                        if ($status && $status !== 1) {
                            $mqtt->publish($outTopic . "msg", Json::encode($roll), 1, false);
                        }
                    } catch (MqttClientException $ex) {
                        // Client/transport failures must still end the loop.
                        throw $ex;
                    } catch (\Throwable $ex) {
                        // One malformed message must not tear down the subscription.
                        $logger->warning(sprintf(
                            'DiggingsMqttProcess skipped message on topic "%s": %s',
                            $topic,
                            $ex->getMessage()
                        ));
                    }
                },
                1
            );

            $mqtt->loop(true);
        } catch (\Throwable $ex) {
            $logger->error($ex->getMessage());
        } finally {
            // Release the broker connection on both the normal and the failure path.
            if ($mqtt !== null && $mqtt->isConnected()) {
                try {
                    $mqtt->disconnect();
                } catch (\Throwable $ex) {
                    $logger->warning('DiggingsMqttProcess disconnect failed: ' . $ex->getMessage());
                }
            }
        }
    }

    /**
     * Decides whether this process is started.
     *
     * @param mixed $server unused here
     * @return bool always true, so the process starts together with the server
     */
    public function isEnable($server): bool
    {
        return true;
    }
}
客户端工具推荐:
https://github.com/thomasnordquist/MQTT-Explorer/releases
- 版权声明:此文如未标注转载均为本站原创,自由转载请注明出处《龙行博客》。
- 本文网址:https://www.liaotaoo.cn/422.html
- 上篇文章:没有了
- 下篇文章:Hyperf漏斗计数器限流