微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

android – 如何在离线时存储MQTT数据并在恢复在线后发送

我有一个问题:当我的连接中断时,MQTT 发布的消息在重新连接后没有被重新发送,该如何解决?我按照 this answer 做了,但没有效果

我做了什么:

>我已经实现了一个MQTT服务来发送GPS位置,并且在线时工作正常.
>将订阅的QoS设置为1.
>将ClientId设置为固定值.
>将发布的QoS设置为1.
>将clean session设置为false

但是重新连接之后,收到的结果仍然只有我在线时发布的数据,离线期间存储的持久化数据并没有被发布.

这是我的源代码

package id.trustudio.android.mdm.service;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.content.pm.ApplicationInfo;
import android.content.pm.PackageManager;
import android.net.TrafficStats;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.UnsupportedEncodingException;
import java.util.ArrayDeque;
import java.util.Deque;

import id.trustudio.android.mdm.http.DetectConnection;
import id.trustudio.android.mdm.util.Cons;
import id.trustudio.android.mdm.util.Debug;
import id.trustudio.android.mdm.util.GPSTracker;
import id.trustudio.android.mdm.util.GPSTracker2;

public class MqttService extends Service implements MqttCallback {

    public static boolean isstarted = false;

    private double latitude  = 0;
    private double longitude = 0;
    private GPSTracker mGPSTracker;
    private GPSTracker2 mGPSTracker2;

    boolean isInternetPresent = false;

    private SharedPreferences mPrivatePref;
    private SharedPreferences.Editor editor;

    private DetectConnection mDetectConnection;
    String deviceid,Name;
    int totalbyte;
    String packages;
    MemoryPersistence persistence;
    String clientId;
    MqttAndroidClient client;

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onCreate() {
        super.onCreate();

        mPrivatePref = this.getSharedPreferences(Cons.PRIVATE_PREF, Context.MODE_PRIVATE);
        editor = mPrivatePref.edit();

        deviceid = mPrivatePref.getString(Cons.APP_PACKAGE + "deviceid", "");
        Name = mPrivatePref.getString(Cons.APP_PACKAGE + "user", "");

        clientId = MqttClient.generateClientId();
        persistence = new MemoryPersistence();

        client =
                new MqttAndroidClient(getApplicationContext(), "tcp://broker.administrasi.id:1883",
                        clientId, persistence);

        client.setCallback(this);

        try{
            MqttConnectOptions connopts = new MqttConnectOptions();
            connopts.setCleanSession(false);
            client.connect(connopts,null, new IMqttActionListener() {

                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {

                        }

                        @Override
                        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

                        }
                    });
        }catch (Exception e){
            e.printstacktrace();
        }

        mHandler.postDelayed(mUpdateTask, 1000);
    }


    public int onStartCommand(Intent intent, int flags, int startId) {

        int res = super.onStartCommand(intent, flags, startId);

        //check if your service is already started
        if (isstarted){      //yes - do nothing
            return Service.START_STICKY;
        } else {             //no
            isstarted = true;
        }

        return Service.START_STICKY;

    }

    private Handler mHandler = new Handler();
    private Runnable mUpdateTask = new Runnable() {
        public void run() {

            getLatLng();
            if (latitude == 0.0 || longitude == 0.0) getLatLngWifi();

                        Debug.e("MQTT","Connect");
                        String topic = "gps/kodeupi/kodeap/kodeup/" + deviceid;
                        Debug.e("MQTT CLIENT", clientId);
                        int qos = 1;
                        try {
                            IMqttToken subToken = client.subscribe(topic, qos);
                            subToken.setActionCallback(new IMqttActionListener() {
                                @Override
                                public void onSuccess(IMqttToken asyncActionToken) {
                                    // The message was published

                                    String topic = "gps/kodeupi/kodeap/kodeup/" + deviceid;
                                    long CurrentTime = System.currentTimeMillis();

                                    String payload = deviceid + "|" + latitude + "|" + longitude + "|" + CurrentTime;

                                    byte[] encodedPayload = new byte[0];
                                    try {
                                        encodedPayload = payload.getBytes("UTF-8");
                                        MqttMessage message = new MqttMessage(encodedPayload);
                                        client.publish(topic, message);
                                        message.setRetained(true);
                                        // set quality of service
                                        message.setQos(1);
                                        Log.d("TAG", "onSuccess");
                                    } catch (UnsupportedEncodingException | MqttException e) {
                                        e.printstacktrace();
                                    }
                                }

                                @Override
                                public void onFailure(IMqttToken asyncActionToken,
                                                      Throwable exception) {
                                    // The subscription Could not be performed, maybe the user was not
                                    // authorized to subscribe on the specified topic e.g. using wildcards

                                }
                            });
                        } catch (MqttException e) {
                            e.printstacktrace();
                        }

            mHandler.postDelayed(this, 20000);
        }
    };

    private void getLatLng() {
        mGPSTracker2        = new GPSTracker2(this);
        isInternetPresent   = mDetectConnection.isConnectingToInternet();
        if (isInternetPresent == true) {
            if (mGPSTracker2.canGetLocation()) {
                latitude    = mGPSTracker2.getLatitude();
                longitude   = mGPSTracker2.getLongitude();

                if(latitude != 0.0 && longitude != 0.0) {
                    editor.putString(Cons.APP_LATITUDE, latitude+"");
                    editor.putString(Cons.APP_LONGITUDE, longitude+"");
                    editor.commit();
                }
            } else {
//              getLatLngWifi();
                Debug.i(Cons.TAG, "on gps Failed, please check");

            }
        } else {
            Debug.i(Cons.TAG, "no connection");

            if(mGPSTracker2 != null)
                mGPSTracker2.stopUsingGPS();
        }
    }

    private void getLatLngWifi() {
        mGPSTracker         = new GPSTracker(this);
        isInternetPresent   = mDetectConnection.isConnectingToInternet();
        if (isInternetPresent == true) {
            if (mGPSTracker.canGetLocation()) {
                latitude    = mGPSTracker.getLatitude();
                longitude   = mGPSTracker.getLongitude();

                if(latitude != 0.0 && longitude != 0.0) {
                    editor.putString(Cons.APP_LATITUDE, latitude+"");
                    editor.putString(Cons.APP_LONGITUDE, longitude+"");
                    editor.commit();
                }

            } else {
                Debug.i(Cons.TAG, "wifi " + "on gps Failed, please check");

            }
        } else {
            Debug.i(Cons.TAG, "wifi " + "no connection");

            if(mGPSTracker != null)
                mGPSTracker.stopUsingGPS();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }
}

对不起,我的英语不好

解决方法:

正如评论中所述.

这部分代码必须由您自己编写:框架不支持存储因客户端断开连接而未能发送的消息. MQTT 持久化仅用于确保在 QoS 握手完成之前与代理(broker)的连接断开时,QoS 1/2 的消息不会丢失.

如果尝试在断开连接时发布消息,client.publish(topic,message) 调用将抛出异常.您需要捕获此异常,并把消息内容存储起来,以便在重新建立连接后发送.连接恢复并正常运行后,您需要遍历已存储的消息并再次尝试发送.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