在python中,在groupby pandas数据帧之后,我使用aggregate
方法从hbase获取数据:
hbase_util.get_connect()
df = pdf.groupby('user')['task'].aggregate(inner_func)
hbase_util.close_connect()
def inner_func(grouped):
labels = set()
for task_id in grouped:
task_labels = hbase_util.get(table_name=dp_zy_profile,row_key=task_id,column="info:message")
if task_labels is not None:
labels = labels.union(set(json.loads(task_labels).keys()))
我想从每个组的hbase中获取数据,所以我定义了inner_func
函数,但是它出错了:
TApplicationException Traceback (most recent call last)
/usr/local/python3/lib/python3.5/site-packages/pandas/core/groupby/ops.py in
agg_series(self,obj,func)
662 try:
--> 663 return self._aggregate_series_fast(obj,func)
664 except Exception:
/usr/local/python3/lib/python3.5/site-packages/pandas/core/groupby/ops.py in _aggregate_series_fast(self,func)
680 grouper = reduction.SeriesGrouper(obj,func,group_index,ngroups,dummy)
--> 681 result,counts = grouper.get_result()
682 return result,counts
pandas/_libs/reduction.pyx in
pandas._libs.reduction.SeriesGrouper.get_result()
pandas/_libs/reduction.pyx in
pandas._libs.reduction.SeriesGrouper.get_result()
/usr/local/python3/lib/python3.5/site-packages/pandas/core/groupby/groupby.py in <lambda>(x)
893 func = self._is_builtin_func(func)
--> 894 f = lambda x: func(x,*args,**kwargs)
895
<ipython-input-45-1f0e3dd7619a> in inner_func(grouped)
3 for task_id in grouped:
----> 4 task_labels = hbase_util.get(table_name=dp_zy_profile,column="info:message")
5 if task_labels is not None:
<ipython-input-1-217604440946> in get(self,table_name,row_key,column,with_timestamp)
51 """
---> 52 col_value = self.client.get(tableName=table_name,row=row_key,column=column)
53 if len(col_value) == 0:
/usr/local/python3/lib/python3.5/site-packages/hbase/Hbase.py in get(self,tableName,row,column)
933 self.send_get(tableName,column)
--> 934 return self.recv_get()
935
/usr/local/python3/lib/python3.5/site-packages/hbase/Hbase.py in recv_get(self)
951 self._iprot.readMessageEnd()
--> 952 raise x
953 result = get_result()
TApplicationException: Internal error processing get
我该如何解决这个问题?