/**
 * Compresses the given files into one zip archive, handing the per-entry
 * work to a {@link ParallelScatterZipCreator} backed by a dedicated thread pool.
 *
 * <p>Each entry is stored in the archive under the name returned by
 * {@code fileInfo.getFileName()} at the time the entry is registered. The
 * content is fetched lazily via {@code getFileInputStream} using a storage
 * name in which the base name (the part between the last {@code '/'} and the
 * extension) is replaced by {@code fileInfo.getId()}. As before, that storage
 * name is also written back to the {@code FileInfo}.
 *
 * <p>If the content of an entry cannot be opened, the failure is logged and
 * the entry is written empty; the archive as a whole is still produced.
 *
 * @param zipOutName path of the zip file to create
 * @param fileInfos  descriptors of the files to add
 * @throws IOException          if the output file cannot be opened or written
 * @throws ExecutionException   if a parallel compression task fails
 * @throws InterruptedException if the calling thread is interrupted while waiting for the tasks
 */
public void threadZip(String zipOutName, List<FileInfo> fileInfos) throws IOException, ExecutionException, InterruptedException {
    log.info("+++++++++++++++++++++++++++ 开始压缩: " + zipOutName);
    ExecutorService executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), new MyRejectedExecutionHandler());
    // try-with-resources closes both streams even when an entry or writeTo fails.
    try (OutputStream outputStream = new FileOutputStream(zipOutName);
         ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(outputStream)) {
        zipArchiveOutputStream.setEncoding("UTF-8");
        ParallelScatterZipCreator parallelScatterZipCreator = new ParallelScatterZipCreator(executor);
        for (FileInfo fileInfo : fileInfos) {
            // Name used inside the archive; captured before the supplier rewrites the FileInfo.
            final String entryName = fileInfo.getFileName();
            final InputStreamSupplier inputStreamSupplier = () -> {
                try {
                    String storageName = replaceBaseName(fileInfo.getFileName(), fileInfo.getId().toString());
                    // NOTE(review): mutates the caller's FileInfo from a worker thread; kept
                    // because callers may rely on the rewritten name - confirm.
                    fileInfo.setFileName(storageName);
                    return getFileInputStream(fileInfo.getBucketName(), storageName);
                } catch (Exception e) {
                    // Best effort: one unreadable file must not abort the whole archive.
                    log.error("Failed to open content for zip entry " + entryName + ", writing it empty", e);
                    return new NullInputStream(0);
                }
            };
            ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(entryName);
            zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
            zipArchiveEntry.setSize(fileInfo.getFileSize());
            // 436 decimal == 0664 octal (rw-rw-r--).
            zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
            parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
        }
        parallelScatterZipCreator.writeTo(zipArchiveOutputStream);
    } finally {
        // Idempotent; guarantees the pool threads are released on every failure path too.
        executor.shutdown();
    }
    log.info("+++++++++++++++++++++++++++ 结束压缩: " + zipOutName);
}

/**
 * Returns {@code fileName} with its base name replaced by {@code replacement},
 * keeping the directory prefix and the extension untouched.
 *
 * <p>Only the base-name segment is substituted, so a directory that happens to
 * contain the same text is left alone. A name without an extension gets the
 * replacement appended to its directory prefix. A slash at index 0 is treated
 * as part of the base name, matching the previous behaviour.
 */
private static String replaceBaseName(String fileName, String replacement) {
    int slash = fileName.lastIndexOf('/');
    int start = slash > 0 ? slash + 1 : 0;
    int dot = fileName.lastIndexOf('.');
    // A dot that belongs to a directory segment is not an extension separator.
    int end = dot >= start ? dot : fileName.length();
    return fileName.substring(0, start) + replacement + fileName.substring(end);
}
需要依赖的jar包
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.20</version>
</dependency>
阻塞队列工具类
package com.open.capacity.oss.utils;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Rejection policy that applies back-pressure: instead of discarding a
 * rejected task, the submitting thread blocks until the executor's work queue
 * has room for it.
 *
 * <p>A task is never parked in the queue of an executor that has been shut
 * down, and an interrupt while waiting is never swallowed. Both cases are
 * reported with a {@link RejectedExecutionException}, so the submitter learns
 * that the task will not run.
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

    /**
     * Blocks until {@code r} can be placed on the executor's queue.
     *
     * @param r        the task that was rejected
     * @param executor the executor that rejected it
     * @throws RejectedExecutionException if the executor is shut down, or if the
     *         calling thread is interrupted while waiting for queue space (the
     *         interrupt flag is restored before throwing)
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shut down; rejecting task " + r);
        }
        try {
            // Blocks the submitting thread until a queue slot frees up.
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to enqueue task " + r, e);
        }
        // The executor may have been shut down while we were waiting; take the task
        // back out so it is not silently stranded in the queue.
        if (executor.isShutdown() && executor.getQueue().remove(r)) {
            throw new RejectedExecutionException("Executor was shut down while enqueuing task " + r);
        }
    }
}
|