MetaStoreClientPool 是 MetaStore 客户端的连接池。
# 前言
Manages a pool of RetryingMetaStoreClient connections. If the connection pool is empty
a new client is created and returned to the caller; it joins the pool once the caller closes it. The idle pool can expand up to a maximum
size of MAX_HMS_CONNECTION_POOL_SIZE, beyond which returned connections are closed. This default implementation reads the Hive metastore configuration from the HiveConf
object passed to the constructor. If you are looking for a temporary HMS instance created
from scratch for unit tests, refer to EmbeddedMetastoreClientPool class. It mocks an
actual HMS by creating a temporary Derby backend database on the fly. It should not
be used for production Catalog server instances.
管理 RetryingMetaStoreClient 连接的池子。如果连接池为空,会新建一个连接并直接返回给调用方,调用方关闭该连接时它才会被放入池中。空闲池的大小最多扩展到 MAX_HMS_CONNECTION_POOL_SIZE,超过之后归还的连接会被直接关闭。
默认实现从构造函数传入的 HiveConf 中读取 Hive MetaStore 配置。如果你需要一个用于单元测试的临时 HMS,请使用 EmbeddedMetastoreClientPool。它通过即时创建一个临时的 Derby 后端数据库来模拟真实的 HMS,不能用于生产环境的 Catalog server 实例。
# 常量
//cnxn 是 connection 的缩写,配置名称 | |
private static final String HIVE_METASTORE_CNXN_DELAY_MS_CONF = | |
"impala.catalog.metastore.cnxn.creation.delay.ms"; | |
// 默认值 | |
private static final int DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF = 0; | |
// 池子最大值 | |
private static final int MAX_HMS_CONNECTION_POOL_SIZE = 32; | |
// HMS 连接创建时间间隔 | |
private final int clientCreationDelayMs_; | |
// 连接池 | |
private final ConcurrentLinkedQueue<MetaStoreClient> clientPool_ = | |
new ConcurrentLinkedQueue<MetaStoreClient>(); | |
// 池子是否关闭 | |
private Boolean poolClosed_ = false; | |
// 池子关闭的锁 | |
private final Object poolCloseLock_ = new Object(); | |
//hive 配置 | |
private final HiveConf hiveConf_; | |
// RetryingMetaStoreClient 需要的 | |
private static final HiveMetaHookLoader dummyHookLoader = new HiveMetaHookLoader() { | |
@Override | |
public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl) | |
throws MetaException { | |
return null; | |
} | |
}; |
# 构造函数
初始化池子时,需要指定池子的初始大小和初始连接超时时间(秒)。该超时时间只对第一个连接生效,其余初始连接的超时时间为 0。
// Creates a pool backed by a HiveConf loaded for this class and pre-populates it with
// initialSize clients. initialCnxnTimeoutSec bounds connection retries for the first
// client only.
public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec) {
  this(initialSize, initialCnxnTimeoutSec, new HiveConf(MetaStoreClientPool.class));
}

// Creates a pool that builds every client from the given HiveConf. The on-demand
// creation delay is read from HIVE_METASTORE_CNXN_DELAY_MS_CONF (falling back to
// DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF) before the initial clients are created.
public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec,
    HiveConf hiveConf) {
  this.hiveConf_ = hiveConf;
  this.clientCreationDelayMs_ = hiveConf.getInt(
      HIVE_METASTORE_CNXN_DELAY_MS_CONF, DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF);
  initClients(initialSize, initialCnxnTimeoutSec);
}
public void initClients(int numClients, int initialCnxnTimeoutSec) { | |
// 初始化大小是 0 | |
Preconditions.checkState(clientPool_.size() == 0); | |
// 需要创建的 client 数 | |
if (numClients > 0) { | |
// 只有第一个连接具有超时时间 | |
clientPool_.add(new MetaStoreClient(hiveConf_, initialCnxnTimeoutSec)); | |
for (int i = 0; i < numClients - 1; ++i) { | |
clientPool_.add(new MetaStoreClient(hiveConf_, 0)); | |
} | |
} | |
} |
# 方法
# 获取连接
public MetaStoreClient getClient() { | |
if (Thread.currentThread().getContextClassLoader() == null) { | |
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); | |
} | |
// 从 list 中 poll 一个 | |
MetaStoreClient client = clientPool_.poll(); | |
// 池子是空的,创建一个 | |
if (client == null) { | |
synchronized (this) { | |
try { | |
Thread.sleep(clientCreationDelayMs_); | |
} catch (InterruptedException e) { | |
/* ignore */ | |
} | |
client = new MetaStoreClient(hiveConf_, 0); | |
} | |
} | |
// 标记已被使用 | |
client.markInUse(); | |
return client; | |
} |
# 关闭池子
public void close() { | |
// 同步 | |
synchronized (poolCloseLock_) { | |
if (poolClosed_) { return; } | |
// 标记被关闭 | |
poolClosed_ = true; | |
} | |
// 关闭每个连接 | |
MetaStoreClient client = null; | |
while ((client = clientPool_.poll()) != null) { | |
client.getHiveClient().close(); | |
} | |
} |
# 内部类
# MetaStoreClient
MetaStoreClient 实现了 AutoCloseable,使用完毕后需要调用 close():如果池子已关闭,或空闲连接数已达到 MAX_HMS_CONNECTION_POOL_SIZE,则关闭底层连接;否则把该连接归还到池子中。
@Override | |
public void close() { | |
Preconditions.checkState(isInUse_); | |
isInUse_ = false; | |
// 同步 | |
synchronized (poolCloseLock_) { | |
// 池子关了或者满了销毁,否则放入池子 | |
if (poolClosed_ || clientPool_.size() >= MAX_HMS_CONNECTION_POOL_SIZE) { | |
hiveClient_.close(); | |
} else { | |
clientPool_.offer(this); | |
} | |
} | |
} |
# 变量
// MetaStore Client | |
private final IMetaStoreClient hiveClient_; | |
// 是否在使用 | |
private boolean isInUse_; |
# 构造函数
private MetaStoreClient(HiveConf hiveConf, int cnxnTimeoutSec) { | |
if (LOG.isTraceEnabled()) { | |
LOG.trace("Creating MetaStoreClient. Pool Size = " + clientPool_.size()); | |
} | |
// 获取连接创建间隔时间限制 | |
long retryDelaySeconds = hiveConf.getTimeVar( | |
HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); | |
long retryDelayMillis = retryDelaySeconds * 1000; | |
// 超时时间 | |
long endTimeMillis = System.currentTimeMillis() + cnxnTimeoutSec * 1000; | |
IMetaStoreClient hiveClient = null; | |
while (true) { | |
try { | |
// 获取 MetaStore Client | |
hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader, | |
HiveMetaStoreClient.class.getName()); | |
break; | |
} catch (Exception e) { | |
// 超时了,抛异常 | |
long delayUntilMillis = System.currentTimeMillis() + retryDelayMillis; | |
if (delayUntilMillis >= endTimeMillis) throw new IllegalStateException(e); | |
LOG.warn("Failed to connect to Hive MetaStore. Retrying.", e); | |
// 重试 | |
while (delayUntilMillis > System.currentTimeMillis()) { | |
try { | |
Thread.sleep(delayUntilMillis - System.currentTimeMillis()); | |
} catch (InterruptedException | IllegalArgumentException ignore) {} | |
} | |
} | |
} | |
// 创建成功 | |
hiveClient_ = hiveClient; | |
// 初始没有被使用 | |
isInUse_ = false; | |
} |