项目作者: Nepxion

项目描述 :
Nepxion EventBus是一个基于Google Guava和Spring框架AOP的通用事件调度组件,支持同步和异步模式基于Google Guava通用事件派发机制的事件总线组件,注解式异步,同步发布订阅
高级语言: Java
项目地址: git://github.com/Nepxion/EventBus.git
创建时间: 2017-12-09T10:30:17Z
项目社区:https://github.com/Nepxion/EventBus

开源协议:Apache License 2.0

下载


Nepxion EventBus

Total visits License Maven Central Javadocs Build Status Codacy Badge Stars Stars

Nepxion EventBus是一款基于Google Guava通用事件派发机制的事件总线组件。它采用Spring Framework AOP机制,提供注解调用方式,支持异步和同步两种方式

简介

  • 实现基于@EventBus注解开启EventBus机制
  • 实现异步模式下(默认),子线程中收到派发的事件,基于@EventBus(async = false),来切换是同步还是异步
  • 实现批量派发事件
  • 实现同步模式下,主线程中收到派发的事件
  • 实现线程隔离技术,并定制化配置线程池
  • 实现事件对象的多元化,可以发布和订阅Java基本类型,也可以利用框架内置的Event类型,当然也可以使用任意自定义类型

兼容

最新版本兼容

  • Spring 4.x.x和Spring Boot 1.x.x
  • Spring 5.x.x和Spring Boot 2.x.x

依赖

  1. <dependency>
  2. <groupId>com.nepxion</groupId>
  3. <artifactId>eventbus-aop-starter</artifactId>
  4. <version>${eventbus.version}</version>
  5. </dependency>

用法

  1. @SpringBootApplication
  2. @EnableEventBus
  3. public class MyApplication {
  4. }

策略

  • EventBus事件控制器(Controller)策略

① 可以由单个Controller控制缺省identifier的EventBus事件(在Google Guava内部定义缺省identifier的值为’default’)。用法如下:

事件发布端

  1. eventControllerFactory.getAsyncController().post("abc"); // 异步发送
  2. eventControllerFactory.getSyncController().post("abc"); // 同步发送

事件订阅端

  1. @EventBus // 订阅异步消息,async不指定,默认为true
  2. public class MySubscriber {
  3. }
  4. @EventBus(async = false) // 订阅同步消息
  5. public class MySubscriber {
  6. }

② 可以由多个Controller控制不同identifier的EventBus事件。用法如下:

事件发布端

  1. eventControllerFactory.getAsyncController(identifier).post("abc"); // 异步发送
  2. eventControllerFactory.getSyncController(identifier).post("abc"); // 同步发送

事件订阅端

  1. @EventBus(identifier = "xyz") // 订阅异步消息,async不指定,默认为true
  2. public class MySubscriber {
  3. }
  4. @EventBus(identifier = "xyz", async = false) // 订阅同步消息
  5. public class MySubscriber {
  6. }

注意:事件发布端和订阅端的identifier一定要一致

  1. # EventBus config
  2. # 开关配置,结合注解@EnableEventBus使用
  3. # eventbus.enabled=true
  • EventBus线程池(ThreadPool)策略

① 配置如下:

线程池配置,参考application.properties,可以不需要配置,那么采取如下默认值

  1. # Thread Pool Config
  2. # Multi thread pool,是否线程隔离。如果是,那么每个不同identifier的事件都会占用一个单独线程池,否则共享一个线程池
  3. threadPoolMultiMode=false
  4. # 共享线程池的名称
  5. threadPoolSharedName=EventBus
  6. # 是否显示自定义的线程池名
  7. threadPoolNameCustomized=true
  8. # CPU unit(CPU核数单位,例如在8核心CPU上,threadPoolCorePoolSize配置为2,那么最终核心线程数为16,下同)
  9. threadPoolCorePoolSize=1
  10. # CPU unit
  11. threadPoolMaximumPoolSize=2
  12. threadPoolKeepAliveTime=900000
  13. threadPoolAllowCoreThreadTimeout=false
  14. # LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue
  15. threadPoolQueue=LinkedBlockingQueue
  16. # CPU unit (Used for LinkedBlockingQueue or ArrayBlockingQueue)
  17. threadPoolQueueCapacity=128
  18. # BlockingPolicyWithReport, CallerRunsPolicyWithReport, AbortPolicyWithReport, RejectedPolicyWithReport, DiscardedPolicyWithReport
  19. threadPoolRejectedPolicy=BlockingPolicyWithReport

