MetaStore 是 Hive 的元数据服务,本文会从使用和部署上分析,着重说明它的元数据格式。然后从 Impala 的角度,分析使用方式和元数据存储。

# MetaStore

# 整体架构

首先是最上层客户端,客户端可以是 Hive,也可以是 HiveMetastoreClient。
然后是服务端 HiveMetaStore,数据存储在仓库中,元数据存储在关系型数据库中,依赖下层操作元数据。
最底层就是元数据存储层,封装了 RawStore 来处理数据。

# 支持的数据库

元数据存储在关系型数据库中。

# 内嵌 Derby

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:derby:;databaseName=metastore_db;create=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
</property>
  • 主要用于单测
  • 只能有一个客户端连接
  • 使用内存版本,数据更改不会持久化

# 外部数据库

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://hybrid03.com.deploy.octopus-1659594140016-debugbox:3305/hive?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>/O_C_KfSxg</value>
</property>

官方支持 5 种:

  • MS SQL Server
  • MySQL
  • MariDB
  • Oracle
  • Postgres

# 运行模式

# 内嵌模式

<property>
  <name>hive.metastore.uris</name>
  <value></value>
</property>

value 配置为空就代表内嵌模式。
client 中直接启动 metastore 服务。只能有一个 client 连接,多个 client 需要内嵌多个。

# 远程模式

server:

<property>
  <name>metastore.thrift.port</name>
  <value>9083</value>
</property>

client:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://localhost:9083</value>
</property>

server 启动 metastore 服务,上层通过 thrift 协议对外提供接口,如 createDatabase、getTable 等。
client 通过 thrift 协议访问 server 端口。
可以有多个 client 连接同时连接 server。

# 部署模式

# 内嵌模式


metastore 服务和数据库都内嵌在 Hive 中,同时只能有一个连接。

# 本地模式


metastore 内嵌在 Hive 中,数据库使用外部的。

# 远程模式


metastore 独立部署,数据库也使用外部的。Hive、Impala 等通过 thrift 协议访问 metastore 接口,同时能够支持多个连接。

# 高可用

# uris + selection

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://houst1:9083,thrift://houst2:9083,thrift://houst3:9083</value>
</property>
<property>
  <name>metastore.thrift.uri.selection</name>
  <value>RANDOM/SEQUENTIAL</value>
</property>

这种方式是多个 MetaStore 节点,轮询调用。

# zookeeper 动态服务发现

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://houst1:9083,thrift://houst2:9083,thrift://houst3:9083</value>
</property>
<property>
  <name>hive.metastore.service.discovery.mode</name>
  <value>zookeeper</value>
</property>
<property>
  <name>hive.metastore.zookeeper.client.port</name>
  <value>2181</value>
</property>
<property>
  <name>hive.metastore.zookeeper.namespace</name>
  <value>/metastore</value>
</property>
<property>
  <name>hive.metastore.zookeeper.session.timeout</name>
  <value>15000</value>
</property>
<property>
  <name>hive.metastore.zookeeper.connection.timeout</name>
  <value>15000</value>
</property>
<property>
  <name>hive.metastore.zookeeper.connection.max.retries</name>
  <value>10</value>
</property>
<property>
  <name>hive.metastore.zookeeper.connection.basesleeptime</name>
  <value>1000</value>
</property>

这种方式就需要依赖 zk 获取 MetaStore 节点。

# 元数据格式

大致归为几类:

  • hive 版本
  • 数据库
  • 表和视图
  • 索引
  • 文件存储
  • 表字段
  • 表分区
  • 函数
  • 角色
  • 授权
  • 计数

# 版本

# Hive 版本信息(VERSION)

字段说明示例
VER_ID版本 ID1
SCHEMA_VERSIONHive 版本3.1.3000
VERSION_COMMENT版本说明Hive release version 3.1.3000

# CDH 版本信息(CDH_VERSION)

字段说明示例
VER_ID版本 ID1
SCHEMA_VERSIONHive 版本3.1.3000.7.2.15.0-Update4
VERSION_COMMENT版本说明Hive release version 3.1.3000 for CDH 7.2.15.0

# 数据库

# 数据库信息(DBS)

