CatalogServiceCatalog 管理所有 Catalog 元数据的加载以及处理 DDL 请求。它维护了一个全局版本,每当从 Catalog 中新增、修改或删除时,版本就会新增并分配给 CatalogObject。这意味这每一个 CatalogObject 将有一个单独的版本,被分配的版本会严格递增。
# 总览
CatalogServiceCatalog 定期收集增量更新(基于指定的版本号),构建要发送给 statestore 的主题更新。每一个 catalog 主题更新被一个范围版本号定义, CatalogServiceCatalog 保证每一个具有指定范围版本号的 CatalogObject 被包含在 catalog 主题更新中。当一个 主题更新被发送的时候,并发的 DDL 请求是允许的。因此,频繁修改 CatalogObject 可能导致跳过主题更新是可能的。这可能发生在一个主题更新线程尝试收集一个对象更新时,这个对象被另一个元数据操作修改,导致它的版本超过主题更新的目标版本。为了保证所有的更新最终都包含在主题更新中,我们持续跟踪每个 CatalogObject 跳过主题更新的次数,如果次数超过指定阈值,我们将 Catalog Object 加入到下一个主题更新中,即使它的版本比主题更新的目标版本更高。因此,相同版本的对象可能被发送到两个独立的主题更新中。
CatalogServiceCatalog 维护了两种日志:
- 删除日志。从缓存删了对象之后,缓存本身追踪删除已经没有用了。这个日志通过记录已经从 Catalog 中移除的 CatalogObject,在一个主题更新中填充删除列表是有用的。一个新版本的实体每次被删的时候都会被加入这个日志。增加一个对象的版本,并将它加入删除日志应该是自动的。当相关删除实体被加入主题更新时,一个实体通过主题更新线程被从这个日志移除。
- 主题更新日志。这个日志记录了关于 Catalog 主题更新中的 CatalogObject 的信息。只有处理主题更新的线程才能从日志中添加、更新或删除实体。所有其他操作只能读主题更新日志实体,但是不能修改。每一个实体包含 CatalogObject 跳过主题更新的次数,被最后发送到一个主题更新的对象版本,主题更新的版本。主题更新日志的实体,为了阻止日志无限增长,每隔 TOPIC_UPDATE_LOG_GC_FREQUENCY 个主题更新被主题更新处理线程垃圾回收。元数据操作使用 SYNC_DDL 检查这个日志确认 catalog 主题版本,确保 impalad 必须等待以便确保这个操作的影响被广播到其他的 coordinators。
SYNC_DDL 的一直异常:
基于时间清理处理的主题更新实体可能导致用了 SYNC_DDL 的元数据操作挂掉,当等待指定主题更新日志实体时。如果线程处理元数据操作长时间停滞(比处理 TOPIC_UPDATE_LOG_GC_FREQUENCY 主题更新时间更长)在操作在 catalog 缓存生效和 SYNC_DDL 版本被检查之间,这个是可能发生的。为了减少这种事件的可能性,我们给 TOPIC_UPDATE_LOG_GC_FREQUENCY 设置一个很大的值。另外,为了阻止元数据操作因为未知原因挂在这里,不允许使用 SYNC_DDL 的操作无限等待指定主题日志实体,如果达到指定的最大等待时间会抛出异常。查看 waitForSyncDdlVersion () 了解更多细节。
IncompleteTables 表元数据被 TableLoadingMgr 在后台加载,通过调用 prioritizeLoad () 表可以表优先加载。后台加载也可以开启,这种情况下遗失的表会被提交给 TableLoadingMgr,任何表元数据都会失效并在启动时加载。完全加载的表元数据就地更新,不会通过 TableLoadingMgr 触发后台元数据加载。访问一个没有被加载的表一经请求将会加载表的元数据,在表加载线程池之外。
查看 CatalogOpExecutor 的类注释,描述了锁协议应该被应用如果版本锁和表锁需要同时获取。
# 常量 & 变量
// MetaStore Client 资源池初始大小 | |
public static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10; | |
// 主题更新跳过的最大次数 | |
private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2; | |
// 跳过更新锁争夺的最大次数 | |
private final int maxSkippedUpdatesLockContention_; | |
// 主题更新表锁最大等待时间 | |
private final long topicUpdateTblLockMaxWaitTimeMs_; | |
// 获取表锁的超时时间 | |
public static final long LOCK_RETRY_TIMEOUT_MS = 7200000; | |
// 获取表锁的间隔时间 | |
private static final int LOCK_RETRY_DELAY_MS = 10; | |
// GetPartialCatalogObjectRequest 中 table id | |
public static final long TABLE_ID_UNAVAILABLE = -1; | |
//catalog 服务 id | |
private final TUniqueId catalogServiceId_; | |
// 版本锁,公平锁 | |
private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true); | |
//catalog 版本 | |
private long catalogVersion_ = INITIAL_CATALOG_VERSION; | |
// 上次执行 reset () 方法的 catalog 版本,被版本锁保护。 | |
private long lastResetStartVersion_ = INITIAL_CATALOG_VERSION; | |
// 负责后台表加载的调度 | |
private final TableLoadingMgr tableLoadingMgr_; | |
// 是否在后台加载 | |
private final boolean loadInBackground_; | |
// 定期轮询 HDFS 来获取最新的一组已知缓存池 | |
private final ScheduledExecutorService cachePoolReader_ = Executors.newScheduledThreadPool(1); | |
// 删除锁 | |
private final CatalogDeltaLog deleteLog_; | |
// 上次主题更新版本,其中的 catalog 版本 | |
private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1); | |
// 主题更新等待时间 | |
private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000; | |
// 主题更新日志 | |
private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog(); | |
// 本地库路径 | |
private final String localLibraryPath_; | |
//catalog 表失效器 | |
private CatalogdTableInvalidator catalogdTableInvalidator_; | |
// 负责 MetaStore 事件处理来标记表失效 | |
private ExternalEventsProcessor metastoreEventProcessor_; | |
// MetaStore server | |
private ICatalogMetastoreServer catalogMetastoreServer_; | |
// 同步到最新 MetaStore 事件的工厂 | |
private MetastoreEventFactory syncToLatestEventFactory_; | |
// 主题模式 | |
private static enum TopicMode { | |
// 发布完整的 catalog object | |
FULL, | |
// 发布完整的和简略的 catalog object | |
MIXED, | |
// 只发布简略的 catalog object | |
MINIMAL | |
}; | |
final TopicMode topicMode_; | |
// 局部获取 RPC 队列超时时间 | |
private final long PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S = BackendConfig.INSTANCE.getCatalogPartialFetchRpcQueueTimeoutS(); | |
// 局部获取 RPC 最大并行度 | |
private final int MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT = BackendConfig.INSTANCE.getCatalogMaxParallelPartialFetchRpc(); | |
// 控制并发访问 doGetPartialCatalogObject 方法,控制并发数到 --catalog_max_parallel_partial_fetch_rpc | |
private final Semaphore partialObjectFetchAccess_ = new Semaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT, /*fair =*/ true); | |
// 授权 | |
private AuthorizationManager authzManager_; | |
// 数据库黑名单 | |
private final Set<String> blacklistedDbs_; | |
// 表黑名单 | |
private final Set<TableName> blacklistedTables_; |
# 构造方法
构造方法没有做什么特殊的事情,这里就不再看了。
# 方法
# 获取锁
/** | |
* 表写锁,参数默认 | |
*/ | |
public boolean tryWriteLock(Table tbl) { | |
return tryLock(tbl, true, LOCK_RETRY_TIMEOUT_MS); | |
} | |
/** | |
* 多表写锁 | |
*/ | |
public boolean tryWriteLock(Table[] tables) { | |
StringBuilder tableInfo = new StringBuilder(); | |
int numTables = tables.length; | |
if (LOG.isDebugEnabled()) { | |
for(int i = 0; i < numTables; i++) { | |
tableInfo.append(tables[i].getFullName()); | |
if(i < numTables - 1) { | |
tableInfo.append(", "); | |
} | |
} | |
LOG.debug("Trying to acquire write locks for tables: " + | |
tableInfo); | |
} | |
int tableIndex=-1, versionLockCount = 0; | |
try { | |
for(tableIndex = 0; tableIndex < numTables; tableIndex++) { | |
Table tbl = tables[tableIndex]; | |
if (!tryWriteLock(tbl)) { | |
LOG.debug("Could not acquire write lock on table: " + tbl.getFullName()); | |
return false; | |
} | |
versionLockCount += 1; | |
} | |
// in case of success, release version write lock for all tables except last | |
if (tableIndex == numTables) { | |
versionLockCount = versionLockCount - 1; | |
} | |
return true; | |
} finally { | |
if (tableIndex != numTables) { | |
// couldn't acquire lock on all tables | |
StringBuilder tablesInfo = new StringBuilder(); | |
for(int i = 0; i < tableIndex; i++) { | |
tables[i].releaseWriteLock(); | |
tablesInfo.append(tables[i].getFullName() + ((i < tableIndex-1) ? ", " : "")); | |
} | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Released table write lock on tables: {}", tablesInfo); | |
} | |
} | |
LOG.debug("Unlocking versionLock_ write lock {} number of times", versionLockCount); | |
while (versionLockCount > 0) { | |
versionLock_.writeLock().unlock(); | |
versionLockCount = versionLockCount - 1; | |
} | |
} | |
} |