publicabstractclassBackgroundWorkerBase:RunnableBase,IBackgroundWorker{// 配置管理器publicISettingManagerSettingManager{protectedget;set;}// 工作单元管理器publicIUnitOfWorkManagerUnitOfWorkManager{get{if(_unitOfWorkManager==null){thrownewAbpException("Must set UnitOfWorkManager before use it.");}return_unitOfWorkManager;}set{_unitOfWorkManager=value;}}privateIUnitOfWorkManager_unitOfWorkManager;// 获得当前的工作单元protectedIActiveUnitOfWorkCurrentUnitOfWork{get{returnUnitOfWorkManager.Current;}}// 本地化资源管理器publicILocalizationManagerLocalizationManager{protectedget;set;}// 默认的本地化资源的源名称protectedstringLocalizationSourceName{get;set;}protectedILocalizationSourceLocalizationSource{get{// 如果没有配置源名称,直接抛出异常if(LocalizationSourceName==null){thrownewAbpException("Must set LocalizationSourceName before, in order to get LocalizationSource");}if(_localizationSource==null||_localizationSource.Name!=LocalizationSourceName){_localizationSource=LocalizationManager.GetSource(LocalizationSourceName);}return_localizationSource;}}privateILocalizationSource_localizationSource;// 日志记录器publicILoggerLogger{protectedget;set;}protectedBackgroundWorkerBase(){Logger=NullLogger.Instance;LocalizationManager=NullLocalizationManager.Instance;}// ... 其他模板代码}
publicasyncTask<bool>DeleteAsync(stringjobId){// 判断 jobId 的值是否有效if(long.TryParse(jobId,outlongfinalJobId)==false){thrownewArgumentException($"The jobId '{jobId}' should be a number.",nameof(jobId));}// 使用 jobId 从 Store 处筛选到 JobInfo 对象的信息BackgroundJobInfojobInfo=await_store.GetAsync(finalJobId);if(jobInfo==null){returnfalse;}// 如果存在有 JobInfo 则使用 Store 进行删除操作await_store.DeleteAsync(jobInfo);returntrue;}
后台作业管理器实质上是一个周期性执行的后台工作者,那么我们的后台作业是每 5000 ms 执行一次,那么他的 DoWork() 方法又在执行什么操作呢?
1
2
3
4
5
6
7
8
9
10
11
protectedoverridevoidDoWork(){// 从 Store 当中获得等待执行的后台作业集合varwaitingJobs=AsyncHelper.RunSync(()=>_store.GetWaitingJobsAsync(1000));// 遍历这些等待执行的后台任务,然后通过 TryProcessJob 进行执行foreach(varjobinwaitingJobs){TryProcessJob(job);}}
privatevoidTryProcessJob(BackgroundJobInfojobInfo){try{// 任务执行次数自增 1jobInfo.TryCount++;// 最后一次执行时间设置为当前时间jobInfo.LastTryTime=Clock.Now;// 通过反射取得后台作业的类型varjobType=Type.GetType(jobInfo.JobType);// 通过 Ioc 解析器得到一个临时的后台作业对象,执行完之后既被释放using(varjob=_iocResolver.ResolveAsDisposable(jobType)){try{// 通过反射得到后台作业的 Execute 方法varjobExecuteMethod=job.Object.GetType().GetTypeInfo().GetMethod("Execute");varargsType=jobExecuteMethod.GetParameters()[0].ParameterType;varargsObj=JsonConvert.DeserializeObject(jobInfo.JobArgs,argsType);// 结合持久话存储的参数信息,调用 Execute 方法进行后台作业jobExecuteMethod.Invoke(job.Object,new[]{argsObj});// 执行完成之后从 Store 删除该任务的信息AsyncHelper.RunSync(()=>_store.DeleteAsync(jobInfo));}catch(Exceptionex){Logger.Warn(ex.Message,ex);// 计算下一次执行的时间,一旦超过 2 天该任务都执行失败,则返回 nullvarnextTryTime=jobInfo.CalculateNextTryTime();if(nextTryTime.HasValue){jobInfo.NextTryTime=nextTryTime.Value;}else{// 如果为 null 则说明该任务在 2 天的时间内都没有执行成功,则放弃继续执行jobInfo.IsAbandoned=true;}// 更新 Store 存储的任务信息TryUpdate(jobInfo);// 触发异常事件EventBus.Trigger(this,newAbpHandledExceptionData(newBackgroundJobException("A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.",ex){BackgroundJob=jobInfo,JobObject=job.Object}));}}}catch(Exceptionex){Logger.Warn(ex.ToString(),ex);// 表示任务不再执行jobInfo.IsAbandoned=true;// 更新 StoreTryUpdate(jobInfo);}}
publicinterfaceIBackgroundJobStore{// 通过 JobId 获取后台任务信息Task<BackgroundJobInfo>GetAsync(longjobId);// 插入一个新的后台任务信息TaskInsertAsync(BackgroundJobInfojobInfo);/// <summary>/// Gets waiting jobs. It should get jobs based on these:/// Conditions: !IsAbandoned And NextTryTime <= Clock.Now./// Order by: Priority DESC, TryCount ASC, NextTryTime ASC./// Maximum result: <paramref name="maxResultCount"/>./// </summary>/// <param name="maxResultCount">Maximum result count.</param>Task<List<BackgroundJobInfo>>GetWaitingJobsAsync(intmaxResultCount);/// <summary>/// Deletes a job./// </summary>/// <param name="jobInfo">Job information.</param>TaskDeleteAsync(BackgroundJobInfojobInfo);/// <summary>/// Updates a job./// </summary>/// <param name="jobInfo">Job information.</param>TaskUpdateAsync(BackgroundJobInfojobInfo);}
这里先从简单的内存 Store 说起,这个 InMemoryBackgroundJobStore 内部使用了一个并行字典来存储这些任务信息。