示例

调用入口1,异步模式(默认)下接收事件

  1. package com.nepxion.eventbus.example.service;
  2. /**
  3. * <p>Title: Nepxion EventBus</p>
  4. * <p>Description: Nepxion EventBus AOP</p>
  5. * <p>Copyright: Copyright (c) 2017-2050</p>
  6. * <p>Company: Nepxion</p>
  7. * @author Haojun Ren
  8. * @version 1.0
  9. */
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.stereotype.Service;
  13. import com.google.common.eventbus.Subscribe;
  14. import com.nepxion.eventbus.annotation.EventBus;
  15. import com.nepxion.eventbus.core.Event;
  16. @EventBus
  17. @Service
  18. public class MySubscriber1 {
  19. private static final Logger LOG = LoggerFactory.getLogger(MySubscriber1.class);
  20. @Subscribe
  21. public void subscribe(String event) {
  22. LOG.info("子线程接收异步事件 - {},String类型", event);
  23. }
  24. @Subscribe
  25. public void subscribe(Long event) {
  26. LOG.info("子线程接收异步事件 - {},Long类型", event);
  27. }
  28. @Subscribe
  29. public void subscribe(Boolean event) {
  30. LOG.info("子线程接收异步事件 - {},Boolean类型", event);
  31. }
  32. @Subscribe
  33. public void subscribe(Event event) {
  34. LOG.info("子线程接收异步事件 - {},内置类型Event", event);
  35. }
  36. }

调用入口2,同步模式下接收事件

  1. package com.nepxion.eventbus.example.service;
  2. /**
  3. * <p>Title: Nepxion EventBus</p>
  4. * <p>Description: Nepxion EventBus AOP</p>
  5. * <p>Copyright: Copyright (c) 2017-2050</p>
  6. * <p>Company: Nepxion</p>
  7. * @author Haojun Ren
  8. * @version 1.0
  9. */
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.stereotype.Service;
  13. import com.google.common.eventbus.Subscribe;
  14. import com.nepxion.eventbus.annotation.EventBus;
  15. import com.nepxion.eventbus.core.Event;
  16. @EventBus(async = false)
  17. @Service
  18. public class MySubscriber2 {
  19. private static final Logger LOG = LoggerFactory.getLogger(MySubscriber2.class);
  20. @Subscribe
  21. public void subscribe(String event) {
  22. LOG.info("主线程接收同步事件 - {},String类型", event);
  23. }
  24. @Subscribe
  25. public void subscribe(Long event) {
  26. LOG.info("主线程接收同步事件 - {},Long类型", event);
  27. }
  28. @Subscribe
  29. public void subscribe(Boolean event) {
  30. LOG.info("主线程接收同步事件 - {},Boolean类型", event);
  31. }
  32. @Subscribe
  33. public void subscribe(Event event) {
  34. LOG.info("主线程接收同步事件 - {},内置类型Event", event);
  35. }
  36. }

