如何解决未收到来自Android MQTT的消息发布到RabbitMQ
我已经设置RabbitMQ,启用了用于管理的Web UI,启用了mqtt_plugin以及端口1883、8883、5672和15672(Docker)。我将Paho MqttClient用于我正在开发的Android应用程序,用于将消息发布到MQ代理。连接很好,但是在Web UI和CLI上没有收到作为检查的消息。
连接页面:
队列页面:
下面是我正在处理的代码。
<com.google.android.gms.ads.AdView
xmlns:ads="http://schemas.android.com/apk/res-auto"
android:id="@+id/adView"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_centerHorizontal="true"
android:layout_alignParentBottom="true"
ads:adSize="BANNER"
ads:adUnitId="ca-app-pub-3940256099942544/6300978111">
</com.google.android.gms.ads.AdView>
我一直在寻找答案,并尝试重新安装MQ,但是仍然得到相同的结果。
解决方法
这是coonectMq的常见扩展,并从中获取消息。 (MqConnectionExtention.kt)
fun Context.connectMq(publishTopicChannelName: String,onConnectionSuccess: (topic: String?,message: MqttMessage?) -> Unit) {
val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
val mqttAndroidClient = MqttAndroidClient(this,"tcp://34.212.00.188:1883",mClientId)
Timber.e("ChannelName:$publishTopicChannelName")
mqttAndroidClient.setCallback(object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean,serverURI: String) {
if (reconnect) { //addToHistory("Reconnected to : " + serverURI)
Log.e("TAG","Reconnected to : $serverURI")
// Because Clean Session is true,we need to re-subscribe
try {
mqttAndroidClient.subscribe(publishTopicChannelName,object : IMqttMessageListener {
override fun messageArrived(topic: String?,message: MqttMessage?) {
onConnectionSuccess(topic,message)
}
})
} catch (ex: MqttException) {
System.err.println("Exception whilst subscribing")
ex.printStackTrace()
}
} else { //addToHistory("Connected to: " + serverURI);
Log.e("TAG","Connected to: $serverURI")
}
}
override fun connectionLost(cause: Throwable) {
Log.e("TAG","The Connection was lost.")
}
override fun messageArrived(topic: String,message: MqttMessage) {
Log.e("TAG","Incoming message: " + message.payload.toString())
}
override fun deliveryComplete(token: IMqttDeliveryToken) {}
})
val mqttConnectOptions = setUpConnectionOptions("MQ_CONNECTION_USERNAME","MQ_CONNECTION_PASSWORD")
mqttConnectOptions.isAutomaticReconnect = true
mqttConnectOptions.isCleanSession = false
try {
mqttAndroidClient.connect(mqttConnectOptions,null,object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
val disconnectedBufferOptions = DisconnectedBufferOptions()
disconnectedBufferOptions.isBufferEnabled = true
disconnectedBufferOptions.bufferSize = 100
disconnectedBufferOptions.isPersistBuffer = false
disconnectedBufferOptions.isDeleteOldestMessages = false
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
try {
mqttAndroidClient.subscribe(publishTopicChannelName,message)
}
})
} catch (ex: MqttException) {
System.err.println("Exception whilst subscribing")
ex.printStackTrace()
}
}
override fun onFailure(asyncActionToken: IMqttToken,exception: Throwable) { //addToHistory("Failed to connect to: " + serverUri);
}
})
} catch (ex: MqttException) {
ex.printStackTrace()
}
}
private fun setUpConnectionOptions(username: String,password: String): MqttConnectOptions {
val connOpts = MqttConnectOptions()
connOpts.isCleanSession = true
connOpts.userName = username
connOpts.password = password.toCharArray()
return connOpts
}
从Java类中,我像下面这样调用它并成功获取消息:
private void subscribeMQForVideo() {
MqConnectionExtentionKt.connectMq(mContext,"mq_video_channel_name",(topic,mqttMessage) -> {
// message Arrived!
Log.e("TAG","Message Video: " + topic + " : " + new String(mqttMessage.getPayload()));
return null;
});
}
要发布类似扩展的消息,我创建的扩展几乎没有什么不同。 (MqConnectionPublishExtention.kt)
fun Context.connectMq(onConnectionSuccess: (mqttAndroidClient: MqttAndroidClient?) -> Unit) {
val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
val mqttAndroidClient = MqttAndroidClient(this,BuildConfig.MQ_SERVER_URI,mClientId)
mqttAndroidClient.setCallback(object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean,we need to re-subscribe
onConnectionSuccess(mqttAndroidClient)
} else { //addToHistory("Connected to: " + serverURI);
Log.e("TAG","Incoming message: " + message.payload.toString())
}
override fun deliveryComplete(token: IMqttDeliveryToken) {}
})
val mqttConnectOptions = setUpConnectionOptions(BuildConfig.MQ_CONNECTION_USERNAME,BuildConfig.MQ_CONNECTION_PASSWORD)
mqttConnectOptions.isAutomaticReconnect = true
mqttConnectOptions.isCleanSession = false
try {
mqttAndroidClient.connect(mqttConnectOptions,object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
val disconnectedBufferOptions = DisconnectedBufferOptions()
disconnectedBufferOptions.isBufferEnabled = true
disconnectedBufferOptions.bufferSize = 100
disconnectedBufferOptions.isPersistBuffer = false
disconnectedBufferOptions.isDeleteOldestMessages = false
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
onConnectionSuccess(mqttAndroidClient)
}
override fun onFailure(asyncActionToken: IMqttToken,password: String): MqttConnectOptions {
val connOpts = MqttConnectOptions()
connOpts.isCleanSession = true
connOpts.userName = username
connOpts.password = password.toCharArray()
return connOpts
}
发布来自Java类的消息
private void publishExerciseDataToMQChannel() {
MqConnectionPublishExtentionKt.connectMq(mContext,(mqttAndroidClient) -> {
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put("params",mlParams);
jsonObject.put("workoutid",workoutId);
jsonObject.put("userid",model.getUserIdFromPrefs());
jsonObject.put("stream_id",streamDataModel.getStreamId());
MqttMessage message = new MqttMessage();
message.setPayload(jsonObject.toString().getBytes());
mqttAndroidClient.publish("Channel_name",message);
Log.e("TAG",message.getQos() + "");
if (!mqttAndroidClient.isConnected()) {
Log.e("TAG",mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
}
} catch (MqttException e) {
System.err.println("Error Publishing: " + e.getMessage());
e.printStackTrace();
} catch (JSONException e) {
System.err.println("Error Publishing: " + e.getMessage());
e.printStackTrace();
}
return null;
});
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。