payload를 보통 String 으로 많이 쓰는데 IoT ESP 임베디드 레벨에서는 String으로 보내는데 스트레스가 있는듯하다. 실제로 길이도 많이 늘어난다.
하여 간단히 페이로드를 바이트 배열로 받는 설정을 기록한다.
서버 역할을 하기 위해 cTs이 붙은것만 받는다.
제품은 sTc가 붙은것만 받는다.
바이트배열로 받는 핵심 부분은 아래부분이다.
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
@Configuration
public class MqttConfig {
// MQTT 브로커 연결 설정
private static final String BROKER_URL = "tcp://localhost:1883"; // RabbitMQ MQTT 브로커 URL
private static final String CLIENT_ID = "spring-boot-server-" + UUID.randomUUID();
private static final String SUBSCRIBE_TOPIC = "cTs/#"; // 요청 구독 토픽
/**
* MQTT 클라이언트 팩토리 설정
* - 브로커 연결 정보를 포함
* - 자동 재연결 활성화
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{BROKER_URL});
options.setUserName("server");
options.setPassword("server".toCharArray());
options.setAutomaticReconnect(true); // 자동 재연결
factory.setConnectionOptions(options);
return factory;
}
// ** 구독부 (Inbound) **================================================================================================
/**
* 요청 메시지를 수신하는 채널 정의
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT 메시지 수신 어댑터 설정
* - 특정 토픽 구독
* - 페이로드를 바이트 배열(byte[])로 처리
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory factory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID + "receiver", factory, SUBSCRIBE_TOPIC);
adapter.setOutputChannel(mqttInputChannel());
// 페이로드를 바이트 배열(byte[])로 처리하도록 설정
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setQos(1); // QoS 1로 설정
return adapter;
}
/**
* 요청 메시지 처리 핸들러
* - 요청 메시지를 출력
* - 특정 토픽으로 응답 발행
*/
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(org.springframework.messaging.Message<byte[]> message) {
try {
// 메시지 페이로드와 토픽 정보 추출
System.out.println(message.getHeaders().toString());
byte[] payload = message.getPayload();
String topic = ((String) message.getHeaders().get("mqtt_receivedTopic")).replace("cTs", "").replaceAll("/", ".");
// 수신 메시지 출력
System.out.println("Received message:");
for (byte b : payload) {
System.out.printf("%02X ", b);
}
System.out.println("\nPayload length: " + payload.length);
System.out.println("Received from topic: " + topic);
// 응답 메시지 생성 및 발행
byte[] responsePayload = service.processR_mqtt(payload);
System.out.println("Response message:");
for (byte b : responsePayload) {
System.out.printf("%02X ", b);
}
System.out.println("\nPayload length: " + responsePayload.length);
MessageChannel outboundChannel = mqttOutboundChannel();
outboundChannel.send(org.springframework.integration.support.MessageBuilder.withPayload(responsePayload)
.setHeader("mqtt_topic", "sTc"+topic)
.build());
System.out.println("Response sent to topic: sTc"+topic);
} catch (Exception e) {
e.printStackTrace();
}
}
// ** 발행부 (Outbound) **================================================================================================
/**
* 응답 메시지를 발행하는 채널 정의
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT 메시지 발행 핸들러 설정
* - 비동기 발행 활성화
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutboundHandler(MqttPahoClientFactory factory) {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(CLIENT_ID + "sender", factory);
handler.setDefaultQos(1);
handler.setAsync(true); // 비동기 발행
return handler;
}
// /**
// * 발행 테스트 메서드
// * - 특정 바이트 배열 데이터를 특정 토픽으로 발행
// */
// @ServiceActivator(inputChannel = "mqttOutboundChannel")
// public void publishMessage(MessageChannel mqttOutboundChannel) {
// byte[] payload = new byte[]{0x52, 0x61, 0x00, (byte) 0xFF}; // 발행할 바이트 배열
// mqttOutboundChannel.send(org.springframework.integration.support.MessageBuilder.withPayload(payload)
// .setHeader("mqtt_topic", "sTc.01.00001.001.045.020")
// .build());
// System.out.println("Message published: " + payload.length + " bytes");
// }
}