以下示例对您的用例进行了一些假设:
-
ByteString
中的第一个Source
元素始终包含16字节的初始化矢量(在此将其称为“键”)。第一个元素中的剩余字节(即前16个字节以外的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为键。)
- 解密后的值为
String
。
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")
val byteStringSource = Source(Vector(b1,b2,b3,b4))
// The first value in the tuple argument is the ByteString key,the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString,ByteString)): (ByteString,Option[String]) = {
// do fancy decryption stuff with the key
(keyAndEncrypted._2,Option(keyAndEncrypted._2.utf8String.toUpperCase))
}
val decryptionFlow = Flow.fromFunction(decrypt)
val decryptedSource: Source[(ByteString,Option[String]),NotUsed] =
byteStringSource
.prefixAndTail(1)
.map {
case (prefix,tail) =>
val (key,rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
(key,Source(Vector(rest)).concat(tail))
}
.collect { case (key,bSource) => bSource.map(b => (key,b)) }
.flatMapConcat(identity)
.via(decryptionFlow)
decryptedSource.runForeach {
case (encrypted,decrypted) =>
println((encrypted.utf8String,decrypted))
}
运行以上命令会显示以下内容:
(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))
在此示例中,我将ByteString
中第一个Source
的前三个字节用作键。该初始ByteString
中的其余三个字节被前缀为Source
的其余部分(尾部),然后转换所得的Source
,以使密钥与每个{{1 }}元素。然后ByteString
会通过Source
进行展平和解密。 Flow
返回原始加密的Flow
和包含解密值的ByteString
。
希望这至少会为您的用例提供一些启发和想法。
,
需要注意的几件事:
- 由于输入源来自网络,因此某些ByteString可能为空
- 我们需要前16个字节来正确解密流的其余部分
我将在代码中留下一些注释作为解释。
source
.via(Flow[ByteString].map(d => {
// Converts Source[ByteString] to Source[List[Byte]]
d.toByteBuffer.array().toList
}))
// Source[List[Byte]] to Source[Byte]
.mapConcat(identity)
// Get the first 16 bytes from Source[Byte] and a stream of the remaining bytes Source[(Seq[byte],Source[Byte])
.prefixAndTail(16)
// Source[(Seq[byte],Source[Byte]) to Source[Source[(Seq[Byte],Array[Byte])]]
.collect { case (key,source) =>
source.map(b => (key,Array(b)))
}
// Source[Source[(Seq[Byte],Array[Byte])]] to Source[(Seq[Byte],Array[Byte])]
.flatMapConcat(identity)
.runForeach {
case (key,rest) =>
println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
}
包含空ByteString的测试示例:
val source = Source(Iterable[ByteString](
ByteString(""),// empty ByteString to imitate empty element from database stream
ByteString("abcdefghijklmnop <- first 16 bytes"))
)
结果期望abcdefghijklmnop
为前16个字节
abcdefghijklmnop :
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s
,
好的旧Java来解救:
val ivBytesBuffer = new Array[Byte](16)
val is = new FileInputStream(fileName)
is.read(ivBytesBuffer)
val source = StreamConverters.fromInputStream(() => is)
decryptAes(source,keySpec,ivBytesBuffer)
如Read First 4 Bytes of File
中所述
本文链接:https://www.f2er.com/2346514.html