字段说明示例
DB_ID数据库 ID1
DESC描述Default Hive database
DB_LOCATION_URI外部表的路径hdfs://localhost:20500/test-warehouse
NAME名称default
OWNER_NAME所有者名称public
OWNER_TYPE所有者类型ROLE
CTLG_NAME目录名称hive
CREATE_TIME创建时间1662909881
DB_MANAGED_LOCATION_URI内部表的路径NULL

# 数据库属性信息(DATABASE_PARAMS)

字段说明示例
DB_ID数据库 ID1
PARAM_KEY参数名impala_registered_function_get_exts_version()
PARAM_VALUE参数值HBgHcmF3ZGF0YRgQZ2V0X2V4dHNfdmVyc2lvbgAVBBkMHBkcFQAcFRgAAAASKBJnZXRfZXh0c192ZXJzaW9uKCkYLWhkZnM6Ly9uYW1lc2VydmljZTAxL3NhL2V4dHMvbGliaW1wYWxhRXh0cy5zbxwYMl9aMTRHZXRFeHRzVmVyc2lvblBOMTBpbXBhbGFfdWRmMTVGdW5jdGlvbkNvbnRleHRFACEA

# 表和视图

# 表信息(TBLS)

字段说明示例
TBL_ID表 ID1
CREATE_TIME创建时间1662909958
DB_ID数据库 ID1
LAST_ACCESS_TIME上次访问时间0
OWNER所有者用户名impala
OWNER_TYPE所有者角色USER
RETENTION保留字段0
SD_ID存储信息 ID1
TBL_NAME表名称workload_query
TBL_TYPE表类型EXTERNAL_TABLE
VIEW_EXPANDED_TEXT视图的详细 HQL 语句NULL
VIEW_ORIGINAL_TEXT视图的原始 HQL 语句NULL
IS_REWRITE_ENABLED是否可以重写FALSE
WRITE_ID重写 ID0

# 表属性信息(TABLE_PARAMS)

字段说明示例
TBL_ID表 ID1
PARAM_KEY参数名EXTERNAL
PARAM_VALUE参数值TRUE

# 文件存储信息

# 数据表信息(CDS)

字段说明示例
CD_ID数据表 ID1

# 存储信息(SDS)

字段说明示例
SD_ID存储信息 ID1
CD_ID数据表 ID1
INPUT_FORMAT文件输入格式org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat
IS_COMPRESSED是否压缩0
IS_STOREDASSUBDIRECTORIES是否以子目录存储0
LOCATION路径hdfs://nameservice01/user/impala/workload_query
NUM_BUCKETS分桶数量0
OUTPUT_FORMAT文件输出格式org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat
SERDE_ID序列化 ID1

# 存储属性信息(SD_PARAMS)

字段说明示例
SD_ID存储信息 ID1
PARAM_KEY属性名
PARAM_VALUE属性值

# 序列化类信息(SERDES)

字段说明示例
SERDE_ID序列化类配置 ID1
NAME序列化类配置名称NULL
SLIB序列化类org.apache.hadoop.hive.serde2.avro.AvroSerDe
DESCRIPTION描述NULL
SERIALIZER_CLASS序列化类NULL
DESERIALIZER_CLASS反序列化类NULL
SERDE_TYPE序列化类型0

# 序列化类属性信息(SERDE_PARAMS)

字段说明示例
SERDE_ID序列化类配置 ID1
PARAM_KEY属性名field.delim
PARAM_VALUE属性值

# 表字段

# 字段信息(COLUMNS_V2)

字段说明示例
CD_ID数据表 ID1
COMMENT备注from deserializer
COLUMN_NAME字段名称acc_his_est_mem
TYPE_NAME字段类型string
INTEGER_IDX顺序11

# 排序字段信息(SORT_COLS)

字段说明示例
SD_ID存储信息 ID1
COLUMN_NAME字段名称event_id
ORDER排序方式1
INTEGER_IDX顺序11

# 表分区

# 分桶字段信息(BUCKETING_COLS)

字段说明示例
SD_ID存储信息 ID1
BUCKET_COL_NAME分桶字段名称event_id
INTEGER_IDX顺序11

