在此特别感谢黑马程序员提供的课程

写在最前

媒资管理模块

模块需求分析

模块介绍

  • 媒资管理系统是每个在线教育平台所必须具备的,百度百科对其定义如下

媒体资源管理(Media Asset Management,MAM)系统是建立在多媒体、网络、数据库和数字存储等先进技术基础上的一个对各种媒体及内容(如视/音频资料、文本文件、图表等)进行数字化存储、管理以及应用的总体解决方案,包括数字媒体的采集、编目、管理、传输和编码转换等所有环节。其主要是满足媒体资源拥有者收集、保存、查找、编辑、发布各种信息的要求,为媒体资源的使用者提供访问内容的便捷方法,实现对媒体资源的高效管理,大幅度提高媒体资源的价值。

  • 每个教学机构都可以在媒资管理系统管理自己的教学资源,包括:视频、教案等文件
  • 目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等
  • 主要的几个功能如下:
    • 媒资查询:教学机构查询自己所拥有的的媒资信息
    • 文件上传:包括上传图片、上传文档、上传视频
    • 视频处理:视频上传成功,系统自动对视频进行编码处理
    • 文件删除:教学机构删除自己上传的媒资文件
  • 下图是课程编辑与发布的全流程,可以通过下图看到媒资在整体流程的位置

业务流程

上传图片

  • 教学机构人员在课程信息编辑页面上传课程图片,课程图片统一记录在媒资管理系统

上传视频

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件
  2. 教育机构用户在媒资管理页面中点击上传视频按钮
  3. 选择要上传的文件,自动执行文件上传
  4. 视频上传成功会自动处理,处理完成后可以预览视频

处理视频

  • 对需要转码处理的视频,系统会自动对齐处理,处理后生成视频的URL

审核媒资

  1. 运营用户登入运营平台,并进入媒资管理界面,查找待审核媒资
  2. 点击列表中媒资名称链接,可以预览该媒资,若是视频,则播放视频
  3. 点击列表中某媒资后的审核按钮,即完成媒资的审批过程

绑定媒资

  • 课程计划创建好后需要绑定媒资文件,比如:如果课程计划绑定了视频文件,进入课程在线学习界面后,点课程计划名称则在线播放视频
  • 如何将课程计划绑定媒资呢?
    1. 教育机构用户进入课程管理页面编辑某一课程,在课程大纲编辑页的某一小节后,可以添加媒资信息
    2. 点击添加视频,会弹出对话框,可通过输入视频关键字搜索已审核通过的视频媒资
    3. 选择视频媒资,点击提交安努,完成课程计划绑定媒资流程

数据模型

  • 本模块妹子恩建相关数据表如下
    1. media_files:存储文件信息,包括图片、视频、文档等
    2. media_process:待处理视频表
    3. media_process_history:视频处理历史表,记录已经处理成功的视频信息

搭建模块环境

架构分析

  • 当前要开发的是媒资管理服务,目前为止共三个微服务:内容管理、系统管理、媒资管理
  • 后期还会添加更多的微服务,目前这种由前端直接请求微服务的方式存在弊端
  • 如果在前端对每个请求地址都配置绝对路径,非常不利于系统维护,例如下面这种
1
2
3
4
5
6
// 列表
export async function dictionaryAll(params: any = undefined, body: any = undefined): Promise<ISystemDictionary[]> {
//const { data } = await createAPI('/system/dictionary/all', 'get', params, body)
const { data } = await createAPI('http://localhost:53110/system/dictionary/all', 'get', params, body)
return data
}
  • 当系统上限后,需要将localhost改为公网域名,如果这种地址非常多,那么修改会很麻烦
  • 基于这个问题,可以采用网关来解决
  • 这样在前端代码中只需要指定每个接口的相对路径
1
2
3
4
5
6
// 列表
export async function dictionaryAll(params: any = undefined, body: any = undefined): Promise<ISystemDictionary[]> {
//const { data } = await createAPI('/system/dictionary/all', 'get', params, body)
const { data } = await createAPI('/system/dictionary/all', 'get', params, body)
return data
}
  • 在前端代码的一个固定的地方在接口地址前统一添加网关地址,每个请求统一到网关,由网关将请求转发到具体的微服务
  • 有了网关就可以对请求进行路由,例如:可以根据请求路径路由、根据host地址路由等。当微服务有多个实例时,还可以通过负载均衡算法进行路由
  • 此外,网关还可以实现权限控制、限流等功能
  • 本项目采用SpringCloudGateway作为网关,网关在请求路由时,需要知道每个微服务实例的地址。
  • 项目使用Nacos作为服务发现中心和配置中心,整体架构如下
  • 流程如下
    1. 微服务启动,将自己注册到Nacos,Nacos记录了个微服务实例的地址
    2. 网关从Nacos读取服务列表,包括服务名称、服务地址等
    3. 请求到达网关,网关将请求路由到具体的微服务
  • 要使用网关首先搭建Nacos、Nacos有两个作用
    1. 服务发现中心
      • 微服务将自身注册到Nacos,网关从Nacos获取微服务列表
    2. 配置中心
      • 微服务众多,它们的配置信息也非常复杂,为了提供系统的可维护性,微服务的配置信息统一在Nacos配置

搭建Nacos

  • 在此之前我们先下载安装并启动
    • 拉取镜像
    1
    docker pull nacos/nacos-server:1.4.1
    • 创建并运行一个容器
    1
    docker run --name nacos -e MODE=standalone -p 8849:8848 -d nacos/nacos-server:1.4.1
    • 访问虚拟机ip:8848/nacos登录,默认的账号密码均为nacos

服务发现中心

  • 搭建Nacos服务发现中心之前,需要搞清楚两个概念
    1. namespace:用于区分环境,例如:开发环境dev、测试环境test、生产环境prod
    2. group:用于区分项目,例如xuecheng-plus、reggie项目
  • 首先,在Nacos配置namespace
    • 新增开发环境命名空间
    • 用同样的方法新增生产环境和测试环境的命名空间
  • 随后将各微服务注册到Nacos
    1. 在xuecheng-plus-parent中添加依赖管理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <properties>
    <spring-cloud-alibaba.version>2.2.6.RELEASE</spring-cloud-alibaba.version>
    </properties>
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>${spring-cloud-alibaba.version}</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>
    1. 在xuecheng-content-api、xuecheng-plus-system中添加依赖
    1
    2
    3
    4
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
    </dependency>
    1. 配置nacos地址
      • 在content-service的application.yml中配置如下信息
      1
      2
      3
      4
      5
      6
      7
      8
      9
      spring:
      application:
      name: content-service
      cloud:
      nacos:
      server-addr: 127.0.0.1:8848
      discovery:
      namespace: dev
      group: xuecheng-plus-project
      • 在system-service的application.yml中配置如下信息
      1
      2
      3
      4
      5
      6
      7
      8
      9
      spring:
      application:
      name: system-service
      cloud:
      nacos:
      server-addr: 127.0.0.1:8848
      discovery:
      namespace: dev
      group: xuecheng-plus-project
    2. 重启这两个服务,进入Nacos中查看服务列表

配置中心

配置三要素
  • 搭建完Nacos服务发现中心,现在我们来搭建Nacos配置中心,其目的就是通过Nacos去管理项目中的所有配置
  • 那么先将项目中的配置文件进行分类
    1. 每个项目特有的配置
    2. 项目所公用的配置
      • 是指一些在若干项目中配置内容相同的配置,例如redis的配置,很多项目用的同一套redis服务,所以配置也一样
  • 另外,还需要知道Nacos如何去定位一个具体的配置文件,即配置三要素
    • namespace
    • group
    • dataid
  • 通过namespace、group找到具体的环境和具体的项目
  • 通过dataid找到具体的配置文件,dataid由三部分组成
    • 例如content-service-dev.yml,由content-servicedevyml三部分组成
      • content-service:它是在application.yml中配置的应用名,即spring.application.name的值
      • dev:它是环境名,由spring.profile.active指定
      • yml:它是配置文件的后缀
  • 所以,如果我们要配置content-service工程的配置文件
    • 在开发环境中配置content-service-dev.yml
    • 在生产环境中配置content-service-prod.yml
    • 在测试环境中配置content-service-test.yml
配置content-service
  • 下面以配置content-service为例,在开发环境下添加content-service-dev.yaml配置
  • 为什么不在nacos中配置如下内容?
1
2
3
spring:
application:
name: content-service
  • 因为Nacos是先根据spring.cloud.nacos.server-addr获取Nacos地址,再根据${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}作为文件id,来读取配置。
  • 而读取配置文件的顺序如下
    1. bootstrap.yml
    2. nacos中的配置文件
    3. 本地application.yml
  • 所以我们要先在bootstrap.yml中配置文件id,然后nacos才知道怎么读取配置文件。
  • 那么我们现在在content-service下创建bootstrap.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
application:
name: content-service
cloud:
nacos:
server-addr: 127.0.0.1:8848
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
profiles:
active: dev
  • 删除原本的application.yml,在content-service工程中添加依赖
1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
  • 随后运行测试方法,看看能否正产读取数据库的配置信息,并读取数据
1
2
3
4
5
@Test
void contextQueryCourseTest() {
PageResult<CourseBase> result = courseBaseInfoService.queryCourseBaseList(new PageParams(1L, 10L), new QueryCourseParamDto());
log.info("查询到数据:{}", result);
}
配置content-api
  • 以相同的方法配置content-api,在nacos的开发环境下新建配置content-api-dev.yaml
  • 我们先来看看哪些配置可以交给Nacos管理
    • 原本content-api的配置如下,除了服务名,都可以交给content-api-dev.yaml管理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    server:
    servlet:
    context-path: /content
    port: 53040

    spring:
    application:
    name: content-api

    ## 日志文件配置路径
    logging:
    config: classpath:log4j2-dev.xml

    ## swagger 文档配置
    swagger:
    title: "学成在线内容管理系统"
    description: "内容系统管理系统对课程相关信息进行业务管理数据"
    base-package: com.xuecheng.content
    enabled: true
    version: 1.0.0
  • 经过我们的分析,content-api-dev.yaml的内容如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
servlet:
context-path: /content
port: 53040

## 日志文件配置路径
logging:
config: classpath:log4j2-dev.xml

## swagger 文档配置
swagger:
title: "学成在线内容管理系统"
description: "内容系统管理系统对课程相关信息进行业务管理数据"
base-package: com.xuecheng.content
enabled: true
version: 1.0.0
  • 那么我们现在就需要配置content-apibootstrap.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#微服务配置
spring:
application:
name: content-api
cloud:
nacos:
server-addr: 127.0.0.1:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
extension-configs:
- data-id: content-service-${spring.profiles.active}.yaml
group: xuecheng-plus-project
refresh: true
profiles:
active: dev
  • 注意:因为content-api接口工程依赖了content-service工程的jar,而content-service的配置也交由nacos管理了(主要是数据库配置),所以我们现在需要extension-configs扩展配置文件的方式引用service工程的配置文件
  • 如果需要添加多个扩展文件,继续在下面添加即可
1
2
3
4
5
6
7
extension-configs:
- data-id: content-service-${spring.profiles.active}.yaml
group: xuecheng-plus-project
refresh: true
- data-id: 填写文件 dataid
group: xuecheng-plus-project
refresh: true

公用配置

  • nacos还提供了shared-configs,可以引用公用配置
  • 我们之前在content-api中配置了swagger,并且所有接口工程都需要配置swagger
  • 那么我们这里就可以将swagger的配置定义为一个公用配置,哪个项目需要,哪个项目就引用
  • 接下来,我们就着手创建xuecheng-plus的公用配置,进入nacos的开发环境,添加swagger-dev.yaml公用配置,这里的group可以设置为xuecheng-plus-common,该组下的内容都作为xuecheng-plus的公用配置
  • 删除content-api-dev.yaml中的swagger配置,在content-apibootstrap.yml中使用shared-config添加公用配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  server:
servlet:
context-path: /content
port: 53040

## 日志文件配置路径
logging:
config: classpath:log4j2-dev.xml
## swagger 文档配置
- swagger:
- title: "学成在线内容管理系统"
- description: "内容系统管理系统对课程相关信息进行业务管理数据"
- base-package: com.xuecheng.content
- enabled: true
- version: 1.0.0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#微服务配置
spring:
application:
name: content-api
cloud:
nacos:
server-addr: 127.0.0.1:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
extension-configs:
- data-id: content-service-${spring.profiles.active}.yaml
group: xuecheng-plus-project
refresh: true
+ shared-configs:
+ - data-id: swagger-${spring.profiles.active}.yaml
+ group: xuecheng-plus-common
+ refresh: true

profiles:
active: dev

  • 再以相同的方法配置日志的公用配置
  • 删除content-api-dev.yaml中的logging配置,在content-apibootstrap.yml中使用shared-config添加公用配置
1
2
3
4
5
6
7
8
  server:
servlet:
context-path: /content
port: 53040

- ## 日志文件配置路径
- logging:
- config: classpath:log4j2-dev.xml
1
2
3
4
5
6
7
    shared-configs:
- data-id: swagger-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
+ - data-id: logging-${spring.profiles.active}.yaml
+ group: xuecheng-plus-common
+ refresh: true
  • 那么到此为止,配置完成,重启content-api,访问swagger页面,查看swagger接口文档是否可以正常显示,查看控制台log4j2日志是否正常输出

系统管理配置

  • 按照上面的方法,将系统管理服务的配置信息在nacos上进行配置
  • 在开发环境下添加system-service-dev.yaml配置
  • 在system-service下创建bootstrap.yml配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
application:
name: system-service
cloud:
nacos:
server-addr: 127.0.0.1:8848
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
profiles:
active: dev
  • 在开发环境下添加system-api-dev.yaml配置
  • 在system-api下创建bootstrap.yml配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#微服务配置
spring:
application:
name: system-api
cloud:
nacos:
server-addr: 127.0.0.1:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
extension-configs:
- data-id: system-service-${spring.profiles.active}.yaml
group: xuecheng-plus-project
refresh: true
shared-configs:
- data-id: swagger-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
- data-id: logging-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
profiles:
active: dev