调用入口3,派发事件

  1. package com.nepxion.eventbus.example.service;
  2. /**
  3. * <p>Title: Nepxion EventBus</p>
  4. * <p>Description: Nepxion EventBus AOP</p>
  5. * <p>Copyright: Copyright (c) 2017-2050</p>
  6. * <p>Company: Nepxion</p>
  7. * @author Haojun Ren
  8. * @version 1.0
  9. */
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.stereotype.Service;
  14. import com.nepxion.eventbus.core.Event;
  15. import com.nepxion.eventbus.core.EventControllerFactory;
  16. @Service
  17. public class MyPublisher {
  18. private static final Logger LOG = LoggerFactory.getLogger(MyPublisher.class);
  19. @Autowired
  20. private EventControllerFactory eventControllerFactory;
  21. public void publish() {
  22. LOG.info("发送事件...");
  23. // 异步模式下(默认),子线程中收到派发的事件
  24. eventControllerFactory.getAsyncController().post("Sync Event String Format");
  25. // 同步模式下,主线程中收到派发的事件
  26. // 事件派发接口中eventControllerFactory.getSyncController(identifier)必须和@EnableEventBus参数保持一致,否则会收不到事件
  27. eventControllerFactory.getSyncController().post("Sync Event String Format");
  28. // 异步模式下(默认),子线程中收到派发的事件
  29. eventControllerFactory.getAsyncController().post(12345L);
  30. // 同步模式下,主线程中收到派发的事件
  31. // 事件派发接口中eventControllerFactory.getSyncController(identifier)必须和@EnableEventBus参数保持一致,否则会收不到事件
  32. eventControllerFactory.getSyncController().post(Boolean.TRUE);
  33. // 异步模式下(默认),子线程中收到派发的事件
  34. eventControllerFactory.getAsyncController().postEvent(new Event("Async Event"));
  35. // 同步模式下,主线程中收到派发的事件
  36. // 事件派发接口中eventControllerFactory.getSyncController(identifier)必须和@EnableEventBus参数保持一致,否则会收不到事件
  37. eventControllerFactory.getSyncController().postEvent(new Event("Sync Event"));
  38. }
  39. }

主入口

  1. package com.nepxion.eventbus.example;
  2. /**
  3. * <p>Title: Nepxion EventBus</p>
  4. * <p>Description: Nepxion EventBus AOP</p>
  5. * <p>Copyright: Copyright (c) 2017-2050</p>
  6. * <p>Company: Nepxion</p>
  7. * @author Haojun Ren
  8. * @version 1.0
  9. */
  10. import org.springframework.boot.SpringApplication;
  11. import org.springframework.boot.autoconfigure.SpringBootApplication;
  12. import org.springframework.context.ConfigurableApplicationContext;
  13. import com.nepxion.eventbus.annotation.EnableEventBus;
  14. import com.nepxion.eventbus.example.service.MyPublisher;
  15. @SpringBootApplication
  16. @EnableEventBus
  17. public class MyApplication {
  18. public static void main(String[] args) throws Exception {
  19. ConfigurableApplicationContext applicationContext = SpringApplication.run(MyApplication.class, args);
  20. MyPublisher myPublisher = applicationContext.getBean(MyPublisher.class);
  21. myPublisher.publish();
  22. }
  23. }

运行结果

  1. 2018-06-25 13:01:02.008 INFO [main][com.nepxion.eventbus.example.service.MyPublisher:28] - 发送事件...
  2. 2018-06-25 13:01:02.015 INFO [EventBus-192.168.0.107-thread-0][com.nepxion.eventbus.example.service.MySubscriber1:27] - 子线程接收异步事件 - Sync Event String FormatString类型
  3. 2018-06-25 13:01:02.016 INFO [main][com.nepxion.eventbus.example.service.MySubscriber2:27] - 主线程接收同步事件 - Sync Event String FormatString类型
  4. 2018-06-25 13:01:02.016 INFO [main][com.nepxion.eventbus.example.service.MySubscriber2:37] - 主线程接收同步事件 - trueBoolean类型
  5. 2018-06-25 13:01:02.016 INFO [EventBus-192.168.0.107-thread-1][com.nepxion.eventbus.example.service.MySubscriber1:32] - 子线程接收异步事件 - 12345Long类型
  6. 2018-06-25 13:01:02.017 INFO [EventBus-192.168.0.107-thread-2][com.nepxion.eventbus.example.service.MySubscriber1:42] - 子线程接收异步事件 - com.nepxion.eventbus.core.Event@67ca8c1f[
  7. source=Async Event
  8. ],内置类型Event
  9. 2018-06-25 13:01:02.017 INFO [main][com.nepxion.eventbus.example.service.MySubscriber2:42] - 主线程接收同步事件 - com.nepxion.eventbus.core.Event@1bcf67e8[
  10. source=Sync Event
  11. ],内置类型Event

请联系我

微信、钉钉、公众号和文档

Star走势图

Stargazers over time