微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将多条消息发送到一个通道以进行整理

如何解决将多条消息发送到一个通道以进行整理

我正在努力思考 go。我想做一个简单的程序,基本上

  1. 启动一堆 goroutine
  2. 处理消息
  3. 将处理后的结果发送到通道
  4. 让主线程收集这些结果
  5. 关闭

看起来很简单。我开始时完全没有逻辑。我只是发送一个号码并尝试取回该号码。

问题:我陷入了僵局,我不知道为什么。我想我可能会误用带通道的等待组,因为它们是单独工作的,但我不确定如何让主线程阻塞任意数量的启动 go 例程。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    queue := make(chan int)
    start := time.Now()
    var wg sync.WaitGroup

    for i := 0; i < 10; i += 1 {
        wg.Add(1)
        go count(i,queue,&wg)
    }

    wg.Wait()

    for value := range queue {
        println(value)
    }

    close(queue)

    fmt.Println(time.Now().Sub(start))
    // fmt.Println(summation)
}

func count(number int,queue chan int,wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("Starting ",number)
    queue <- number
    fmt.Println("ending")

}

解决方法

你的 goroutine 阻塞在 queue <- number 上,因为 queue 是一个无缓冲通道,没有人读取它,作为 wg.Wait 上的主要阻塞。

改为将 queue 声明为缓冲通道。例如:queue := make(chan int,10)

Go Tour (concurrency) 和后续页面:

默认情况下,发送和接收阻塞,直到对方准备好。这允许 goroutine 在没有显式锁或条件变量的情况下进行同步。

仅当缓冲区已满时才发送到缓冲通道块。缓冲区为空时接收块。

或者,在 wg.Wait 循环之后移动 for v := range queue

,

这应该会有所帮助。

package main

import (
    "fmt"
    "sync"
    "time"
)

type event struct {
    data      chan string
    numWorker int
}

func (e event) Send() {
    var wg sync.WaitGroup
    // Spaw numWorker goroutines that sends message to
    // the same channel.
    for i := 0; i < e.numWorker; i++ {
        wg.Add(1)
        go func(id int) {
            // Do some fake work
            time.Sleep(1 * time.Second)
            e.data <- fmt.Sprintf("message from go #%d",id)
            wg.Done()
        }(i)
    }
    // Wait for goroutines to finish their work.
    wg.Wait()
    // Close the channel to signal Recv to stop ranging
    // over the channel.
    close(e.data)
}

func (e event) Recv() {
    // Range over the data channel to receive message(s).
    for msg := range e.data {
        fmt.Println(msg)
    }
}

func main() {
    e := event{
        numWorker: 10,// Number of worker goroutine(s)
        data:      make(chan string,5 /* Buffer Size */),}
    // Spawn a goroutine for Send
    go e.Send()
    // Recv receives data from Send
    e.Recv()
}
,

为了避免死锁,你可以在单独的 goroutine 中管理通道和等待组。尝试更改:

    wg.Wait()

    for value := range queue {
        println(value)
    }

    close(queue)

这样:

    go func() {
        wg.Wait()
        close(queue)
    }()
    for value := range queue {
        println(value)
    }
    

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。