1. hbase连接
首先用 hbase shell 命令进入 HBase 数据库,然后用 list 命令查看 HBase 下的所有表。以其中的表“DB_level0”为例,可以看到库名“baotouyiqi”是作为前缀拼接在表名前面的(对应 happybase 的 table_prefix 参数),Python 代码访问时先建立连接:
def hbase_connection(hbase_master, hbase_port, table_prefix=None):
    """Open a happybase connection to the HBase Thrift server.

    Args:
        hbase_master: Host name or IP address of the HBase Thrift server.
        hbase_port: Port of the HBase Thrift server.
        table_prefix: Optional prefix (project namespace) that happybase
            prepends to every table name used through this connection.

    Returns:
        The ``happybase.Connection`` object.
    """
    return happybase.Connection(
        host=hbase_master,
        port=hbase_port,
        table_prefix=table_prefix,
    )


# The project namespace (table prefix) is fixed when the connection is created.
connection = hbase_connection(hbase_master, hbase_port, table_prefix)
# Get a handle to the table; the prefix is applied to `tablename` by happybase.
table = connection.table(tablename)
备注:完整代码在最后,想运行的直接滑到最后复制即可
2. 按条件读取hbase数据
然后按照条件来查询表中想要的数据集,这里只列举两个条件:时间区间和指定列。同样,我们在shell下用scan命令来查看表中的数据结构:
可以看到第一列是ROW,第二列是COLUMN+CELL,python代码取数据方法差不多:
# Row keys have the form 'dt_<timestamp>', so the time window maps directly
# onto a row-key range.
date_prex_start = bytes('dt_' + starttime, encoding='utf-8')  # row_start
date_prex_end = bytes('dt_' + endtime, encoding='utf-8')  # row_stop

# Restrict the scan to the row-key range [row_start, row_stop) and to the one
# requested column, then materialise the (row_key, cells) pairs as a dict.
outdata = dict(
    table.scan(
        row_start=date_prex_start,
        row_stop=date_prex_end,
        columns=[onecolumn],
    )
)
得到的结果如下,是个字典格式:
3. 按格式输出hbase数据结果
我们希望输出的结果是 DataFrame 格式,而且第一列是 time,第二列是 value,所以就做个简单的格式处理:
# Row keys are bytes of the form b'dt_<timestamp>'; strip the marker so only
# the timestamp text remains.
timesep = [key.decode('utf-8').replace('dt_', '') for key in outdata]
# Take the first cell of each scanned row and convert it to a float.
valuelist = [float(next(iter(cells.values()))) for cells in outdata.values()]

if timesep:
    db_data2 = pd.DataFrame({'time': timesep, 'value': valuelist})
    # The first 16 characters are 'YYYY-MM-DD HH:MM'; when a minute holds
    # several samples, keep only the last one.
    same_minute = db_data2['time'].str[:16].duplicated(keep='last')
    db_data2 = db_data2[~same_minute]
    # Sort newest first. Parsing with pandas avoids the local-timezone
    # dependence of time.mktime and still rejects malformed timestamps.
    sort_key = pd.to_datetime(db_data2['time'], format='%Y-%m-%d %H:%M:%S')
    db_data3 = db_data2.loc[sort_key.sort_values(ascending=False).index]
else:
    # Nothing was scanned: the result is an empty DataFrame.
    db_data3 = pd.DataFrame()
4. 完整代码(code)
import time
from pathlib import Path

import happybase
import pandas as pd

os_file_name = Path(__file__).name


def hbase_connection(hbase_master, hbase_port, table_prefix=None):
    """Open a happybase connection to the HBase Thrift server.

    Args:
        hbase_master: Host name or IP address of the HBase Thrift server.
        hbase_port: Port of the HBase Thrift server.
        table_prefix: Optional prefix (project namespace) that happybase
            prepends to every table name used through this connection.

    Returns:
        The ``happybase.Connection`` object; the caller must close it.
    """
    return happybase.Connection(
        host=hbase_master,
        port=hbase_port,
        table_prefix=table_prefix,
    )


def _format_scan_result(outdata):
    """Turn scanned rows into a (time, value) DataFrame, newest first.

    Args:
        outdata: Mapping of row key (bytes such as ``b'dt_<timestamp>'``) to
            the dict of cells returned for that row.

    Returns:
        A DataFrame with columns ``time`` (str) and ``value`` (float), holding
        at most one row per minute and sorted newest first. An empty
        DataFrame is returned when ``outdata`` is empty.

    Raises:
        ValueError: If a timestamp is not in ``%Y-%m-%d %H:%M:%S`` format or
            a cell value cannot be converted to float.
    """
    if not outdata:
        return pd.DataFrame()

    times = [key.decode('utf-8').replace('dt_', '') for key in outdata]
    # The scan requests a single column, so the first cell is the value.
    values = [float(next(iter(cells.values()))) for cells in outdata.values()]
    frame = pd.DataFrame({'time': times, 'value': values})

    # The first 16 characters are 'YYYY-MM-DD HH:MM'; when a minute holds
    # several samples, keep only the last one.
    same_minute = frame['time'].str[:16].duplicated(keep='last')
    frame = frame[~same_minute]

    # Sort newest first. Parsing with pandas avoids the local-timezone
    # dependence of time.mktime and still rejects malformed timestamps.
    sort_key = pd.to_datetime(frame['time'], format='%Y-%m-%d %H:%M:%S')
    return frame.loc[sort_key.sort_values(ascending=False).index]


def get_data_by_tum(hbase_master, hbase_port, table_prefix, tablename,
                    columnslist, starttime, endtime):
    """Read one column of a table for a time window and return a DataFrame.

    Args:
        hbase_master: Host name or IP address of the HBase Thrift server.
        hbase_port: Port of the HBase Thrift server.
        table_prefix: Table-name prefix (project namespace) for the connection.
        tablename: Table name without the prefix.
        columnslist: Id parts that are joined with ``'$'`` to form the column
            qualifier ``'TimeSe:dt_<id>'``.
        starttime: Start of the window, ``'YYYY-MM-DD HH:MM:SS'`` (inclusive).
        endtime: End of the window, same format (exclusive).

    Returns:
        A DataFrame with columns ``time`` and ``value``, newest row first, or
        an empty DataFrame when no rows fall inside the window.
    """
    onecolumn = 'TimeSe:dt_' + '$'.join(columnslist)
    row_start = ('dt_' + starttime).encode('utf-8')
    row_stop = ('dt_' + endtime).encode('utf-8')

    # The table prefix (project namespace) is fixed when connecting.
    connection = hbase_connection(hbase_master, hbase_port, table_prefix)
    try:
        table = connection.table(tablename)
        # Restrict the scan to the row-key range [row_start, row_stop) and to
        # the single requested column; consume it before the socket closes.
        outdata = dict(
            table.scan(row_start=row_start, row_stop=row_stop,
                       columns=[onecolumn])
        )
    finally:
        # Always release the Thrift socket, even when the scan fails.
        connection.close()

    return _format_scan_result(outdata)


def main():
    """Run the sample query from the article and print the result."""
    begin_time = '2023-08-22 00:00:00'
    end_time = '2023-08-23 00:00:00'
    hbase_master = "142.21.8.22"
    hbase_port = 9097
    table_prefix = "baotouyiqi"
    table_name = "DB_level0"
    onedata = ["62340", "20", "204"]
    dataget = get_data_by_tum(hbase_master, hbase_port, table_prefix,
                              table_name, onedata, begin_time, end_time)
    print(dataget)


if __name__ == '__main__':
    main()