使用工人组的并发

尝试使用go通道和worker组模式来模拟CDN服务器worker。

func main() {
    var wg sync.WaitGroup
    fileJobs := make(chan string)
    compress := make(chan CompressionStatus)
    upload := make(chan UploadResult)

// get list of files to distribute from a text file. This is usually 1-1.5 Gb,just filenames
    go listFiles("/Users/rverma/go/src/github.com/dsa/gopher/images.txt",fileJobs)
    go listFiles("/Users/rverma/go/src/github.com/dsa/gopher/videos.txt",fileJobs)

    go compressFile(fileJobs,compress)
    go compressFile(fileJobs,compress)

    wg.Add(3)
    go uploadToAll(compress,upload,&wg)
    go uploadToAll(compress,&wg)

    wg.Wait()
    close(fileJobs)
    close(compress)
    close(upload)
}

func listFiles(filename string,c chan<- string) {
    file,err := os.OpenFile(filename,os.O_RDONLY,os.ModePerm)
    if err != nil {
        panic("file not found")
    }
    defer file.Close()
    r := bufio.NewScanner(file)
    for r.Scan() {
        c <- r.Text()
    }
}

type CompressionStatus struct {
    file       string
    compressed string
    status     bool
}

func compressFile(fileJob <-chan string,out chan<- CompressionStatus) {
    for fileName := range fileJob {
        fmt.Printf("compressing %s\n",fileName)
        fib(25) // calculate fibonnaci number,keep cpu busy
        fmt.Printf("compressed %s\n",fileName)
        out <- CompressionStatus{
            file:       fileName,compressed: fileName + ".compressed",status:     true,}
    }
}

func uploadToAll(comressedFile <-chan CompressionStatus,result chan<- UploadResult,wg *sync.WaitGroup) {
    for fileName := range comressedFile {
        go func() {
            result <- US(fileName.compressed)
        }()
        go func() {
            result <- IND(fileName.compressed)
        }()
    }
    wg.Done()
}

type UploadResult string

type UploadServer func(region string) UploadResult

var (
    US  = fileUploader("US")
    IND = fileUploader("IND")
)

// can't change construct
func fileUploader(region string) UploadServer {
    sleep := time.Millisecond * 500
    if region == "IND" {
        sleep *= 4
    }
    return func(fileName string) UploadResult {
        fmt.Printf("uploading %s to server %s\n",fileName,region)
        time.Sleep(sleep)
        fmt.Printf("upload %s completed to server  %s\n",region)
        return UploadResult(region)
    }
}

尽管代码有效,但完成后会显示类似的异常。当我们关闭从通道接收的功能而发送循环仍处于活动状态时,这似乎是一个例外。

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc00009a004)
        /usr/local/Cellar/go/1.13.3/libexec/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00009a004)
        /usr/local/Cellar/go/1.13.3/libexec/src/sync/waitgroup.go:130 +0x64
main.main()
        /**/file_compressor.go:34 +0x2e1

goroutine 20 [chan receive]:
main.compressFile(0xc000074060,0xc0000740c0)
        /**/file_compressor.go:75 +0x230
created by main.main
        /**/file_compressor.go:22 +0x13e

有些困惑的是解决此问题的最佳方法。 另外,这是假设要在8核计算机上运行,​​我为其cpu密集型进程添加了6个go worker进行压缩。想知道我们是否可以优化它或使代码更简洁。

zxcvbnmasd1234 回答:使用工人组的并发

compressFile正在等待从fileJob进行读取,但是没有写入的goroutine。 uploadToAll也正在从compressedFile等待,但这不会发生,因此将不会调用wg.Done。因此,主goroutine也正在等待wg.Wait。这意味着,所有goroutine都在等待某事,但是它们都没有在进行,所以陷入了僵局。

处理此问题的一种方法是在完成写入后关闭通道。这将终止从通道读取的for循环。由于您要从多个goroutine写入同一通道,因此您实际上并不知道何时完成操作。您可以为listFiles goroutine添加新的等待组,然后等待这两个等待组完成,然后关闭通道。对于compressuploadAll组来说是相同的。您可以执行以下操作:

wgList.Add(2)
go func() {
  wgList.Wait()
  close(fileJobs)
}()
go listFiles(&wgList,...)
...

别忘了这个:

func listFiles(wgList *sync.WaitGroup,...) {
  defer wgList.Done()
  ...
}

对于其他群体,......

本文链接:https://www.f2er.com/3105841.html

大家都在问