IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> JavaScript知识库 -> 基于RxJava2.0+Retrofit2.0的多线程文件下载实现 -> 正文阅读

[JavaScript知识库]基于RxJava2.0+Retrofit2.0的多线程文件下载实现

前言

之前写了一篇基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器),是单线程单任务下载一个文件,非断点下载。断点下载可以单线程下载,也可以多线程下载,不过多线程下载需要文件支持断点续传,需要请求头参数Range。不管单线程断点下载,还是多线程断点下载,其原理都一样,都需要对文件划分为几个部分,每一个部分采用一个线程下载,不同的是单线程断点全程一个线程处理文件,多线程断点下载根据文件划分数量规划下载线程数即若干线程处理文件。本文讲述如何多个线程下载文件并回传下载进度,暂时不支持下载进度保存即不支持文件上次中断的地方开始传送数据,大家可以自己使用数据库保存下载进度。

国际规则,先上效果图:

效果图

多线程文件下载流程

  1. 首先获取下载文件的长度ContentLength(fileSize),然后生成一个与下载文件长度相等的临时文件
  2. 设定文件的下载线程数threadNum,计算每条线程下载文件的区间(startPosition~ endPosition)即每一个线程需要下载的大小
 int blockSize = ((fileSize % threadNum) == 0) ? (fileSize / threadNum) : (fileSize / threadNum + 1);
  1. 创建threadNum条线程在文件不同位置下载数据,同时在本地临时文件对应位置写入数据,并记录每条线程当前下载大小
  2. 汇总每条线程下载大小,合并所有线程下载大小,计算下载百分比,下载速率,下载用时,最后回显下载进度信息

下载实现

1.下载接口DownloadApi

 public interface DownloadApi {


    /**
     * 下载文件
     *
     */
    @Streaming
    @GET
    Observable<ResponseBody> downLoad(@Url String url);


    /**
     *下载文件
     * @param range Range表示断点续传的请求头参数
     * @param url 下载url
     * @return
     */
    @Streaming
    @GET
    Observable<ResponseBody> download(@Header("Range") String range, @Url String url);
}

