• 热门专题

Rxjava+ReTrofit+okHttp深入浅出终极封装四(多文件下载之断点续传)

作者:  发布日期:2016-11-10 20:08:36
Tag标签:断点  文件下载  终极  
  • Rxjava+ReTrofit+okHttp深入浅出-终极封装四(多文件下载之断点续传)

    背景

    断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!

    效果

    实现

    下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理

    1.创建service接口

    和以前一样,先写接口
    注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)

     /*断点续传下载接口*/
        @Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/
        @GET
        Observable<ResponseBody> download(@Header('RANGE') String start, @Url String url);

    2.复写ResponseBody

    和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调

    /**
     * 自定义进度的body
     * @author wzg
     */
    public class DownloadResponseBody extends ResponseBody {
        private ResponseBody responseBody;
        private DownloadProgressListener progressListener;
        private BufferedSource bufferedSource;
    
        public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
            this.responseBody = responseBody;
            this.progressListener = progressListener;
        }
    
        @Override
        public BufferedSource source() {
            if (bufferedSource == null) {
                bufferedSource = Okio.buffer(source(responseBody.source()));
            }
            return bufferedSource;
        }
    
        private Source source(Source source) {
            return new ForwardingSource(source) {
                long totalBytesRead = 0L;
                @Override
                public long read(Buffer sink, long byteCount) throws IOException {
                    long bytesRead = super.read(sink, byteCount);
                    // read() returns the number of bytes read, or -1 if this source is exhausted.
                    totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                    if (null != progressListener) {
                        progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
                    }
                    return bytesRead;
                }
            };
        }
    }

    3.自定义进度回调接口

    /**
     * 成功回调处理
     * Created by WZG on 2016/10/20.
     */
    public interface DownloadProgressListener {
        /**
         * 下载进度
         * @param read
         * @param count
         * @param done
         */
        void update(long read, long count, boolean done);
    }

    4.复写Interceptor

    复写Interceptor,可以将我们的监听回调通过okhttp的client方法addInterceptor自动加载我们的监听回调和ResponseBody

    /**
     * 成功回调处理
     * Created by WZG on 2016/10/20.
     */
    public class DownloadInterceptor implements Interceptor {
    
        private DownloadProgressListener listener;
    
        public DownloadInterceptor(DownloadProgressListener listener) {
            this.listener = listener;
        }
    
        @Override
        public Response intercept(Chain chain) throws IOException {
            Response originalResponse = chain.proceed(chain.request());
    
            return originalResponse.newBuilder()
                    .body(new DownloadResponseBody(originalResponse.body(), listener))
                    .build();
        }
    }

    5.封装请求downinfo数据

    这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据

    public class DownInfo {
        /*存储位置*/
        private String savePath;
        /*下载url*/
        private String url;
        /*基础url*/
        private String baseUrl;
        /*文件总长度*/
        private long countLength;
        /*下载长度*/
        private long readLength;
        /*下载唯一的HttpService*/
        private HttpService service;
        /*回调监听*/
        private HttpProgressOnNextListener listener;
        /*超时设置*/
        private  int DEFAULT_TIMEOUT = 6;
        /*下载状态*/
        private DownState state;
        }

    6.DownState状态封装

    很简单,和大多数封装框架一样

    public enum  DownState {
        START,
        DOWN,
        PAUSE,
        STOP,
        ERROR,
        FINISH,
    }

    7.请求HttpProgressOnNextListener回调封装类

    注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听
    通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活

    /**
     * 下载过程中的回调处理
     * Created by WZG on 2016/10/20.
     */
    public abstract class HttpProgressOnNextListener<T> {
        /**
         * 成功后回调方法
         * @param t
         */
        public abstract void onNext(T t);
    
        /**
         * 开始下载
         */
        public abstract void onStart();
    
        /**
         * 完成下载
         */
        public abstract void onComplete();
    
    
        /**
         * 下载进度
         * @param readLength
         * @param countLength
         */
        public abstract void updateProgress(long readLength, long countLength);
    
        /**
         * 失败或者错误方法
         * 主动调用,更加灵活
         * @param e
         */
         public  void onError(Throwable e){
    
         }
    
        /**
         * 暂停下载
         */
        public void onPuase(){
    
        }
    
        /**
         * 停止下载销毁
         */
        public void onStop(){
    
        }
    }

    8.封装回调Subscriber

    准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中

    sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果

    传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态

    通过RxAndroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)

    update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)

    /**
     * 用于在Http请求开始时,自动显示一个ProgressDialog
     * 在Http请求结束是,关闭ProgressDialog
     * 调用者自己对请求数据进行处理
     * Created by WZG on 2016/7/16.
     */
    public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
        //弱引用结果回调
        private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
        /*下载数据*/
        private DownInfo downInfo;
    
    
        public ProgressDownSubscriber(DownInfo downInfo) {
            this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
            this.downInfo=downInfo;
        }
    
        /**
         * 订阅开始时调用
         * 显示ProgressDialog
         */
        @Override
        public void onStart() {
            if(mSubscriberOnNextListener.get()!=null){
                mSubscriberOnNextListener.get().onStart();
            }
            downInfo.setState(DownState.START);
        }
    
        /**
         * 完成,隐藏ProgressDialog
         */
        @Override
        public void onCompleted() {
            if(mSubscriberOnNextListener.get()!=null){
                mSubscriberOnNextListener.get().onComplete();
            }
            downInfo.setState(DownState.FINISH);
        }
    
        /**
         * 对错误进行统一处理
         * 隐藏ProgressDialog
         *
         * @param e
         */
        @Override
        public void onError(Throwable e) {
            /*停止下载*/
            HttpDownManager.getInstance().stopDown(downInfo);
            if(mSubscriberOnNextListener.get()!=null){
                mSubscriberOnNextListener.get().onError(e);
            }
            downInfo.setState(DownState.ERROR);
        }
    
        /**
         * 将onNext方法中的返回结果交给Activity或Fragment自己处理
         *
         * @param t 创建Subscriber时的泛型类型
         */
        @Override
        public void onNext(T t) {
            if (mSubscriberOnNextListener.get() != null) {
                mSubscriberOnNextListener.get().onNext(t);
            }
        }
    
        @Override
        public void update(long read, long count, boolean done) {
            if(downInfo.getCountLength()>count){
                read=downInfo.getCountLength()-count+read;
            }else{
                downInfo.setCountLength(count);
            }
            downInfo.setReadLength(read);
            if (mSubscriberOnNextListener.get() != null) {
                /*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/
                rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                          /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/
                        if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
                        downInfo.setState(DownState.DOWN);
                        mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
                    }
                });
            }
        }
    
    }

    9.下载管理类封装HttpDownManager

    单利获取
     /**
         * 获取单例
         * @return
         */
        public static HttpDownManager getInstance() {
            if (INSTANCE == null) {
                synchronized (HttpDownManager.class) {
                    if (INSTANCE == null) {
                        INSTANCE = new HttpDownManager();
                    }
                }
            }
            return INSTANCE;
        }
    
    因为单利所以需要记录正在下载的数据和回到sub
     /*回调sub队列*/
        private HashMap<String,ProgressDownSubscriber> subMap;
        /*单利对象*/
        private volatile static HttpDownManager INSTANCE;
    
        private HttpDownManager(){
            downInfos=new HashSet<>();
            subMap=new HashMap<>();
        }
    开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)
     /**
         * 开始下载
         */
        public void startDown(DownInfo info){
            /*正在下载不处理*/
            if(info==null||subMap.get(info.getUrl())!=null){
                return;
            }
            /*添加回调处理类*/
            ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
            /*记录回调sub*/
            subMap.put(info.getUrl(),subscriber);
            /*获取service,多次请求公用一个sercie*/
            HttpService httpService;
            if(downInfos.contains(info)){
                httpService=info.getService();
            }else{
                DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
                OkHttpClient.Builder builder = new OkHttpClient.Builder();
                //手动创建一个OkHttpClient并设置超时时间
                builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
                builder.addInterceptor(interceptor);
    
                Retrofit retrofit = new Retrofit.Builder()
                        .client(builder.build())
                        .addConverterFactory(GsonConverterFactory.create())
                        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                        .baseUrl(info.getBaseUrl())
                        .build();
                httpService= retrofit.create(HttpService.class);
                info.setService(httpService);
            }
            /*得到rx对象-上一次下載的位置開始下載*/
            httpService.download('bytes=' + info.getReadLength() + '-',info.getUrl())
                    /*指定线程*/
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                       /*失败后的retry配置*/
                    .retryWhen(new RetryWhenNetworkException())
                    /*读取下载写入文件*/
                    .map(new Func1<ResponseBody, DownInfo>() {
                        @Override
                        public DownInfo call(ResponseBody responseBody) {
                            try {
                                writeCache(responseBody,new File(info.getSavePath()),info);
                            } catch (IOException e) {
                                /*失败抛出异常*/
                                throw new HttpTimeException(e.getMessage());
                            }
                            return info;
                        }
                    })
                    /*回调线程*/
                    .observeOn(AndroidSchedulers.mainThread())
                    /*数据回调*/
                    .subscribe(subscriber);
    
        }
    写入文件
    注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度
        /**
         * 写入文件
         * @param file
         * @param info
         * @throws IOException
         */
        public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
            if (!file.getParentFile().exists())
                file.getParentFile().mkdirs();
            long allLength;
            if (info.getCountLength()==0){
                allLength=responseBody.contentLength();
            }else{
                allLength=info.getCountLength();
            }
                FileChannel channelOut = null;
                RandomAccessFile randomAccessFile = null;
                randomAccessFile = new RandomAccessFile(file, 'rwd');
                channelOut = randomAccessFile.getChannel();
                MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
                        info.getReadLength(),allLength-info.getReadLength());
                byte[] buffer = new byte[1024*8];
                int len;
                int record = 0;
                while ((len = responseBody.byteStream().read(buffer)) != -1) {
                    mappedBuffer.put(buffer, 0, len);
                    record += len;
                }
                responseBody.byteStream().close();
                    if (channelOut != null) {
                        channelOut.close();
                    }
                    if (randomAccessFile != null) {
                        randomAccessFile.close();
                    }
        }
    停止下载
    调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)
     /**
         * 停止下载
         */
        public void stopDown(DownInfo info){
            if(info==null)return;
            info.setState(DownState.STOP);
            info.getListener().onStop();
            if(subMap.containsKey(info.getUrl())) {
                ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
                subscriber.unsubscribe();
                subMap.remove(info.getUrl());
            }
            /*同步数据库*/
        }
    暂停下载
    原理和停止下载原理一样
     /**
         * 暂停下载
         * @param info
         */
        public void pause(DownInfo info){
            if(info==null)return;
            info.setState(DownState.PAUSE);
            info.getListener().onPuase();
            if(subMap.containsKey(info.getUrl())){
                ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
                subscriber.unsubscribe();
                subMap.remove(info.getUrl());
            }
            /*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/
        }
    暂停全部和停止全部下载任务
     /**
         * 停止全部下载
         */
        public void stopAllDown(){
            for (DownInfo downInfo : downInfos) {
                stopDown(downInfo);
            }
            subMap.clear();
            downInfos.clear();
        }
    
        /**
         * 暂停全部下载
         */
        public void pauseAll(){
            for (DownInfo downInfo : downInfos) {
                pause(downInfo);
            }
            subMap.clear();
            downInfos.clear();
        }
    
    整合代码HttpDownManager
    同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)
    传送门-HttpDownManager  https://github.com/wzgiceman/RxjavaRetrofitDemo-master/blob/master/app/src/main/java/com/example/retrofit/downlaod/HttpDownManager.java

    补充

    有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)

    只需要替换DbUtil的方法即可
    传送门-DbUtil   https://github.com/wzgiceman/RxjavaRetrofitDemo-master/blob/master/app/src/main/java/com/example/retrofit/downlaod/DbUtil.java

    总结

    到此我们的Rxjava+ReTrofit+okHttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!

        1.Retrofit+Rxjava+okhttp基本使用方法
        2.统一处理请求数据格式
        3.统一的ProgressDialog和回调Subscriber处理
        4.取消http请求
        5.预处理http请求
        6.返回数据的统一判断
        7.失败后的retry封装处理
        8.RxLifecycle管理生命周期,防止泄露
        9.文件上传和文件下载(支持多文件断点续传)

    如有帮助欢迎start和follow

    传送门-下载封装源码   http://download.csdn.net/detail/u014610664/9662362
    传送门-全部封装源码   https://github.com/wzgiceman/RxjavaRetrofitDemo-master

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规