-
未能找到主题的负责人; org.apache.kafka.common.utils.Utils.formatAddress上的java.lang.NullPointerException NullPointerException
当我们尝试从启用SSL的Kafka主题流式传输数据时,我们将面临以下error。您能在这个问题上帮助我们吗? -
Spark批处理可在2个Cassandra群集之间迁移数据
我正在使用spark将一些数据从一个cassandra表移动到另一个集群上的另一个cassandra表。 我为以下源集 -
修改图案以查找数字
我有这种模式可以从字符串中提取数字。 <pre><code>ptns = { 'clean1': re.compile(r'[-&\s]+', re.U -
如何从上一行中减去值并更新PySpark中的数据框?
<a href="https://i.stack.imgur.com/UIbsq.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/UIbsq.png" alt="Dataframe"/> -
如何将数组元素映射到Spark Dataframe中的每个记录
我正在处理一个看起来像这样的数据框- <pre><code>val df = Seq( (0.0 ), (0.0 ), (0.0 ), (0.317), (0.0 ), (0.0 ), -
如何在对齐行中一次从两个不同的表中提取不同的计数
我有两个单独的表:<code>emr</code>和<code>treatment</code>。每个表都有一个<code>userID</code>列和一个<code>provider -
从主节点或主从节点触发与数据源的连接
我有一个集群,其中有一个主机(主机名:masterA)和两个从机(主机名:slaveA和slaveB)。我的代码将从s -
PySpark错误''py4j.protocol.Py4JJavaError:调用o175.withColumn时发生错误。''
我正在尝试使用withColumn函数将spark数据框中的列从中间的某个位置移到第一列。 下面是我的PySpark -
使用pyspark
我正在尝试使用pyspark将RDD保存到AWS S3,但出现“目录已存在”错误。 当“ content1”文件夹不存在 -
使用Kafka源进行Spark结构化流式处理,在查询运行时更改主题分区的数量
我已经建立了一个从Kafka主题读取的Spark结构化流查询。 如果在运行Spark查询时更改了主题中的分区数, -
数据整理和ETL(提取,转换和加载)之间有什么区别?
我对ETL的基本理解是像Data Analyst这样的人会使用它。 ETL将用于从数据库(MySQL)提取数据,转换为类似Ex -
如何释放pyspark模型(JavaModel)占用的内存?
如上所述,我通过pyspark加载了经过训练的word2vec模型。 <pre><code>word2vec_model = Word2VecModel.load("saving -
澄清火花聚集功能
我是Scala和Spark的新手。我不知道聚合函数。我收到了以下代码,但不了解输出值。 <pre><code> val z = -
聚合函数以查找分区之间的平均值
我是Spark和Scala的新手。我有一个正在解决的问题。花园中有四个象限:南,北,东和西-每个象限在一个 -
如何将Spark Dataframe的时间戳列转换为字符串列
我想将所有TIMESTAMP列的Spark数据帧转换为String列。有人可以说如何针对每个数据框自动执行此操作吗?</p -
读取镶木地板文件时刷新数据帧的元数据
我正在尝试将实木复合地板文件作为数据帧读取,该数据帧将定期更新(路径为<code>/folder_name</code>。每 -
如何将ArrayType(StructType)的spark dataframe列拆分为pyspark中的多个列?
我正在使用具有以下模式的databricks spark xml读取xml。子元素X_PAT可以发生多次,以进行处理 这是我使用过 -
如何在Pyspark中过滤数组列值
我有一个<code>pyspark Dataframe</code>,其中包含许多列,其中列为Array类型和String列: <pre><code>numbers < -
Pyspark:在运行时动态生成when()子句的条件
我已将一个csv文件读入<code>pyspark dataframe</code>。 现在,如果我在<code>when()</code>子句中应用条件,则在<co -
如何使用流数据帧进行rdd转换
我想对流数据帧进行自定义转换,例如: <pre><code>lines = spark \ .readStream....blabla df1 = line.rdd.map(xxx) -
如何从pyspark中的Spark数据框的列中删除引号“”
我有一个数据框。 <pre><code>SELECT * FROM [table1] WHERE [ColumName] LIKE '[^A-Za-z0-9]%' OR [ColumName] LIKE  -
无法停止Kerberos调试日志记录 JVM库日志记录 Hadoop端JAAS调试
我正在使用启用Kerberos的Spark集群来运行我们的Spark应用程序。 Kerberos是由组织的其他成员先前设置的, -
如何使用Java生成具有大量数据的拼花文件并将其上传到AWS S3存储桶
我正在使用第<a href="https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decim -
为什么在写入Hive表期间更改了Spark数据帧中的时间戳列?
我试图使用JDBC读取Oracle表并将数据插入到Hive表中,但是所有timestamp列的值均已更改(-6小时)。为什么 -
Spark强制转换默认值
我正在执行将列转换为其数据类型的操作。 Col(c).cast(数据类型) 当强制转换失败时,它将转 -
如何在PySpark中使用foreach或foreachBatch写入数据库?
我想使用Python(PySpark)从Kafka源到MariaDB进行Spark结构化流(Spark 2.4.x)。 我想使用流式Spark数据框 -
在Spark中将POJO转换为StructType时出错
在测试某些Spark功能时,尝试从具有给定架构的镶木地板文件中构建数据时,才发现此错误。 <code -
通过Spark使用BigQuery Storage API:请求多个分区,但仅获得1个
我正在使用bigquery-spark-connector读取使用BigQuery Storage API的BigQuer。我的脚本(自动)从BigQuery Storage API请求 -
检索spark数据框数组列值,并将其作为UDF中的列名重用
我正在尝试从一列获取列名,并将其作为参数传递给udf。例如 我有一个DataFrame: <pre><code> | name | ar -
“提升”参数在Spark FP-Growth算法中意味着什么?
我目前正在尝试在<strong> Spark 2.4 </strong>中实现的篮子分析算法,称为<strong> FP-Growth </strong>。当显示关联