Minio上传视频

Author Avatar
ciky 08月 23,2024
  • 在其它设备中阅读本文章
  • 点击生成二维码

上传视频


1. 分块测试

//分块测试
@Test
public void testChunk() throws IOException {
    //源文件
    File sourceFile = new File("D:\\c盘移动\\3月11日(1).mp4");
    //分块文件的存储路径
    String chunkFilePath = "D:\\c盘移动\\chunk\\";
    //分块文件大小 1Mb
    int chunkSize = 1024 * 1024 * 1;
    //分块文件的个数   Math.ceil-->向上取整
    int chunkNum = (int) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
    
    //使用流从源文件读数据,向分块文件写数据   (RandomAccessFile具有读写两个功能)
    RandomAccessFile raf_r = new RandomAccessFile(sourceFile, "r");
    //缓存区
    byte[] bytes = new byte[1024];
    
    for (int i = 0; i < chunkNum; i++) {
        File chunkFile = new File(chunkFilePath + i);
        //分块文件写入流
        RandomAccessFile raf_rw = new RandomAccessFile(chunkFile, "rw");
        int len = -1;
        while ((len = raf_r.read(bytes)) != -1) {
            raf_rw.write(bytes, 0, len);
            if (chunkFile.length() >= chunkSize) {
                break;
            }
        }
        raf_rw.close();
    }
    raf_r.close();
}

2. 合并测试

//将分块进行合并
@Test
public void testMerge() throws IOException {
    //源文件---用来校验md5值
    File sourceFile = new File("D:\\c盘移动\\3月11日(1).mp4");
    //块文件目录
    File chunkFolder = new File("D:\\c盘移动\\chunk\\");
    //合并后的文件
    File mergeFile = new File("D:\\c盘移动\\merge.mp4");

    //取出所有的文件(被分块的文件)
    File[] files = chunkFolder.listFiles();
    //将数组转成list
    List<File> filesList = Arrays.asList(files);
    //根据文件名升序排列
    Collections.sort(filesList, new Comparator<File>() {
        @Override
        public int compare(File o1, File o2) {
            return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
        }
    });
    
    
    //向合并文件写的流
    RandomAccessFile raf_rw = new RandomAccessFile(mergeFile, "rw");
    //缓存区
    byte[] bytes = new byte[1024];

    //遍历分块文件,向合并的文件写
    for (File file : filesList) {
        //读分块的流
        RandomAccessFile raf_r = new RandomAccessFile(file, "r");
        int len = -1;
        while((len = raf_r.read(bytes))!= -1){
            raf_rw.write(bytes,0,len);
        }
        raf_r.close();
    }
    raf_rw.close();

    //合并文件完成后对合并文件检验
    FileInputStream fileInputStream_merge = new FileInputStream(mergeFile);
    FileInputStream fileInputStream_source = new FileInputStream(sourceFile);

    String md5_merge = DigestUtils.md5Hex(fileInputStream_merge);
    String md5_source = DigestUtils.md5Hex(fileInputStream_source);

    if(md5_merge.equals(md5_source)){
        System.out.println("文件合并成功");
    }

}

3. 视频上传流程

image20240604231816170.png

1、前端对文件进行分块。

2、前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。

3、如果分块文件不存在则前端开始上传

4、前端请求媒资服务上传分块。

5、媒资服务将分块上传至MinIO。

6、前端将分块上传完毕请求媒资服务合并分块。

7、媒资服务判断分块上传完成则请求MinIO合并文件。

8、合并完成校验合并后的文件是否完整,如果不完整则删除文件。


4. minio合并文件测试

/**
 * 调用minio接口合并分块
 */
