我是Nifi的新手,正在尝试按以下流程进行POC。
我从Kakfa主题获取XML消息。我需要消耗从XML元素获取的少量XML消息和属性,这些属性和数据是GZIP压缩格式,GZIP解压缩数据(也是XML),然后加载到MySQL DB。我正在尝试此操作,并陷入了以下步骤。
(1)ConsumeKafka→(2)EvaluateXPath(flowfile-attribute =我将几个XML元素设置为flowfile-attributes,这对下游有用)→(3)EvaluateXPath(flowfile-content =使用 XPATH表达式=字符串(// ABC / data))→(4)UpdateAttribute(mime.type = application / gzip)→(5)CompressContent(压缩格式=使用mime.type属性而mode =解压缩)
我的CompressContent失败,并出现以下异常。
org.apache.nifi.processor.exception.ProcessException: IOException thrown from CompressContent[id=be4b9583-016e-1000-7cce-b9d822334c4c]: java.io.IOException: java.io.IOException: Input is not in the .gz format
这可能是因为我(3)EvaluateXPath的flowfile-content的数据类型设置为String。在馈送给CompressContent之前是否需要将String转换为字节?如果是,我如何使用某种函数toBytes()在同一(3)EvaluateXPath中做到这一点?
预先感谢您的帮助!