2.下载方法

   /**
     * 下载文件法3(多线程文件下载,使用RXJava更新UI)
     *
     * @param threadNum       下载线程数
     * @param url
     * @param destDir
     * @param fileName
     * @param progressHandler
     */
    public static void downloadFile3(final int threadNum, final String url, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {
        DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);
        final DownloadInfo downloadInfo = new DownloadInfo();
        Observable<ResponseBody> getFileSizeObservable = apiService.downLoad(url);
        final FileDownloadObservable[] fileDownloadObservables = new FileDownloadObservable[threadNum];//设置线程数量
        //获取文件大小,分割文件下载
        getFileSizeObservable
                .flatMap(new Function<ResponseBody, ObservableSource<DownloadInfo>>() {

                    @Override
                    public ObservableSource<DownloadInfo> apply(final ResponseBody responseBody) throws Exception {

                        return Observable.create(new ObservableOnSubscribe<DownloadInfo>() {
                            @Override
                            public void subscribe(ObservableEmitter<DownloadInfo> emitter) throws Exception {
                                long fileSize = responseBody.contentLength();


                                File dir = new File(destDir);
                                if (!dir.exists()) {
                                    dir.mkdirs();
                                }
                                final File file = new File(destDir, fileName);
                                if (file.exists()){
                                    file.delete();
                                }
                                downloadInfo.setFile(file);
                                downloadInfo.setFileSize(fileSize);
                                RandomAccessFile accessFile = new RandomAccessFile(file, "rwd");
                                //设置本地文件的长度和下载文件相同
                                accessFile.setLength(fileSize);
                                accessFile.close();
                                //每条线程下载数据
                                long blockSize = ((fileSize % threadNum) == 0) ? (fileSize / threadNum) : (fileSize / threadNum + 1);
								//指定每条线程下载的区间
                                for (int i = 0; i < threadNum; i++) {
                                    int curThreadEndPosition = (int) ((i + 1) != threadNum ? ((i + 1) * blockSize - 1) : fileSize);
                                    FileDownloadObservable fileDownloadObservable = new FileDownloadObservable(url, file, (int) (i * blockSize), curThreadEndPosition);
                                    fileDownloadObservable.download();
                                    fileDownloadObservables[i] = fileDownloadObservable;
                                    mDisposable.add(fileDownloadObservable.getDisposable());

                                }
                                boolean finished = false;
                                long startTime = System.currentTimeMillis();
                                int downloadSize;
                                int progress;
                                long usedTime;
                                long curTime;
                                boolean completed;
                                int speed;
								//根据所有线程下载大小,计算下载进度、下载速率,下载用时
                                while (!finished && !emitter.isDisposed()) {
                                    downloadSize = 0;
                                    finished = true;
                                    for (int i = 0; i < fileDownloadObservables.length; i++) {
                                        downloadSize += fileDownloadObservables[i].getDownloadSize();
                                        if (!fileDownloadObservables[i].isFinished()) {
                                            finished = false;
                                        }
                                    }
                                    progress = (int) ((downloadSize * 1.0 / fileSize) * 100);
                                    curTime = System.currentTimeMillis();
                                    usedTime = (curTime - startTime) / 1000;
                                    if (usedTime == 0) usedTime = 1;
                                    speed = (int) (downloadSize / usedTime);
                                    downloadInfo.setSpeed(speed);
                                    downloadInfo.setProgress(progress);
                                    downloadInfo.setCurrentSize(downloadSize);
                                    downloadInfo.setUsedTime(usedTime);
									//回显下载信息
                                    if (!emitter.isDisposed()) {
                                        emitter.onNext(downloadInfo);
                                    }
                                    SystemClock.sleep(1000);
                                }
                                completed = true;
                                if (!emitter.isDisposed()) {
                                    if (completed) {
                                        emitter.onComplete();
                                    } else {
                                        emitter.onError(new RuntimeException("下载失败"));
                                    }
                                }

                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<DownloadInfo>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable.add(d);
                    }

                    @Override
                    public void onNext(DownloadInfo downloadInfo) {
                        progressHandler.onProgress(downloadInfo);
                    }

                    @Override
                    public void onError(Throwable e) {
                        progressHandler.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        progressHandler.onCompleted(downloadInfo.getFile());
                    }
                });

    }

3.下载Observable即每条线程所在的下载任务FileDownloadObservable

/**
 * TODO
 *
 * @author Kelly
 * @version 1.0.0
 * @filename FileDownloadObservable
 * @time 2021/9/6 17:37
 * @copyright(C) 2021 song
 */
public class FileDownloadObservable {

    /**
     * 下载url
     */
    private String url;
    /**
     * 缓存的FIle
     */
    private File file;
    /**
     * 开始位置
     */
    private int startPosition;
    /**
     * 结束位置
     */
    private int endPosition;
    /**
     * 当前位置
     */
    private int curPosition;
    /**
     * 完成
     */
    private boolean finished = false;
    /**
     * 已经下载多少
     */
    private int downloadSize = 0;
    private Disposable disposable;
    private String name;

    public FileDownloadObservable(String url, File file, int startPosition,
                                  int endPosition) {
        this.url = url;
        this.file = file;
        this.startPosition = startPosition;
        this.curPosition = startPosition;
        this.endPosition = endPosition;
        this.name = "";
    }

    public void download() {
        DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);
        String range = "bytes=" + (startPosition) + "-" + endPosition;

        apiService.download(range, url)
                .flatMap(new Function<ResponseBody, ObservableSource<Integer>>() {

                    @Override
                    public ObservableSource<Integer> apply(final ResponseBody responseBody) throws Exception {

                        return Observable.create(new ObservableOnSubscribe<Integer>() {
                            @Override
                            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                                InputStream inputStream = null;
                                long contentLength;

                                RandomAccessFile randomAccessFile = null;
                                byte[] buf = new byte[1024 * 8];
                                name = Thread.currentThread().getName();
                                try {
                                    inputStream = responseBody.byteStream();
                                    contentLength = responseBody.contentLength();
                                    System.out.println(name + ",startPosition " + startPosition + ",endPosition " + endPosition);
                                    randomAccessFile = new RandomAccessFile(file, "rwd");
                                    //设置开始写入位置
                                    randomAccessFile.seek(startPosition);
                                    System.out.println(name + "连接成功,读取长度:" + FileUtils.formatFileSize(contentLength));
                                    while (curPosition < endPosition) {
                                        //当前位置小于结束位置  继续下载
                                        int len = inputStream.read(buf);
                                        if (len == -1) {
                                            //下载完成
                                            System.out.println(len);
                                            break;
                                        }
                                        randomAccessFile.write(buf, 0, len);
                                        curPosition = curPosition + len;
                                        if (curPosition > endPosition) {    //如果下载多了,则减去多余部分
                                            System.out.println(name + "curPosition > endPosition  !!!!");
                                            int extraLen = curPosition - endPosition;
                                            downloadSize += (len - extraLen + 1);
                                        } else {
                                            downloadSize += len;
                                        }
//                                        emitter.onNext(downloadSize);
                                    }
                                    finished = true;  //当前阶段下载完成
                                    System.out.println("当前" + name + "下载完成");

                                    if (!emitter.isDisposed()) {
                                        emitter.onComplete();
                                    }
                                } catch (Exception e) {
                                    if (!emitter.isDisposed()) {
                                        emitter.onError(e);
                                    }
                                } finally {
                                    //关闭流
                                    if (inputStream != null) {
                                        try {
                                            inputStream.close();
                                        } catch (IOException e) {
                                            e.printStackTrace();
                                        }
                                    }
                                    try {
                                        randomAccessFile.close();
                                    } catch (IOException e) {
                                        System.out.println("AccessFile IOException " + e.getMessage());
                                    }
                                }

                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.newThread())//新的线程下载
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onNext(Integer downloadSize) {

                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(name + "download error Exception " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }


    /**
     * 是否完成当前段下载完成
     *
     * @return
     */
    public boolean isFinished() {
        return finished;
    }

    /**
     * 已经下载多少
     *
     * @return
     */
    public int getDownloadSize() {
        return downloadSize;
    }

    public Disposable getDisposable() {
        return disposable;
    }
}

下载调用

public void multiThreadDownloadTest(View view) {

    String url = "https://imtt.dd.qq.com/16891/apk/B168BCBBFBE744DA4404C62FD18FFF6F.apk?fsname=com.tencent.tmgp.sgame_1.61.1.6_61010601.apk&csr=1bbd";
    final NumberFormat numberFormat = NumberFormat.getInstance();
    // 设置精确到小数点后2位
    numberFormat.setMaximumFractionDigits(2);
    FileDownloader.downloadFile3(threadNum, url, FileDownloadActivity.DOWNLOAD_APK_PATH, "multi_test.apk", new DownloadProgressHandler() {

        @Override
        public void onProgress(DownloadInfo downloadInfo) {

            long fileSize = downloadInfo.getFileSize();
            long speed = downloadInfo.getSpeed();
            long usedTime = downloadInfo.getUsedTime();
            long currentSize = downloadInfo.getCurrentSize();
            String percent = numberFormat.format((float) currentSize / (float) fileSize * 100);
            mProgress.setText(percent + "%");
            mFileSize.setText(FileUtils.formatFileSize(fileSize));
            mRate.setText(FileUtils.formatFileSize(speed) + "/s");
            mTime.setText(FileUtils.formatTime(usedTime));
        }

        @Override
        public void onCompleted(File file) {
            showMsg("下载完成:" + file.getAbsolutePath());
        }

        @Override
        public void onError(Throwable e) {
            showMsg("下载文件异常:" + e.getMessage());
        }
    });
}

小结

理论上,线程数越多下载越快,但过多线程数会造成CPU大部分开销花在线程间切换上,反而更慢,此外下载快慢还与带宽有关。因此,线程数必须适量。

demo GitHub地址
https://github.com/kellysong/android-blog-demo/tree/master/net-demo

?

  JavaScript知识库 最新文章
ES6的相关知识点
react 函数式组件 & react其他一些总结
Vue基础超详细
前端JS也可以连点成线(Vue中运用 AntVG6)
Vue事件处理的基本使用
Vue后台项目的记录 (一)
前后端分离vue跨域,devServer配置proxy代理
TypeScript
初识vuex
vue项目安装包指令收集
上一篇文章      下一篇文章      查看所有文章
加:2021-09-10 23:56:23  更:2021-09-10 23:57:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:58:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码