我在测试环境中运行着Spark 2.4.3和Cassandra 3.11.4的工作集群。
现在,我有一列带有整数的列,该整数确定了处理流程中定义的ID的数据所在的阶段。
在测试过程中,注意到更新到此状态列的行为不会立即将其持久化。这不是关键问题,但会导致流程延迟。给定下面带有主键ID的示例表,目标是在完成每个步骤后递增p中的值
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main (void)
{
char* ptr;
ptr = malloc(32 * sizeof(char));
if(ptr == NULL)
{
puts("Allocation failed");
return EXIT_FAILURE;
}
strcpy(ptr,"hello");
printf("value entered is %s\n",ptr);
free(ptr);
return 0;
}
因此,假设我在处理流程中有3个阶段,则第一阶段将在列p中写入1,这时阶段1完成的查询ID的连续运行作业将获得p = 1以来的值,然后执行作业2。然后将p = 2更新到该列。
我一直在对RF = 3的读写操作使用一致性仲裁,但是请注意,不能保证p更新已完成,这会导致作业2再次获得相同的ID(不是致命的,但会令人讨厌,因为延迟会堆叠起来,并可能重复整个作业过程)。同样,这不是常规的。
因此,有一种方法可以在给定所有分区键和群集键的情况下强制更新列?我当前的解决方法是连续尝试写入列,直到读取返回值已更改。
任何想法都很棒。由于与项目中的其他元素集成而仅限于Pyspark。
使用spark-cassandra-connector和pyspark-cassandra。如果我希望对Cassandra的这组特定写入执行LWT,那么我是否认为应该实施LWT?