# 分区信息(PARTITIONS)

字段说明示例
PART_ID分区 ID1
CREATE_TIME创建时间1662909964
LAST_ACCESS_TIME上次访问时间0
PART_NAME分区名称day=19246
SD_ID存储信息 ID3
TBL_ID表 ID1
WRITE_ID重写 ID0

# 分区字段信息(PARTITION_KEYS)

字段说明示例
TBL_ID表 ID1
PKEY_COMMENT分区字段备注NULL
PKEY_NAME分区字段名称day
PKEY_TYPE分区字段类型bigint
INTEGER_IDX顺序0

# 分区字段值信息(PARTITION_KEY_VALS)

字段说明示例
PART_ID分区 ID1
PART_KEY_VAL分区字段值19246
INTEGER_IDX顺序0

# 分区属性信息(PARTITION_PARAMS)

字段说明示例
PART_ID分区 ID1
PARAM_KEY属性名transient_lastDdlTime
PARAM_VALUE属性值1662909964

# 函数

# 函数信息(FUNCS)

字段说明示例
FUNC_ID函数 ID1
CLASS_NAME类名称com.sensorsdata.impala.udf.Hash
CREATE_TIME创建时间1436320455
DB_ID数据库 ID1
FUNC_NAME函数名称hash
FUNC_TYPE函数类型1
OWNER_NAME所有者名称NULL
OWNER_TYPE所有者类型USER

# 函数规则信息(FUNC_RU)

字段说明示例
FUNC_ID函数 ID1
RESOURCE_TYPE资源类型1
RESOURCE_URI资源路径hdfs://nameservice01/user/impala/hash.jar
INTEGER_IDX顺序0

# 角色

# 角色信息(ROLES)

字段说明示例
ROLE_ID角色 ID1
CREATE_TIME创建时间1662909881
OWNER_NAME所有者名称admin
ROLE_NAME角色名称admin

# 角色映射信息(ROLE_MAP)

字段说明示例
ROLE_GRANT_ID角色授权 ID1
ADD_TIME创建时间1662909881
GRANT_OPTION开启或禁用授权0
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
ROLE_ID角色 ID1

# 授权

# 全局授权信息(GLOBAL_PRIVS)

字段说明示例
USER_GRANT_ID用户授权 ID1
CREATE_TIME授权时间1436320455
GRANT_OPTION开启或者禁用权限1
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
USER_PRIV用户权限All
AUTHORIZER授权人v1

# 数据库授权信息(DB_PRIV)

字段说明示例
DB_GRANT_ID数据库授权 ID1
CREATE_TIME授权时间1436320455
DB_ID数据库 ID1
GRANT_OPTION开启或者禁用权限0
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
DB_PRIV数据库权限Create
AUTHORIZER授权人v1

# 表授权信息(TBL_PRIVS)

字段说明示例
TBL_GRANT_ID表授权 ID1
CREATE_TIME授权时间1436320455
GRANT_OPTION开启或者禁用权限0
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
TBL_PRIV表权限Select、Alter
TBL_ID表 ID1
AUTHORIZER授权人v1

# 表字段授权信息(TBL_COL_PRIVS)

字段说明示例
TBL_COLUMN_GRANT_ID表字段授权 ID1
COLUMN_NAME字段名称event_id
CREATE_TIME授权时间1436320455
GRANT_OPTION开启或者禁用权限0
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
TBL_COL_PRIV表权限Add
TBL_ID表 ID1
AUTHORIZER授权人v1

# 分区字段授权信息(PART_COL_PRIVS)

字段说明示例
PART_COLUMN_GRANT_ID分区字段授权 ID1
COLUMN_NAME字段名称event_id
CREATE_TIME授权时间1436320455
GRANT_OPTION开启或者禁用权限0
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PART_ID分区 ID1
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
TBL_COL_PRIV表权限Add
AUTHORIZER授权人v1

# 分区授权信息(PART_PRIVS)

字段说明示例
PART_GRANT_ID分区授权 ID1
CREATE_TIME授权时间1436320455
GRANT_OPTION开启或者禁用权限0
GRANTOR授权者root
GRANTOR_TYPE授权者类型USER
PART_ID分区 ID1
PRINCIPAL_NAME被授权者名称sa_cluster
PRINCIPAL_TYPE被授权者类型USER
TBL_COL_PRIV表权限Add
AUTHORIZER授权人v1

