Go 併發模型:管道和取消

類別: IT

簡介

Golang的原子併發特性使得它很容易構造流資料管道,這使得Golang可有效的使用I/O和多CPU特性。本文提出一些關於管道的示例,在這個過程中突出了操作失敗的微妙之處和介紹處理失敗的具體技術。

什麼是管道

在Golang對於管道沒有明確的定義;它只是許多種併發程式中的一種。管道是通道連線的一系列階段, 每個階段是一組goroutine執行相同的功能。在每個階段,goroutine執行步驟為:

  • 從上游經過入境通道接受值

  • 對資料執行一些功能操作,通常會產生新的值

  • 從下游經過出通道傳送值

除了開始和最後階段只有一個入境通道或者一個出境通道外,其他每個階段有任意數量的入境通道和出境通道,。開始階段有時又稱為源或者生產者;最後一個階段又稱為sink或者消費者。

我們將開始一個簡單的示例來解釋管道的思想和技術。稍後,我們將展示更多相關的例子。

平方數

一個通道有三個階段。

第一階段:gen,以從列表讀出整數的方式轉換整數列表到一個通道。gen函式開始goroutine後, 在通道上傳送整數並且在在所有的值被髮送完後將通道關閉:

func gen(nums ...int) <-chan int {    out := make(chan int)    go func() {        for _, n := range nums {            out <- n        }        close(out)    }()    return out}

第二階段:sq,從通道接受整數,然後將接受到的每個整數值的平方後返回到一個通道 。在入境通道關閉和傳送所有下行值的階段結束後,關閉出口通道:

func sq(in <-chan int) <-chan int {    out := make(chan int)    go func() {        for n := range in {            out <- n * n        }        close(out)    }()    return out}

main函式建立了通道並執行最後一個階段:它接受來自第二階段的值並列印出每個值,直到通道關閉:

func main() {    // Set up the pipeline.    c := gen(2, 3)    out := sq(c)    // Consume the output.    fmt.Println(<-out) // 4    fmt.Println(<-out) // 9}

由於sq有相同型別的入境和出境通道,我們可以寫任意次。我們也可以重寫main函式,像其他階段一樣做一系列迴圈 :

func main() {    // Set up the pipeline and consume the output.    for n := range sq(sq(gen(2, 3))) {        fmt.Println(n) // 16 then 81    }}

扇出,扇入

扇出(fan-out):多個函式能從相同的通道中讀資料,直到通道關閉;這提供了一種在一組“人員”中分發任務的方式,使得CPU和I/O的並行處理.

扇入(fan-in):一個函式能從多個輸入中讀取並處理資料,而這多個輸入通道對映到一個單通道,該單通道隨著所有輸入的結束而關閉。

我們可以改變通道去執行兩個sq例項,每個例項從相同的輸入通道讀取資料。我們引入了一個新函式merge去扇入結果:

func main() {    in := gen(2, 3)    // Distribute the sq work across two goroutines that both read from in.    c1 := sq(in)    c2 := sq(in)    // Consume the merged output from c1 and c2.    for n := range merge(c1, c2) {        fmt.Println(n) // 4 then 9, or 9 then 4    }}

merge函式通過為每一個入境通道開啟一個goroutine去複製數值到唯一的出境通道,從而實現了轉換通道列表到一個單通道 。一旦所有的output goroutine啟動,所有在通道上的傳送完成後merge函式開啟一個以上的goroutine用於關閉出境通道。

在一個關閉的通道上傳送沒有意義,所以在關閉之前確保所有的傳送完成是重要的。sync.WaitGroup型別提供了一個簡單的方法去組織同步:

func merge(cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)    // Start an output goroutine for each input channel in cs.  output    // copies values from c to out until c is closed, then calls wg.Done.    output := func(c <-chan int) {        for n := range c {            out <- n        }        wg.Done()    }    wg.Add(len(cs))    for _, c := range cs {        go output(c)    }    // Start a goroutine to close out once all the output goroutines are    // done.  This must start after the wg.Add call.    go func() {        wg.Wait()        close(out)    }()    return out}

短暫停止

管道函式模型:

  • 當所有的傳送操作結束後, 階段關閉他們的出境通道。

  • 階段持續接收來自入境通道的值,直到那些通道關閉。

這個模型允許每一個接收階段通過range迴圈的寫資料,確保一旦所有向下遊傳送的值傳送成功,所有的goroutine退出。 

但在一個真實的管道上,階段並不總是接收所有的入境值。有時設計是這樣的:接收者可能只需要一個子集值就能取得進展。更多時候是一個階段早早的退出,因為一個入境值代表一個早期階段的錯誤。 在這兩種情況下接收者不應該等待剩餘的值到達,我們想要早期階段停止產生後續階段不需要的值。

在我們的示例中,如果一個階段不能處理所有的入境值,那麼試圖傳送這些值得goroutine將無限期的阻塞:

// Consume the first value from output.    out := merge(c1, c2)    fmt.Println(<-out) // 4 or 9    return    // Since we didn't receive the second value from out,    // one of the output goroutines is hung attempting to send it.}

這是一個資源鎖:goroutine消耗記憶體和執行資源,並且在goroutine棧中的堆引用防止資料被回收。Goroutine不能垃圾回收;它們必須自己退出。

當下遊階段在接收所有的入境值失敗後,我們需要安排管道的上游階段退出。一種實現方法是將出境通道改為一個緩衝區。該緩衝區能儲存固定數量的值;如果緩衝區有空閒就立即傳送操作完成訊號:

c := make(chan int, 2) // buffer size 2c <- 1  // succeeds immediatelyc <- 2  // succeeds immediatelyc <- 3  // blocks until another goroutine does <-c and receives 1

當在通道建立就預先知道待傳送的數值個數時,通過使用緩衝區可以簡化程式碼。例如,我們可以重寫  gen 來將整數列表複製到帶有緩衝區的通道中,也可以避免建立一個新的 goroutine:

func gen(nums ...int) <-chan int {    out := make(chan int, len(nums))    for _, n := range nums {        out <- n    }    close(out)    return out}

回到管道中處於阻塞狀態的 goroutine,我們可以考慮為 merge 返回的出境通道增加一個緩衝區:

func merge(cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int, 1) // enough space for the unread inputs    // ... the rest is unchanged ...

儘管它修正了程式中 goroutine 的阻塞問題,但卻不能稱為好程式碼。在這裡,緩衝區大小選取為 1,取決於預知 merge 將會接收的數值個數及下游各階段將會消費的數值個數。這很脆弱:如果我們給 gen 多傳了一個數值,或者下游階段少讀了一些數值,goroute 的阻塞問題會再次出現。

作為代替,我們需要為下游各階段提供一種手段,來向傳送方表明指明它們將停止接收資料的輸入。

顯式取消

當main沒有接受完out所有的值就決定退出時,它必須告知上游狀態(upstream stage)的goroutines,讓它丟棄正在傳送中的資料。通過在一個叫做done的channel上傳送資料,即可實現。例子裡有兩個受阻的傳送方,所以傳送的值有兩組:

func main() {    in := gen(2, 3)    // Distribute the sq work across two goroutines that both read from in.    c1 := sq(in)    c2 := sq(in)    // Consume the first value from output.    done := make(chan struct{}, 2)    out := merge(done, c1, c2)    fmt.Println(<-out) // 4 or 9    // Tell the remaining senders we're leaving.    done <- struct{}{}    done <- struct{}{}}

使用select語句,讓傳送中的goroutines取代了傳送操作。這條語句既可以處理在傳送out的情形,也可以處理從done中接受一個值的情況。done的值型別是空結構,因為它的數值並不重要:它是一個接受事件,表明out的傳送應該被丟棄。output goroutines繼續在channel c內迴圈執行,而不會阻塞上游狀態(upstream stage):

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {    var wg sync.WaitGroup    out := make(chan int)    // Start an output goroutine for each input channel in cs.  output    // copies values from c to out until c is closed or it receives a value    // from done, then output calls wg.Done.    output := func(c <-chan int) {        for n := range c {            select {            case out <- n:            case <-done:            }        }        wg.Done()    }    // ... the rest is unchanged ...

但是這種方法有個問題:下游的接收者需要知道潛在會被阻塞的上游傳送者的數量。追蹤這些數量不僅枯燥,還容易出錯。

我們要有一個方法告知一個未知的、無限數量的go程式向下遊傳送它們的值。在GO裡面我們通過關閉一個通道來實現,因為一個在已關閉通道上的接收操作總能立即執行,並返回該元素型別的零值。

這意味著main函式只需關閉“done”通道就能開啟所有傳送者。close實際上是傳給傳送者的一個廣播訊號。我們擴充套件每一個管道函式接收“done”引數並通過一個“defer”語句觸發“close”,這樣所有來自main的返回路徑都會以訊號通知管道退出。

func main() {    // Set up a done channel that's shared by the whole pipeline,    // and close that channel when this pipeline exits, as a signal    // for all the goroutines we started to exit.    done := make(chan struct{})    defer close(done)    in := gen(done, 2, 3)    // Distribute the sq work across two goroutines that both read from in.    c1 := sq(done, in)    c2 := sq(done, in)    // Consume the first value from output.    out := merge(done, c1, c2)    fmt.Println(<-out) // 4 or 9    // done will be closed by the deferred call.}

管道里的每個狀態現在都可以隨意的提早退出了:sq可以在它的迴圈中退出,因為我們知道如果done已經被關閉了,也會關閉上游的gen狀態。sq通過defer語句,保證不管從哪個返回路徑,它的outchannel 都會被關閉。

func sq(done <-chan struct{}, in <-chan int) <-chan int {    out := make(chan int)    go func() {        defer close(out)        for n := range in {            select {            case out <- n * n:            case <-done:                return            }        }    }()    return out}

下面列出了構建管道的指南:

  • 狀態會在所有傳送操作做完後,關閉它們的流出 channel

  • 狀態會持續接收從流入 channel 輸入的數值,直到 channel 關閉或者其傳送者被釋放。

管道要麼保證足夠能存下所有傳送資料的緩衝區,要麼接收來自接收者明確的要放棄 channel 的訊號,來保證釋放傳送者。

生成樹的摘要

讓我們考慮一個更真實的管道情況。

MD5摘要生成演算法在校驗檔案時非常有用。命令列工具md5sum會生成檔案的摘要的列表。

% md5sum *.god47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我們的樣例程式就像md5sum差不多,不一樣的是它會以資料夾作為引數,並列印每個資料夾內檔案的索引值,按路徑名排序。

% go run serial.go .d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

程式的主函式呼叫輔助函式MD5ALL,它會返回一個map,將路徑名對應到索引值,之後排序並列印結果。

func main() {    // Calculate the MD5 sum of all files under the specified directory,    // then print the results sorted by path name.    m, err := MD5All(os.Args[1])    if err != nil {        fmt.Println(err)        return    }    var paths []string    for path := range m {        paths = append(paths, path)    }    sort.Strings(paths)    for _, path := range paths {        fmt.Printf("%x  %s\n", m[path], path)    }}

MD5ALL函式是我們要討論的。在serial.go中,它的實現沒有使用併發,只是簡單地遍歷檔案樹,讀取檔案並生成摘要。

// MD5All reads all the files in the file tree rooted at root and returns a map// from file path to the MD5 sum of the file's contents.  If the directory walk// fails or any read operation fails, MD5All returns an error.func MD5All(root string) (map[string][md5.Size]byte, error) {    m := make(map[string][md5.Size]byte)    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {        if err != nil {            return err        }        if info.IsDir() {            return nil        }        data, err := ioutil.ReadFile(path)        if err != nil {            return err        }        m[path] = md5.Sum(data)        return nil    })    if err != nil {        return nil, err    }    return m, nil}

並行摘要

I在parallel.go裡,我們把MD5All分解為兩個狀態的管道。第一個狀態,sumFiles,遍歷目錄,在一個新的 Goroutine 裡對每個檔案做摘要,並把結果傳送到型別為result的 channel: 

type result struct {    path string    sum  [md5.Size]byte    err  error}

sumFiles返回兩個 channel:一個用來傳遞result,另一個用來返回filepath.Walk的錯誤。遍歷函式啟動一個新的 Goroutine 來處理每個常規檔案,之後檢查done。如果done已經被關閉了,遍歷就立刻停止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {    // For each regular file, start a goroutine that sums the file and sends    // the result on c.  Send the result of the walk on errc.    c := make(chan result)    errc := make(chan error, 1)    go func() {        var wg sync.WaitGroup        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {            if err != nil {                return err            }            if info.IsDir() {                return nil            }            wg.Add(1)            go func() {                data, err := ioutil.ReadFile(path)                select {                case c <- result{path, md5.Sum(data), err}:                case <-done:                }                wg.Done()            }()            // Abort the walk if done is closed.            select {            case <-done:                return errors.New("walk canceled")            default:                return nil            }        })        // Walk has returned, so all calls to wg.Add are done.  Start a        // goroutine to close c once all the sends are done.        go func() {            wg.Wait()            close(c)        }()        // No select needed here, since errc is buffered.        errc <- err    }()    return c, errc}

MD5All從c接收所有的摘要值。MD5All返回早先的錯誤,通過defer關閉done:

func MD5All(root string) (map[string][md5.Size]byte, error) {    // MD5All closes the done channel when it returns; it may do so before    // receiving all the values from c and errc.    done := make(chan struct{})    defer close(done)    c, errc := sumFiles(done, root)    m := make(map[string][md5.Size]byte)    for r := range c {        if r.err != nil {            return nil, r.err        }        m[r.path] = r.sum    }    if err := <-errc; err != nil {        return nil, err    }    return m, nil}

受限的併發

parallel.go裡實現的MD5All對每個檔案啟動一個新的 Goroutine。如果目錄裡含有很多大檔案,這可能會導致申請大量記憶體,超出機器上的可用記憶體。

我們可以通過控制並行讀取的檔案數量來限制記憶體的申請。在bounded.go,我們建立固定數量的用於讀取檔案的 Goroutine,來限制記憶體使用。現在整個管道有三個狀態:遍歷樹,讀取並對檔案做摘要,收集摘要值。

第一個狀態,walkFiles,傳送樹裡的每個常規檔案的路徑:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {    paths := make(chan string)    errc := make(chan error, 1)    go func() {        // Close the paths channel after Walk returns.        defer close(paths)        // No select needed for this send, since errc is buffered.        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {            if err != nil {                return err            }            if info.IsDir() {                return nil            }            select {            case paths <- path:            case <-done:                return errors.New("walk canceled")            }            return nil        })    }()    return paths, errc}

中間的狀態啟動固定數量的digesterGoroutine,從paths接收檔名,並將結果result傳送到 channelc:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {    for path := range paths {        data, err := ioutil.ReadFile(path)        select {        case c <- result{path, md5.Sum(data), err}:        case <-done:            return        }    }}

不像之前的例子,digester並不關閉輸出 channel,因為多個 Goroutine 會傳送到共享的 channel。另一邊,MD5All中的程式碼會在所有digester完成後關閉 channel:

// Start a fixed number of goroutines to read and digest files.c := make(chan result)var wg sync.WaitGroupconst numDigesters = 20wg.Add(numDigesters)for i := 0; i < numDigesters; i++ {    go func() {        digester(done, paths, c)        wg.Done()    }()}go func() {    wg.Wait()    close(c)}()

我們也可以讓每個digester建立並返回自己的輸出 channel,但是這就需要一個單獨的 Goroutine 來扇入所有結果。

最終從c收集到所有結果result,並檢查從errc傳入的錯誤。這個錯誤的檢查不能提早,因為在這個時間點之前,walkFiles可能會因為正在傳送訊息給下游而阻塞:

m := make(map[string][md5.Size]byte)    for r := range c {        if r.err != nil {            return nil, r.err        }        m[r.path] = r.sum    }    // Check whether the Walk failed.    if err := <-errc; err != nil {        return nil, err    }    return m, nil}

結論

這篇文章展示了使用Go構建流資料管道的技術。要慎重處理這種管道產生的錯誤,因為管道里的每個狀態都可能因為向下遊傳送數值而阻塞,而下游的狀態卻不再關心輸入的資料。我們展示瞭如何將關閉channel作為“完成”訊號廣播給所有由管道啟動的Goroutine,並且定義了正確構建管道的指南。

進一步閱讀:

Go併發模式視訊)展示了Go的併發特性的基礎知識,並演示了應用這些知識的方法。
高階Go併發模式視訊)覆蓋了關於Go特性更復雜的使用場景,尤其是select。
Douglas McIlroy的論文《一窺級數數列》展示了Go使用的這類併發技術是如何優雅地支援複雜計算。

Go 併發模型:管道和取消原文請看這裡

推薦文章