微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

golang总结-并发

2.7 并发编程

go协程

golang 通过一个go关键字就可以开启一个协程。

func main() {
    //两个交错输出
    go sayHello()
    go sayHello2()
    time.Sleep(time.Second * 3) //阻塞主线程
}

func sayHello() {
    for i := 0; i < 30; i++ {
        fmt.Println("hello world")
    }
}

func sayHello2() {
    for i := 0; i < 30; i++ {
        fmt.Println("你好中国")
    }
}
//通过sync.WaitGroup来等待所有线程完成
package main

import (
    "fmt"
    "sync"
)

func main() {
    var w = &sync.WaitGroup{}
    w.Add(2)
    go sayEn(w)
    go sayZh(w)
    w.Wait()
}

func sayEn(w *sync.WaitGroup) {
    for i := 0; i < 30; i++ {
        fmt.Println("hello world")
    }
    w.Done() //每当这个方法完成则减少1
}

func sayZh(w *sync.WaitGroup) {
    for i := 0; i < 30; i++ {
        fmt.Println("中国你好")
    }
    w.Done() //每当这个方法完成则减少1
}

go管道

管道的定义:

//无缓冲管道
flag := make(chan bool)
//有缓冲管道
data := make(chan int,10)
//向管道中添加值
data <- 10
//从管道中取值
agr := <- data
<- data //也可以直接释放值,不用变量接收

1. 通过go实现同步

package main

import (
    "fmt"
)

func main() {
    w1,w2 := make(chan bool),make(chan bool)
    go sayEn_chan(w1)
    go sayZh_chan(w2)
    <- w1 //阻塞,直到chan 可以取出数据
    <- w2
}

func sayEn_chan(w chan bool) {
    for i := 0; i < 30; i++ {
        fmt.Println("hello world")
    }
    w <- true //方法完成写入通道
}

func sayZh_chan(w chan bool) {
    for i := 0; i < 30; i++ {
        fmt.Println("中国你好")
    }
    w <- true
}

2. 正确处理累加

package main

import (
    "fmt"
    "sync/atomic"
)

var (
    count int64
)

func main() {
    w1,make(chan bool)
    go add(w1)
    go add(w2)
    <- w1 //阻塞,直到chan 可以取出数据
    <- w2
    fmt.Println(count)
}

func add(w chan bool) {
    for i := 0; i < 5000; i++ {
        atomic.AddInt64(&count,1)
    }
    w <- true
}

3. 通道实现数据共享

package main

import (
    "fmt"
    "math/rand"
    "sync"
)

var wg sync.WaitGroup

func main() {
    count := make(chan int)
    wg.Add(2)
    go player("张三",count)
    go player("李四",count)
    //发球
    count <- 1
    wg.Wait() //阻塞等待2个线程完成
}

func player(name string,count chan int) {
    defer wg.Done()

    for {
        i,ok := <-count

        if !ok { //通道关闭
            fmt.Printf("运动员 %s 赢了\n",name)
            return
        }

        tmp := rand.Intn(100)
        if tmp % 13 == 0 { //没有接到球
            fmt.Printf("运动员 %s 输了\n",name)
            close(count)
            return
        }
        fmt.Printf("运动员 %s 击球 %d \n",name,i)
        i ++
        count <- i
    }
}

4. 缓冲管道

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    numberTasks = 10
    workers = 4
)

var wg2 sync.WaitGroup

func main() {
    wg2.Add(workers)
    tasks := make(chan int,numberTasks)

    for i := 0; i < workers; i++ {
        go work(tasks,i)
    }

    for j := 1; j <= numberTasks; j++ {
        tasks <- j
    }
    close(tasks)

    wg2.Wait()
}

func work(tasks chan int,worker int) {
    defer wg2.Done()
    for {
        task,ok := <- tasks
        if !ok {
            fmt.Printf("任务完成,工号:%d\n",worker)
            return
        }
        fmt.Printf("工号:%d,开始工作:%d\n",worker,task)
        time.Sleep(time.Microsecond * 100)
        fmt.Printf("工号:%d,完成工作:%d\n",task)

    }

}

5. select

select 的特点是:不会阻塞,哪个管道有值,我取哪个。所以,下面当运行到go的时候,a,b还没有添值,所以只能选择defaul运行,这里可以把defualt部分和b<-2去掉,select会被阻塞,直到a<-1执行

func main() {
    a := make(chan int)
    b := make(chan int)
    go func() {
        b <- 2
        time.Sleep(time.Second * 3)
        a <- 1
    }()
    select {
    case <- a:
        fmt.Println("a")
    case <- b:
        fmt.Println("b")
        time.Sleep(time.Second * 3)
    default:
        fmt.Println("hello world")
    }
}

6. runner并发模型

package runner

import (
    "errors"
    "os"
    "os/signal"
    "time"
)

type Runner struct {
    interrupt chan os.Signal

    complete chan error

    timeout <-chan time.Time //声明一个只读的管道

    tasks []func(int)
}

var ErrorTimeout = errors.New("receive timeout")

var ErrorInterrupt = errors.New("interrupt error")

func New(duration time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal,1),complete: make(chan error),timeout: time.After(duration),}
}

func (r *Runner) Add(tasks...func(int)) {
    r.tasks = append(r.tasks,tasks...)
}

func (r *Runner) getInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)
        return true
    default:
        return false
    }
}

func (r *Runner) run() error {
    for id,task := range r.tasks {
        if r.getInterrupt() {
            return ErrorInterrupt
        }
        task(id)
    }
    return nil
}

func (r *Runner) Start() error {
    signal.Notify(r.interrupt,os.Interrupt)
    go func() {
        r.complete <- r.run()
    }()
    
    select {
    case err := <- r.complete:
        return err
    case <- r.timeout:
        return ErrorTimeout
    }
}

测试

package main

import (
    "gorounting/runner"
    "log"
    "os"
    "time"
)

const (
    timeout  = 4 * time.Second
)

func main() {
    log.Println("任务开始")
    ru := runner.New(timeout)
    ru.Add(createTask(),createTask(),createTask())

    if err := ru.Start(); err != nil {
        switch err {
        case runner.ErrorInterrupt:
            log.Println("系统被中断")
            os.Exit(1)
        case runner.ErrorTimeout:
            log.Println("系统超时")
            os.Exit(2)

        }
    }
    log.Println("程序结束")

}

func createTask() func(int) {
    return func(id int) {
        log.Printf("process-task #%d\n",id)
        time.Sleep(time.Duration(id) * time.Second )
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