是 Solon 分布式事件总线的解决方案。也是 Solon “最终一致性”分布式事务的解决方案之一
事务?就是要求 Event 有原子性,当多个 Event 发布时,要么全成功,要么全失败。
public class EventDemo {
public void event_tran() {
//新建一个 Event 事务
EventTran eventTran = CloudClient.event().newTran();
try {
//发布,并使用事务
CloudClient.event().publish(new Event("user.event1", "test1").tran(eventTran));
CloudClient.event().publish(new Event("user.event2", "test2").tran(eventTran));
CloudClient.event().publish(new Event("user.event2", "test3").tran(eventTran));
//如果没问题,提交事务
eventTran.commit();
} catch (Throwable ex) {
//如果有问题,回滚事务
eventTran.rollback();
}
}
}
上面的体验与经典的 Jdbc 事务是很像的。加入 Solon 的事务注解管理后,体验可以再简洁些,也能与 Jdbc 事务整合到一起。
@Component
public class EventDemo {
//使用 @Tran 管理事务(将 jdbc, event 事务整合到一起)
@Tran
public void event_and_jdbc_tran() {
//新建一个 Event 事务,并加入 @Tran 的管理
EventTran eventTran = CloudClient.event().newTranAndJoin();
CloudClient.event().publish(new Event("user.event1", "test1").tran(eventTran));
CloudClient.event().publish(new Event("user.event2", "test2").tran(eventTran));
CloudClient.event().publish(new Event("user.event2", "test3").tran(eventTran));
}
}
我们设计一个用户注册的场景应用:
主服务程序,负责主业务:
@Component
public class UserService {
@Inject
UserDao userDao;
//用户注册
@Tran
public void userRegister(long userId, String name){
userDao.addUser(userId, name);
this.onUserRegistered(userId);
}
//当用户完成注册时(发布事件)
private void onUserRegistered(long userId) {
String eventJson = String.format("{\"userId\":%d}", userId);
Date eventTime = DateTime.Now().addDay(10);
EventTran eventTran = CloudClient.event().newTranAndJoin();
//发布用户已注册事件
CloudClient.event().publish(new Event("user.registered", eventJson).tran(eventTran));
//发布用户已唤醒事件(用于检查用户在10内,有没有活动行为)
CloudClient.event().publish(new Event("user.reawakened", eventJson).scheduled(eventTime).tran(eventTran));
}
}
次服务程序,负责辅助业务(也可以合到主服务程序):
@CloudEvent("user.registered")
public class UserRegisteredEventHandler implements CloudEventHandler {
@Inject
UserService userService;
@Inject
MobileService mobileSerivce;
@Override
public boolean handler(Event event) throws Throwable {
long userId = ONode.load(event.context()).get("userId").getLong();
//送10个金币
userService.addGold(userId, 10);
//送手机充值100块
String mobie = userService.getMobile(userId);
mobileSerivce.recharge(mobile, 100);
return true;
}
}
@CloudEvent("user.reawakened")
public class UserReawakenedEventHandler implements CloudEventHandler {
@Inject
UserService userService;
@Inject
PushService pushService
@Override
public boolean handler(Event event) throws Throwable {
long userId = ONode.load(event.context()).get("userId").getLong();
if (userService.hasLive(userId, 10)) {
//再送100个金币
userService.addGold(userId, 100);
} else {
//获取设备id
String duid = userService.getDuid(userId);
//发布推送
pushService.push(duid, "有100个金币等你来拿哟...")
}
return true;
}
}