Gravitino Event System


背景

为提升 Gravitino 的扩展性,引入了可插拔的事件监听系统,允许用户在 Gravitino 的操作后挂载自定义逻辑。典型使用场景包括:

  • 日志审计;
  • 向其他 Catalog 进行双写;

设计目标

  • 支持在 Gravitino 服务 与 Iceberg REST Catalog 服务 中统一接入事件监听机制
  • 支持可插拔插件体系,方便第三方自定义事件处理逻辑

不支持

  • 强一致性的权限控制(如精细化授权)不作为目标
  • 事件监听器内部异常处理(如线程阻塞、挂起)交由调用方自行管理
  • 权限校验在 HTTP Handler 层完成,不由事件机制承担
  • 该机制不用于 Gravitino 内部模块之间的通信

具体设计

提供 hook 点,在 Gravitino 操作完成后触发事件。事件可以同步或异步处理。

用户视角

配置方式

# 配置一个事件监听器
gravitino.eventListener.names=audit
# 配置多个事件监听器
gravitino.eventListener.names=audit,test
# 监听器实现类
gravitino.eventListener.{name}.class=xxxx
# 用户自定义属性
gravitino.eventListener.{name}.{key}
# 配置异步队列大小
gravitino.eventListener.{name}.queueCapacity=1000

接口定义

核心接口:EventListenerPlugin

public interface EventListenerPlugin {
  void init(Map<String, String> properties);
  void start();
  void stop();
  void onPostEvent(Event event); // 接收事件
  default boolean isAsync() { return false; }
  default boolean isSharedQueue() { return true; }
}

事件资源类型:EventResourceType

public enum EventResourceType {
  METALAKE, CATALOG, RELATION, FILESET, ICEBERG_REST
}

实现视角

Implement

核心组件

  • EventListenerManager:负责加载所有事件监听插件,并根据不同操作类型创建对应的 EventBus
  • EventBus:管理一组功能一致的事件监听器,按照顺序将事件分发给每个监听器。支持同步与异步监听器,异步处理由 AsyncQueueListener 管理。
  • AsyncQueueListener:通过队列机制异步消费事件。不同监听器可通过配置队列名共享消费线程,实现事件的串行或并行处理。

生命周期与流程

  1. 全局唯一的 EventListenerManager
  2. 加载监听器插件
  3. 创建对应的 AsyncQueueListener 实例
  4. 为每种 HTTP 操作构建并注册 EventBus 实例,绑定监听器集合

设计考量

接口粒度

方案一:统一处理事件

public class AuditLogEventListenerPlugin implements EventListenerPlugin {
  public void onPostEvent(Event event) {
    // 从 event 中提取审计信息并写入日志
  }
}
  • 优势:适合通用逻辑,插件逻辑集中;
  • 劣势:对每个事件都需要解析字段,不够明确

方案二:细分回调接口

public class AuditLogEventListenerPlugin implements RelationalEventListener {
  void onTableCreate(NameIdentifier id, OperationResult result, Table table) {
    // 写审计日志
  }
}
  • 优势:更直观清晰
  • 劣势:每个事件都需要写独立方法,代码重复
模式 优点 缺点
同步 不丢事件 阻塞主流程
异步 不阻塞主流程 可能丢事件

同时支持同步和异步,由插件本身决定

是否支持类加载隔离

支持更好,否则所有插件只能放在 Gravitino 的 classpath 中,可能冲突。

Dispatcher 视角

Dispatcher

Gravitino 的 Catalog 实现采用了经典的装饰器模式(Decorator Pattern)。在该模式下,所有组件均实现相同的接口(或继承统一的抽象类),通过包装已有对象的方式,在不修改其原有逻辑的前提下,增强其功能。

以 ModelEventDispatcher 为例,它并不直接处理具体的 Catalog 操作,而是包装一个实际的 Dispatcher 实例。核心职责如下:

  • 将操作委托给被包装的底层 Dispatcher;
  • 在委托执行前后插入自定义逻辑,如触发事件系统、发布事件等。
public class ModelEventDispatcher implements ModelDispatcher {
  // 构造的时候传递 EventBus 实例以及 ModelDispatcher 实现类
  public ModelEventDispatcher(EventBus eventBus, ModelDispatcher dispatcher) {
    this.eventBus = eventBus;
    this.dispatcher = dispatcher;
  }

