Spring Reactor使用(2)

Spring Reactor使用(2)

八月 01, 2019

封装通用的reactor事件处理,无需自己去写事件,只需关心业务逻辑

Code

实例化Reactor Bean,这里采用内部 Bean 方式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//src/.../reactor/config/ReactorConfig.java

@Component
public class ReactorConfig {

@Bean
Environment env() {
return new Environment();
}

@Bean
@Primary
Reactor pool(Environment env) {
return Reactors.reactor()
.env(env)
.dispatcher(Environment.THREAD_POOL)
.get();
}

@Bean
Reactor loop(Environment env) {
return Reactors.reactor()
.env(env)
.dispatcher(Environment.EVENT_LOOP)
.get();
}
}

公共事件处理,即实现了InitializingBean初始化,也实现了业务逻辑的消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//src/.../reactor/BaseHandler.java
public abstract class BaseHandler<T> implements InitializingBean, Consumer<Event<T>> {

/**
* 事件处理池(异步处理)
*/
@Autowired
protected Reactor pool;

/**
* 事件处理队列(one by one)
*/
@Autowired
@Qualifier(value = "loop")
protected Reactor loop;

/**
* 选择器-获取class的小驼峰字符串$()组合
* @return
*/
protected final Selector selector() {
//reactor.on(********$("userHandler")*********, userHandler);
return $(this.getClass().getSimpleName());
}

/**
* 键选择器-获取class的小驼峰字符串
* @return
*/
protected final String selectorKey() {
return this.getClass().getSimpleName();
}

/**
* 事件的监听器,以便于接收发送的事件并处理。需要实现 Consumer<Event<T>> 接口,其中 T 是处理程序接收的数据类型
* 封装公共使用
* @param tEvent
*/
@Override
public void accept(Event<T> tEvent) {
this.handler(tEvent.getData(), tEvent);
}

/**
* 发送的事件绑定到指定的监听器
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
this.register();
}

/**
* 封装公共发送事件方法
* @param o
*/
public void notify(T o) {
pool.notify(selectorKey(), Event.wrap(o));
}

public final void notify(Event o) {
pool.notify(selectorKey(), o);
}

protected abstract void handler(T o, Event<T> tEvent);

protected void register() throws Exception {
pool.on(selector(), this);
}
}

后续编写对应自己需要的异步操作,创建一个类继承BaseHandler传入dto泛型,实现handler就完成了,无需自己去编写绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//src/.../reactor/handler/Article.java
@Slf4j
@Component
public class ArticleHandler extends BaseHandler<ArticleDto> {

@Autowired
ArticleMapper articleMapper;

@Override
protected void handler(ArticleDto articleDto, Event<ArticleDto> articleDtoEvent) {

try {
ArticleDto article = articleMapper.getArticle(articleDto.getId());
log.info("in ArticleHandler: article={}", article);
} catch(Exception e) {
log.error(String.valueOf(e));
}
}

/**
* 如果需要大量的异步操作,可以重载修改notify和register
*/
// @Override
// public void notify(ArticleDto articleDto) {
// loop.notify(selectorKey(), Event.wrap(articleDto));
// }
//
// @Override
// protected void register() throws Exception {
// loop.on(selector(), this);
// }
}