Skip to content

任务调度

添加任务调度

在 Program.cs 文件中添加任务调度

cs
using Cronos;
using FreeScheduler;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using ZhonTai.Admin.Core;
using ZhonTai.Admin.Core.Consts;
using ZhonTai.Admin.Core.Db;
using ZhonTai.Admin.Core.Startup;
using ZhonTai.Admin.Domain;
using ZhonTai.Admin.Tools.TaskScheduler;
using ZhonTai.Common.Extensions;
using TaskStatus = FreeScheduler.TaskStatus;

// Maps the scheduler entities (task, task log, task extension) to their table names
// on the given FreeSql instance.
static void ConfigureScheduler(IFreeSql fsql)
{
    var codeFirst = fsql.CodeFirst;

    // Task table
    codeFirst.ConfigEntity<TaskInfo>(table => table.Name("app_task"));

    // Task log table
    codeFirst.ConfigEntity<TaskLog>(table => table.Name("app_task_log"));

    // Task extension table
    codeFirst.ConfigEntity<TaskInfoExt>(table => table.Name("app_task_ext"));
}

// Host options: sync the scheduler tables for the task database and register the task scheduler.
var hostAppOptions = new HostAppOptions
{
    // Configure FreeSql: only the task database gets the scheduler structure synced
    ConfigureFreeSql = (freeSql, dbConfig) =>
    {
        if (dbConfig.Key != DbKeys.TaskDb)
        {
            return;
        }

        freeSql.SyncSchedulerStructure(dbConfig, ConfigureScheduler);
    },

    // Configure post services: add task scheduling with the same entity mapping
    ConfigurePostServices = context =>
        context.Services.AddTaskScheduler(
            DbKeys.TaskDb,
            options => options.ConfigureFreeSql = ConfigureScheduler)
};

new HostApp(hostAppOptions).Run(args);

#if DEBUG
// Declares Program as a public partial class, in DEBUG builds only.
// NOTE(review): presumably so that other projects (e.g. tests) can reference the entry-point type - confirm.
public partial class Program { }
#endif

更多任务表和任务日志表配置

cs
freeSql.CodeFirst
    // Task table mapping
    .ConfigEntity<TaskInfo>(table =>
    {
        table.Name("app_task");
        table.Property(t => t.Id).IsPrimary(true);
        table.Property(t => t.Body).StringLength(-1);
        table.Property(t => t.Interval).MapType(typeof(int));
        table.Property(t => t.IntervalArgument).StringLength(1024);
        table.Property(t => t.Status).MapType(typeof(int));
        table.Property(t => t.CreateTime).ServerTime(DateTimeKind.Local);
        table.Property(t => t.LastRunTime).ServerTime(DateTimeKind.Local);
    })
    // Task log table mapping
    .ConfigEntity<TaskLog>(table =>
    {
        table.Name("app_task_log");
        table.Property(l => l.Exception).StringLength(-1);
        table.Property(l => l.Remark).StringLength(-1);
        table.Property(l => l.CreateTime).ServerTime(DateTimeKind.Local);
    });

任务调度配置

appsettings.json 中添加配置

json
//Task scheduler configuration
"TaskScheduler": {
    //Process start information
    "ProcessStartInfo": {
        //Application to launch
        "FileName": "C:/grpcurl_1.8.7/grpcurl",
        //Working directory
        "WorkingDirectory": ""
    },
    //Alert e-mail
    //The key spellings "AlerEmail" and "Adress" match the property names of TaskSchedulerConfig
    "AlerEmail": {
        //Whether alert e-mails are enabled
        "Enable": true,
        //E-mail address
        "Adress": ""
    }
}

解析 Cron 表达式

1、安装解析Cron表达式包

xml
<PackageReference Include="Cronos" Version="0.8.0" />

2、解析Cron表达式

cs
new HostApp(new HostAppOptions
{
    // Configure post services
    ConfigurePostServices = context =>
    {
        // Add task scheduling
        context.Services.AddTaskScheduler(DbKeys.TaskDb, options =>
        {
            // Configure the scheduler builder
            options.ConfigureFreeSchedulerBuilder = freeSchedulerBuilder =>
            {
                freeSchedulerBuilder
                .UseCustomInterval(task =>
                {
                    // Parse task.IntervalArgument as a cron expression (seconds field included)
                    // and return the delay until its next occurrence; returning null completes the task.
                    var expression = CronExpression.Parse(task.IntervalArgument, CronFormat.IncludeSeconds);

                    // Read the clock once so the search start and the subtraction use the same instant.
                    var now = DateTimeOffset.Now;
                    var next = expression.GetNextOccurrence(now, TimeZoneInfo.Local);

                    // DateTimeOffset subtraction yields the real elapsed time, even when a daylight
                    // saving transition lies between now and the next occurrence. The lifted operator
                    // returns null when there is no next occurrence.
                    return next - now;
                });
            };
        });
    }
}).Run(args);