配置优先级

  • 到目前位置,已经将所有微服务的配置统一在nacos进行配置,用到的配置文件有本地的bootstrap.yaml和nacos上的配置文件
  • 引入配置文件的形式有
    1. 通过dataid方式引入
    2. 以扩展配置文件方式引入
    3. 以公用配置文件方式引入
  • 各配置文件的优先级:项目应用名配置文件(content-api-dev.yaml) > 扩展配置文件(content-service-dev.html) > 共享配置文件(swagger-dev.yaml) > 本地配置文件(application.yml)
  • 有时候我们在测试程序时,直接在本地加一个配置进行测试,这时候我们想让本地配置优先级最高,那么可以在nacos配置文件中配置如下内容
1
2
3
4
spring:
cloud:
config:
override-none: true

搭建Gateway

  • 本项目使用SpringCloudGateway作为网关,下面创建网关工程
  • 指定父工程,添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!--指定父工程为xuecheng-plus-parent-->
<parent>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-plus-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../xuecheng-plus-parent</relativePath>
</parent>

<artifactId>xuecheng-plus-gateway</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>xuecheng-plus-gateway</name>
<description>xuecheng-plus-gateway</description>
<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--服务发现中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<!-- 排除 Spring Boot 依赖的日志包冲突 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring Boot 集成 log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
</dependencies>

</project>
  • 配置网关的bootstrap.yml配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#微服务配置
spring:
application:
name: gateway
cloud:
nacos:
server-addr: 127.0.0.1:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
profiles:
active: dev
  • 在nacos的开发环境下创建gateway-dev.yaml配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 53010 ## 网关端口
spring:
cloud:
gateway:
routes:
- id: content-api ## 路由id,自定义,只要唯一即可
uri: lb://content-api ## 路由的目标地址 lb就是负载均衡,后面跟服务名称
predicates: ## 路由断言,也就是判断请求是否符合路由规则的条件
- Path=/content/** ## 这个是按照路径匹配,只要以/content/开头就符合要求
- id: system-api
uri: lb://system-api
predicates:
- Path=/system/**
  • 在http-client.env.json中配置网关的地址
1
2
3
4
5
6
7
8
9
10
{
"dev": {
"host": "localhost:53010",
"content_host": "localhost:53040",
"system_host": "localhost:53110",
"media_host": "localhost:53050",
"cache_host": "localhost:53035",
+ "gateway_host": "localhost:53010"
}
}
  • 使用HTTP Client测试课程查询接口
1
2
3
4
5
6
7
8
9
#### 课程查询列表
POST {{gateway_host}}/content/course/list?pageNo=1&pageSize=2
Content-Type: application/json

{
"auditStatus": "",
"courseName": "",
"publishStatus": ""
}
  • 运行,得到正常请求数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
POST http://localhost:53010/content/course/list?pageNo=1&pageSize=2

HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json
Date: Tue, 14 Feb 2023 10:25:35 GMT

{
"items": [
{
"id": 1,
"companyId": 22,
"companyName": null,
"name": "JAVA8/9/10新特性讲解啊",

···

}

搭建媒资工程

  • 从课程资料中获取媒资工程xuecheng-plus-media,拷贝到项目工程根目录,修改其bootstrap中的nacos连接信息
  • 创建媒资数据库xc_media,并导入数据库脚本
  • 在nacos的开发环境下创建media-api-dev.yamlmedia-service-dev.yaml
1
2
3
4
server:
servlet:
context-path: /media
port: 53050
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
spring:
datasource:
druid:
stat-view-servlet:
enabled: true
loginUsername: admin
loginPassword: 123456
dynamic:
primary: content #设置默认的数据源或者数据源组,默认值即为master
strict: true #设置严格模式,默认false不启动. 启动后在未匹配到指定数据源时候回抛出异常,不启动会使用默认数据源.
druid:
initial-size: 3
max-active: 5
min-idle: 5
max-wait: 60000
pool-prepared-statements: true
max-pool-prepared-statement-per-connection-size: 20
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
validation-query: SELECT 1 FROM DUAL
test-while-idle: true
test-on-borrow: false
test-on-return: false
stat-view-servlet:
enabled: true
url-pattern: /druid/*
#login-username: admin
#login-password: admin
filter:
stat:
log-slow-sql: true
slow-sql-millis: 1000
merge-sql: true
wall:
config:
multi-statement-allow: true
datasource:
content:
url: jdbc:mysql://localhost:3306/xc_content?serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8
username: root
password: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
media:
url: jdbc:mysql://localhost:3306/xc_media?serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8
username: root
password: mysql
driver-class-name: com.mysql.cj.jdbc.Driver

分布式文件系统

什么是分布式文件系统

  • 我们先来看看文件系统的定义

文件系统是操作系统用于明确存储设备(常见的是磁盘,也有基于NAND Flash的固态硬盘)或分区上的文件的方法和数据结构;即在存储设备上组织文件的方法。操作系统中负责管理和存储文件信息的软件机构称为文件管理系统,简称文件系统。文件系统由三部分组成:文件系统的接口,对对象操纵和管理的软件集合,对象及属性。从系统角度来看,文件系统是对文件存储设备的空间进行组织和分配,负责文件存储并对存入的文件进行保护和检索的系统。具体地说,它负责为用户建立文件,存入、读出、修改、转储文件,控制文件的存取,当用户不再使用时撤销文件等。

  • 文件系统是负责管理和存储文件和系统软件,操作系统通过文件系统提供的借口去存取文件,用户通过操作系统访问磁盘上的文件
  • 常见的文件系统:FAT16/FAT32、NTFS、HFS、UFS、APFS、XFS、Ext4等
  • 现在有个问题,一些短视频平台拥有大量的视频、图片,这些视频文件、图片文件该如何存储呢?如何存储可以满足互联网上海量用户的浏览
    • 这里说的分布式文件系统就是海量用户查阅海量文件的方案
  • 我们再来看看分布式文件系统的定义

分布式文件系统(Distributed File System,DFS)是指文件系统管理的物理存储资源不一定直接连接在本地节点上,而是通过计算机网络与节点(可简单的理解为一台计算机)相连;或是若干不同的逻辑磁盘分区或卷标组合在一起而形成的完整的有层次的文件系统。DFS为分布在网络上任意位置的资源提供一个逻辑上的树形文件系统结构,从而使用户访问分布在网络上的共享文件更加简便。单独的 DFS共享文件夹的作用是相对于通过网络上的其他共享文件夹的访问点

  • 可以简单的理解为:一个计算机无法存储海量的文件,通过网络将若干计算机组织起来共同去存储海量的文件,去接收海量用户的请求,这些组织起来的计算机通过计算机网络通信

  • 这样做的好处

    1. 一台计算机的文件系统处理能力扩充到多台计算机同时处理
    2. 一台计算机挂了, 还有另外副本计算机提供数据
    3. 每台计算机可以放在不同的地域,这样用户就可以就近访问,提高访问速度
  • 市面上有哪些分布式文件系统的产品呢?

    1. NFS(Linux里讲过)

      NFS是基于UDP/IP协议的应用,其实现主要是采用远程过程调用RPC机制,RPC提供了一组与机器、操作系统以及低层传送协议无关的存取远程文件的操作。RPC采用了XDR的支持。XDR是一种与机器无关的数据描述编码的协议,他以独立与任意机器体系结构的格式对网上传送的数据进行编码和解码,支持在异构系统之间数据的传送。

      • 特点
        • 在客户端上映射NFS服务器的驱动器
        • 客户端通过万国访问NFS服务器的硬盘完全透明
    2. GFS

      GFS是一个可扩展的分布式文件系统,用于大型的、分布式的、对大量数据进行访问的应用。它运行于廉价的普通硬件上,并提供容错功能。它可以给大量的用户提供总体性能较高的服务。

      • GFS采用主从结构,一个GFS集群由一个master和大量的chunkserver组成
      • master存储了数据文件的元数据,一个文件被分成了若干块存储在多个chunkserver中
      • 用户从master中获取数据元信息,向chunkserver存储数据
    3. HDFS

      Hadoop分布式文件系统(HDFS)是指被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统(Distributed File System)。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。
      HDFS有着高容错性(fault-tolerant)的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样可以实现流的形式访问(streaming access)文件系统中的数据。

      • HDFS采用主从结构,一个HDFS集群由一个名称节点和若干数据节点组成
      • 名称节点存储数据的元信息,一个完整的数据文件分成若干块存储在数据节点
      • 客户端从名称节点获取数据的元信息及数据分块的信息,得到信息客户端即可从数据块来存储数据
    4. 云计算厂家
      • 阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。其数据设计持久性不低于 99.9999999999%(12 个 9),服务设计可用性(或业务连续性)不低于 99.995%。
      • 百度对象存储BOS提供稳定、安全、高效、高可扩展的云存储服务。您可以将任意数量和形式的非结构化数据存入BOS,并对数据进行管理和处理。BOS支持标准、低频、冷和归档存储等多种存储类型,满足多场景的存储需求。

MinIO

介绍

  • 本项目采用MinIO构建分布式文件系统,MinIO是一个非常轻量的服务,可以很简单的和其他应用结合使用。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等
  • 它的一大特点就是轻量,使用简单、功能强大,支持各种平台,单个文件最大5TB,兼容提供了Java、Python、GO等多版本SDK支持
  • 官网:https://min.io/,
  • 中文:https://www.minio.org.cn/http://docs.minio.org.cn/docs/
  • MinIO采用去中心化共享架构,每个节点是对等关系,通过Nginx可对MinIO进行负载均衡访问
  • 去中心化有什么好处?
    • 在大数据领域,通常的设计理念都是无中心和分布式。MinIO分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置
  • 它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式MinIO避免了单点故障
  • MinIO使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块,冗余地分散存储在各个节点的磁盘上,所有可用的磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时,会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验开会分散的存储在这8块硬盘上
  • 使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍可以恢复数据。例如上面集合中有4个以内的硬盘损害,仍可保证数据恢复,不影响上传和下载;但如果多余一半的硬盘损坏,则无法恢复。
  • MinIO下载地址:https://dl.min.io/server/minio/release/
  • 安装完毕后,CMD进入minio.exe所在目录,执行下面的命令,会在D盘创建4个目录,模拟4个硬盘
1
minio.exe server D:\develop\minio_data\data1  D:\develop\minio_data\data2  D:\develop\minio_data\data3  D:\develop\minio_data\data4
  • 启动结果如下
  • 默认账号密码均为minioadmin,访问localhost:9000进行登录
  • 不过我们这里由于条件有限,所以先不做分布式,修改启动命令
1
minio.exe server d:\minio_data
  • 之后创建两个buckets
    • mediafiles:普通文件
    • video:视频文件

SDK

上传文件
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.4.3</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.8.1</version>
</dependency>
  • 从官方文档中看到,需要三个参数才能连接到minio服务
Parameters Description
Endpoint URL to S3 service.
Access Key Access key (aka user ID) of an account in the S3 service.
Secret Key Secret key (aka password) of an account in the S3 service.
  • 官方文档给出的示例代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import io.minio.BucketExistsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;
import io.minio.errors.MinioException;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

public class FileUploader {
public static void main(String[] args)
throws IOException, NoSuchAlgorithmException, InvalidKeyException {
try {
// Create a minioClient with the MinIO server playground, its access key and secret key.
MinioClient minioClient =
MinioClient.builder()
.endpoint("https://play.min.io")
.credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")
.build();

// Make 'asiatrip' bucket if not exist.
boolean found =
minioClient.bucketExists(BucketExistsArgs.builder().bucket("asiatrip").build());
if (!found) {
// Make a new bucket called 'asiatrip'.
minioClient.makeBucket(MakeBucketArgs.builder().bucket("asiatrip").build());
} else {
System.out.println("Bucket 'asiatrip' already exists.");
}

// Upload '/home/user/Photos/asiaphotos.zip' as object name 'asiaphotos-2015.zip' to bucket
// 'asiatrip'.
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket("asiatrip")
.object("asiaphotos-2015.zip")
.filename("/home/user/Photos/asiaphotos.zip")
.build());
System.out.println(
"'/home/user/Photos/asiaphotos.zip' is successfully uploaded as "
+ "object 'asiaphotos-2015.zip' to bucket 'asiatrip'.");
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
System.out.println("HTTP trace: " + e.httpTrace());
}
}
}
  • 那么我们在其基础上进行修改,完成基本的上传、下载和删除功能
  • 我们先来分析一下示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class FileUploader {
public static void main(String[] args)
throws IOException, NoSuchAlgorithmException, InvalidKeyException {
try {
// Create a minioClient with the MinIO server playground, its access key and secret key.
// 创建MinIO客户端,连接参数就是上述表格中的三个参数,127.0.0.1:9000、minioadmin、minioadmin
MinioClient minioClient =
MinioClient.builder()
.endpoint("https://play.min.io")
.credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")
.build();

// Make 'asiatrip' bucket if not exist.
// 由于backet我们已经手动创建了,所以这段代码可以删掉
boolean found =
minioClient.bucketExists(BucketExistsArgs.builder().bucket("asiatrip").build());
if (!found) {
// Make a new bucket called 'asiatrip'.
minioClient.makeBucket(MakeBucketArgs.builder().bucket("asiatrip").build());
} else {
System.out.println("Bucket 'asiatrip' already exists.");
}

// Upload '/home/user/Photos/asiaphotos.zip' as object name 'asiaphotos-2015.zip' to bucket
// 'asiatrip'.
// 将 '/home/user/Photos/asiaphotos.zip' 文件命名为 'asiaphotos-2015.zip'
// 并上传到 'asiatrip' 里(示例代码创建的bucket)
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket("asiatrip")
.object("asiaphotos-2015.zip")
.filename("/home/user/Photos/asiaphotos.zip")
.build());
// 这段输出也没有用,可以直接删掉
System.out.println(
"'/home/user/Photos/asiaphotos.zip' is successfully uploaded as "
+ "object 'asiaphotos-2015.zip' to bucket 'asiatrip'.");
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
System.out.println("HTTP trace: " + e.httpTrace());
}
}
}
  • 在此之前,我们先创建一个名为testbucket的桶,然后将其权限修改为public
  • 那我们现在把不要的代码删掉,再修改一下连接参数、bucket名、和要上传的文件路径,就可以进行测试了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SpringBootTest
public class MinIOTest {
// 创建MinioClient对象
static MinioClient minioClient =
MinioClient.builder()
.endpoint("http://127.0.0.1:9000")
.credentials("minioadmin", "minioadmin")
.build();

/**
* 上传测试方法
*/
@Test
public void uploadTest() {
try {
minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket("testbucket")
.object("pic01.png") // 同一个桶内对象名不能重复
.filename("D:\\Picture\\background\\01.png")
.build()
);
System.out.println("上传成功");
} catch (Exception e) {
System.out.println("上传失败");
}
}
}
  • 由于我们已经将桶的权限修改为了public,所以我们直接访问http://127.0.0.1:9000/testbucket/pic01.png, 也是可以直接看到上传的图片的
