如何使用Python对Hive实现数据操作?【原创】

1 前言

在对hive表进行数据导入时,针对有分区表插入数据时,总是使用外部临时表映射静态数据文件,然后再用查询的方式插入数据。
这样每次都需要进行创建临时表、上传HDFS、执行插入SQL等一系列操作,这样步骤非常繁琐,也容易在环境中残留一些垃圾文件。
其实HIVE也是支持SQL语句插入数据的,也就是insert语句,当插入大量数据时,可以拼装insert语句进行插入。
想要快速使用并且轻量化,就要用到我们今天说的Python脚本了。
首先我们先说一下使用的大数据组件Impala。

2 Impala

什么是Impala

Impala是Cloudera公司主导开发的新型查询系统,它提供SQL语义,能查询存储在Hadoop的HDFS和HBase中的PB级大数据。已有的Hive系统虽然也提供了SQL语义,但由于Hive底层执行使用的是MapReduce引擎,仍然是一个批处理过程,难以满足查询的交互性。相比之下,Impala的最大特点也是最大卖点就是它的快速。

想要在python中安装impala,直接pip install impala是不行的,需要按照一下步骤依次进行:

pip install six
pip install bit_array
pip install thriftpy
pip install thrift_sasl
pip install impyla

测试,不报错则表示安装成功:

>>> from impala.dbapi import connect
>>>

3 Python脚本

程序在运行时需要使用impala,并且需要通过csv文件导入数据,所以需要引用一下包。

import pandas as pd
from impala.dbapi import connect

首先要写一个连接hive并且可以执行sql的类。

class Connect_Hive:
    def __init__(self, host, port, auth_mechanism, user, password, database):
        self.host = host
        self.port = port
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.database = database

    def __GetConnect(self):
        self.conn = connect(host=self.host, port=self.port, auth_mechanism=self.auth_mechanism, user=self.user, password=self.password, database=self.database)
        cur = self.conn.cursor()
        if not cur:
            raise (NameError, "连接数据库失败")
        else:
            return cur

定义一些常用的方法,执行查询返回结果list,一般用于查询数据。

    def ExecQuery(self, sql):
        cur = self.__GetConnect()
        cur.execute(sql)
        resList = cur.fetchall()

        # 查询完毕后必须关闭连接
        self.conn.close()
        return resList

无返回值的执行sql,一般用于执行insert或者update。

    def ExecNonQuery(self, sql):
        cur = self.__GetConnect()
        cur.execute(sql)
        self.conn.commit()
        self.conn.close()

获取数据库列表,一般用于显示要插入的数据库。

    def get_databases_list(self):
        DATABASES_LIST = []
        DATABASES = self.ExecQuery('SHOW DATABASES')
        for i in DATABASES:
            i = str(i).replace("('", "").replace("',)", "")
            DATABASES_LIST.append(i)
        default = [i for i, x in enumerate(DATABASES_LIST) if x == 'xxxx']
        return DATABASES_LIST, default[0]

获取所有表,一般用于选择要导入的目标表。

    def get_tables_list(self):
        TABLES_LIST = []
        TABLES = self.ExecQuery('SHOW TABLES')
        for i in TABLES:
            i = str(i).replace("('", "").replace("',)", "")
            TABLES_LIST.append(i)
        return TABLES_LIST

下面编写一个通过csv文件,把要导入的数据转成一个sql

def insert_info_hive(input_file, table, host, port, auth_mechanism, user, password, database):
    if input_file is not None and input_file.name.endswith(".csv"):
        data = pd.read_csv(input_file)
        resList = Connect_Hive(host, port, auth_mechanism, user, password, database).ExecQuery('DESCRIBE ' + table)
        header_list = {}
        # 封装查询结果
        for value in resList:
            if value[0] == '' or '#' in value[0]:
                header_list = header_list
            else:
                header_list[value[0]] = value[1]
        headers = header_list
        headkeys = str(list(headers.keys())).replace('[', '(').replace(']', ')').replace("'", "")
        data_headers = data.columns
        Max_value = ''
        for i in range(len(data)):
            values = []
            for head in headers:
                if head in data_headers:
                    if head == 'dt':
                        value = int(data[head].loc[i])
                    else:
                        value = Type.type_conversion(headers[head], data[head].loc[i])
                    values.append(value)
                else:
                    if head == 'dt':
                        value = Type.type_conversion(headers[head], Time.func("$date", 0))
                    else:
                        if 'decimal' in headers[head]:
                            co = list(headers[head].replace('decimal', '').replace('(', '').replace(')', '').split(','))
                            d = int(co[1])
                            getcontext().prec = d + 1
                            value = Decimal(random.uniform(1, 10)) / 1
                            value = float(str(value).replace("'Decimal('", "").replace("')", ""))
                        else:
                            value = Type.type_conversion(headers[head], random.randrange(1, 100, 1))
                    values.append(value)
            if i == len(data) - 1:
                values = str(values).replace('[', '(').replace(']', ')')
            else:
                values = str(values).replace('[', '(').replace(']', '),')
            Max_value = Max_value + values
        SQL = f'''INSERT INTO TABLE {table} {headkeys} VALUES {Max_value}'''
        return SQL, data

最后传入文件,选择好要插入的表,执行sql语句即可

DATABASES_LIST = Connect_Hive(host, port, auth_mechanism, user, password, database).get_databases_list()[0]
index = Connect_Hive(host, port, auth_mechanism, user, password, database).get_databases_list()[1]
database = st.selectbox('DataBases List', DATABASES_LIST, index=index)
TABLES_LIST = Connect_Hive(host, port, auth_mechanism, user, password, database).get_tables_list()
table = st.selectbox('Tables List', TABLES_LIST)
input_file = st.file_uploader("Upload a CSV File", type=['csv'], key="upload")
Connect_Hive(host, port, auth_mechanism, user, password, database).ExecNonQuery(info[0])

结束语

我是知道,一只大数据萌新,上能code下能teach的全能奶爸,家有两只吞金兽,嘉与嘉
如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~

最后

更多Python知识尽在【Python都知道】公众号,欢迎大家!!
扫描下方二维码,关注公众号,了解更多Python内容


小白学堂 » 如何使用Python对Hive实现数据操作?【原创】

就聊挣钱,一个带着你做副业的社群。

立即查看 了解详情