MetaStore 是 Hive 的元数据服务,本文会从使用和部署上分析,着重说明它的元数据格式。然后从 Impala 的角度,分析使用方式和元数据存储。
# 整体架构
首先是最上层客户端,客户端可以是 Hive,也可以是 HiveMetastoreClient。
然后是服务端 HiveMetaStore,数据存储在仓库中,元数据存储在关系型数据库中,依赖下层操作元数据。
最底层就是元数据存储层,封装了 RawStore 来处理数据。
# 支持的数据库
元数据存储在关系型数据库中。
# 内嵌 Derby
| <property> |
| <name>javax.jdo.option.ConnectionURL</name> |
| <value>jdbc:derby:;databaseName=metastore_db;create=true</value> |
| </property> |
| <property> |
| <name>javax.jdo.option.ConnectionDriverName</name> |
| <value>org.apache.derby.jdbc.EmbeddedDriver</value> |
| </property> |
- 主要用于单测
- 只能有一个客户端连接
- 使用内存版本,数据更改不会持久化
# 外部数据库
| <property> |
| <name>javax.jdo.option.ConnectionURL</name> |
| <value>jdbc:mysql://hybrid03.com.deploy.octopus-1659594140016-debugbox:3305/hive?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true</value> |
| </property> |
| <property> |
| <name>javax.jdo.option.ConnectionDriverName</name> |
| <value>com.mysql.jdbc.Driver</value> |
| </property> |
| <property> |
| <name>javax.jdo.option.ConnectionUserName</name> |
| <value>hive</value> |
| </property> |
| <property> |
| <name>javax.jdo.option.ConnectionPassword</name> |
| <value>/O_C_KfSxg</value> |
| </property> |
官方支持 5 种:
- MS SQL Server
- MySQL
- MariDB
- Oracle
- Postgres
# 运行模式
# 内嵌模式
| <property> |
| <name>hive.metastore.uris</name> |
| <value></value> |
| </property> |
value 配置为空就代表内嵌模式。
client 中直接启动 metastore 服务。只能有一个 client 连接,多个 client 需要内嵌多个。
# 远程模式
server:
| <property> |
| <name>metastore.thrift.port</name> |
| <value>9083</value> |
| </property> |
client:
| <property> |
| <name>hive.metastore.uris</name> |
| <value>thrift://localhost:9083</value> |
| </property> |
server 启动 metastore 服务,上层通过 thrift 协议对外提供接口,如 createDatabase、getTable 等。
client 通过 thrift 协议访问 server 端口。
可以有多个 client 连接同时连接 server。
# 部署模式
# 内嵌模式
metastore 服务和数据库都内嵌在 Hive 中,同时只能有一个连接。
# 本地模式
metastore 内嵌在 Hive 中,数据库使用外部的。
# 远程模式
metastore 独立部署,数据库也使用外部的。Hive、Impala 等通过 thrift 协议访问 metastore 接口,同时能够支持多个连接。
# 高可用
# uris + selection
| <property> |
| <name>hive.metastore.uris</name> |
| <value>thrift://houst1:9083,thrift://houst2:9083,thrift://houst3:9083</value> |
| </property> |
| <property> |
| <name>metastore.thrift.uri.selection</name> |
| <value>RANDOM/SEQUENTIAL</value> |
| </property> |
这种方式是多个 MetaStore 节点,轮询调用。
# zookeeper 动态服务发现
| <property> |
| <name>hive.metastore.uris</name> |
| <value>thrift://houst1:9083,thrift://houst2:9083,thrift://houst3:9083</value> |
| </property> |
| <property> |
| <name>hive.metastore.service.discovery.mode</name> |
| <value>zookeeper</value> |
| </property> |
| <property> |
| <name>hive.metastore.zookeeper.client.port</name> |
| <value>2181</value> |
| </property> |
| <property> |
| <name>hive.metastore.zookeeper.namespace</name> |
| <value>/metastore</value> |
| </property> |
| <property> |
| <name>hive.metastore.zookeeper.session.timeout</name> |
| <value>15000</value> |
| </property> |
| <property> |
| <name>hive.metastore.zookeeper.connection.timeout</name> |
| <value>15000</value> |
| </property> |
| <property> |
| <name>hive.metastore.zookeeper.connection.max.retries</name> |
| <value>10</value> |
| </property> |
| <property> |
| <name>hive.metastore.zookeeper.connection.basesleeptime</name> |
| <value>1000</value> |
| </property> |
这种方式就需要依赖 zk 获取 MetaStore 节点。
# 元数据格式
大致归为几类:
- hive 版本
- 数据库
- 表和视图
- 索引
- 文件存储
- 表字段
- 表分区
- 函数
- 角色
- 授权
- 计数
# 版本
# Hive 版本信息(VERSION)
字段 | 说明 | 示例 |
---|
VER_ID | 版本 ID | 1 |
SCHEMA_VERSION | Hive 版本 | 3.1.3000 |
VERSION_COMMENT | 版本说明 | Hive release version 3.1.3000 |
# CDH 版本信息(CDH_VERSION)
字段 | 说明 | 示例 |
---|
VER_ID | 版本 ID | 1 |
SCHEMA_VERSION | Hive 版本 | 3.1.3000.7.2.15.0-Update4 |
VERSION_COMMENT | 版本说明 | Hive release version 3.1.3000 for CDH 7.2.15.0 |
# 数据库
# 数据库信息(DBS)
字段 | 说明 | 示例 |
---|
DB_ID | 数据库 ID | 1 |
DESC | 描述 | Default Hive database |
DB_LOCATION_URI | 外部表的路径 | hdfs://localhost:20500/test-warehouse |
NAME | 名称 | default |
OWNER_NAME | 所有者名称 | public |
OWNER_TYPE | 所有者类型 | ROLE |
CTLG_NAME | 目录名称 | hive |
CREATE_TIME | 创建时间 | 1662909881 |
DB_MANAGED_LOCATION_URI | 内部表的路径 | NULL |
# 数据库属性信息(DATABASE_PARAMS)
字段 | 说明 | 示例 |
---|
DB_ID | 数据库 ID | 1 |
PARAM_KEY | 参数名 | impala_registered_function_get_exts_version() |
PARAM_VALUE | 参数值 | HBgHcmF3ZGF0YRgQZ2V0X2V4dHNfdmVyc2lvbgAVBBkMHBkcFQAcFRgAAAASKBJnZXRfZXh0c192ZXJzaW9uKCkYLWhkZnM6Ly9uYW1lc2VydmljZTAxL3NhL2V4dHMvbGliaW1wYWxhRXh0cy5zbxwYMl9aMTRHZXRFeHRzVmVyc2lvblBOMTBpbXBhbGFfdWRmMTVGdW5jdGlvbkNvbnRleHRFACEA |
# 表和视图
# 表信息(TBLS)
字段 | 说明 | 示例 |
---|
TBL_ID | 表 ID | 1 |
CREATE_TIME | 创建时间 | 1662909958 |
DB_ID | 数据库 ID | 1 |
LAST_ACCESS_TIME | 上次访问时间 | 0 |
OWNER | 所有者用户名 | impala |
OWNER_TYPE | 所有者角色 | USER |
RETENTION | 保留字段 | 0 |
SD_ID | 存储信息 ID | 1 |
TBL_NAME | 表名称 | workload_query |
TBL_TYPE | 表类型 | EXTERNAL_TABLE |
VIEW_EXPANDED_TEXT | 视图的详细 HQL 语句 | NULL |
VIEW_ORIGINAL_TEXT | 视图的原始 HQL 语句 | NULL |
IS_REWRITE_ENABLED | 是否可以重写 | FALSE |
WRITE_ID | 重写 ID | 0 |
# 表属性信息(TABLE_PARAMS)
字段 | 说明 | 示例 |
---|
TBL_ID | 表 ID | 1 |
PARAM_KEY | 参数名 | EXTERNAL |
PARAM_VALUE | 参数值 | TRUE |
# 文件存储信息
# 数据表信息(CDS)
# 存储信息(SDS)
字段 | 说明 | 示例 |
---|
SD_ID | 存储信息 ID | 1 |
CD_ID | 数据表 ID | 1 |
INPUT_FORMAT | 文件输入格式 | org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat |
IS_COMPRESSED | 是否压缩 | 0 |
IS_STOREDASSUBDIRECTORIES | 是否以子目录存储 | 0 |
LOCATION | 路径 | hdfs://nameservice01/user/impala/workload_query |
NUM_BUCKETS | 分桶数量 | 0 |
OUTPUT_FORMAT | 文件输出格式 | org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat |
SERDE_ID | 序列化 ID | 1 |
# 存储属性信息(SD_PARAMS)
字段 | 说明 | 示例 |
---|
SD_ID | 存储信息 ID | 1 |
PARAM_KEY | 属性名 | |
PARAM_VALUE | 属性值 | |
# 序列化类信息(SERDES)
字段 | 说明 | 示例 |
---|
SERDE_ID | 序列化类配置 ID | 1 |
NAME | 序列化类配置名称 | NULL |
SLIB | 序列化类 | org.apache.hadoop.hive.serde2.avro.AvroSerDe |
DESCRIPTION | 描述 | NULL |
SERIALIZER_CLASS | 序列化类 | NULL |
DESERIALIZER_CLASS | 反序列化类 | NULL |
SERDE_TYPE | 序列化类型 | 0 |
# 序列化类属性信息(SERDE_PARAMS)
字段 | 说明 | 示例 |
---|
SERDE_ID | 序列化类配置 ID | 1 |
PARAM_KEY | 属性名 | field.delim |
PARAM_VALUE | 属性值 | | |
# 表字段
# 字段信息(COLUMNS_V2)
字段 | 说明 | 示例 |
---|
CD_ID | 数据表 ID | 1 |
COMMENT | 备注 | from deserializer |
COLUMN_NAME | 字段名称 | acc_his_est_mem |
TYPE_NAME | 字段类型 | string |
INTEGER_IDX | 顺序 | 11 |
# 排序字段信息(SORT_COLS)
字段 | 说明 | 示例 |
---|
SD_ID | 存储信息 ID | 1 |
COLUMN_NAME | 字段名称 | event_id |
ORDER | 排序方式 | 1 |
INTEGER_IDX | 顺序 | 11 |
# 表分区
# 分桶字段信息(BUCKETING_COLS)
字段 | 说明 | 示例 |
---|
SD_ID | 存储信息 ID | 1 |
BUCKET_COL_NAME | 分桶字段名称 | event_id |
INTEGER_IDX | 顺序 | 11 |
# 分区信息(PARTITIONS)
字段 | 说明 | 示例 |
---|
PART_ID | 分区 ID | 1 |
CREATE_TIME | 创建时间 | 1662909964 |
LAST_ACCESS_TIME | 上次访问时间 | 0 |
PART_NAME | 分区名称 | day=19246 |
SD_ID | 存储信息 ID | 3 |
TBL_ID | 表 ID | 1 |
WRITE_ID | 重写 ID | 0 |
# 分区字段信息(PARTITION_KEYS)
字段 | 说明 | 示例 |
---|
TBL_ID | 表 ID | 1 |
PKEY_COMMENT | 分区字段备注 | NULL |
PKEY_NAME | 分区字段名称 | day |
PKEY_TYPE | 分区字段类型 | bigint |
INTEGER_IDX | 顺序 | 0 |
# 分区字段值信息(PARTITION_KEY_VALS)
字段 | 说明 | 示例 |
---|
PART_ID | 分区 ID | 1 |
PART_KEY_VAL | 分区字段值 | 19246 |
INTEGER_IDX | 顺序 | 0 |
# 分区属性信息(PARTITION_PARAMS)
字段 | 说明 | 示例 |
---|
PART_ID | 分区 ID | 1 |
PARAM_KEY | 属性名 | transient_lastDdlTime |
PARAM_VALUE | 属性值 | 1662909964 |
# 函数
# 函数信息(FUNCS)
字段 | 说明 | 示例 |
---|
FUNC_ID | 函数 ID | 1 |
CLASS_NAME | 类名称 | com.sensorsdata.impala.udf.Hash |
CREATE_TIME | 创建时间 | 1436320455 |
DB_ID | 数据库 ID | 1 |
FUNC_NAME | 函数名称 | hash |
FUNC_TYPE | 函数类型 | 1 |
OWNER_NAME | 所有者名称 | NULL |
OWNER_TYPE | 所有者类型 | USER |
# 函数规则信息(FUNC_RU)
字段 | 说明 | 示例 |
---|
FUNC_ID | 函数 ID | 1 |
RESOURCE_TYPE | 资源类型 | 1 |
RESOURCE_URI | 资源路径 | hdfs://nameservice01/user/impala/hash.jar |
INTEGER_IDX | 顺序 | 0 |
# 角色
# 角色信息(ROLES)
字段 | 说明 | 示例 |
---|
ROLE_ID | 角色 ID | 1 |
CREATE_TIME | 创建时间 | 1662909881 |
OWNER_NAME | 所有者名称 | admin |
ROLE_NAME | 角色名称 | admin |
# 角色映射信息(ROLE_MAP)
字段 | 说明 | 示例 |
---|
ROLE_GRANT_ID | 角色授权 ID | 1 |
ADD_TIME | 创建时间 | 1662909881 |
GRANT_OPTION | 开启或禁用授权 | 0 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
ROLE_ID | 角色 ID | 1 |
# 授权
# 全局授权信息(GLOBAL_PRIVS)
字段 | 说明 | 示例 |
---|
USER_GRANT_ID | 用户授权 ID | 1 |
CREATE_TIME | 授权时间 | 1436320455 |
GRANT_OPTION | 开启或者禁用权限 | 1 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
USER_PRIV | 用户权限 | All |
AUTHORIZER | 授权人 | v1 |
# 数据库授权信息(DB_PRIV)
字段 | 说明 | 示例 |
---|
DB_GRANT_ID | 数据库授权 ID | 1 |
CREATE_TIME | 授权时间 | 1436320455 |
DB_ID | 数据库 ID | 1 |
GRANT_OPTION | 开启或者禁用权限 | 0 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
DB_PRIV | 数据库权限 | Create |
AUTHORIZER | 授权人 | v1 |
# 表授权信息(TBL_PRIVS)
字段 | 说明 | 示例 |
---|
TBL_GRANT_ID | 表授权 ID | 1 |
CREATE_TIME | 授权时间 | 1436320455 |
GRANT_OPTION | 开启或者禁用权限 | 0 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
TBL_PRIV | 表权限 | Select、Alter |
TBL_ID | 表 ID | 1 |
AUTHORIZER | 授权人 | v1 |
# 表字段授权信息(TBL_COL_PRIVS)
字段 | 说明 | 示例 |
---|
TBL_COLUMN_GRANT_ID | 表字段授权 ID | 1 |
COLUMN_NAME | 字段名称 | event_id |
CREATE_TIME | 授权时间 | 1436320455 |
GRANT_OPTION | 开启或者禁用权限 | 0 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
TBL_COL_PRIV | 表权限 | Add |
TBL_ID | 表 ID | 1 |
AUTHORIZER | 授权人 | v1 |
# 分区字段授权信息(PART_COL_PRIVS)
字段 | 说明 | 示例 |
---|
PART_COLUMN_GRANT_ID | 分区字段授权 ID | 1 |
COLUMN_NAME | 字段名称 | event_id |
CREATE_TIME | 授权时间 | 1436320455 |
GRANT_OPTION | 开启或者禁用权限 | 0 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PART_ID | 分区 ID | 1 |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
TBL_COL_PRIV | 表权限 | Add |
AUTHORIZER | 授权人 | v1 |
# 分区授权信息(PART_PRIVS)
字段 | 说明 | 示例 |
---|
PART_GRANT_ID | 分区授权 ID | 1 |
CREATE_TIME | 授权时间 | 1436320455 |
GRANT_OPTION | 开启或者禁用权限 | 0 |
GRANTOR | 授权者 | root |
GRANTOR_TYPE | 授权者类型 | USER |
PART_ID | 分区 ID | 1 |
PRINCIPAL_NAME | 被授权者名称 | sa_cluster |
PRINCIPAL_TYPE | 被授权者类型 | USER |
TBL_COL_PRIV | 表权限 | Add |
AUTHORIZER | 授权人 | v1 |
# 例子
# CREATE DATABASE
| CREATE DATABASE test COMMENT 'test database' LOCATION 'hdfs://localhost:20500/test' MANAGEDLOCATION 'hdfs://localhost:20500/test/managed'; |
DBS:
DB_ID | DESC | DB_LOCATION_URI | NAME | OWNER_NAME | OWNER_TYPE | CTLG_NAME | CREATE_TIME | DB_MANAGED_LOCATION_URI |
---|
6 | test database | hdfs://localhost:20500/test | test | impala | USER | hive | 1677635246 | hdfs://localhost:20500/test/managed |
# CREATE TABLE
| CREATE TABLE test.test (a INT COMMENT 'column') PARTITIONED BY (day INT COMMENT 'day column') SORT BY (a) COMMENT 'table_comment' ROW FORMAT DELIMITED WITH SERDEPROPERTIES ('key1' = 'value1') STORED AS TEXTFILE LOCATION 'hdfs://localhost:20500/test/test' TBLPROPERTIES ('key1' = 'value1'); |
TBLS:
TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | OWNER_TYPE | RETENTION | SD_ID | TBL_NAME | TBL_TYPE | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | IS_REWRITE_ENABLED | WRITE_ID |
---|
6 | 1677636695 | 6 | 0 | impala | USER | 0 | 6 | test | EXTERNAL_TABLE | | | f | 0 |
TABLE_PARAMS:
TBL_ID | PARAM_KEY | PARAM_VALUE |
---|
6 | TRANSLATED_TO_EXTERNAL | TRUE |
6 | sort.columns | a |
6 | key1 | value1 |
6 | transient_lastDdlTime | 1677636695 |
6 | comment | table_comment |
6 | OBJCAPABILITIES | EXTREAD,EXTWRITE |
6 | external.table.purge | TRUE |
6 | sort.order | LEXICAL |
6 | EXTERNAL | TRUE |
SDS:
SD_ID | INPUT_FORMAT | IS_COMPRESSED | LOCATION | NUM_BUCKETS | OUTPUT_FORMAT | SERDE_ID | CD_ID | IS_STOREDASSUBDIRECTORIES |
---|
6 | org.apache.hadoop.mapred.TextInputFormat | f | hdfs://localhost:20500/test/test | 0 | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | 6 | 6 | f |
SERDES:
SERDE_ID | NAME | SLIB | DESCRIPTION | SERIALIZER_CLASS | DESERIALIZER_CLASS | SERDE_TYPE |
---|
6 | | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | | | | 0 |
SERDE_PARAMS:
SERDE_ID | PARAM_KEY | PARAM_VALUE |
---|
6 | key1 | value1 |
CDS:
COLUMNS_V2:
CD_ID | COMMENT | COLUMN_NAME | TYPE_NAME | INTEGER_IDX |
---|
6 | column | a | int | 0 |
PARTITION_KEYS:
TBL_ID | PKEY_COMMENT | PKEY_NAME | PKEY_TYPE | INTEGER_IDX |
---|
6 | day column | day | int | 0 |
# 从源码角度分析创建表过程
# client
IMetaStoreClient 定义了 hive metastore thrift api。
IMetaStoreClient.java | void createTable(Table tbl) throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException; |
HiveMetaStoreClient 根据 metastore.thrift.uris 配置,选择连接方式。
未配置 uris,走内嵌模式,直接构造一个 HMSHandler。
HiveMetaStoreClient.java | if (localMetaStore) { |
| if (!allowEmbedded) { |
| throw new MetaException("Embedded metastore is not allowed here. Please configure " |
| + ConfVars.THRIFT_URIS.toString() + "; it is currently set to [" + msUri + "]"); |
| } |
| |
| client = callEmbeddedMetastore(this.conf); |
| |
| |
| |
| isConnected = true; |
| snapshotActiveConf(); |
| return; |
| } |
远程模式的话,开启客户端连接,根据 uris 配置或 zk 服务发现选择连接地址。
客户端调用创建表接口。HiveMetaHook 定义了一些钩子,创建表相关的如:preCreateTable、commitCreateTable、rollbackCreateTable。
HiveMetaStoreClient.java | @Override |
| public void createTable(CreateTableRequest request) throws |
| InvalidObjectException, MetaException, NoSuchObjectException, TException { |
| Table tbl = request.getTable(); |
| if (!tbl.isSetCatName()) { |
| tbl.setCatName(getDefaultCatalog(conf)); |
| } |
| |
| if (processorCapabilities != null) { |
| request.setProcessorCapabilities(new ArrayList<String>(Arrays.asList(processorCapabilities))); |
| request.setProcessorIdentifier(processorIdentifier); |
| } |
| |
| HiveMetaHook hook = getHook(tbl); |
| if (hook != null) { |
| hook.preCreateTable(tbl); |
| } |
| boolean success = false; |
| try { |
| |
| if (hook == null || !hook.createHMSTableInHook()) { |
| create_table(request); |
| } |
| if (hook != null) { |
| hook.commitCreateTable(tbl); |
| } |
| success = true; |
| } finally { |
| if (!success && (hook != null)) { |
| try { |
| hook.rollbackCreateTable(tbl); |
| } catch (Exception e) { |
| LOG.error("Create rollback failed with", e); |
| } |
| } |
| } |
| } |
create_table 发起 rpc 调用。
HiveMetastoreClient.java | protected void create_table(CreateTableRequest request) throws |
| InvalidObjectException, MetaException, NoSuchObjectException, TException { |
| client.create_table_req(request); |
| } |
# server
通过 HiveMetastore 启动 server。
HiveMetaStore.java | public static void main(String[] args) throws Throwable { |
| ... |
| startMetaStore(cli.getPort(), HadoopThriftAuthBridge.getBridge(), conf, true, null); |
| ... |
| } |
启动 thrift server 和 metastore 服务。
HiveMetaStore.java | public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, |
| Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable { |
| isMetaStoreRemote = true; |
| String transportMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "binary"); |
| boolean isHttpTransport = transportMode.equalsIgnoreCase("http"); |
| if (isHttpTransport) { |
| thriftServer = startHttpMetastore(port, conf); |
| } else { |
| thriftServer = startBinaryMetastore(port, bridge, conf); |
| } |
| |
| logCompactionParameters(conf); |
| |
| boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL); |
| LOG.info("Direct SQL optimization = {}", directSqlEnabled); |
| |
| if (startMetaStoreThreads) { |
| Lock metaStoreThreadsLock = new ReentrantLock(); |
| Condition startCondition = metaStoreThreadsLock.newCondition(); |
| AtomicBoolean startedServing = new AtomicBoolean(); |
| startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing, |
| isMetaStoreHousekeepingLeader(conf), startedBackgroundThreads); |
| signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock, startCondition, startedServing); |
| } |
| |
| |
| if (MetastoreConf.getVar(conf, ConfVars.THRIFT_SERVICE_DISCOVERY_MODE) |
| .equalsIgnoreCase("zookeeper")) { |
| try { |
| zooKeeperHelper = MetastoreConf.getZKConfig(conf); |
| String serverInstanceURI = getServerInstanceURI(port); |
| zooKeeperHelper.addServerInstanceToZooKeeper(serverInstanceURI, serverInstanceURI, null, |
| new ZKDeRegisterWatcher(zooKeeperHelper)); |
| LOG.info("Metastore server instance with URL " + serverInstanceURI + " added to " + |
| "the zookeeper"); |
| } catch (Exception e) { |
| LOG.error("Error adding this metastore instance to ZooKeeper: ", e); |
| throw e; |
| } |
| } |
| |
| thriftServer.start(); |
| } |
以二进制协议为例,这里省略了 thrift server 启动的其他细节。其实就是包装了 IHMSHandler。
HiveMetaStore.java | private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridge bridge, |
| Configuration conf) throws Throwable { |
| ... |
| TProcessor processor; |
| ... |
| IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf); |
| TServerSocket serverSocket; |
| if (useSasl) { |
| processor = saslServer.wrapProcessor( |
| new ThriftHiveMetastore.Processor<>(handler)); |
| LOG.info("Starting DB backed MetaStore Server in Secure Mode"); |
| } else { |
| |
| if (MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) { |
| processor = new TUGIBasedProcessor<>(handler); |
| LOG.info("Starting DB backed MetaStore Server with SetUGI enabled"); |
| } else { |
| processor = new TSetIpAddressProcessor<>(handler); |
| LOG.info("Starting DB backed MetaStore Server"); |
| } |
| } |
| ... |
| |
| TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket) |
| .processor(processor) |
| .transportFactory(transFactory) |
| .protocolFactory(protocolFactory) |
| .inputProtocolFactory(inputProtoFactory) |
| .executorService(executorService); |
| |
| TServer tServer = new TThreadPoolServer(args); |
| ... |
| } |
IHMSHandler 定义了 thrift server 接口,HMSHandler 进行了实现。
HMSHandler.java | @Override |
| public void create_table_req(final CreateTableRequest req) |
| throws AlreadyExistsException, MetaException, InvalidObjectException, |
| InvalidInputException { |
| Table tbl = req.getTable(); |
| startFunction("create_table_req", ": " + tbl.toString()); |
| boolean success = false; |
| Exception ex = null; |
| try { |
| create_table_core(getMS(), req); |
| success = true; |
| } catch (Exception e) { |
| LOG.warn("create_table_req got ", e); |
| ex = e; |
| throw handleException(e).throwIfInstance(MetaException.class, InvalidObjectException.class) |
| .throwIfInstance(AlreadyExistsException.class, InvalidInputException.class) |
| .convertIfInstance(NoSuchObjectException.class, InvalidObjectException.class) |
| .defaultMetaException(); |
| } finally { |
| endFunction("create_table_req", success, ex, tbl.getTableName()); |
| } |
| } |
通过 getMS () 获取 RawStore 来操作数据库,RawStore 定义了数据库操作的接口,ObjectStore 和 CachedStore 对其进行了实现,具体使用哪个根据配置来通过反射获取。CachedStore 默认是在 ObjectStore 之上加了一层缓存。
RawStore.java | void createTable(Table tbl) throws InvalidObjectException, MetaException; |
这里截取了 create_table_core 的关键代码,依赖 WareHouse 创建仓库路径,依赖 RawStore 进行数据库操作,通过 Listener 在创建数据过程中进行事件监听。
HMSHandler.java | private void create_table_core(final RawStore ms, final CreateTableRequest req) |
| throws AlreadyExistsException, MetaException, |
| InvalidObjectException, NoSuchObjectException, InvalidInputException { |
| |
| ms.openTransaction(); |
| firePreEvent(new PreCreateTableEvent(tbl, db, this)); |
| try { |
| wh.mkdirs(tblPath); |
| ms.createTable(tbl); |
| success = ms.commitTransaction(); |
| } finally { |
| if (!success) { |
| ms.rollbackTransaction(); |
| } |
| if (!listeners.isEmpty()) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE, |
| new CreateTableEvent(tbl, success, this, isReplicated), envContext, |
| transactionalListenerResponses, ms); |
| if (CollectionUtils.isNotEmpty(constraints.getPrimaryKeys())) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY, |
| new AddPrimaryKeyEvent(constraints.getPrimaryKeys(), success, this), envContext); |
| } |
| if (CollectionUtils.isNotEmpty(constraints.getForeignKeys())) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_FOREIGNKEY, |
| new AddForeignKeyEvent(constraints.getForeignKeys(), success, this), envContext); |
| } |
| if (CollectionUtils.isNotEmpty(constraints.getUniqueConstraints())) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_UNIQUECONSTRAINT, |
| new AddUniqueConstraintEvent(constraints.getUniqueConstraints(), success, this), envContext); |
| } |
| if (CollectionUtils.isNotEmpty(constraints.getNotNullConstraints())) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_NOTNULLCONSTRAINT, |
| new AddNotNullConstraintEvent(constraints.getNotNullConstraints(), success, this), envContext); |
| } |
| if (CollectionUtils.isNotEmpty(constraints.getDefaultConstraints())) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_DEFAULTCONSTRAINT, |
| new AddDefaultConstraintEvent(constraints.getDefaultConstraints(), success, this), envContext); |
| } |
| if (CollectionUtils.isNotEmpty(constraints.getCheckConstraints())) { |
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_CHECKCONSTRAINT, |
| new AddCheckConstraintEvent(constraints.getCheckConstraints(), success, this), envContext); |
| } |
| } |
| } |
| } |
# 依赖方式
impala 中实现了两个连接池:MetaStoreClientPool 和 EmbeddedMetastoreClientPool。
MetaStoreClientPool 使用可重试的 HiveMetaStoreClient,根据 hive-site.xml 中配置使用对应的连接方式。
EmbeddedMetastoreClientPool 主要用于单测,使用内嵌模式。由于内嵌 Derby 的连接限制,连接池中只有一个连接。
# 自定义场景
CDH 环境中,独立部署了 hive metastore,所有应用通过远程方式访问。
mothership 环境中,没有部署 hive metastore,impala 通过本地方式访问;同时,catalogd 启动了一个带有 rpc 端口的 metastore,Flink 和 Spark 通过远程方式访问。