删除文件
  • 编写测试方法
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void deleteTest() {
try {
minioClient.removeObject(RemoveObjectArgs
.builder()
.bucket("testbucket")
.object("pic01.png")
.build());
System.out.println("删除成功");
} catch (Exception e) {
System.out.println("删除失败");
}
}
查询文件
  • 编写测试方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void getFileTest() {
try {
InputStream inputStream = minioClient.getObject(GetObjectArgs.builder()
.bucket("testbucket")
.object("pic01.png")
.build());
FileOutputStream fileOutputStream = new FileOutputStream("C:\\Users\\15863\\Desktop\\tmp.png");
byte[] buffer = new byte[1024];
int len;
while ((len = inputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer,0,len);
}
inputStream.close();
fileOutputStream.close();
System.out.println("下载成功");
} catch (Exception e) {
System.out.println("下载失败");
}
}
  • 用IOUtils简化代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void getFileTest() {
try {
InputStream inputStream = minioClient.getObject(GetObjectArgs.builder()
.bucket("testbucket")
.object("pic01.png")
.build());
FileOutputStream fileOutputStream = new FileOutputStream("C:\\Users\\15863\\Desktop\\tmp.png");
IOUtils.copy(inputStream,fileOutputStream);
System.out.println("下载成功");
} catch (Exception e) {
System.out.println("下载失败");
}
}

上传图片

需求分析

业务流程

  • 我们在新增课程的时候,需要上传课程图片
  • 课程图片上传至分布式文件系统,在课程信息中保存课程图片路径,流程如下
  1. 前端进入上传图片界面
  2. 上传图片,请求媒资管理服务
  3. 媒资管理服务将图片文件存储在MinIO
  4. 媒资管理记录文件信息到数据库
  5. 保存课程信息,在内容管理数据库保存图片地址

  • 媒资管理服务由接口层和业务层共同完成,具体分工如下
    • 用户上传图片请求至媒资管理的接口层,接口层解析文件信息,通过业务层将文件保存至minio和数据库

数据模型

  • 涉及到的数据表主要是媒资信息

环境准备

  • 在minio中配置bucket,创建一个名为mediafiles的bucket,并将其权限设置为public
  • 在nacos中配置minio的相关信息,在nacos的开发环境下新增配置media-service-dev.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/xc_media?serverTimezone=UTC&userUnicode=true&useSSL=false
username: root
password: A10ne,tillde@th.
cloud:
config:
override-none: true

minio:
endpoint: http://127.0.0.1:9000
accessKey: minioadmin
secretKey: minioadmin
bucket:
files: mediafiles
videofiles: video
  • 在media-service工程下配置bootstrap.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
application:
name: media-service
cloud:
nacos:
server-addr: 127.0.0.1:8848
discovery:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
config:
namespace: ${spring.profiles.active}
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
shared-configs:
- data-id: logging-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true

#profiles默认为dev
profiles:
active: dev
  • 在media-service工程下编写minio的配置类
    • 该配置类中药根据yaml中的minio配置信息,创建一个MinioClient对象,并声明为bean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class MinioConfig {
@Value("${minio.endpoint}")
private String endpoint;
@Value("${minio.accessKey}")
private String accessKey;
@Value("${minio.secretKey}")
private String secretKey;

@Bean
public MinioClient minioClient() {
return MinioClient.builder().
endpoint(endpoint).
credentials(accessKey, secretKey).
build();
}
}

接口定义

  • 根据需求分析,下面进行接口定义。
  • 此接口定义为一个通用的上传文件的接口,可以上传图片或其他文件
  • 首先分析接口
    • 请求地址:/media/upload/coursefile
    • 请求参数:Content-Type: multipart/form-data;boundary=… FormData: filedata=??, folder=?, objectName=?
    • 响应参数:文件信息,如下,其内容与media_files表中的字段完全一致
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    {
    "id": "a16da7a132559daf9e1193166b3e7f52",
    "companyId": 1232141425,
    "companyName": null,
    "filename": "1.jpg",
    "fileType": "001001",
    "tags": "",
    "bucket": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg",
    "fileId": "a16da7a132559daf9e1193166b3e7f52",
    "url": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg",
    "timelength": null,
    "username": null,
    "createDate": "2022-09-12T21:57:18",
    "changeDate": null,
    "status": "1",
    "remark": "",
    "auditStatus": null,
    "auditMind": null,
    "fileSize": 248329
    }
  • 定义模型类,虽然响应结果与MediaFiles表中的字段完全一致,但最好不要直接用MediaFiles类。因为该类属于PO类,如果后期我们要对响应结果进行修改,那么模型类也需要进行修改,但是MediaFiles是PO类,我们不能动。所以可以直接用一个类继承MediaFiles,里面什么属性都不用加
1
2
3
@Data
public class UploadFileResultDto extends MediaFiles {
}
  • 定义接口,其中folder和objectName这两个参数不一定传,所以将其required设为false
1
2
3
4
5
6
7
@ApiOperation("上传文件")
@RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile upload,
@RequestParam(value = "folder", required = false) String folder,
@RequestParam(value = "objectName", required = false) String objectName) {
return null;
}

接口开发

DAO开发

  • 根据需求分析,DAO层实现向media_file表中插入一条记录,使用media_files表生成的mapper即可

Service开发

  • Service方法需要听歌一个更加通用的保存文件的方法
  • 定义请求参数类,上传文件,我们需要文件名称、文件的content-type、文件类型(文档、视频、图片等,对应数据字典表中的类型)、文件大小、标签、上传人、备注
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 @Data
@ToString
public class UploadFileParamsDto {

/**
* 文件名称
*/
private String filename;

/**
* 文件content-type
*/
private String contentType;

/**
* 文件类型(文档,图片,视频)
*/
private String fileType;
/**
* 文件大小
*/
private Long fileSize;

/**
* 标签
*/
private String tags;

/**
* 上传人
*/
private String username;

/**
* 备注
*/
private String remark;
}
  • 定义service方法,MultipartFile是SpringMVC提供简化上传操作的工具类,不使用框架之前,都是使用原生的HttpServletRequest来接收上传的数据,文件是以二进制流传递到后端的。为了使接口更通用,我们可以用字节数组代替MultpartFile类型
1
2
3
4
5
6
7
8
9
10
/**
* @description 上传文件的通用接口
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return com.xuecheng.media.model.dto.UploadFileResultDto
*/
UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName);
  • 实现方法如下,主要分为两部分
    1. 将文件上传到minio
    2. 将文件信息写入media_file表中
  • 将文件上传到minio
  • 前面我们上传文件是用的uploadObject方法,是从本地磁盘上传文件,为了使方法更通用,这里使用putObject
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Autowired
MinioClient minioClient;

// 从配置文件获取bucket
@Value("${minio.bucket.files}")
private String bucket_files;

/**
*
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return
*/
@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) {

if (StringUtils.isEmpty(folder)) {
// 如果目录不存在,则自动生成一个目录
folder = getFileFolder(true, true, true);
} else if (!folder.endsWith("/")) {
// 如果目录末尾没有 / ,替他加一个
folder = folder + "/";
}
if (StringUtils.isEmpty(objectName)) {
// 如果文件名为空,则设置其默认文件名为文件的md5码 + 文件后缀名
String filename = uploadFileParamsDto.getFilename();
objectName = DigestUtils.md5DigestAsHex(bytes) + filename.substring(filename.lastIndexOf("."));
}
objectName = folder + objectName;
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
minioClient.putObject(PutObjectArgs.builder()
.bucket(bucket_files)
.object(objectName)
.stream(byteArrayInputStream, byteArrayInputStream.available(), -1)
.contentType(uploadFileParamsDto.getContentType())
.build());
} catch (Exception e) {

}
return null;
}

/**
* 自动生成目录
* @param year 是否包含年
* @param month 是否包含月
* @param day 是否包含日
* @return
*/
private String getFileFolder(boolean year, boolean month, boolean day) {
StringBuffer stringBuffer = new StringBuffer();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
String dateString = dateFormat.format(new Date());
String[] split = dateString.split("-");
if (year) {
stringBuffer.append(split[0]).append("/");
}
if (month) {
stringBuffer.append(split[1]).append("/");
}
if (day) {
stringBuffer.append(split[2]).append("/");
}
return stringBuffer.toString();
}
  • 将文件信息写入media_file表中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 保存到数据库
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMD5);
mediaFiles.setFileId(fileMD5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket_files);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setFilePath(objectName);
mediaFiles.setUrl("/" + bucket_files + "/" + objectName);
// 查阅数据字典,002003表示审核通过
mediaFiles.setAuditStatus("002003");
}
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert <= 0) {
XueChengPlusException.cast("保存文件信息失败");
}
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);
return uploadFileResultDto;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
@Autowired
MediaFilesMapper mediaFilesMapper;

@Autowired
MinioClient minioClient;

@Value("${minio.bucket.files}")
private String bucket_files;

/**
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return
*/
@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) {
String fileMD5 = DigestUtils.md5DigestAsHex(bytes);
if (StringUtils.isEmpty(folder)) {
// 如果目录不存在,则自动生成一个目录
folder = getFileFolder(true, true, true);
} else if (!folder.endsWith("/")) {
// 如果目录末尾没有 / ,替他加一个
folder = folder + "/";
}
if (StringUtils.isEmpty(objectName)) {
// 如果文件名为空,则设置其默认文件名为文件的md5码 + 文件后缀名
String filename = uploadFileParamsDto.getFilename();
objectName = fileMD5 + filename.substring(filename.lastIndexOf("."));
}
objectName = folder + objectName;
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
// 上传到minio
minioClient.putObject(PutObjectArgs.builder()
.bucket(bucket_files)
.object(objectName)
.stream(byteArrayInputStream, byteArrayInputStream.available(), -1)
.contentType(uploadFileParamsDto.getContentType())
.build());
// 保存到数据库
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMD5);
mediaFiles.setFileId(fileMD5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket_files);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setFilePath(objectName);
mediaFiles.setUrl("/" + bucket_files + "/" + objectName);
// 查阅数据字典,002003表示审核通过
mediaFiles.setAuditStatus("002003");
}
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert <= 0) {
XueChengPlusException.cast("保存文件信息失败");
}
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);
return uploadFileResultDto;
} catch (Exception e) {
XueChengPlusException.cast("上传过程中出错");
}
return null;
}

/**
* 自动生成目录
* @param year 是否包含年
* @param month 是否包含月
* @param day 是否包含日
* @return
*/
private String getFileFolder(boolean year, boolean month, boolean day) {
StringBuffer stringBuffer = new StringBuffer();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
String dateString = dateFormat.format(new Date());
String[] split = dateString.split("-");
if (year) {
stringBuffer.append(split[0]).append("/");
}
if (month) {
stringBuffer.append(split[1]).append("/");
}
if (day) {
stringBuffer.append(split[2]).append("/");
}
return stringBuffer.toString();
}
private String getFileFolder(boolean year, boolean month, boolean day) {
StringBuffer stringBuffer = new StringBuffer();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
String dateString = dateFormat.format(new Date());
String[] split = dateString.split("-");
if (year) {
stringBuffer.append(split[0]).append("/");
}
if (month) {
stringBuffer.append(split[1]).append("/");
}
if (day) {
stringBuffer.append(split[2]).append("/");
}
return stringBuffer.toString();
}

完善Controller

  • 完善接口层代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@ApiOperation("上传文件")
@RequestMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile upload,
@RequestParam(value = "folder", required = false) String folder,
@RequestParam(value = "objectName", required = false) String objectName) {
UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
uploadFileParamsDto.setFileSize(upload.getSize());
String contentType = upload.getContentType();
if (contentType.contains("image")) {
// 图片
uploadFileParamsDto.setFileType("001001");
} else {
// 其他
uploadFileParamsDto.setFileType("001003");
}
uploadFileParamsDto.setFilename(upload.getOriginalFilename());
uploadFileParamsDto.setContentType(contentType);
Long companyId = 1232141425L;
try {
UploadFileResultDto uploadFileResultDto = mediaFileService.uploadFile(companyId, uploadFileParamsDto, upload.getBytes(), folder, objectName);
return uploadFileResultDto;
} catch (IOException e) {
XueChengPlusException.cast("上传文件过程出错");
}
return null;
}
  • 使用HTTP Client测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#### 上传文件
POST {{media_host}}/media/upload/coursefile
Content-Type: multipart/form-data; boundary=WebAppBoundary

--WebAppBoundary
Content-Disposition: form-data; name="filedata"; filename="test01.jpg"
Content-Type: application/octet-stream

< C:\Users\kyle\Desktop\Picture\photo\bg01.jpg

## 响应结果如下
POST http://localhost:53050/media/upload/coursefile

HTTP/1.1 200
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 16 Feb 2023 09:57:48 GMT
Keep-Alive: timeout=60
Connection: keep-alive

