CatalogServiceCatalog 管理所有 Catalog 元数据的加载以及处理 DDL 请求。它维护了一个全局版本,每当从 Catalog 中新增、修改或删除时,版本就会新增并分配给 CatalogObject。这意味这每一个 CatalogObject 将有一个单独的版本,被分配的版本会严格递增。

# 总览

CatalogServiceCatalog 定期收集增量更新(基于指定的版本号),构建要发送给 statestore 的主题更新。每一个 catalog 主题更新被一个范围版本号定义, CatalogServiceCatalog 保证每一个具有指定范围版本号的 CatalogObject 被包含在 catalog 主题更新中。当一个 主题更新被发送的时候,并发的 DDL 请求是允许的。因此,频繁修改 CatalogObject 可能导致跳过主题更新是可能的。这可能发生在一个主题更新线程尝试收集一个对象更新时,这个对象被另一个元数据操作修改,导致它的版本超过主题更新的目标版本。为了保证所有的更新最终都包含在主题更新中,我们持续跟踪每个 CatalogObject 跳过主题更新的次数,如果次数超过指定阈值,我们将 Catalog Object 加入到下一个主题更新中,即使它的版本比主题更新的目标版本更高。因此,相同版本的对象可能被发送到两个独立的主题更新中。

CatalogServiceCatalog 维护了两种日志:

  1. 删除日志。从缓存删了对象之后,缓存本身追踪删除已经没有用了。这个日志通过记录已经从 Catalog 中移除的 CatalogObject,在一个主题更新中填充删除列表是有用的。一个新版本的实体每次被删的时候都会被加入这个日志。增加一个对象的版本,并将它加入删除日志应该是自动的。当相关删除实体被加入主题更新时,一个实体通过主题更新线程被从这个日志移除。
  2. 主题更新日志。这个日志记录了关于 Catalog 主题更新中的 CatalogObject 的信息。只有处理主题更新的线程才能从日志中添加、更新或删除实体。所有其他操作只能读主题更新日志实体,但是不能修改。每一个实体包含 CatalogObject 跳过主题更新的次数,被最后发送到一个主题更新的对象版本,主题更新的版本。主题更新日志的实体,为了阻止日志无限增长,每隔 TOPIC_UPDATE_LOG_GC_FREQUENCY 个主题更新被主题更新处理线程垃圾回收。元数据操作使用 SYNC_DDL 检查这个日志确认 catalog 主题版本,确保 impalad 必须等待以便确保这个操作的影响被广播到其他的 coordinators。

SYNC_DDL 的一直异常:
基于时间清理处理的主题更新实体可能导致用了 SYNC_DDL 的元数据操作挂掉,当等待指定主题更新日志实体时。如果线程处理元数据操作长时间停滞(比处理 TOPIC_UPDATE_LOG_GC_FREQUENCY 主题更新时间更长)在操作在 catalog 缓存生效和 SYNC_DDL 版本被检查之间,这个是可能发生的。为了减少这种事件的可能性,我们给 TOPIC_UPDATE_LOG_GC_FREQUENCY 设置一个很大的值。另外,为了阻止元数据操作因为未知原因挂在这里,不允许使用 SYNC_DDL 的操作无限等待指定主题日志实体,如果达到指定的最大等待时间会抛出异常。查看 waitForSyncDdlVersion () 了解更多细节。

IncompleteTables 表元数据被 TableLoadingMgr 在后台加载,通过调用 prioritizeLoad () 表可以表优先加载。后台加载也可以开启,这种情况下遗失的表会被提交给 TableLoadingMgr,任何表元数据都会失效并在启动时加载。完全加载的表元数据就地更新,不会通过 TableLoadingMgr 触发后台元数据加载。访问一个没有被加载的表一经请求将会加载表的元数据,在表加载线程池之外。

查看 CatalogOpExecutor 的类注释,描述了锁协议应该被应用如果版本锁和表锁需要同时获取。

# 常量 & 变量