# 例子

# CREATE DATABASE

CREATE DATABASE test COMMENT 'test database' LOCATION 'hdfs://localhost:20500/test' MANAGEDLOCATION 'hdfs://localhost:20500/test/managed';

DBS:

DB_IDDESCDB_LOCATION_URINAMEOWNER_NAMEOWNER_TYPECTLG_NAMECREATE_TIMEDB_MANAGED_LOCATION_URI
6test databasehdfs://localhost:20500/testtestimpalaUSERhive1677635246hdfs://localhost:20500/test/managed

# CREATE TABLE

CREATE TABLE test.test (a INT COMMENT 'column') PARTITIONED BY (day INT COMMENT 'day column') SORT BY (a) COMMENT 'table_comment' ROW FORMAT DELIMITED WITH SERDEPROPERTIES ('key1' = 'value1') STORED AS TEXTFILE LOCATION 'hdfs://localhost:20500/test/test' TBLPROPERTIES ('key1' = 'value1');

TBLS:

TBL_IDCREATE_TIMEDB_IDLAST_ACCESS_TIMEOWNEROWNER_TYPERETENTIONSD_IDTBL_NAMETBL_TYPEVIEW_EXPANDED_TEXTVIEW_ORIGINAL_TEXTIS_REWRITE_ENABLEDWRITE_ID
6167763669560impalaUSER06testEXTERNAL_TABLEf0

TABLE_PARAMS:

TBL_IDPARAM_KEYPARAM_VALUE
6TRANSLATED_TO_EXTERNALTRUE
6sort.columnsa
6key1value1
6transient_lastDdlTime1677636695
6commenttable_comment
6OBJCAPABILITIESEXTREAD,EXTWRITE
6external.table.purgeTRUE
6sort.orderLEXICAL
6EXTERNALTRUE

SDS:

SD_IDINPUT_FORMATIS_COMPRESSEDLOCATIONNUM_BUCKETSOUTPUT_FORMATSERDE_IDCD_IDIS_STOREDASSUBDIRECTORIES
6org.apache.hadoop.mapred.TextInputFormatfhdfs://localhost:20500/test/test0org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat66f

SERDES:

SERDE_IDNAMESLIBDESCRIPTIONSERIALIZER_CLASSDESERIALIZER_CLASSSERDE_TYPE
6org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe0

SERDE_PARAMS:

SERDE_IDPARAM_KEYPARAM_VALUE
6key1value1

CDS:

CD_ID
6

COLUMNS_V2:

CD_IDCOMMENTCOLUMN_NAMETYPE_NAMEINTEGER_IDX
6columnaint0

PARTITION_KEYS:

TBL_IDPKEY_COMMENTPKEY_NAMEPKEY_TYPEINTEGER_IDX
6day columndayint0

# 从源码角度分析创建表过程

# client

IMetaStoreClient 定义了 hive metastore thrift api。

IMetaStoreClient.java
void createTable(Table tbl) throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException;

HiveMetaStoreClient 根据 metastore.thrift.uris 配置,选择连接方式。
未配置 uris,走内嵌模式,直接构造一个 HMSHandler。

HiveMetaStoreClient.java
if (localMetaStore) {
  if (!allowEmbedded) {
    throw new MetaException("Embedded metastore is not allowed here. Please configure "
        + ConfVars.THRIFT_URIS.toString() + "; it is currently set to [" + msUri + "]");
  }
 
  client = callEmbeddedMetastore(this.conf);
 
  // instantiate the metastore server handler directly instead of connecting
  // through the network
  isConnected = true;
  snapshotActiveConf();
  return;
}

远程模式的话,开启客户端连接,根据 uris 配置或 zk 服务发现选择连接地址。

HiveMetaStoreClient.java
open();

客户端调用创建表接口。HiveMetaHook 定义了一些钩子,创建表相关的如:preCreateTable、commitCreateTable、rollbackCreateTable。