{
"id": "632fb34166d91865da576032b9330ced",
"companyId": 1232141425,
"companyName": null,
"filename": "test01.jpg",
"fileType": "001003",
"tags": null,
"bucket": "mediafiles",
"filePath": "2023/57/16/632fb34166d91865da576032b9330ced.jpg",
"fileId": "632fb34166d91865da576032b9330ced",
"url": "/mediafiles/2023/57/16/632fb34166d91865da576032b9330ced.jpg",
"username": null,
"createDate": "2023-02-16 17:57:48",
"changeDate": null,
"status": "1",
"remark": "",
"auditStatus": "002003",
"auditMind": null,
"fileSize": 22543
}
响应文件已保存。
> 2023-02-16T175748.200.json
  • 在对应的bucket中也可以查看到上传的图片

Service代码优化

  • 在上传文件的方法中包括两部分
    1. 向MinIO存储文件
    2. 向数据库存储文件信息
  • 下面将这两部分抽取出来,后期可供其他Service方法调用
  • 为了跟方便的获取content-type,我们可以添加simplemagic依赖,它提供的方法可以根据文件扩展名,得到资源的content-type
  • 在base工程中添加依赖
1
2
3
4
5
<dependency>
<groupId>com.j256.simplemagic</groupId>
<artifactId>simplemagic</artifactId>
<version>1.17</version>
</dependency>
  • 可通过如下代码得到资源的content-type
1
2
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(扩展名);
String contentType = extensionMatch.getMimeType();
  • IDEA中使用Ctrl + Alt + M 可以快速重构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* @param companyId 机构id
* @param uploadFileParamsDto 文件信息
* @param bytes 文件字节数组
* @param folder 桶下边的子目录
* @param objectName 对象名称
* @return
*/
@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, byte[] bytes, String folder, String objectName) {
String fileMD5 = DigestUtils.md5DigestAsHex(bytes);
if (StringUtils.isEmpty(folder)) {
// 如果目录不存在,则自动生成一个目录
folder = getFileFolder(true, true, true);
} else if (!folder.endsWith("/")) {
// 如果目录末尾没有 / ,替他加一个
folder = folder + "/";
}
if (StringUtils.isEmpty(objectName)) {
// 如果文件名为空,则设置其默认文件名为文件的md5码 + 文件后缀名
String filename = uploadFileParamsDto.getFilename();
objectName = fileMD5 + filename.substring(filename.lastIndexOf("."));
}
objectName = folder + objectName;
try {
addMediaFilesToMinIO(bytes, bucket_files, objectName);
MediaFiles mediaFiles = addMediaFilesToDB(companyId, uploadFileParamsDto, objectName, fileMD5, bucket_files);
UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);
return uploadFileResultDto;
} catch (Exception e) {
XueChengPlusException.cast("上传过程中出错");
}
return null;
}

/**
* 将文件信息添加到文件表
* @param companyId 机构id
* @param uploadFileParamsDto 上传文件的信息
* @param objectName 对象名称
* @param fileMD5 文件的md5码
* @param bucket 桶
* @return
*/
private MediaFiles addMediaFilesToDB(Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) {
// 保存到数据库
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMD5);
mediaFiles.setFileId(fileMD5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setFilePath(objectName);
mediaFiles.setUrl("/" + bucket + "/" + objectName);
// 查阅数据字典,002003表示审核通过
mediaFiles.setAuditStatus("002003");
}
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert <= 0) {
XueChengPlusException.cast("保存文件信息失败");
}
return mediaFiles;
}

/**
* @param bytes 文件字节数组
* @param bucket 桶
* @param objectName 对象名称 23/02/15/porn.mp4
* @throws ErrorResponseException
* @throws InsufficientDataException
* @throws InternalException
* @throws InvalidKeyException
* @throws InvalidResponseException
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws ServerException
* @throws XmlParserException
*/
private void addMediaFilesToMinIO(byte[] bytes, String bucket, String objectName) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
String contentType = MediaType.APPLICATION_OCTET_STREAM_VALUE; // 默认content-type为未知二进制流
if (objectName.indexOf(".") >= 0) { // 判断对象名是否包含 .
// 有 . 则划分出扩展名
String extension = objectName.substring(objectName.lastIndexOf("."));
// 根据扩展名得到content-type,如果为未知扩展名,例如 .abc之类的东西,则会返回null
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
// 如果得到了正常的content-type,则重新赋值,覆盖默认类型
if (extensionMatch != null) {
contentType = extensionMatch.getMimeType();
}
}
try {
minioClient.putObject(PutObjectArgs.builder()
.bucket(bucket)
.object(objectName)
.stream(byteArrayInputStream, byteArrayInputStream.available(), -1)
.contentType(contentType)
.build());
} catch (Exception e) {
log.debug("上传到文件系统出错:{}", e.getMessage());
throw new XueChengPlusException("上传到文件系统出错");
}
}
  • 优化后使用HTTP Client进行测试
  • 同时,根据文件扩展名获取content-type的方法可以进一步抽取,可以在base工程中创建一个工具类,供其他微服务使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static String getContentType(String objectName) {
String contentType = MediaType.APPLICATION_OCTET_STREAM_VALUE; // 默认content-type为未知二进制流
if (objectName.indexOf(".") >= 0) { // 判断对象名是否包含 .
// 有 . 则划分出扩展名
String extension = objectName.substring(objectName.lastIndexOf("."));
// 根据扩展名得到content-type,如果为未知扩展名,例如 .abc之类的东西,则会返回null
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
// 如果得到了正常的content-type,则重新赋值,覆盖默认类型
if (extensionMatch != null) {
contentType = extensionMatch.getMimeType();
}
}
return contentType;
}

Service事务优化

  • 我们现在思考一下,updateFile方法是否应该开启事务
  • 目前如果在updateFile方法上添加@Transactional,当调用updateFile方法前会开启数据库事务,如果上传文件过程时间较长(例如用户在上传超大视频文件),那么数据库的食物持续时间也会变长(因为在updateFile方法中,我们即要将文件上传到minio,又要将文件信息写入数据库),这样数据库连接释放就慢,最终导致数据库链接不够用
  • 那么解决办法也显而易见,那就是只在addMediaFilesToDB方法上添加事务控制即可,同时将uploadFile方法上的@Transactional注解去掉
  • 但事情并不是那么简单,首先我们来看一下Spring的事务控制
  • 判断方法能否被事务控制
    1. 是不是通过代理对象调用的方法
    2. 该方法上是否添加了@Transactional注解
  • 现在只满足了添加事务注解,那么如何判断是不是通过代理对象调用的方法呢?
    • 我们可以打个断点看一下
  • 当我们在一个不能被事务控制的方法里(uploadFile),调用一个被事务控制的方法(addMediaFilesToDB),那么该方法(addMediaFilesToDB)也不会被事务控制
  • 那么如何解决呢?
    • 我们需要通过代理对象去调用addMediaFilesToDB方法
  • 在MediaFileService的实现类中注入MediaFileService的代理对象
1
2
@Autowired
MediaFileService currentProxy;
  • 将addMediaFilesToDB方法提取成接口
1
2
3
4
5
6
7
8
9
10
11
/**
* 将文件信息添加到文件表
*
* @param companyId 机构id
* @param uploadFileParamsDto 上传文件的信息
* @param objectName 对象名称
* @param fileMD5 文件md5码
* @param bucket 桶
* @return
*/
MediaFiles addMediaFilesToDB(Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket);
  • 通过代理对象调用addMediaFilesToDB
1
MediaFiles mediaFiles = currentProxy.addMediaFilesToDB(companyId, uploadFileParamsDto, objectName, fileMD5, bucket_files);
  • 再次测试事务是否可以正常控制
    • 打断点看到这次是代理对象调用的方法

前后端联调

  • 修改前段的图片服务器地址为自己的minio地址
1
2
## 图片服务器地址
VUE_APP_SERVER_PICSERVER_URL=http://127.0.0.1:9000
  • 在新增课程、编辑课程界面上传图片,保存课程信息后再次进入编辑课程界面,查看是否可以正常保存图片信息
  • 上传图片完成后,进入媒资管理,查看文件列表中是否有刚刚上传的图片信息

bug修复

  • 在媒资列表可以查看到刚刚上传的图片信息,但是通过条件查询不起作用
  • 原因:没有使用查询条件
  • 解决:修改MediaFileServiceImpl中的queryMediaFiles方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    @Override
public PageResult<MediaFiles> queryMediaFiles(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) {

//构建查询条件对象
LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>();
+ queryWrapper.like(!StringUtils.isEmpty(queryMediaParamsDto.getFilename()), MediaFiles::getFilename, queryMediaParamsDto.getFilename());
+ queryWrapper.eq(!StringUtils.isEmpty(queryMediaParamsDto.getFileType()), MediaFiles::getFileType, queryMediaParamsDto.getFileType());
//分页对象
Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize());
// 查询数据内容获得结果
Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper);
// 获取数据列表
List<MediaFiles> list = pageResult.getRecords();
// 获取数据总数
long total = pageResult.getTotal();
// 构建结果集
PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize());
return mediaListResult;

}
  • 重启服务,测试是否能正常查询

上传视频

需求分析

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件
  2. 教育机构人员在媒资管理页面中点击上传视频按钮,打开上传界面
  3. 选择要上传的文件,自动执行文件上传
  4. 视频上传成功会自动处理,处理完成后可以预览视频

断点续传

什么是断点续传

  • 通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传需求。HTTP协议本身对上传文件大小没有限制,但是客户的网络环境之类、电脑硬件环境等参差不齐,如果一个大文件快上传完了,但是突然断网了,没有上传完成,需要客户重新上传,那么用户体验就非常差。所以对于大文件上传的最基本要求就是断点续传
  • 流程如下
    1. 前端上传前先把文件分成块
    2. 一块一块的上传,上传中断后重新上传。已上传的分块则不用再上传
    3. 各分块上传完成后,在服务端合并文件

分块与合并测试

  • 为了更好的理解文件分块上传的原理,下面用Java代码测试文件的分块与合并
  • 文件分块的流程如下
    1. 获取源文件长度
    2. 根据设定的分块文件大小,计算出块数(向上取整,例如33.4M的文件,块大小为1M,则需要34块)
    3. 从源文件读取数据,并依次向每一个块文件写数据
  • 文件分块测试代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
public void testChunk() throws IOException {
// 源文件
File sourceFile = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1.mp4");
// 块文件路径
String chunkPath = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\chunk\\";
File chunkFolder = new File(chunkPath);
if (!chunkFolder.exists()) {
chunkFolder.mkdirs();
}
// 分块大小 1M
long chunkSize = 1024 * 1024 * 1;
// 计算块数,向上取整
long chunkNum = (long) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
// 缓冲区大小
byte[] buffer = new byte[1024];
// 使用RandomAccessFile访问文件
RandomAccessFile raf_read = new RandomAccessFile(sourceFile, "r");
// 遍历分块,依次向每一个分块写入数据
for (int i = 0; i < chunkNum; i++) {
// 创建分块文件,默认文件名 path + i,例如chunk\1 chunk\2
File file = new File(chunkPath + i);
if (file.exists()){
file.delete();
}
boolean newFile = file.createNewFile();
if (newFile) {
int len;
RandomAccessFile raf_write = new RandomAccessFile(file, "rw");
// 向分块文件写入数据
while ((len = raf_read.read(buffer)) != -1) {
raf_write.write(buffer, 0, len);
// 写满就停
if (file.length() >= chunkSize)
break;
}
raf_write.close();
}
}
raf_read.close();
System.out.println("写入分块完毕");
}
  • 文件合并流程
    1. 找到要合并的文件并按文件分块的先后顺序排序
    2. 创建合并文件
    3. 依次从合并的文件中读取数据冰箱合并文件写入数据
  • 文件合并的测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Test
public void testMerge() throws IOException {
// 块文件目录
File chunkFolder = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\chunk\\");
// 源文件
File sourceFile = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1.mp4");
// 合并文件
File mergeFile = new File("D:\\BaiduNetdiskDownload\\星际牛仔1998\\星际牛仔1-1.mp4");
mergeFile.createNewFile();
// 用于写文件
RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");
// 缓冲区
byte[] buffer = new byte[1024];
// 文件名升序排序
File[] files = chunkFolder.listFiles();
List<File> fileList = Arrays.asList(files);
Collections.sort(fileList, Comparator.comparingInt(o -> Integer.parseInt(o.getName())));
// 合并文件
for (File chunkFile : fileList) {
RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "r");
int len;
while ((len = raf_read.read(buffer)) != -1) {
raf_write.write(buffer, 0, len);
}
raf_read.close();
}
raf_write.close();
// 判断合并后的文件是否与源文件相同
FileInputStream fileInputStream = new FileInputStream(sourceFile);
FileInputStream mergeFileStream = new FileInputStream(mergeFile);
//取出原始文件的md5
String originalMd5 = DigestUtils.md5Hex(fileInputStream);
//取出合并文件的md5进行比较
String mergeFileMd5 = DigestUtils.md5Hex(mergeFileStream);
if (originalMd5.equals(mergeFileMd5)) {
System.out.println("合并文件成功");
} else {
System.out.println("合并文件失败");
}
}

上传视频流程

  1. 前端上传文件前,请求媒资接口层检查文件是否存在
    • 若存在,则不再上传
    • 若不存在,则开始上传,首先对视频文件进行分块
  2. 前端分块进行上传,上传前首先检查分块是否已经存在
    • 若分块已存在,则不再上传
    • 若分块不存在,则开始上传分块
  3. 前端请求媒资管理接口层,请求上传分块
  4. 接口层请求服务层上传分块
  5. 服务端将分块信息上传到MinIO
  6. 前端将分块上传完毕,请求接口层合并分块
  7. 接口层请求服务层合并分块
  8. 服务层根据文件信息找到MinIO中的分块文件,下载到本地临时目录,将所有分块下载完毕后开始合并
  9. 合并完成后,将合并后的文件上传至MinIO

接口定义

  • 根据上传视频流程,定义接口
  • 与前端的约定是
    • 操作成功返回
    1
    2
    3
    {
    "code": 0
    }
    • 操作失败返回
    1
    2
    3
    {
    "code": 1
    }
  • 在base工程的model包下新建RestResponse类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Data
