package com.sandu.ximon.admin.localMQTT.callback;
|
|
|
import com.alibaba.fastjson.JSON;
|
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
|
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
|
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
|
import com.sandu.ximon.admin.utils.RedisUtils;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Map;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
|
|
/**
|
* @author van
|
* @version 1.0
|
* msg:默认回调
|
* @date 2022/11/9 16:24
|
*/
|
@Slf4j
|
@Component("java_server_status")
|
public class StatusMqttCallBack extends AbsMqttCallBack {
|
|
private static final String localMqttConnectStatusConnected = "connected";
|
|
private static final String localMqttConnectStatusDisconnected = "disconnected";
|
|
public static final String localMqttConnectStatus = "localMqttConnectStatus.";
|
|
|
|
protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
Runtime.getRuntime().availableProcessors(),
|
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<>(50000), new StatusMqttCallBack.NameTreadFactory());
|
|
static class NameTreadFactory implements ThreadFactory {
|
|
private final AtomicInteger mThreadNum = new AtomicInteger(1);
|
|
@Override
|
public Thread newThread(Runnable r) {
|
return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
|
}
|
}
|
|
@Override
|
protected void handleReceiveMessage(String topic, String message) {
|
EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
|
|
// log.info("接收到消息---StatusMqttCallBack:topic={},message={}", topic, message);
|
}
|
|
|
/**
|
* 在这里处理您收到消息后的具体业务逻辑。
|
*/
|
private void processMessage(String topic,String messageString) {
|
try {
|
|
String mac = topic.split("/")[4];
|
String status = topic.split("/")[5];
|
|
if (status.equals(localMqttConnectStatusConnected)){
|
// 设备数据上报
|
boolean set = RedisUtils.getBean().set(localMqttConnectStatus + mac, 1);
|
log.error("设备上线------,mac:{},上线,{}",mac,set);
|
}
|
else if (status.equals(localMqttConnectStatusDisconnected)){
|
boolean set = RedisUtils.getBean().set(localMqttConnectStatus+mac,0);
|
log.error("设备下线------,mac:{},下线.{}",mac,set);
|
}
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
}
|