Abp vNext 源码分析 - 12. 后台作业与后台工作者

一、简要说明

文章信息:

基于的 ABP vNext 版本:1.0.0

创作日期:2019 年 10 月 24 日晚

更新日期:暂无

ABP vNext 提供了后台工作者和后台作业的支持,基本实现与原来的 ABP 框架类似,并且 ABP vNext 还提供了对 HangFire 和 RabbitMQ 的后台作业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不需要做其他复杂的配置。

后台作业在系统开发的过程当中,是比较常用的功能。因为总是有一些长耗时的任务,而这些任务我们不是立即响应的,例如 Excel 文档导入、批量发送短信通知等。

后台工作者 的话,ABP vNext 的实现就是在 CLR 的 Timer 之上封装了一层,周期性地执行用户逻辑。ABP vNext 默认提供的 后台任务管理器,就是在后台工作者基础之上进行的封装。

涉及到后台任务、后台工作者的模块一共有 6 个,它们分别是:

  • Volo.Abp.Threading :提供了一些常用的线程组件,其中 AbpTimer 就是在里面实现的。
  • Volo.Abp.BackgroundWorkers :后台工作者的定义和实现。
  • Volo.Abp.BackgroundJobs.Abstractions :后台任务的一些共有定义。
  • Volo.Abp.BackgroundJobs :默认的后台任务管理器实现。
  • Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 库实现的后台任务管理器。
  • Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 实现的后台任务管理器。

二、源码分析

2.1 线程组件

2.1.1 健壮的计时器

CLR 为我们提供了多种计时器,我们一般使用的是 System.Threading.Timer ,它是基于 CLR 线程池的一个周期计时器,会根据我们配置的 Period (周期) 定时执行。在 CLR 线程池中,所有的 Timer 只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 Timer 对象到期时,这个线程就会将 Timer 的回调方法通过 ThreadPool.QueueUserWorkItem() 扔到线程池去执行。

不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会造成上一个任务还没执行完成又开始执行新的任务。

解决这个方法其实很简单,即启动之后,将周期设置为 Timeout.Infinite ,这样只会执行一次。当回调方法执行完成之后,就设置 dueTime 参数说明下次执行要等待多久,并且周期还是 Timeout.Infinite

ABP vNext 已经为我们提供了健壮的计时器,该类型的定义是 AbpTimer ,在内部用到了 volatile 关键字和 Monitor 实现 条件变量模式 解决多线程环境下的问题。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class AbpTimer : ITransientDependency
{
    // 回调事件。
    public event EventHandler Elapsed;

    // 执行周期。
    public int Period { get; set; }

    // 定时器启动之后就开始运行,默认为 Fasle。
    public bool RunOnStart { get; set; }

    // 日志记录器。
    public ILogger<AbpTimer> Logger { get; set; }

    private readonly Timer _taskTimer;
    // 定时器是否在执行任务,默认为 false。
    private volatile bool _performingTasks;
    // 定时器的运行状态,默认为 false。
    private volatile bool _isRunning;

    public AbpTimer()
    {
        Logger = NullLogger<AbpTimer>.Instance;

        // 回调函数是 TimerCallBack,执行周期为永不执行。
        _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        // 如果传递的周期小于等于 0 ,则抛出异常。
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        // 使用互斥锁,保证线程安全。
        lock (_taskTimer)
        {
            // 如果启动之后就需要马上执行,则设置为 0,马上执行任务,否则会等待 Period 毫秒之后再执行(1 个周期)。
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            // 定时器成功运行了。
            _isRunning = true;
        }
        // 释放 _taskTimer 的互斥锁。
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        // 使用互斥锁。
        lock (_taskTimer)
        {
            // 将内部定时器设置为永不执行的状态。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // 检测当前是否还有正在执行的任务,如果有则等待任务执行完成。
            while (_performingTasks)
            {
                // 临时释放锁,阻塞当前线程。但是其他线程可以获取 _timer 的互斥锁。
                Monitor.Wait(_taskTimer);
            }

            // 需要表示停止状态,所以标记状态为 false。
            _isRunning = false;
        }
    }

    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            // 如果有任务正在运行,或者内部定时器已经停止了,则不做任何事情。
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            // 临时停止内部定时器。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // 表明马上需要执行任务了。
            _performingTasks = true;
        }

        try
        {
            // 调用绑定的事件。
            Elapsed.InvokeSafely(this, new EventArgs());
        }
        catch
        {
            // 注意,这里将会吞噬异常。
        }
        finally
        {
            lock (_taskTimer)
            {
                // 任务执行完成,更改状态。
                _performingTasks = false;

                // 如果定时器还在运行,没有被停止,则启动下一个 Period 周期。
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                // 解除因为释放锁而阻塞的线程。
                // 如果已经调用了 Stop,则会唤醒那个因为 Wait 阻塞的线程,就会使 _isRunning 置为 false。
                Monitor.Pulse(_taskTimer);
            }
        }
    }
}