public class RestResponse<T> {
/**
* 相应编码 0为正常 -1为错误
*/
private int code;
/**
* 响应提示信息
*/
private String msg;
/**
* 响应内容
*/
private T result;

public RestResponse(int code, String msg) {
this.code = code;
this.msg = msg;
}

public RestResponse() {
this(0, "success");
}

/**
* 错误信息的封装
*/
public static <T> RestResponse<T> validfail() {
RestResponse<T> response = new RestResponse<>();
response.setCode(-1);
return response;
}

public static <T> RestResponse<T> validfail(String msg) {
RestResponse<T> response = new RestResponse<>();
response.setCode(-1);
response.setMsg(msg);
return response;
}

public static <T> RestResponse<T> validfail(String msg, T result) {
RestResponse<T> response = new RestResponse<>();
response.setCode(-1);
response.setMsg(msg);
response.setResult(result);
return response;
}

/**
* 正常信息的封装
*/
public static <T> RestResponse<T> success() {
return new RestResponse<>();
}

public static <T> RestResponse<T> success(T result) {
RestResponse<T> response = new RestResponse<>();
response.setResult(result);
return response;
}

public static <T> RestResponse<T> success(String msg, T result) {
RestResponse<T> response = new RestResponse<>();
response.setMsg(msg);
response.setResult(result);
return response;
}
}
  • 定义接口如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkFile(@RequestParam("fileMd5") String fileMd5) {
return null;
}

@ApiOperation(value = "分块文件上传前检查分块")
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkChunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) {
return null;
}

@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/uploadchunk")
public RestResponse uploadChunk(@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) {
return null;
}

@ApiOperation(value = "合并分块文件")
@PostMapping("/upload/mergechunks")
public RestResponse mergeChunks(@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) {
return null;
}
}

接口开发

DAO开发

  • 向媒资数据库的文件表插入记录,使用自动生成的Mapper接口即可满足要求

Service开发

检查文件和分块
  • 首先实现检查文件方法和检查分块方法
  • 定义Service接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 检查文件是否存在
*
* @param fileMd5 文件的md5
* @return
*/
boolean checkFile(String fileMd5);

/**
* 检查分块是否存在
* @param fileMd5 文件的MD5
* @param chunkIndex 分块序号
* @return
*/
boolean checkChunk(String fileMd5, int chunkIndex);
  • 判断文件是否存在
    • 首先判断数据库中是否存在该文件
    • 其次判断minio的bucket中是否存在该文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public RestResponse<Boolean> checkFile(String fileMd5) {
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
// 数据库中不存在,则直接返回false 表示不存在
if (mediaFiles == null) {
return RestResponse.success(false);
}
// 若数据库中存在,根据数据库中的文件信息,则继续判断bucket中是否存在
try {
InputStream inputStream = minioClient.getObject(GetObjectArgs
.builder()
.bucket(mediaFiles.getBucket())
.object(mediaFiles.getFilePath())
.build());
if (inputStream == null) {
return RestResponse.success(false);
}
} catch (Exception e) {
return RestResponse.success(false);
}
return RestResponse.success(true);
}
  • 判断分块是否存在
    • 分块是否存在,只需要判断minio对应的目录下是否存在分块文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {
// 获取分块目录
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
String chunkFilePath = chunkFileFolderPath + chunkIndex;
try {
// 判断分块是否存在
InputStream inputStream = minioClient.getObject(GetObjectArgs
.builder()
.bucket(video_files)
.object(chunkFilePath)
.build());
// 不存在返回false
if (inputStream == null) {
return RestResponse.success(false);
}
} catch (Exception e) {
// 出异常也返回false
return RestResponse.success(false);
}
// 否则返回true
return RestResponse.success();
}

private String getChunkFileFolderPath(String fileMd5) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}
上传分块
  • 定义Service接口
1
2
3
4
5
6
7
8
/**
* 上传分块
* @param fileMd5 文件MD5
* @param chunk 分块序号
* @param bytes 文件字节
* @return
*/
RestResponse uploadChunk(String fileMd5,int chunk,byte[] bytes);
  • 接口实现
1
2
3
4
5
6
7
8
9
10
11
12
@Override
public RestResponse uploadChunk(String fileMd5, int chunk, byte[] bytes) {
// 分块文件路径
String chunkFilePath = getChunkFileFolderPath(fileMd5) + chunk;
try {
addMediaFilesToMinIO(bytes, video_files, chunkFilePath);
return RestResponse.success(true);
} catch (Exception e) {
log.debug("上传分块文件:{}失败:{}", chunkFilePath, e.getMessage());
}
return RestResponse.validfail("上传文件失败", false);
}
上传分块测试
  • 完善Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkFile(@RequestParam("fileMd5") String fileMd5) {
return mediaFileService.checkFile(fileMd5);
}

@ApiOperation(value = "分块文件上传前检查分块")
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkChunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) {
return mediaFileService.checkChunk(fileMd5, chunk);
}

@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/uploadchunk")
public RestResponse uploadChunk(@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception {
return mediaFileService.uploadChunk(fileMd5, chunk, file.getBytes());
}
合并前下载分块
  • 在合并分块前,我们需要先下载分块,在ServiceImpl中定义下载分块方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 下载分块文件
* @param fileMd5 文件的MD5
* @param chunkTotal 总块数
* @return 分块文件数组
*/
private File[] checkChunkStatus(String fileMd5, int chunkTotal) {
// 作为结果返回
File[] files = new File[chunkTotal];
// 获取分块文件目录
String chunkFileFolder = getChunkFileFolderPath(fileMd5);
for (int i = 0; i < chunkTotal; i++) {
// 获取分块文件路径
String chunkFilePath = chunkFileFolder + i;
File chunkFile = null;
try {
// 创建临时的分块文件
chunkFile = File.createTempFile("chunk" + i, null);
} catch (Exception e) {
XueChengPlusException.cast("创建临时分块文件出错:" + e.getMessage());
}
// 下载分块文件
chunkFile = downloadFileFromMinio(chunkFile, video_files, chunkFilePath);
// 组成结果
files[i] = chunkFile;
}
return files;
}

/**
* 从Minio中下载文件
* @param file 目标文件
* @param bucket 桶
* @param objectName 桶内文件路径
* @return
*/
private File downloadFileFromMinio(File file, String bucket, String objectName) {
try (FileOutputStream fileOutputStream = new FileOutputStream(file);
InputStream inputStream = minioClient.getObject(GetObjectArgs
.builder()
.bucket(bucket)
.object(objectName)
.build())) {
IOUtils.copy(inputStream, fileOutputStream);
return file;
} catch (Exception e) {
XueChengPlusException.cast("查询文件分块出错");
}
return null;
}
合并分块
  • 合并分块的实现代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Override
public RestResponse mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) throws IOException {
// 下载分块文件
File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal);
// 获取源文件名
String fileName = uploadFileParamsDto.getFilename();
// 获取源文件扩展名
String extension = fileName.substring(fileName.lastIndexOf("."));
// 创建出临时文件,准备合并
File mergeFile = File.createTempFile(fileName, extension);
// 缓冲区
byte[] buffer = new byte[1024];
// 写入流,向临时文件写入
RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");
// 遍历分块文件数组
for (File chunkFile : chunkFiles) {
// 读取流,读分块文件
RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "r");
int len;
while ((len = raf_read.read(buffer)) != -1) {
raf_write.write(buffer, 0, len);
}
}
uploadFileParamsDto.setFileSize(mergeFile.length());
// 对文件进行校验,通过MD5值比较
FileInputStream mergeInputStream = new FileInputStream(mergeFile);
String mergeMd5 = DigestUtils.md5DigestAsHex(mergeInputStream);
if (!fileMd5.equals(mergeMd5)) {
XueChengPlusException.cast("合并文件校验失败");
}
// 拼接合并文件路径
String mergeFilePath = getFilePathByMd5(fileMd5, extension);
// 将本地合并好的文件,上传到minio中,这里重载了一个方法
addMediaFilesToMinIO(mergeFile.getAbsolutePath(), video_files, mergeFilePath);
// 将文件信息写入数据库
MediaFiles mediaFiles = addMediaFilesToDB(companyId, uploadFileParamsDto, mergeFilePath, mergeMd5, video_files);
if (mediaFiles == null) {
XueChengPlusException.cast("媒资文件入库出错");
}
return RestResponse.success();
}

/**
* 将本地文件上传到minio
* @param filePath 本地文件路径
* @param bucket 桶
* @param objectName 对象名称
*/
private void addMediaFilesToMinIO(String filePath, String bucket, String objectName) {
String contentType = getContentType(objectName);
try {
minioClient.uploadObject(UploadObjectArgs
.builder()
.bucket(bucket)
.object(objectName)
.filename(filePath)
.contentType(contentType)
.build());
} catch (Exception e) {
XueChengPlusException.cast("上传到文件系统出错");
}
}

/**
* 根据MD5和文件扩展名,生成文件路径,例 /2/f/2f6451sdg/2f6451sdg.mp4
* @param fileMd5 文件MD5
* @param extension 文件扩展名
* @return
*/
private String getFilePathByMd5(String fileMd5, String extension) {
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + extension;
}
  • 基本的业务逻辑就是这些,但是现在还少了点东西,我们没有做异常处理,简单的throw出去而已,并且创建的临时文件,也需要删除,完善后的代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Override
public RestResponse mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
// 下载分块文件
File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal);
// 获取源文件名
String fileName = uploadFileParamsDto.getFilename();
// 获取源文件扩展名
String extension = fileName.substring(fileName.lastIndexOf("."));
// 创建出临时文件,准备合并
File mergeFile = null;
try {
mergeFile = File.createTempFile(fileName, extension);
} catch (IOException e) {
XueChengPlusException.cast("创建合并临时文件出错");
}
try {
// 缓冲区
byte[] buffer = new byte[1024];
// 写入流,向临时文件写入
try (RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw")) {
// 遍历分块文件数组
for (File chunkFile : chunkFiles) {
// 读取流,读分块文件
try (RandomAccessFile raf_read = new RandomAccessFile(chunkFile, "r")) {
int len;
while ((len = raf_read.read(buffer)) != -1) {
raf_write.write(buffer, 0, len);
}
}
}
} catch (Exception e) {
XueChengPlusException.cast("合并文件过程中出错");
}
uploadFileParamsDto.setFileSize(mergeFile.length());
// 对文件进行校验,通过MD5值比较
try (FileInputStream mergeInputStream = new FileInputStream(mergeFile)) {
String mergeMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(mergeInputStream);
if (!fileMd5.equals(mergeMd5)) {
XueChengPlusException.cast("合并文件校验失败");
}
log.debug("合并文件校验通过:{}", mergeFile.getAbsolutePath());
} catch (Exception e) {
XueChengPlusException.cast("合并文件校验异常");
}
String mergeFilePath = getFilePathByMd5(fileMd5, extension);
// 将本地合并好的文件,上传到minio中,这里重载了一个方法
addMediaFilesToMinIO(mergeFile.getAbsolutePath(), video_files, mergeFilePath);
log.debug("合并文件上传至MinIO完成{}", mergeFile.getAbsolutePath());
// 将文件信息写入数据库
MediaFiles mediaFiles = addMediaFilesToDB(companyId, uploadFileParamsDto, mergeFilePath, fileMd5, video_files);
if (mediaFiles == null) {
XueChengPlusException.cast("媒资文件入库出错");
}
log.debug("媒资文件入库完成");
return RestResponse.success();
} finally {
for (File chunkFile : chunkFiles) {
try {
chunkFile.delete();
} catch (Exception e) {
log.debug("临时分块文件删除错误:{}", e.getMessage());
}
}
try {
mergeFile.delete();
} catch (Exception e) {
log.debug("临时合并文件删除错误:{}", e.getMessage());
}
}
}

接口层完善

  • 下面完善接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {
@Autowired
private MediaFileService mediaFileService;

@ApiOperation(value = "文件上传前检查文件")
@PostMapping("/upload/checkfile")
public RestResponse<Boolean> checkFile(@RequestParam("fileMd5") String fileMd5) {
return mediaFileService.checkFile(fileMd5);
}

@ApiOperation(value = "分块文件上传前检查分块")
@PostMapping("/upload/checkchunk")
public RestResponse<Boolean> checkChunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) {
return mediaFileService.checkChunk(fileMd5, chunk);
}

@ApiOperation(value = "上传分块文件")
@PostMapping("/upload/uploadchunk")
public RestResponse uploadChunk(@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception {
return mediaFileService.uploadChunk(fileMd5, chunk, file.getBytes());
}

@ApiOperation(value = "合并分块文件")
@PostMapping("/upload/mergechunks")
public RestResponse mergeChunks(@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) throws IOException {
Long companyId = 1232141425L;
UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
uploadFileParamsDto.setFileType("001002");
uploadFileParamsDto.setTags("课程视频");
uploadFileParamsDto.setRemark("");
uploadFileParamsDto.setFilename(fileName);
return mediaFileService.mergeChunks(companyId, fileMd5, chunkTotal, uploadFileParamsDto);
}
}

接口测试

  • 前后端联调,上传视频进行测试
  • 数据库和MinIO中均能看到对应的数据

文件预览

需求分析

  • 图片上传成功、视频上传成功后,可以通过预览按钮查看文件内容
  • 预览的方式是通过浏览器直接打开文件,对于图片和浏览器支持的视频格式可以直接浏览
  • 说明
    1. 前端请求接口层预览文件
    2. 接口层将文件id传递给服务层
    3. 服务层使用文件id查询媒资数据库文件表,获取文件的URL
    4. 接口层将文件url返回给前端,通过浏览器打开URL

接口定义

  • 根据需求分析,定义的接口如下
1
2
3
4
5
@ApiOperation(value = "预览文件")
@GetMapping("/preview/{mediaId}")
public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId) {
return null;
}

接口开发