添加任务常量

在 MyCompanyName.MyProjectName.Api\Core\Consts 目录下添加任务常量类TaskNames.cs

cs
namespace MyCompanyName.MyProjectName.Api.Core.Consts;

/// <summary>
/// Task name constants.
/// </summary>
public static partial class TaskNames
{
    /// <summary>
    /// Name of the module task.
    /// </summary>
    public const string ModuleTaskName = nameof(ModuleTaskName);
}

添加 Shell 任务

下载 grpcurl 应用程序

https://github.com/fullstorydev/grpcurl/releases/tag/v1.8.7

建议使用 grpcurl v1.8.7 版本,v1.8.8+ 版本存在调用 grpc 方法异常的问题

添加或修改任务参数

json
{
  "desc": "任务说明",
  "fileName": "/app/bin/grpcurl",
  "arguments": "-plaintext -d \"{ \\\"id\\\": 1 }\" no-protocol-host:port YourNamespace.YourGrpcService/YourMethod"
}
json
{
  "desc": "任务说明",
  "fileName": "C:/grpcurl_1.8.7/grpcurl",
  "arguments": "-plaintext -d \"{ \\\"id\\\": 1 }\" no-protocol-host:port YourNamespace.YourGrpcService/YourMethod"
}

执行 Shell 任务

添加 TaskSchedulerConfig 任务调度配置类

cs
namespace ZhonTai.Admin.Tools.TaskScheduler;

/// <summary>
/// Task scheduler configuration.
/// </summary>
public class TaskSchedulerConfig
{
    /// <summary>
    /// Process start information.
    /// </summary>
    public Models.ProcessStartInfoModel ProcessStartInfo { get; set; }

    /// <summary>
    /// Alert e-mail settings.
    /// </summary>
    public Models.AlerEmail AlerEmail { get; set; }

    /// <summary>
    /// Nested model types used by the configuration properties above.
    /// </summary>
    public static class Models
    {
        /// <summary>
        /// Process start information.
        /// </summary>
        public class ProcessStartInfoModel
        {
            /// <summary>
            /// Application to launch.
            /// </summary>
            public string FileName { get; set; }

            /// <summary>
            /// Working directory.
            /// </summary>
            public string WorkingDirectory { get; set; }
        }

        /// <summary>
        /// Alert e-mail settings.
        /// </summary>
        public class AlerEmail
        {
            /// <summary>
            /// Whether alert e-mails are enabled.
            /// </summary>
            public bool Enable { get; set; }

            /// <summary>
            /// E-mail address.
            /// </summary>
            public string Adress { get; set; }
        }
    }
}

执行 Shell 任务

