Abp vNext 源码分析 - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

一、简要介绍

ABP vNext 封装了两种事件总线结构,第一种是 ABP vNext 自己实现的本地事件总线,这种事件总线无法跨项目发布和订阅。第二种则是分布式事件总线,ABP vNext 自己封装了一个抽象层进行定义,并使用 RabbitMQ 编写了一个基本实现。

在使用方式上,两种事件总线的作用基本相同。

事件总线分布在两个模块,在 Volo.Abp.EventBus 模块内部,定义了事件总线的抽象接口,以及本地事件总线 (ILocalEventBus) 的实现。分布式事件总线的具体实现,是在 Volo.Abp.EventBus.RabbitMQ 模块内部进行定义,从项目名称可以看出来,这个模块是基于 RabbitMQ 消息队列实现的。

image-20191210132252458

image-20191210132345656

但是该项目并不是直接引用 RabbitMQ.Client 包,而是在 Volo.Abp.RabbitMQ 项目内部引用。这是因为除了分布式事件总线以外,ABP 还基于 RabbitMQ 实现了一个后台作业管理器。

image-20191210132504932

ABP vNext 框架便将一些对象抽象出来,放在 Volo.Abp.RabbitMQ 项目内部进行定义和实现。

image-20191210132622410

二、源码分析

2.1 事件处理器的注册

分析源码,首先从一个项目的模块开始,Volo.Abp.EventBus 库的模块 AbpEventBusModule 只干了一件事情。在组件注册的时候,根据组件的实现接口 (ILocalEventHandlerIDistributedEventHandler) 不同,将其赋值给 AbpLocalEventBusOptionsAbpDistributedEventBusOptionsHandlers 属性。

也就是说,开发人员定义的事件处理程序 (Handler) 都会在依赖注入的时候,都会将其类型 (Type) 添加到事件总线的配置类当中,方便后续进行使用。

2.2 事件总线的接口

通过事件总线模块的单元测试我们可以知道,事件的发布与订阅都是通过 IEventBus 的两个子接口 (ILocalEventBus/IDistributedEventBus) 进行的。在 IEventBus 接口的定义中,有三种行为,分别是 发布订阅取消订阅

对于 ILocalEventBus 接口和 IDistributedEventBus 接口来说,它们都提供了一个,针对本地事件处理器和分布式处理器的特殊订阅方法。

ILocalEventBus

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
/// <summary>
/// Defines interface of the event bus.
/// </summary>
public interface ILocalEventBus : IEventBus
{
    /// <summary>
    /// Registers to an event. 
    /// Same (given) instance of the handler is used for all event occurrences.
    /// </summary>
    /// <typeparam name="TEvent">Event type</typeparam>
    /// <param name="handler">Object to handle the event</param>
    IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
        where TEvent : class;
}

IDistributedEventBus

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public interface IDistributedEventBus : IEventBus
{
    /// <summary>
    /// Registers to an event. 
    /// Same (given) instance of the handler is used for all event occurrences.
    /// </summary>
    /// <typeparam name="TEvent">Event type</typeparam>
    /// <param name="handler">Object to handle the event</param>
    IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler)
        where TEvent : class;
}

2.3 事件总线基本流程和实现

同其他模块一样,因为有分布式事件总线和本地事件总线,ABP vNext 同样抽象了一个 EventBusBase 类型,作为它们的基类实现。

一般的流程,我们是先定义某个事件,然后订阅该事件并指定事件处理器,最后在某个时刻发布事件。例如下面的代码:

首先定义了一个事件处理器,专门用于处理 EntityChangedEventData<MyEntity> 事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class MyEventHandler : ILocalEventHandler<EntityChangedEventData<MyEntity>>
{
    public int EntityChangedEventCount { get; set; }

    public Task HandleEventAsync(EntityChangedEventData<MyEntity> eventData)
    {
        EntityChangedEventCount++;
        return Task.CompletedTask;
    }
}
var handler = new MyEventHandler();

LocalEventBus.Subscribe<EntityChangedEventData<MyEntity>>(handler);

await LocalEventBus.PublishAsync(new EntityCreatedEventData<MyEntity>(new MyEntity()));

2.3.1 事件的订阅

可以看到,这里使用的是 ILocalEventBus 定义的订阅方法,跳转到内部实现,它还是调用的 EventBus 的方法。

1
2
3
4
5
6
7
8
9
public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
{
    // 调用基类的 Subscribe 方法,并传递 TEvent 的类型,和事件处理器。
    return Subscribe(typeof(TEvent), handler);
}
public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
{
    return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}

可以看到,这里传递了一个 SingleInstanceHandlerFactory 对象,这玩意儿是干嘛用的呢?从名字可以看出来,这是一个工厂,是用来创建 Handler (事件处理器) 的工厂,并且是一个单实例的事件处理器工厂。

