MQTT(EMQX) - Java 调用 MQTT Demo 代码

mqtt,emqx,java,调用,demo,代码 · 浏览次数 : 193

小编点评

```java // POM<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version></dependency> // Client.java import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; public class Client { public static void main(String[] args) throws Exception { String host = "tcp://172.16.3.88:1883"; String topic = "VipSoft_MQTT"; String clientId = "client_id"; // 1.设置mqtt连接属性 MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); // 2.实例化mqtt客户端 MqttClient client = new MqttClient(host, clientId); // 3.连接 client.connect(options); // 4.设置回调 client.setCallback(new PushCallback()); // 5.监听连接状态变化 while (true) { client.subscribe(topic, 2); } } //回调类 private static class PushCallback implements MqttCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void connectionLost(Throwable cause) { // 连接丢失后进行重连 System.out.println("连接断开,可以做重连\"); logger.info("掉线时间:{}\", new Date()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 接收已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用 System.out.println("deliveryComplete---------\" + token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 // System.out.println(message); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } } } ```

正文

POM

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

Service.java

package com.vipsoft.mqtt;


import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Scanner;

public class Service {

    public static void main(String[] args) throws Exception {
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "server_id"; // clientId不能重复这个是server的id
        //新建mqtt连接
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        //新建mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        client.connect(options);
        //新建mqtt消息
        MqttMessage message = new MqttMessage();

        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入要发送的内容:");
        while (true) {
            String MsgMessage= scanner.nextLine();
            message.setPayload(MsgMessage.getBytes());
            client.publish(topic, message);
        }

    }
}

Client.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class Client {

    public static void main(String[] args) throws Exception {
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "client_id";
        // 1.设置mqtt连接属性
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // 2.实例化mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        // 3.连接
        client.connect(options);
        //这里的setCallback需要新建一个Callback类并实现 MqttCallback 这个类
        client.setCallback(new PushCallback());
        while (true) {
            client.subscribe(topic, 2);
        } 
    }
}

PushCallback.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 *  public void connectionLost(Throwable cause)在断开连接时调用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 *  由 MqttClient.connect 激活此回调。
 *
 */
public class PushCallback implements MqttCallback {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后进行重连
        System.out.println("连接断开,可以做重连");
        logger.info("掉线时间:{}", new Date());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        // System.out.println(message);
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
}

image
image

与MQTT(EMQX) - Java 调用 MQTT Demo 代码相似的内容:

MQTT(EMQX) - Java 调用 MQTT Demo 代码

POM org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.2 Se

MQTT(EMQX) - Linux CentOS Docker 安装

Linux CentOS Docker 安装 MQTT(EMQX), el7-amd64 => EL 是 Red Hat Enterprise Linux 的简写, “el7” 表示的是 centos7/redhat7, amd64 一般指:x86-64

MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图

MQTT连接池 主要用到 `InitializingBean、BasePooledObjectFactory、GenericObjectPool、GenericObjectPoolConfig` MQTT是一个轻量级传输协议,它被设计用于轻量级的发布/订阅式消息传输,MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化。是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的信

使用 Helm 安装 MQTT 服务器-EMQX

EMQX ℹ️ Info: 使用 EMQX 通过 Helm3 在 Kubernetes 上部署 EMQX 4.0 集群 | EMQ emqx/deploy/charts/emqx at main-v4.4 · emqx/emqx (github.com) emqx/values.yaml at ma

EMQX配置ssl/tls双向认证+SpringBoot项目整合MQTT_真实业务实践

一.使用docker搭建Emqx 1.拉取emqx镜像 docker pull emqx/emqx:5.7 2.运行 docker run -d --name emqx emqx/emqx:5.7 3.拷贝 docker中 etc data log 到宿主机的 /opt/emqx 下 mkdir -

一款基于C#开发的通讯调试工具(支持Modbus RTU、MQTT调试)

前言 今天大姚给大家分享一款基于C#、WPF、Prism、MaterialDesign、HandyControl开发的通讯调试工具(支持Modbus RTU、MQTT调试,界面色彩丰富):Wu.CommTool。 工具特点 工具界面色彩丰富。 支持Modbus RTU、MQTT服务器、MQTT客户端

手把手教大家写书写一个Mqtt网关

摘要:物联网是现在比较热门的软件领域,众多物联网厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是Mqtt网关。 本文分享自华为云社区《一文带你掌握物联网mqtt网关搭建背后的技术原理》,作者:张俭。 前言 物联网是现在比较热门的软件领域,众多物联网厂商都有自己的物联网平台,而物联网平台其

轻量通讯协议 --- MQTT

介绍 一、MQTT简介 MQTT(Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,通常用于在物联网(IoT)和传感器网络中进行通信。它设计用于在低带宽、不稳定或高延迟的网络环境下传输数据,因此非常适用于连接设备之间的通信,尤其是在资源有限的环境中

教你2种方法,将iOS设备通过MQTT协议连接到华为云物联网平台

本文讲述如何使用Flutter和Swift两种开发语言连接到华为云物联网平台。

[转帖]Web技术(七):如何使用并实现MQTT 消息订阅-发布模型?

文章目录 一、什么是发布-订阅消息模型?二、订阅-发布消息模型有哪些应用?2.1 应用于IP 物联网络中的消息传递2.2 应用于操作系统进程间的消息传递2.3 应用于MESH 自组网中的消息传递 三、MQTT 如何实现订阅-发布消息模型?3.1 如何在本机实践MQTT 通信并抓包分析?3.2 MQT