 // 使用 eventBus.dispatchEvent 发送事件 -> eventBus.dispatchPostEvent -> 触发每个 Plugin 的 onPostEvent 进行处理
  @Override
  public Model registerModel(NameIdentifier ident, String comment, Map<String, String> properties)
  throws NoSuchSchemaException, ModelAlreadyExistsException {
    String user = PrincipalUtils.getCurrentUserName();
    ModelInfo registerRequest = new ModelInfo(ident.name(), properties, comment);

    eventBus.dispatchEvent(new RegisterModelPreEvent(user, ident, registerRequest));
    try {
      Model model = dispatcher.registerModel(ident, comment, properties);
      ModelInfo registeredModel = new ModelInfo(model);
      eventBus.dispatchEvent(new RegisterModelEvent(user, ident, registeredModel));
      return model;
    } catch (Exception e) {
      eventBus.dispatchEvent(new RegisterModelFailureEvent(user, ident, e, registerRequest));
      throw e;
    }
  }
}

代码实现

EventBus

监听器

监听器的接口是 EventListenerPlugin

@DeveloperApi
public interface EventListenerPlugin {
 enum Mode {
    SYNC,
    ASYNC_ISOLATED,
    ASYNC_SHARED
  }
  
 // 初始化方法,初始化插件,用于加载配置、连接资源等
 // 和服务器一起启动,如果插件启动失败,则会导致服务器启动失败
  void init(Map<String, String> properties) throws RuntimeException;
  
  // 插件启动阶段,表示插件准备好开始处理事件
  // 启动失败,依然会导致服务器启动失败
  void start() throws RuntimeException;
  
  // 停止该插件,异常不会影响主流程,但建议清理资源以防泄漏
  void stop() throws RuntimeException;
  
  // 处理操作执行前的事件(如校验、拒绝等)
  // 如果抛出 ForbiddenException 且模式为 SYNC,主操作将被跳过
 // 在 ASYNC 模式下抛出异常无效,只记录日志
 // 允许修改事件中的资源(如动态修改参数)
  default void onPreEvent(PreEvent preEvent) throws ForbiddenException {}
  
 // 处理操作完成后的事件(如创建表后触发)
 // 默认实现为空,只读事件,不能改变资源
 // 可同步或异步执行(由 mode() 决定)
  default void onPostEvent(Event postEvent) throws RuntimeException {}
  
  // 该监听器的事件处理模式
  // 默认使用同步
  default Mode mode() {
    return Mode.SYNC;
  }
}
模式 特点 适用场景
SYNC 同步执行,阻塞主线程 拒绝请求、写审计、修改事件资源
ASYNC_ISOLATED 异步执行,每个监听器独立队列 各监听器资源隔离,处理较复杂
ASYNC_SHARED 异步执行,监听器共享队列 多监听器共享线程,资源高效

EventBus

核心字段

// 存储所有注册的事件监听器实例
private final List<EventListenerPlugin> eventListeners;

核心方法

public class EventBus {
  // 发送 PreEvent 或者 Event
  public void dispatchEvent(BaseEvent baseEvent) {
    if (baseEvent instanceof PreEvent) {
      dispatchPreEvent((PreEvent) baseEvent);
    } else if (baseEvent instanceof Event) {
      dispatchPostEvent((Event) baseEvent);
    } else {
      throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName());
    }
  }

  // 发送 PostEvent
  private void dispatchPostEvent(Event postEvent) {
    eventListeners.forEach(eventListener -> eventListener.onPostEvent(postEvent));
  }

 // 发送 PreEvent
  private void dispatchPreEvent(PreEvent preEvent) throws ForbiddenException {
    eventListeners.forEach(eventListener -> eventListener.onPreEvent(preEvent));
  }
}

EventListenerManager

根据配置加载监听器 → 包装处理(按执行模式) → 构建可用的监听器列表 → 创建 EventBus 用于事件分发。

关键字段:

// 异步队列大小
private int queueCapacity;
// join 超时等待时间
private int dispatcherJoinSeconds;
// 所有处理完组装逻辑后的监听器列表
private List<EventListenerPlugin> eventListeners;

关键方法

public class EventListenerManager {