// MetaStore Client 资源池初始大小
public static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
// 主题更新跳过的最大次数
private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
// 跳过更新锁争夺的最大次数
private final int maxSkippedUpdatesLockContention_;
// 主题更新表锁最大等待时间
private final long topicUpdateTblLockMaxWaitTimeMs_;
// 获取表锁的超时时间
public static final long LOCK_RETRY_TIMEOUT_MS = 7200000;
// 获取表锁的间隔时间
private static final int LOCK_RETRY_DELAY_MS = 10;
// GetPartialCatalogObjectRequest 中 table id
public static final long TABLE_ID_UNAVAILABLE = -1;
//catalog 服务 id
private final TUniqueId catalogServiceId_;
// 版本锁,公平锁
private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
//catalog 版本
private long catalogVersion_ = INITIAL_CATALOG_VERSION;
// 上次执行 reset () 方法的 catalog 版本,被版本锁保护。
private long lastResetStartVersion_ = INITIAL_CATALOG_VERSION;
// 负责后台表加载的调度
private final TableLoadingMgr tableLoadingMgr_;
// 是否在后台加载
private final boolean loadInBackground_;
// 定期轮询 HDFS 来获取最新的一组已知缓存池
private final ScheduledExecutorService cachePoolReader_ = Executors.newScheduledThreadPool(1);
// 删除锁
private final CatalogDeltaLog deleteLog_;
// 上次主题更新版本,其中的 catalog 版本
private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1);
// 主题更新等待时间
private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000;
// 主题更新日志
private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
// 本地库路径
private final String localLibraryPath_;
//catalog 表失效器
private CatalogdTableInvalidator catalogdTableInvalidator_;
// 负责 MetaStore 事件处理来标记表失效
private ExternalEventsProcessor metastoreEventProcessor_;
// MetaStore server
private ICatalogMetastoreServer catalogMetastoreServer_;
// 同步到最新 MetaStore 事件的工厂
private MetastoreEventFactory syncToLatestEventFactory_;
// 主题模式
private static enum TopicMode {
  // 发布完整的 catalog object
  FULL,
  // 发布完整的和简略的 catalog object
  MIXED,
  // 只发布简略的 catalog object 
  MINIMAL
};
final TopicMode topicMode_;
// 局部获取 RPC 队列超时时间 
private final long PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S = BackendConfig.INSTANCE.getCatalogPartialFetchRpcQueueTimeoutS();
// 局部获取 RPC 最大并行度
private final int MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT = BackendConfig.INSTANCE.getCatalogMaxParallelPartialFetchRpc();
// 控制并发访问 doGetPartialCatalogObject 方法,控制并发数到 --catalog_max_parallel_partial_fetch_rpc 
private final Semaphore partialObjectFetchAccess_ = new Semaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT, /*fair =*/ true);
// 授权
private AuthorizationManager authzManager_;
// 数据库黑名单
private final Set<String> blacklistedDbs_;
// 表黑名单
private final Set<TableName> blacklistedTables_;

# 构造方法

构造方法没有做什么特殊的事情,这里就不再看了。

# 方法

# 获取锁

/**
 * 表写锁,参数默认
 */
public boolean tryWriteLock(Table tbl) {
  return tryLock(tbl, true, LOCK_RETRY_TIMEOUT_MS);
}
/**
 * 多表写锁
 */
public boolean tryWriteLock(Table[] tables) {
  StringBuilder tableInfo = new StringBuilder();
  int numTables = tables.length;
  if (LOG.isDebugEnabled()) {
    for(int i = 0; i < numTables; i++) {
      tableInfo.append(tables[i].getFullName());
      if(i < numTables - 1) {
        tableInfo.append(", ");
      }
    }
    LOG.debug("Trying to acquire write locks for tables: " +
        tableInfo);
  }
  int tableIndex=-1, versionLockCount = 0;
  try {
    for(tableIndex = 0; tableIndex < numTables; tableIndex++) {
      Table tbl = tables[tableIndex];
      if (!tryWriteLock(tbl)) {
        LOG.debug("Could not acquire write lock on table: " + tbl.getFullName());
        return false;
      }
      versionLockCount += 1;
    }
    // in case of success, release version write lock for all tables except last
    if (tableIndex == numTables) {
      versionLockCount = versionLockCount - 1;
    }
    return true;
  } finally {
    if (tableIndex != numTables) {
      // couldn't acquire lock on all tables
      StringBuilder tablesInfo = new StringBuilder();
      for(int i = 0; i < tableIndex; i++) {
        tables[i].releaseWriteLock();
        tablesInfo.append(tables[i].getFullName() + ((i < tableIndex-1) ? ", " : ""));
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Released table write lock on tables: {}", tablesInfo);
      }
    }
    LOG.debug("Unlocking versionLock_ write lock {} number of times", versionLockCount);
    while (versionLockCount > 0) {
      versionLock_.writeLock().unlock();
      versionLockCount = versionLockCount - 1;
    }
  }
}