cs
new HostApp(new HostAppOptions
{
    // Configure post services
    ConfigurePostServices = context =>
    {
        // Add task scheduling
        context.Services.AddTaskScheduler(DbKeys.TaskDb, options =>
        {
            // Configure the scheduler builder
            options.ConfigureFreeSchedulerBuilder = freeSchedulerBuilder =>
            {
                freeSchedulerBuilder
                .OnExecuting(task =>
                {
                    var taskSchedulerConfig = AppInfo.GetRequiredService<IOptions<TaskSchedulerConfig>>().Value;

                    // Only tasks whose topic starts with "[shell]" are executed as shell commands.
                    if (task.Topic?.StartsWith("[shell]", StringComparison.Ordinal) != true)
                    {
                        return;
                    }

                    // The task body is JSON carrying the shell arguments (fileName, arguments, ...).
                    var jsonArgs = JToken.Parse(task.Body);
                    var shellArgs = jsonArgs.Adapt<ShellArgs>();

                    var arguments = shellArgs.Arguments;
                    var moduleName = jsonArgs["moduleName"]?.ToString();
                    if (moduleName.NotNull())
                    {
                        // Placeholder: look up the gRPC remote address configured for moduleName.
                        // Until that lookup is filled in, grpcAddress stays empty and no substitution happens.
                        var grpcAddress = string.Empty;
                        if (grpcAddress.NotNull())
                        {
                            arguments = arguments?.Replace("${grpcAddress}", grpcAddress, StringComparison.OrdinalIgnoreCase);
                        }
                    }

                    // Fall back to the configured executable when the task does not name one.
                    var fileName = shellArgs.FileName;
                    if (fileName.IsNull())
                    {
                        fileName = taskSchedulerConfig?.ProcessStartInfo?.FileName;
                    }

                    // Fall back to the configured working directory when the task does not name one.
                    var workingDirectory = shellArgs.WorkingDirectory;
                    if (workingDirectory.IsNull())
                    {
                        workingDirectory = taskSchedulerConfig?.ProcessStartInfo?.WorkingDirectory;
                    }

                    var startInfo = new ProcessStartInfo
                    {
                        FileName = fileName,
                        Arguments = arguments,
                        UseShellExecute = false,
                        RedirectStandardOutput = true,
                        RedirectStandardError = true,
                        WorkingDirectory = workingDirectory,
                        StandardOutputEncoding = Encoding.UTF8,
                        StandardErrorEncoding = Encoding.UTF8,
                    };

                    string response;
                    string error;
                    using (var process = Process.Start(startInfo))
                    {
                        if (process == null)
                        {
                            throw new InvalidOperationException($"Failed to start process '{fileName}'.");
                        }

                        // Drain stderr asynchronously while stdout is read synchronously. Reading the two
                        // redirected streams one after the other can deadlock: the child blocks once the
                        // stderr pipe buffer is full while the parent is still waiting for stdout to end.
                        var errorTask = process.StandardError.ReadToEndAsync();
                        response = process.StandardOutput.ReadToEnd();
                        error = errorTask.GetAwaiter().GetResult();

                        process.WaitForExit();
                    }

                    // Standard output becomes the task remark.
                    if (response.NotNull())
                    {
                        task.Remark(response);
                    }

                    // Any standard-error output fails the execution.
                    if (error.NotNull())
                    {
                        throw new Exception(error);
                    }
                });
            };
        });
    }
}).Run(args);

执行 Service 任务

cs
new HostApp(new HostAppOptions
{
    // Configure post services
    ConfigurePostServices = context =>
    {
        // Add task scheduling
        context.Services.AddTaskScheduler(DbKeys.TaskDb, options =>
        {
            // Configure the scheduler builder
            options.ConfigureFreeSchedulerBuilder = freeSchedulerBuilder =>
            {
                freeSchedulerBuilder
                .OnExecuting(task =>
                {
                    // Dispatch on the task topic
                    switch (task.Topic)
                    {
                        // Module task
                        case TaskNames.ModuleTaskName:
                            Task.Run(async () =>
                            {
                                // Resolve the service from a fresh scope that is disposed when the run ends.
                                using var scope = AppInfo.ServiceProvider.GetRequiredService<IServiceScopeFactory>().CreateScope();
                                var moduleService = scope.ServiceProvider.GetRequiredService<IModuleService>();

                                // The body is a JSON object; fail with a clear message when it is empty
                                // or lacks "moduleId" instead of a NullReference/KeyNotFound exception.
                                var dics = JsonConvert.DeserializeObject<Dictionary<string, string>>(task.Body);
                                if (dics == null || !dics.TryGetValue("moduleId", out var moduleId))
                                {
                                    throw new InvalidOperationException($"Task {task.Id} body does not contain \"moduleId\".");
                                }

                                var result = await moduleService.GetAsync(moduleId.ToLong());
                                if (result.Success)
                                {
                                    // Mark the task as completed so it ends
                                    task.Status = TaskStatus.Completed;
                                }
                            })
                            // GetAwaiter().GetResult() rethrows the original exception, whereas Wait()
                            // would wrap it in an AggregateException.
                            .GetAwaiter().GetResult();
                            break;
                    }
                });
            };
        });
    }
}).Run(args);

发送告警邮件

appsettings.json 中添加以下配置

json
"Email": {
    "Host": "smtp.exmail.qq.com", //主机
    "Port": 465, //端口 465、587、25
    "UseSsl": true, //是否使用SSL
    "UserName": "", //邮箱账号
    "Password": "", //邮箱密码
    "FromEmail": { //发件人
        "Name": "",
        "Address": ""
    },
    "ToEmail": { //收件人
        "Name": "",
        "Address": ""
    }
}