设置URL

  • 有一些浏览器不支持的视频格式,不能在浏览器中直接浏览,所以我们要修改保存媒资信息到数据库的方法
    • 当文件是图片时,设置URL字段
    • 当视频是MP4格式时,设置URL字段
    • 其他情况暂不设置URL,需要文件处理后再设置URL字段
  • 修改保存媒资信息的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    /**
* 将文件信息添加到文件表
*
* @param companyId 机构id
* @param uploadFileParamsDto 上传文件的信息
* @param objectName 对象名称
* @param fileMD5 文件的md5码
* @param bucket 桶
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) {
// 保存到数据库
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMD5);
mediaFiles.setFileId(fileMD5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setFilePath(objectName);
+ // 获取源文件名的contentType
+ String contentType = getContentType(objectName);
+ // 如果是图片格式或者mp4格式,则设置URL属性,否则不设置
+ if (contentType.contains("image") || contentType.contains("mp4")) {
+ mediaFiles.setUrl("/" + bucket + "/" + objectName);
+ }
// 查阅数据字典,002003表示审核通过
mediaFiles.setAuditStatus("002003");
}
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert <= 0) {
XueChengPlusException.cast("保存文件信息失败");
}
return mediaFiles;
}

DAO开发

  • 使用自动生成的Mapper接口即可

Service开发

  • 定义根据id查询媒资文件接口
1
MediaFiles getFileById(String mediaId);
  • 方法实现
1
2
3
4
5
6
7
8
@Override
public MediaFiles getFileById(String id) {
MediaFiles mediaFiles = mediaFilesMapper.selectById(id);
if (mediaFiles == null || StringUtils.isEmpty(mediaFiles.getUrl())) {
XueChengPlusException.cast("视频还没有转码处理");
}
return mediaFiles;
}

完善Controller

  • 完善接口层代码
1
2
3
4
5
6
@ApiOperation(value = "预览文件")
@GetMapping("/preview/{mediaId}")
public RestResponse<String> getPlayUrlByMediaId(@PathVariable String mediaId) {
MediaFiles mediaFile = mediaFileService.getFileById(mediaId);
return RestResponse.success(mediaFile.getUrl());
}

接口测试

  • 前后端联调
    • 上传MP4视频文件,并预览
    • 上传图片文件,并预览
    • 上传.avi格式的视频文件,尝试预览,观察错误提示信息,稍后通过视频处理对视频转码

视频处理

分布式任务处理

什么是分布式任务调度

  • 视频上传成功需要对视频格式进行处理,如何用Java程序对视频进行处理呢?

    • 这里有一个关键的需求就是:当视频比较多的时候,我们如何高效的处理
  • 如何去高效的处理一批任务呢?

    1. 多线程
      • 多线程是充分利用单机的资源
    2. 分布式+多线程
      • 充分利用多台计算机,每台计算机使用多线程处理
  • 方案2的可扩展性更强,同时方案二也是一种分布式任务调度的处理方案

  • 什么是分布式任务调度?

    • 我们可以先思考一下下面业务场景的解决方案
1
2
3
4
5
6
7
8
9
10
11
某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券

某财务系统需要在每天上午10天谴结算前一天的账单数据,统计汇总

某电商平台每天凌晨3点,要对订单中的无效订单进行处理

12306网站会根据车次不同,设置几个时间点分批放票

电商正点抢购,商品价格某天上午8点整开始优惠

商品成功发货后,需要向客户发送短信提醒
  • 类似的场景还有很多,我们该如何实现呢?
    • 以上这些场景,就是任务调度所需要解决的问题
  • 任务调度,顾名思义就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务
  • 如何实现任务调度?
    • 多线程方式实现
      • 我们可以开启一个线程,每sleep一段时间,就去检查是否已经到预期执行时间,下面代码简单实现了任务调度的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
// 任务执行时间间隔
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
public void run() {
while(true) {
//TODO: something
try {
Thread.sleep(timeInterval);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
thread.start();
}
  • 上面的代码实现了按一定时间间隔,执行任务调度的功能
  • JDK也为我们提供了相关支持,如Timer、ScheduledExecutor,下面我们了解下
  • Timer方式实现
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
//TODO: something
}
}, 1000, 2000); // 1秒后开始调度,每2秒执行一次
}
  • Timer的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行
  • ScheduledExecutor方式实现
1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
//TODO: something
System.out.println("todo something");
}
}, 1, 2, TimeUnit.SECONDS);
}
  • Java5推出了基于线程池设计的ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中的一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰
  • Timer和ScheduledExecutor都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每个月第一天凌晨1点执行任务、复杂调度任务的管理、任务键传递数据等等
  • Quartz是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求
    • Quartz设计的核心类包括Job,Trigger以及Scheduler。
      • Job负责定义需要执行的任务
      • Trigger负责设置调度策略
      • Scheduler将二者组装在一起,并触发任务开始执行。
    • Quartz支持简单的按时间间隔调度,还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度
    • 第三方Quartz方式实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String [] agrs) throws SchedulerException {
//创建一个Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//创建JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName","jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
//创建触发的CronTrigger 支持按日历调度
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();
//创建触发的SimpleTrigger 简单的间隔调度
/*SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","triggerGroupName")
.startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();*/
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();
}

public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
System.out.println("todo something");
}
}
  • 什么是分布式任务调度
    • 通常任务调度的程序是集成在应用中的,比如
      • 优惠券服务汇总包括了定时发布优惠券的调度程序
      • 结算服务中包括了定期生成报表的任务调度程序
    • 由于采用分布式架构,一个服务通常会部署在多个冗余实例来运行我们的业务
    • 在这种分布式环境下运行任务调度,就称之为分布式业务调度
  • 分布式调度要实现的目标
    • 不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式,就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力
    1. 并行任务调度
      • 并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机的CPU处理能力是有限的
      • 如果将任务调度程序分布式部署,每个节点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率
    2. 高可用
      • 若某一个实例宕机,不影响其他实例来执行任务
    3. 弹性扩容
      • 当集群中增加实例就可以提高并执行任务的处理效率
    4. 任务管理与检测
      • 对系统中存在的定时任进行统一的管理及监测,让开发人员及运维人员能够及时了解任务执行情况,从而做出快速应急处理响应
    5. 避免任务重复执行
      • 当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如上面提到的电商系统中定时发放优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次

XXL-JOB介绍

  • XXL-JOB是一个轻量级分布式任务调度平台,其核心设计是开发迅速、学习简单、轻量级、易扩展,现已开放源代码并接入多家公司线上产品线,开箱即用
  • 官网:https://www.xuxueli.com/xxl-job/
  • XXL-JOB主要由调度中心、执行器、任务
    • 调度中心
      • 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码
      • 主要职责为执行器管理、任务管理、监控运维、日志管理等
    • 任务执行器
      • 负责接收调度请求并执行任务逻辑
      • 主要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
    • 任务
      • 负责执行具体的业务逻辑
  • 调度中心与执行器之间的工作流程如下
  • 执行流程
    1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心
    2. 达到任务出发条件,调度中心下发任务
    3. 执行器基于线程池执行任务,并把执行结果放入内存队列、把执行日志写入日志文件中
    4. 执行器消费内存队列中的执行结果,主动上报给调度中心
    5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

搭建XXL-JOB

调度中心
  • 首先下载XXL-JOB
  • 使用IDEA打开项目
    • xxl-job-admin:调度中心
    • xxl-job-core:公共依赖
    • xxj-job-executor-samples:执行器Sample示例
      • xxl-job-executor-sample-springboot:SpringBoot版本,通过SpringBoot管理执行器
      • xxl-job-executor-sample-frameless:无框架版本
  • 根据数据库脚本创建数据库,修改数据库连接信息和端口,启动xxl-job-admin,访问http://local:18088/xxl-job-admin/
    • 账号密码:admin/123456
  • 启动成功之后,可以选择在Linux上运行
    • 使用maven命令,将xxl-job-admin打包,然后将其上传至Linux中,使用命令启动
    1
    nohup java -jar /绝对路径/xxl-job-admin-2.3.1.jar &
执行器
  • 下面配置执行器,执行器负责与调度中心通信,接收调度中心发起的任务调度请求
    1. 首先在media-service工程中添加依赖(父工程中完成了版本控制,这里的版本是2.3.1)
    1
    2
    3
    4
    <dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    </dependency>
    1. 在nacos下的media-service-dev.yaml下配置xxl-job
      • 注意这里配置的appname是执行器的应用名,稍后会在调度中心配置执行器的时候使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    xxl:
    job:
    admin:
    addresses: http://192.168.101.128:18088/xxl-job-admin/
    executor:
    appname: media-process-service
    address:
    ip:
    port: 9999
    logpath: /data/applogs/xxl-job-jobhandler
    logretentiondays: 30
    accessToken: default_token
    1. 配置xxl-job的执行器
      • 将示例工程下的配置类拷贝到media-service工程下,该类中的属性就是获取配置文件中的配置得到的,同时提供了一个执行器的Bean
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      @Configuration
      public class XxlJobConfig {
      private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

      @Value("${xxl.job.admin.addresses}")
      private String adminAddresses;

      @Value("${xxl.job.accessToken}")
      private String accessToken;

      @Value("${xxl.job.executor.appname}")
      private String appname;

      @Value("${xxl.job.executor.address}")
      private String address;

      @Value("${xxl.job.executor.ip}")
      private String ip;

      @Value("${xxl.job.executor.port}")
      private int port;

      @Value("${xxl.job.executor.logpath}")
      private String logPath;

      @Value("${xxl.job.executor.logretentiondays}")
      private int logRetentionDays;


      @Bean
      public XxlJobSpringExecutor xxlJobExecutor() {
      logger.info(">>>>>>>>>>> xxl-job config init.");
      XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
      xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
      xxlJobSpringExecutor.setAppname(appname);
      xxlJobSpringExecutor.setAddress(address);
      xxlJobSpringExecutor.setIp(ip);
      xxlJobSpringExecutor.setPort(port);
      xxlJobSpringExecutor.setAccessToken(accessToken);
      xxlJobSpringExecutor.setLogPath(logPath);
      xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

      return xxlJobSpringExecutor;
      }

      /**
      * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
      *
      * 1、引入依赖:
      * <dependency>
      * <groupId>org.springframework.cloud</groupId>
      * <artifactId>spring-cloud-commons</artifactId>
      * <version>${version}</version>
      * </dependency>
      *
      * 2、配置文件,或者容器启动变量
      * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
      *
      * 3、获取IP
      * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
      */

      }
    2. 进入调度中心,添加执行器
  • 重启媒资管理服务模块,可以看到执行器在调入中心注册成功
执行任务
  • 下面编写任务,任务类的编写方法参考示例工程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package com.xxl.job.executor.service.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
* XxlJob开发示例(Bean模式)
*
* 开发步骤:
* 1、任务开发:在Spring Bean实例中,开发Job方法;
* 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
* 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");

for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}


/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobHelper.log("第 {} 片, 忽略", i);
}
}

}


/**
* 3、命令行任务
*/
@XxlJob("commandJobHandler")
public void commandJobHandler() throws Exception {
String command = XxlJobHelper.getJobParam();
int exitValue = -1;

BufferedReader bufferedReader = null;
try {
// command process
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(command);
processBuilder.redirectErrorStream(true);

Process process = processBuilder.start();
//Process process = Runtime.getRuntime().exec(command);

BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobHelper.log(line);
}

// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobHelper.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}

if (exitValue == 0) {
// default success
} else {
XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
}

}


/**
* 4、跨平台Http任务
* 参数示例:
* "url: http://www.baidu.com\n" +
* "method: get\n" +
* "data: content\n";
*/
@XxlJob("httpJobHandler")
public void httpJobHandler() throws Exception {

// param parse
String param = XxlJobHelper.getJobParam();
if (param==null || param.trim().length()==0) {
XxlJobHelper.log("param["+ param +"] invalid.");

XxlJobHelper.handleFail();
return;
}

String[] httpParams = param.split("\n");
String url = null;
String method = null;
String data = null;
for (String httpParam: httpParams) {
if (httpParam.startsWith("url:")) {
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
}
if (httpParam.startsWith("method:")) {
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
}
if (httpParam.startsWith("data:")) {
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
}
}

// param valid
if (url==null || url.trim().length()==0) {
XxlJobHelper.log("url["+ url +"] invalid.");

XxlJobHelper.handleFail();
return;
}
if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
XxlJobHelper.log("method["+ method +"] invalid.");

XxlJobHelper.handleFail();
return;
}
boolean isPostMethod = method.equals("POST");

// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();

// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

// do connection
connection.connect();

// data
if (isPostMethod && data!=null && data.trim().length()>0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
}

// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}

// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();

XxlJobHelper.log(responseMsg);

return;
} catch (Exception e) {
XxlJobHelper.log(e);

XxlJobHelper.handleFail();
return;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobHelper.log(e2);
}
}

}

/**
* 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
}
public void init(){
logger.info("init");
}
public void destroy(){
logger.info("destroy");
}


}
  • 我们现在参考简单示例自己编写代码,在media-service下新建包com.xuecheng.media.service.jobhandler,在该包下定义我们的任务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.xuecheng.media.service.jobhandler;

import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* 测试执行器
*/
@Slf4j
@Component
public class SimpleJob {
@XxlJob("testJob")
public void testJob() {
log.debug("开始执行.......");
}
}
  • 然后进入调度中心添加任务,进入任务管理,新增任务信息
  • 其中JobHandler中填写@XxlJob注解中的名称
  • 随后启动任务,控制台可以看到执行器的方法执行

分片广播

  • 前面我们了解了一下xxl-job的基本使用,下面思考如何进行分布式任务处理呢?
    • 我们需要启动多个执行器组成一个集群,去执行任务
  • 执行器在集群部署下调度中心有哪些调度策略呢?
    • 查看xxl-job官方文档
