MetaStoreClientPool 是 MetaStore 客户端的连接池。

# 前言

Manages a pool of RetryingMetaStoreClient connections. If the connection pool is empty
a new client is created and added to the pool. The idle pool can expand till a maximum
size of MAX_HMS_CONNECTION_POOL_SIZE, beyond which the connections are closed.

This default implementation reads the Hive metastore configuration from the HiveConf
object passed in the c'tor. If you are looking for a temporary HMS instance created
from scratch for unit tests, refer to EmbeddedMetastoreClientPool class. It mocks an
actual HMS by creating a temporary Derby backend database on the fly. It should not
be used for production Catalog server instances.

管理 RetryingMetaStoreClient 连接的池子。如果连接池是空的,新的连接会被创建加入池子。空闲池子大小可以扩展到 MAX_HMS_CONNECTION_POOL_SIZE,超过之后连接会被关闭。

默认实现从构造函数传入的 HiveConf 对象中读取 Hive MetaStore 配置。如果你需要一个从零创建的临时 HMS 实例用于单元测试,请参考 EmbeddedMetastoreClientPool 类。它通过即时创建临时的 Derby 后端数据库来模拟(mock)一个真实的 HMS,不应被用于生产环境的 Catalog server 实例。

# 常量

// Configuration key for the delay applied before creating a new HMS connection
// ("cnxn" is short for "connection").
private static final String HIVE_METASTORE_CNXN_DELAY_MS_CONF =
    "impala.catalog.metastore.cnxn.creation.delay.ms";
// Default used when the key above is not set in the HiveConf.
private static final int DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF = 0;
// Maximum number of idle connections kept in the pool; a client closed while the
// pool already holds this many is destroyed instead of being returned.
private static final int MAX_HMS_CONNECTION_POOL_SIZE = 32;
// Delay in milliseconds that getClient() sleeps before creating a new connection
// when the pool is empty. Read from the HiveConf in the constructor.
private final int clientCreationDelayMs_;
// Idle connections available for reuse.
private final ConcurrentLinkedQueue<MetaStoreClient> clientPool_ =
    new ConcurrentLinkedQueue<>();
// True once the pool has been closed. Every read and write happens while holding
// poolCloseLock_, so a primitive flag is sufficient (no boxing, never null).
private boolean poolClosed_ = false;
// Lock guarding poolClosed_ and the decision to return a client to the pool.
private final Object poolCloseLock_ = new Object();
// Hive configuration used to create every MetaStore client in this pool.
private final HiveConf hiveConf_;
// Placeholder hook loader passed to RetryingMetaStoreClient.getProxy(), which
// requires a HiveMetaHookLoader argument. It never supplies a hook.
private static final HiveMetaHookLoader dummyHookLoader = new HiveMetaHookLoader() {
  @Override
  public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl)
      throws MetaException {
    // No hook for any table.
    return null;
  }
};

# 构造函数

初始化池子时,需要设置池子初始大小和初始连接超时时间,这个时间只对第一个连接生效。

/**
 * Creates a pool that uses a fresh HiveConf built for MetaStoreClientPool.class.
 *
 * @param initialSize number of clients to create up front
 * @param initialCnxnTimeoutSec connection timeout, in seconds, for the first client
 */
public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec) {
  this(
      initialSize,
      initialCnxnTimeoutSec,
      new HiveConf(MetaStoreClientPool.class));
}
public MetaStoreClientPool(int initialSize, int initialCnxnTimeoutSec,
    HiveConf hiveConf) {
  hiveConf_ = hiveConf;
  // 获取连接创建间隔时间
  clientCreationDelayMs_ = hiveConf_.getInt(HIVE_METASTORE_CNXN_DELAY_MS_CONF,
      DEFAULT_HIVE_METASTORE_CNXN_DELAY_MS_CONF);
  initClients(initialSize, initialCnxnTimeoutSec);
}
/**
 * Fills the (still empty) pool with {@code numClients} clients. Only the first
 * client is created with {@code initialCnxnTimeoutSec}; all later ones use a
 * timeout of 0. Does nothing when {@code numClients} is not positive.
 *
 * @param numClients number of clients to create
 * @param initialCnxnTimeoutSec connection timeout, in seconds, for the first client
 * @throws IllegalStateException if the pool already contains clients
 */
public void initClients(int numClients, int initialCnxnTimeoutSec) {
  // Must only be called on an empty pool.
  Preconditions.checkState(clientPool_.isEmpty());
  for (int idx = 0; idx < numClients; ++idx) {
    // The timeout applies to the very first connection only.
    int timeoutSec = (idx == 0) ? initialCnxnTimeoutSec : 0;
    clientPool_.add(new MetaStoreClient(hiveConf_, timeoutSec));
  }
}

# 方法

# 获取连接

/**
 * Returns a client marked as in use. An idle client is taken from the pool when
 * one is available; otherwise a new client is created after sleeping for the
 * configured creation delay.
 *
 * <p>Creation of new clients is serialized on this pool instance, so the delay
 * also spaces out concurrent creations. If the calling thread is interrupted
 * during the delay, the client is still created and the thread's interrupt status
 * is restored before returning.
 *
 * @return a client on which markInUse() has been called
 */