这里对 _performingTasks_isRunning 字段设置为 volatile 防止指令重排和寄存器缓存。这是因为在 Stop 方法内部使用到的 _performingTasks 可能会被优化,所以将该字段设置为了易失的。

2.2.1 IRunnable 接口

ABP vNext 为任务的启动和停止,抽象了一个 IRunnable 接口。虽然描述说的是对线程的行为进行抽象,但千万千万不要手动调用 Thread.Abort() 。关于 Thread.Abort() 的坏处,这里不再多加赘述,可以参考 这篇文章 的描述,或者搜索其他的相关文章。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public interface IRunnable
{
    // 启动这个服务。
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// 停止这个服务。
    /// </summary>
    Task StopAsync(CancellationToken cancellationToken = default);
}

2.2 后台工作者

2.2.1 模块的构造

后台工作者的模块行为比较简单,它定义了在应用程序初始化和销毁时的行为。在初始化时,后台工作者管理器 获得所有 后台工作者,并开始启动它们。在销毁时,后台工作者管理器获得所有后台工作者,并开始停止他们,这样才能够做到优雅退出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[DependsOn(
    typeof(AbpThreadingModule)
    )]
public class AbpBackgroundWorkersModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 StartAsync 启动所有后台工作者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StartAsync()
            );
        }
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 StopAsync 停止所有后台工作者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StopAsync()
            );
        }
    }
}

2.2.1 后台工作者的定义

首先看看 IBackgroundWorker 接口的定义,是空的。不过继承了 ISingletonDependency 接口,说明我们的每个后台工作者都是 单例 的。

1
2
3
4
5
6
7
/// <summary>
/// 在后台运行,执行某些任务的工作程序(线程)的接口定义。
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{

}

ABP vNext 为我们定义了一个抽象的后台工作者类型 BackgroundWorkerBase,这个基类的设计目的是提供一些常用组件(和 ApplicationService 一样)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    //TODO: Add UOW, Localization and other useful properties..?
    //TODO: 是否应该提供工作单元、本地化以及其他常用的属性?

    public ILogger<BackgroundWorkerBase> Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger<BackgroundWorkerBase>.Instance;
    }
    
    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }

    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }

    public override string ToString()
    {
        return GetType().FullName;
    }
}

ABP vNext 内部只有一个默认的后台工作者实现 PeriodicBackgroundWorkerBase。从名字上来看,意思是就是周期执行的后台工作者,内部就是用的 AbpTimer 来实现,ABP vNext 将其包装起来是为了实现统一的模式(后台工作者)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 也就意味着子类必须在其构造函数,指定 timer 的执行周期。
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }

    // 启动后台工作者。
    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }

    // 停止后台工作者。
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
    
    // Timer 关联的周期事件,之所以不直接挂载 DoWork,是为了捕获异常。
    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
        }
    }

    // 你要周期执行的任务。
    protected abstract void DoWork();
}

我们如果要实现自己的后台工作者,只需要继承该类,实现 DoWork() 方法即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class TestWorker : PeriodicBackgroundWorkerBase
{
    public TestWorker(AbpTimer timer) : base(timer)
    {
        // 每五分钟执行一次。
        timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
    }

    protected override void DoWork()
    {
        Console.WriteLine("后台工作者被执行了。");
    }
}

然后在我们自己模块的 OnPreApplicationInitialization() 方法内解析出后台作业管理器(IBackgroundWorkerManager),调用它的 Add() 方法,将我们定义的 TestWorker 添加到管理器当中即可。

