背景
为提升 Gravitino 的扩展性,引入了可插拔的事件监听系统,允许用户在 Gravitino 的操作后挂载自定义逻辑。典型使用场景包括:
- 日志审计;
- 向其他 Catalog 进行双写;
设计目标
- 支持在 Gravitino 服务 与 Iceberg REST Catalog 服务 中统一接入事件监听机制
- 支持可插拔插件体系,方便第三方自定义事件处理逻辑
不支持
- 强一致性的权限控制(如精细化授权)不作为目标
- 事件监听器内部异常处理(如线程阻塞、挂起)交由调用方自行管理
- 权限校验在 HTTP Handler 层完成,不由事件机制承担
- 该机制不用于 Gravitino 内部模块之间的通信
具体设计
提供 hook 点,在 Gravitino 操作完成后触发事件。事件可以同步或异步处理。
用户视角
配置方式
# 配置一个事件监听器
gravitino.eventListener.names=audit
# 配置多个事件监听器
gravitino.eventListener.names=audit,test
# 监听器实现类
gravitino.eventListener.{name}.class=xxxx
# 用户自定义属性
gravitino.eventListener.{name}.{key}
# 配置异步队列大小
gravitino.eventListener.{name}.queueCapacity=1000
接口定义
核心接口:EventListenerPlugin
public interface EventListenerPlugin {
void init(Map<String, String> properties);
void start();
void stop();
void onPostEvent(Event event); // 接收事件
default boolean isAsync() { return false; }
default boolean isSharedQueue() { return true; }
}
事件资源类型:EventResourceType
public enum EventResourceType {
METALAKE, CATALOG, RELATION, FILESET, ICEBERG_REST
}
实现视角
核心组件
- EventListenerManager:负责加载所有事件监听插件,并根据不同操作类型创建对应的
EventBus
。 - EventBus:管理一组功能一致的事件监听器,按照顺序将事件分发给每个监听器。支持同步与异步监听器,异步处理由
AsyncQueueListener
管理。 - AsyncQueueListener:通过队列机制异步消费事件。不同监听器可通过配置队列名共享消费线程,实现事件的串行或并行处理。
生命周期与流程
- 全局唯一的
EventListenerManager
- 加载监听器插件
- 创建对应的
AsyncQueueListener
实例 - 为每种 HTTP 操作构建并注册
EventBus
实例,绑定监听器集合
设计考量
接口粒度
方案一:统一处理事件
public class AuditLogEventListenerPlugin implements EventListenerPlugin {
public void onPostEvent(Event event) {
// 从 event 中提取审计信息并写入日志
}
}
- 优势:适合通用逻辑,插件逻辑集中;
- 劣势:对每个事件都需要解析字段,不够明确
方案二:细分回调接口
public class AuditLogEventListenerPlugin implements RelationalEventListener {
void onTableCreate(NameIdentifier id, OperationResult result, Table table) {
// 写审计日志
}
}
- 优势:更直观清晰
- 劣势:每个事件都需要写独立方法,代码重复
模式 | 优点 | 缺点 |
---|---|---|
同步 | 不丢事件 | 阻塞主流程 |
异步 | 不阻塞主流程 | 可能丢事件 |
同时支持同步和异步,由插件本身决定
是否支持类加载隔离
支持更好,否则所有插件只能放在 Gravitino 的 classpath 中,可能冲突。
Dispatcher 视角
Gravitino 的 Catalog 实现采用了经典的装饰器模式(Decorator Pattern)。在该模式下,所有组件均实现相同的接口(或继承统一的抽象类),通过包装已有对象的方式,在不修改其原有逻辑的前提下,增强其功能。
以 ModelEventDispatcher
为例,它并不直接处理具体的 Catalog 操作,而是包装一个实际的 Dispatcher 实例。核心职责如下:
- 将操作委托给被包装的底层 Dispatcher;
- 在委托执行前后插入自定义逻辑,如触发事件系统、发布事件等。
public class ModelEventDispatcher implements ModelDispatcher {
// 构造的时候传递 EventBus 实例以及 ModelDispatcher 实现类
public ModelEventDispatcher(EventBus eventBus, ModelDispatcher dispatcher) {
this.eventBus = eventBus;
this.dispatcher = dispatcher;
}
// 使用 eventBus.dispatchEvent 发送事件 -> eventBus.dispatchPostEvent -> 触发每个 Plugin 的 onPostEvent 进行处理
@Override
public Model registerModel(NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchSchemaException, ModelAlreadyExistsException {
String user = PrincipalUtils.getCurrentUserName();
ModelInfo registerRequest = new ModelInfo(ident.name(), properties, comment);
eventBus.dispatchEvent(new RegisterModelPreEvent(user, ident, registerRequest));
try {
Model model = dispatcher.registerModel(ident, comment, properties);
ModelInfo registeredModel = new ModelInfo(model);
eventBus.dispatchEvent(new RegisterModelEvent(user, ident, registeredModel));
return model;
} catch (Exception e) {
eventBus.dispatchEvent(new RegisterModelFailureEvent(user, ident, e, registerRequest));
throw e;
}
}
}
代码实现
监听器
监听器的接口是 EventListenerPlugin
@DeveloperApi
public interface EventListenerPlugin {
enum Mode {
SYNC,
ASYNC_ISOLATED,
ASYNC_SHARED
}
// 初始化方法,初始化插件,用于加载配置、连接资源等
// 和服务器一起启动,如果插件启动失败,则会导致服务器启动失败
void init(Map<String, String> properties) throws RuntimeException;
// 插件启动阶段,表示插件准备好开始处理事件
// 启动失败,依然会导致服务器启动失败
void start() throws RuntimeException;
// 停止该插件,异常不会影响主流程,但建议清理资源以防泄漏
void stop() throws RuntimeException;
// 处理操作执行前的事件(如校验、拒绝等)
// 如果抛出 ForbiddenException 且模式为 SYNC,主操作将被跳过
// 在 ASYNC 模式下抛出异常无效,只记录日志
// 允许修改事件中的资源(如动态修改参数)
default void onPreEvent(PreEvent preEvent) throws ForbiddenException {}
// 处理操作完成后的事件(如创建表后触发)
// 默认实现为空,只读事件,不能改变资源
// 可同步或异步执行(由 mode() 决定)
default void onPostEvent(Event postEvent) throws RuntimeException {}
// 该监听器的事件处理模式
// 默认使用同步
default Mode mode() {
return Mode.SYNC;
}
}
模式 | 特点 | 适用场景 |
---|---|---|
SYNC |
同步执行,阻塞主线程 | 拒绝请求、写审计、修改事件资源 |
ASYNC_ISOLATED |
异步执行,每个监听器独立队列 | 各监听器资源隔离,处理较复杂 |
ASYNC_SHARED |
异步执行,监听器共享队列 | 多监听器共享线程,资源高效 |
EventBus
核心字段
// 存储所有注册的事件监听器实例
private final List<EventListenerPlugin> eventListeners;
核心方法
public class EventBus {
// 发送 PreEvent 或者 Event
public void dispatchEvent(BaseEvent baseEvent) {
if (baseEvent instanceof PreEvent) {
dispatchPreEvent((PreEvent) baseEvent);
} else if (baseEvent instanceof Event) {
dispatchPostEvent((Event) baseEvent);
} else {
throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName());
}
}
// 发送 PostEvent
private void dispatchPostEvent(Event postEvent) {
eventListeners.forEach(eventListener -> eventListener.onPostEvent(postEvent));
}
// 发送 PreEvent
private void dispatchPreEvent(PreEvent preEvent) throws ForbiddenException {
eventListeners.forEach(eventListener -> eventListener.onPreEvent(preEvent));
}
}
EventListenerManager
根据配置加载监听器 → 包装处理(按执行模式) → 构建可用的监听器列表 → 创建 EventBus
用于事件分发。
关键字段:
// 异步队列大小
private int queueCapacity;
// join 超时等待时间
private int dispatcherJoinSeconds;
// 所有处理完组装逻辑后的监听器列表
private List<EventListenerPlugin> eventListeners;
关键方法
public class EventListenerManager {
public void init(Map<String, String> properties) {
EventListenerConfig config = new EventListenerConfig(properties);
// 默认队列大小 3000,超过队列大小则事件会被抛弃
this.queueCapacity = config.get(EventListenerConfig.QUEUE_CAPACITY);
// join 超时事件 3 秒
this.dispatcherJoinSeconds = config.get(EventListenerConfig.DISPATCHER_JOIN_SECONDS);
// 1. 解析配置 → 找出所有监听器名字(例如 audit、metrics)
// 2. 加载并实例化每个监听器类,通过反射的方式构建实现类对象并调用 init 方法
// 3. 包装为 EventListenerWrapper / AsyncQueueListener
// 4. 根据是否是 shared queue 进行分组组装
}
// 启动所有的插件
public void start() {
eventListeners.stream().forEach(listener -> listener.start());
}
// 停止所有的插件
public void stop() {
eventListeners.stream().forEach(listener -> listener.stop());
}
// 根据插件模式(Mode)构建不同包装的插件实例
private List<EventListenerPlugin> assembleEventListeners(Map<String, EventListenerPlugin> userEventListeners)
// 使用配置的 class 字段获取类名并实例化
// 调用 init(config) 初始化
// 如果失败,会记录日志并抛出异常阻止启动
private EventListenerPlugin loadUserEventListenerPlugin(String listenerName, Map<String, String> config)
}
模式 | 包装方式 | 特点 |
---|---|---|
SYNC |
EventListenerPluginWrapper |
同步执行,可记录 metrics、处理异常等(默认模式) |
ASYNC_ISOLATED |
单独的 AsyncQueueListener |
每个插件独立线程/队列 |
ASYNC_SHARED |
汇总后统一放入 AsyncQueueListener ("default") |
多个插件共享线程/队列,节省资源 |
注意事项
- 所有共享模式插件会集中起来,在最后统一创建一个
AsyncQueueListener ("default")
- 所有包装后的插件都实现
EventListenerPlugin
,因此可统一交给EventBus
使用
EventListenerPluginWrapper
EventListenerManager
在管理插件时,创建的都是 EventListenerPluginWrapper
实例,该类封装了诸如异常处理等公共逻辑。其 onPreEvent
包装方法根据 Mode 定义了不同的行为:例如在 SYNC 模式下,如果 PreEvent 抛出 ForbiddenException
异常,则会立即中断操作并抛出该异常。
@Override
public void onPreEvent(PreEvent preEvent) {
try {
userEventListener.onPreEvent(preEvent);
} catch (ForbiddenException e) {
if (Mode.SYNC.equals(mode())) {
LOG.warn(
"Event listener {} process pre event {} throws ForbiddenException, will skip the "
+ "operation.",
listenerName,
preEvent.getClass().getSimpleName(),
e);
throw e;
}
printExceptionInEventProcess(listenerName, preEvent, e);
} catch (Exception e) {
printExceptionInEventProcess(listenerName, preEvent, e);
}
}
Event
PreEvent
PreEvent
表示操作请求的前置事件。在操作正式执行前,该事件允许用户插入自定义拦截逻辑;在使用同步插件时,如果抛出 ForbiddenException
,则会中止当前操作。
PostEvent
PostEvent
表示操作成功完成后的事件。如果操作执行失败,则会生成一个 FailureEvent
,该事件归类为 PostEvent。PostEvent 一经生成,用户无法修改或拦截该事件。
FailureEvent
FailureEvent 用于表示操作执行失败的情况。它记录了导致错误的原因以及其他相关信息,以便后续进行错误分析与处理。
DummyEventListener
该类用于测试,是 EventListenerPlugin
接口的一种实现。它分别使用两个 LinkedList
来存储 PostEvent
和 PreEvent
事件。之所以采用 LinkedList
,是因为需要使用 removeLast ()
方法弹出队列末尾的事件,而该位置存储了最新产生的事件。通过提供 popPostEvent
和 popPreEvent
方法,测试代码能够取出最新事件进行验证。
通常这种 “Dummy” 或者 “Mock” 实现用于测试场景中,模拟事件监听器的行为,或作为简单的占位实现,用于验证事件发布和消费逻辑。
public Event popPostEvent() {
Assertions.assertTrue(postEvents.size() > 0, "No events to pop");
return postEvents.removeLast();
}
public PreEvent popPreEvent() {
Assertions.assertTrue(preEvents.size() > 0, "No events to pop");
return preEvents.removeLast();
}
同时提供了一个异步实现 DummyAsyncEventListener
,这个类使用 tryGetPreEvents
和 tryGetPostEvents
方法来异步获取事件。
- 最多等待 20 秒;
- 轮训间隔 10 毫秒;
public List<PreEvent> tryGetPreEvents() {
Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> getPreEvents().size() > 0);
return getPreEvents();
}
public List<Event> tryGetPostEvents() {
Awaitility.await()
.atMost(20, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> getPostEvents().size() > 0);
return getPostEvents();
}
AsyncQueueListener
AsyncQueueListener
实现了 EventListenerPlugin
接口,用于异步地接收并分发事件。其核心思想是利用一个阻塞队列(BlockingQueue<BaseEvent>
)缓存事件,再由一个专门的线程(asyncProcessor
)不停地从队列中取出事件,并把这些事件分发给所有真实的监听器。这种方式的好处在于能够解耦事件的产生和事件的处理,实现异步分发,提升系统的响应能力。
关键字段
// 原子标志,表示异步监听器是否已经停止。
private final AtomicBoolean stopped:
// 用于记录队列因满或者其他原因而丢弃的事件数量。
private final AtomicLong dropEventCounters = new AtomicLong(0);
private final AtomicLong lastDropEventCounters = new AtomicLong(0);
// 最后一次事件记录的事件,使用 Instant 对象
private Instant lastRecordDropEventTime;
processEvents
方法用于处理事件,它的核心逻辑如下:
- 该方法位于无限循环中,使用
queue. take ()
阻塞等待队列中有事件可以处理。 - 判断取出的事件是 PreEvent 或 PostEvent,并分别调用所有监听器对应的
onPreEvent
或onPostEvent
方法。 - 在处理过程中捕获
InterruptedException
(线程被中断时退出循环)以及其他异常,并记录日志信息。
改进
Event 实现增强
使用 instanceOf
进行分支判断这种实现非常的糟糕,具体来说
- 当前实现对支持的事件类型进行硬编码。每当添加一个新的事件类型(例如,FailureEvent、AuditEvent)时,都必须修改
dispatchEvent
方法。 - 如果事件类型的数量在未来增加或变得更细粒度,很容易错过处理某些分支;
public void dispatchEvent(BaseEvent baseEvent) {
if (baseEvent instanceof PreEvent) {
dispatchPreEvent((PreEvent) baseEvent);
} else if (baseEvent instanceof Event) {
dispatchPostEvent((Event) baseEvent);
} else {
throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName());
}
}
一个更好的实现应具备如下特点
- 类型安全:消除
instanceof
检查,所有的问题都会在编译期暴露; - 解耦:事件只负责调用
accept
方法,不关心监听器的实现; - 强封装:通过限制对访问者方法的直接访问,防止外部误用;
- 扩展性:添加新的事件类型只需要创建新的类和
visitXXX ()
方法,而无需修改分发逻辑;
为了限制外部对访问者方法的直接访问,因此使用一个内部静态类来实现 EventDispatcher
接口,具体代码如下:
private class InternalVisitor implements EventDispatcher {
/** {@inheritDoc} */
@Override
public void dispatchPreEvent(PreEvent event) throws ForbiddenException {
eventListeners.forEach(eventListener -> eventListener.onPreEvent(event));
}
/** {@inheritDoc} */
@Override
public void dispatchPostEvent(Event event) {
eventListeners.forEach(eventListener -> eventListener.onPostEvent(event));
}
}
经过改造,EventBus 的 dispatchEvent
方法如下:
public void dispatchEvent(BaseEvent baseEvent) {
baseEvent.accept(internalVisitor);
}