讲讲断点续传的实现
# 讲讲断点续传的实现
# 什么是断点续传
通常视频文件都比较大,大文件分块上传主要是为了解决直接上传大文件可能出现的网络中断、超时、内存占用过大等问题,其核心思想是将大文件分割成多个较小的块(chunk),每个块单独上传,最后在服务器端进行合并。
http 协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。
引用百度百科:
断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。
采用 MinIO 存储方式,实现流程如下:
1、前端对文件进行分块。
2、前端使用多线程一块一块上传分块文件前,请求后端服务检查文件是否存在,如果已经存在则不再上传。
3、如果分块文件不存在则前端开始上传。
4、前端请求后端服务上传分块。
5、后端服务将分块上传至 MinIO。
6、前端将分块上传完毕请求后端服务按顺序合并分块。
7、后端服务判断分块上传完成则请求 MinIO 合并文件。
8、合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。
# 本地大文件上传示例
/**
* 测试本地大文件上传方法
*
* @author chenmeng
*/
public class BigFileTest {
// 分块测试
@Test
public void testChunk() throws IOException {
// 源文件
File sourceFile = new File("D:\\develop\\upload\\1.mp4");
// 分块文件存储路径
String chunkFilePath = "D:\\develop\\upload\\chunk\\";
// 分块文件大小
int chunkSize = 1024 * 1024 * 5;
// 分块文件个数
int chunkNum = (int) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
// 使用流从源文件读数据,向分块文件中写数据
RandomAccessFile raf_r = new RandomAccessFile(sourceFile, "r");
// 缓存区
byte[] bytes = new byte[1024];
for (int i = 0; i < chunkNum; i++) {
File chunkFile = new File(chunkFilePath + i);
// 分块文件写入流
RandomAccessFile raf_rw = new RandomAccessFile(chunkFile, "rw");
int len = -1;
while ((len = raf_r.read(bytes)) != -1) {
raf_rw.write(bytes, 0, len);
if (chunkFile.length() >= chunkSize) {
break;
}
}
raf_rw.close();
}
raf_r.close();
}
// 将分块进行合并
@Test
public void testMerge() throws IOException {
// 块文件目录
File chunkFolder = new File("D:\\develop\\upload\\chunk");
// 源文件
File sourceFile = new File("D:\\develop\\upload\\1.mp4");
// 合并后的文件
File mergeFile = new File("D:\\develop\\upload\\1_2.mp4");
// 取出所有分块文件
File[] files = chunkFolder.listFiles();
// 将数组转成list
assert files != null;
List<File> filesList = Arrays.asList(files);
// 对分块文件排序
Collections.sort(filesList, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
}
});
// 向合并文件写的流
RandomAccessFile raf_rw = new RandomAccessFile(mergeFile, "rw");
// 缓存区
byte[] bytes = new byte[1024];
// 遍历分块文件,向合并 的文件写
for (File file : filesList) {
// 读分块的流
RandomAccessFile raf_r = new RandomAccessFile(file, "r");
int len = -1;
while ((len = raf_r.read(bytes)) != -1) {
raf_rw.write(bytes, 0, len);
}
raf_r.close();
}
raf_rw.close();
// 合并文件完成后对合并的文件md5校验
FileInputStream fileInputStream_merge = new FileInputStream(mergeFile);
FileInputStream fileInputStream_source = new FileInputStream(sourceFile);
String md5_merge = DigestUtils.md5Hex(fileInputStream_merge);
String md5_source = DigestUtils.md5Hex(fileInputStream_source);
if (md5_merge.equals(md5_source)) {
System.out.println("文件合并成功");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# 大文件分块上传示例
@Slf4j
@SpringBootTest
public class MediaChunkTest {
@Resource
private MinioClient minioClient;
String bucketName = "test_bucket";
String localPath = System.getProperty("user.dir") + "\\demo6-minio\\src\\main\\resources\\media\\1.mp4";
String outPath = System.getProperty("user.dir") + "\\demo6-minio\\src\\main\\resources\\media\\1a.mp4";
String chunkPath = System.getProperty("user.dir") + "\\demo6-minio\\src\\main\\resources\\media\\chunk\\";
public static void main(String[] args) {
String path = System.getProperty("user.dir") + "\\upload\\1.mp4";
System.out.println(path);
}
@Test
public void test_upload() throws Exception {
// 通过扩展名得到媒体资源类型 mimeType
// 根据扩展名取出mimeType
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4");
String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;// 通用mimeType,字节流
if (extensionMatch != null) {
mimeType = extensionMatch.getMimeType();
}
// 上传文件的参数信息
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket(bucketName)// 桶
.filename(localPath) // 指定本地文件路径
// .object("1.mp4")//对象名 在桶下存储该文件
.object("test/01/1.mp4")// 对象名 放在子目录下
.contentType(mimeType)// 设置媒体文件类型
.build();
// 上传文件
minioClient.uploadObject(uploadObjectArgs);
}
// 删除文件
@Test
public void test_delete() throws Exception {
// RemoveObjectArgs
RemoveObjectArgs removeObjectArgs = RemoveObjectArgs.builder().bucket(bucketName).object("test/01/1.mp4").build();
// 删除文件
minioClient.removeObject(removeObjectArgs);
}
// 查询文件 从minio中下载
@Test
public void test_getFile() throws Exception {
GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket(bucketName).object("test/01/1.mp4").build();
// 查询远程服务获取到一个流对象
FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
// 指定输出流
FileOutputStream outputStream = new FileOutputStream(outPath);
IOUtils.copy(inputStream, outputStream);
// 校验文件的完整性对文件的内容进行md5
FileInputStream fileInputStream1 = new FileInputStream(localPath);
String source_md5 = DigestUtils.md5Hex(fileInputStream1);
FileInputStream fileInputStream = new FileInputStream(outPath);
String local_md5 = DigestUtils.md5Hex(fileInputStream);
if (source_md5.equals(local_md5)) {
System.out.println("下载成功");
}
}
// 将分块文件上传到minio
@SneakyThrows
@Test
public void uploadChunk() {
for (int i = 0; i < 6; i++) {
// 上传文件的参数信息
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket(bucketName)// 桶
.filename(chunkPath + i) // 指定本地文件路径
.object("chunk/" + i)// 对象名 放在子目录下
.build();
// 上传文件
minioClient.uploadObject(uploadObjectArgs);
System.out.println("上传分块" + i + "成功");
}
}
// 调用minio接口合并分块
@Test
public void testMerge() throws Exception {
// List<ComposeSource> sources = new ArrayList<>();
// for (int i = 0; i < 6; i++) {
// //指定分块文件的信息
// ComposeSource composeSource = ComposeSource.builder().bucket(bucketName).object("chunk/" + i).build();
// sources.add(composeSource);
// }
List<ComposeSource> sources = Stream.iterate(0, i -> ++i)
.limit(6)
.map(i -> ComposeSource.builder()
.bucket(bucketName)
.object("chunk/" + i)
.build())
.collect(Collectors.toList());
// 指定合并后的objectName等信息
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.bucket(bucketName)
.object("merge01.mp4")
.sources(sources)// 指定源文件
.build();
// 合并文件,
// 报错size 1048576 must be greater than 5242880,minio默认的分块文件大小为5M
minioClient.composeObject(composeObjectArgs);
// 下载合并后的文件到本地临时文件用于校验
File mergedFile = File.createTempFile("merged", ".mp4");
try (InputStream in = minioClient.getObject(
GetObjectArgs.builder()
.bucket(bucketName)
.object("merge01.mp4")
.build());
FileOutputStream fos = new FileOutputStream(mergedFile)) {
IOUtils.copy(in, fos);
}
// 计算合并后文件的MD5值
String mergedFileMd5;
try (FileInputStream fis = new FileInputStream(mergedFile)) {
mergedFileMd5 = DigestUtils.md5Hex(fis);
}
System.out.println("合并后文件的MD5:" + mergedFileMd5);
// 此处假设originalMd5为原始文件的MD5,实际中应从上传环节获得并保存
String originalMd5 = "expectedMd5Value"; // 请替换成真实的MD5值
// 比较合并后的MD5与原始MD5
if (mergedFileMd5.equalsIgnoreCase(originalMd5)) {
System.out.println("文件合并成功,MD5校验通过。");
} else {
System.err.println("文件合并后MD5校验失败!原始MD5:" + originalMd5 + ", 合并后MD5:" + mergedFileMd5);
// 根据业务需求可以抛出异常或进行重试处理
throw new RuntimeException("文件合并后MD5校验失败");
}
// 清理临时下载的文件
if (mergedFile.exists()) {
mergedFile.delete();
}
}
/**
* 批量清理分块文件
*
* @param chunkFileFolderPath 分块文件所在目录,例如:"a/b/文件MD5/chunk/"
* @param chunkTotal 分块文件总数
*/
private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {
try {
// 构造待删除对象列表(假设分块索引从1开始,如果分块索引从0开始,请相应修改)
List<DeleteObject> deleteObjects = Stream.iterate(1, i -> ++i)
.limit(chunkTotal)
.map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i))))
.collect(Collectors.toList());
// 调用 removeObjects 批量删除
Iterable<Result<DeleteError>> results = minioClient.removeObjects(
RemoveObjectsArgs.builder()
.bucket(bucketName) // 分块文件所在桶
.objects(deleteObjects)
.build());
// 遍历结果,处理可能的删除错误
results.forEach(result -> {
try {
DeleteError error = result.get();
// 如果有删除错误,可以记录日志
log.error("删除分块文件出错:objectName={}, message={}", error.objectName(), error.message());
} catch (Exception e) {
log.error("清理分块文件异常", e);
}
});
} catch (Exception e) {
log.error("清理分块文件失败, chunkFileFolderPath:{}", chunkFileFolderPath, e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# 注意事项
# minio 合并文件异常
使用 minio 合并文件报错:
java.lang.IllegalArgumentException: source testbucket/chunk/0: size 1048576 must be greater than 5242880
1
原因分析:
- minio 合并文件默认分块最小 5M
# 分块文件清理问题
上传一个文件进行分块上传,上传一半不传了,之前上传到 minio 的分块文件要清理吗?怎么做的?
- 在数据库中有一张文件表记录 minio 中存储的文件信息。
- 文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。
- 当一个文件传了一半不再上传了说明该 文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除 minio 中没有上传成功的文件目录。
# 学习参考
上次更新: 2025/2/6 17:50:04