2.2.2 后台工作者管理器

所有的后台工作者都是通过 IBackgroundWorkerManager 进行管理的,它提供了 StartAsync()StopAsync()Add() 方法。前面两个方法就是 IRunnable 接口定义的,后台工作者管理器直接集成了该接口,后面的 Add() 方法就是用来动态添加我们的后台工作者。

后台工作者管理器的默认实现是 BackgroundWorkerManager 类型,它内部做的事情很简单,就是维护一个后台工作者集合。每当调用 StartAsync()StopAsync() 方法的时候,都从这个集合遍历后台工作者,执行他们的启动和停止方法。

这里值得注意的一点是,当我们调用 Add() 方法添加了一个后台工作者之后,后台工作者管理器就会启动这个后台工作者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }

    private bool _isDisposed;

    private readonly List<IBackgroundWorker> _backgroundWorkers;

    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }

    public virtual void Add(IBackgroundWorker worker)
    {
        _backgroundWorkers.Add(worker);

        // 如果当前后台工作者管理器还处于运行状态,则调用工作者的 StartAsync() 方法启动。
        if (IsRunning)
        {
            AsyncHelper.RunSync(
                () => worker.StartAsync()
            );
        }
    }

    public virtual void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        //TODO: ???
    }

    // 启动,则遍历集合启动。
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }

    // 停止, 则遍历集合停止。
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}

上述代码其实存在一个问题,即后台工作者被释放以后,是否还能执行 Add() 操作。参考我 之前的文章 ,其实当对象被释放之后,就应该抛出 ObjectDisposedException 异常。

2.3 后台作业

比起后台工作者,我们执行一次性任务的时候,一般会使用后台作业进行处理。比起只能设置固定周期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的后台作业管理器,能够让我们使用 Cron 表达式,更加灵活地设置任务的执行周期。

2.3.1 模块的构造

关于后台作业的模块,我们需要说道两处。第一处是位于 Volo.Abp.BackgroundJobs.Abstractions 项目的 AbpBackgroundJobsAbstractionsModule ,第二出则是位于 Volo.Abp.BackgroundJobs 项目的 AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule 的主要行为是将符合条件的后台作业,添加到 AbpBackgroundJobOptions 配置当中,以便后续进行使用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public override void PreConfigureServices(ServiceConfigurationContext context)
{
    RegisterJobs(context.Services);
}

private static void RegisterJobs(IServiceCollection services)
{
    var jobTypes = new List<Type>();

    // 如果注册的类型符合 IBackgroundJob<> 泛型,则添加到集合当中。
    services.OnRegistred(context =>
    {
        if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
        {
            jobTypes.Add(context.ImplementationType);
        }
    });

    services.Configure<AbpBackgroundJobOptions>(options =>
    {
        // 将数据赋值给配置类。
        foreach (var jobType in jobTypes)
        {
            options.AddJob(jobType);
        }
    });
}

Volo.Abp.BackgroundJobs 内部是 ABP vNext 为我们提供的 默认后台作业管理器,这个后台作业管理器 本质上是一个后台工作者

这个后台工作者会周期性(取决于 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默认为 5 秒种)地从 IBackgroundJobStore 捞出一堆后台任务,并且在后台执行。至于每次执行多少个后台任务,这也取决于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默认值是 1000 个。

注意:

这里的 Options 类是 AbpBackgroundJobWorkerOptions,别和 AbpBackgroundWorkerOptions 混淆了。

所以在 AbpBackgroundJobsModule 模块里面,只做了一件事情,就是将负责后台作业的后台工作者,添加到后台工作者管理器种,并开始周期性地执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
    var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
    if (options.IsJobExecutionEnabled)
    {
        // 获得后台工作者管理器,并将负责后台作业的工作者添加进去。
        context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
            );
    }
}

2.3.2 后台作业的定义

在上一节里面看到,只要是实现 IBackgroundJob<TArgs> 类型的都视为一个后台作业。这个后台作业接口,只定义了一个行为,那就是执行(Execute(TArgs))。这里的 TArgs 泛型作为执行后台作业时,需要传递的参数类型。