  public void init(Map<String, String> properties) {
    EventListenerConfig config = new EventListenerConfig(properties);
    // 默认队列大小 3000,超过队列大小则事件会被抛弃
    this.queueCapacity = config.get(EventListenerConfig.QUEUE_CAPACITY);
    // join 超时事件 3 秒
    this.dispatcherJoinSeconds = config.get(EventListenerConfig.DISPATCHER_JOIN_SECONDS);
  // 1. 解析配置 → 找出所有监听器名字(例如 audit、metrics)
  // 2. 加载并实例化每个监听器类,通过反射的方式构建实现类对象并调用 init 方法
  // 3. 包装为 EventListenerWrapper / AsyncQueueListener
  // 4. 根据是否是 shared queue 进行分组组装

  }

  //  启动所有的插件
  public void start() {
    eventListeners.stream().forEach(listener -> listener.start());
  }

  // 停止所有的插件
  public void stop() {
    eventListeners.stream().forEach(listener -> listener.stop());
  }
 
 // 根据插件模式(Mode)构建不同包装的插件实例  
  private List<EventListenerPlugin> assembleEventListeners(Map<String, EventListenerPlugin> userEventListeners)
  
  // 使用配置的 class 字段获取类名并实例化
  // 调用 init(config) 初始化
 // 如果失败,会记录日志并抛出异常阻止启动
  private EventListenerPlugin loadUserEventListenerPlugin(String listenerName, Map<String, String> config)
}
模式 包装方式 特点
SYNC EventListenerPluginWrapper 同步执行,可记录 metrics、处理异常等(默认模式)
ASYNC_ISOLATED 单独的 AsyncQueueListener 每个插件独立线程/队列
ASYNC_SHARED 汇总后统一放入 AsyncQueueListener ("default") 多个插件共享线程/队列,节省资源

注意事项

  • 所有共享模式插件会集中起来,在最后统一创建一个 AsyncQueueListener ("default")
  • 所有包装后的插件都实现 EventListenerPlugin,因此可统一交给 EventBus 使用

EventListenerPluginWrapper

EventListenerManager 在管理插件时,创建的都是 EventListenerPluginWrapper 实例,该类封装了诸如异常处理等公共逻辑。其 onPreEvent 包装方法根据 Mode 定义了不同的行为:例如在 SYNC 模式下,如果 PreEvent 抛出 ForbiddenException 异常,则会立即中断操作并抛出该异常。

@Override
public void onPreEvent(PreEvent preEvent) {
 try {
   userEventListener.onPreEvent(preEvent);
  } catch (ForbiddenException e) {
   if (Mode.SYNC.equals(mode())) {
     LOG.warn(
       "Event listener {} process pre event {} throws ForbiddenException, will skip the "
       + "operation.",
       listenerName,
       preEvent.getClass().getSimpleName(),
       e);
     throw e;
   }
   printExceptionInEventProcess(listenerName, preEvent, e);
 } catch (Exception e) {
   printExceptionInEventProcess(listenerName, preEvent, e);
 }
}

Event

PreEvent

PreEvent 表示操作请求的前置事件。在操作正式执行前,该事件允许用户插入自定义拦截逻辑;在使用同步插件时,如果抛出 ForbiddenException,则会中止当前操作。

PostEvent

PostEvent 表示操作成功完成后的事件。如果操作执行失败,则会生成一个 FailureEvent,该事件归类为 PostEvent。PostEvent 一经生成,用户无法修改或拦截该事件。

FailureEvent

FailureEvent 用于表示操作执行失败的情况。它记录了导致错误的原因以及其他相关信息,以便后续进行错误分析与处理。

DummyEventListener

该类用于测试,是 EventListenerPlugin 接口的一种实现。它分别使用两个 LinkedList 来存储 PostEvent 和 PreEvent 事件。之所以采用 LinkedList,是因为需要使用 removeLast () 方法弹出队列末尾的事件,而该位置存储了最新产生的事件。通过提供 popPostEvent 和 popPreEvent 方法,测试代码能够取出最新事件进行验证。

通常这种 “Dummy” 或者 “Mock” 实现用于测试场景中,模拟事件监听器的行为,或作为简单的占位实现,用于验证事件发布和消费逻辑。

public Event popPostEvent() {
  Assertions.assertTrue(postEvents.size() > 0, "No events to pop");
  return postEvents.removeLast();
}

