版权声明:本文为博主ExcelMann的原创文章,未经博主允许不得转载。 作者:ExcelMann,转载需注明。
一、背景
最近因为有个需求:
- web前端同时上传多个大文件,并且要求支持断点续传和取消上传,还要显示上传进度条
- 上传完成之后,前端自动将文件传至后台,后台调用算法SDK进行压缩,同时显示压缩进度,压缩成功后返回压缩后的内容,提供url下载
(前提是压缩算法SDK的入参是文件的url路径,并且算法SDK在压缩完成之后负责上传压缩后的文件到OSS,最终返回结果是压缩后文件的url路径)
以下代码保存在github中:FileOssUploadAndWebSocketConnection代码
二、实现方案
第一个需求 对于第一个需求,实现方案是采用OSS的断点续传SDK。直接由Web前端将文件上传到OSS,并得到文件的url路径。 这样做的好处是:绕过SpringBoot后台上传,可以避免多一个步骤(前端传给后台,后台再传给OSS),加快上传的速度。
第二个需求 对于第二个需求,由于是同时压缩多个文件,因此采用线程池异步压缩的方式。并且需要后台主动推送完成进度,实现前端的压缩进度条,因此采用WebSocket,主要的流程如下:
-
客户端获取到上传文件在OSS中的url之后,随机生成一个uuid作为当前文件的id,携带文件url以及uuid,调用后台接口,并且以uuid作为长连接的唯一凭证,与服务端建立WebSocket长连接; -
后台接口处:采用线程池实现并发异步压缩。 线程的工作即:创建新的数据alg_task,其中taskId为前端传的uuid,fileUrl为前端传的文件url,并调用压缩算法SDK进行压缩,算法压缩完成之后将文件上传OSS(这里判断是否为大文件,若是的话,则采用OSS分片上传SDK),将得到的结果返回后台更新到数据库中(其中包含了压缩文件在OSS的url),结束线程。 在线程工作的过程中,后台通过uuid的websocket长连接传送当前完成的进度。
但如果压缩算法SDK支持入参是文件流的话,其实最好的方案是Web前端分片断点续传到后台-----后台分片上传到算法SDK-----算法SDK处理完成之后自行上传压缩后的文件到OSS,得到url路径-----返回url。 这样的话整个流程总共只需要四次IO传输(前端传输文件----后台传输文件----算法处理完成上传OSS----返回url给后台),而我上述提到的方案的整个流程总共需要六次IO传输(前端上传OSS----前端传输参数给后台----后台传输参数给算法----算法根据url从OSS拉取文件----算法处理完成上传文件----返回url给后台)。
三、具体实现流程——Web前端断点续传大文件到OSS
本文采用传统的html+js实现前端,相关的阿里云OSS文档:传统JS的SDK示例
(记得一定要按照这里的要求配置跨域问题)
文档首先就提出为了遵循阿里云的安全最佳实践,前端应该采用RAM用户和STS凭证的方式调用OSS的SDK。因此本文也采用RAM用户和STS的方式,具体创建RAM用户和STS凭证的方式见官方文档:创建RAM用户和STS凭证。
3.1、后台实现
这里后台的实现很简单,只需要提供一个返回STS凭证的接口。
1. 加载maven依赖
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.10.2</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-sts</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.4.6</version>
</dependency>
2. yml配置文件
aliyun:
oss:
stsEndpoint: sts.cn-shenzhen.aliyuncs.com
stsAccessKeyId: xxx
stsAccessKeySecret: xxx
stsRoleArn: xxx
3. OSSUtils工具类
@Component
public class OSSUtils {
private static final Logger logger = LoggerFactory.getLogger(OSSUtils.class);
@Value("${aliyun.oss.stsEndpoint}")
private String stsEndpoint;
@Value("${aliyun.oss.stsAccessKeyId}")
private String stsAccessKeyId;
@Value("${aliyun.oss.stsAccessKeySecret}")
private String stsAccessKeySecret;
@Value("${aliyun.oss.stsRoleArn}")
private String stsRoleArn;
private static final String POLICY = "{\n" +
" \"Version\": \"1\",\n" +
" \"Statement\": [\n" +
" {\n" +
" \"Effect\": \"Allow\",\n" +
" \"Action\": [\n" +
" \"oss:ListObjects\",\n" +
" \"oss:GetObject\",\n" +
" \"oss:PutObject\"\n" +
" ],\n" +
" \"Resource\": [\n" +
" \"acs:oss:*:*:excelman\",\n" +
" \"acs:oss:*:*:excelman/*\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}\n";
private static final String roleSessionName = "excelmanInfo";
public AssumeRoleResponse getStsToken(){
try {
DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
IClientProfile profile = DefaultProfile.getProfile("", stsAccessKeyId, stsAccessKeySecret);
DefaultAcsClient client = new DefaultAcsClient(profile);
final AssumeRoleRequest request = new AssumeRoleRequest();
request.setMethod(MethodType.POST);
request.setRoleArn(stsRoleArn);
request.setRoleSessionName(roleSessionName);
request.setPolicy(POLICY);
request.setDurationSeconds(3600L);
final AssumeRoleResponse response = client.getAcsResponse(request);
System.out.println("Expiration: " + response.getCredentials().getExpiration());
System.out.println("Access Key Id: " + response.getCredentials().getAccessKeyId());
System.out.println("Access Key Secret: " + response.getCredentials().getAccessKeySecret());
System.out.println("Security Token: " + response.getCredentials().getSecurityToken());
System.out.println("RequestId: " + response.getRequestId());
return response;
} catch (ClientException e) {
logger.error("getStsToken发生了异常:{}",e);
}
return null;
}
}
4. Controller接口
@RestController
@RequestMapping("/oss")
public class OSSController {
@Resource
private OSSUtils ossUtils;
@GetMapping("/getStsToken")
public Result getStsToken(){
AssumeRoleResponse stsToken = ossUtils.getStsToken();
StsTokenResponse stsTokenResponse = new StsTokenResponse();
stsTokenResponse.setAccessKeyId(stsToken.getCredentials().getAccessKeyId());
stsTokenResponse.setAccessKeySecret(stsToken.getCredentials().getAccessKeySecret());
stsTokenResponse.setSecurityToken(stsToken.getCredentials().getSecurityToken());
return Result.success(stsTokenResponse);
}
}
3.2、前端实现
前端的实现主要有以下的关键点:
- 调用aliyun-oss SDK之前访问后台接口获取sts token
- 定义上传分片大小,如果文件小于分片大小则采用普通上传,否则使用分片上传(断点续传)
- 上传过程中展示上传进度
- 上传过程中,如果STS Token快过期了,先暂停上传重新获取token
- 支持手动暂停/续传功能
- 上传完成之后返回文件对应的url
这里的实现借鉴了其他博客:阿里云OSS上传 (若有侵权,请私聊删除哈~)
1. 引入SDK 参考链接,内含多种方式:引入SDK
2. HTML
<div>
<input type="file" id='fileInput' multiple='true'>
<button id="uploadBtn" onclick="upload()">Upload</button>
<button id="stopBtn" onclick="stop()">Stop</button>
<button id="resumeBtn" onclick="resume()">resume</button>
<h2 id='status'></h2>
</div>
3. 定义变量
let credentials = null;
let ossClient = null;
const fileInput = document.getElementById('fileInput');
const status = document.getElementById('status');
const bucket = 'mudontire-test';
const region = 'oss-cn-shanghai';
const partSize = 1024 * 1024;
const parallel = 3;
const checkpoints = {};
4. 获取STS凭证,创建OSS Client
function getCredential() {
return fetch('http://localhost:9505/oss/getStsToken')
.then(res => {
return res.json()
})
.then(res => {
console.log("获取Credential成功!")
credentials = res.data;
})
.catch(err => {
console.error(err);
});
}
getCredential();
async function initOSSClient() {
ossClient = new OSS({
accessKeyId: credentials.accessKeyId,
accessKeySecret: credentials.accessKeySecret,
stsToken: credentials.securityToken,
bucket,
region
});
console.log("初始化OSSClient成功!")
}
5. 定义三个按钮事件
async function upload() {
status.innerText = 'Uploading...';
const { files } = fileInput;
const fileList = Array.from(files);
const uploadTasks = fileList.forEach(file => {
if (file.size < partSize) {
commonUpload(file);
} else {
multipartUpload(file);
}
});
}
function stop(){
status.innerText = 'Stopping...';
if (ossClient) ossClient.cancel();
}
function resume(){
status.innerText = 'Resuming...';
if (ossClient) resumeMultipartUpload();
}
6. 普通上传的方式
async function commonUpload(file){
if (!ossClient) {
await initOSSClient();
}
const fileName = file.name;
return ossClient.put(fileName, file).then(result => {
console.log(`Common upload ${file.name} succeeded, result === `, result)
status.innerText = 'Success!';
}).catch(err => {
console.log(`Common upload ${file.name} failed === `, err);
});
}
7. 分片上传的方式
async function multipartUpload(file){
if(!ossClient){
await initOSSClient();
}
const fileName = file.name;
return ossClient.multipartUpload(fileName, file, {
parallel,
partSize,
progress: onMultipartUploadProgress
}).then(result => {
const url = `http://${bucket}.${region}.aliyuncs.com/${fileName}`;
console.log(`Multipart upload ${file.name} succeeded, url === `, url)
status.innerText = 'Success!';
}).catch(err => {
console.log(`Multipart upload ${file.name} failed === `, err);
});
}
8. 分片上传进度回调函数
async function onMultipartUploadProgress(progress, checkpoint) {
console.log(`${checkpoint.file.name} 上传进度 ${progress}`);
checkpoints[checkpoint.uploadId] = checkpoint;
const { Expiration } = credentials;
const timegap = 1;
if (Expiration && moment(Expiration).subtract(timegap, 'minute').isBefore(moment())) {
console.log(`STS token will expire in ${timegap} minutes,uploading will pause and resume after getting new STS token`);
if (ossClient) {
ossClient.cancel();
}
await getCredential();
await resumeMultipartUpload();
}
}
9. 断点续传
async function resumeMultipartUpload() {
console.log("断点续传...")
Object.values(checkpoints).forEach((checkpoint) => {
const { uploadId, file, name } = checkpoint;
ossClient.multipartUpload(uploadId, file, {
parallel,
partSize,
progress: onMultipartUploadProgress,
checkpoint
}).then(result => {
console.log('before delete checkpoints === ', checkpoints);
delete checkpoints[checkpoint.uploadId];
console.log('after delete checkpoints === ', checkpoints);
const url = `http://${bucket}.${region}.aliyuncs.com/${name}`;
console.log(`Resume multipart upload ${file.name} succeeded, url === `, url)
status.innerText = 'Success!';
}).catch(err => {
console.log('Resume multipart upload failed === ', err);
});
});
}
3.3、效果图
上传页面 暂停上传页面,log中显示进度(进度条还没实现) 断点续传页面
四、具体实现流程——WebSocket+线程池实现多文件异步压缩并推送压缩进度
采用的技术是WebSocket以及SockJs和Stomp协议,详细内容见以往的一篇博客: 【原创】基于Springboot、WebSocket的一对一聊天室
4.1、后台实现
1. 引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. WebSocket配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/endpointExcelman").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/uuid");
registry.setUserDestinationPrefix("/uuid");
}
}
3. 跨域配置
@Configuration
public class CorsConfig implements WebMvcConfigurer {
static final String ORIGINS[] = new String[] { "GET", "POST", "PUT", "DELETE" };
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowCredentials(true)
.allowedMethods(ORIGINS)
.maxAge(3601);
}
}
4. Controller接口
SimpMessagingTemplate 是org.springframework.messaging.simp包下的一个类,用于将信息传输到指定的长连接路径上(这里指WebSocket建立的长连接)。
@RestController
public class CompressController {
private static final Logger logger = LoggerFactory.getLogger(CompressController.class);
private static final Map<Integer,String> PROGRESS = new HashMap<>();
static{
PROGRESS.put(1,"10%"); PROGRESS.put(2, "40%"); PROGRESS.put(3, "70%"); PROGRESS.put(4, "90%"); PROGRESS.put(5, "100%");
}
@Resource(name = "threadPool")
private ThreadPoolExecutor executor;
@Resource
private AlgTaskService taskService;
@Resource
private SimpMessagingTemplate template;
@PostMapping("/compressTask")
public Result compressTask(String uuid, String fileUrl) throws InterruptedException {
Thread.sleep(3000L);
template.convertAndSendToUser(uuid, "/queue/getResponse", PROGRESS.get(1));
AlgTask algTask = new AlgTask();
algTask.setUserName("ADMIN");
algTask.setApiName("/compressTask");
algTask.setModuleName("compress");
algTask.setUrl("/compressTask");
algTask.setMethod("POST");
algTask.setTaskId(uuid);
algTask.setInput(fileUrl);
taskService.save(algTask);
Thread.sleep(3000L);
template.convertAndSendToUser(uuid, "/queue/getResponse", PROGRESS.get(2));
executor.execute(()->{
logger.info("开始压缩...当前线程名:{}",Thread.currentThread().getName());
template.convertAndSendToUser(uuid, "/queue/getResponse", PROGRESS.get(3));
template.convertAndSendToUser(uuid, "/queue/getResponse", PROGRESS.get(4));
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String compressResultUrl = "http://compressResult.jpg";
taskService.updateAlgTaskByUUID(uuid, compressResultUrl);
logger.info("压缩完成...");
template.convertAndSendToUser(uuid, "/queue/getResponse", PROGRESS.get(5));
});
return Result.success(null);
}
}
4.2、前端实现
1. 引入JS
<!-- JQuery -->
<script type="text/javascript" src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js" charset="utf-8"></script>
<!-- SockJs -->
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js" charset="utf-8"></script>
<!-- Stomp -->
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js" charset="utf-8"></script>
2. Html
<div>
<h1>Compress Test</h1>
<button onclick="addTask()">点击生成新的文件任务</button>
<div id="compressQueue">
</div>
</div>
3. 生成新的文件任务
function addTask(){
var uuid = getUuid();
var fileUrl = "http://testUploadUrl.jpg";
var newDiv = "<div>" +
"<div>当前任务的uuid:" + uuid + "</div>" +
"<div style='background-color: wheat; height: 50px; width: 500px;' id='progress" + uuid + "'></div>" +
"</div>";
var compressQueue = $("#compressQueue");
compressQueue.append(newDiv);
connect(uuid);
$.ajax({
url:'http://localhost:9505/compressTask',
method:'POST',
data: {
"uuid" : uuid,
"fileUrl" : fileUrl
},
success: function (result) {
console.log("ajax 成功,结果:"+result)
},
error: function (error) {
console.log("ajax 发生错误"+error)
}
});
}
4. 获取uuid
function getUuid() {
var s = [];
var hexDigits = "0123456789abcdef";
for (var i = 0; i < 36; i++) {
s[i] = hexDigits.substr(Math.floor(Math.random() * 0x10), 1);
}
s[14] = "4";
s[19] = hexDigits.substr((s[19] & 0x3) | 0x8, 1);
s[8] = s[13] = s[18] = s[23] = "-";
var uuid = s.join("");
return uuid;
}
5. WebSocket连接
function connect(uuid){
console.log(uuid+"开始连接WebSocket");
var socket = new SockJS('http://127.0.0.1:9505/endpointExcelman');
var stompClient = Stomp.over(socket);
stompClient.connect({},function(frame){
console.log('Connected:' + frame);
stompClient.subscribe('/uuid/' + uuid + '/queue/getResponse',function(response){
console.log("当前的response:" + response.body);
showProgress(response.body, uuid);
if(response.body === "100%"){
disconnect(stompClient);
}
});
});
}
6. 推进进度条
function showProgress(currentProgress, uuid){
var progressUuid = $("#progress"+uuid);
progressUuid.append(currentProgress);
}
7. 关闭wbesocket双通道
function disconnect(stompClient){
if(stompClient != null) {
stompClient.disconnect();
}
console.log("Disconnected");
}
4.3、效果图
首页 压缩任务显示进度条
|