1
2
3
4
5
// 因为是传入的参数,所以泛型参数是逆变的。
public interface IBackgroundJob<in TArgs>
{
    void Execute(TArgs args);
}

检查源码,发现 ABP vNext 的邮箱模块定义了一个邮件发送任务 BackgroundEmailSendingJob,它的实现大概如下。

1
2
3
4
5
6
7
8
9
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
    // ...
    
    public override void Execute(BackgroundEmailSendingJobArgs args)
    {
        AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
    }
}

2.3.3 后台作业管理器

后台作业都是通过一个后台作业管理器(IBackgroundJobManager)进行管理的,这个接口定义了一个入队方法(EnqueueAsync()),注意,我们的后台作业在入队后,不是马上执行的。

说一下这个入队处理逻辑:

  1. 首先我们会通过参数的类型,获取到任务的名称。(假设任务上面没有标注 BackgroundJobNameAttribute 特性,那么任务的名称就是参数类型的 FullName 。)
  2. 构造一个 BackgroundJobInfo 对象。
  3. 通过 IBackgroundJobStore 持久化任务信息。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    // 获取任务名称。
    var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
    var jobId = await EnqueueAsync(jobName, args, priority, delay);
    return jobId.ToString();
}

protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    var jobInfo = new BackgroundJobInfo
    {
        Id = GuidGenerator.Create(),
        JobName = jobName,
        // 通过序列化器,序列化参数值,方便存储。这里内部其实使用的是 JSON.NET 进行序列化。
        JobArgs = Serializer.Serialize(args),
        Priority = priority,
        CreationTime = Clock.Now,
        NextTryTime = Clock.Now
    };

    // 如果任务有执行延迟,则任务的初始执行时间要加上这个延迟。
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 持久化任务信息,方便后面执行后台作业的工作者能够取到。
    await Store.InsertAsync(jobInfo);

    return jobInfo.Id;
}

BackgroundJobNameAttribute 相关的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public static string GetName<TJobArgs>()
{
    return GetName(typeof(TJobArgs));
}

public static string GetName([NotNull] Type jobArgsType)
{
    Check.NotNull(jobArgsType, nameof(jobArgsType));

    // 判断参数类型上面是否标注了特性,并且特性实现了 IBackgroundJobNameProvider 接口。
    return jobArgsType
                .GetCustomAttributes(true)
                .OfType<IBackgroundJobNameProvider>()
                .FirstOrDefault()
                ?.Name
        // 拿不到名字,则使用类型的 FullName。
            ?? jobArgsType.FullName;
}

2.3.4 后台作业的存储

后台作业的存储默认是放在内存的,这点可以从 InMemoryBackgroundJobStore 类型实现看出来。在它的内部使用了一个并行字典,通过作业的 Guid 与作业进行关联绑定。

除了内存实现,在 Volo.Abp.BackgroundJobs.Domain 模块还有一个 BackgroundJobStore 实现,基本套路与 SettingStore 一样,都是存储到数据库里面。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
    protected IBackgroundJobRepository BackgroundJobRepository { get; }

    // ... 
    
    public BackgroundJobInfo Find(Guid jobId)
    {
        return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
            BackgroundJobRepository.Find(jobId)
        );
    }
    
    // ...

    public void Insert(BackgroundJobInfo jobInfo)
    {
        BackgroundJobRepository.Insert(
            ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
        );
    }

    // ...
}

2.3.5 后台作业的执行

默认的后台作业管理器是通过一个后台工作者来执行后台任务的,这个实现叫做 BackgroundJobWorker,这个后台工作者的生命周期也是单例的。后台作业的具体执行逻辑里面,涉及到了以下几个类型的交互。