public MetaStoreClient getClient() {
  // Make sure the calling thread has a context class loader; fall back to the
  // system class loader when none is set.
  Thread currentThread = Thread.currentThread();
  if (currentThread.getContextClassLoader() == null) {
    currentThread.setContextClassLoader(ClassLoader.getSystemClassLoader());
  }

  // Reuse an idle client if there is one.
  MetaStoreClient client = clientPool_.poll();
  if (client == null) {
    // Pool is empty: create a new client, throttled by the configured delay.
    synchronized (this) {
      boolean interrupted = false;
      try {
        // Thread.sleep() rejects negative values, so skip non-positive delays.
        if (clientCreationDelayMs_ > 0) {
          try {
            Thread.sleep(clientCreationDelayMs_);
          } catch (InterruptedException e) {
            // Cut the delay short but remember the interrupt for the caller.
            interrupted = true;
          }
        }
        client = new MetaStoreClient(hiveConf_, 0);
      } finally {
        // Restore the interrupt status only after the client has been created.
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
  client.markInUse();
  return client;
}

# 关闭池子

/**
 * Closes the pool: marks it closed and closes the Hive client of every idle
 * connection still in the pool. Calling it again after the pool has been closed
 * is a no-op.
 */
public void close() {
  synchronized (poolCloseLock_) {
    if (poolClosed_) {
      return;
    }
    // Once set, clients closed later are destroyed instead of being re-pooled.
    poolClosed_ = true;
  }
  // Drain the idle clients and close each underlying Hive client.
  for (MetaStoreClient idle = clientPool_.poll(); idle != null;
      idle = clientPool_.poll()) {
    idle.getHiveClient().close();
  }
}

# 内部类

# MetaStoreClient

MetaStoreClient 实现了 AutoCloseable:使用完毕后必须调用 close(),它会把连接归还池子;如果池子已关闭或已满,则直接关闭底层连接。

/**
 * Releases this client. It goes back to the idle pool when the pool is still
 * open and has room; otherwise the underlying Hive client is closed.
 *
 * @throws IllegalStateException if this client is not currently marked in use
 */
@Override
public void close() {
  Preconditions.checkState(isInUse_);
  isInUse_ = false;
  // Decide under the pool-close lock so the decision cannot interleave with the
  // pool being marked closed.
  synchronized (poolCloseLock_) {
    boolean canReturnToPool =
        !poolClosed_ && clientPool_.size() < MAX_HMS_CONNECTION_POOL_SIZE;
    if (canReturnToPool) {
      clientPool_.offer(this);
    } else {
      // Pool is closed or already full: destroy the connection.
      hiveClient_.close();
    }
  }
}

# 变量

// The underlying Hive MetaStore client, assigned once in the constructor.
private final IMetaStoreClient hiveClient_;
// Whether this client is currently handed out to a caller; false on creation and
// reset to false by close().
private boolean isInUse_;

# 构造函数

/**
 * Creates a MetaStore client, retrying failed connection attempts until the
 * timeout would be exceeded.
 *
 * <p>After each failed attempt the constructor waits for the configured
 * METASTORE_CLIENT_CONNECT_RETRY_DELAY before trying again. If waiting would reach
 * or pass the deadline, the last failure is rethrown wrapped in an
 * IllegalStateException. With a timeout of 0 the first failure is therefore fatal.
 * An interrupt received while waiting does not shorten the wait, but the thread's
 * interrupt status is restored before the constructor exits.
 *
 * @param hiveConf Hive configuration used to create the client
 * @param cnxnTimeoutSec total time, in seconds, to keep retrying the connection
 * @throws IllegalStateException if no connection could be made within the timeout
 */
private MetaStoreClient(HiveConf hiveConf, int cnxnTimeoutSec) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Creating MetaStoreClient. Pool Size = " + clientPool_.size());
  }
  // Delay between connection attempts.
  long retryDelaySeconds = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
  long retryDelayMillis = retryDelaySeconds * 1000;
  // Deadline for giving up. Multiply as long so a large timeout cannot overflow
  // int arithmetic before being widened.
  long endTimeMillis = System.currentTimeMillis() + cnxnTimeoutSec * 1000L;
  IMetaStoreClient hiveClient = null;
  boolean interrupted = false;
  try {
    while (true) {
      try {
        hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, dummyHookLoader,
            HiveMetaStoreClient.class.getName());
        break;
      } catch (Exception e) {
        // Give up if waiting for the retry delay would reach the deadline.
        long delayUntilMillis = System.currentTimeMillis() + retryDelayMillis;
        if (delayUntilMillis >= endTimeMillis) {
          throw new IllegalStateException(e);
        }
        LOG.warn("Failed to connect to Hive MetaStore. Retrying.", e);
        // Wait out the full retry delay. The remaining time is computed once per
        // iteration so Thread.sleep() is never given a negative value.
        long remainingMillis;
        while ((remainingMillis = delayUntilMillis - System.currentTimeMillis()) > 0) {
          try {
            Thread.sleep(remainingMillis);
          } catch (InterruptedException ie) {
            // Keep waiting, but remember the interrupt for the caller.
            interrupted = true;
          }
        }
      }
    }
  } finally {
    // Restore the interrupt status that the sleep above cleared.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  hiveClient_ = hiveClient;
  // A freshly created client is not in use until markInUse() is called.
  isInUse_ = false;
}