下面就是 IEventHandlerFactory 接口的定义,以及 SingleInstanceHandlerFactory 实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface IEventHandlerFactory
{
    // 获得一个事件处理器包装对象,即事件处理器执行完毕之后,可以调用
    // IEventHandlerDisposeWrapper.Dispose() 进行释放。
    IEventHandlerDisposeWrapper GetHandler();

    // 判断在已有的事件处理器工厂集合中,是否已经存在了相同的事件处理器。
    bool IsInFactories(List<IEventHandlerFactory> handlerFactories);
}

public class SingleInstanceHandlerFactory : IEventHandlerFactory
{
    // 构造工厂时,传递的事件处理器实例。
    public IEventHandler HandlerInstance { get; }


    public SingleInstanceHandlerFactory(IEventHandler handler)
    {
        HandlerInstance = handler;
    }

    // 通过 EventHandlerDisposeWrapper 包装事件处理器实例。
    public IEventHandlerDisposeWrapper GetHandler()
    {
        return new EventHandlerDisposeWrapper(HandlerInstance);
    }

    // 判断针对 HandlerInstance 的事件处理器是否已经存在。
    public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
    {
        return handlerFactories
            .OfType<SingleInstanceHandlerFactory>()
            .Any(f => f.HandlerInstance == HandlerInstance);
    }
}

针对 IEventHandlerFactory 工厂,还拥有 3 个不同的实现,下表分别说明它们的应用场景。

image-20191210143548128

实现类型 作用
IocEventHandlerFactory 每个工厂对应一个事件处理器的的类型,并通过 ScopeFactory 解析具体的事件处理器。生命周期由 scope 控制,当 scope 释放时,对应的事件处理器实例也会被销毁。
SingleInstanceHandlerFactory 每个工厂对应单独的一个事件处理器实例,事件处理器实例是由创建者控制的。
TransientEventHandlerFactory 每个工厂对应一个事件处理器的类型,区别是它不由 IoC 解析实例,而是使用的 Activator.CreateInstance() 方法构造实例,是一个瞬时对象,调用包装器的 Dispose 即会进行释放。
TransientEventHandlerFactory<THandler> 每个工厂对应指定的 THandler 事件处理器,生命周期同上面的工厂一样。

这几种工厂都是在订阅操作时,不同的订阅重载使用不同的工厂,或者是自己指定事件处理器的工厂均可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public virtual IDisposable Subscribe<TEvent, THandler>()
    where TEvent : class
    where THandler : IEventHandler, new()
{
    return Subscribe(typeof(TEvent), new TransientEventHandlerFactory<THandler>());
}

public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
{
    return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}

不过有一种特殊的行为,开发人员可以 不用显式订阅。在 EventBus 类型中,定义了一个 SubscribeHandlers(ITypeList<IEventHandler> handlers) 方法。该方法接收一个类型集合,通过遍历集合,从事件处理器的定义当中,取得事件处理器监听的事件类型 TEvent

在取得了事件类型,并知晓了事件处理器类型以后,事件总线就可以订阅 TEvent 类型的事件,并使用 IocEventHandlerFactory 工厂来构造事件处理器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
{
    // 遍历事件处理器的类型,其实这里的就是模块启动时,传递给 XXXOptions 的集合。
    foreach (var handler in handlers)
    {
        // 获得事件处理器的所有接口定义,并遍历接口进行检查。
        var interfaces = handler.GetInterfaces();
        foreach (var @interface in interfaces)
        {
            // 如果接口没有实现 IEventHandler 类型,则忽略。
            if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
            {
                continue;
            }

            // 从泛型参数当中,获得定义的事件类型。
            var genericArgs = @interface.GetGenericArguments();
            // 泛型参数完全匹配 1 时,才进行订阅操作。
            if (genericArgs.Length == 1)
            {
                Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
            }
        }
    }
}

这个订阅方法在 EventBus 当中是一个抽象方法,分别在本地事件总线和分布式事件总线有实现,这里我们首先讲解本地事件的逻辑。

image-20191210150712047

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }

    public LocalEventBus(
        IOptions<AbpLocalEventBusOptions> options,
        IServiceScopeFactory serviceScopeFactory)
        : base(serviceScopeFactory)
    {
        Options = options.Value;
        Logger = NullLogger<LocalEventBus>.Instance;

        HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();

        // 调用父类的方法,将模块初始化时扫描到的事件处理器,都尝试进行订阅。
        SubscribeHandlers(Options.Handlers);
    }

    public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
    {
        GetOrCreateHandlerFactories(eventType)
            // 锁住集合,以确保线程安全。
            .Locking(factories =>
                {
                    // 如果在集合内部,已经有了对应的工厂,则不进行添加。
                    if (!factory.IsInFactories(factories))
                    {
                        factories.Add(factory);
                    }
                }
            );

        // 返回一个事件处理器工厂注销器,当调用 Dispose() 方法时,会取消之前订阅的事件。
        return new EventHandlerFactoryUnregistrar(this, eventType, factory);
    }

    private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
    {
        // 根据事件的类型,从字典中获得该类型的所有事件处理器工厂。
        return HandlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
    }
}

