User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Java   ›   正文
Java

Java—有向无环图(DAG)

2023-05-09 11:10:06
850  0 3

阅读完需:约 5 分钟

在软件开发中会有会有定时任务的存在,简单的定时任务只需要在固定时间触发它的执行就可以了。

但是对于复杂的定时任务,可能是由多个任务组成一个任务组,它们之间存在依赖关系,一个任务执行的条件,必须是它的前置任务已经执行成功(或者没有前置任务),它才可以执行。

比如下面有一组任务就是有依赖关系的任务组

图中任务的依赖关系为:

  • 任务1:依赖4,5
  • 任务5:无依赖
  • 任务4:依赖2
  • 任务2:依赖9
  • 任务9:依赖7,8
  • 任务8:依赖3
  • 任务7:无依赖
  • 任务3:无依赖
  • 任务6:依赖9

这个任务关系图其实就是“有向无环图”(简称DAG)这种数据结构。

图是由一系列顶点和连接顶点的边组成的数据结构。它分为有向图和无向图。

有向图的边是有方向的,即A->B这条边和B->A是两条不同的边,而无向图中,A->B和B->A是共用一条边的。基于这种数据结构,我们可以用图的顶点表示一个任务,而图的边表示任务之间的依赖关系,就可以基于有向无环图来实现任务调度。

代码实现

/**
 * 利用有向无环图(DAG)进行任务调度
 * 执行人
 * @author admin
 * @date 2023/05/08
 * @constructor 创建[Executor]
 */
interface Executor {
    fun execute(): Boolean
}


/**
 * 任务
 * @author admin
 * @date 2023/05/08
 * @constructor 创建[Task]
 * @param [id] id
 * @param [name] 名字
 * @param [state] 状态 简化为0:未执行,1:已执行
 */
class Task(val id: Long, val name: String,private var state: Int) : Executor {
    override fun execute(): Boolean {
        Thread.sleep(1000)
        println("Task id: [$id], task name: [$name] is running")
        state = 1
        return true
    }

    /**
     * 已执行 返回任务是否已执行
     * @return [Boolean]
     */
    fun hasExecuted(): Boolean {
        return state == 1
    }
}


/**
 * 有向图
 *
 * 这个类使用了邻接表来表示有向无环图。
 *
 * tasks是顶点集合,也就是任务集合。
 *
 * map是任务依赖关系集合。key是一个任务,value是它的前置任务集合。
 *
 * 一个任务执行的前提是它在map中没有以它作为key的entry,或者是它的前置任务集合中的任务都是已执行的状态。
 *
 * @author admin
 * @date 2023/05/08
 * @constructor 创建[Digraph]
 */
class Digraph {
    private var tasks: MutableSet<Task>
    private var map: MutableMap<Task, MutableSet<Task>>

    init {
        tasks = HashSet()
        map = HashMap()
    }

    /**
     * 添加前置
     * @param [task] 任务
     * @param [prev] 上一页
     */
    fun addEdge(task: Task, prev: Task) {
        require(!(!tasks.contains(task) || !tasks.contains(prev)))
        var prevs = map[task]
        if (prevs == null) {
            prevs = HashSet()
            map[task] = prevs
        }
        require(!prevs.contains(prev))
        prevs.add(prev)
    }

    /**
     * 添加任务
     * @param [task] 任务
     */
    fun addTask(task: Task) {
        require(!tasks.contains(task))
        tasks.add(task)
    }

    /**
     * 删除
     * @param [task] 任务
     */
    fun remove(task: Task) {
        if (!tasks.contains(task)) {
            return
        }
        if (map.containsKey(task)) {
            map.remove(task)
        }
        for (set in map.values) {
            if (set.contains(task)) {
                set.remove(task)
            }
        }
    }

    fun getTasks(): Set<Task> {
        return tasks
    }

    fun setTasks(tasks: MutableSet<Task>) {
        this.tasks = tasks
    }

    fun getMap(): Map<Task, MutableSet<Task>> {
        return map
    }

