爱星物联平台提供两种定时任务的处理,第一种是对接xxl-job分布式定时任务,其主要运用于数据定时统计、数据定时清理等服务相关的定时业务;第二种则是针对于设备的定时、倒计时的业务;
对接xxl-job分布式定时任务实现数据统计:
对接xxl-job用到了开源go执行器(xxl-job-executor-go)客户端,有了他平台对接xxl-job就简单多了;
爱星物联 iot_statistics_service
服务中使用这个定时任务做统计数据生成业务,下面我们说说这个服务;
在 iot_statistics_service
服务中,我们首先需要配置xxljob相关的配置文件;
xxlJob:
enable: true #是否启用xxlJob
serverAddr: http://127.0.0.1:8088/xxl-job-admin #设置调度中心地址
accessToken: xxxxxxxxx #请求令牌
executorIp: 127.0.0.1 #设置执行器IP
executorPort: 9999 #设置执行器端口
registryKey: jobs-statistics #设置执行器标识,xxljob管理端添加执行器
然后就是对xxljob进行初始化和调用接口的注册
找到 iot_statistics_service/main.go
下的 xxlService
方法,可以看到方法首先创建了xxljob的执行器,然后注册需要任务的handle;目前平台已经提供的任务handle有app活跃用户统计、数据概览月度统计、数据概览小时统计、app用户月统计、开发者统计、设备月故障统计和设备激活[小时、日、月]数据统计等;
func XxlService() error {
xxlJobCnf := config.Global.XxlJob
//创建了xxljob的执行器
exec := xxl.NewExecutor(
xxl.ServerAddr(xxlJobCnf.ServerAddr),
xxl.AccessToken(xxlJobCnf.AccessToken),
xxl.ExecutorIp(xxlJobCnf.ExecutorIp),
xxl.ExecutorPort(xxlJobCnf.ExecutorPort),
xxl.RegistryKey(xxlJobCnf.RegistryKey),
xxl.SetLogger(&task.XxlLogger{}),
)
exec.Init()
exec.LogHandler(task.XxlLogHandler)
//注册任务handler
exec.RegTask("task.hour.dataoverview", task.HourDataOveriewStatistics)
exec.RegTask("task.month.dataoverview", task.MonthDataOveriewStatistics)
exec.RegTask("task.hour.device.active", task.HourActive)
exec.RegTask("task.month.device.fault", task.MonthFault)
exec.RegTask("task.developer.statistics", task.DeveloperStatistics)
exec.RegTask("task.app.statistics", task.AppListStatistics)
exec.RegTask("task.month.app.user", task.MonthAppUser)
exec.RegTask("task.day.app.user", task.DayAppActiveUser)
exec.RegTask("task.hour.history.device.active", task.HistoryActive)
exec.RegTask("task.month.history.device.fault", task.HistoryFault)
exec.RegTask("task.month.history.app.user", task.MonthHistoryAppUser)
exec.RegTask("task.hour.history.dataoverview", task.HistoryHourDataOveriewActive)
exec.RegTask("task.month.history.dataoverview", task.HistoryMonthDataOveriewActive)
if err := exec.Run(); err != nil {
iotlogger.LogHelper.Errorf("exec.Run error:%s", err.Error())
return err
}
return nil
}
上面创建的任务handle都已经初始化到xxl-job的初始化数据脚本中;
如果需要扩展其它数据,只需要根据需求在数据库中创建统计数据表,然后注册任务handle,最后在管理后台添加定时任务即可
exec.RegTask("task.xxxxx", func(cxt context.Context, param *xxl.RunReq) string {
//编写数据统计查询和存储逻辑
//.....
return ""
})
进入到xxl-job平台添加定时任务,注意将JobHandler的值对应上;
设备定时器、倒计时介绍
设备定时器和倒计时是物联网产品非常重要的功能,给与设备使用者极大的帮助;
爱星物联社区版的定时器和倒计时放在了 iot_device_service
服务下,使用开源库“github.com/robfig/cron/v3
”,使用cron需要在main.go中进行初始化,此初始化包括cron注册和历史定时器和倒计时的任务创建操作;相关代码如下:
main.go
//调用NewCron()、InitCron()
service.RunJob()
// 关闭必须放到main.go下
defer service.GetCron().Stop()
cron的初始化
func NewCron() {
cronObj = &CronObj{
c: cron.New(cron.WithSeconds()),
m: make(map[int64]*DeviceJob),
endM: make(map[int64]*DeviceJob),
mux: new(sync.RWMutex),
endMux: new(sync.RWMutex),
}
}
历史任务创建,这里需要根据所部署的区域服务Id进行创建;
func InitCron() error {
svc := new(IotJobSvc)
req := &protosService.IotJobListRequest{
Query: &protosService.IotJob{
Enabled: 1,
RegionServerId: config.Global.Service.ServerId, //config中读取服务器Id
},
}
jobs, _, err := svc.GetListIotJob(req)
if err != nil {
return err
}
for i := range jobs {
if err := cronObj.CreateJob(jobs[i]); err != nil {
iotlogger.LogHelper.Errorf("create task:[%s] job error: [%s]", jobs[i].TaskId, err)
continue
}
}
cronObj.c.Start()
return nil
}
Cron已准备好,然后说说平台提供的设备定时器、倒计时的业务功能;
定时器、倒计时的业务表分别是 iot_device.t_iot_device_timer
、iot_device.t_iot_device_countdown
,同样也是属于 iot_device_service
服务下管理,在创建定时器、倒计时的时候需要通知cron执行器创建执行任务,同理修改和删除都需要通知cron进行相应的处理;
//任务执行参数
reqJob := &proto.IotJob{
Id: timerId, //定时器、倒计时记录的Id
ProductKey: productKey, //产品Key
DeviceId: deviceId, //设备Id
TaskType: 2, //任务类型(1 倒计时任务 2 定时任务)
Enabled: 1, //定时器状态(=1启动 =2 禁用)
Cron: "0 8 14 15 5 ?", //执行规则
Data: "{\"1\":true}", //需要发送给设备执行的命令
Timezone: "Asia/Shanghai", //时区
RegionServerId: 1, //区域服务器Id(例如:1:中国、2:美国)
}
//创建任务
GetCron().CreateJob(req)
//删除任务
GetCron().DeleteJob(req)
//修改任务(先删后创建)
GetCron().DeleteJob(req)
GetCron().CreateJob(req)
目前APP中的应用如下:
APP控制面板发起定时器、倒计时的任务创建,云端根据APP设置的规则、时区创建执行任务;