类型 作用
AbpBackgroundJobOptions 提供每个后台任务的配置信息,包括任务的类型、参数类型、任务名称数据。
AbpBackgroundJobWorkerOptions 提供后台作业工作者的配置信息,例如每个周期 最大执行的作业数量、后台 工作者的 执行周期、作业执行 超时时间 等。
BackgroundJobConfiguration 后台任务的配置信息,作用是将持久化存储的作业信息与运行时类型进行绑定 和实例化,以便 ABP vNext 来执行具体的任务。
IBackgroundJobExecuter 后台作业的执行器,当我们从持久化存储获取到后台作业信息时,将会通过 这个执行器来执行具体的后台作业。
IBackgroundJobSerializer 后台作业序列化器,用于后台作业持久化时进行序列化的工具,默认采用的 是 JSON.NET 进行实现。
JobExecutionContext 执行器在执行后台作业时,是通过这个上下文参数进行执行的,在这个上下 文内部,包含了后台作业的具体类型、后台作业的参数值。
IBackgroundJobStore 前面已经讲过了,这个是用于后台作业的持久化存储,默认实现是存储在内存。
BackgroundJobPriority 后台作业的执行优先级定义,ABP vNext 在执行后台任务时,会根据任务的优 先级进行排序,以便在后面执行的时候优先级高的任务先执行。

我们来按照逻辑顺序走一遍它的实现,首先后台作业的执行工作者会从持久化存储内,获取 MaxJobFetchCount 个任务用于执行。从持久化存储获取后台作业信息(BackgroundJobInfo),是由 IBackgroundJobStore 提供的。

1
2
3
4
5
6
7
8
9
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

// 不存在任何后台作业,则直接结束本次调用。
if (!waitingJobs.Any())
{
    return;
}

InMemoryBackgroundJobStore 的相关实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
    return _jobs.Values
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        .OrderByDescending(t => t.Priority)
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();
}

上面的代码可以看出来,首先排除 被放弃的任务 ,包含达到执行时间的任务,,然后根据任务的优先级从高到低进行排序。重试次数少的优先执行,预计执行时间越早的越先执行。最后从这些数据中,筛选出 maxResultCount 结果并返回。

说到这里,我们来看一下这个 NextTryTime 是如何被计算出来的?回想起最开始的后台作业管理器,我们在添加一个后台任务的时候,就会设置这个后台任务的 预计执行时间。第一个任务被添加到执行队列中时,它的值一般是 Clock.Now ,也就是它被添加到队列的时间。

不过 ABP vNext 为了让那些经常执行失败的任务,有比较低的优先级再执行,就在每次任务执行失败之后,会将 NextTryTime 的值指数级进行增加。这块代码可以在 CalculateNextTryTime 里面看到,也就是说某个任务的执行 失败次数越高,那么它下一次的预期执行时间就会越远。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
    // 一般来说,这个 DefaultWaitFactor 因子的值是 2.0 。
    var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同执行失败的次数进行挂钩。
    var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
                        clock.Now.AddSeconds(nextWaitDuration);

    if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
    {
        return null;
    }

    return nextTryDate;
}

当预期的执行时间都超过 DefaultTimeout 的超时时间时(默认为 2 天),说明这个任务确实没救了,就不要再执行了。

我们之前说到,从 IBackgroundJobStore 拿到了需要执行的后台任务信息集合,接下来我们就要开始执行后台任务了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
foreach (var jobInfo in waitingJobs)
{
    jobInfo.TryCount++;
    jobInfo.LastTryTime = clock.Now;

    try
    {
        // 根据任务名称获取任务的配置参数。
        var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
        // 根据配置里面存储的任务类型,将参数值进行反序列化。
        var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
        // 构造一个新的执行上下文,让执行器执行任务。
        var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);

        try
        {
            jobExecuter.Execute(context);

            // 如果任务执行成功则删除该任务。            
            store.Delete(jobInfo.Id);
        }
        catch (BackgroundJobExecutionException)
        {
            // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。
            var nextTryTime = CalculateNextTryTime(jobInfo, clock);

            if (nextTryTime.HasValue)
            {
                jobInfo.NextTryTime = nextTryTime.Value;
            }
            else
            {
                // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。
                jobInfo.IsAbandoned = true;
            }

            TryUpdate(store, jobInfo);
        }
    }
    catch (Exception ex)
    {
        // 执行过程中,产生了未知异常,设置为废弃任务,并打印日志。
        Logger.LogException(ex);
        jobInfo.IsAbandoned = true;
        TryUpdate(store, jobInfo);
    }
}