public PreEvent popPreEvent() {
  Assertions.assertTrue(preEvents.size() > 0, "No events to pop");
  return preEvents.removeLast();
}

同时提供了一个异步实现 DummyAsyncEventListener ,这个类使用 tryGetPreEventstryGetPostEvents 方法来异步获取事件。

  1. 最多等待 20 秒;
  2. 轮训间隔 10 毫秒;
public List<PreEvent> tryGetPreEvents() {
  Awaitility.await()
      .atMost(20, TimeUnit.SECONDS)
      .pollInterval(10, TimeUnit.MILLISECONDS)
      .until(() -> getPreEvents().size() > 0);
  return getPreEvents();
}

public List<Event> tryGetPostEvents() {
  Awaitility.await()
      .atMost(20, TimeUnit.SECONDS)
      .pollInterval(10, TimeUnit.MILLISECONDS)
      .until(() -> getPostEvents().size() > 0);
  return getPostEvents();
}

AsyncQueueListener

AsyncQueueListener 实现了 EventListenerPlugin 接口,用于异步地接收并分发事件。其核心思想是利用一个阻塞队列(BlockingQueue<BaseEvent>)缓存事件,再由一个专门的线程(asyncProcessor)不停地从队列中取出事件,并把这些事件分发给所有真实的监听器。这种方式的好处在于能够解耦事件的产生和事件的处理,实现异步分发,提升系统的响应能力。

关键字段

// 原子标志,表示异步监听器是否已经停止。
private final AtomicBoolean stopped:
// 用于记录队列因满或者其他原因而丢弃的事件数量。
private final AtomicLong dropEventCounters = new AtomicLong(0);
private final AtomicLong lastDropEventCounters = new AtomicLong(0);
// 最后一次事件记录的事件,使用 Instant 对象
private Instant lastRecordDropEventTime;

processEvents 方法用于处理事件,它的核心逻辑如下:

  • 该方法位于无限循环中,使用 queue. take () 阻塞等待队列中有事件可以处理。
  • 判断取出的事件是 PreEvent 或 PostEvent,并分别调用所有监听器对应的 onPreEvent 或 onPostEvent 方法。
  • 在处理过程中捕获 InterruptedException(线程被中断时退出循环)以及其他异常,并记录日志信息。

改进

Event 实现增强

使用 instanceOf 进行分支判断这种实现非常的糟糕,具体来说

  • 当前实现对支持的事件类型进行硬编码。每当添加一个新的事件类型(例如,FailureEvent、AuditEvent)时,都必须修改 dispatchEvent 方法。
  • 如果事件类型的数量在未来增加或变得更细粒度,很容易错过处理某些分支;
public void dispatchEvent(BaseEvent baseEvent) {
    if (baseEvent instanceof PreEvent) {
      dispatchPreEvent((PreEvent) baseEvent);
    } else if (baseEvent instanceof Event) {
      dispatchPostEvent((Event) baseEvent);
    } else {
      throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName());
    }
  }

一个更好的实现应具备如下特点

  • 类型安全:消除 instanceof 检查,所有的问题都会在编译期暴露;
  • 解耦:事件只负责调用 accept 方法,不关心监听器的实现;
  • 强封装:通过限制对访问者方法的直接访问,防止外部误用;
  • 扩展性:添加新的事件类型只需要创建新的类和 visitXXX () 方法,而无需修改分发逻辑;

EventDispatcher类图

为了限制外部对访问者方法的直接访问,因此使用一个内部静态类来实现 EventDispatcher 接口,具体代码如下:

private class InternalVisitor implements EventDispatcher {
    /** {@inheritDoc} */
    @Override
    public void dispatchPreEvent(PreEvent event) throws ForbiddenException {
      eventListeners.forEach(eventListener -> eventListener.onPreEvent(event));
    }

    /** {@inheritDoc} */
    @Override
    public void dispatchPostEvent(Event event) {
      eventListeners.forEach(eventListener -> eventListener.onPostEvent(event));
    }
  }

经过改造,EventBus 的 dispatchEvent 方法如下:

public void dispatchEvent(BaseEvent baseEvent) {
    baseEvent.accept(internalVisitor);
  }

文章作者: pancx
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 pancx !
评论
  目录