本文主要从源码角度分析 Catalog 的整体运行流程,包括它的主要职责,与其他模块的交互等。

# 简介

首先,来看官网描述:

The Impala component known as the Catalog Service relays the metadata changes from Impala SQL statements to all the Impala daemons in a cluster.
It is physically represented by a daemon process named catalogd. You only need such a process on one host in a cluster. Because the requests are
passed through the StateStore daemon, it makes sense to run the statestored and catalogd services on the same host.

The catalog service avoids the need to issue REFRESH and INVALIDATE METADATA statements when the metadata changes are performed by statements
issued through Impala. When you create a table, load data, and so on through Hive, you do need to issue REFRESH or INVALIDATE METADATA on an
Impala daemon before executing a query there.

Most considerations for load balancing and high availability apply to the impalad daemon. The statestored and catalogd daemons do not have special
requirements for high availability, because problems with those daemons do not result in data loss. If those daemons become unavailable due to an
outage on a particular host, you can stop the Impala service, delete the Impala StateStore and Impala Catalog Server roles, add the roles on a
different host, and restart the Impala service.

翻译:
Impala 组件 Catalog Service 把 SQL 语句中的元数据变化传递给集群中的 Impala deamons。它是一个名叫 catalogd 的守护进程,只需要部署一个节点,因为请求是通过 StateStore daemon 传递的,所以需要把它俩部署在一个节点上。

当通过 Impala 执行的语句涉及元数据变化时,Catalog Servie 无需执行 REFRESH 和 INVALIDATE METADATA 语句。当通过 Hive 建表、导入数据等时,需要在查询前执行 REFRESH 或 INVALIDATE METADATA。

负载均衡和高可用考虑的大多数问题都适用于 impalad,statestored 和 catalogd 不需要考虑高可用,因为这些服务的问题不会导致数据丢失。如果这些服务因为节点停机不可用了,可以停掉 Impala 服务,删掉 Statestore 和 Catalogd 角色,在另外的节点重新添加角色,重启 Impala 服务即可。

# 源码分析

# 代码结构

可以看出,c++ 部分其实只是启动了服务和端口,真正的实现逻辑都在 Java org.apache.impala.catalog 包中。

# 服务启动

Catalogd 启动入口是 CatalogdMain 方法。

be/src/catalog/catalogd-main.cc
// Catalog Server debug Web UI for administrators to monitor and troubleshoot.
  FLAGS_webserver_port = 25020;
  // Internal use only. The Catalog Server daemon listens on this port for updates from the StateStore daemon.
  FLAGS_state_store_subscriber_port = 23020;
  // 初始化公共运行环境
  InitCommonRuntime(argc, argv, true);
  // 初始化 fe 代码
  InitFeSupport();
  // 初始化 catalog 服务环境,启动 webserver 和 metrics webserver,注册内存统计信息,开启内存维护线程等
  DaemonEnv daemon_env("catalog");
  ABORT_IF_ERROR(daemon_env.Init(/* init_jvm */ true));
  // 初始化元数据事件统计信息
  MetastoreEventMetrics::InitMetastoreEventMetrics(daemon_env.metrics());
  // 启动 catalog 服务,创建了两个线程:收集 catalog 更新、刷新统计信息,启动 statestore 监听端口
  CatalogServer catalog_server(daemon_env.metrics())
  ABORT_IF_ERROR(catalog_server.Start());
  // 注册页面
  catalog_server.RegisterWebpages(daemon_env.webserver());
  // FLAGS_catalog_service_port(26000): Internal use only. The Catalog Server uses this port to communicate with the Impala daemons.
  // 启动 thrift 端口,提供给 impalad
  // ...
  ABORT_IF_ERROR(server->Start());
  catalog_server.MarkServiceAsStarted();
  server->Join();
  return 0;

可以看出,catalogd 一共启动了 3 个端口,一个 web 端口供 debug 使用,一个 thrift 端口订阅 statestore 信息,一个 thrift 端口供 impalad 操作元数据。

# Catalog Service API

catalogd 对 impalad 提供了 9 个接口