HiveMetaStoreClient.java
@Override
public void createTable(CreateTableRequest request) throws
        InvalidObjectException, MetaException, NoSuchObjectException, TException {
  Table tbl = request.getTable();
  if (!tbl.isSetCatName()) {
    tbl.setCatName(getDefaultCatalog(conf));
  }
 
  if (processorCapabilities != null) {
    request.setProcessorCapabilities(new ArrayList<String>(Arrays.asList(processorCapabilities)));
    request.setProcessorIdentifier(processorIdentifier);
  }
 
  HiveMetaHook hook = getHook(tbl);
  if (hook != null) {
    hook.preCreateTable(tbl);
  }
  boolean success = false;
  try {
    // Subclasses can override this step (for example, for temporary tables)
    if (hook == null || !hook.createHMSTableInHook()) {
      create_table(request);
    }
    if (hook != null) {
      hook.commitCreateTable(tbl);
    }
    success = true;
  } finally {
    if (!success && (hook != null)) {
      try {
        hook.rollbackCreateTable(tbl);
      } catch (Exception e) {
        LOG.error("Create rollback failed with", e);
      }
    }
  }
}

create_table 发起 rpc 调用。

HiveMetastoreClient.java
protected void create_table(CreateTableRequest request) throws
    InvalidObjectException, MetaException, NoSuchObjectException, TException {
  client.create_table_req(request);
}

# server

通过 HiveMetastore 启动 server。

HiveMetaStore.java
public static void main(String[] args) throws Throwable {
  ...
  startMetaStore(cli.getPort(), HadoopThriftAuthBridge.getBridge(), conf, true, null);
  ...
}

启动 thrift server 和 metastore 服务。

HiveMetaStore.java
public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
    Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
  isMetaStoreRemote = true;
  String transportMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "binary");
  boolean isHttpTransport = transportMode.equalsIgnoreCase("http");
  if (isHttpTransport) {
    thriftServer = startHttpMetastore(port, conf);
  } else {
    thriftServer = startBinaryMetastore(port, bridge, conf);
  }
 
  logCompactionParameters(conf);
 
  boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL);
  LOG.info("Direct SQL optimization = {}",  directSqlEnabled);
 
  if (startMetaStoreThreads) {
    Lock metaStoreThreadsLock = new ReentrantLock();
    Condition startCondition = metaStoreThreadsLock.newCondition();
    AtomicBoolean startedServing = new AtomicBoolean();
    startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing,
        isMetaStoreHousekeepingLeader(conf), startedBackgroundThreads);
    signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock, startCondition, startedServing);
  }
 
  // If dynamic service discovery through ZooKeeper is enabled, add this server to the ZooKeeper.
  if (MetastoreConf.getVar(conf, ConfVars.THRIFT_SERVICE_DISCOVERY_MODE)
          .equalsIgnoreCase("zookeeper")) {
    try {
      zooKeeperHelper = MetastoreConf.getZKConfig(conf);
      String serverInstanceURI = getServerInstanceURI(port);
      zooKeeperHelper.addServerInstanceToZooKeeper(serverInstanceURI, serverInstanceURI, null,
          new ZKDeRegisterWatcher(zooKeeperHelper));
      LOG.info("Metastore server instance with URL " + serverInstanceURI + " added to " +
          "the zookeeper");
    } catch (Exception e) {
      LOG.error("Error adding this metastore instance to ZooKeeper: ", e);
      throw e;
    }
  }
 
  thriftServer.start();
}

以二进制协议为例,这里省略了 thrift server 启动的其他细节。其实就是包装了 IHMSHandler。

HiveMetaStore.java
private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridge bridge,
    Configuration conf) throws Throwable {
  ...
  TProcessor processor;
  ...
  IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
  TServerSocket serverSocket;
  if (useSasl) {
    processor = saslServer.wrapProcessor(
        new ThriftHiveMetastore.Processor<>(handler));
    LOG.info("Starting DB backed MetaStore Server in Secure Mode");
  } else {
    // we are in unsecure mode.
    if (MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) {
      processor = new TUGIBasedProcessor<>(handler);
      LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
    } else {
      processor = new TSetIpAddressProcessor<>(handler);
      LOG.info("Starting DB backed MetaStore Server");
    }
  }
  ...
 
  TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
      .processor(processor)
      .transportFactory(transFactory)
      .protocolFactory(protocolFactory)
      .inputProtocolFactory(inputProtoFactory)
      .executorService(executorService);
 
  TServer tServer = new TThreadPoolServer(args);
  ...
}