1
2
3
4
5
6
7
8
9
10
11
12
- 高级配置:
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
- FIRST(第一个):固定选择第一个机器;
- LAST(最后一个):固定选择最后一个机器;
- ROUND(轮询):;
- RANDOM(随机):随机选择在线的机器;
- CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
- LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
- LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
- FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
- BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
- SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
  • 我们这里重点要说的是SHARDING_BROADCAST(分片广播),分片是指调度中心将集群汇总的执行器标上序号:0、1、2、3…,广播是指每次调度会向集群中的所有执行器发送调度请求,请求中携带分片参数
  • 每个执行器收到调度请求,根据分片参数自行决定是否执行任务
  • 另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量
  • 作业分片适用于哪些场景呢?
    • 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍
    • 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等
  • 所以广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群的分布式处理任务
  • 使用说明
    • 分片广播和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理
    • 获取分片参数方式,参考示例代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * 2、分片广播任务
    */
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception {

    // 分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

    // 业务逻辑
    for (int i = 0; i < shardTotal; i++) {
    if (i == shardIndex) {
    XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
    } else {
    XxlJobHelper.log("第 {} 片, 忽略", i);
    }
    }

    }
  • 下面测试作业分片
1
2
3
4
5
6
@XxlJob("shardingJobHandler")
public void shardingJob() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.debug("shardIndex:{}, shardTotal:{}", shardIndex, shardTotal);
}
  • 在调度中心添加任务,注意路由策略选择分片广播
  • 高级配置说明
    • 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
    • 调度过期策略:
      • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
      • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
    • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
      • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
      • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
      • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
    • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
    • 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
  • 下面我们需要启动两个执行器实例,观察每个实例的执行情况
  • 首先我们需要在nacos中编辑media-service的配置,设置本地配置优先
1
2
3
4
spring:
cloud:
config:
override-none: true
  • 将media-service启动两个实例,添加的vm选项就是用本地配置覆盖nacos中的配置,主要是修改端口号和xxl执行器端口
1
2
3
4
5
6
7
-Dserver.port=53051 -Dxxl.job.executor.port=9998  对应如下配置项
server:
port: 53051
xxl:
job:
executor:
port: 9998

  • 将两个服务启动,观察任务调度中心,可以看到有两个执行器
  • 启动任务,可以从日志中看到,两个实例的分片序号不同
1
2
3
4
5
## 实例1
[SimpleJob.java:25] - shardIndex:0, shardTotal:2

## 实例2
[SimpleJob.java:25] - shardIndex:1, shardTotal:2
  • 到此作业分片任务调试完成,此时我们来思考一下
    • 当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?

需求分析

作业分片方案

  • 任务添加成功后,对于要处理的任务,会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会重复执行任务?
  • 在上一小节的测试中,每个执行器收到广播任务有两个参数,分片序号和分片总数
    • 每个执行器从数据表取任务时,可以用任务id分片总数,如果等于该执行器的分片序号,则执行此任务
    • 例如
      • 1 % 2 = 1,执行器2执行
      • 2 % 2 = 0,执行器1执行
      • 3 % 2 = 1,执行器2执行
      • 4 % 2 = 1,执行器1执行
      • 以此类推

保证任务不重复执行

  • 通过作业分片方案,保证了执行器之间分配的任务不重复执行
  • 但是如果同一个执行器,在处理一个视频的时候,还没有处理完,此时调度中心又来了一次请求调度,为了不重复处理同一个视频,该怎么办?
  • 首先配置调度过期策略
  • 调度过期策略:
    • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
    • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
  • 这里我们选择忽略,如果立即执行一次,可能会重复调度

  • 其次,我们在看阻塞处理策略。

  • 阻塞处理策略就是当前执行器正在执行任务还没有结束时,调度中心又请求调度,此时该如何处理

  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
    • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
    • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
    • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
  • 这里选择丢弃后续调度,避免重复调度
  • 最后,也就是要注意保证任务处理的幂等性,什么是任务的幂等性
  • 任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。
  • 执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。
  • 什么是幂等性?
  • 它描述了一次和多次请求某一个资源,对于资源本身应该具有相同的结果
  • 幂等性是为了解决重复提交问题,比如:恶意刷单、重复支付等
  • 解决幂等性常用的方案
    1. 数据库约束,例如:唯一索引、主键
    2. 乐观锁,长用户数据库,更新数据时根据乐观锁状态去更新
    3. 唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等,则执行
  • 这里我们在数据库视频处理表中添加状态处理字段,视频处理完成更新状态为完成,执行视频前判断状态是否完成,如果完成则不再处理

业务流程

  • 确定了分片方案,下面梳理哼歌视频上传以及处理的业务流程
  • 上传视频成功,向视频待处理表中添加记录,视频处理的详细流程如下
  1. 任务调度中心广播作业分片
  2. 执行器收到广播作业分片,从数据库读取待处理任务
  3. 执行器根据任务内容MinIO下载要处理的文件
  4. 执行器启动多线程去处理任务
  5. 任务处理完成,上传处理后的视频到MinIO
  6. 将更新任务处理结果,如果视频处理完成,除了更新任务处理结果之外,还要将文件的访问地址更新至任务处理表及文件中,最后将任务完成记录写入历史表
  • 下面是待处理任务表

查询待处理任务

添加待处理任务

  • 上传视频成功,向视频处理待处理表添加记录,暂时只添加.avi类型视频的处理记录
  • 根据Mime Type去判断,是否为avi视频,下面列出部分Mime Type
Video Type Extension MIME Type
Flash fl video/x-flv
MPEG-4 .mp4 video/mp4
iPhone Index .m3u8 application/x-mpegURL
iPhone Segment .ts video/MP2T
3GP Mobile .3gp video/3gpp
QuickTime .mov video/quicktime
A/V Interleave .avi video/x-msvideo
Windows Media .wmv video/x-ms-wmv
  • avi视频的Mine Type是video/x-msvideo
  • 修改addMediaFilesToDB方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    /**
* 将文件信息添加到文件表
*
* @param companyId 机构id
* @param uploadFileParamsDto 上传文件的信息
* @param objectName 对象名称
* @param fileMD5 文件的md5码
* @param bucket 桶
*/
@Transactional
public MediaFiles addMediaFilesToDB(Long companyId, UploadFileParamsDto uploadFileParamsDto, String objectName, String fileMD5, String bucket) {
// 根据文件名获取Content-Type
String contentType = getContentType(objectName);
// 保存到数据库
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMD5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMD5);
mediaFiles.setFileId(fileMD5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setFilePath(objectName);
if (contentType.contains("image") || contentType.contains("mp4")) {
mediaFiles.setUrl("/" + bucket + "/" + objectName);
}
// 查阅数据字典,002003表示审核通过
mediaFiles.setAuditStatus("002003");
}
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert <= 0) {
XueChengPlusException.cast("保存文件信息失败");
}
+ // 如果是avi视频,则额外添加至视频待处理表
+ if ("video/x-msvideo".equals(contentType)) {
+ MediaProcess mediaProcess = new MediaProcess();
+ BeanUtils.copyProperties(mediaFiles, mediaProcess);
+ mediaProcess.setStatus("1"); // 未处理
+ int processInsert = mediaProcessMapper.insert(mediaProcess);
+ if (processInsert <= 0) {
+ XueChengPlusException.cast("保存avi视频到待处理表失败");
+ }
+ }
return mediaFiles;
}

查询待处理任务

  • 如何保证查询到的待处理视频记录不重复?
    • 解决方案我们前面已经给出了,用任务id分片总数,如果等于该执行器的分片序号,则执行,同时为了避免同一个任务被执行两次,我们需要额外指定任务状态为未处理,即status = 1
1
SELECT * FROM media_process WHERE id % #{shareTotal} = #{shareIndex} AND status = '1' LIMIT #{count}
  • 编写根据分片参数获取待处理任务的DAO方法,定义DAO接口如下
1
2
3
4
5
6
7
8
9
/**
* 根据分片参数获取待处理任务
* @param shardTotal 分片总数
* @param shardIndex 分片序号
* @param count 任务数
* @return
*/
@Select("SELECT * FROM media_process WHERE id % #{shardTotal} = #{shardIndex} AND status = '1' LIMIT #{count}")
List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);
  • 定义Service接口,查询待处理
1
2
3
4
5
6
7
8
9
10
public interface MediaFileProcessService {
/**
* 获取待处理任务
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param count 获取记录数
* @return 待处理任务集合
*/
List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count);
}
  • Service接口实现
1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {

@Autowired
private MediaProcessMapper mediaProcessMapper;

@Override
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
return mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
}
}

更新任务状态

  • 任务处理完成后,需要更新任务处理结果,任务执行成功,则更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录
  • 定义Service接口,更新任务状态
1
void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg);
  • Service接口实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Transactional
@Override
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
// 查询这个任务
MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);
if (mediaProcess == null) {
log.debug("更新任务状态时,此任务:{},为空", taskId);
return;
}
LambdaQueryWrapper<MediaProcess> queryWrapper = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);
// 如果任务失败
if ("3".equals(status)) {
log.debug("任务失败:{}", taskId);
MediaProcess mediaProcess_u = new MediaProcess();
mediaProcess_u.setStatus("3");
mediaProcess_u.setErrormsg(errorMsg);
mediaProcess_u.setFinishDate(LocalDateTime.now());
mediaProcessMapper.update(mediaProcess_u, queryWrapper);
return;
}
// 任务成功,将其从待处理任务表中删除,同时新增历史处理表记录
if ("2".equals(status)) {
mediaProcess.setStatus("2");
mediaProcess.setUrl(url);
mediaProcess.setFinishDate(LocalDateTime.now());
mediaProcessMapper.update(mediaProcess, queryWrapper);
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
// 两张表的属性完全一致,直接拷贝
BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);
// 向历史处理表新增数据
mediaProcessHistoryMapper.insert(mediaProcessHistory);
// 同时删除待处理任务表中的数据
mediaProcessMapper.deleteById(taskId);
}
}

视频处理

什么是视频编码

  • 视频上传成功后,需要对视频进行转码处理
  • 什么是视频编码?百度百科的定义如下
  • 所谓视频编码方式就是指通过压缩技术,将原始视频格式的文件转换成另一种视频格式文件的方式。视频流传输中最为重要的编解码标准有国际电联的H.261、H.263、H.264,运动静止图像专家组的M-JPEG和国际标准化组织运动图像专家组的MPEG系列标准,此外在互联网上被广泛应用的还有Real-Networks的RealVideo、微软公司的WMV以及Apple公司的QuickTime等。
  • 首先我们要分清文件格式和编码格式
    • 文件格式是指:.mp4.avi.rmvb等这些不同扩展名的视频文件的文件格式。视频文件的内容主要包括视频、音频,其文件格式是按照一定的编码格式去编码,并且按照该文件所规定的的封装格式,将视频、音频、字幕等信息封装到一起,播放器会根据他们的封装个事去提取出编码,然后由播放器解码,最终播放音视频
    • 编码格式是指:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b
  • 音视频编码格式种类繁多,主要由以下几类
    • MPEG系列(由ISO下属的MPEG开发)
      • 视频编码方面主要是Mpeg1(VCD)、Mpeg2(DVD)、Mpeg4(divx,xvid)、Mpeg4 AVC
      • 音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(MP3)、MPEG-2 AAC 、MPEG-4 AAC
    • H.26X系列(由ITU主导,侧重网络传输,注意:只是视频编码)
      • 包括H.261、H.262、H.263、H.263+、H.263++、H.264
  • 目前最常用的编码标准是
    • 视频:H.264
    • 音频:AAC

FFmpeg的基本使用

  • 我们将视频录制完成后,使用视频编码软件第视频进行编码,本项目使用FFmpeg对视频进行编码
  • 什么是FFmpeg?
  • FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的。
  • FFmpeg在Linux平台下开发,但它同样也可以在其它操作系统环境中编译运行,包括Windows、Mac OS X等。这个项目最早由Fabrice Bellard发起,2004年至2015年间由Michael Niedermayer主要负责维护。许多FFmpeg的开发人员都来自MPlayer项目,而且当前FFmpeg也是放在MPlayer项目组的服务器上。项目的名称来自MPEG视频编码标准,前面的"FF"代表"Fast Forward"。FFmpeg编码库可以使用GPU加速。
  • 下载:FFmpeg,并将其加入环境变量
  • 测试是否正常:
1
ffmpeg -v
  • 安装成功后,做一下简单测试,将.mp4文件转为.avi,再将.avi转为.gif等
1
2
3
ffmpeg -i 胶水.mp4 胶水.avi

ffmpeg -i 胶水.avi 胶水.gif
  • 胶水是我家的猫猫

视频处理工具类

  • 导入黑马提供的工具类,将其拷贝至base工程
  • 其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类
  • 下面我们来简单了解一下该类的使用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws IOException {
//ffmpeg的路径
String ffmpeg_path = "D:\\SoftWare\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\胶水.avi";
//转换后mp4文件的名称
String mp4_name = "胶水_mp4.mp4";
//转换后mp4文件的路径
String mp4_path = "D:\\BaiduNetdiskDownload\\星际牛仔1998\\胶水_mp4.mp4";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);
}
  • 执行main方法,最终在控制台输出success表示执行成功

任务类

  • 视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数

  • 所有视频处理完成后,结束本次执行,为了防止代码出现异常而无限等待,添加超时设置,到达超时时间还没有处理完成,仍结束任务

  • 代码思路如下

    1. 根据分片序号和分片总数,查询待处理任务
    2. 启动多线程去处理
    3. 将原始视频下载到本地
    4. 调用工具类将avi转为mp4
    5. 上传到MinIO
    6. 记录任务处理结果url
  • 定义任务类VideoTask如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
