如何使用Python对Hive实现数据操作?【原创】
1 前言
在对hive表进行数据导入时,针对有分区表插入数据时,总是使用外部临时表映射静态数据文件,然后再用查询的方式插入数据。
这样每次都需要进行创建临时表、上传HDFS、执行插入SQL等一系列操作,这样步骤非常繁琐,也容易在环境中残留一些垃圾文件。
其实HIVE也是支持SQL语句插入数据的,也就是insert语句,当插入大量数据时,可以拼装insert语句进行插入。
想要快速使用并且轻量化,就要用到我们今天说的Python脚本了。
首先我们先说一下使用的大数据组件Impala。
2 Impala
什么是Impala
Impala是Cloudera公司主导开发的新型查询系统,它提供SQL语义,能查询存储在Hadoop的HDFS和HBase中的PB级大数据。已有的Hive系统虽然也提供了SQL语义,但由于Hive底层执行使用的是MapReduce引擎,仍然是一个批处理过程,难以满足查询的交互性。相比之下,Impala的最大特点也是最大卖点就是它的快速。
想要在python中安装impala,直接pip install impala是不行的,需要按照一下步骤依次进行:
pip install six
pip install bit_array
pip install thriftpy
pip install thrift_sasl
pip install impyla
测试,不报错则表示安装成功:
>>> from impala.dbapi import connect
>>>
3 Python脚本
程序在运行时需要使用impala,并且需要通过csv文件导入数据,所以需要引用一下包。
import pandas as pd
from impala.dbapi import connect
首先要写一个连接hive并且可以执行sql的类。
class Connect_Hive:
def __init__(self, host, port, auth_mechanism, user, password, database):
self.host = host
self.port = port
self.auth_mechanism = auth_mechanism
self.user = user
self.password = password
self.database = database
def __GetConnect(self):
self.conn = connect(host=self.host, port=self.port, auth_mechanism=self.auth_mechanism, user=self.user, password=self.password, database=self.database)
cur = self.conn.cursor()
if not cur:
raise (NameError, "连接数据库失败")
else:
return cur
定义一些常用的方法,执行查询返回结果list,一般用于查询数据。
def ExecQuery(self, sql):
cur = self.__GetConnect()
cur.execute(sql)
resList = cur.fetchall()
# 查询完毕后必须关闭连接
self.conn.close()
return resList
无返回值的执行sql,一般用于执行insert或者update。
def ExecNonQuery(self, sql):
cur = self.__GetConnect()
cur.execute(sql)
self.conn.commit()
self.conn.close()
获取数据库列表,一般用于显示要插入的数据库。
def get_databases_list(self):
DATABASES_LIST = []
DATABASES = self.ExecQuery('SHOW DATABASES')
for i in DATABASES:
i = str(i).replace("('", "").replace("',)", "")
DATABASES_LIST.append(i)
default = [i for i, x in enumerate(DATABASES_LIST) if x == 'xxxx']
return DATABASES_LIST, default[0]
获取所有表,一般用于选择要导入的目标表。
def get_tables_list(self):
TABLES_LIST = []
TABLES = self.ExecQuery('SHOW TABLES')
for i in TABLES:
i = str(i).replace("('", "").replace("',)", "")
TABLES_LIST.append(i)
return TABLES_LIST
下面编写一个通过csv文件,把要导入的数据转成一个sql
def insert_info_hive(input_file, table, host, port, auth_mechanism, user, password, database):
if input_file is not None and input_file.name.endswith(".csv"):
data = pd.read_csv(input_file)
resList = Connect_Hive(host, port, auth_mechanism, user, password, database).ExecQuery('DESCRIBE ' + table)
header_list = {}
# 封装查询结果
for value in resList:
if value[0] == '' or '#' in value[0]:
header_list = header_list
else:
header_list[value[0]] = value[1]
headers = header_list
headkeys = str(list(headers.keys())).replace('[', '(').replace(']', ')').replace("'", "")
data_headers = data.columns
Max_value = ''
for i in range(len(data)):
values = []
for head in headers:
if head in data_headers:
if head == 'dt':
value = int(data[head].loc[i])
else:
value = Type.type_conversion(headers[head], data[head].loc[i])
values.append(value)
else:
if head == 'dt':
value = Type.type_conversion(headers[head], Time.func("$date", 0))
else:
if 'decimal' in headers[head]:
co = list(headers[head].replace('decimal', '').replace('(', '').replace(')', '').split(','))
d = int(co[1])
getcontext().prec = d + 1
value = Decimal(random.uniform(1, 10)) / 1
value = float(str(value).replace("'Decimal('", "").replace("')", ""))
else:
value = Type.type_conversion(headers[head], random.randrange(1, 100, 1))
values.append(value)
if i == len(data) - 1:
values = str(values).replace('[', '(').replace(']', ')')
else:
values = str(values).replace('[', '(').replace(']', '),')
Max_value = Max_value + values
SQL = f'''INSERT INTO TABLE {table} {headkeys} VALUES {Max_value}'''
return SQL, data
最后传入文件,选择好要插入的表,执行sql语句即可
DATABASES_LIST = Connect_Hive(host, port, auth_mechanism, user, password, database).get_databases_list()[0]
index = Connect_Hive(host, port, auth_mechanism, user, password, database).get_databases_list()[1]
database = st.selectbox('DataBases List', DATABASES_LIST, index=index)
TABLES_LIST = Connect_Hive(host, port, auth_mechanism, user, password, database).get_tables_list()
table = st.selectbox('Tables List', TABLES_LIST)
input_file = st.file_uploader("Upload a CSV File", type=['csv'], key="upload")
Connect_Hive(host, port, auth_mechanism, user, password, database).ExecNonQuery(info[0])
结束语
我是知道,一只大数据萌新,上能code下能teach的全能奶爸,家有两只吞金兽,嘉与嘉
如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~
最后
更多Python知识尽在【Python都知道】公众号,欢迎大家!!
扫描下方二维码,关注公众号,了解更多Python内容