智宇物聯 專注于提供高穩定、高速率的三網物聯網卡
低帶寬環境下的物聯網傳輸協議–MQTT
- 作者:智宇物聯
- 發表時間:2022年10月13日
- 來源:智宇物聯
MQTT是什么
MQTT由IBM公司開發,是一個即時通訊協議,也是一個物聯網傳輸協議,主要用于輕量級的訂閱/發布式的消息傳輸。其設計目的主要是為低帶寬和不穩定網絡環境下的物聯網設備提供服務。

MQTT中的概念
- 訂閱(Subscribtion): 訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器
- 會話(Session): 每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態交互。會話存在于一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡連接。
- 主題名(Topic Name): 連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發送給訂閱所匹配標簽的每個客戶端。 需要注意的是,MQTT中消息主題按照層級命名,使用 ‘/’ 進行分割 此外,主題中可以使用通配符進行多個主題或多層級的訂閱,有兩種常見的通配符: 單層通配符 +:單層通配符只能匹配一層的主題,例如:China/Beijing/+,可以匹配的只有Beijing這個主題下面一層的主題,例如Xicheng, DongCheng, Xuanwu等等。 多層通配符 #:顧名思義,多層通配符就是可以匹配多個層級的主題,例如:China/#,可以匹配到的主題可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong,等等。
- 主題篩選器(Topic Filter): 一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。
- 負載(Payload): 消息訂閱者所具體接收的內容
MQTT協議的使用
MQTT是一個輕量的發布訂閱模式消息傳輸協議,專門針對低帶寬和不穩定網絡環境的物聯網應用設計。
特點
- 開放消息協議,簡單易實現
- 發布訂閱模式,一對多消息發布
- 基于TCP/IP網絡連接
- 1字節固定報頭,2字節心跳報文,報文結構緊湊
- 消息QoS支持,可靠傳輸保證
MQTT協議基于主題(Topic)進行消息路由,主題(Topic)類似URL路徑

使用MQTT連接EMQ
生產端
@Configuration
public class MqttSenderConfig {
@Value("${emq.connection.mqtt_url}")
private String MQTT_URL;
@Value("${emq.connection.client_token}")
private String TOKEN;
@Value("${emq.connection.client_id}")
private String CLIENT_ID;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{MQTT_URL});
options.setUserName(CLIENT_ID);
options.setPassword(TOKEN.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(CLIENT_ID, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("topic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, @Header(MqttHeaders.RETAINED) Boolean retained, String payload);
}
retained true 保留數據,Broker會存儲每個Topic的最后一條保留消息及其Qos,當訂閱該Topic的客戶端上線后,Broker需要將該消息投遞給它。
消費端
@Configuration
@Slf4j
public class MqttInboundConfiguration {
@Value("${emq.connection.mqtt_url}")
private String MQTT_URL;
@Value("${emq.connection.client_token}")
private String TOKEN;
@Value("${emq.connection.client_id}")
private String CLIENT_ID;
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MQTT_URL, CLIENT_ID,
"topic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
}
@Slf4j
@Component
public class MqttSubscribeImpl {
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<?> message) throws MessagingException {
log.info(message.getPayload().toString());
}
文章標簽:
最新資訊