上述流程结合 EventBusLocalEventBus 讲解了事件的订阅流程,事件的订阅操作都是对 HandlerFactories 的操作,往里面添加指定事件的事件处理器工厂,而每个工厂都是跟具体的事件处理器实例/类型进行关联的。

2.3.2 事件的发布

当开发人员需要发布事件的时候,一般都是通过对应的 EventBus,调用响应的 PublishAsync 方法,传递要触发的事件类型与事件数据。接口和基类当中,定义了两种发布方法的签名与实现:

1
2
3
4
5
6
public virtual Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
{
    return PublishAsync(typeof(TEvent), eventData);
}

public abstract Task PublishAsync(Type eventType, object eventData);

image-20191210152220343

第二种方法一共也分为本地事件总线的实现,和分布式事件总线的实现,本地事件比较简单,我们先分析本地事件总线的实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public override async Task PublishAsync(Type eventType, object eventData)
{
    // 定义了一个异常集合,用于接收多个事件处理器执行时,产生的所有异常。
    var exceptions = new List<Exception>();

    // 触发事件处理器。
    await TriggerHandlersAsync(eventType, eventData, exceptions);

    // 如果有任何异常产生,则抛出到之前的调用栈。
    if (exceptions.Any())
    {
        if (exceptions.Count == 1)
        {
            exceptions[0].ReThrow();
        }

        throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
    }
}

可以看到真正的触发行为是在 TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions) 内部进行实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions)
{
    // 针对于这个的作用,等同于 ConfigureAwait(false) 。
    // 具体可以参考 https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/。
    await new SynchronizationContextRemover();

    // 根据事件的类型,得到它的所有事件处理器工厂。
    foreach (var handlerFactories in GetHandlerFactories(eventType))
    {
        // 遍历所有的事件处理器工厂,通过 Factory 获得事件处理器,调用 Handler 的 HandleEventAsync 方法。
        foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
        {
            await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions);
        }
    }

    // 如果类型继承了 IEventDataWithInheritableGenericArgument 接口,那么会检测泛型参数是否有父类。
    // 如果有父类,则会使用当前的事件数据,为其父类发布一个事件。
    if (eventType.GetTypeInfo().IsGenericType &&
        eventType.GetGenericArguments().Length == 1 &&
        typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
    {
        var genericArg = eventType.GetGenericArguments()[0];
        var baseArg = genericArg.GetTypeInfo().BaseType;
        if (baseArg != null)
        {
            // 构造基类的事件类型,使用当前一样的泛型定义,只是泛型参数使用基类。
            var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
            // 构建类型的构造参数。
            var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
            // 通过事件类型和构造参数,构造一个新的事件数据实例。
            var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
            // 发布父类的同类事件。
            await PublishAsync(baseEventType, baseEventData);
        }
    }
}

在上述代码内部,都还没有真正执行事件处理器,真正的事件处理器执行程序是在下面的方法进行执行的。ABP vNext 通过引入 IEventDataWithInheritableGenericArgument 接口,实现了 类型继承事件 的触发,该接口提供了一个 GetConstructorArgs() 方法定义,方便后面生成构造参数。

例如有一个基础事件叫做 EntityEventData<Student>,如果 Student 继承自 Person,那么在触发该事件的时候,也会发布一个 EntityEventData<Person> 事件。

2.3.3 事件处理器的执行

真正事件处理器的执行,是通过下面的方法实现的,大概思路就是通过事件总线工厂,构建了事件处理器的实例。通过反射,调用事件处理器的 HandleEventAsync() 方法。如果在处理过程当中,出现了异常,则将异常数据放置在 List<Exception> 集合当中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)
{
    using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
    {
        try
        {
            // 获得事件处理器的类型。
            var handlerType = eventHandlerWrapper.EventHandler.GetType();

            // 判断事件处理器是本地事件还是分布式事件。
            if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>)))
            {
                // 获得方法定义。
                var method = typeof(ILocalEventHandler<>)
                    .MakeGenericType(eventType)
                    .GetMethod(
                        nameof(ILocalEventHandler<object>.HandleEventAsync),
                        new[] { eventType }
                    );

                // 使用工厂创建的实例调用方法。
                await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
            }
            else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>)))
            {
                var method = typeof(IDistributedEventHandler<>)
                    .MakeGenericType(eventType)
                    .GetMethod(
                        nameof(IDistributedEventHandler<object>.HandleEventAsync),
                        new[] { eventType }
                    );

                await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
            }
            else
            {
                // 如果都不是,则说明类型不正确,抛出异常。
                throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
            }
        }
        // 捕获到异常都统一添加到异常集合当中。
        catch (TargetInvocationException ex)
        {
            exceptions.Add(ex.InnerException);
        }
        catch (Exception ex)
        {
            exceptions.Add(ex);
        }
    }
}

