在此特别感谢黑马程序员提供的课程
写在最前
媒资管理模块
模块需求分析
模块介绍
媒资管理系统是每个在线教育平台所必须具备的,百度百科对其定义如下
媒体资源管理(Media Asset Management,MAM)系统是建立在多媒体、网络、数据库和数字存储等先进技术基础上的一个对各种媒体及内容(如视/音频资料、文本文件、图表等)进行数字化存储、管理以及应用的总体解决方案,包括数字媒体的采集、编目、管理、传输和编码转换等所有环节。其主要是满足媒体资源拥有者收集、保存、查找、编辑、发布各种信息的要求,为媒体资源的使用者提供访问内容的便捷方法,实现对媒体资源的高效管理,大幅度提高媒体资源的价值。
每个教学机构都可以在媒资管理系统管理自己的教学资源,包括:视频、教案等文件
目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等
主要的几个功能如下:
媒资查询:教学机构查询自己所拥有的的媒资信息
文件上传:包括上传图片、上传文档、上传视频
视频处理:视频上传成功,系统自动对视频进行编码处理
文件删除:教学机构删除自己上传的媒资文件
下图是课程编辑与发布的全流程,可以通过下图看到媒资在整体流程的位置
业务流程
上传图片
教学机构人员在课程信息编辑页面上传课程图片,课程图片统一记录在媒资管理系统
上传视频
教学机构人员进入媒资管理列表查询自己上传的媒资文件
教育机构用户在媒资管理
页面中点击上传视频
按钮
选择要上传的文件,自动执行文件上传
视频上传成功会自动处理,处理完成后可以预览视频
处理视频
对需要转码处理的视频,系统会自动对齐处理,处理后生成视频的URL
审核媒资
运营用户登入运营平台,并进入媒资管理界面,查找待审核媒资
点击列表中媒资名称链接,可以预览该媒资,若是视频,则播放视频
点击列表中某媒资后的审核按钮
,即完成媒资的审批过程
绑定媒资
课程计划创建好后需要绑定媒资文件,比如:如果课程计划绑定了视频文件,进入课程在线学习界面后,点课程计划名称则在线播放视频
如何将课程计划绑定媒资呢?
教育机构用户进入课程管理页面编辑某一课程,在课程大纲
编辑页的某一小节后,可以添加媒资信息
点击添加视频
,会弹出对话框,可通过输入视频关键字搜索已审核通过的视频媒资
选择视频媒资,点击提交安努,完成课程计划绑定媒资流程
数据模型
本模块妹子恩建相关数据表如下
media_files:存储文件信息,包括图片、视频、文档等
media_process:待处理视频表
media_process_history:视频处理历史表,记录已经处理成功的视频信息
搭建模块环境
架构分析
当前要开发的是媒资管理服务,目前为止共三个微服务:内容管理、系统管理、媒资管理
后期还会添加更多的微服务,目前这种由前端直接请求微服务的方式存在弊端
如果在前端对每个请求地址都配置绝对路径,非常不利于系统维护,例如下面这种
1 2 3 4 5 6 export async function dictionaryAll (params: any = undefined , body: any = undefined ): Promise <ISystemDictionary []> { const { data } = await createAPI ('http://localhost:53110/system/dictionary/all' , 'get' , params, body) return data }
当系统上限后,需要将localhost改为公网域名,如果这种地址非常多,那么修改会很麻烦
基于这个问题,可以采用网关来解决
这样在前端代码中只需要指定每个接口的相对路径
1 2 3 4 5 6 export async function dictionaryAll (params: any = undefined , body: any = undefined ): Promise <ISystemDictionary []> { const { data } = await createAPI ('/system/dictionary/all' , 'get' , params, body) return data }
在前端代码的一个固定的地方在接口地址前统一添加网关地址,每个请求统一到网关,由网关将请求转发到具体的微服务
有了网关就可以对请求进行路由,例如:可以根据请求路径路由、根据host地址路由等。当微服务有多个实例时,还可以通过负载均衡算法进行路由
此外,网关还可以实现权限控制、限流等功能
本项目采用SpringCloudGateway作为网关,网关在请求路由时,需要知道每个微服务实例的地址。
项目使用Nacos作为服务发现中心和配置中心,整体架构如下
流程如下
微服务启动,将自己注册到Nacos,Nacos记录了个微服务实例的地址
网关从Nacos读取服务列表,包括服务名称、服务地址等
请求到达网关,网关将请求路由到具体的微服务
要使用网关首先搭建Nacos、Nacos有两个作用
服务发现中心
微服务将自身注册到Nacos,网关从Nacos获取微服务列表
配置中心
微服务众多,它们的配置信息也非常复杂,为了提供系统的可维护性,微服务的配置信息统一在Nacos配置
搭建Nacos
在此之前我们先下载安装并启动
1 docker pull nacos/nacos-server:1.4.1
1 docker run --name nacos -e MODE=standalone -p 8849:8848 -d nacos/nacos-server:1.4.1
访问虚拟机ip:8848/nacos
登录,默认的账号密码均为nacos
服务发现中心
搭建Nacos服务发现中心之前,需要搞清楚两个概念
namespace:用于区分环境,例如:开发环境dev、测试环境test、生产环境prod
group:用于区分项目,例如xuecheng-plus、reggie项目
首先,在Nacos配置namespace
新增开发环境命名空间
用同样的方法新增生产环境和测试环境的命名空间
随后将各微服务注册到Nacos
在xuecheng-plus-parent中添加依赖管理
1 2 3 4 5 6 7 8 9 10 <properties > <spring-cloud-alibaba.version > 2.2.6.RELEASE</spring-cloud-alibaba.version > </properties > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-alibaba-dependencies</artifactId > <version > ${spring-cloud-alibaba.version}</version > <type > pom</type > <scope > import</scope > </dependency >
在xuecheng-content-api、xuecheng-plus-system中添加依赖
1 2 3 4 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-alibaba-nacos-discovery</artifactId > </dependency >
配置nacos地址
在content-service的application.yml中配置如下信息
1 2 3 4 5 6 7 8 9 spring: application: name: content-service cloud: nacos: server-addr: 127.0 .0 .1 :8848 discovery: namespace: dev group: xuecheng-plus-project
在system-service的application.yml中配置如下信息
1 2 3 4 5 6 7 8 9 spring: application: name: system-service cloud: nacos: server-addr: 127.0 .0 .1 :8848 discovery: namespace: dev group: xuecheng-plus-project
重启这两个服务,进入Nacos中查看服务列表
配置中心
配置三要素
搭建完Nacos服务发现中心,现在我们来搭建Nacos配置中心,其目的就是通过Nacos去管理项目中的所有配置
那么先将项目中的配置文件进行分类
每个项目特有的配置
项目所公用的配置
是指一些在若干项目中配置内容相同的配置,例如redis的配置,很多项目用的同一套redis服务,所以配置也一样
另外,还需要知道Nacos如何去定位一个具体的配置文件,即配置三要素
通过namespace、group找到具体的环境和具体的项目
通过dataid找到具体的配置文件,dataid由三部分组成
例如content-service-dev.yml
,由content-service
、dev
、yml
三部分组成
content-service
:它是在application.yml中配置的应用名,即spring.application.name
的值
dev
:它是环境名,由spring.profile.active指定
yml
:它是配置文件的后缀
所以,如果我们要配置content-service工程的配置文件
在开发环境中配置content-service-dev.yml
在生产环境中配置content-service-prod.yml
在测试环境中配置content-service-test.yml
配置content-service
下面以配置content-service为例,在开发环境下添加content-service-dev.yaml
配置
为什么不在nacos中配置如下内容?
1 2 3 spring: application: name: content-service
因为Nacos是先根据spring.cloud.nacos.server-addr获取Nacos地址,再根据${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
作为文件id,来读取配置。
而读取配置文件的顺序如下
bootstrap.yml
nacos中的配置文件
本地application.yml
所以我们要先在bootstrap.yml中配置文件id,然后nacos才知道怎么读取配置文件。
那么我们现在在content-service下创建bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 spring: application: name: content-service cloud: nacos: server-addr: 127.0 .0 .1 :8848 config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true profiles: active: dev
删除原本的application.yml,在content-service工程中添加依赖
1 2 3 4 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > </dependency >
随后运行测试方法,看看能否正产读取数据库的配置信息,并读取数据
1 2 3 4 5 @Test void contextQueryCourseTest () { PageResult<CourseBase> result = courseBaseInfoService.queryCourseBaseList(new PageParams (1L , 10L ), new QueryCourseParamDto ()); log.info("查询到数据:{}" , result); }
配置content-api
以相同的方法配置content-api,在nacos的开发环境下新建配置content-api-dev.yaml
application.yml content-api-dev.yaml
我们先来看看哪些配置可以交给Nacos管理
原本content-api的配置如下,除了服务名,都可以交给content-api-dev.yaml
管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 server: servlet: context-path: /content port: 53040 spring: application: name: content-api logging: config: classpath:log4j2-dev.xml swagger: title: "学成在线内容管理系统" description: "内容系统管理系统对课程相关信息进行业务管理数据" base-package: com.xuecheng.content enabled: true version: 1.0 .0
经过我们的分析,content-api-dev.yaml
的内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: servlet: context-path: /content port: 53040 logging: config: classpath:log4j2-dev.xml swagger: title: "学成在线内容管理系统" description: "内容系统管理系统对课程相关信息进行业务管理数据" base-package: com.xuecheng.content enabled: true version: 1.0 .0
那么我们现在就需要配置content-api
的bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 spring: application: name: content-api cloud: nacos: server-addr: 127.0 .0 .1 :8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-plus-project config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true extension-configs: - data-id: content-service-${spring.profiles.active}.yaml group: xuecheng-plus-project refresh: true profiles: active: dev
注意:因为content-api接口工程依赖了content-service工程的jar,而content-service的配置也交由nacos管理了(主要是数据库配置),所以我们现在需要extension-configs扩展配置文件的方式引用service工程的配置文件
如果需要添加多个扩展文件,继续在下面添加即可
1 2 3 4 5 6 7 extension-configs: - data-id: content-service-${spring.profiles.active}.yaml group: xuecheng-plus-project refresh: true - data-id: 填写文件 dataid group: xuecheng-plus-project refresh: true
公用配置
nacos还提供了shared-configs,可以引用公用配置
我们之前在content-api中配置了swagger,并且所有接口工程都需要配置swagger
那么我们这里就可以将swagger的配置定义为一个公用配置,哪个项目需要,哪个项目就引用
接下来,我们就着手创建xuecheng-plus的公用配置,进入nacos的开发环境,添加swagger-dev.yaml
公用配置,这里的group可以设置为xuecheng-plus-common
,该组下的内容都作为xuecheng-plus
的公用配置
删除content-api-dev.yaml
中的swagger
配置,在content-api
的bootstrap.yml
中使用shared-config
添加公用配置
content-api-dev.yaml bootstrap.yml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 server: servlet: context-path: /content port: 53040 ## 日志文件配置路径 logging: config: classpath:log4j2-dev.xml ## swagger 文档配置 - swagger: - title: "学成在线内容管理系统" - description: "内容系统管理系统对课程相关信息进行业务管理数据" - base-package: com.xuecheng.content - enabled: true - version: 1.0.0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #微服务配置 spring: application: name: content-api cloud: nacos: server-addr: 127.0.0.1:8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-plus-project config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true extension-configs: - data-id: content-service-${spring.profiles.active}.yaml group: xuecheng-plus-project refresh: true + shared-configs: + - data-id: swagger-${spring.profiles.active}.yaml + group: xuecheng-plus-common + refresh: true profiles: active: dev
再以相同的方法配置日志的公用配置
删除content-api-dev.yaml
中的logging
配置,在content-api
的bootstrap.yml
中使用shared-config
添加公用配置
content-api-dev.yaml bootstrap.yml 1 2 3 4 5 6 7 8 server: servlet: context-path: /content port: 53040 - ## 日志文件配置路径 - logging: - config: classpath:log4j2-dev.xml
1 2 3 4 5 6 7 shared-configs: - data-id: swagger-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true + - data-id: logging-${spring.profiles.active}.yaml + group: xuecheng-plus-common + refresh: true
那么到此为止,配置完成,重启content-api,访问swagger页面,查看swagger接口文档是否可以正常显示,查看控制台log4j2日志是否正常输出
系统管理配置
按照上面的方法,将系统管理服务的配置信息在nacos上进行配置
在开发环境下添加system-service-dev.yaml
配置
在system-service下创建bootstrap.yml配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 spring: application: name: system-service cloud: nacos: server-addr: 127.0 .0 .1 :8848 config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true profiles: active: dev
在开发环境下添加system-api-dev.yaml
配置
在system-api下创建bootstrap.yml配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 spring: application: name: system-api cloud: nacos: server-addr: 127.0 .0 .1 :8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-plus-project config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true extension-configs: - data-id: system-service-${spring.profiles.active}.yaml group: xuecheng-plus-project refresh: true shared-configs: - data-id: swagger-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true - data-id: logging-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true profiles: active: dev
配置优先级
到目前位置,已经将所有微服务的配置统一在nacos进行配置,用到的配置文件有本地的bootstrap.yaml和nacos上的配置文件
引入配置文件的形式有
通过dataid方式引入
以扩展配置文件方式引入
以公用配置文件方式引入
各配置文件的优先级:项目应用名配置文件(content-api-dev.yaml) > 扩展配置文件(content-service-dev.html) > 共享配置文件(swagger-dev.yaml) > 本地配置文件(application.yml)
有时候我们在测试程序时,直接在本地加一个配置进行测试,这时候我们想让本地配置优先级最高,那么可以在nacos配置文件中配置如下内容
1 2 3 4 spring: cloud: config: override-none: true
搭建Gateway
本项目使用SpringCloudGateway作为网关,下面创建网关工程
指定父工程,添加依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > com.xuecheng</groupId > <artifactId > xuecheng-plus-parent</artifactId > <version > 0.0.1-SNAPSHOT</version > <relativePath > ../xuecheng-plus-parent</relativePath > </parent > <artifactId > xuecheng-plus-gateway</artifactId > <version > 0.0.1-SNAPSHOT</version > <name > xuecheng-plus-gateway</name > <description > xuecheng-plus-gateway</description > <properties > <java.version > 1.8</java.version > </properties > <dependencies > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > <exclusions > <exclusion > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-logging</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-log4j2</artifactId > </dependency > </dependencies > </project >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 spring: application: name: gateway cloud: nacos: server-addr: 127.0 .0 .1 :8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-plus-project config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true profiles: active: dev
在nacos的开发环境下创建gateway-dev.yaml
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 server: port: 53010 spring: cloud: gateway: routes: - id: content-api uri: lb://content-api predicates: - Path=/content/** - id: system-api uri: lb://system-api predicates: - Path=/system/**
在http-client.env.json中配置网关的地址
1 2 3 4 5 6 7 8 9 10 { "dev": { "host": "localhost:53010", "content_host": "localhost:53040", "system_host": "localhost:53110", "media_host": "localhost:53050", "cache_host": "localhost:53035", + "gateway_host": "localhost:53010" } }
1 2 3 4 5 6 7 8 9 #### 课程查询列表 POST { { gateway_host} } /content/course/list?pageNo=1 &pageSize=2 Content-Type: application/json { "auditStatus" : "" , "courseName" : "" , "publishStatus" : "" }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 POST http: HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: application/json Date: Tue, 14 Feb 2023 10 : 25 : 35 GMT { "items" : [ { "id" : 1 , "companyId" : 22 , "companyName" : null , "name" : "JAVA8/9/10新特性讲解啊" , ··· }
搭建媒资工程
从课程资料中获取媒资工程xuecheng-plus-media,拷贝到项目工程根目录,修改其bootstrap中的nacos连接信息
创建媒资数据库xc_media,并导入数据库脚本
在nacos的开发环境下创建media-api-dev.yaml
和media-service-dev.yaml
media-api-dev.yaml media-service-dev.yaml 1 2 3 4 server: servlet: context-path: /media port: 53050
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 spring: datasource: druid: stat-view-servlet: enabled: true loginUsername: admin loginPassword: 123456 dynamic: primary: content strict: true druid: initial-size: 3 max-active: 5 min-idle: 5 max-wait: 60000 pool-prepared-statements: true max-pool-prepared-statement-per-connection-size: 20 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 validation-query: SELECT 1 FROM DUAL test-while-idle: true test-on-borrow: false test-on-return: false stat-view-servlet: enabled: true url-pattern: /druid/* filter: stat: log-slow-sql: true slow-sql-millis: 1000 merge-sql: true wall: config: multi-statement-allow: true datasource: content: url: jdbc:mysql://localhost:3306/xc_content?serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8 username: root password: mysql driver-class-name: com.mysql.cj.jdbc.Driver media: url: jdbc:mysql://localhost:3306/xc_media?serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8 username: root password: mysql driver-class-name: com.mysql.cj.jdbc.Driver
分布式文件系统
什么是分布式文件系统
文件系统是操作系统用于明确存储设备(常见的是磁盘,也有基于NAND Flash的固态硬盘)或分区上的文件的方法和数据结构;即在存储设备上组织文件的方法。操作系统中负责管理和存储文件信息的软件机构称为文件管理系统,简称文件系统。文件系统由三部分组成:文件系统的接口,对对象操纵和管理的软件集合,对象及属性。从系统角度来看,文件系统是对文件存储设备的空间进行组织和分配,负责文件存储并对存入的文件进行保护和检索的系统。具体地说,它负责为用户建立文件,存入、读出、修改、转储文件,控制文件的存取,当用户不再使用时撤销文件等。
文件系统是负责管理和存储文件和系统软件,操作系统通过文件系统提供的借口去存取文件,用户通过操作系统访问磁盘上的文件
常见的文件系统:FAT16/FAT32、NTFS、HFS、UFS、APFS、XFS、Ext4等
现在有个问题,一些短视频平台拥有大量的视频、图片,这些视频文件、图片文件该如何存储呢?如何存储可以满足互联网上海量用户的浏览
这里说的分布式文件系统就是海量用户查阅海量文件的方案
我们再来看看分布式文件系统的定义
分布式文件系统(Distributed File System,DFS)是指文件系统管理的物理存储资源不一定直接连接在本地节点上,而是通过计算机网络与节点(可简单的理解为一台计算机)相连;或是若干不同的逻辑磁盘分区或卷标组合在一起而形成的完整的有层次的文件系统。DFS为分布在网络上任意位置的资源提供一个逻辑上的树形文件系统结构,从而使用户访问分布在网络上的共享文件更加简便。单独的 DFS共享文件夹的作用是相对于通过网络上的其他共享文件夹的访问点
MinIO
介绍
本项目采用MinIO构建分布式文件系统,MinIO是一个非常轻量的服务,可以很简单的和其他应用结合使用。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等
它的一大特点就是轻量,使用简单、功能强大,支持各种平台,单个文件最大5TB,兼容提供了Java、Python、GO等多版本SDK支持
官网:https://min.io/,
中文:https://www.minio.org.cn/ , http://docs.minio.org.cn/docs/
MinIO采用去中心化共享架构,每个节点是对等关系,通过Nginx可对MinIO进行负载均衡访问
去中心化有什么好处?
在大数据领域,通常的设计理念都是无中心和分布式。MinIO分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置
它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式MinIO避免了单点故障
MinIO使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块,冗余地分散存储在各个节点的磁盘上,所有可用的磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时,会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验开会分散的存储在这8块硬盘上
使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍可以恢复数据。例如上面集合中有4个以内的硬盘损害,仍可保证数据恢复,不影响上传和下载;但如果多余一半的硬盘损坏,则无法恢复。
MinIO下载地址:https://dl.min.io/server/minio/release/
安装完毕后,CMD进入minio.exe所在目录,执行下面的命令,会在D盘创建4个目录,模拟4个硬盘
1 minio.exe server D:\develop\minio_data\data1 D:\develop\minio_data\data2 D:\develop\minio_data\data3 D:\develop\minio_data\data4
启动结果如下
默认账号密码均为minioadmin
,访问localhost:9000
进行登录
不过我们这里由于条件有限,所以先不做分布式,修改启动命令
1 minio.exe server d:\minio_data
之后创建两个buckets
mediafiles
:普通文件
video
:视频文件
SDK
上传文件
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > io.minio</groupId > <artifactId > minio</artifactId > <version > 8.4.3</version > </dependency > <dependency > <groupId > com.squareup.okhttp3</groupId > <artifactId > okhttp</artifactId > <version > 4.8.1</version > </dependency >
从官方文档中看到,需要三个参数才能连接到minio服务
Parameters
Description
Endpoint
URL to S3 service.
Access Key
Access key (aka user ID) of an account in the S3 service.
Secret Key
Secret key (aka password) of an account in the S3 service.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import io.minio.BucketExistsArgs;import io.minio.MakeBucketArgs;import io.minio.MinioClient;import io.minio.UploadObjectArgs;import io.minio.errors.MinioException;import java.io.IOException;import java.security.InvalidKeyException;import java.security.NoSuchAlgorithmException;public class FileUploader { public static void main (String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeyException { try { MinioClient minioClient = MinioClient.builder() .endpoint("https://play.min.io" ) .credentials("Q3AM3UQ867SPQQA43P2F" , "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG" ) .build(); boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket("asiatrip" ).build()); if (!found) { minioClient.makeBucket(MakeBucketArgs.builder().bucket("asiatrip" ).build()); } else { System.out.println("Bucket 'asiatrip' already exists." ); } minioClient.uploadObject( UploadObjectArgs.builder() .bucket("asiatrip" ) .object("asiaphotos-2015.zip" ) .filename("/home/user/Photos/asiaphotos.zip" ) .build()); System.out.println( "'/home/user/Photos/asiaphotos.zip' is successfully uploaded as " + "object 'asiaphotos-2015.zip' to bucket 'asiatrip'." ); } catch (MinioException e) { System.out.println("Error occurred: " + e); System.out.println("HTTP trace: " + e.httpTrace()); } } }
那么我们在其基础上进行修改,完成基本的上传、下载和删除功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class FileUploader { public static void main (String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeyException { try { MinioClient minioClient = MinioClient.builder() .endpoint("https://play.min.io" ) .credentials("Q3AM3UQ867SPQQA43P2F" , "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG" ) .build(); boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket("asiatrip" ).build()); if (!found) { minioClient.makeBucket(MakeBucketArgs.builder().bucket("asiatrip" ).build()); } else { System.out.println("Bucket 'asiatrip' already exists." ); } minioClient.uploadObject( UploadObjectArgs.builder() .bucket("asiatrip" ) .object("asiaphotos-2015.zip" ) .filename("/home/user/Photos/asiaphotos.zip" ) .build()); System.out.println( "'/home/user/Photos/asiaphotos.zip' is successfully uploaded as " + "object 'asiaphotos-2015.zip' to bucket 'asiatrip'." ); } catch (MinioException e) { System.out.println("Error occurred: " + e); System.out.println("HTTP trace: " + e.httpTrace()); } } }
在此之前,我们先创建一个名为testbucket的桶,然后将其权限修改为public
那我们现在把不要的代码删掉,再修改一下连接参数、bucket名、和要上传的文件路径,就可以进行测试了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @SpringBootTest public class MinIOTest { static MinioClient minioClient = MinioClient.builder() .endpoint("http://127.0.0.1:9000" ) .credentials("minioadmin" , "minioadmin" ) .build(); @Test public void uploadTest () { try { minioClient.uploadObject( UploadObjectArgs.builder() .bucket("testbucket" ) .object("pic01.png" ) .filename("D:\\Picture\\background\\01.png" ) .build() ); System.out.println("上传成功" ); } catch (Exception e) { System.out.println("上传失败" ); } } }
由于我们已经将桶的权限修改为了public,所以我们直接访问http://127.0.0.1:9000/testbucket/pic01.png, 也是可以直接看到上传的图片的
删除文件
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test public void deleteTest () { try { minioClient.removeObject(RemoveObjectArgs .builder() .bucket("testbucket" ) .object("pic01.png" ) .build()); System.out.println("删除成功" ); } catch (Exception e) { System.out.println("删除失败" ); } }
查询文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Test public void getFileTest () { try { InputStream inputStream = minioClient.getObject(GetObjectArgs.builder() .bucket("testbucket" ) .object("pic01.png" ) .build()); FileOutputStream fileOutputStream = new FileOutputStream ("C:\\Users\\15863\\Desktop\\tmp.png" ); byte [] buffer = new byte [1024 ]; int len; while ((len = inputStream.read(buffer)) != -1 ) { fileOutputStream.write(buffer,0 ,len); } inputStream.close(); fileOutputStream.close(); System.out.println("下载成功" ); } catch (Exception e) { System.out.println("下载失败" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Test public void getFileTest () { try { InputStream inputStream = minioClient.getObject(GetObjectArgs.builder() .bucket("testbucket" ) .object("pic01.png" ) .build()); FileOutputStream fileOutputStream = new FileOutputStream ("C:\\Users\\15863\\Desktop\\tmp.png" ); IOUtils.copy(inputStream,fileOutputStream); System.out.println("下载成功" ); } catch (Exception e) { System.out.println("下载失败" ); } }
上传图片
需求分析
业务流程
我们在新增课程的时候,需要上传课程图片
课程图片上传至分布式文件系统,在课程信息中保存课程图片路径,流程如下
前端进入上传图片界面
上传图片,请求媒资管理服务
媒资管理服务将图片文件存储在MinIO
媒资管理记录文件信息到数据库
保存课程信息,在内容管理数据库保存图片地址
媒资管理服务由接口层和业务层共同完成,具体分工如下
用户上传图片请求至媒资管理的接口层,接口层解析文件信息,通过业务层将文件保存至minio和数据库
数据模型
涉及到的数据表主要是媒资信息
环境准备
在minio中配置bucket,创建一个名为mediafiles的bucket,并将其权限设置为public
在nacos中配置minio的相关信息,在nacos的开发环境下新增配置media-service-dev.yaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/xc_media?serverTimezone=UTC&userUnicode=true&useSSL=false username: root password: A10ne,tillde@th. cloud: config: override-none: true minio: endpoint: http://127.0.0.1:9000 accessKey: minioadmin secretKey: minioadmin bucket: files: mediafiles videofiles: video
在media-service工程下配置bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 spring: application: name: media-service cloud: nacos: server-addr: 127.0 .0 .1 :8848 discovery: namespace: ${spring.profiles.active} group: xuecheng-plus-project config: namespace: ${spring.profiles.active} group: xuecheng-plus-project file-extension: yaml refresh-enabled: true shared-configs: - data-id: logging-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true profiles: active: dev
在media-service工程下编写minio的配置类
该配置类中药根据yaml中的minio配置信息,创建一个MinioClient对象,并声明为bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class MinioConfig { @Value("${minio.endpoint}") private String endpoint; @Value("${minio.accessKey}") private String accessKey; @Value("${minio.secretKey}") private String secretKey; @Bean public MinioClient minioClient () { return MinioClient.builder(). endpoint(endpoint). credentials(accessKey, secretKey). build(); } }
接口定义
根据需求分析,下面进行接口定义。
此接口定义为一个通用的上传文件的接口,可以上传图片或其他文件
首先分析接口
请求地址:/media/upload/coursefile
请求参数:Content-Type: multipart/form-data;boundary=… FormData: filedata=??, folder=?, objectName=?
响应参数:文件信息,如下,其内容与media_files表中的字段完全一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 { "id" : "a16da7a132559daf9e1193166b3e7f52" , "companyId" : 1232141425 , "companyName" : null , "filename" : "1.jpg" , "fileType" : "001001" , "tags" : "" , "bucket" : "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg" , "fileId" : "a16da7a132559daf9e1193166b3e7f52" , "url" : "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg" , "timelength" : null , "username" : null , "createDate" : "2022-09-12T21:57:18" , "changeDate" : null , "status" : "1" , "remark" : "" , "auditStatus" : null , "auditMind" : null , "fileSize" : 248329 }
定义模型类,虽然响应结果与MediaFiles表中的字段完全一致,但最好不要直接用MediaFiles类。因为该类属于PO类,如果后期我们要对响应结果进行修改,那么模型类也需要进行修改,但是MediaFiles是PO类,我们不能动。所以可以直接用一个类继承MediaFiles,里面什么属性都不用加
1 2 3 @Data public class UploadFileResultDto extends MediaFiles {}
定义接口,其中folder和objectName这两个参数不一定传,所以将其required设为false
1 2 3 4 5 6 7 @ApiOperation("上传文件") @RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public UploadFileResultDto upload (@RequestPart("filedata") MultipartFile upload, @RequestParam(value = "folder", required = false) String folder, @RequestParam(value = "objectName", required = false) String objectName) { return null ; }
接口开发
DAO开发
根据需求分析,DAO层实现向media_file表中插入一条记录,使用media_files表生成的mapper即可
Service开发
Service方法需要听歌一个更加通用的保存文件的方法
定义请求参数类,上传文件,我们需要文件名称、文件的content-type、文件类型(文档、视频、图片等,对应数据字典表中的类型)、文件大小、标签、上传人、备注
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Data @ToString public class UploadFileParamsDto { private String filename; private String contentType; private String fileType; private Long fileSize; private String tags; private String username; private String remark; }
定义service方法,MultipartFile是SpringMVC提供简化上传操作的工具类,不使用框架之前,都是使用原生的HttpServletRequest来接收上传的数据,文件是以二进制流传递到后端的。为了使接口更通用,我们可以用字节数组代替MultpartFile类型
1 2 3 4 5 6 7 8 9 10 UploadFileResultDto uploadFile (Long companyId, UploadFileParamsDto uploadFileParamsDto, byte [] bytes, String folder, String objectName) ;
实现方法如下,主要分为两部分
将文件上传到minio
将文件信息写入media_file表中
将文件上传到minio
前面我们上传文件是用的uploadObject方法,是从本地磁盘上传文件,为了使方法更通用,这里使用putObject
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 @Autowired MinioClient minioClient; @Value("${minio.bucket.files}") private String bucket_files;@Override public UploadFileResultDto uploadFile (Long companyId, UploadFileParamsDto uploadFileParamsDto, byte [] bytes, String folder, String objectName) { if (StringUtils.isEmpty(folder)) { folder = getFileFolder(true , true , true ); } else if (!folder.endsWith("/" )) { folder = folder + "/" ; } if (StringUtils.isEmpty(objectName)) { String filename = uploadFileParamsDto.getFilename(); objectName = DigestUtils.md5DigestAsHex(bytes) + filename.substring(filename.lastIndexOf("." )); } objectName = folder + objectName; try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (bytes); minioClient.putObject(PutObjectArgs.builder() .bucket(bucket_files) .object(objectName) .stream(byteArrayInputStream, byteArrayInputStream.available(), -1 ) .contentType(uploadFileParamsDto.getContentType()) .build()); } catch (Exception e) { } return null ; } private String getFileFolder (boolean year, boolean month, boolean day) { StringBuffer stringBuffer = new StringBuffer (); SimpleDateFormat dateFormat = new SimpleDateFormat ("yyyy-MM-dd" ); String dateString = dateFormat.format(new Date ()); String[] split = dateString.split("-" ); if (year) { stringBuffer.append(split[0 ]).append("/" ); } if (month) { stringBuffer.append(split[1 ]).append("/" ); } if (day) { stringBuffer.append(split[2 ]).append("/" ); } return stringBuffer.toString(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);if (mediaFiles == null ) { mediaFiles = new MediaFiles (); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMD5); mediaFiles.setFileId(fileMD5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket_files); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1" ); mediaFiles.setFilePath(objectName); mediaFiles.setUrl("/" + bucket_files + "/" + objectName); mediaFiles.setAuditStatus("002003" ); } int insert = mediaFilesMapper.insert(mediaFiles);if (insert <= 0 ) { XueChengPlusException.cast("保存文件信息失败" ); } UploadFileResultDto uploadFileResultDto = new UploadFileResultDto ();BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); return uploadFileResultDto;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 @Autowired MediaFilesMapper mediaFilesMapper; @Autowired MinioClient minioClient; @Value("${minio.bucket.files}") private String bucket_files;@Override public UploadFileResultDto uploadFile (Long companyId, UploadFileParamsDto uploadFileParamsDto, byte [] bytes, String folder, String objectName) { String fileMD5 = DigestUtils.md5DigestAsHex(bytes); if (StringUtils.isEmpty(folder)) { folder = getFileFolder(true , true , true ); } else if (!folder.endsWith("/" )) { folder = folder + "/" ; } if (StringUtils.isEmpty(objectName)) { String filename = uploadFileParamsDto.getFilename(); objectName = fileMD5 + filename.substring(filename.lastIndexOf("." )); } objectName = folder + objectName; try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (bytes); minioClient.putObject(PutObjectArgs.builder() .bucket(bucket_files) .object(objectName) .stream(byteArrayInputStream, byteArrayInputStream.available(), -1 ) .contentType(uploadFileParamsDto.getContentType()) .build()); MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5); if (mediaFiles == null ) { mediaFiles = new MediaFiles (); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMD5); mediaFiles.setFileId(fileMD5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket_files); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1" ); mediaFiles.setFilePath(objectName); mediaFiles.setUrl("/" + bucket_files + "/" + objectName); mediaFiles.setAuditStatus("002003" ); } int insert = mediaFilesMapper.insert(mediaFiles); if (insert <= 0 ) { XueChengPlusException.cast("保存文件信息失败" ); } UploadFileResultDto uploadFileResultDto = new UploadFileResultDto (); BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); return uploadFileResultDto; } catch (Exception e) { XueChengPlusException.cast("上传过程中出错" ); } return null ; } private String getFileFolder (boolean year, boolean month, boolean day) { StringBuffer stringBuffer = new StringBuffer (); SimpleDateFormat dateFormat = new SimpleDateFormat ("yyyy-MM-dd" ); String dateString = dateFormat.format(new Date ()); String[] split = dateString.split("-" ); if (year) { stringBuffer.append(split[0 ]).append("/" ); } if (month) { stringBuffer.append(split[1 ]).append("/" ); } if (day) { stringBuffer.append(split[2 ]).append("/" ); } return stringBuffer.toString(); } private String getFileFolder (boolean year, boolean month, boolean day) { StringBuffer stringBuffer = new StringBuffer (); SimpleDateFormat dateFormat = new SimpleDateFormat ("yyyy-MM-dd" ); String dateString = dateFormat.format(new Date ()); String[] split = dateString.split("-" ); if (year) { stringBuffer.append(split[0 ]).append("/" ); } if (month) { stringBuffer.append(split[1 ]).append("/" ); } if (day) { stringBuffer.append(split[2 ]).append("/" ); } return stringBuffer.toString(); }
完善Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @ApiOperation("上传文件") @RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public UploadFileResultDto upload (@RequestPart("filedata") MultipartFile upload, @RequestParam(value = "folder", required = false) String folder, @RequestParam(value = "objectName", required = false) String objectName) { UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto (); uploadFileParamsDto.setFileSize(upload.getSize()); String contentType = upload.getContentType(); if (contentType.contains("image" )) { uploadFileParamsDto.setFileType("001001" ); } else { uploadFileParamsDto.setFileType("001003" ); } uploadFileParamsDto.setFilename(upload.getOriginalFilename()); uploadFileParamsDto.setContentType(contentType); Long companyId = 1232141425L ; try { UploadFileResultDto uploadFileResultDto = mediaFileService.uploadFile(companyId, uploadFileParamsDto, upload.getBytes(), folder, objectName); return uploadFileResultDto; } catch (IOException e) { XueChengPlusException.cast("上传文件过程出错" ); } return null ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #### 上传文件 POST { { media_host} } /media/upload/coursefile Content-Type: multipart/form-data; boundary=WebAppBoundary --WebAppBoundary Content-Disposition: form-data; name="filedata" ; filename="test01.jpg" Content-Type: application/octet-stream < C: \Users\kyle\Desktop\Picture\photo\bg01.jpg ## 响应结果如下 POST http: HTTP/1.1 200 Content-Type: application/json Transfer-Encoding: chunked Date: Thu, 16 Feb 2023 09 : 57 : 48 GMT Keep-Alive: timeout=60 Connection: keep-alive { "id" : "632fb34166d91865da576032b9330ced" , "companyId" : 1232141425 , "companyName" : null , "filename" : "test01.jpg" , "fileType" : "001003" , "tags" : null , "bucket" : "mediafiles" , "filePath" : "2023/57/16/632fb34166d91865da576032b9330ced.jpg" , "fileId" : "632fb34166d91865da576032b9330ced" , "url" : "/mediafiles/2023/57/16/632fb34166d91865da576032b9330ced.jpg" , "username" : null , "createDate" : "2023-02-16 17:57:48" , "changeDate" : null , "status" : "1" , "remark" : "" , "auditStatus" : "002003" , "auditMind" : null , "fileSize" : 22543 } 响应文件已保存。 > 2023 -02 -16 T175748.200 .json
Service代码优化
在上传文件的方法中包括两部分
向MinIO存储文件
向数据库存储文件信息
下面将这两部分抽取出来,后期可供其他Service方法调用
为了跟方便的获取content-type,我们可以添加simplemagic依赖,它提供的方法可以根据文件扩展名,得到资源的content-type
在base工程中添加依赖
1 2 3 4 5 <dependency > <groupId > com.j256.simplemagic</groupId > <artifactId > simplemagic</artifactId > <version > 1.17</version > </dependency >
1 2 ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(扩展名);String contentType = extensionMatch.getMimeType();
IDEA中使用Ctrl + Alt + M 可以快速重构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 @Override public UploadFileResultDto uploadFile (Long companyId, UploadFileParamsDto uploadFileParamsDto, byte [] bytes, String folder, String objectName) { String fileMD5 = DigestUtils.md5DigestAsHex(bytes); if (StringUtils.isEmpty(folder)) { folder = getFileFolder(true , true , true ); } else if (!folder.endsWith("/" )) { folder = folder + "/" ; } if (StringUtils.isEmpty(objectName)) { String filename = uploadFileParamsDto.getFilename(); objectName = fileMD5 + filename.substring(filename.lastIndexOf("." )); } objectName = folder + objectName; try { addMediaFilesToMinIO(bytes, bucket_files, objectName); MediaFiles mediaFiles = addMediaFilesToDB(companyId, uploadFileParamsDto, objectName, fileMD5, bucket_files); UploadFileResultDto uploadFileResultDto = new UploadFileResultDto (); BeanUtils.copyProperties(mediaFiles, uploadFileResultDto); return uploadFileResultDto; } catch (Exception e) { XueChengPlusException.cast("上传过程中出错" ); } return null ; } private MediaFiles addMediaFilesToDB (Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) { MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5); if (mediaFiles == null ) { mediaFiles = new MediaFiles (); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMD5); mediaFiles.setFileId(fileMD5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1" ); mediaFiles.setFilePath(objectName); mediaFiles.setUrl("/" + bucket + "/" + objectName); mediaFiles.setAuditStatus("002003" ); } int insert = mediaFilesMapper.insert(mediaFiles); if (insert <= 0 ) { XueChengPlusException.cast("保存文件信息失败" ); } return mediaFiles; } private void addMediaFilesToMinIO (byte [] bytes, String bucket, String objectName) { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (bytes); String contentType = MediaType.APPLICATION_OCTET_STREAM_VALUE; if (objectName.indexOf("." ) >= 0 ) { String extension = objectName.substring(objectName.lastIndexOf("." )); ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension); if (extensionMatch != null ) { contentType = extensionMatch.getMimeType(); } } try { minioClient.putObject(PutObjectArgs.builder() .bucket(bucket) .object(objectName) .stream(byteArrayInputStream, byteArrayInputStream.available(), -1 ) .contentType(contentType) .build()); } catch (Exception e) { log.debug("上传到文件系统出错:{}" , e.getMessage()); throw new XueChengPlusException ("上传到文件系统出错" ); } }
优化后使用HTTP Client进行测试
同时,根据文件扩展名获取content-type的方法可以进一步抽取,可以在base工程中创建一个工具类,供其他微服务使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static String getContentType (String objectName) { String contentType = MediaType.APPLICATION_OCTET_STREAM_VALUE; if (objectName.indexOf("." ) >= 0 ) { String extension = objectName.substring(objectName.lastIndexOf("." )); ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension); if (extensionMatch != null ) { contentType = extensionMatch.getMimeType(); } } return contentType; }
Service事务优化
我们现在思考一下,updateFile方法是否应该开启事务
目前如果在updateFile方法上添加@Transactional
,当调用updateFile方法前会开启数据库事务,如果上传文件过程时间较长(例如用户在上传超大视频文件),那么数据库的食物持续时间也会变长(因为在updateFile方法中,我们即要将文件上传到minio,又要将文件信息写入数据库),这样数据库连接释放就慢,最终导致数据库链接不够用
那么解决办法也显而易见,那就是只在addMediaFilesToDB
方法上添加事务控制即可,同时将uploadFile方法上的@Transactional
注解去掉
但事情并不是那么简单,首先我们来看一下Spring的事务控制
判断方法能否被事务控制
是不是通过代理对象调用的方法
该方法上是否添加了@Transactional
注解
现在只满足了添加事务注解,那么如何判断是不是通过代理对象调用的方法呢?
当我们在一个不能被事务控制的方法里(uploadFile),调用一个被事务控制的方法(addMediaFilesToDB),那么该方法(addMediaFilesToDB)也不会被事务控制
那么如何解决呢?
我们需要通过代理对象去调用addMediaFilesToDB方法
在MediaFileService的实现类中注入MediaFileService的代理对象
1 2 @Autowired MediaFileService currentProxy;
将addMediaFilesToDB方法提取成接口
1 2 3 4 5 6 7 8 9 10 11 MediaFiles addMediaFilesToDB (Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) ;
通过代理对象调用addMediaFilesToDB
1 MediaFiles mediaFiles = currentProxy.addMediaFilesToDB(companyId, uploadFileParamsDto, objectName, fileMD5, bucket_files);
再次测试事务是否可以正常控制
打断点看到这次是代理对象调用的方法
前后端联调
1 2 VUE_APP_SERVER_PICSERVER_URL =http://127.0.0.1:9000
在新增课程、编辑课程界面上传图片,保存课程信息后再次进入编辑课程界面,查看是否可以正常保存图片信息
上传图片完成后,进入媒资管理,查看文件列表中是否有刚刚上传的图片信息
bug修复
在媒资列表可以查看到刚刚上传的图片信息,但是通过条件查询不起作用
原因:没有使用查询条件
解决:修改MediaFileServiceImpl中的queryMediaFiles方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) { //构建查询条件对象 LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.like(!StringUtils.isEmpty(queryMediaParamsDto.getFilename()), MediaFiles::getFilename, queryMediaParamsDto.getFilename()); + queryWrapper.eq(!StringUtils.isEmpty(queryMediaParamsDto.getFileType()), MediaFiles::getFileType, queryMediaParamsDto.getFileType()); //分页对象 Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize()); // 查询数据内容获得结果 Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper); // 获取数据列表 List<MediaFiles> list = pageResult.getRecords(); // 获取数据总数 long total = pageResult.getTotal(); // 构建结果集 PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize()); return mediaListResult; }
重启服务,测试是否能正常查询
上传视频
需求分析
教学机构人员进入媒资管理列表查询自己上传的媒资文件
教育机构人员在媒资管理
页面中点击上传视频
按钮,打开上传界面
选择要上传的文件,自动执行文件上传
视频上传成功会自动处理,处理完成后可以预览视频
断点续传
什么是断点续传
通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传需求。HTTP协议本身对上传文件大小没有限制,但是客户的网络环境之类、电脑硬件环境等参差不齐,如果一个大文件快上传完了,但是突然断网了,没有上传完成,需要客户重新上传,那么用户体验就非常差。所以对于大文件上传的最基本要求就是断点续传
流程如下
前端上传前先把文件分成块
一块一块的上传,上传中断后重新上传。已上传的分块则不用再上传
各分块上传完成后,在服务端合并文件
分块与合并测试
为了更好的理解文件分块上传的原理,下面用Java代码测试文件的分块与合并
文件分块的流程如下
获取源文件长度
根据设定的分块文件大小,计算出块数(向上取整,例如33.4M的文件,块大小为1M,则需要34块)
从源文件读取数据,并依次向每一个块文件写数据
文件分块测试代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Test public void testChunk () throws IOException { File sourceFile = new File ("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1.mp4" ); String chunkPath = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\chunk\\" ; File chunkFolder = new File (chunkPath); if (!chunkFolder.exists()) { chunkFolder.mkdirs(); } long chunkSize = 1024 * 1024 * 1 ; long chunkNum = (long ) Math.ceil(sourceFile.length() * 1.0 / chunkSize); byte [] buffer = new byte [1024 ]; RandomAccessFile raf_read = new RandomAccessFile (sourceFile, "r" ); for (int i = 0 ; i < chunkNum; i++) { File file = new File (chunkPath + i); if (file.exists()){ file.delete(); } boolean newFile = file.createNewFile(); if (newFile) { int len; RandomAccessFile raf_write = new RandomAccessFile (file, "rw" ); while ((len = raf_read.read(buffer)) != -1 ) { raf_write.write(buffer, 0 , len); if (file.length() >= chunkSize) break ; } raf_write.close(); } } raf_read.close(); System.out.println("写入分块完毕" ); }
文件合并流程
找到要合并的文件并按文件分块的先后顺序排序
创建合并文件
依次从合并的文件中读取数据冰箱合并文件写入数据
文件合并的测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Test public void testMerge () throws IOException { File chunkFolder = new File ("D:\\BaiduNetdiskDownload\\星际牛仔1998\\chunk\\" ); File sourceFile = new File ("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1.mp4" ); File mergeFile = new File ("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1-1.mp4" ); mergeFile.createNewFile(); RandomAccessFile raf_write = new RandomAccessFile (mergeFile, "rw" ); byte [] buffer = new byte [1024 ]; File[] files = chunkFolder.listFiles(); List<File> fileList = Arrays.asList(files); Collections.sort(fileList, Comparator.comparingInt(o -> Integer.parseInt(o.getName()))); for (File chunkFile : fileList) { RandomAccessFile raf_read = new RandomAccessFile (chunkFile, "r" ); int len; while ((len = raf_read.read(buffer)) != -1 ) { raf_write.write(buffer, 0 , len); } raf_read.close(); } raf_write.close(); FileInputStream fileInputStream = new FileInputStream (sourceFile); FileInputStream mergeFileStream = new FileInputStream (mergeFile); String originalMd5 = DigestUtils.md5Hex(fileInputStream); String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream); if (originalMd5.equals(mergeFileMd5)) { System.out.println("合并文件成功" ); } else { System.out.println("合并文件失败" ); } }
上传视频流程
前端上传文件前,请求媒资接口层检查文件是否存在
若存在,则不再上传
若不存在,则开始上传,首先对视频文件进行分块
前端分块进行上传,上传前首先检查分块是否已经存在
若分块已存在,则不再上传
若分块不存在,则开始上传分块
前端请求媒资管理接口层,请求上传分块
接口层请求服务层上传分块
服务端将分块信息上传到MinIO
前端将分块上传完毕,请求接口层合并分块
接口层请求服务层合并分块
服务层根据文件信息找到MinIO中的分块文件,下载到本地临时目录,将所有分块下载完毕后开始合并
合并完成后,将合并后的文件上传至MinIO
接口定义
根据上传视频流程,定义接口
与前端的约定是
在base工程的model包下新建RestResponse类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 @Data public class RestResponse <T> { private int code; private String msg; private T result; public RestResponse (int code, String msg) { this .code = code; this .msg = msg; } public RestResponse () { this (0 , "success" ); } public static <T> RestResponse<T> validfail () { RestResponse<T> response = new RestResponse <>(); response.setCode(-1 ); return response; } public static <T> RestResponse<T> validfail (String msg) { RestResponse<T> response = new RestResponse <>(); response.setCode(-1 ); response.setMsg(msg); return response; } public static <T> RestResponse<T> validfail (String msg, T result) { RestResponse<T> response = new RestResponse <>(); response.setCode(-1 ); response.setMsg(msg); response.setResult(result); return response; } public static <T> RestResponse<T> success () { return new RestResponse <>(); } public static <T> RestResponse<T> success (T result) { RestResponse<T> response = new RestResponse <>(); response.setResult(result); return response; } public static <T> RestResponse<T> success (String msg, T result) { RestResponse<T> response = new RestResponse <>(); response.setMsg(msg); response.setResult(result); return response; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Api(value = "大文件上传接口", tags = "大文件上传接口") @RestController public class BigFilesController { @ApiOperation(value = "文件上传前检查文件") @PostMapping("/upload/checkfile") public RestResponse<Boolean> checkFile (@RequestParam("fileMd5") String fileMd5) { return null ; } @ApiOperation(value = "分块文件上传前检查分块") @PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkChunk (@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) { return null ; } @ApiOperation(value = "上传分块文件") @PostMapping("/upload/uploadchunk") public RestResponse uploadChunk (@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) { return null ; } @ApiOperation(value = "合并分块文件") @PostMapping("/upload/mergechunks") public RestResponse mergeChunks (@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) { return null ; } }
接口开发
DAO开发
向媒资数据库的文件表插入记录,使用自动生成的Mapper接口即可满足要求
Service开发
检查文件和分块
首先实现检查文件方法和检查分块方法
定义Service接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 boolean checkFile (String fileMd5) ;boolean checkChunk (String fileMd5, int chunkIndex) ;
判断文件是否存在
首先判断数据库中是否存在该文件
其次判断minio的bucket中是否存在该文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public RestResponse<Boolean> checkFile (String fileMd5) { MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5); if (mediaFiles == null ) { return RestResponse.success(false ); } try { InputStream inputStream = minioClient.getObject(GetObjectArgs .builder() .bucket(mediaFiles.getBucket()) .object(mediaFiles.getFilePath()) .build()); if (inputStream == null ) { return RestResponse.success(false ); } } catch (Exception e) { return RestResponse.success(false ); } return RestResponse.success(true ); }
判断分块是否存在
分块是否存在,只需要判断minio对应的目录下是否存在分块文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public RestResponse<Boolean> checkChunk (String fileMd5, int chunkIndex) { String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); String chunkFilePath = chunkFileFolderPath + chunkIndex; try { InputStream inputStream = minioClient.getObject(GetObjectArgs .builder() .bucket(video_files) .object(chunkFilePath) .build()); if (inputStream == null ) { return RestResponse.success(false ); } } catch (Exception e) { return RestResponse.success(false ); } return RestResponse.success(); } private String getChunkFileFolderPath (String fileMd5) { return fileMd5.substring(0 , 1 ) + "/" + fileMd5.substring(1 , 2 ) + "/" + fileMd5 + "/" + "chunk" + "/" ; }
上传分块
1 2 3 4 5 6 7 8 RestResponse uploadChunk (String fileMd5,int chunk,byte [] bytes) ;
1 2 3 4 5 6 7 8 9 10 11 12 @Override public RestResponse uploadChunk (String fileMd5, int chunk, byte [] bytes) { String chunkFilePath = getChunkFileFolderPath(fileMd5) + chunk; try { addMediaFilesToMinIO(bytes, video_files, chunkFilePath); return RestResponse.success(true ); } catch (Exception e) { log.debug("上传分块文件:{}失败:{}" , chunkFilePath, e.getMessage()); } return RestResponse.validfail("上传文件失败" , false ); }
上传分块测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @ApiOperation(value = "文件上传前检查文件") @PostMapping("/upload/checkfile") public RestResponse<Boolean> checkFile (@RequestParam("fileMd5") String fileMd5) { return mediaFileService.checkFile(fileMd5); } @ApiOperation(value = "分块文件上传前检查分块") @PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkChunk (@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) { return mediaFileService.checkChunk(fileMd5, chunk); } @ApiOperation(value = "上传分块文件") @PostMapping("/upload/uploadchunk") public RestResponse uploadChunk (@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { return mediaFileService.uploadChunk(fileMd5, chunk, file.getBytes()); }
合并前下载分块
在合并分块前,我们需要先下载分块,在ServiceImpl中定义下载分块方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 private File[] checkChunkStatus(String fileMd5, int chunkTotal) { File[] files = new File [chunkTotal]; String chunkFileFolder = getChunkFileFolderPath(fileMd5); for (int i = 0 ; i < chunkTotal; i++) { String chunkFilePath = chunkFileFolder + i; File chunkFile = null ; try { chunkFile = File.createTempFile("chunk" + i, null ); } catch (Exception e) { XueChengPlusException.cast("创建临时分块文件出错:" + e.getMessage()); } chunkFile = downloadFileFromMinio(chunkFile, video_files, chunkFilePath); files[i] = chunkFile; } return files; } private File downloadFileFromMinio (File file, String bucket, String objectName) { try (FileOutputStream fileOutputStream = new FileOutputStream (file); InputStream inputStream = minioClient.getObject(GetObjectArgs .builder() .bucket(bucket) .object(objectName) .build())) { IOUtils.copy(inputStream, fileOutputStream); return file; } catch (Exception e) { XueChengPlusException.cast("查询文件分块出错" ); } return null ; }
合并分块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Override public RestResponse mergeChunks (Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) throws IOException { File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal); String fileName = uploadFileParamsDto.getFilename(); String extension = fileName.substring(fileName.lastIndexOf("." )); File mergeFile = File.createTempFile(fileName, extension); byte [] buffer = new byte [1024 ]; RandomAccessFile raf_write = new RandomAccessFile (mergeFile, "rw" ); for (File chunkFile : chunkFiles) { RandomAccessFile raf_read = new RandomAccessFile (chunkFile, "r" ); int len; while ((len = raf_read.read(buffer)) != -1 ) { raf_write.write(buffer, 0 , len); } } uploadFileParamsDto.setFileSize(mergeFile.length()); FileInputStream mergeInputStream = new FileInputStream (mergeFile); String mergeMd5 = DigestUtils.md5DigestAsHex(mergeInputStream); if (!fileMd5.equals(mergeMd5)) { XueChengPlusException.cast("合并文件校验失败" ); } String mergeFilePath = getFilePathByMd5(fileMd5, extension); addMediaFilesToMinIO(mergeFile.getAbsolutePath(), video_files, mergeFilePath); MediaFiles mediaFiles = addMediaFilesToDB(companyId, uploadFileParamsDto, mergeFilePath, mergeMd5, video_files); if (mediaFiles == null ) { XueChengPlusException.cast("媒资文件入库出错" ); } return RestResponse.success(); } private void addMediaFilesToMinIO (String filePath, String bucket, String objectName) { String contentType = getContentType(objectName); try { minioClient.uploadObject(UploadObjectArgs .builder() .bucket(bucket) .object(objectName) .filename(filePath) .contentType(contentType) .build()); } catch (Exception e) { XueChengPlusException.cast("上传到文件系统出错" ); } } private String getFilePathByMd5 (String fileMd5, String extension) { return fileMd5.substring(0 , 1 ) + "/" + fileMd5.substring(1 , 2 ) + "/" + fileMd5 + "/" + fileMd5 + extension; }
基本的业务逻辑就是这些,但是现在还少了点东西,我们没有做异常处理,简单的throw出去而已,并且创建的临时文件,也需要删除,完善后的代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 @Override public RestResponse mergeChunks (Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) { File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal); String fileName = uploadFileParamsDto.getFilename(); String extension = fileName.substring(fileName.lastIndexOf("." )); File mergeFile = null ; try { mergeFile = File.createTempFile(fileName, extension); } catch (IOException e) { XueChengPlusException.cast("创建合并临时文件出错" ); } try { byte [] buffer = new byte [1024 ]; try (RandomAccessFile raf_write = new RandomAccessFile (mergeFile, "rw" )) { for (File chunkFile : chunkFiles) { try (RandomAccessFile raf_read = new RandomAccessFile (chunkFile, "r" )) { int len; while ((len = raf_read.read(buffer)) != -1 ) { raf_write.write(buffer, 0 , len); } } } } catch (Exception e) { XueChengPlusException.cast("合并文件过程中出错" ); } uploadFileParamsDto.setFileSize(mergeFile.length()); try (FileInputStream mergeInputStream = new FileInputStream (mergeFile)) { String mergeMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(mergeInputStream); if (!fileMd5.equals(mergeMd5)) { XueChengPlusException.cast("合并文件校验失败" ); } log.debug("合并文件校验通过:{}" , mergeFile.getAbsolutePath()); } catch (Exception e) { XueChengPlusException.cast("合并文件校验异常" ); } String mergeFilePath = getFilePathByMd5(fileMd5, extension); addMediaFilesToMinIO(mergeFile.getAbsolutePath(), video_files, mergeFilePath); log.debug("合并文件上传至MinIO完成{}" , mergeFile.getAbsolutePath()); MediaFiles mediaFiles = addMediaFilesToDB(companyId, uploadFileParamsDto, mergeFilePath, fileMd5, video_files); if (mediaFiles == null ) { XueChengPlusException.cast("媒资文件入库出错" ); } log.debug("媒资文件入库完成" ); return RestResponse.success(); } finally { for (File chunkFile : chunkFiles) { try { chunkFile.delete(); } catch (Exception e) { log.debug("临时分块文件删除错误:{}" , e.getMessage()); } } try { mergeFile.delete(); } catch (Exception e) { log.debug("临时合并文件删除错误:{}" , e.getMessage()); } } }
接口层完善
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Api(value = "大文件上传接口", tags = "大文件上传接口") @RestController public class BigFilesController { @Autowired private MediaFileService mediaFileService; @ApiOperation(value = "文件上传前检查文件") @PostMapping("/upload/checkfile") public RestResponse<Boolean> checkFile (@RequestParam("fileMd5") String fileMd5) { return mediaFileService.checkFile(fileMd5); } @ApiOperation(value = "分块文件上传前检查分块") @PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkChunk (@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) { return mediaFileService.checkChunk(fileMd5, chunk); } @ApiOperation(value = "上传分块文件") @PostMapping("/upload/uploadchunk") public RestResponse uploadChunk (@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { return mediaFileService.uploadChunk(fileMd5, chunk, file.getBytes()); } @ApiOperation(value = "合并分块文件") @PostMapping("/upload/mergechunks") public RestResponse mergeChunks (@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) throws IOException { Long companyId = 1232141425L ; UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto (); uploadFileParamsDto.setFileType("001002" ); uploadFileParamsDto.setTags("课程视频" ); uploadFileParamsDto.setRemark("" ); uploadFileParamsDto.setFilename(fileName); return mediaFileService.mergeChunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto); } }
接口测试
前后端联调,上传视频进行测试
数据库和MinIO中均能看到对应的数据
文件预览
需求分析
图片上传成功、视频上传成功后,可以通过预览按钮查看文件内容
预览的方式是通过浏览器直接打开文件,对于图片和浏览器支持的视频格式可以直接浏览
说明
前端请求接口层预览文件
接口层将文件id传递给服务层
服务层使用文件id查询媒资数据库文件表,获取文件的URL
接口层将文件url返回给前端,通过浏览器打开URL
接口定义
1 2 3 4 5 @ApiOperation(value = "预览文件") @GetMapping("/preview/{mediaId}") public RestResponse<String> getPlayUrlByMediaId (@PathVariable String mediaId) { return null ; }
接口开发
设置URL
有一些浏览器不支持的视频格式,不能在浏览器中直接浏览,所以我们要修改保存媒资信息到数据库的方法
当文件是图片时,设置URL字段
当视频是MP4格式时,设置URL字段
其他情况暂不设置URL,需要文件处理后再设置URL字段
修改保存媒资信息的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 /** * 将文件信息添加到文件表 * * @param companyId 机构id * @param uploadFileParamsDto 上传文件的信息 * @param objectName 对象名称 * @param fileMD5 文件的md5码 * @param bucket 桶 */ @Transactional public MediaFiles addMediaFilesToDB(Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) { // 保存到数据库 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5); if (mediaFiles == null) { mediaFiles = new MediaFiles(); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMD5); mediaFiles.setFileId(fileMD5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1"); mediaFiles.setFilePath(objectName); + // 获取源文件名的contentType + String contentType = getContentType(objectName); + // 如果是图片格式或者mp4格式,则设置URL属性,否则不设置 + if (contentType.contains("image") || contentType.contains("mp4")) { + mediaFiles.setUrl("/" + bucket + "/" + objectName); + } // 查阅数据字典,002003表示审核通过 mediaFiles.setAuditStatus("002003"); } int insert = mediaFilesMapper.insert(mediaFiles); if (insert <= 0) { XueChengPlusException.cast("保存文件信息失败"); } return mediaFiles; }
DAO开发
Service开发
1 MediaFiles getFileById (String mediaId) ;
1 2 3 4 5 6 7 8 @Override public MediaFiles getFileById (String id) { MediaFiles mediaFiles = mediaFilesMapper.selectById(id); if (mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())) { XueChengPlusException.cast("视频还没有转码处理" ); } return mediaFiles; }
完善Controller
1 2 3 4 5 6 @ApiOperation(value = "预览文件") @GetMapping("/preview/{mediaId}") public RestResponse<String> getPlayUrlByMediaId (@PathVariable String mediaId) { MediaFiles mediaFile = mediaFileService.getFileById(mediaId); return RestResponse.success(mediaFile.getUrl()); }
接口测试
前后端联调
上传MP4视频文件,并预览
上传图片文件,并预览
上传.avi格式的视频文件,尝试预览,观察错误提示信息,稍后通过视频处理对视频转码
视频处理
分布式任务处理
什么是分布式任务调度
1 2 3 4 5 6 7 8 9 10 11 某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券 某财务系统需要在每天上午10天谴结算前一天的账单数据,统计汇总 某电商平台每天凌晨3点,要对订单中的无效订单进行处理 12306网站会根据车次不同,设置几个时间点分批放票 电商正点抢购,商品价格某天上午8点整开始优惠 商品成功发货后,需要向客户发送短信提醒
类似的场景还有很多,我们该如何实现呢?
任务调度,顾名思义就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务
如何实现任务调度?
多线程方式实现
我们可以开启一个线程,每sleep一段时间,就去检查是否已经到预期执行时间,下面代码简单实现了任务调度的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main (String[] args) { final long timeInterval = 1000 ; Runnable runnable = new Runnable () { public void run () { while (true ) { try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread thread = new Thread (runnable); thread.start(); }
上面的代码实现了按一定时间间隔,执行任务调度的功能
JDK也为我们提供了相关支持,如Timer、ScheduledExecutor,下面我们了解下
1 2 3 4 5 6 7 8 9 public static void main (String[] args) { Timer timer = new Timer (); timer.schedule(new TimerTask () { @Override public void run () { } }, 1000 , 2000 ); }
Timer的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(10 ); service.scheduleAtFixedRate( new Runnable () { @Override public void run () { System.out.println("todo something" ); } }, 1 , 2 , TimeUnit.SECONDS); }
Java5推出了基于线程池设计的ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中的一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰
Timer和ScheduledExecutor都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每个月第一天凌晨1点执行任务、复杂调度任务的管理、任务键传递数据等等
Quartz是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求
Quartz设计的核心类包括Job,Trigger以及Scheduler。
Job负责定义需要执行的任务
Trigger负责设置调度策略
Scheduler将二者组装在一起,并触发任务开始执行。
Quartz支持简单的按时间间隔调度,还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度
第三方Quartz方式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static void main (String [] agrs) throws SchedulerException { SchedulerFactory schedulerFactory = new StdSchedulerFactory (); Scheduler scheduler = schedulerFactory.getScheduler(); JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class); jobDetailBuilder.withIdentity("jobName" ,"jobGroupName" ); JobDetail jobDetail = jobDetailBuilder.build(); CronTrigger trigger = TriggerBuilder.newTrigger() .withIdentity("triggerName" , "triggerGroupName" ) .startNow() .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?" )) .build(); scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); } public class MyJob implements Job { @Override public void execute (JobExecutionContext jobExecutionContext) { System.out.println("todo something" ); } }
什么是分布式任务调度
通常任务调度的程序是集成在应用中的,比如
优惠券服务汇总包括了定时发布优惠券的调度程序
结算服务中包括了定期生成报表的任务调度程序
由于采用分布式架构,一个服务通常会部署在多个冗余实例来运行我们的业务
在这种分布式环境下运行任务调度,就称之为分布式业务调度
分布式调度要实现的目标
不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式,就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力
并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机的CPU处理能力是有限的
如果将任务调度程序分布式部署,每个节点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率
高可用
弹性扩容
任务管理与检测
对系统中存在的定时任进行统一的管理及监测,让开发人员及运维人员能够及时了解任务执行情况,从而做出快速应急处理响应
避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如上面提到的电商系统中定时发放优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次
XXL-JOB介绍
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计是开发迅速、学习简单、轻量级、易扩展,现已开放源代码并接入多家公司线上产品线,开箱即用
官网:https://www.xuxueli.com/xxl-job/
XXL-JOB主要由调度中心、执行器、任务
调度中心
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器
负责接收调度请求并执行任务逻辑
主要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
任务
调度中心与执行器之间的工作流程如下
执行流程
任务执行器根据配置的调度中心的地址,自动注册到调度中心
达到任务出发条件,调度中心下发任务
执行器基于线程池执行任务,并把执行结果放入内存队列、把执行日志写入日志文件中
执行器消费内存队列中的执行结果,主动上报给调度中心
当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
搭建XXL-JOB
调度中心
首先下载XXL-JOB
使用IDEA打开项目
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxj-job-executor-samples:执行器Sample示例
xxl-job-executor-sample-springboot:SpringBoot版本,通过SpringBoot管理执行器
xxl-job-executor-sample-frameless:无框架版本
根据数据库脚本创建数据库,修改数据库连接信息和端口,启动xxl-job-admin,访问http://local:18088/xxl-job-admin/
账号密码:admin/123456
启动成功之后,可以选择在Linux上运行
使用maven命令,将xxl-job-admin打包,然后将其上传至Linux中,使用命令启动
1 nohup java -jar /绝对路径/xxl-job-admin-2.3.1.jar &
执行器
下面配置执行器,执行器负责与调度中心通信,接收调度中心发起的任务调度请求
首先在media-service工程中添加依赖(父工程中完成了版本控制,这里的版本是2.3.1)
1 2 3 4 <dependency > <groupId > com.xuxueli</groupId > <artifactId > xxl-job-core</artifactId > </dependency >
在nacos下的media-service-dev.yaml下配置xxl-job
注意这里配置的appname是执行器的应用名,稍后会在调度中心配置执行器的时候使用
1 2 3 4 5 6 7 8 9 10 11 12 xxl: job: admin: addresses: http://192.168.101.128:18088/xxl-job-admin/ executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job-jobhandler logretentiondays: 30 accessToken: default_token
配置xxl-job的执行器
将示例工程下的配置类拷贝到media-service工程下,该类中的属性就是获取配置文件中的配置得到的,同时提供了一个执行器的Bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor () { logger.info(">>>>>>>>>>> xxl-job config init." ); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor (); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
进入调度中心,添加执行器
重启媒资管理服务模块,可以看到执行器在调入中心注册成功
执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 package com.xxl.job.executor.service.jobhandler;import com.xxl.job.core.context.XxlJobHelper;import com.xxl.job.core.handler.annotation.XxlJob;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.DataOutputStream;import java.io.InputStreamReader;import java.net.HttpURLConnection;import java.net.URL;import java.util.Arrays;import java.util.concurrent.TimeUnit;@Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); @XxlJob("demoJobHandler") public void demoJobHandler () throws Exception { XxlJobHelper.log("XXL-JOB, Hello World." ); for (int i = 0 ; i < 5 ; i++) { XxlJobHelper.log("beat at:" + i); TimeUnit.SECONDS.sleep(2 ); } } @XxlJob("shardingJobHandler") public void shardingJobHandler () throws Exception { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}" , shardIndex, shardTotal); for (int i = 0 ; i < shardTotal; i++) { if (i == shardIndex) { XxlJobHelper.log("第 {} 片, 命中分片开始处理" , i); } else { XxlJobHelper.log("第 {} 片, 忽略" , i); } } } @XxlJob("commandJobHandler") public void commandJobHandler () throws Exception { String command = XxlJobHelper.getJobParam(); int exitValue = -1 ; BufferedReader bufferedReader = null ; try { ProcessBuilder processBuilder = new ProcessBuilder (); processBuilder.command(command); processBuilder.redirectErrorStream(true ); Process process = processBuilder.start(); BufferedInputStream bufferedInputStream = new BufferedInputStream (process.getInputStream()); bufferedReader = new BufferedReader (new InputStreamReader (bufferedInputStream)); String line; while ((line = bufferedReader.readLine()) != null ) { XxlJobHelper.log(line); } process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobHelper.log(e); } finally { if (bufferedReader != null ) { bufferedReader.close(); } } if (exitValue == 0 ) { } else { XxlJobHelper.handleFail("command exit value(" +exitValue+") is failed" ); } } @XxlJob("httpJobHandler") public void httpJobHandler () throws Exception { String param = XxlJobHelper.getJobParam(); if (param==null || param.trim().length()==0 ) { XxlJobHelper.log("param[" + param +"] invalid." ); XxlJobHelper.handleFail(); return ; } String[] httpParams = param.split("\n" ); String url = null ; String method = null ; String data = null ; for (String httpParam: httpParams) { if (httpParam.startsWith("url:" )) { url = httpParam.substring(httpParam.indexOf("url:" ) + 4 ).trim(); } if (httpParam.startsWith("method:" )) { method = httpParam.substring(httpParam.indexOf("method:" ) + 7 ).trim().toUpperCase(); } if (httpParam.startsWith("data:" )) { data = httpParam.substring(httpParam.indexOf("data:" ) + 5 ).trim(); } } if (url==null || url.trim().length()==0 ) { XxlJobHelper.log("url[" + url +"] invalid." ); XxlJobHelper.handleFail(); return ; } if (method==null || !Arrays.asList("GET" , "POST" ).contains(method)) { XxlJobHelper.log("method[" + method +"] invalid." ); XxlJobHelper.handleFail(); return ; } boolean isPostMethod = method.equals("POST" ); HttpURLConnection connection = null ; BufferedReader bufferedReader = null ; try { URL realUrl = new URL (url); connection = (HttpURLConnection) realUrl.openConnection(); connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true ); connection.setUseCaches(false ); connection.setReadTimeout(5 * 1000 ); connection.setConnectTimeout(3 * 1000 ); connection.setRequestProperty("connection" , "Keep-Alive" ); connection.setRequestProperty("Content-Type" , "application/json;charset=UTF-8" ); connection.setRequestProperty("Accept-Charset" , "application/json;charset=UTF-8" ); connection.connect(); if (isPostMethod && data!=null && data.trim().length()>0 ) { DataOutputStream dataOutputStream = new DataOutputStream (connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8" )); dataOutputStream.flush(); dataOutputStream.close(); } int statusCode = connection.getResponseCode(); if (statusCode != 200 ) { throw new RuntimeException ("Http Request StatusCode(" + statusCode + ") Invalid." ); } bufferedReader = new BufferedReader (new InputStreamReader (connection.getInputStream(), "UTF-8" )); StringBuilder result = new StringBuilder (); String line; while ((line = bufferedReader.readLine()) != null ) { result.append(line); } String responseMsg = result.toString(); XxlJobHelper.log(responseMsg); return ; } catch (Exception e) { XxlJobHelper.log(e); XxlJobHelper.handleFail(); return ; } finally { try { if (bufferedReader != null ) { bufferedReader.close(); } if (connection != null ) { connection.disconnect(); } } catch (Exception e2) { XxlJobHelper.log(e2); } } } @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public void demoJobHandler2 () throws Exception { XxlJobHelper.log("XXL-JOB, Hello World." ); } public void init () { logger.info("init" ); } public void destroy () { logger.info("destroy" ); } }
我们现在参考简单示例自己编写代码,在media-service下新建包com.xuecheng.media.service.jobhandler
,在该包下定义我们的任务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.xuecheng.media.service.jobhandler;import com.xxl.job.core.handler.annotation.XxlJob;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;@Slf4j @Component public class SimpleJob { @XxlJob("testJob") public void testJob () { log.debug("开始执行......." ); } }
然后进入调度中心添加任务,进入任务管理,新增任务信息
其中JobHandler中填写@XxlJob注解中的名称
随后启动任务,控制台可以看到执行器的方法执行
分片广播
前面我们了解了一下xxl-job的基本使用,下面思考如何进行分布式任务处理呢?
我们需要启动多个执行器组成一个集群,去执行任务
执行器在集群部署下调度中心有哪些调度策略呢?
1 2 3 4 5 6 7 8 9 10 11 12 - 高级配置: - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括; - FIRST(第一个):固定选择第一个机器; - LAST(最后一个):固定选择最后一个机器; - ROUND(轮询):; - RANDOM(随机):随机选择在线的机器; - CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 - LEAST_ FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举; - LEAST_ RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举; - FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; - BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; - SHARDING_ BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
我们这里重点要说的是SHARDING_BROADCAST(分片广播)
,分片是指调度中心将集群汇总的执行器标上序号:0、1、2、3…,广播是指每次调度会向集群中的所有执行器发送调度请求,请求中携带分片参数
每个执行器收到调度请求,根据分片参数自行决定是否执行任务
另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量
作业分片适用于哪些场景呢?
分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍
广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等
所以广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群的分布式处理任务
使用说明
分片广播和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理
获取分片参数方式,参考示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @XxlJob("shardingJobHandler") public void shardingJobHandler () throws Exception { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}" , shardIndex, shardTotal); for (int i = 0 ; i < shardTotal; i++) { if (i == shardIndex) { XxlJobHelper.log("第 {} 片, 命中分片开始处理" , i); } else { XxlJobHelper.log("第 {} 片, 忽略" , i); } } }
下面测试作业分片
1 2 3 4 5 6 @XxlJob("shardingJobHandler") public void shardingJob () { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); log.debug("shardIndex:{}, shardTotal:{}" , shardIndex, shardTotal); }
在调度中心添加任务,注意路由策略选择分片广播
高级配置说明
子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
调度过期策略:
忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
下面我们需要启动两个执行器实例,观察每个实例的执行情况
首先我们需要在nacos中编辑media-service的配置,设置本地配置优先
1 2 3 4 spring: cloud: config: override-none: true
将media-service启动两个实例,添加的vm选项就是用本地配置覆盖nacos中的配置,主要是修改端口号和xxl执行器端口
1 2 3 4 5 6 7 -Dserver.port=53051 -Dxxl.job.executor.port=9998 对应如下配置项 server: port: 53051 xxl: job: executor: port: 9998
将两个服务启动,观察任务调度中心,可以看到有两个执行器
启动任务,可以从日志中看到,两个实例的分片序号不同
1 2 3 4 5 ## 实例1 [SimpleJob.java:25] - shardIndex:0, shardTotal:2 ## 实例2 [SimpleJob.java:25] - shardIndex:1, shardTotal:2
到此作业分片任务调试完成,此时我们来思考一下
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?
需求分析
作业分片方案
任务添加成功后,对于要处理的任务,会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会重复执行任务?
在上一小节的测试中,每个执行器收到广播任务有两个参数,分片序号和分片总数
每个执行器从数据表取任务时,可以用任务id
对分片总数
取模
,如果等于该执行器的分片序号,则执行此任务
例如
1 % 2 = 1,执行器2执行
2 % 2 = 0,执行器1执行
3 % 2 = 1,执行器2执行
4 % 2 = 1,执行器1执行
以此类推
保证任务不重复执行
通过作业分片方案,保证了执行器之间分配的任务不重复执行
但是如果同一个执行器,在处理一个视频的时候,还没有处理完,此时调度中心又来了一次请求调度,为了不重复处理同一个视频,该怎么办?
首先配置调度过期策略
调度过期策略:
忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里选择丢弃后续调度
,避免重复调度
最后,也就是要注意保证任务处理的幂等性
,什么是任务的幂等性
?
任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。
执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。
什么是幂等性?
它描述了一次和多次请求某一个资源,对于资源本身应该具有相同的结果
幂等性是为了解决重复提交问题,比如:恶意刷单、重复支付等
解决幂等性常用的方案
数据库约束,例如:唯一索引、主键
乐观锁,长用户数据库,更新数据时根据乐观锁状态去更新
唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等,则执行
这里我们在数据库视频处理表中添加状态处理字段,视频处理完成更新状态为完成,执行视频前判断状态是否完成,如果完成则不再处理
业务流程
确定了分片方案,下面梳理哼歌视频上传以及处理的业务流程
上传视频成功,向视频待处理表中添加记录,视频处理的详细流程如下
任务调度中心广播作业分片
执行器收到广播作业分片,从数据库读取待处理任务
执行器根据任务内容MinIO下载要处理的文件
执行器启动多线程去处理任务
任务处理完成,上传处理后的视频到MinIO
将更新任务处理结果,如果视频处理完成,除了更新任务处理结果之外,还要将文件的访问地址更新至任务处理表及文件中,最后将任务完成记录写入历史表
下面是待处理任务表
查询待处理任务
添加待处理任务
上传视频成功,向视频处理待处理表添加记录,暂时只添加.avi类型视频的处理记录
根据Mime Type去判断,是否为avi视频,下面列出部分Mime Type
Video Type
Extension
MIME Type
Flash
fl
video/x-flv
MPEG-4
.mp4
video/mp4
iPhone Index
.m3u8
application/x-mpegURL
iPhone Segment
.ts
video/MP2T
3GP Mobile
.3gp
video/3gpp
QuickTime
.mov
video/quicktime
A/V Interleave
.avi
video/x-msvideo
Windows Media
.wmv
video/x-ms-wmv
avi视频的Mine Type是video/x-msvideo
修改addMediaFilesToDB方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 /** * 将文件信息添加到文件表 * * @param companyId 机构id * @param uploadFileParamsDto 上传文件的信息 * @param objectName 对象名称 * @param fileMD5 文件的md5码 * @param bucket 桶 */ @Transactional public MediaFiles addMediaFilesToDB(Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) { // 根据文件名获取Content-Type String contentType = getContentType(objectName); // 保存到数据库 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5); if (mediaFiles == null) { mediaFiles = new MediaFiles(); BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMD5); mediaFiles.setFileId(fileMD5); mediaFiles.setCompanyId(companyId); mediaFiles.setBucket(bucket); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setStatus("1"); mediaFiles.setFilePath(objectName); if (contentType.contains("image") || contentType.contains("mp4")) { mediaFiles.setUrl("/" + bucket + "/" + objectName); } // 查阅数据字典,002003表示审核通过 mediaFiles.setAuditStatus("002003"); } int insert = mediaFilesMapper.insert(mediaFiles); if (insert <= 0) { XueChengPlusException.cast("保存文件信息失败"); } + // 如果是avi视频,则额外添加至视频待处理表 + if ("video/x-msvideo".equals(contentType)) { + MediaProcess mediaProcess = new MediaProcess(); + BeanUtils.copyProperties(mediaFiles, mediaProcess); + mediaProcess.setStatus("1"); // 未处理 + int processInsert = mediaProcessMapper.insert(mediaProcess); + if (processInsert <= 0) { + XueChengPlusException.cast("保存avi视频到待处理表失败"); + } + } return mediaFiles; }
查询待处理任务
如何保证查询到的待处理视频记录不重复?
解决方案我们前面已经给出了,用任务id
对分片总数
取模
,如果等于该执行器的分片序号
,则执行,同时为了避免同一个任务被执行两次,我们需要额外指定任务状态为未处理,即status = 1
1 SELECT * FROM media_process WHERE id % #{shareTotal} = #{shareIndex} AND status = '1' LIMIT #{count}
编写根据分片参数获取待处理任务的DAO方法,定义DAO接口如下
1 2 3 4 5 6 7 8 9 @Select("SELECT * FROM media_process WHERE id % #{shardTotal} = #{shardIndex} AND status = '1' LIMIT #{count}") List<MediaProcess> selectListByShardIndex (@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count) ;
1 2 3 4 5 6 7 8 9 10 public interface MediaFileProcessService { List<MediaProcess> getMediaProcessList (int shardIndex, int shardTotal, int count) ; }
1 2 3 4 5 6 7 8 9 10 11 12 @Slf4j @Service public class MediaFileProcessServiceImpl implements MediaFileProcessService { @Autowired private MediaProcessMapper mediaProcessMapper; @Override public List<MediaProcess> getMediaProcessList (int shardIndex, int shardTotal, int count) { return mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count); } }
更新任务状态
任务处理完成后,需要更新任务处理结果,任务执行成功,则更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录
定义Service接口,更新任务状态
1 void saveProcessFinishStatus (Long taskId, String status, String fileId, String url, String errorMsg) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Transactional @Override public void saveProcessFinishStatus (Long taskId, String status, String fileId, String url, String errorMsg) { MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId); if (mediaProcess == null ) { log.debug("更新任务状态时,此任务:{},为空" , taskId); return ; } LambdaQueryWrapper<MediaProcess> queryWrapper = new LambdaQueryWrapper <MediaProcess>().eq(MediaProcess::getId, taskId); if ("3" .equals(status)) { log.debug("任务失败:{}" , taskId); MediaProcess mediaProcess_u = new MediaProcess (); mediaProcess_u.setStatus("3" ); mediaProcess_u.setErrormsg(errorMsg); mediaProcess_u.setFinishDate(LocalDateTime.now()); mediaProcessMapper.update(mediaProcess_u, queryWrapper); return ; } if ("2" .equals(status)) { mediaProcess.setStatus("2" ); mediaProcess.setUrl(url); mediaProcess.setFinishDate(LocalDateTime.now()); mediaProcessMapper.update(mediaProcess, queryWrapper); MediaProcessHistory mediaProcessHistory = new MediaProcessHistory (); BeanUtils.copyProperties(mediaProcess, mediaProcessHistory); mediaProcessHistoryMapper.insert(mediaProcessHistory); mediaProcessMapper.deleteById(taskId); } }
视频处理
什么是视频编码
视频上传成功后,需要对视频进行转码处理
什么是视频编码?百度百科的定义如下
所谓视频编码方式就是指通过压缩技术,将原始视频格式的文件转换成另一种视频格式文件的方式。视频流传输中最为重要的编解码标准有国际电联的H.261、H.263、H.264,运动静止图像专家组的M-JPEG和国际标准化组织运动图像专家组的MPEG系列标准,此外在互联网上被广泛应用的还有Real-Networks的RealVideo、微软公司的WMV以及Apple公司的QuickTime等。
首先我们要分清文件格式和编码格式
文件格式
是指:.mp4
、.avi
、.rmvb
等这些不同扩展名的视频文件的文件格式。视频文件的内容主要包括视频、音频,其文件格式是按照一定的编码格式去编码,并且按照该文件所规定的的封装格式,将视频、音频、字幕等信息封装到一起,播放器会根据他们的封装个事去提取出编码,然后由播放器解码,最终播放音视频
编码格式
是指:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b
音视频编码格式种类繁多,主要由以下几类
MPEG系列
(由ISO下属的MPEG开发)
视频编码方面主要是Mpeg1(VCD)、Mpeg2(DVD)、Mpeg4(divx,xvid)、Mpeg4 AVC
音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(MP3)、MPEG-2 AAC 、MPEG-4 AAC
H.26X系列
(由ITU主导,侧重网络传输,注意:只是视频编码)
包括H.261、H.262、H.263、H.263+、H.263++、H.264
目前最常用的编码标准是
FFmpeg的基本使用
我们将视频录制完成后,使用视频编码软件第视频进行编码,本项目使用FFmpeg对视频进行编码
什么是FFmpeg?
FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的。
FFmpeg在Linux平台下开发,但它同样也可以在其它操作系统环境中编译运行,包括Windows、Mac OS X等。这个项目最早由Fabrice Bellard发起,2004年至2015年间由Michael Niedermayer主要负责维护。许多FFmpeg的开发人员都来自MPlayer项目,而且当前FFmpeg也是放在MPlayer项目组的服务器上。项目的名称来自MPEG视频编码标准,前面的"FF"代表"Fast Forward"。FFmpeg编码库可以使用GPU加速。
安装成功后,做一下简单测试,将.mp4文件转为.avi,再将.avi转为.gif等
1 2 3 ffmpeg -i 胶水.mp4 胶水.avi ffmpeg -i 胶水.avi 胶水.gif
视频处理工具类
导入黑马提供的工具类,将其拷贝至base工程
其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类
下面我们来简单了解一下该类的使用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws IOException { String ffmpeg_path = "D:\\SoftWare\\ffmpeg\\ffmpeg.exe" ; String video_path = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\胶水.avi" ; String mp4_name = "胶水_mp4.mp4" ; String mp4_path = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\胶水_mp4.mp4" ; Mp4VideoUtil videoUtil = new Mp4VideoUtil (ffmpeg_path,video_path,mp4_name,mp4_path); String s = videoUtil.generateMp4(); System.out.println(s); }
执行main方法,最终在控制台输出success
表示执行成功
任务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 @Slf4j @Component public class VideoTask { @Value("${videoprocess.ffmpegpath}") String ffmpegPath; @Autowired private MediaProcessMapper mediaProcessMapper; @Autowired private MediaFileService mediaFileService; @Autowired private MediaFileProcessService mediaFileProcessService; @XxlJob("videoJobHandler") public void videoJobHandler () throws InterruptedException { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardTotal, shardIndex, 12 ); CountDownLatch countDownLatch = new CountDownLatch (mediaProcessList.size()); if (mediaProcessList == null || mediaProcessList.size() == 0 ) { log.debug("查询到的待处理任务数为0" ); return ; } int size = mediaProcessList.size(); ExecutorService threadPool = Executors.newFixedThreadPool(size); mediaProcessList.forEach(mediaProcess -> threadPool.execute(() -> { String status = mediaProcess.getStatus(); if ("2" .equals(status)) { log.debug("该视频已经被处理,无需再次处理。视频信息:{}" , mediaProcess); countDownLatch.countDown(); return ; } String bucket = mediaProcess.getBucket(); String filePath = mediaProcess.getFilePath(); String fileId = mediaProcess.getFileId(); File originalFile = null ; File mp4File = null ; try { originalFile = File.createTempFile("original" , null ); mp4File = File.createTempFile("mp4" , ".mp4" ); } catch (IOException e) { log.error("处理视频前创建临时文件失败" ); countDownLatch.countDown(); XueChengPlusException.cast("处理视频前创建临时文件失败" ); } try { mediaFileService.downloadFileFromMinio(originalFile, bucket, filePath); } catch (Exception e) { log.error("下载原始文件过程中出错:{},文件信息:{}" , e.getMessage(), mediaProcess); countDownLatch.countDown(); XueChengPlusException.cast("下载原始文件过程出错" ); } String result = null ; try { Mp4VideoUtil videoUtil = new Mp4VideoUtil (ffmpegPath, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath()); result = videoUtil.generateMp4(); } catch (Exception e) { log.error("处理视频失败,视频地址:{},错误信息:{}" , originalFile.getAbsolutePath(), e.getMessage()); countDownLatch.countDown(); XueChengPlusException.cast("处理视频失败" ); } status = "3" ; String url = null ; if ("success" .equals(result)) { String objectName = mediaFileService.getFilePathByMd5(fileId, ".mp4" ); try { mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), bucket, objectName); } catch (Exception e) { log.error("上传文件失败:{}" , e.getMessage()); XueChengPlusException.cast("上传文件失败" ); } status = "2" ; url = "/" + bucket + "/" + objectName; } mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), status, fileId, url, result); countDownLatch.countDown(); })); countDownLatch.await(30 , TimeUnit.MINUTES); } }
在media-service-dev.yaml中新增配置,指定ffmpeg安装位置
1 2 videoprocess: ffmpegpath: D:\SoftWare\ffmpeg\ffmpeg.exe
视频处理测试
进入xxl-job调度中心添加执行器和视频处理任务
在xxl-job配置任务调度策略
配置阻塞处理策略为:丢弃后续调度
配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些
如:5分钟,即使到达调度时间,如果视频没有处理完成,仍丢弃调度请求
配置完成后开始测试视频处理
首先上传至少4个视频,非mp4格式
在xxl-job启动视频处理任务
观察媒资管理服务后台日志
面试
XXL-JOB工作原理
xxl-job的工作原理是什么?xxl-job是什么?
xxl-job分布式任务调度服务由调度中心和执行器组成,调度中心负责按任务调度策略向执行器下发任务,执行器负责接收任务,执行任务
首先部署并启动xxl-job调度中心(一个java工程,打成jar包可以放到虚拟机上运行)
1 nohup java -jar xxl-job-admin... &
在微服务中添加xxl-job依赖,在微服务中配置执行器
1 2 3 4 <dependency > <groupId > com.xuxueli</groupId > <artifactId > xxl-job-core</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 xxl: job: admin: addresses: http://192.168.101.128:18088/xxl-job-admin/ executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job-jobhandler logretentiondays: 30 accessToken: default_token
配置类就是读取yml配置中的信息,创建一个Xxl的执行器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor () { logger.info(">>>>>>>>>>> xxl-job config init." ); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor (); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
启动微服务,执行器向调度中心上报自己
在微服务中写一个任务方法,并用xxl-job的注解去标记执行任务的方法名称
1 2 3 4 @XxlJob("testJob") public void testJob () { log.debug("开始执行......." ); }
在调度中心配置任务调度策略,调度策略就是每个多长时间执行,又或者是每天/每月的固定时间去执行等
在调度中心启动任务
调度中心根据任务调度策略,到达时间就开始下发任务给执行器
执行器收到任务就开始执行任务
如何保证任务不重复执行?
调度中心按分片广播
的方式去下发任务
执行器收到作业分片广播的参数:分片总数(shardTotal)和分片序号(shardIndex),计算任务id % 分片总数
(taskId % shardTotal),如果结果等于分片序号,就去执行这个任务(taskId % shardTotal = shardIndex)。这样就可以保证不同的执行器执行不同的任务
配置调度过期策略为忽略
,避免同一个执行器多次重复执行同一个任务
配置任务阻塞处理策略为丢弃后续调度
,注意:丢弃也没事,下一次调度还可以执行
另外还要保证任务处理的幂等性,执行过的任务可以打一个状态标记已完成(上面的代码设置status=2即为完成),下次再次调度该任务时,判断该任务已完成,就不再执行
任务幂等性如何保证?
幂等性描述的是一次和多次请求某一个资源,对于资源本身,应该返回同样的结果
幂等性是为了解决重复提交问题,例如:恶意刷单,重复支付等
解决幂等性的常用方案
数据库约束,例如:唯一索引、主键
乐观锁:常用于数据库,更新数据时,根据乐观锁的状态去更新
唯一序列号,请求前生成的唯一序列号,携带序列号去请求,执行时在redis记录该序列号,用于表示该序列号请求已经执行过了,如果相同的序列号再次来执行,则说明是重复执行。这里的解决方式是在数据库中添加状态处理字段,视频处理完成,则更新该字段为已完成,执行视频处理之前判断状态是否为已完成,若已完成则不处理
绑定媒资
需求分析
业务流程
截至目前,媒资管理已经完成文件上传、视频处理等基本功能。那么本小节就来讲解课程计划绑定媒资文件
如何将课程计划绑定媒资文件呢?
进入课程计划界面,在小节中点击添加视频/文档/作业
按钮,输入关键字搜索,进行绑定即可
数据模型
课程计划绑定媒资文件后,存储至课程计划绑定媒资表
,即teachplan_media
表中
接口定义
根据业务流程,用户进入课程计划列表,首先确定向哪个课程计划添加视频,点击添加视频
按钮后,用户选择视频,点击提交,前端以json格式请求以下参数
1 2 3 4 5 6 7 8 请求网址: http: 请求方法: POST 载荷: { "mediaId" : "a92da96ebcf28dfe194a1e2c393dd860" , "fileName" : "胶水.avi" , "teachplanId" : 293 }
从请求网址可以看出,该接口在内容管理模块提供,即在content-api中提供
请求方式为POST,那么我们定义一个DTO类用来接收请求参数,里面只包含载荷中的三个属性即可,在content-model中新建BindTeachplanMediaDto模型类
1 2 3 4 5 6 7 8 9 10 11 12 @Data @ApiModel(value = "BindTeachplanMediaDto", description = "教学计划-媒资绑定DTO") public class BindTeachplanMediaDto { @ApiModelProperty(value = "媒资文件id", required = true) private String mediaId; @ApiModelProperty(value = "媒资文件名称", required = true) private String fileName; @ApiModelProperty(value = "课程计划标识", required = true) private Long teachplanId; }
在content-api下的TeachplanController中定义接口
1 2 3 4 5 @ApiOperation("课程计划与媒资信息绑定") @PostMapping("/teachplan/association/media") public void associationMedia (@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto) { }
接口开发
DAO开发
使用自动生成的TeachplanMedia的Mapper即可
Service开发
1 2 3 4 5 void associationMedia (BindTeachplanMediaDto bindTeachplanMediaDto) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Transactional @Override public void associationMedia (BindTeachplanMediaDto bindTeachplanMediaDto) { Long teachplanId = bindTeachplanMediaDto.getTeachplanId(); Teachplan teachplan = teachplanMapper.selectById(teachplanId); if (teachplan == null ) { XueChengPlusException.cast("教学计划不存在" ); } Integer grade = teachplan.getGrade(); if (grade != 2 ) { XueChengPlusException.cast("只有小节允许绑定媒资信息" ); } LambdaQueryWrapper<TeachplanMedia> queryWrapper = new LambdaQueryWrapper <TeachplanMedia>().eq(TeachplanMedia::getTeachplanId, teachplanId); teachplanMediaMapper.delete(queryWrapper); TeachplanMedia teachplanMedia = new TeachplanMedia (); teachplanMedia.setTeachplanId(bindTeachplanMediaDto.getTeachplanId()); teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName()); teachplanMedia.setMediaId(bindTeachplanMediaDto.getMediaId()); teachplanMedia.setCourseId(teachplan.getCourseId()); teachplanMedia.setCreateDate(LocalDateTime.now()); teachplanMediaMapper.insert(teachplanMedia); }
接口层完善
1 2 3 4 5 @ApiOperation("课程计划与媒资信息绑定") @PostMapping("/teachplan/association/media") public void associationMedia (@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto) { teachplanService.associationMedia(bindTeachplanMediaDto); }
接口测试
向指定课程计划添加视频,成功添加后,再次添加视频,则会替换掉原有的视频
实战
1 delete /teachplan/association/media/{ teachPlanId} /{ mediaId}
在TeachplanController中定义接口
1 2 3 4 5 @ApiOperation("课程计划解除媒资信息绑定") @DeleteMapping("/teachplan/association/media/{teachPlanId}/{mediaId}") public void unassociationMedia (@PathVariable Long teachPlanId, @PathVariable Long mediaId) { }
1 2 3 4 5 void unassociationMedia (Long teachPlanId, Long mediaId) ;
1 2 3 4 5 6 7 @Override public void unassociationMedia (Long teachPlanId, Long mediaId) { LambdaQueryWrapper<TeachplanMedia> queryWrapper = new LambdaQueryWrapper <>(); queryWrapper.eq(TeachplanMedia::getTeachplanId, teachPlanId) .eq(TeachplanMedia::getMediaId, mediaId); teachplanMediaMapper.delete(queryWrapper); }
1 2 3 4 5 @ApiOperation("课程计划解除媒资信息绑定") @DeleteMapping("/teachplan/association/media/{teachPlanId}/{mediaId}") public void unassociationMedia (@PathVariable Long teachPlanId, @PathVariable String mediaId) { teachplanService.unassociationMedia(teachPlanId, mediaId); }
功能写完了,重启服务发现一直报404,就离谱啊 为什么报的是404呢?我路径参数100%没问题的啊 遂继续重启,无果 最后发现是我重启服务重启错了,淦