[自研开源] MyData 数据集成任务的流程介绍 v0.7.1 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
lien321
V2EX    分享创造

[自研开源] MyData 数据集成任务的流程介绍 v0.7.1

  •  
  •   lien321 2024-03-08 21:22:48 +08:00 1641 次点击
    这是一个创建于 665 天前的主题,其中的信息可能已经有所发展或是发生改变。

    开源地址:gitee | github

    详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0

    部署文档:[用 Docker 部署 MyData v0.7.1]( https://www.mydata.work/docs#/./docker/用 Docker 部署 MyData)

    使用手册:MyData 使用手册 v0.7.1

    交流 Q 群:430089673

    MyData 后端结构

    MyData 的后端由 3 个子服务组成,分别是管理服务任务服务业务数据服务

    • 管理服务:通过项目、数据标准、应用 API 、环境的管理 配置出同步业务数据的任务;
    • 任务服务:根据配置的任务 定时调用应用 API 和数据服务 实现业务数据的传输和存储;
    • 数据服务:封装业务数据的隔离机制和读写操作;

    依赖的组件:

    • MySQL:存储管理数据;
    • Redis:缓存管理数据和任务;
    • MongoDB ;存储业务数据;

    下图从数据流角度 展示 3 个子服务的关联: image.png 注:开源版本采用单体 SpringBoot ;

    任务服务

    配置任务

    任务主要包括:项目环境、数据标准、应用 API 、任务类型、字段映射、任务周期;

    • 项目环境:确定应用 API 的统一前缀地址;
    • 数据标准:明确集成的业务数据的数据结构;
    • 应用 API: 业务数据的传输通道;
    • 任务类型:明确数据的传输方向,提供数据表示从应用 API 读取业务员数据、消费数据表示向应用 API 发送业务数据;
    • 字段映射:配置接口响应结构中 与标准数据字段的映射关系;
    • 任务周期:定期执行任务的时间间隔,格式为 cron 表达式; image.png

    任务流程

    数据集成的任务执行流程如下图:

    1. 任务服务启动时(即 MyData 系统启动),查询所有运行状态的任务;

      public class JobExecutor implements ApplicationRunner { ... @Override public void run(ApplicationArguments args) { // 移除已有缓存 jobCache.removeAll(); // 查询已启动的任务 List<Task> tasks = taskService.listRunningTasks(); log.info("tasks.size() = " + tasks.size()); if (CollUtil.isNotEmpty(tasks)) { tasks.forEach(this::startTask); } } ... } 
    2. 根据任务的 cron 表达式,计算任务的下次执行时间;

      /** * 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间 * * @param taskInfo 定时任务 */ private void calculateNextRunTime(TaskInfo taskInfo) { Assert.notNull(taskInfo); Assert.notEmpty(taskInfo.getTaskPeriod()); Date date = taskInfo.getStartTime(); if (taskInfo.getFailCount() > 0) { date = taskInfo.getNextRunTime(); } CronExpression crOnExpression= new CronExpression(taskInfo.getTaskPeriod()); Date nextRunTime = cronExpression.getNextValidTimeAfter(date); taskInfo.setNextRunTime(nextRunTime); } 
    3. 计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入 redis 缓存;

      /** * 缓存任务 * * @param taskInfo 任务对象 * @throws IllegalArgumentException 缓存时长无效 */ public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException { // 计算任务缓存有效时长 long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND); if (expire <= 0) { throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}" , DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN) , DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN))); } redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo); redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire); taskInfo.appendLog("任务存入 redis ,缓存时长 {} 秒", expire); } 
    4. 通过监听 redis 的 key 失效事件,获得待执行的任务;

      public class RedisKeyExpiredListener implements MessageListener { private final JobExecutor jobExecutor; @Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) { String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length()); jobExecutor.notify(taskId); } } } 
    5. 将任务加入待执行的线程池,随后即可执行

      /** * 任务存入执行队列 * * @param taskInfo 任务 */ private void executeJob(TaskInfo taskInfo) { taskInfo.appendLog("任务存入执行队列"); Runnable runnable = new JobThread(taskInfo); getThreadPoolExecutor().execute(runnable); } 
    6. 根据任务类型分别执行提供数据消费数据流程;

      1. 提供数据

        1. 调用应用 API ,获取 json 格式数据;
        2. 根据任务中字段映射 解析 json 为业务数据 Map 集合;
        3. 调用数据服务 将业务数据存入 MongoDB ;
        case MdConstant.DATA_PRODUCER: // 调用 api 获取 json String json = ApiUtil.read(taskInfo); // 将 json 按字段映射 解析为业务数据 jobDataService.parseData(taskInfo, json); // 根据条件过滤数据 jobDataFilterService.doFilter(taskInfo); // 保存业务数据 jobDataService.saveTaskData(taskInfo); // 更新环境变量 jobVarService.saveVarValue(taskInfo, json); break; 
      2. 消费数据

        1. 根据任务所选数据标准,查询业务数据;
        2. 再根据字段映射,将业务数据 转为指定的 json 对象集合;
        3. 调用应用 API ,传输 json 数据;
        case MdConstant.DATA_CONSUMER: List<BizDataFilter> filters = taskInfo.getDataFilters(); if (CollUtil.isNotEmpty(filters)) { // 解析过滤条件值中的 自定义字符串 parseFilterValue(filters); // 排除值为 null 的条件 filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList()); } // 根据过滤条件 查询数据 String dataCode = taskInfo.getDataCode(); if (StrUtil.isNotEmpty(dataCode)) { List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters); taskInfo.setConsumeDataList(dataList); // 根据字段映射转换为 api 参数 jobDataService.convertData(taskInfo); } // 调用 api 传输数据 ApiUtil.write(taskInfo); break; 
    7. 保存任务执行日志;

    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2498 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 24ms UTC 03:16 PVG 11:16 LAX 19:16 JFK 22:16
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86