发送邮件

cs
new HostApp(new HostAppOptions
{
    // Configure post services
    ConfigurePostServices = context =>
    {
        // Add task scheduling
        context.Services.AddTaskScheduler(DbKeys.TaskDb, options =>
        {
            // Configure the scheduler builder
            options.ConfigureFreeSchedulerBuilder = freeSchedulerBuilder =>
            {
                freeSchedulerBuilder
                .OnExecuted((task, taskLog) =>
                {
                    try
                    {
                        // Alerts are only sent for failed executions.
                        if (taskLog.Success)
                        {
                            return;
                        }

                        var taskSchedulerConfig = AppInfo.GetRequiredService<IOptionsMonitor<TaskSchedulerConfig>>().CurrentValue;
                        if (taskSchedulerConfig.AlerEmail == null || !taskSchedulerConfig.AlerEmail.Enable)
                        {
                            return;
                        }

                        // Load the stored task to read its own alarm address.
                        var taskService = AppInfo.GetRequiredService<TaskService>();
                        var taskInfo = taskService.GetAsync(task.Id).GetAwaiter().GetResult();

                        // Prefer the task's alarm address; fall back to the configured one.
                        var alarmEmail = taskInfo?.AlarmEmail;
                        if (alarmEmail.IsNull())
                        {
                            alarmEmail = taskSchedulerConfig.AlerEmail.Adress;
                        }

                        // Nothing to send when no recipient is configured anywhere.
                        if (alarmEmail.IsNull())
                        {
                            return;
                        }

                        // Use the "desc" field of the task body as the display name when present.
                        var topic = task.Topic;
                        var jsonArgs = JToken.Parse(task.Body);
                        var desc = jsonArgs["desc"]?.ToString();
                        if (desc.NotNull())
                        {
                            topic = desc;
                        }

                        var emailService = AppInfo.GetRequiredService<EmailService>();

                        // Send synchronously so that any failure is caught and logged by the catch below.
                        // An async lambda passed to List.ForEach is async void and would escape this try block.
                        var addresses = alarmEmail.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
                        foreach (var address in addresses)
                        {
                            emailService.SingleSendAsync(new EmailSingleSendEvent
                            {
                                ToEmail = new EmailSingleSendEvent.Models.EmailModel
                                {
                                    Address = address,
                                    Name = address
                                },
                                Subject = "【任务调度中心】监控报警",
                                Body = $@"<p>任务名称:{topic}</p>
                                            <p>任务编号:{task.Id}</p>
                                            <p>告警类型:调度失败</p>
                                            <p>告警内容:<br/>{taskLog.Exception}</p>"
                            }).GetAwaiter().GetResult();
                        }
                    }
                    catch (Exception ex)
                    {
                        AppInfo.Log.Error(ex);
                    }
                });
            };
        });
    }
}).Run(args);

失败重试 8.5.0

