我正在尝试从kafka创建管道,并使用Apache骆驼将其写入s3,但是我面临的问题是我实际上不知道要在标头中设置的消息的大小(因为每个消息都是不同大小)。有没有一种方法可以将消息批量插入s3中,例如等待来自kafka的骆驼消耗10条消息,然后一次将10条消息插入s3存储桶中(例如批处理)。到目前为止,我到这里为止。
from("kafka:{{kafka-s3.configure.Producer}}?brokers={{kafka-s3.configure.brokers}}&groupId={{kafka-s3.configure.groupId}}")
.removeHeaders(".*")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
//some logic
}
})
.to("aws-s3://{{awsS3BucketName}}"
+ "?deleteAfterWrite=false®ion=eu-west-1"
+ "&accessKey={{awsaccessKey}}"
+ "&secretKey=RAW({{awsaccessKeySecret}})")
.log("done.");
到目前为止,我已经能够创建此文件,但无法批量处理骆驼中的消息。我想念什么?