我想使用 goroutine 批量处理不同客户在不同日期的请求。
具体来说:由 50 个消费者 goroutine 处理数据库中的所有客户,对每个客户再由 2 个日期消费者 goroutine 处理日期切片。
主要代码如下,但程序会挂起,无法按预期退出。
为什么它没有按预期退出?
// Run launches SyncCustomerMetricsHistory in its own goroutine and blocks
// until that goroutine signals completion.
func Run() {
	done := new(sync.WaitGroup)
	done.Add(1)
	go SyncCustomerMetricsHistory(done)
	done.Wait()
}
// SyncCustomerMetricsHistory queries the gg_customer table and fans the
// resulting customers out to a fixed pool of worker goroutines, returning
// once every worker has finished.
//
// wg.Done is called exactly once when the function returns, including on the
// early-return path taken when the query fails or yields no rows.
func SyncCustomerMetricsHistory(wg *sync.WaitGroup) {
	defer wg.Done()

	// Size of the customer worker pool; also used as the channel buffer.
	const customerWorkers = 50

	odb := orm.NewOrm()
	start := time.Now()
	logs.Info("start sync customer metrics,time:[%v]", start)

	var customers []*db.GgCustomer
	num, err := odb.QueryTable("gg_customer").All(&customers)
	if err != nil || num == 0 {
		logs.Error("Get customer error,rows:[%v],err:[%v]", num, err)
		// Nothing usable to process, so do not start the worker pool.
		return
	}

	customersChan := make(chan *db.GgCustomer, customerWorkers)
	var wgC sync.WaitGroup
	for i := 0; i < customerWorkers; i++ {
		// Register each worker right where it is started so the WaitGroup
		// count can never drift from the number of goroutines spawned.
		wgC.Add(1)
		go syncCustomerMetricsHistory(customersChan, &wgC)
	}

	// Producer: feed every customer to the pool, then close the channel so
	// the workers' range loops terminate.
	go func() {
		for _, customer := range customers {
			customersChan <- customer
		}
		close(customersChan)
	}()

	wgC.Wait()
}
func syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer,wg *sync.WaitGroup){
defer wg.Done()
for customer := range customerChan{
dateChan := make(chan string,2)
var wgD sync.WaitGroup
wgD.Add(2)
for i := 1; i < 2; i++{
go test(dateChan,customer,&wgD)
}
go func(){
for _,date := range GetallYearDate(){
dateChan <- date
}
close(dateChan)
}()
wgD.Wait()
}
}
}
// test is a date worker: it prints each date received from dateChan together
// with customer, and returns once the channel has been closed and drained.
// wg.Done is called on return.
func test(dateChan <-chan string, customer *db.GgCustomer, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		day, open := <-dateChan
		if !open {
			return
		}
		fmt.Println(day, customer)
	}
}
// GetallYearDate returns the list of dates to process. It currently yields a
// fixed pair of dates in YYYY-MM-DD form.
func GetallYearDate() []string {
	dates := make([]string, 0, 2)
	dates = append(dates, "2019-10-01", "2019-10-02")
	return dates
}