cs
new HostApp(new HostAppOptions
{
	//Configure post services
	ConfigurePostServices = context =>
	{ 
        //Add task scheduling
        context.Services.AddTaskScheduler(DbKeys.TaskDb, options =>
        {
            //Configure the scheduler builder
            options.ConfigureFreeSchedulerBuilder = freeSchedulerBuilder =>
            {
                freeSchedulerBuilder
                .OnExecuted((task, taskLog) =>
                {
                    try
                    {
                        //Retries are only considered when the execution that just finished failed
                        if (!taskLog.Success)
                        {
                            //Load the stored task (blocks on the async call) to read its retry settings
                            var taskService = AppInfo.GetRequiredService<TaskService>();
                            var taskInfo = taskService.GetAsync(task.Id).Result;

                            //Retry on failure
                            if (taskInfo != null && taskInfo.FailRetryCount > 0)
                            {
                                //Counters captured and mutated by the callback below across retries
                                var retryRound = 0;
                                var failRetryCount = taskInfo.FailRetryCount;
                                //Retry interval in seconds; falls back to 10 when FailRetryInterval is not greater than 0
                                var failRetryInterval = taskInfo.FailRetryInterval > 0 ? taskInfo.FailRetryInterval.Value : 10;
                                var scheduler = AppInfo.GetRequiredService<Scheduler>();
                                //Every retry log reuses the round number of the failed execution
                                var currentRound = taskLog.Round;
                                //One retry attempt: runs the task again, writes a log entry, and
                                //schedules itself once more if the attempt fails and retries remain
                                void OnFailedCallBak()
                                {
                                    failRetryCount--;
                                    retryRound++;
                                    var startdt = DateTime.UtcNow;
                                    //Log entry for this attempt; assumed successful until an exception is caught
                                    var result = new TaskLog
                                    {
                                        CreateTime = DateTime.UtcNow.Add(scheduler.TimeOffset),
                                        TaskId = task.Id,
                                        Round = currentRound,
                                        Remark = $"第{retryRound}次失败重试",
                                        Success = true
                                    };

                                    try
                                    {
                                        //NOTE(review): OnExecuting is not defined in this snippet - presumably the
                                        //task execution handler extracted into a method; confirm before use
                                        OnExecuting(task);
                                    }
                                    catch (Exception ex)
                                    {
                                        result.Success = false;
                                        //Record message and stack trace, plus the inner exception when there is one
                                        result.Exception = ex.InnerException == null ? $"{ex.Message}\r\n{ex.StackTrace}" : $"{ex.Message}\r\n{ex.StackTrace}\r\n\r\nInnerException: {ex.InnerException.Message}\r\n{ex.InnerException.StackTrace}";

                                        //Schedule another attempt while retries remain
                                        if (failRetryCount > 0)
                                        {
                                            scheduler.AddTempTask(TimeSpan.FromSeconds(failRetryInterval), OnFailedCallBak);
                                        }
                                    }
                                    finally
                                    {
                                        //Always persist the attempt's log entry with its elapsed time
                                        result.ElapsedMilliseconds = (long)DateTime.UtcNow.Subtract(startdt).TotalMilliseconds;
                                        var taskLogService = AppInfo.GetRequiredService<TaskLogService>();
                                        taskLogService.Add(result);
                                    }
                                }

                                //Schedule the first retry attempt
                                scheduler.AddTempTask(TimeSpan.FromSeconds(failRetryInterval), OnFailedCallBak);
                            }
                        }
                    }
                    catch(Exception ex)
                    {
                        //Failures in the retry setup are logged rather than rethrown
                        AppInfo.Log.Error(ex);
                    }
                });
            };
        });
    }
}).Run(args);

添加任务

在模块服务中添加任务

cs
namespace MyCompanyName.MyProjectName.Api.Services.Module;

/// <summary>
/// Module service.
/// </summary>
[DynamicApi(Area = ApiConsts.AreaName)]
public class ModuleService : BaseService, IModuleService, IDynamicApi
{
    private readonly Lazy<Scheduler> _scheduler;

    /// <summary>
    /// Creates the module service.
    /// </summary>
    /// <param name="scheduler">Lazily resolved scheduler used to register tasks.</param>
    public ModuleService(Lazy<Scheduler> scheduler)
    {
        _scheduler = scheduler;
    }

    /// <summary>
    /// Registers the module task with the scheduler, once for each supported scheduling style.
    /// </summary>
    public void ExecuteTask()
    {
        var scheduler = _scheduler.Value;

        // Option 1: add a task group; the first group runs at 15-second intervals, the second at 2-minute intervals
        scheduler.AddTask(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), new int[] { 15, 15, 120, 120 });

        // Option 2: add a task that runs at 15-second intervals
        // (the preceding argument, 2, is presumably the number of rounds - confirm against the FreeScheduler API)
        scheduler.AddTask(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), 2, 15);

        // Option 3: endless task that runs at 10-minute intervals
        scheduler.AddTask(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), -1, 600);

        // Option 4: run once at midnight
        scheduler.AddTaskRunOnDay(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), 1, "0:00:00");

        // Option 5: run once on Monday at 23:30; 0 is Sunday, 1-6 are Monday to Saturday
        scheduler.AddTaskRunOnWeek(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), 1, "1:23:30:00");

        // Option 6: run once on the 1st of the month at 16:00; -1 is the last day of the month
        scheduler.AddTaskRunOnMonth(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), 1, "1:16:00:00");

        // Option 7: custom cron expression; every 10 seconds starting at second 0
        scheduler.AddTaskCustom(TaskNames.ModuleTaskName, JsonConvert.SerializeObject(new
        {
            moduleId = 1
        }), "0/10 * * * * ?");
    }
}