考虑下面两段功能完全一致的代码, 他们都启动了若干线程/进程并且通过一个队列传递了消息. 每个片段中, 首先一系列的线程/进程 task 会被启动, 接着每个 task 接受一个外部队列ch, 并在执行过程中向其发送一些数据. 接着, 等所有的任务都结束以后. 我们再读取队列里所有数据.

片段1:

import multiprocessing as mp
import threading
import queue
import time


def task(task_id: int, ch):
    print(f"task {task_id} started")
    ch.put(task_id)
    time.sleep(1)
    print(f"task {task_id} finished...")


if __name__ == "__main__":
    n_task = 5
    tasks = []
    use_mp = True
    if use_mp:
        gen_queue = mp.Queue
        gen_thread = mp.Process
        c = gen_queue(n_task)
        close_queue = c.close
    else:
        gen_queue = queue.Queue
        gen_thread = threading.Thread
        c = gen_queue(n_task)
        close_queue = c.task_done

    for i in range(n_task):
        task_p = gen_thread(target=task, args=(i, c))
        tasks.append(task_p)
        task_p.start()

    for tp in tasks:
        tp.join()

    while not c.empty():
        v = c.get()
        print(v)

    close_queue()

片段2:

package main

import (
	"fmt"
	"sync"
	"time"
)

func task(task_id int, wg *sync.WaitGroup, ch chan int) {
	s_start := fmt.Sprintf("task %v started", task_id)
	ch <- task_id
	fmt.Println(s_start)
	time.Sleep(1 * time.Second)
	fmt.Println(fmt.Sprintf("task %v done...", task_id))
	defer wg.Done()
}

func main() {
	var wg sync.WaitGroup
	n_task := 5
	c := make(chan int, n_task)
	for i := 0; i < n_task; i++ {
		wg.Add(1)
		go task(i, &wg, c)
	}
	wg.Wait()
	close(c)
	for v := range c {
		fmt.Println(v)
	}
	fmt.Println("all the tasks done...")
}

同步问题

从功能完全一致的片段1和片段2可以看出, 在Go里一旦让一个函数 go 起来了, 等于是自动把其变成了Thread或者Process (这个还稍有不同). 在 Python里, Thread或者Process 的同步是通过调用这俩对象本身的方法 startjoin 来完成的. 而在Go里这一步是透过 sync 来完成的.

Channel/队列

在Go中, 透过 make (chan type) 的方式创建channel c, 这个 c 就等价于 Python 中的 Queue. Go 中的 <- 则类似 Python``

Go语法 Python语法 功能 区别
make(chan int, max_size) Queue(max_size) 建立特定容量队列
c<-num c.put(num) 写入数据
v<-c c.get() 取出数据
close(c) c.close() (Queue) 或者c.task_done() (multiprocessing. Queue) 关闭队列 在Go中要首先调用close.否则的话会死锁, 因为那样就只有写入没有读取.