IHMSHandler 定义了 thrift server 接口,HMSHandler 进行了实现。

HMSHandler.java
@Override
public void create_table_req(final CreateTableRequest req)
    throws AlreadyExistsException, MetaException, InvalidObjectException,
    InvalidInputException {
  Table tbl = req.getTable();
  startFunction("create_table_req", ": " + tbl.toString());
  boolean success = false;
  Exception ex = null;
  try {
    create_table_core(getMS(), req);
    success = true;
  } catch (Exception e) {
    LOG.warn("create_table_req got ", e);
    ex = e;
    throw handleException(e).throwIfInstance(MetaException.class, InvalidObjectException.class)
        .throwIfInstance(AlreadyExistsException.class, InvalidInputException.class)
        .convertIfInstance(NoSuchObjectException.class, InvalidObjectException.class)
        .defaultMetaException();
  } finally {
    endFunction("create_table_req", success, ex, tbl.getTableName());
  }
}

通过 getMS () 获取 RawStore 来操作数据库,RawStore 定义了数据库操作的接口,ObjectStore 和 CachedStore 对其进行了实现,具体使用哪个根据配置来通过反射获取。CachedStore 默认是在 ObjectStore 之上加了一层缓存。

RawStore.java
void createTable(Table tbl) throws InvalidObjectException, MetaException;

这里截取了 create_table_core 的关键代码,依赖 WareHouse 创建仓库路径,依赖 RawStore 进行数据库操作,通过 Listener 在创建数据过程中进行事件监听。

HMSHandler.java
private void create_table_core(final RawStore ms, final CreateTableRequest req)
    throws AlreadyExistsException, MetaException,
    InvalidObjectException, NoSuchObjectException, InvalidInputException {
 
  ms.openTransaction();
  firePreEvent(new PreCreateTableEvent(tbl, db, this));
  try {
    wh.mkdirs(tblPath);
    ms.createTable(tbl);
    success = ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();
    }
    if (!listeners.isEmpty()) {
      MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE,
          new CreateTableEvent(tbl, success, this, isReplicated), envContext,
          transactionalListenerResponses, ms);
      if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())) {
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY,
            new AddPrimaryKeyEvent(constraints.getPrimaryKeys(), success, this), envContext);
      }
      if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())) {
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_FOREIGNKEY,
            new AddForeignKeyEvent(constraints.getForeignKeys(), success, this), envContext);
      }
      if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())) {
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_UNIQUECONSTRAINT,
            new AddUniqueConstraintEvent(constraints.getUniqueConstraints(), success, this), envContext);
      }
      if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())) {
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_NOTNULLCONSTRAINT,
            new AddNotNullConstraintEvent(constraints.getNotNullConstraints(), success, this), envContext);
      }
      if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())) {
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_DEFAULTCONSTRAINT,
            new AddDefaultConstraintEvent(constraints.getDefaultConstraints(), success, this), envContext);
      }
      if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())) {
        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_CHECKCONSTRAINT,
            new AddCheckConstraintEvent(constraints.getCheckConstraints(), success, this), envContext);
      }
    }
  }
}

# metastore on impala

# 依赖方式

impala 中实现了两个连接池:MetaStoreClientPool 和 EmbeddedMetastoreClientPool。
MetaStoreClientPool 使用可重试的 HiveMetaStoreClient,根据 hive-site.xml 中配置使用对应的连接方式。
EmbeddedMetastoreClientPool 主要用于单测,使用内嵌模式。由于内嵌 Derby 的连接限制,连接池中只有一个连接。

# 自定义场景

CDH 环境中,独立部署了 hive metastore,所有应用通过远程方式访问。
mothership 环境中,没有部署 hive metastore,impala 通过本地方式访问;同时,catalogd 启动了一个带有 rpc 端口的 metastore,Flink 和 Spark 通过远程方式访问。