【小白二开系列】爱星物联定时任务相关代码解析

[复制链接]
查看1069 | 回复5 | 2024-6-5 18:03:31 | 显示全部楼层 |阅读模式

爱星物联平台提供两种定时任务的处理,第一种是对接xxl-job分布式定时任务,其主要运用于数据定时统计、数据定时清理等服务相关的定时业务;第二种则是针对于设备的定时、倒计时的业务;

对接xxl-job分布式定时任务实现数据统计:

对接xxl-job用到了开源go执行器(xxl-job-executor-go)客户端,有了他平台对接xxl-job就简单多了;

爱星物联 iot_statistics_service 服务中使用这个定时任务做统计数据生成业务,下面我们说说这个服务;

iot_statistics_service服务中,我们首先需要配置xxljob相关的配置文件;

xxlJob:
  enable: true #是否启用xxlJob
  serverAddr: http://127.0.0.1:8088/xxl-job-admin #设置调度中心地址
  accessToken: xxxxxxxxx #请求令牌
  executorIp: 127.0.0.1 #设置执行器IP
  executorPort: 9999 #设置执行器端口
  registryKey: jobs-statistics #设置执行器标识,xxljob管理端添加执行器

然后就是对xxljob进行初始化和调用接口的注册

找到 iot_statistics_service/main.go下的 xxlService方法,可以看到方法首先创建了xxljob的执行器,然后注册需要任务的handle;目前平台已经提供的任务handle有app活跃用户统计、数据概览月度统计、数据概览小时统计、app用户月统计、开发者统计、设备月故障统计和设备激活[小时、日、月]数据统计等;

func XxlService() error {
    xxlJobCnf := config.Global.XxlJob
        //创建了xxljob的执行器
    exec := xxl.NewExecutor(
        xxl.ServerAddr(xxlJobCnf.ServerAddr),
        xxl.AccessToken(xxlJobCnf.AccessToken),
        xxl.ExecutorIp(xxlJobCnf.ExecutorIp),
        xxl.ExecutorPort(xxlJobCnf.ExecutorPort),
        xxl.RegistryKey(xxlJobCnf.RegistryKey),
        xxl.SetLogger(&task.XxlLogger{}),
    )
    exec.Init()
    exec.LogHandler(task.XxlLogHandler)
    //注册任务handler
    exec.RegTask("task.hour.dataoverview", task.HourDataOveriewStatistics)
    exec.RegTask("task.month.dataoverview", task.MonthDataOveriewStatistics)
    exec.RegTask("task.hour.device.active", task.HourActive)
    exec.RegTask("task.month.device.fault", task.MonthFault)
    exec.RegTask("task.developer.statistics", task.DeveloperStatistics)
    exec.RegTask("task.app.statistics", task.AppListStatistics)
    exec.RegTask("task.month.app.user", task.MonthAppUser)
    exec.RegTask("task.day.app.user", task.DayAppActiveUser)
    exec.RegTask("task.hour.history.device.active", task.HistoryActive)
    exec.RegTask("task.month.history.device.fault", task.HistoryFault)
    exec.RegTask("task.month.history.app.user", task.MonthHistoryAppUser)
    exec.RegTask("task.hour.history.dataoverview", task.HistoryHourDataOveriewActive)
    exec.RegTask("task.month.history.dataoverview", task.HistoryMonthDataOveriewActive)
    if err := exec.Run(); err != nil {
        iotlogger.LogHelper.Errorf("exec.Run error:%s", err.Error())
        return err
    }
    return nil
}

上面创建的任务handle都已经初始化到xxl-job的初始化数据脚本中;

image.png

如果需要扩展其它数据,只需要根据需求在数据库中创建统计数据表,然后注册任务handle,最后在管理后台添加定时任务即可

 exec.RegTask("task.xxxxx", func(cxt context.Context, param *xxl.RunReq) string {
  //编写数据统计查询和存储逻辑
  //.....
  return ""
})

进入到xxl-job平台添加定时任务,注意将JobHandler的值对应上;

image.png

设备定时器、倒计时介绍

设备定时器和倒计时是物联网产品非常重要的功能,给与设备使用者极大的帮助;

爱星物联社区版的定时器和倒计时放在了 iot_device_service服务下,使用开源库“github.com/robfig/cron/v3”,使用cron需要在main.go中进行初始化,此初始化包括cron注册和历史定时器和倒计时的任务创建操作;相关代码如下:

main.go

//调用NewCron()、InitCron() 
service.RunJob()
// 关闭必须放到main.go下
defer service.GetCron().Stop()

cron的初始化

func NewCron() {
    cronObj = &CronObj{
        c:      cron.New(cron.WithSeconds()),
        m:      make(map[int64]*DeviceJob),
        endM:   make(map[int64]*DeviceJob),
        mux:    new(sync.RWMutex),
        endMux: new(sync.RWMutex),
    }
}

历史任务创建,这里需要根据所部署的区域服务Id进行创建;

func InitCron() error {
    svc := new(IotJobSvc)
    req := &protosService.IotJobListRequest{
        Query: &protosService.IotJob{
            Enabled:        1,
            RegionServerId: config.Global.Service.ServerId, //config中读取服务器Id
        },
    }
    jobs, _, err := svc.GetListIotJob(req)
    if err != nil {
        return err
    }

    for i := range jobs {
        if err := cronObj.CreateJob(jobs[i]); err != nil {
            iotlogger.LogHelper.Errorf("create task:[%s] job error: [%s]", jobs[i].TaskId, err)
            continue
        }
    }
    cronObj.c.Start()
    return nil
}

Cron已准备好,然后说说平台提供的设备定时器、倒计时的业务功能;

定时器、倒计时的业务表分别是 iot_device.t_iot_device_timeriot_device.t_iot_device_countdown,同样也是属于 iot_device_service服务下管理,在创建定时器、倒计时的时候需要通知cron执行器创建执行任务,同理修改和删除都需要通知cron进行相应的处理;

//任务执行参数
reqJob := &proto.IotJob{
    Id:             timerId,          //定时器、倒计时记录的Id
    ProductKey:     productKey,       //产品Key
    DeviceId:       deviceId,         //设备Id
    TaskType:       2,                //任务类型(1 倒计时任务 2 定时任务)
    Enabled:        1,                //定时器状态(=1启动 =2 禁用)
    Cron:           "0 8 14 15 5 ?",  //执行规则
    Data:           "{\"1\":true}",   //需要发送给设备执行的命令
    Timezone:       "Asia/Shanghai",  //时区
    RegionServerId: 1,                //区域服务器Id(例如:1:中国、2:美国)
}

//创建任务
GetCron().CreateJob(req)

//删除任务
GetCron().DeleteJob(req)

//修改任务(先删后创建)
GetCron().DeleteJob(req)
GetCron().CreateJob(req)

目前APP中的应用如下:

APP控制面板发起定时器、倒计时的任务创建,云端根据APP设置的规则、时区创建执行任务;

image.png

回复

使用道具 举报

1084504793 | 2024-6-5 18:24:00 | 显示全部楼层
回复

使用道具 举报

WT_0213 | 2024-6-6 08:22:30 | 显示全部楼层
点赞
回复

使用道具 举报

jkernet | 2024-6-6 11:08:37 | 显示全部楼层
学习
回复

使用道具 举报

爱笑 | 2024-6-6 11:42:34 | 显示全部楼层
滴滴滴,学习卡。
用心做好保姆工作
回复 支持 反对

使用道具 举报

iiv | 2024-6-8 18:09:48 | 显示全部楼层
滴滴
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则