我正在尝试使用由pyspark创建的RDD文件运行pyeeg函数。我想使用火花映射和减少方法获取pyegg函数的输出。
我尝试将RDD和所有值转换为float以与pyeeg函数一起使用。但是,运行此命令时似乎出现错误。
我已经使用以下代码初始化了spark上下文
sc = SparkContext("local")
和下面是我遍历输入并创建“ windows”的地方,将在其中用作pyegg函数“ bin_power”的输入。我从每个窗口创建一个rdd文件,然后尝试将函数映射到该rdd文件。但是,这是我遇到错误的地方。
Fs = 128 # sampling rate
Band = [0.5,4,7,12,30,100] # EEG bands boundary points
data_signal = all_sdd.map(lambda all_sdd:(all_sdd[0]))
data_len = data_signal.count()
data_time = all_sdd.map(lambda all_sdd:(all_sdd[1]))
while end_point < data_len-1:
while end_window < data_len-1 and all_sdd.collect()[end_window][1]==all_sdd.collect()[end_window+1][1]:
end_window = end_window+1
segment = data_signal.take(end_window)[start_point:end_window+1]
rdd_segment = sc.parallelize(segment)
rdd_segment_float = rdd_segment.map(lambda s: (float(s)))
psi.rdd= rdd_segment.map(bin_power(rdd_segment_float,Band,Fs))
#psi.rdd,rir.rdd= rdd_segment.map(bin_power(rdd_segment_float,Fs))
我在不使用火花的情况下运行了该程序,并能够获得预期的结果。但是,当我尝试使用spark运行时,会收到此错误。
我收到的错误是
48
49 if n is None:
---> 50 n = a.shape[axis]
51
52 if n < 1:
IndexError: tuple index out of range
任何帮助或建议,将不胜感激。