service CatalogService {
  // 执行 DDL 语句
  TDdlExecResponse ExecDdl(1: TDdlExecRequest req);

  // 获取 CatalogObject,可以是数据库、表、函数等
  TGetCatalogObjectResponse GetCatalogObject(1: TGetCatalogObjectRequest req);

  // 获取表分区状态
  TGetPartitionStatsResponse GetPartitionStats(1: TGetPartitionStatsRequest req);

  // 重置元数据
  TResetMetadataResponse ResetMetadata(1: TResetMetadataRequest req);

  // 更新分区信息
  TUpdateCatalogResponse UpdateCatalog(1: TUpdateCatalogRequest req);

  // 获取函数
  TGetFunctionsResponse GetFunctions(1: TGetFunctionsRequest req);

  // 优先加载某个表的元数据
  TPrioritizeLoadResponse PrioritizeLoad(1: TPrioritizeLoadRequest req);

  // 获取部分信息,看着是一些统计信息
  TGetPartialCatalogObjectResponse GetPartialCatalogObject(
      1: TGetPartialCatalogObjectRequest req);

  // 更新表使用次数
  TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req);
}

下面分别来简单看几个接口的实现,具体的实现就是 Java 了。

# ExecDdl

这个接口执行 DDL 语句,我们以 CREATE DATABASE 为例,来看下具体逻辑。

fe/src/main/java/org/apache/impala/service/JniCatalog.java
public byte[] execDdl(byte[] thriftDdlExecReq) throws ImpalaException, TException {
  // ...
  // CatalogOpExecutor 执行 DDL 操作
  byte[] res = serializer.serialize(catalogOpExecutor_.execDdlRequest(params));
  // ...
}
fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
    throws ImpalaException {
  // ...
  switch (ddl_type) {
    case CREATE_DATABASE:
      // ...
      // 这里又封装了一层
      createDatabase(create_db_params, response, syncDdl, wantMinimalResult);
      break;
    // ...
  }
  // SYNC_DDL 更改版本,后面再看执行 SYNC_DDL 后 impala 做了什么
  if (syncDdl) {
    response.getResult().setVersion(
        catalog_.waitForSyncDdlVersion(response.getResult()));
  }
  // ...
}
private void createDatabase(TCreateDbParams params, TDdlExecResponse resp,
    boolean syncDdl, boolean wantMinimalResult) throws ImpalaException {
  String dbName = params.getDb();
  //catalog_ 是一层缓存
  Db existingDb = catalog_.getDb(dbName);
  if (params.if_not_exists && existingDb != null) {
    // 数据库已存在,不存在才创建,这种情况直接跳过
    // 虽然不需要再创建了,这种情况还是需要更新 Catalog version
    if (syncDdl) {
      tryLock(existingDb, "create database");
      try {
        long newVersion = catalog_.incrementAndGetCatalogVersion();
        existingDb.setCatalogVersion(newVersion);
      } finally {
        // 释放锁
        catalog_.getLock().writeLock().unlock();
        existingDb.getLock().unlock();
      }
    }
    // 处理结果并返回
    addDbToCatalogUpdate(existingDb, wantMinimalResult, resp.result);
    return;
  }
  // 初始化 metastore 接口参数
  org.apache.hadoop.hive.metastore.api.Database db =
      new org.apache.hadoop.hive.metastore.api.Database();
  // 省略 set name、comment、location 等
  
  Db newDb = null;
  // 拿到 metastore ddl 的锁
  getMetastoreDdlLock().lock();
  try {
    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
      try {
        // 调 metastore 接口创建数据库
        long eventId = getCurrentEventId(msClient);
        msClient.getHiveClient().createDatabase(db);
        // ...
        // 加入缓存
        newDb = catalog_.addDb(dbName, eventDbPair.second, eventDbPair.first);
      } catch (AlreadyExistsException e) {
        // ...
      } catch (TException e) {
        // ...
      }
    }
    // 处理结果并返回
    addDbToCatalogUpdate(newDb, wantMinimalResult, resp.result);
    // ...
  } finally {
    // 释放锁
    getMetastoreDdlLock().unlock();
  }
}

# GetCatalogObject

JniCatalog 中和上面类似,都是封装了一下。区别就是,这个接口只是查询,不涉及更改,所以直接找 CatalogServiceCatalog 查就行了,不需要 CatalogOpExecutor 处理。
注意:CatalogServiceCatalog 目前就先当做缓存层,其实它做了缓存之上更复杂的事情,后面再分析。

fe/src/main/java/org/apache/impala/service/JniCatalog.java
public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
    TException {
  // ...
  // 这里不涉及更改,直接找缓存查
  byte[] res = serializer.serialize(catalog_.getTCatalogObject(objectDesc));
  // ...
}

# 总结

以上大致知道了 Catalog 做的事情,主要是管理元数据,处理 DDL 请求。在后续的文章中继续分析 CatalogServiceCatalog,来看下它是如何管理元数据的。