    fun setMap(map: MutableMap<Task, MutableSet<Task>>) {
        this.map = map
    }
}


/**
 * 调度器   线程池运行注意并行的可见性
 *
 * 调度器的实现比较简单,就是遍历任务集合,找出待执行的任务集合,放到一个List中,再串行执行(若考虑性能,可优化为并行执行)。
 *
 * 若List为空,说明所有任务都已执行,则这一次任务调度结束。
 *
 * @author admin
 * @date 2023/05/08
 * @constructor 创建[Scheduler]
 */
class Scheduler {
    val threads=ThreadPoolExecutor(4,8,100,TimeUnit.SECONDS,ArrayBlockingQueue(1000))

    /**
     * 时间表
     * @param [digraph] 有向图
     */
    fun schedule(digraph: Digraph) {
        val map= mutableMapOf<String,Task>()
        while (true) {
            val todo: MutableList<Task> = ArrayList()
            for (task in digraph.getTasks()) {
                if (!task.hasExecuted()) {
                    val prevs: Set<Task>? = digraph.getMap()[task]
                    if (!prevs.isNullOrEmpty()) {
                        var toAdd = true
                        for (task1 in prevs) {
                            if (!task1.hasExecuted()) {
                                toAdd = false
                                break
                            }
                        }
                        if (toAdd && map[task.name]==null) {
                            map[task.name]=task
                            todo.add(task)
                        }
                    } else {
                        if(map[task.name]==null){
                            map[task.name]=task
                            todo.add(task)
                        }
                    }
                }
            }
            if (todo.isNotEmpty()) {
                for (task in todo) {
                    threads.execute {
                        if (!task.execute()) {
                            throw RuntimeException()
                        }
                    }
                }
            } else if (map.size==digraph.getTasks().size) {
                break
            }
        }
    }

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            val digraph = Digraph()
            val task1 = Task(1L, "task1", 0)
            val task2 = Task(2L, "task2", 0)
            val task3 = Task(3L, "task3", 0)
            val task4 = Task(4L, "task4", 0)
            val task5 = Task(5L, "task5", 0)
            val task6 = Task(6L, "task6", 0)
            val task7 = Task(7L, "task7", 0)
            val task8 = Task(8L, "task8", 0)
            val task9 = Task(9L, "task9", 0)
            digraph.addTask(task1)
            digraph.addTask(task2)
            digraph.addTask(task3)
            digraph.addTask(task4)
            digraph.addTask(task5)
            digraph.addTask(task6)
            digraph.addTask(task7)
            digraph.addTask(task8)
            digraph.addTask(task9)
            digraph.addEdge(task1, task5)
            digraph.addEdge(task1, task4)
            digraph.addEdge(task4, task2)
            digraph.addEdge(task2, task9)
            digraph.addEdge(task9, task7)
            digraph.addEdge(task9, task8)
            digraph.addEdge(task8, task3)
            digraph.addEdge(task6, task9)
            val scheduler = Scheduler()
            scheduler.schedule(digraph)
        }
    }
}

结果

----
Task id: [5], task name: [task5] is running
Task id: [7], task name: [task7] is running
Task id: [3], task name: [task3] is running
----
Task id: [8], task name: [task8] is running
----
Task id: [9], task name: [task9] is running
----
----
Task id: [2], task name: [task2] is running
Task id: [6], task name: [task6] is running
----
Task id: [4], task name: [task4] is running
----
Task id: [1], task name: [task1] is running

这段代码的主要由接口定义,任务类,任务添加类,调度器组成

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

3 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 594195浏览

随机文章
SpringCloud—项目改造为Spring-Cloud-Kubernetes项目
3年前
Kotlin-内置类型—集合框架(五)
4年前
lombok编写优雅的Bean对象
5年前
Mybatis原理—SqlSession下的四大对象(Executor、StatementHandler、ParameterHandler和ResultSetHandler)
3年前
Java—类加载内存
5年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 594195 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付