@Slf4j
@Component
public class VideoTask {
// 从配置文件中获取ffmpeg的安装位置
@Value("${videoprocess.ffmpegpath}")
String ffmpegPath;

@Autowired
private MediaProcessMapper mediaProcessMapper;

@Autowired
private MediaFileService mediaFileService;

@Autowired
private MediaFileProcessService mediaFileProcessService;

@XxlJob("videoJobHandler")
public void videoJobHandler() throws InterruptedException {
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
// 查询待处理任务,一次处理的任务数与cpu核心数相同
List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardTotal, shardIndex, 12);
CountDownLatch countDownLatch = new CountDownLatch(mediaProcessList.size());
// 未查询到待处理任务,结束方法
if (mediaProcessList == null || mediaProcessList.size() == 0) {
log.debug("查询到的待处理任务数为0");
return;
}
// 要处理的任务数
int size = mediaProcessList.size();
// 查询到任务,创建size个线程去处理
ExecutorService threadPool = Executors.newFixedThreadPool(size);
mediaProcessList.forEach(mediaProcess -> threadPool.execute(() -> {
String status = mediaProcess.getStatus();
// 避免重复执行任务
if ("2".equals(status)) {
log.debug("该视频已经被处理,无需再次处理。视频信息:{}", mediaProcess);
countDownLatch.countDown();
return;
}
// 桶
String bucket = mediaProcess.getBucket();
// 文件路径
String filePath = mediaProcess.getFilePath();
// 原始文件的md5
String fileId = mediaProcess.getFileId();
File originalFile = null;
File mp4File = null;
try {
// 将原始视频下载到本地,创建临时文件
originalFile = File.createTempFile("original", null);
// 处理完成后的文件
mp4File = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("处理视频前创建临时文件失败");
countDownLatch.countDown();
XueChengPlusException.cast("处理视频前创建临时文件失败");
}
try {
mediaFileService.downloadFileFromMinio(originalFile, bucket, filePath);
} catch (Exception e) {
log.error("下载原始文件过程中出错:{},文件信息:{}", e.getMessage(), mediaProcess);
countDownLatch.countDown();
XueChengPlusException.cast("下载原始文件过程出错");
}
// 调用工具类将avi转为mp4
String result = null;
try {
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegPath, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());
// 获取转换结果,转换成功返回success 转换失败返回错误信息
result = videoUtil.generateMp4();
} catch (Exception e) {
log.error("处理视频失败,视频地址:{},错误信息:{}", originalFile.getAbsolutePath(), e.getMessage());
countDownLatch.countDown();
XueChengPlusException.cast("处理视频失败");
}
// 转换成功,上传到MinIO
// 设置默认状态为失败
status = "3";
String url = null;
if ("success".equals(result)) {
// 根据文件md5,生成objectName
String objectName = mediaFileService.getFilePathByMd5(fileId, ".mp4");
try {
mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), bucket, objectName);
} catch (Exception e) {
log.error("上传文件失败:{}", e.getMessage());
XueChengPlusException.cast("上传文件失败");
}
// 处理成功,将状态设为成功
status = "2";
// 拼接url,准备更新数据
url = "/" + bucket + "/" + objectName;
}
// 记录任务处理结果url
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), status, fileId, url, result);
countDownLatch.countDown();
}));
// 等待,为了防止无线等待,这里设置一个超时时间为30分钟(很充裕了),若到时间还未处理完,则结束任务
countDownLatch.await(30, TimeUnit.MINUTES);
}
}
  • 在media-service-dev.yaml中新增配置,指定ffmpeg安装位置
1
2
videoprocess:
ffmpegpath: D:\SoftWare\ffmpeg\ffmpeg.exe

视频处理测试

  • 进入xxl-job调度中心添加执行器和视频处理任务
  • 在xxl-job配置任务调度策略
    • 配置阻塞处理策略为:丢弃后续调度
    • 配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些
      • 如:5分钟,即使到达调度时间,如果视频没有处理完成,仍丢弃调度请求
  • 配置完成后开始测试视频处理
    1. 首先上传至少4个视频,非mp4格式
    2. 在xxl-job启动视频处理任务
    3. 观察媒资管理服务后台日志

面试

XXL-JOB工作原理

  1. xxl-job的工作原理是什么?xxl-job是什么?

    • xxl-job分布式任务调度服务由调度中心和执行器组成,调度中心负责按任务调度策略向执行器下发任务,执行器负责接收任务,执行任务
      1. 首先部署并启动xxl-job调度中心(一个java工程,打成jar包可以放到虚拟机上运行)
      1
      nohup java -jar xxl-job-admin...  & 
      1. 在微服务中添加xxl-job依赖,在微服务中配置执行器
      1
      2
      3
      4
      <dependency>
      <groupId>com.xuxueli</groupId>
      <artifactId>xxl-job-core</artifactId>
      </dependency>
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      xxl:
      job:
      admin:
      addresses: http://192.168.101.128:18088/xxl-job-admin/
      executor:
      appname: media-process-service
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job-jobhandler
      logretentiondays: 30
      accessToken: default_token
      • 配置类就是读取yml配置中的信息,创建一个Xxl的执行器
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      /**
      * xxl-job config
      *
      * @author xuxueli 2017-04-28
      */
      @Configuration
      public class XxlJobConfig {
      private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

      @Value("${xxl.job.admin.addresses}")
      private String adminAddresses;

      @Value("${xxl.job.accessToken}")
      private String accessToken;

      @Value("${xxl.job.executor.appname}")
      private String appname;

      @Value("${xxl.job.executor.address}")
      private String address;

      @Value("${xxl.job.executor.ip}")
      private String ip;

      @Value("${xxl.job.executor.port}")
      private int port;

      @Value("${xxl.job.executor.logpath}")
      private String logPath;

      @Value("${xxl.job.executor.logretentiondays}")
      private int logRetentionDays;

      @Bean
      public XxlJobSpringExecutor xxlJobExecutor() {
      logger.info(">>>>>>>>>>> xxl-job config init.");
      XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
      xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
      xxlJobSpringExecutor.setAppname(appname);
      xxlJobSpringExecutor.setAddress(address);
      xxlJobSpringExecutor.setIp(ip);
      xxlJobSpringExecutor.setPort(port);
      xxlJobSpringExecutor.setAccessToken(accessToken);
      xxlJobSpringExecutor.setLogPath(logPath);
      xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
      return xxlJobSpringExecutor;
      }
      /**
      * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
      *
      * 1、引入依赖:
      * <dependency>
      * <groupId>org.springframework.cloud</groupId>
      * <artifactId>spring-cloud-commons</artifactId>
      * <version>${version}</version>
      * </dependency>
      *
      * 2、配置文件,或者容器启动变量
      * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
      *
      * 3、获取IP
      * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
      */
      }
      1. 启动微服务,执行器向调度中心上报自己
      2. 在微服务中写一个任务方法,并用xxl-job的注解去标记执行任务的方法名称
      1
      2
      3
      4
      @XxlJob("testJob")
      public void testJob() {
      log.debug("开始执行.......");
      }
      1. 在调度中心配置任务调度策略,调度策略就是每个多长时间执行,又或者是每天/每月的固定时间去执行等
      2. 在调度中心启动任务
      3. 调度中心根据任务调度策略,到达时间就开始下发任务给执行器
      4. 执行器收到任务就开始执行任务
  2. 如何保证任务不重复执行?

    1. 调度中心按分片广播的方式去下发任务
    2. 执行器收到作业分片广播的参数:分片总数(shardTotal)和分片序号(shardIndex),计算任务id % 分片总数(taskId % shardTotal),如果结果等于分片序号,就去执行这个任务(taskId % shardTotal = shardIndex)。这样就可以保证不同的执行器执行不同的任务
    3. 配置调度过期策略为忽略,避免同一个执行器多次重复执行同一个任务
    4. 配置任务阻塞处理策略为丢弃后续调度,注意:丢弃也没事,下一次调度还可以执行
    5. 另外还要保证任务处理的幂等性,执行过的任务可以打一个状态标记已完成(上面的代码设置status=2即为完成),下次再次调度该任务时,判断该任务已完成,就不再执行
  3. 任务幂等性如何保证?

    • 幂等性描述的是一次和多次请求某一个资源,对于资源本身,应该返回同样的结果
    • 幂等性是为了解决重复提交问题,例如:恶意刷单,重复支付等
    • 解决幂等性的常用方案
      1. 数据库约束,例如:唯一索引、主键
      2. 乐观锁:常用于数据库,更新数据时,根据乐观锁的状态去更新
      3. 唯一序列号,请求前生成的唯一序列号,携带序列号去请求,执行时在redis记录该序列号,用于表示该序列号请求已经执行过了,如果相同的序列号再次来执行,则说明是重复执行。这里的解决方式是在数据库中添加状态处理字段,视频处理完成,则更新该字段为已完成,执行视频处理之前判断状态是否为已完成,若已完成则不处理

绑定媒资

需求分析

业务流程

  • 截至目前,媒资管理已经完成文件上传、视频处理等基本功能。那么本小节就来讲解课程计划绑定媒资文件
  • 如何将课程计划绑定媒资文件呢?
    • 进入课程计划界面,在小节中点击添加视频/文档/作业按钮,输入关键字搜索,进行绑定即可

数据模型

  • 课程计划绑定媒资文件后,存储至课程计划绑定媒资表,即teachplan_media表中

接口定义

  • 根据业务流程,用户进入课程计划列表,首先确定向哪个课程计划添加视频,点击添加视频按钮后,用户选择视频,点击提交,前端以json格式请求以下参数
1
2
3
4
5
6
7
8
请求网址: http://localhost:8601/api/content/teachplan/association/media
请求方法: POST
载荷:
{
"mediaId": "a92da96ebcf28dfe194a1e2c393dd860",
"fileName": "胶水.avi",
"teachplanId": 293
}
  • 从请求网址可以看出,该接口在内容管理模块提供,即在content-api中提供
  • 请求方式为POST,那么我们定义一个DTO类用来接收请求参数,里面只包含载荷中的三个属性即可,在content-model中新建BindTeachplanMediaDto模型类
1
2
3
4
5
6
7
8
9
10
11
12
@Data
@ApiModel(value = "BindTeachplanMediaDto", description = "教学计划-媒资绑定DTO")
public class BindTeachplanMediaDto {
@ApiModelProperty(value = "媒资文件id", required = true)
private String mediaId;

@ApiModelProperty(value = "媒资文件名称", required = true)
private String fileName;

@ApiModelProperty(value = "课程计划标识", required = true)
private Long teachplanId;
}
  • 在content-api下的TeachplanController中定义接口
1
2
3
4
5
@ApiOperation("课程计划与媒资信息绑定")
@PostMapping("/teachplan/association/media")
public void associationMedia(@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto) {

}

接口开发

DAO开发

  • 使用自动生成的TeachplanMedia的Mapper即可

Service开发

  • 根据需求定义Service接口
1
2
3
4
5
/**
* 教学计划绑定媒资信息
* @param bindTeachplanMediaDto
*/
void associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto);
  • 定义接口实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Transactional
@Override
public void associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto) {
Long teachplanId = bindTeachplanMediaDto.getTeachplanId();
// 先根据请求参数查询出对应的教学计划teachplan
Teachplan teachplan = teachplanMapper.selectById(teachplanId);
if (teachplan == null) {
XueChengPlusException.cast("教学计划不存在");
}
// 获取教学计划的层级,只有第二层级允许绑定媒资信息(第二层级为小节,第一层级为章节)
Integer grade = teachplan.getGrade();
if (grade != 2) {
XueChengPlusException.cast("只有小节允许绑定媒资信息");
}
// 绑定媒资,如果之前已经绑定过了媒资,再次绑定时为更新(例如该小节已经绑定了 星际牛仔.avi,现在改绑为 胶水.avi,其实现方式为先删再增)
LambdaQueryWrapper<TeachplanMedia> queryWrapper = new LambdaQueryWrapper<TeachplanMedia>().eq(TeachplanMedia::getTeachplanId, teachplanId);
teachplanMediaMapper.delete(queryWrapper);
TeachplanMedia teachplanMedia = new TeachplanMedia();
teachplanMedia.setTeachplanId(bindTeachplanMediaDto.getTeachplanId());
teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName());
teachplanMedia.setMediaId(bindTeachplanMediaDto.getMediaId());
teachplanMedia.setCourseId(teachplan.getCourseId());
teachplanMedia.setCreateDate(LocalDateTime.now());
teachplanMediaMapper.insert(teachplanMedia);
}

接口层完善

  • 调用service层的代码
1
2
3
4
5
@ApiOperation("课程计划与媒资信息绑定")
@PostMapping("/teachplan/association/media")
public void associationMedia(@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto) {
teachplanService.associationMedia(bindTeachplanMediaDto);
}

接口测试

  • 向指定课程计划添加视频,成功添加后,再次添加视频,则会替换掉原有的视频

实战

  • 根据接口定义实现解除绑定功能
    • 点击已经绑定的视频名称即可解除绑定
  • 接口定义如下
1
delete /teachplan/association/media/{teachPlanId}/{mediaId}
  • 在TeachplanController中定义接口
1
2
3
4
5
@ApiOperation("课程计划解除媒资信息绑定")
@DeleteMapping("/teachplan/association/media/{teachPlanId}/{mediaId}")
public void unassociationMedia(@PathVariable Long teachPlanId, @PathVariable Long mediaId) {

}
  • 根据需求定义Service接口
1
2
3
4
5
/** 解绑教学计划与媒资信息
* @param teachPlanId 教学计划id
* @param mediaId 媒资信息id
*/
void unassociationMedia(Long teachPlanId, Long mediaId);
  • 定义接口实现
1
2
3
4
5
6
7
@Override
public void unassociationMedia(Long teachPlanId, Long mediaId) {
LambdaQueryWrapper<TeachplanMedia> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TeachplanMedia::getTeachplanId, teachPlanId)
.eq(TeachplanMedia::getMediaId, mediaId);
teachplanMediaMapper.delete(queryWrapper);
}
  • 完善接口层,调用service层的代码
1
2
3
4
5
@ApiOperation("课程计划解除媒资信息绑定")
@DeleteMapping("/teachplan/association/media/{teachPlanId}/{mediaId}")
public void unassociationMedia(@PathVariable Long teachPlanId, @PathVariable String mediaId) {
teachplanService.unassociationMedia(teachPlanId, mediaId);
}
功能写完了,重启服务发现一直报404,就离谱啊
为什么报的是404呢?我路径参数100%没问题的啊
遂继续重启,无果
最后发现是我重启服务重启错了,淦