前言
之前写了一篇基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器),是单线程单任务下载一个文件,非断点下载。断点下载可以单线程下载,也可以多线程下载,不过多线程下载需要文件支持断点续传,需要请求头参数Range 。不管单线程断点下载,还是多线程断点下载,其原理都一样,都需要对文件划分为几个部分,每一个部分采用一个线程下载,不同的是单线程断点全程一个线程处理文件,多线程断点下载根据文件划分数量规划下载线程数即若干线程处理文件。本文讲述如何多个线程下载文件并回传下载进度,暂时不支持下载进度保存即不支持文件上次中断的地方开始传送数据,大家可以自己使用数据库保存下载进度。
国际规则,先上效果图:
效果图
多线程文件下载流程
- 首先获取下载文件的长度
ContentLength (fileSize),然后生成一个与下载文件长度相等的临时文件 - 设定文件的下载线程数
threadNum ,计算每条线程下载文件的区间(startPosition~ endPosition)即每一个线程需要下载的大小
int blockSize = ((fileSize % threadNum) == 0) ? (fileSize / threadNum) : (fileSize / threadNum + 1);
- 创建
threadNum 条线程在文件不同位置下载数据,同时在本地临时文件对应位置写入数据,并记录每条线程当前下载大小 - 汇总每条线程下载大小,合并所有线程下载大小,计算下载百分比,下载速率,下载用时,最后回显下载进度信息
下载实现
1.下载接口DownloadApi :
public interface DownloadApi {
/**
* 下载文件
*
*/
@Streaming
@GET
Observable<ResponseBody> downLoad(@Url String url);
/**
*下载文件
* @param range Range表示断点续传的请求头参数
* @param url 下载url
* @return
*/
@Streaming
@GET
Observable<ResponseBody> download(@Header("Range") String range, @Url String url);
}
2.下载方法:
/**
* 下载文件法3(多线程文件下载,使用RXJava更新UI)
*
* @param threadNum 下载线程数
* @param url
* @param destDir
* @param fileName
* @param progressHandler
*/
public static void downloadFile3(final int threadNum, final String url, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {
DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);
final DownloadInfo downloadInfo = new DownloadInfo();
Observable<ResponseBody> getFileSizeObservable = apiService.downLoad(url);
final FileDownloadObservable[] fileDownloadObservables = new FileDownloadObservable[threadNum];//设置线程数量
//获取文件大小,分割文件下载
getFileSizeObservable
.flatMap(new Function<ResponseBody, ObservableSource<DownloadInfo>>() {
@Override
public ObservableSource<DownloadInfo> apply(final ResponseBody responseBody) throws Exception {
return Observable.create(new ObservableOnSubscribe<DownloadInfo>() {
@Override
public void subscribe(ObservableEmitter<DownloadInfo> emitter) throws Exception {
long fileSize = responseBody.contentLength();
File dir = new File(destDir);
if (!dir.exists()) {
dir.mkdirs();
}
final File file = new File(destDir, fileName);
if (file.exists()){
file.delete();
}
downloadInfo.setFile(file);
downloadInfo.setFileSize(fileSize);
RandomAccessFile accessFile = new RandomAccessFile(file, "rwd");
//设置本地文件的长度和下载文件相同
accessFile.setLength(fileSize);
accessFile.close();
//每条线程下载数据
long blockSize = ((fileSize % threadNum) == 0) ? (fileSize / threadNum) : (fileSize / threadNum + 1);
//指定每条线程下载的区间
for (int i = 0; i < threadNum; i++) {
int curThreadEndPosition = (int) ((i + 1) != threadNum ? ((i + 1) * blockSize - 1) : fileSize);
FileDownloadObservable fileDownloadObservable = new FileDownloadObservable(url, file, (int) (i * blockSize), curThreadEndPosition);
fileDownloadObservable.download();
fileDownloadObservables[i] = fileDownloadObservable;
mDisposable.add(fileDownloadObservable.getDisposable());
}
boolean finished = false;
long startTime = System.currentTimeMillis();
int downloadSize;
int progress;
long usedTime;
long curTime;
boolean completed;
int speed;
//根据所有线程下载大小,计算下载进度、下载速率,下载用时
while (!finished && !emitter.isDisposed()) {
downloadSize = 0;
finished = true;
for (int i = 0; i < fileDownloadObservables.length; i++) {
downloadSize += fileDownloadObservables[i].getDownloadSize();
if (!fileDownloadObservables[i].isFinished()) {
finished = false;
}
}
progress = (int) ((downloadSize * 1.0 / fileSize) * 100);
curTime = System.currentTimeMillis();
usedTime = (curTime - startTime) / 1000;
if (usedTime == 0) usedTime = 1;
speed = (int) (downloadSize / usedTime);
downloadInfo.setSpeed(speed);
downloadInfo.setProgress(progress);
downloadInfo.setCurrentSize(downloadSize);
downloadInfo.setUsedTime(usedTime);
//回显下载信息
if (!emitter.isDisposed()) {
emitter.onNext(downloadInfo);
}
SystemClock.sleep(1000);
}
completed = true;
if (!emitter.isDisposed()) {
if (completed) {
emitter.onComplete();
} else {
emitter.onError(new RuntimeException("下载失败"));
}
}
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<DownloadInfo>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable.add(d);
}
@Override
public void onNext(DownloadInfo downloadInfo) {
progressHandler.onProgress(downloadInfo);
}
@Override
public void onError(Throwable e) {
progressHandler.onError(e);
}
@Override
public void onComplete() {
progressHandler.onCompleted(downloadInfo.getFile());
}
});
}
3.下载Observable即每条线程所在的下载任务FileDownloadObservable :
/**
* TODO
*
* @author Kelly
* @version 1.0.0
* @filename FileDownloadObservable
* @time 2021/9/6 17:37
* @copyright(C) 2021 song
*/
public class FileDownloadObservable {
/**
* 下载url
*/
private String url;
/**
* 缓存的FIle
*/
private File file;
/**
* 开始位置
*/
private int startPosition;
/**
* 结束位置
*/
private int endPosition;
/**
* 当前位置
*/
private int curPosition;
/**
* 完成
*/
private boolean finished = false;
/**
* 已经下载多少
*/
private int downloadSize = 0;
private Disposable disposable;
private String name;
public FileDownloadObservable(String url, File file, int startPosition,
int endPosition) {
this.url = url;
this.file = file;
this.startPosition = startPosition;
this.curPosition = startPosition;
this.endPosition = endPosition;
this.name = "";
}
public void download() {
DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);
String range = "bytes=" + (startPosition) + "-" + endPosition;
apiService.download(range, url)
.flatMap(new Function<ResponseBody, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(final ResponseBody responseBody) throws Exception {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
InputStream inputStream = null;
long contentLength;
RandomAccessFile randomAccessFile = null;
byte[] buf = new byte[1024 * 8];
name = Thread.currentThread().getName();
try {
inputStream = responseBody.byteStream();
contentLength = responseBody.contentLength();
System.out.println(name + ",startPosition " + startPosition + ",endPosition " + endPosition);
randomAccessFile = new RandomAccessFile(file, "rwd");
//设置开始写入位置
randomAccessFile.seek(startPosition);
System.out.println(name + "连接成功,读取长度:" + FileUtils.formatFileSize(contentLength));
while (curPosition < endPosition) {
//当前位置小于结束位置 继续下载
int len = inputStream.read(buf);
if (len == -1) {
//下载完成
System.out.println(len);
break;
}
randomAccessFile.write(buf, 0, len);
curPosition = curPosition + len;
if (curPosition > endPosition) { //如果下载多了,则减去多余部分
System.out.println(name + "curPosition > endPosition !!!!");
int extraLen = curPosition - endPosition;
downloadSize += (len - extraLen + 1);
} else {
downloadSize += len;
}
// emitter.onNext(downloadSize);
}
finished = true; //当前阶段下载完成
System.out.println("当前" + name + "下载完成");
if (!emitter.isDisposed()) {
emitter.onComplete();
}
} catch (Exception e) {
if (!emitter.isDisposed()) {
emitter.onError(e);
}
} finally {
//关闭流
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
randomAccessFile.close();
} catch (IOException e) {
System.out.println("AccessFile IOException " + e.getMessage());
}
}
}
});
}
})
.subscribeOn(Schedulers.newThread())//新的线程下载
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer downloadSize) {
}
@Override
public void onError(Throwable e) {
System.out.println(name + "download error Exception " + e.getMessage());
}
@Override
public void onComplete() {
}
});
}
/**
* 是否完成当前段下载完成
*
* @return
*/
public boolean isFinished() {
return finished;
}
/**
* 已经下载多少
*
* @return
*/
public int getDownloadSize() {
return downloadSize;
}
public Disposable getDisposable() {
return disposable;
}
}
下载调用
public void multiThreadDownloadTest(View view) {
String url = "https://imtt.dd.qq.com/16891/apk/B168BCBBFBE744DA4404C62FD18FFF6F.apk?fsname=com.tencent.tmgp.sgame_1.61.1.6_61010601.apk&csr=1bbd";
final NumberFormat numberFormat = NumberFormat.getInstance();
// 设置精确到小数点后2位
numberFormat.setMaximumFractionDigits(2);
FileDownloader.downloadFile3(threadNum, url, FileDownloadActivity.DOWNLOAD_APK_PATH, "multi_test.apk", new DownloadProgressHandler() {
@Override
public void onProgress(DownloadInfo downloadInfo) {
long fileSize = downloadInfo.getFileSize();
long speed = downloadInfo.getSpeed();
long usedTime = downloadInfo.getUsedTime();
long currentSize = downloadInfo.getCurrentSize();
String percent = numberFormat.format((float) currentSize / (float) fileSize * 100);
mProgress.setText(percent + "%");
mFileSize.setText(FileUtils.formatFileSize(fileSize));
mRate.setText(FileUtils.formatFileSize(speed) + "/s");
mTime.setText(FileUtils.formatTime(usedTime));
}
@Override
public void onCompleted(File file) {
showMsg("下载完成:" + file.getAbsolutePath());
}
@Override
public void onError(Throwable e) {
showMsg("下载文件异常:" + e.getMessage());
}
});
}
小结
理论上,线程数越多下载越快,但过多线程数会造成CPU大部分开销花在线程间切换上,反而更慢,此外下载快慢还与带宽有关。因此,线程数必须适量。
demo GitHub地址: https://github.com/kellysong/android-blog-demo/tree/master/net-demo
?
|