Bufio Scanner Goroutine-截断/无序输出

我正在编写一个程序,该程序从CLI命令读取stderr并将stderr日志通过gRPC流传输到客户端。

cmd的实例化如下(CLI命令需要我以stdin传递的配置):

ctxTimeout,cancel := context.WithTimeout(context.Background(),time.Duration(t)*time.Second)

defer cancel()

cmd := exec.CommandContext(ctxTimeout,"java","-jar","/opt/myapp/myapp.jar","scan","-config","-",)

cmd.Stdin = config

我正在使用两个单独的缓冲区:一个用于将stderr“实时”流式传输到客户端,另一个用于将日志持久存储在数据库中。为此,我使用io.MultiWriter并将其映射到cmd stdin:

bufStream := bytes.NewBuffer(make([]byte,4096))

bufPersist := new(bytes.Buffer)


stderr := io.MultiWriter(bufStream,bufPersist)

// Map the command Standard Error Output to the multiwriter
cmd.Stderr = stderr

最后,在启动命令之前,我下面有一个goroutine,该例程使用bufio.Scanner来读取stderr缓冲区并通过gRPC逐行传输流:

// Go Routine to stream the scan job logs
go func() {
    for {
        select {
        case <-done:
            return
        default:
            scanner := bufio.NewScanner(bufStream)
            for scanner.Scan() {
                time.Sleep(3 * time.Second)
                logging.MyAppLog("warning","%v",scanner.Text())
                _ = stream.Send(&agentpb.ScanResultsresponse{
                    ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{ScanLogs: scanner.Bytes()},},)
            }

        }
    }

}()
err := cmd.Run()

done <- true

我的问题是我必须在goroutine中使用time.sleep(time.Seconds * 3)才能获得正确的输出。如果不是,我得到的输出顺序不正确并被截断。

我认为这是由于io.multiwriter和bufio.scanner并非“同步”造成的,但我希望获得有关最佳方法的一些指导。

谢谢。

lmy87488860 回答:Bufio Scanner Goroutine-截断/无序输出

From the Scanner docs

  

Bytes返回通过调用Scan生成的最新令牌。 基础数组可能指向后续调用Scan 会覆盖的数据。它没有分配。

gRPC进行自己的缓冲。这意味着,当Send返回时,字节不一定已写入线路,并且下一个Scan调用将修改尚未写入的字节。

复制Scan返回的字节,就可以了:

for scanner.Scan() {
    b := append([]byte(nil),scanner.Bytes()...)
    stream.Send(&agentpb.ScanResultsResponse{
        ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{
            ScanLogs: b,},})
}
本文链接:https://www.f2er.com/3167665.html

大家都在问