执行后台任务的时候基本分为 5 步,它们分别是:

  1. 获得任务关联的配置参数,默认不用提供,因为在之前模块初始化的时候就已经配置了(你也可以显式指定)。
  2. 通过之前存储的配置参数,将参数值反序列化出来,构造具体实例。
  3. 构造一个执行上下文。
  4. 后台任务执行器执行具体的后台任务。
  5. 成功则删除任务,失败则更新任务下次的执行状态。

至于执行器里面的真正执行操作,你都拿到了参数值和任务类型了。就可以通过类型用 IoC 获取后台任务对象的实例,然后通过反射匹配方法签名,在实例上调用这个方法传入参数即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public virtual void Execute(JobExecutionContext context)
{
    // 构造具体的后台作业实例对象。
    var job = context.ServiceProvider.GetService(context.JobType);
    if (job == null)
    {
        throw new AbpException("The job type is not registered to DI: " + context.JobType);
    }

    // 获得需要执行的方法签名。
    var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
    if (jobExecuteMethod == null)
    {
        throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
    }

    try
    {
        // 直接通过 MethodInfo 的 Invoke 方法调用,传入具体的实例对象和参数值即可。
        jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
    }
    catch (Exception ex)
    {
        Logger.LogException(ex);

        // 如果是执行方法内的异常,则包装进行处理,然后抛出。
        throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
        {
            JobType = context.JobType.AssemblyQualifiedName,
            JobArgs = context.JobArgs
        };
    }
}

2.3.5 集成 Hangfire

ABP vNext 对于 Hangfire 的集成代码分布在 Volo.Abp.HangFireVolo.Abp.BackgroundJobs.HangFire 模块内部,前者是在模块配置里面,调用 Hangfire 库的相关方法,注入组件到 IoC 容器当中。后者则是对后台作业进行了适配处理,替换了默认的 IBackgroundJobManager 实现。

AbpHangfireModule 模块内部,通过工厂创建出来一个 BackgroudJobServer 实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AbpHangfireModule : AbpModule
{
    private BackgroundJobServer _backgroundJobServer;

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddHangfire(configuration =>
        {
            context.Services.ExecutePreConfiguredActions(configuration);
        });
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
        _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
        _backgroundJobServer.SendStop();
        _backgroundJobServer.Dispose();
    }
}

我们直奔主题,看一下基于 Hangfire 的后台作业管理器是怎么实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        // 如果没有延迟参数,则直接通过 Enqueue() 方法扔进执行对了。
        if (!delay.HasValue)
        {
            return Task.FromResult(
                BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args)
                )
            );
        }
        else
        {
            return Task.FromResult(
                BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args),
                    delay.Value
                )
            );
        }
    }

上述代码中使用 HangfireJobExecutionAdapter 进行了一个适配操作,因为 Hangfire 要将一个后台任务扔进队列执行,不是用 TArgs 就能解决的。

转到这个适配器定义,提供了一个 Execute(TArgs) 方法,当被添加到 Hangfire 队列执行的时候。实际 Hangfire 会调用适配器的 Excetue(TArgs) 方法,然后内部还是使用的 IBackgroundJobExecuter 来执行具体定义的任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class HangfireJobExecutionAdapter<TArgs>
{
    protected AbpBackgroundJobOptions Options { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected IBackgroundJobExecuter JobExecuter { get; }

    public HangfireJobExecutionAdapter(
        IOptions<AbpBackgroundJobOptions> options, 
        IBackgroundJobExecuter jobExecuter, 
        IServiceScopeFactory serviceScopeFactory)
    {
        JobExecuter = jobExecuter;
        ServiceScopeFactory = serviceScopeFactory;
        Options = options.Value;
    }

    public void Execute(TArgs args)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var jobType = Options.GetJob(typeof(TArgs)).JobType;
            var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
            JobExecuter.Execute(context);
        }
    }
}

2.3.6 集成 RabbitMQ

基于 RabbitMQ 的后台作业实现,我想放在分布式事件总线里面,对其一起进行讲解。

三、总结

ABP vNext 为我们提供了多种后台作业管理器的实现,你可以根据自己的需求选用不同的后台作业管理器,又或者是自己动手造轮子。

需要看其他的 ABP vNext 相关文章?点击我 即可跳转到总目录。

Built with Hugo
主题 StackJimmy 设计