@Test
public void testMerge() throws  Exception{

    List<ComposeSource> sources = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
        //指定分块文件的信息
        ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/" + i).build();
        sources.add(composeSource);
    }


    //Stream流简化代码
    //List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(4).map(i -> ComposeSource.builder().bucket("testbucket").object("chunk/" + i).build()).collect(Collectors.toList());


    //指定合并后的objectName等信息
    ComposeObjectArgs objectArgs = ComposeObjectArgs.builder()
            .sources(sources)  //指定源文件--->需要(List<ComposeSource>)
            .bucket("testbucket")
            .object("merge01.mp4")
            .build();

    //合并文件
    //minio默认的分块文件大小为5M
    minioClient.composeObject(objectArgs);  //--->需要(ComposeObjectArgs)
}

5. minio批量删除文件

/**
 * 批量清理分块文件
 */
@Test
public void testDeleteChunk() throws Exception{

   List<DeleteObject> deleteObjects = new ArrayList<>();

    for (int i = 0; i < 4; i++) {
        DeleteObject deleteObject = new DeleteObject("chunk/" + i);
        deleteObjects.add(deleteObject);
    }

//stream流简化
//        List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
//                .limit(4)
//                .map(i -> new DeleteObject("chunk/" .concat(Integer.toString(i))))
//                .collect(Collectors.toList());


    RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder()
            .bucket("testbucket")
            .objects(deleteObjects)  //需要--->(Iterable<DeleteObject>)--->List继承Iterable
            .build();

    //------------------------------------removeObjects需要返回值,否则删除无效------------------------------------
    Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);//--->需要(RemoveObjectsArgs)
    results.forEach(r->{
        DeleteError deleteError = null;
        try {
            deleteError = r.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

6. RestResponse响应类

@Data
@ToString
public class RestResponse<T> {

  /**
   * 响应编码,0为正常,-1错误
   */
  private int code;

  /**
   * 响应提示信息
   */
  private String msg;

  /**
   * 响应内容
   */
  private T result;


  public RestResponse() {
   this(0, "success");
  }

  public RestResponse(int code, String msg) {
   this.code = code;
   this.msg = msg;
  }

  /**
   * 错误信息的封装
   *
   * @param msg
   * @param <T>
   * @return
   */
  public static <T> RestResponse<T> validfail(String msg) {
   RestResponse<T> response = new RestResponse<T>();
   response.setCode(-1);
   response.setMsg(msg);
   return response;
  }
  public static <T> RestResponse<T> validfail(T result,String msg) {
   RestResponse<T> response = new RestResponse<T>();
   response.setCode(-1);
   response.setResult(result);
   response.setMsg(msg);
   return response;
  }



  /**
   * 添加正常响应数据(包含响应内容)
   *
   * @return RestResponse Rest服务封装相应数据
   */
  public static <T> RestResponse<T> success(T result) {
   RestResponse<T> response = new RestResponse<T>();
   response.setResult(result);
   return response;
  }
  public static <T> RestResponse<T> success(T result,String msg) {
   RestResponse<T> response = new RestResponse<T>();
   response.setResult(result);
   response.setMsg(msg);
   return response;
  }

  /**
   * 添加正常响应数据(不包含响应内容)
   *
   * @return RestResponse Rest服务封装相应数据
   */
  public static <T> RestResponse<T> success() {
   return new RestResponse<T>();
  }


  public Boolean isSuccessful() {
   return this.code == 0;
  }

}

7. 定义接口层-Controller

/**
 * @Author: ciky
 * @Description: 上传视频
 * @DateTime: 2024/6/5 12:39
 **/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFileController {

    @ApiOperation("文件上传前检查文件")
    @PostMapping("/upload/checkfile")
    public RestResponse<Boolean> checkfile(
            @RequestParam("fileMd5") String fileMd5
    ) throws Exception {

        return null;
    }

    @ApiOperation("分块文件上传前的检测")
    @PostMapping("/upload/checkchunk")
    public RestResponse<Boolean> checkchunk(
            @RequestParam("fileMd5") String fileMd5,
            @RequestParam("chunk") int chunk
    ) throws Exception {

    }


    @ApiOperation("上传分块文件")
    @PostMapping("/upload/uploadchunk")
    public RestResponse<Boolean> uploadchunk(
            @RequestParam("file") MultipartFile file,
            @RequestParam("fileMd5") String fileMd5,
            @RequestParam("chunk") int chunk
    ) throws Exception {

        return null;
    }

    @ApiOperation("合并文件")
    @PostMapping("/upload/mergechunks")
    public RestResponse<Boolean> mergechunks(
            @RequestParam("fileMd5") String fileMd5,
            @RequestParam("fileName") String fileName,
            @RequestParam("chunkTotal") int chunkTotal
    ) throws Exception{

    }


}


8. 持久层-Mapper


9. 开发层-Service


(1) 检查文件是否存在

@Override
public RestResponse<Boolean> checkFile(String fileMd5) {
    //先查询数据库
    MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
    if (mediaFiles != null) {
        //如果数据库存在,再查询minio
        GetObjectArgs getObjectArgs = GetObjectArgs.builder()
                .bucket(mediaFiles.getBucket())   //桶
                .object(mediaFiles.getFilePath())   //objectName
                .build();

        //查询远程服务器获取到的一个流对象
        try {
            FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
            //文件已存在
            if (inputStream != null) {
                return RestResponse.success(true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //文件不存在
    return RestResponse.success(false);
}

(2) 检查分片是否存在

@Override
public RestResponse<Boolean> checkChunk(String fileMd5, int chunk) {
    //分块存储路径: md5前两位为两个目录,chunk存储分块文件
    String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);

    GetObjectArgs getObjectArgs = GetObjectArgs.builder()
            .bucket(bucket_video)   //桶
            .object(chunkFileFolderPath + chunk)   //objectName:目录路径+文件名
            .build();

    //查询远程服务器获取到的一个流对象
    try {
        FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
        //文件已存在
        if (inputStream != null) {
            return RestResponse.success(true);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    //文件不存在
    return RestResponse.success(false);
}

private String getChunkFileFolderPath(String fileMd5) {
    return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/chunk/";
}

(3) 上传分块

  • Minio合并分块要求大于5Mb,

  • springboot web默认上传文件大小限制为1Mb

  • nacos中media-api工程修改配置文件

    spring:
      servlet:
        multipart:
          max-file-size: 50MB	#单个文件的大小限制
          max-request-size: 50MB	#单个请求的大小限制
    
@Override
public RestResponse<Boolean> uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {
    //获取mimeType
    String mimeType = getMimeType(null);	//------------------------抽取方法1
    //分块文件的路径
    String chunkFilePath = getChunkFileFolderPath(fileMd5)+chunk;	//------------------------抽取方法2
    //将文件上传到minio
    boolean b = addMediaFilesToMinIO(bucket_video, localChunkFilePath, mimeType, chunkFilePath);//------------------------抽取方法3
    if(!b){
        return RestResponse.validfail(false,"上传分块文件失败");
    }
    return RestResponse.success(true);
}
 /**
 * 根据扩展名获取mimeType	------------------------抽取方法1
 */
private String getMimeType(String extension) {
    if (extension == null) {
        extension = "";
    }
    //根据扩展名获取mimeType
    ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
    String mimeType = MediaType.APPLICATION_STREAM_JSON_VALUE;  //通用mimeType(字节流)
    if (extensionMatch != null) {
        mimeType = extensionMatch.getMimeType();
    }
    return mimeType;
}


 /**
 * 得到分块文件的目录	------------------------抽取方法2
 */
private String getChunkFileFolderPath(String fileMd5) {
    return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/chunk/";
}

/**
 * 上传文件到minio	------------------------抽取方法3
 *
 * @param bucket        桶
 * @param localFilePath 本地文件路径
 * @param mimeType      媒资类型
 * @param objectName    对象名
 * @return
 */
public boolean addMediaFilesToMinIO(String bucket, String localFilePath, String mimeType, String objectName) {
    try {
        minioClient.uploadObject(UploadObjectArgs.builder()
                .bucket(bucket)             //minio中的桶
                .filename(localFilePath)    //本地文件路径
                .object(objectName)         //对象名,minio桶中的路径
                .contentType(mimeType)      //媒体文件类型
                .build());
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        log.error("上传文件出错,bucket{},objectName{},错误信息{}", bucket, objectName, e.getMessage());
    }
    return false;
}


(4) 合并分块

@Override
public RestResponse<Boolean> mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
    //分块文件的路径	
    String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);					//----------------------调用方法1
    //找到所有的分块文件
    List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath + i).build()).collect(Collectors.toList());

    //源文件名称
    String filename = uploadFileParamsDto.getFilename();

    //合并后文件的objectName	
    String objectName = getFilePathByMd5(fileMd5, filename.substring(filename.lastIndexOf(".")));	//----------------------调用方法2

    //指定合并后的objectName等信息
    ComposeObjectArgs objectArgs = ComposeObjectArgs.builder()
            .sources(sources)  //指定源文件--->需要(List<ComposeSource>)
            .bucket(bucket_video)
            .object(objectName)
            .build();

    //1.==================调用minio的sdk进行文件合并==================
    //minio默认的分块文件大小为5M
    try {
        minioClient.composeObject(objectArgs);  //--->需要(ComposeObjectArgs)
    } catch (Exception e) {
        e.printStackTrace();
        log.error("合并文件出错,bucket:{},objectName:{},错误信息:{}", bucket_video, objectName, e.getMessage());
        return RestResponse.validfail(false, "合并文件出错");
    }

    //minio直接获取文件md5 ---> getObject方法获取输入流,对流进行md5值的获取
    try {
        //获取minio远程服务器对象流
        InputStream minioInputStream = minioClient.getObject(GetObjectArgs.builder()
                .bucket(bucket_video)
                .object(objectName)
                .build());
        //通过对象流获取md5值
        String mergeMd5 = DigestUtils.md5Hex(minioInputStream);
        //比较原始的md5值和合并后的md5值
        if (!fileMd5.equals(mergeMd5)) {
            log.error("校验合并文件的md5值不一致,原始文件{},合并文件{}", fileMd5, mergeMd5);
            return RestResponse.validfail(false, "文件校验失败");
        }
        //文件大小(通过minio获取)
        long fileSize = getFileSizeFromMinIO(bucket_video, objectName);	//----------------------调用方法3
        uploadFileParamsDto.setFileSize(fileSize);

    } catch (Exception e) {
        e.printStackTrace();
        return RestResponse.validfail(false, "文件校验失败");
    }


    //2.==================将文件信息入库==================
    //addMediaFilesToDb添加了@Transaction注解
    //通过代理对象调用,启用事务功能									//----------------------调用方法4
    MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName);
    if (mediaFiles == null) {
        return RestResponse.validfail(false, "文件入库失败");
    }

    //3.==================清理分块文件==================
    clearChunkFiles(chunkFileFolderPath, chunkTotal);	//----------------------调用方法5

    return RestResponse.success(true);
}
/**
 * 得到分块文件的目录								//----------------------方法1
 * 0/1/0158406d1d5f63c71e7b99f981f63ef7/
 */
private String getChunkFileFolderPath(String fileMd5) {
    return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/chunk/";
}

/**
 * 根据md5值得到文件的地址							//----------------------方法2
 * 0/1/0158406d1d5f63c71e7b99f981f63ef7/0158406d1d5f63c71e7b99f981f63ef7.mp4
 */
private String getFilePathByMd5(String fileMd5, String fileExt) {
    return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}

/**
 * 通过minio获取文件大小							//----------------------方法3
 *
 * @return
 */
private long getFileSizeFromMinIO(String bucket, String objectName) {
    StatObjectResponse videoInfo = null;
    try {
        videoInfo = minioClient.statObject(StatObjectArgs.builder()
                .bucket(bucket)
                .object(objectName)
                .build());
    } catch (Exception e) {
        e.printStackTrace();
    }
    return videoInfo.size();
}

/**
 * 清理分块文件									//----------------------方法5
 */
private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal) {

    Iterable<DeleteObject> objects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath + i)).collect(Collectors.toList());

    RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder()
            .bucket(bucket_video)
            .objects(objects)
            .build();

    Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
    //要想真正删除
    results.forEach(item -> {
        try {
            item.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

/**
 *  将文件信息添加到文件表									//----------------------方法4
 *												抽取为为接口,通过代理对象调用,方法上有@Transaction注解,启用事务
 *  
 * @param companyId           机构id
 * @param fileMd5             文件md5值
 * @param uploadFileParamsDto 上传文件的信息
 * @param bucket              桶
 * @param objectName          对象名称
 * @return com.xuecheng.media.model.po.MediaFiles
 */
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {
    MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
    if (mediaFiles == null) {
        mediaFiles = new MediaFiles();
        BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
        //文件id
        mediaFiles.setId(fileMd5);
        //机构id
        mediaFiles.setCompanyId(companyId);
        //桶
        mediaFiles.setBucket(bucket);
        //file_path
        mediaFiles.setFilePath(objectName);
        //file_id
        mediaFiles.setFileId(fileMd5);
        //url
        mediaFiles.setUrl("/" + bucket + "/" + objectName);
        //上传时间
        mediaFiles.setCreateDate(LocalDateTime.now());
        //状态
        mediaFiles.setStatus("1");
        //审核状态
        mediaFiles.setAuditStatus("002003");

        //插入数据库
        int insert = mediaFilesMapper.insert(mediaFiles);

        if (insert <= 0) {
            log.debug("向数据库保存文件失败,bucket{},objectName{}", bucket, objectName);
            return null;
        }
    }
    return mediaFiles;
}

10. 完善接口层-Controller

/**
 * @Author: ciky
 * @Description: 上传视频
 * @DateTime: 2024/6/5 12:39
 **/
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFileController {

    @Autowired
    private MediaFileService mediaFileService;

    @ApiOperation("文件上传前检查文件")
    @PostMapping("/upload/checkfile")
    public RestResponse<Boolean> checkfile(
            @RequestParam("fileMd5") String fileMd5
    ) throws Exception {
        RestResponse<Boolean> booleanRestResponse = mediaFileService.checkFile(fileMd5);
        return booleanRestResponse;
    }

    @ApiOperation("分块文件上传前的检测")
    @PostMapping("/upload/checkchunk")
    public RestResponse<Boolean> checkchunk(
            @RequestParam("fileMd5") String fileMd5,
            @RequestParam("chunk") int chunk
    ) throws Exception {
        RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5, chunk);
        return booleanRestResponse;
    }


    @ApiOperation("上传分块文件")
    @PostMapping("/upload/uploadchunk")
    public RestResponse<Boolean> uploadchunk(
            @RequestParam("file") MultipartFile file,
            @RequestParam("fileMd5") String fileMd5,
            @RequestParam("chunk") int chunk
    ) throws Exception {
        //创建一个临时文件
        File tempFile = File.createTempFile("minio", ".temp");
        file.transferTo(tempFile);
        //文件路径
        String localFilePath = tempFile.getAbsolutePath();

        RestResponse<Boolean> uploadChunk = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath);
        return uploadChunk;
    }

    @ApiOperation("合并文件")
    @PostMapping("/upload/mergechunks")
    public RestResponse<Boolean> mergechunks(
            @RequestParam("fileMd5") String fileMd5,
            @RequestParam("fileName") String fileName,
            @RequestParam("chunkTotal") int chunkTotal
    ) throws Exception{
        Long companyId = 12332141425L;

        //文件信息对象
        UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
        uploadFileParamsDto.setFilename(fileName);
        uploadFileParamsDto.setTags("视频文件");
        uploadFileParamsDto.setFileType("001002");

        RestResponse<Boolean> booleanRestResponse = mediaFileService.mergeChunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);
        return booleanRestResponse;
    }
}