尝试使用go通道和worker组模式来模拟CDN服务器worker。
func main() {
var wg sync.WaitGroup
fileJobs := make(chan string)
compress := make(chan CompressionStatus)
upload := make(chan UploadResult)
// get list of files to distribute from a text file. This is usually 1-1.5 Gb,just filenames
go listFiles("/Users/rverma/go/src/github.com/dsa/gopher/images.txt",fileJobs)
go listFiles("/Users/rverma/go/src/github.com/dsa/gopher/videos.txt",fileJobs)
go compressFile(fileJobs,compress)
go compressFile(fileJobs,compress)
wg.Add(3)
go uploadToAll(compress,upload,&wg)
go uploadToAll(compress,&wg)
wg.Wait()
close(fileJobs)
close(compress)
close(upload)
}
func listFiles(filename string,c chan<- string) {
file,err := os.OpenFile(filename,os.O_RDONLY,os.ModePerm)
if err != nil {
panic("file not found")
}
defer file.Close()
r := bufio.NewScanner(file)
for r.Scan() {
c <- r.Text()
}
}
type CompressionStatus struct {
file string
compressed string
status bool
}
func compressFile(fileJob <-chan string,out chan<- CompressionStatus) {
for fileName := range fileJob {
fmt.Printf("compressing %s\n",fileName)
fib(25) // calculate fibonnaci number,keep cpu busy
fmt.Printf("compressed %s\n",fileName)
out <- CompressionStatus{
file: fileName,compressed: fileName + ".compressed",status: true,}
}
}
func uploadToAll(comressedFile <-chan CompressionStatus,result chan<- UploadResult,wg *sync.WaitGroup) {
for fileName := range comressedFile {
go func() {
result <- US(fileName.compressed)
}()
go func() {
result <- IND(fileName.compressed)
}()
}
wg.Done()
}
type UploadResult string
type UploadServer func(region string) UploadResult
var (
US = fileUploader("US")
IND = fileUploader("IND")
)
// can't change construct
func fileUploader(region string) UploadServer {
sleep := time.Millisecond * 500
if region == "IND" {
sleep *= 4
}
return func(fileName string) UploadResult {
fmt.Printf("uploading %s to server %s\n",fileName,region)
time.Sleep(sleep)
fmt.Printf("upload %s completed to server %s\n",region)
return UploadResult(region)
}
}
尽管代码有效,但完成后会显示类似的异常。当我们关闭从通道接收的功能而发送循环仍处于活动状态时,这似乎是一个例外。
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc00009a004)
/usr/local/Cellar/go/1.13.3/libexec/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00009a004)
/usr/local/Cellar/go/1.13.3/libexec/src/sync/waitgroup.go:130 +0x64
main.main()
/**/file_compressor.go:34 +0x2e1
goroutine 20 [chan receive]:
main.compressFile(0xc000074060,0xc0000740c0)
/**/file_compressor.go:75 +0x230
created by main.main
/**/file_compressor.go:22 +0x13e
有些困惑的是解决此问题的最佳方法。 另外,这是假设要在8核计算机上运行,我为其cpu密集型进程添加了6个go worker进行压缩。想知道我们是否可以优化它或使代码更简洁。