阅读完需:约 5 分钟
在软件开发中会有会有定时任务的存在,简单的定时任务只需要在固定时间触发它的执行就可以了。
但是对于复杂的定时任务,可能是由多个任务组成一个任务组,它们之间存在依赖关系,一个任务执行的条件,必须是它的前置任务已经执行成功(或者没有前置任务),它才可以执行。
比如下面有一组任务就是有依赖关系的任务组
图中任务的依赖关系为:
- 任务1:依赖4,5
- 任务5:无依赖
- 任务4:依赖2
- 任务2:依赖9
- 任务9:依赖7,8
- 任务8:依赖3
- 任务7:无依赖
- 任务3:无依赖
- 任务6:依赖9
这个任务关系图其实就是“有向无环图”(简称DAG)这种数据结构。
图是由一系列顶点和连接顶点的边组成的数据结构。它分为有向图和无向图。
有向图的边是有方向的,即A->B这条边和B->A是两条不同的边,而无向图中,A->B和B->A是共用一条边的。基于这种数据结构,我们可以用图的顶点表示一个任务,而图的边表示任务之间的依赖关系,就可以基于有向无环图来实现任务调度。
代码实现
/**
* 利用有向无环图(DAG)进行任务调度
* 执行人
* @author admin
* @date 2023/05/08
* @constructor 创建[Executor]
*/
interface Executor {
fun execute(): Boolean
}
/**
* 任务
* @author admin
* @date 2023/05/08
* @constructor 创建[Task]
* @param [id] id
* @param [name] 名字
* @param [state] 状态 简化为0:未执行,1:已执行
*/
class Task(val id: Long, val name: String,private var state: Int) : Executor {
override fun execute(): Boolean {
Thread.sleep(1000)
println("Task id: [$id], task name: [$name] is running")
state = 1
return true
}
/**
* 已执行 返回任务是否已执行
* @return [Boolean]
*/
fun hasExecuted(): Boolean {
return state == 1
}
}
/**
* 有向图
*
* 这个类使用了邻接表来表示有向无环图。
*
* tasks是顶点集合,也就是任务集合。
*
* map是任务依赖关系集合。key是一个任务,value是它的前置任务集合。
*
* 一个任务执行的前提是它在map中没有以它作为key的entry,或者是它的前置任务集合中的任务都是已执行的状态。
*
* @author admin
* @date 2023/05/08
* @constructor 创建[Digraph]
*/
class Digraph {
private var tasks: MutableSet<Task>
private var map: MutableMap<Task, MutableSet<Task>>
init {
tasks = HashSet()
map = HashMap()
}
/**
* 添加前置
* @param [task] 任务
* @param [prev] 上一页
*/
fun addEdge(task: Task, prev: Task) {
require(!(!tasks.contains(task) || !tasks.contains(prev)))
var prevs = map[task]
if (prevs == null) {
prevs = HashSet()
map[task] = prevs
}
require(!prevs.contains(prev))
prevs.add(prev)
}
/**
* 添加任务
* @param [task] 任务
*/
fun addTask(task: Task) {
require(!tasks.contains(task))
tasks.add(task)
}
/**
* 删除
* @param [task] 任务
*/
fun remove(task: Task) {
if (!tasks.contains(task)) {
return
}
if (map.containsKey(task)) {
map.remove(task)
}
for (set in map.values) {
if (set.contains(task)) {
set.remove(task)
}
}
}
fun getTasks(): Set<Task> {
return tasks
}
fun setTasks(tasks: MutableSet<Task>) {
this.tasks = tasks
}
fun getMap(): Map<Task, MutableSet<Task>> {
return map
}
fun setMap(map: MutableMap<Task, MutableSet<Task>>) {
this.map = map
}
}
/**
* 调度器 线程池运行注意并行的可见性
*
* 调度器的实现比较简单,就是遍历任务集合,找出待执行的任务集合,放到一个List中,再串行执行(若考虑性能,可优化为并行执行)。
*
* 若List为空,说明所有任务都已执行,则这一次任务调度结束。
*
* @author admin
* @date 2023/05/08
* @constructor 创建[Scheduler]
*/
class Scheduler {
val threads=ThreadPoolExecutor(4,8,100,TimeUnit.SECONDS,ArrayBlockingQueue(1000))
/**
* 时间表
* @param [digraph] 有向图
*/
fun schedule(digraph: Digraph) {
val map= mutableMapOf<String,Task>()
while (true) {
val todo: MutableList<Task> = ArrayList()
for (task in digraph.getTasks()) {
if (!task.hasExecuted()) {
val prevs: Set<Task>? = digraph.getMap()[task]
if (!prevs.isNullOrEmpty()) {
var toAdd = true
for (task1 in prevs) {
if (!task1.hasExecuted()) {
toAdd = false
break
}
}
if (toAdd && map[task.name]==null) {
map[task.name]=task
todo.add(task)
}
} else {
if(map[task.name]==null){
map[task.name]=task
todo.add(task)
}
}
}
}
if (todo.isNotEmpty()) {
for (task in todo) {
threads.execute {
if (!task.execute()) {
throw RuntimeException()
}
}
}
} else if (map.size==digraph.getTasks().size) {
break
}
}
}
companion object {
@JvmStatic
fun main(args: Array<String>) {
val digraph = Digraph()
val task1 = Task(1L, "task1", 0)
val task2 = Task(2L, "task2", 0)
val task3 = Task(3L, "task3", 0)
val task4 = Task(4L, "task4", 0)
val task5 = Task(5L, "task5", 0)
val task6 = Task(6L, "task6", 0)
val task7 = Task(7L, "task7", 0)
val task8 = Task(8L, "task8", 0)
val task9 = Task(9L, "task9", 0)
digraph.addTask(task1)
digraph.addTask(task2)
digraph.addTask(task3)
digraph.addTask(task4)
digraph.addTask(task5)
digraph.addTask(task6)
digraph.addTask(task7)
digraph.addTask(task8)
digraph.addTask(task9)
digraph.addEdge(task1, task5)
digraph.addEdge(task1, task4)
digraph.addEdge(task4, task2)
digraph.addEdge(task2, task9)
digraph.addEdge(task9, task7)
digraph.addEdge(task9, task8)
digraph.addEdge(task8, task3)
digraph.addEdge(task6, task9)
val scheduler = Scheduler()
scheduler.schedule(digraph)
}
}
}
结果
----
Task id: [5], task name: [task5] is running
Task id: [7], task name: [task7] is running
Task id: [3], task name: [task3] is running
----
Task id: [8], task name: [task8] is running
----
Task id: [9], task name: [task9] is running
----
----
Task id: [2], task name: [task2] is running
Task id: [6], task name: [task6] is running
----
Task id: [4], task name: [task4] is running
----
Task id: [1], task name: [task1] is running
这段代码的主要由接口定义,任务类,任务添加类,调度器组成