2.4 分布式事件总线

分布式事件总线的实现都存放在 Volo.Abp.EventBus.RabbitMQ,该项目的代码比较少,由三个文件构成。

image-20191210162817622

在 RabbitMQ 模块的内部,只干了两件事情。首先从 JSON 配置文件当中,获取 AbpRabbitMqEventBusOptions 配置的三个参数,然后解析 RabbitMqDistributedEventBus 实例,并调用初始化方法 (Initialize())。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
[DependsOn(
    typeof(AbpEventBusModule),
    typeof(AbpRabbitMqModule))]
public class AbpEventBusRabbitMqModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();

        // 从配置文件读取配置。
        Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        // 调用初始化方法。
        context
            .ServiceProvider
            .GetRequiredService<RabbitMqDistributedEventBus>()
            .Initialize();
    }
}

2.4.1 分布式事件总线的初始化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void Initialize()
{
    // 创建一个消费者,并配置交换器和队列。
    Consumer = MessageConsumerFactory.Create(
        new ExchangeDeclareConfiguration(
            AbpRabbitMqEventBusOptions.ExchangeName,
            type: "direct",
            durable: true
        ),
        new QueueDeclareConfiguration(
            AbpRabbitMqEventBusOptions.ClientName,
            durable: true,
            exclusive: false,
            autoDelete: false
        ),
        AbpRabbitMqEventBusOptions.ConnectionName
    );

    // 消费者在消费消息的时候,具体的执行逻辑。
    Consumer.OnMessageReceived(ProcessEventAsync);

    // 调用基类的方法,自动订阅对应的事件。
    SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}

2.4.2 分布式事件的订阅

在定义分布式事件的时候,我们必须使用 EventNameAttribute 为事件声明路由键。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
    var handlerFactories = GetOrCreateHandlerFactories(eventType);

    if (factory.IsInFactories(handlerFactories))
    {
        return NullDisposable.Instance;
    }

    handlerFactories.Add(factory);

    if (handlerFactories.Count == 1) //TODO: Multi-threading!
    {
        // 为消费者绑定一个路由键,在收到对应的事件时,就会触发之前绑定的方法。
        Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
    }

    return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}

订阅的时候,除了 Consumer.BindAsync() 以外,基本流程和本地事件总线基本一致。

2.4.3 分布式事件的发布

分布式事件总线一样重写了发布方法,内部首先使用 IRabbitMqSerializer 序列化器 (基于 JSON.NET) 将事件数据进行序列化,然后将消息投递出去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public override Task PublishAsync(Type eventType, object eventData)
{
    var eventName = EventNameAttribute.GetNameOrDefault(eventType);
    // 序列化事件数据。
    var body = Serializer.Serialize(eventData);

    // 创建一个信道用于通讯。
    using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
    {
        channel.ExchangeDeclare(
            AbpRabbitMqEventBusOptions.ExchangeName,
            "direct",
            durable: true
        );
        
        // 更改投递模式为持久化模式。
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;

        // 发布一个新的事件。
        channel.BasicPublish(
            exchange: AbpRabbitMqEventBusOptions.ExchangeName,
            routingKey: eventName,
            mandatory: true,
            basicProperties: properties,
            body: body
        );
    }

    return Task.CompletedTask;
}

2.4.4 分布式事件的执行

执行逻辑都存放在 ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea) 方法内部,基本就是监听到指定的消息,首先反序列化消息,调用父类的 TriggerHandlersAsync 去执行具体的事件处理器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
{
    var eventName = ea.RoutingKey;
    var eventType = EventTypes.GetOrDefault(eventName);
    if (eventType == null)
    {
        return;
    }

    var eventData = Serializer.Deserialize(ea.Body, eventType);

    await TriggerHandlersAsync(eventType, eventData);
}

三、总结

ABP vNext 为我们实现了比较完善的本地事件总线,和基于 RabbitMQ 的分布式事件总线。在平时开发过程中,我们本地事件总线的使用频率应该还是比较高,而分布式事件总线目前仍处于一个半成品,很多高级特性还没实现,例如重试策略等。所以分布式事件总线要使用的话,建议使用较为成熟的 CAP 库替代 ABP vNext 的分布式事件总线。

Built with Hugo
主题 StackJimmy 设计