Catalog is an abstract class, and it is central to understanding how Impala manages catalog metadata. This article walks through the source code of the Catalog class.

# Preface

First, let's look at the Javadoc comment on the class:

```java
/**
 * Thread safe interface for reading and updating metadata stored in the Hive MetaStore.
 * This class provides a storage API for caching CatalogObjects: databases, tables,
 * and functions and the relevant metadata to go along with them. Although this class is
 * thread safe, it does not guarantee consistency with the MetaStore. It is important
 * to keep in mind that there may be external (potentially conflicting) concurrent
 * metastore updates occurring at any time.
 * The CatalogObject storage hierarchy is:
 * Catalog -> Db -> Table
 *               -> Function
 * Each level has its own synchronization, so the cache of Dbs is synchronized and each
 * Db has a cache of tables which is synchronized independently.
 *
 * The catalog is populated with the impala builtins on startup. Builtins and user
 * functions are treated identically by the catalog. The builtins go in a specific
 * database that the user cannot modify.
 * Builtins are populated on startup in initBuiltins().
 */
```

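In short, Catalog is a thread-safe interface whose main job is reading and updating metadata stored in the Hive Metastore (HMS).

It provides a storage API for caching CatalogObjects: databases, tables, functions, and their associated metadata. Thread safety does not imply consistency with the Metastore: other clients may update the Metastore concurrently at any time, and those updates can conflict with what the cache holds.

CatalogObjects are stored in a hierarchy: the Catalog holds Dbs, and each Db holds both Tables and Functions (Catalog -> Db -> Table, and Db -> Function).
Each level is synchronized independently: the cache of Dbs has its own synchronization, and each Db's table cache is synchronized separately from it.

On startup the catalog is populated with Impala's builtin functions. The catalog treats builtins and user-defined functions identically; builtins live in a dedicated database that users cannot modify.

Builtins are loaded at startup in initBuiltins().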

# Class relationships

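Catalog implements AutoCloseable so that its pool of MetaStore clients is released when the instance is closed. Note that Java does not call close() when an object is garbage-collected: it runs only when invoked explicitly or at the end of a try-with-resources block.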

```java
@Override
public void close() { metaStoreClientPool_.close(); }
```
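To see what implementing AutoCloseable buys, here is a minimal sketch using try-with-resources. FakeClientPool and MiniCatalog are hypothetical stand-ins (not Impala classes): MiniCatalog.close() simply forwards to the pool, mirroring the one-line close() shown above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CloseDemo {
  public static void main(String[] args) {
    FakeClientPool pool = new FakeClientPool();
    // try-with-resources calls MiniCatalog.close() when the block exits,
    // even if an exception is thrown inside it.
    try (MiniCatalog catalog = new MiniCatalog(pool)) {
      System.out.println("inside block, pool closed = " + pool.closed.get());
    }
    System.out.println("after block, pool closed = " + pool.closed.get());
  }
}

// Hypothetical stand-in for MetaStoreClientPool: it only records whether it was closed.
class FakeClientPool implements AutoCloseable {
  final AtomicBoolean closed = new AtomicBoolean(false);

  @Override
  public void close() { closed.set(true); }
}

// Hypothetical, stripped-down catalog whose close() releases the client pool.
class MiniCatalog implements AutoCloseable {
  private final FakeClientPool metaStoreClientPool_;

  MiniCatalog(FakeClientPool pool) { metaStoreClientPool_ = pool; }

  @Override
  public void close() { metaStoreClientPool_.close(); }
}
```

Because close() is tied to the resource block rather than to garbage collection, the pool is released deterministically at a known point in the code.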

# Constants

```java
// Initial catalog version
public final static long INITIAL_CATALOG_VERSION = 0L;
// Initial catalog service ID
public static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);
// Name of the default database
public static final String DEFAULT_DB = "default";
```

# Fields

```java
// Pool of Hive Metastore clients
private final MetaStoreClientPool metaStoreClientPool_;
// Authorization policy
protected AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
// Cache of databases keyed by name; each Db holds its own table cache.
// The AtomicReference lets the whole map be replaced atomically, and the
// ConcurrentHashMap makes concurrent access to individual entries safe.
protected AtomicReference<Map<String, Db>> dbCache_ =
    new AtomicReference<>(new ConcurrentHashMap<String, Db>());
// Cache of DataSources
protected final CatalogObjectCache<DataSource> dataSources_;
// "txn" is short for transaction: maps each transaction ID to the table
// write IDs allocated within it
protected final ConcurrentHashMap<Long, Set<TableWriteId>> txnToWriteIds_ =
    new ConcurrentHashMap<>();
// Cache of HDFS cache pools
protected final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ =
    new CatalogObjectCache<HdfsCachePool>(false);
// Cache of AuthzCacheInvalidation objects
protected final CatalogObjectCache<AuthzCacheInvalidation> authzCacheInvalidation_ =
    new CatalogObjectCache<>();
// Sends heartbeats to keep HMS locks and transactions alive
private TransactionKeepalive transactionKeepalive_;
```
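Why wrap a ConcurrentHashMap in an AtomicReference? One plausible reason, sketched below under stated assumptions, is that single-entry updates can go straight into the current map, while a full reload can build a fresh map on the side and publish it with one atomic set(). DbCacheSketch is hypothetical; Strings stand in for Db objects, and lower-casing the keys is an assumption made for illustration.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class DbCacheDemo {
  public static void main(String[] args) {
    DbCacheSketch cache = new DbCacheSketch();
    cache.addDb("default");
    cache.addDb("Sales");
    System.out.println(cache.getDb("sales"));
    // Replace the whole cache in one step, e.g. after reloading from the Metastore.
    cache.reset(Map.of("tpch", "tpch"));
    System.out.println(cache.getDb("sales"));
  }
}

// Hypothetical sketch of the AtomicReference<Map> pattern (not Impala's code).
class DbCacheSketch {
  private final AtomicReference<Map<String, String>> dbCache_ =
      new AtomicReference<>(new ConcurrentHashMap<>());

  private static String key(String name) { return name.toLowerCase(Locale.ROOT); }

  // Single-entry updates go directly into the currently published map.
  void addDb(String name) { dbCache_.get().put(key(name), key(name)); }

  String getDb(String name) { return dbCache_.get().get(key(name)); }

  // Build the replacement off to the side, then publish it with one atomic set():
  // readers see either the old map or the new one, never a half-built map.
  void reset(Map<String, String> loaded) {
    Map<String, String> fresh = new ConcurrentHashMap<>(loaded);
    dbCache_.set(fresh);
  }
}
```

This sketch does not handle an addDb() that races with reset(): such a write can land in the old map and be dropped, so real code needs extra coordination for that case.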

# Constructors

```java
public Catalog(MetaStoreClientPool metaStoreClientPool) {
  // Initialize the DataSource cache
  dataSources_ = new CatalogObjectCache<DataSource>();
  metaStoreClientPool_ = Preconditions.checkNotNull(metaStoreClientPool);
  // Create the transaction keepalive (heartbeat) only when the Metastore
  // major version is greater than 2, i.e. Hive 3 or later
  if (MetastoreShim.getMajorVersion() > 2) {
    transactionKeepalive_ = new TransactionKeepalive(metaStoreClientPool_);
  } else {
    transactionKeepalive_ = null;
  }
}

public Catalog() {
  // No-arg constructor: delegate with a new MetaStoreClientPool(0, 0)
  this(new MetaStoreClientPool(0, 0));
}
```
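The constructor only creates a TransactionKeepalive for newer Metastores. As a rough idea of what a keepalive does, here is a hypothetical sketch (not Impala's TransactionKeepalive): it tracks live transaction IDs and periodically invokes a heartbeat callback for each. The LongConsumer callback stands in for the real HMS heartbeat RPC, and hmsMajorVersion is a hard-coded illustrative value replacing MetastoreShim.getMajorVersion().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

class KeepaliveDemo {
  public static void main(String[] args) {
    int hmsMajorVersion = 3; // hypothetical value for illustration
    // Same gating idea as the constructor: no keepalive on older Metastores.
    KeepaliveSketch ka = hmsMajorVersion > 2
        ? new KeepaliveSketch(id -> System.out.println("heartbeat txn " + id))
        : null;
    if (ka != null) {
      ka.register(42L);
      ka.heartbeatOnce();
      ka.close();
    }
  }
}

// Hypothetical keepalive sketch; the callback stands in for an HMS heartbeat call.
class KeepaliveSketch implements AutoCloseable {
  private final Set<Long> liveTxns_ = ConcurrentHashMap.newKeySet();
  private final LongConsumer heartbeat_;
  private ScheduledExecutorService scheduler_;

  KeepaliveSketch(LongConsumer heartbeat) { heartbeat_ = heartbeat; }

  void register(long txnId) { liveTxns_.add(txnId); }

  void unregister(long txnId) { liveTxns_.remove(txnId); }

  // Send one heartbeat for every transaction that is still registered.
  void heartbeatOnce() {
    for (long id : liveTxns_) heartbeat_.accept(id);
  }

  // Heartbeat periodically on a background thread until close() is called.
  synchronized void start(long periodSeconds) {
    if (scheduler_ != null) return;
    scheduler_ = Executors.newSingleThreadScheduledExecutor();
    scheduler_.scheduleAtFixedRate(
        this::heartbeatOnce, periodSeconds, periodSeconds, TimeUnit.SECONDS);
  }

  @Override
  public synchronized void close() {
    if (scheduler_ != null) scheduler_.shutdownNow();
  }
}
```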

# Methods

First come the methods that operate on the caches; their implementations are omitted here.

  • addDb
  • getDb
  • removeDb
  • getDbs
  • getTableNoThrow
  • getTable
  • getTableIfCachedNoThrow
  • getTableIfCached
  • removeTable
  • getTableNames
  • containsTable
  • addFunction
  • getFunction
  • removeFunction
  • containsFunction
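Most of these cache methods boil down to a two-level lookup: first the Db, then the table inside it. The sketch below is a hypothetical simplification, not Impala's code. It assumes that getTable throws when the database is missing and returns null for a missing table, while the NoThrow variant returns null in both cases. Strings stand in for Db and Table objects, and DbNotFoundSketchException is an invented exception type.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LookupDemo {
  public static void main(String[] args) {
    LookupSketch cat = new LookupSketch();
    cat.addTable("default", "orders");
    System.out.println(cat.getTableNoThrow("default", "orders"));
    System.out.println(cat.getTableNoThrow("nosuchdb", "orders"));
    try {
      cat.getTable("nosuchdb", "orders");
    } catch (DbNotFoundSketchException e) {
      System.out.println(e.getMessage());
    }
  }
}

// Hypothetical checked exception for a missing database.
class DbNotFoundSketchException extends Exception {
  DbNotFoundSketchException(String msg) { super(msg); }
}

// Hypothetical two-level cache: db name -> (table name -> table).
class LookupSketch {
  private final Map<String, Map<String, String>> dbs_ = new ConcurrentHashMap<>();

  private static String key(String name) { return name.toLowerCase(Locale.ROOT); }

  void addTable(String db, String table) {
    // computeIfAbsent creates the per-db table map atomically on first use.
    dbs_.computeIfAbsent(key(db), k -> new ConcurrentHashMap<>())
        .put(key(table), key(db) + "." + key(table));
  }

  // Missing db -> exception; missing table -> null.
  String getTable(String db, String table) throws DbNotFoundSketchException {
    Map<String, String> tables = dbs_.get(key(db));
    if (tables == null) {
      throw new DbNotFoundSketchException("Database '" + db + "' not found");
    }
    return tables.get(key(table));
  }

  // Never throws: null if either the db or the table is missing.
  String getTableNoThrow(String db, String table) {
    Map<String, String> tables = dbs_.get(key(db));
    return tables == null ? null : tables.get(key(table));
  }
}
```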

Next come the transaction and lock methods:

  • getAcidUserId
  • openTransaction
  • lockTableStandalone
  • lockTableInTransaction
  • lockTableInternal
  • releaseTableLock
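A common way to use lock methods like these is acquire, work, then release in a finally block so an exception cannot leak the lock. The sketch below uses a hypothetical in-memory LockServiceSketch in place of HMS locking; its lockTable/releaseLock/isLocked methods are invented for illustration and are not the signatures of Impala's lockTableStandalone or releaseTableLock. It fails fast on a conflicting lock instead of waiting, which is a simplification.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class LockDemo {
  public static void main(String[] args) {
    LockServiceSketch hms = new LockServiceSketch();
    long lockId = hms.lockTable("default", "orders");
    try {
      System.out.println("locked: " + hms.isLocked("default", "orders"));
      // ... work that requires the table lock ...
    } finally {
      // Release in finally so an exception cannot leak the lock.
      hms.releaseLock(lockId);
    }
    System.out.println("locked: " + hms.isLocked("default", "orders"));
  }
}

// Hypothetical in-memory stand-in for Metastore table locks.
class LockServiceSketch {
  private final AtomicLong nextId_ = new AtomicLong(1);
  // "db.table" -> id of the exclusive lock currently held on it
  private final Map<String, Long> locks_ = new ConcurrentHashMap<>();

  // Acquire an exclusive lock; throws instead of waiting if already held.
  long lockTable(String db, String table) {
    long id = nextId_.getAndIncrement();
    Long holder = locks_.putIfAbsent(db + "." + table, id);
    if (holder != null) {
      throw new IllegalStateException(db + "." + table + " is locked by " + holder);
    }
    return id;
  }

  // Remove the table entry held under this lock id, if any.
  void releaseLock(long lockId) { locks_.values().remove(lockId); }

  boolean isLocked(String db, String table) { return locks_.containsKey(db + "." + table); }
}
```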

The remaining methods are simpler, so we won't cover them here.