封装通用的reactor事件处理,无需自己去写事件,只需关心业务逻辑
Code
实例化Reactor Bean,这里采用内部 Bean 方式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| //src/.../reactor/config/ReactorConfig.java
@Component public class ReactorConfig {
@Bean Environment env() { return new Environment(); }
@Bean @Primary Reactor pool(Environment env) { return Reactors.reactor() .env(env) .dispatcher(Environment.THREAD_POOL) .get(); }
@Bean Reactor loop(Environment env) { return Reactors.reactor() .env(env) .dispatcher(Environment.EVENT_LOOP) .get(); } }
|
公共事件处理,即实现了InitializingBean初始化,也实现了业务逻辑的消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| //src/.../reactor/BaseHandler.java public abstract class BaseHandler<T> implements InitializingBean, Consumer<Event<T>> {
/** * 事件处理池(异步处理) */ @Autowired protected Reactor pool;
/** * 事件处理队列(one by one) */ @Autowired @Qualifier(value = "loop") protected Reactor loop;
/** * 选择器-获取class的小驼峰字符串$()组合 * @return */ protected final Selector selector() { //reactor.on(********$("userHandler")*********, userHandler); return $(this.getClass().getSimpleName()); }
/** * 键选择器-获取class的小驼峰字符串 * @return */ protected final String selectorKey() { return this.getClass().getSimpleName(); }
/** * 事件的监听器,以便于接收发送的事件并处理。需要实现 Consumer<Event<T>> 接口,其中 T 是处理程序接收的数据类型 * 封装公共使用 * @param tEvent */ @Override public void accept(Event<T> tEvent) { this.handler(tEvent.getData(), tEvent); }
/** * 发送的事件绑定到指定的监听器 * @throws Exception */ @Override public void afterPropertiesSet() throws Exception { this.register(); }
/** * 封装公共发送事件方法 * @param o */ public void notify(T o) { pool.notify(selectorKey(), Event.wrap(o)); }
public final void notify(Event o) { pool.notify(selectorKey(), o); }
protected abstract void handler(T o, Event<T> tEvent);
protected void register() throws Exception { pool.on(selector(), this); } }
|
后续编写对应自己需要的异步操作,创建一个类继承BaseHandler传入dto泛型,实现handler就完成了,无需自己去编写绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| //src/.../reactor/handler/Article.java @Slf4j @Component public class ArticleHandler extends BaseHandler<ArticleDto> {
@Autowired ArticleMapper articleMapper;
@Override protected void handler(ArticleDto articleDto, Event<ArticleDto> articleDtoEvent) {
try { ArticleDto article = articleMapper.getArticle(articleDto.getId()); log.info("in ArticleHandler: article={}", article); } catch(Exception e) { log.error(String.valueOf(e)); } }
/** * 如果需要大量的异步操作,可以重载修改notify和register */ // @Override // public void notify(ArticleDto articleDto) { // loop.notify(selectorKey(), Event.wrap(articleDto)); // } // // @Override // protected void register() throws Exception { // loop.on(selector(), this); // } }
|