程序因频道挂起

我想使用goroutines批处理来自不同日期的不同客户的请求。

我的意思是说50个消费者goroutine消耗了db中的所有客户,而2个日期消费者goroutine消耗了日期切片。

主要代码如下,但是它挂起了并且没有按预期退出。

为什么它没有按预期退出?

func Run(){
    var syncWg sync.WaitGroup
    syncWg.Add(1)
    go SyncCustomerMetricsHistory(&syncWg)
    syncWg.Wait()
}

func SyncCustomerMetricsHistory(wg *sync.WaitGroup){
    defer wg.Done()
    odb := orm.NewOrm()
    start := time.Now()
    logs.Info("start sync  customer metrics,time:[%v]",start)

    qs := odb.QueryTable("gg_customer")
    var customers []*db.GgCustomer
    if num,err := qs.All(&customers); err != nil || num == 0 {
        logs.Error("Get customer error,rows:[%v],err:[%v]",num,err)
    }

    customersChan := make(chan *db.GgCustomer,50)

    var wgC sync.WaitGroup
    wgC.Add(50)
    for i := 0; i < 50; i++ {
        go syncCustomerMetricsHistory(customersChan,&wgC)
    }

    go func() {
        for _,customer := range customers {
            customersChan <- customer
        }
        close(customersChan)
    }()

    wgC.Wait()
}

func  syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer,wg *sync.WaitGroup){
    defer wg.Done()
    for customer := range customerChan{
            dateChan := make(chan string,2)
            var wgD sync.WaitGroup
            wgD.Add(2)
            for i := 1; i < 2; i++{
                go test(dateChan,customer,&wgD)
            }
            go func(){
                for _,date := range GetallYearDate(){
                    dateChan <- date
                }
                close(dateChan)
            }()

            wgD.Wait()
        }
    }
}

func test(dateChan <- chan string,customer *db.GgCustomer,wg *sync.WaitGroup){
    defer wg.Done()
    for date := range dateChan{
        fmt.Println(date,customer)
    }
}


func  GetallYearDate()  []string{
  return []string{"2019-10-01","2019-10-02"}
}
caoyue7758521 回答:程序因频道挂起

我没有尝试运行此命令(因为它需要其他代码),但相信您的问题是:

wgD.Add(2)
for i := 1; i < 2; i++{
 go test(dateChan,customer,&wgD)
}

该for循环只会迭代一次,但您调用了wgD.Add(2)(我想您可能是说该循环要迭代两次;请尝试i <= 2)。

另一点反馈;您使用等待组的方式可以工作,但是很难遵循(可能导致您未发现问题);怎么样:

func Run(){
    SyncCustomerMetricsHistory()  // No wait group needed as this will not return before done
}

func SyncCustomerMetricsHistory(){
    odb := orm.NewOrm()
    start := time.Now()
    logs.Info("start sync  customer metrics,time:[%v]",start)

    qs := odb.QueryTable("gg_customer")
    var customers []*db.GgCustomer
    if num,err := qs.All(&customers); err != nil || num == 0 {
        logs.Error("Get customer error,rows:[%v],err:[%v]",num,err)
    }

    customersChan := make(chan *db.GgCustomer,50)

    var wgC sync.WaitGroup
    wgC.Add(50)
    for i := 0; i < 50; i++ {
        go func() {
            syncCustomerMetricsHistory(customersChan)
            wgC.Done()
        }()
    }

    go func() {
        for _,customer := range customers {
            customersChan <- customer
        }
        close(customersChan)
    }()
    wgC.Wait()
}



func  syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer){
    for customer := range customerChan{
            dateChan := make(chan string,2)
            var wgD sync.WaitGroup
            wgD.Add(2)
            for i := 1; i < 2; i++{
                go func() {
                    test(dateChan,customer)
                    wgD.Done()
                }()
            }
            go func(){
                for _,date := range GetAllYearDate(){
                    dateChan <- date
                }
                close(dateChan)
            }()
            wgD.Wait()
        }
    }
}

我认为这很容易理解,因为您可以看到在哪里调用了wg.Done()。在任一侧粘贴一些fmt.Println命令也非常容易,这使得调试此类问题更加容易。

本文链接:https://www.f2er.com/3139648.html

大家都在问