Importing HDFS Data into Hive
Published: 2019-06-23


1. Via Hive View

CREATE EXTERNAL TABLE if not exists finance.json_serde_optd_table (
  retCode string,
  retMsg string,
  data array<struct<secid:string,
                    tradedate:date,
                    optid:string,
                    ticker:string,
                    secshortname:string,
                    exchangecd:string,
                    presettleprice:double,
                    precloseprice:double,
                    openprice:double,
                    highestprice:double,
                    lowestprice:double,
                    closeprice:double,
                    settlprice:double,
                    turnovervol:double,
                    turnovervalue:double,
                    openint:int>>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/';
create table if not exists finance.tb_optd
as
SELECT b.data.secID,
       b.data.tradeDate,
       b.data.optID,
       b.data.ticker,
       b.data.secShortName,
       b.data.exchangeCD,
       b.data.preSettlePrice,
       b.data.preClosePrice,
       b.data.openPrice,
       b.data.highestPrice,
       b.data.lowestPrice,
       b.data.closePrice,
       b.data.settlPrice,
       b.data.turnoverVol,
       b.data.turnoverValue,
       b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
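
With both statements in place, each file under /nifi/finance1/optd/ is assumed to hold one JSON document per line of the shape {"retCode": ..., "retMsg": ..., "data": [ {...}, {...} ]}, which is what the SerDe columns above map to. A quick sanity check against the final table (a minimal sketch; the column picks and LIMIT are just illustrative):

-- Spot-check a few exploded option records
SELECT ticker, tradedate, closeprice, settlprice
FROM finance.tb_optd
LIMIT 10;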
2. Via Zeppelin

 

%dep
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");
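
One note from experience: a %dep paragraph only takes effect if it runs before the Spark interpreter starts, so if the notebook has already executed any Spark code, restart the interpreter first; otherwise the HCatalog SerDe classes will not be on the classpath when the tables are read.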

 

// Define the collection of Hive objects to import
case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
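
In this config, schema is the column definition list for the staging SerDe table, schema_tb is the select list used to populate the final table, and database, modelName and hdfsPath together locate the target database, the table-name suffix, and the source directory on HDFS.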
// Create the equd data structure
// Define the JSON schema
val schema_json_equd_serde = """  retCode string,
                              retMsg string,
                              data array<struct<secid:string,
                              tradedate:date,
                              ticker:string,
                              secshortname:string,
                              exchangecd:string,
                              precloseprice:double,
                              actprecloseprice:double,
                              openprice:double,
                              highestprice:double,
                              lowestprice:double,
                              closeprice:double,
                              turnovervol:double,
                              turnovervalue:double,
                              dealamount:int,
                              turnoverrate:double,
                              accumadjfactor:double,
                              negmarketvalue:double,
                              marketvalue:double,
                              pe:double,
                              pe1:double,
                              pb:double,
                              isopen:int>>""";
var schema_equd = """b.data.secID,
                    b.data.ticker,
                    b.data.secShortName,
                    b.data.exchangeCD,
                    b.data.tradeDate,
                    b.data.preClosePrice,
                    b.data.actPreClosePrice,
                    b.data.openPrice,
                    b.data.highestPrice,
                    b.data.lowestPrice,
                    b.data.closePrice,
                    b.data.turnoverVol,
                    b.data.turnoverValue,
                    b.data.dealAmount,
                    b.data.turnoverRate,
                    b.data.accumAdjFactor,
                    b.data.negMarketValue,
                    b.data.marketValue,
                    b.data.PE,
                    b.data.PE1,
                    b.data.PB,
                    b.data.isOpen""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "equd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd);

 

// Create the idxd data structure
// Define the JSON schema
val schema_json_idxd_serde = """  retCode string,
                              retMsg string,
                              data array<struct<indexid:string,
                              tradedate:date,
                              ticker:string,
                              porgfullname:string,
                              secshortname:string,
                              exchangecd:string,
                              precloseindex:double,
                              openindex:double,
                              lowestindex:double,
                              highestindex:double,
                              closeindex:double,
                              turnovervol:double,
                              turnovervalue:double,
                              chg:double,
                              chgpct:double>>""";
var schema_idxd = """b.data.indexID,
                    b.data.tradeDate,
                    b.data.ticker,
                    b.data.porgFullName,
                    b.data.secShortName,
                    b.data.exchangeCD,
                    b.data.preCloseIndex,
                    b.data.openIndex,
                    b.data.lowestIndex,
                    b.data.highestIndex,
                    b.data.closeIndex,
                    b.data.turnoverVol,
                    b.data.turnoverValue,
                    b.data.CHG,
                    b.data.CHGPct""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "idxd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd);

 

// Loop over the configs and load each dataset
def loadDataToHive(args: HiveConfig) {
    val loadPath = args.hdfsPath + args.modelName;
    val tb_json_serde = "json_serde_" + args.modelName + "_table";
    val tb = "tb_" + args.modelName;
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    if (args.database != "" && args.schema != "") {
        print("Creating project... " + args.modelName)
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
        print("Building the extended model...");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde
          + "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION "
          + "'" + loadPath + "/'");
        println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb
          + " from " + args.database + "." + tb_json_serde
          + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb
          + " from " + args.database + "." + tb_json_serde
          + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        println(args.modelName + " extended model load complete!");
    }
}
hiveConfigList.size;
hiveConfigList.foreach { x => loadDataToHive(x) };
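
Once the foreach finishes, the loaded tables can be verified from another Zeppelin paragraph. A minimal sketch, assuming the %sql interpreter is bound to the same Hive metastore:

%sql
SELECT COUNT(*) FROM finance.tb_equd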

 

3. An alternative approach

Because data is an array nested inside the JSON, the conversion above is somewhat involved. The approach below first extracts the data array from the JSON and writes it out to HDFS, so it can then be loaded into Hive directly with the following statement:

Use NiFi's SplitJson processor to extract and split the data array

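With SplitJson's JsonPath expression pointed at the data array (e.g. $.data; adjust to your own flow), each array element becomes its own flowfile, so the files landing under /nifi/finance2/optd/ end up with one flat JSON object per line, such as {"secid": "...", "tradedate": "...", "closeprice": ...}. The table below can therefore map top-level keys straight to columns, with no struct or LATERAL VIEW needed: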

CREATE EXTERNAL TABLE if not exists finance.awen_optd (
  secid string,
  tradedate date,
  optid string,
  ticker string,
  secshortname string,
  exchangecd string,
  presettleprice double,
  precloseprice double,
  openprice double,
  highestprice double,
  lowestprice double,
  closeprice double,
  settlprice double,
  turnovervol double,
  turnovervalue double,
  openint int)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/';
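
As a quick check that the direct mapping works (a minimal sketch; column picks are illustrative):

-- No LATERAL VIEW needed: each JSON line is already one row
SELECT ticker, tradedate, closeprice
FROM finance.awen_optd
LIMIT 10;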

 

This post was originally published by 疯吻IT on cnblogs: http://www.cnblogs.com/fengwenit/p/5631455.html. Please contact the original author before reprinting.
