从akka.stream.scaladsl.Source读取第一个字节

我正在尝试从akka.stream.scaladsl.Source[ByteString,Any]读取前16个字节并返回[Array[Byte],Source[ByteString,Any]]

读取前16个字节后,我想照常流式传输其余的Source

用例:

Source[ByteString,Any]是一个加密的流,该流中的前16个字节是初始化向量。我需要获取初始化向量才能解密流的其余部分。

这是我尝试过的:

Source.single(ByteString("This is my test string"))
      .prefixAndTail(16).runWith(Sink.head)

我想要类似的东西,但是prefixAndTail将元素数量作为输入。 元素数不是字节数。

如果您有任何建议,请告诉我。谢谢!

a962319828 回答:从akka.stream.scaladsl.Source读取第一个字节

以下示例对您的用例进行了一些假设:

  • ByteString中的第一个Source元素始终包含16字节的初始化矢量(在此将其称为“键”)。第一个元素中的剩余字节(即前16个字节以外的字节)可以用密钥解密。 (为简单起见,此示例将前三个字节视为键。)
  • 解密后的值为String
val b1 = ByteString.fromString("abcdef")
val b2 = ByteString.fromString("ghijkl")
val b3 = ByteString.fromString("mnopqr")
val b4 = ByteString.fromString("stuvwx")

val byteStringSource = Source(Vector(b1,b2,b3,b4))

// The first value in the tuple argument is the ByteString key,the second is
// the encrypted ByteString. Returns the original encrypted ByteString and the
// decrypted String as a Some (or None if the decryption fails).
def decrypt(keyAndEncrypted: (ByteString,ByteString)): (ByteString,Option[String]) = {
  // do fancy decryption stuff with the key
  (keyAndEncrypted._2,Option(keyAndEncrypted._2.utf8String.toUpperCase))
}

val decryptionFlow = Flow.fromFunction(decrypt)

val decryptedSource: Source[(ByteString,Option[String]),NotUsed] =
  byteStringSource
    .prefixAndTail(1)
    .map {
      case (prefix,tail) =>
        val (key,rest) = prefix.head.splitAt(3) // using head instead of headOption for simplicity
        (key,Source(Vector(rest)).concat(tail))
    }
    .collect { case (key,bSource) => bSource.map(b => (key,b)) }
    .flatMapConcat(identity)
    .via(decryptionFlow)

decryptedSource.runForeach {
  case (encrypted,decrypted) =>
    println((encrypted.utf8String,decrypted))
}

运行以上命令会显示以下内容:

(def,Some(DEF))
(ghijkl,Some(GHIJKL))
(mnopqr,Some(MNOPQR))
(stuvwx,Some(STUVWX))

在此示例中,我将ByteString中第一个Source的前三个字节用作键。该初始ByteString中的其余三个字节被前缀为Source的其余部分(尾部),然后转换所得的Source,以使密钥与每个{{1 }}元素。然后ByteString会通过Source进行展平和解密。 Flow返回原始加密的Flow和包含解密值的ByteString

希望这至少会为您的用例提供一些启发和想法。

,

需要注意的几件事:

  1. 由于输入源来自网络,因此某些ByteString可能为空
  2. 我们需要前16个字节来正确解密流的其余部分

我将在代码中留下一些注释作为解释。

source
    .via(Flow[ByteString].map(d => {
      // Converts Source[ByteString] to Source[List[Byte]]
      d.toByteBuffer.array().toList
    }))
    // Source[List[Byte]] to Source[Byte]
    .mapConcat(identity)
    // Get the first 16 bytes from Source[Byte] and a stream of the remaining bytes Source[(Seq[byte],Source[Byte])
    .prefixAndTail(16)
    // Source[(Seq[byte],Source[Byte]) to Source[Source[(Seq[Byte],Array[Byte])]]
    .collect { case (key,source) =>      
      source.map(b => (key,Array(b)))
    }
    // Source[Source[(Seq[Byte],Array[Byte])]] to Source[(Seq[Byte],Array[Byte])]
    .flatMapConcat(identity)
    .runForeach {
      case (key,rest) =>
        println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
    }

包含空ByteString的测试示例:

val source = Source(Iterable[ByteString](
    ByteString(""),// empty ByteString to imitate empty element from database stream
    ByteString("abcdefghijklmnop <- first 16 bytes"))
  )

结果期望abcdefghijklmnop为前16个字节

abcdefghijklmnop :  
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :  
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :  
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :  
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s
,

好的旧Java来解救:

val ivBytesBuffer = new Array[Byte](16)
val is = new FileInputStream(fileName)
is.read(ivBytesBuffer)

val source = StreamConverters.fromInputStream(() => is)
decryptAes(source,keySpec,ivBytesBuffer)

Read First 4 Bytes of File

中所述
本文链接:https://www.f2er.com/2346514.html

大家都在问