在某些情况下,我需要扇出发送到同一频道的接收器:
func MessagesFromSQS(ctx context.Context,sqsClient sqsiface.SQSAPI) chan *sqs.Message {
messages := make(chan *sqs.Message)
go func() {
defer close(messages)
wg := sync.WaitGroup{}
for i := 0; i < parallelSQSReaders; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
// ...
for _,message := range result.Messages {
messages <- message
}
}
}
}()
}
wg.Wait()
}()
return messages
}
对我来说这很有意义。但是,竞赛检测器抱怨说,不同的goroutines以及发送和关闭通道。我意识到负责发送的goroutine应该与关闭的goroutine相同,但是正确的方法是什么?
编辑/已解决:感谢您的回复。事实证明,我没有正确读取种族探测器堆栈跟踪。我以为我更改的代码引入了错误,而不是发现SQS模拟中的错误。一旦我正确同步了ReceiveMessage()
,就可以了。