本文主要从源码角度分析 Catalog 的整体运行流程,包括它的主要职责,与其他模块的交互等。

# 简介


Impala 组件 Catalog Service 把 SQL 语句中的元数据变化传递给集群中的 Impala deamons。它是一个名叫 catalogd 的守护进程,只需要部署一个节点,因为请求是通过 StateStore daemon 传递的,所以需要把它俩部署在一个节点上。

当通过 Impala 执行的语句涉及元数据变化时,Catalog Servie 无需执行 REFRESH 和 INVALIDATE METADATA 语句。当通过 Hive 建表、导入数据等时,需要在查询前执行 REFRESH 或 INVALIDATE METADATA。

负载均衡和高可用考虑的大多数问题都适用于 impalad,statestored 和 catalogd 不需要考虑高可用,因为这些服务的问题不会导致数据丢失。如果这些服务因为节点停机不可用了,可以停掉 Impala 服务,删掉 Statestore 和 Catalogd 角色,在另外的节点重新添加角色,重启 Impala 服务即可。

# 源码分析

# 代码结构

可以看出,c++ 部分其实只是启动了服务和端口,真正的实现逻辑都在 Java org.apache.impala.catalog 包中。

# 服务启动

Catalogd 启动入口是 CatalogdMain 方法。

// Catalog Server debug Web UI for administrators to monitor and troubleshoot.
  FLAGS_webserver_port = 25020;
  // Internal use only. The Catalog Server daemon listens on this port for updates from the StateStore daemon.
  FLAGS_state_store_subscriber_port = 23020;
  // 初始化公共运行环境
  InitCommonRuntime(argc, argv, true);
  // 初始化 fe 代码
  // 初始化 catalog 服务环境,启动 webserver 和 metrics webserver,注册内存统计信息,开启内存维护线程等
  DaemonEnv daemon_env("catalog");
  ABORT_IF_ERROR(daemon_env.Init(/* init_jvm */ true));
  // 初始化元数据事件统计信息
  // 启动 catalog 服务,创建了两个线程:收集 catalog 更新、刷新统计信息,启动 statestore 监听端口
  CatalogServer catalog_server(daemon_env.metrics())
  // 注册页面
  // FLAGS_catalog_service_port(26000): Internal use only. The Catalog Server uses this port to communicate with the Impala daemons.
  // 启动 thrift 端口,提供给 impalad
  // ...
  return 0;

可以看出,catalogd 一共启动了 3 个端口,一个 web 端口供 debug 使用,一个 thrift 端口订阅 statestore 信息,一个 thrift 端口供 impalad 操作元数据。

# Catalog Service API

catalogd 对 impalad 提供了 9 个接口

service CatalogService {
  // 执行 DDL 语句
  TDdlExecResponse ExecDdl(1: TDdlExecRequest req);

  // 获取 CatalogObject,可以是数据库、表、函数等
  TGetCatalogObjectResponse GetCatalogObject(1: TGetCatalogObjectRequest req);

  // 获取表分区状态
  TGetPartitionStatsResponse GetPartitionStats(1: TGetPartitionStatsRequest req);

  // 重置元数据
  TResetMetadataResponse ResetMetadata(1: TResetMetadataRequest req);

  // 更新分区信息
  TUpdateCatalogResponse UpdateCatalog(1: TUpdateCatalogRequest req);

  // 获取函数
  TGetFunctionsResponse GetFunctions(1: TGetFunctionsRequest req);

  // 优先加载某个表的元数据
  TPrioritizeLoadResponse PrioritizeLoad(1: TPrioritizeLoadRequest req);

  // 获取部分信息,看着是一些统计信息
  TGetPartialCatalogObjectResponse GetPartialCatalogObject(
      1: TGetPartialCatalogObjectRequest req);

  // 更新表使用次数
  TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req);

下面分别来简单看几个接口的实现,具体的实现就是 Java 了。

# ExecDdl

这个接口执行 DDL 语句,我们以 CREATE DATABASE 为例,来看下具体逻辑。

public byte[] execDdl(byte[] thriftDdlExecReq) throws ImpalaException, TException {
  // ...
  // CatalogOpExecutor 执行 DDL 操作
  byte[] res = serializer.serialize(catalogOpExecutor_.execDdlRequest(params));
  // ...
public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
    throws ImpalaException {
  // ...
  switch (ddl_type) {
      // ...
      // 这里又封装了一层
      createDatabase(create_db_params, response, syncDdl, wantMinimalResult);
    // ...
  // SYNC_DDL 更改版本,后面再看执行 SYNC_DDL 后 impala 做了什么
  if (syncDdl) {
  // ...
private void createDatabase(TCreateDbParams params, TDdlExecResponse resp,
    boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
  String dbName = params.getDb();
  //catalog_ 是一层缓存
  Db existingDb = catalog_.getDb(dbName);
  if (params.if_not_exists && existingDb != null) {
    // 数据库已存在,不存在才创建,这种情况直接跳过
    // 虽然不需要再创建了,这种情况还是需要更新 Catalog version
    if (syncDdl) {
      tryLock(existingDb, "create database");
      try {
        long newVersion = catalog_.incrementAndGetCatalogVersion();
      } finally {
        // 释放锁
    // 处理结果并返回
    addDbToCatalogUpdate(existingDb, wantMinimalResult, resp.result);
  // 初始化 metastore 接口参数
  org.apache.hadoop.hive.metastore.api.Database db =
      new org.apache.hadoop.hive.metastore.api.Database();
  // 省略 set name、comment、location 等
  Db newDb = null;
  // 拿到 metastore ddl 的锁
  try {
    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
      try {
        // 调 metastore 接口创建数据库
        long eventId = getCurrentEventId(msClient);
        // ...
        // 加入缓存
        newDb = catalog_.addDb(dbName, eventDbPair.second, eventDbPair.first);
      } catch (AlreadyExistsException e) {
        // ...
      } catch (TException e) {
        // ...
    // 处理结果并返回
    addDbToCatalogUpdate(newDb, wantMinimalResult, resp.result);
    // ...
  } finally {
    // 释放锁

# GetCatalogObject

JniCatalog 中和上面类似,都是封装了一下。区别就是,这个接口只是查询,不涉及更改,所以直接找 CatalogServiceCatalog 查就行了,不需要 CatalogOpExecutor 处理。
注意:CatalogServiceCatalog 目前就先当做缓存层,其实它做了缓存之上更复杂的事情,后面再分析。

public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
    TException {
  // ...
  // 这里不涉及更改,直接找缓存查
  byte[] res = serializer.serialize(catalog_.getTCatalogObject(objectDesc));
  // ...

# 总结

以上大致知道了 Catalog 做的事情,主要是管理元数据,处理 DDL 请求。在后续的文章中继续分析 CatalogServiceCatalog,来看下它是如何管理元数据的。