go 圣经杂乱的笔记 -- channel
12 July 2018

thumbnail

thumbnail 1

缺点:效率低

// Sequential version: each file is processed only after the previous
// call to thumbnail.ImageFile has returned.
for _, f := range filenames {
        thumbnail.ImageFile(f)
}

thumbnail 2

缺点:太多 goroutine

// Starts one goroutine per file name; nothing in this snippet waits for
// any of them to finish.
for _, f := range filenames {
        go thumbnail.ImageFile(f)
}

thumbnail 3

缺点:太多 goroutine,没有处理错误

// ch carries one completion signal per worker goroutine.
ch := make(chan bool)
for _, f := range filenames {
        // f is passed as an argument so each goroutine works on its own copy.
        go func(f string) {
                thumbnail.ImageFile(f) // return values are discarded
                ch <- true
        }(f)
}

// Wait for goroutines to complete: receive one signal per file name.
for range filenames {
        <-ch
}

thumbnail 4

缺点:goroutine 泄露

// errors is unbuffered, so each worker blocks on its send until the
// loop below receives from the channel.
errors := make(chan error)
for _, f := range filenames {
        go func(f string) {
                _, err := thumbnail.ImageFile(f)
                errors <- err
        }(f)
}
for range filenames {
        if err := <-errors; err != nil {
                // NOTE: returning here stops receiving; workers that have not
                // yet sent stay blocked on the unbuffered channel.
                return err
        }
}

这个程序有一个微妙的 bug。当它遇到第一个非 nil 的 error 时会直接将 error 返回到调用方,使得没有一个 goroutine 去排空 errors channel。

这样剩下的 worker goroutine 在向这个 channel 中发送值时,都会永远地阻塞下去,并且永远都不会退出。这种情况叫做 goroutine 泄露,可能会导致整个程序卡住或者抛出 out of memory 的错误。

最简单的解决办法就是用一个具有合适大小的 buffered channel,这样这些 worker goroutine 向 channel 中发送值时就不会被阻塞。(一个可选的解决办法是创建一个另外的 goroutine,当 main goroutine 返回第一个错误的同时去排空 channel)

thumbnail 5 (buffered channel)

type item struct {
        thumbfile string
        err       error
}

ch := make(chan item, len(filenames))
for _, f := range filenames {
        go func(f string) {
                var it item
                it.thumbfile, it.err = thumbnail.ImageFile(f)
                ch <- it
        }(f)
}

for range filenames {
        it := <-ch
        if it.err != nil {
                return nil, it.err
        }
        thumbfiles = append(thumbfiles, it.thumbfile)
}

thumbnail 6

// sizes carries the size of each thumbnail that was created successfully.
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
// NOTE(review): a single-variable range whose value is passed on as a
// string suggests filenames is a channel of names here — confirm.
for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
                defer wg.Done()
                thumb, err := thumbnail.ImageFile(f)
                if err != nil {
                        // A failed file is logged and contributes no size.
                        log.Println(err)
                        return
                }
                info, _ := os.Stat(thumb) // OK to ignore error
                sizes <- info.Size()
        }(f)
}

// closer: closes sizes once every worker has called wg.Done, which ends
// the range loop below.
go func() {
        wg.Wait()
        close(sizes)
}()

// Add up sizes until the channel is closed.
var total int64
for size := range sizes {
        total += size
}
return total

爬虫

爬虫 1

缺点:速度太快了

// worklist carries batches of links that still have to be examined.
worklist := make(chan []string)

// Start with the command-line arguments.
// The send runs in its own goroutine because worklist is unbuffered and
// the receiving loop below has not started yet.
go func() { worklist <- os.Args[1:] }()

// Crawl the web concurrently.
// seen records every link that has already been handed to crawl.
// worklist is never closed in this snippet, so the range loop does not
// end on its own.
seen := make(map[string]bool)
for list := range worklist {
        for _, link := range list {
                if !seen[link] {
                        seen[link] = true
                        // One goroutine per new link; nothing here limits how
                        // many of them run at once.
                        go func(link string) {
                                worklist <- crawl(link)
                        }(link)
                }
        }
}

爬虫 2

修改 crawl 方法来限制速度:这个和一个夜店里限制客人数目是一个道理,只有当有客人离开时,才会允许新的客人进入店内

// tokens is a buffered channel of capacity 20 used as a counting
// semaphore: a send takes a slot and a receive gives it back, so at most
// 20 calls to links.Extract are in flight at once.
var tokens = make(chan struct{}, 20)

// crawl prints url, extracts its links while holding a token, and
// returns whatever list links.Extract produced. The error from
// links.Extract is discarded.
func crawl(url string) []string {
        fmt.Println(url)

        var found []string
        tokens <- struct{}{} // take a slot
        found, _ = links.Extract(url)
        <-tokens // give the slot back

        return found
}

爬虫 3

// worklist carries batches of links that still have to be examined.
worklist := make(chan []string)

// Seed the work list from a separate goroutine; worklist is unbuffered.
go func() { worklist <- os.Args[1:] }()

// Crawl the web concurrently.
seen := make(map[string]bool)

// n counts the sends to worklist that are still outstanding: it starts at
// 1 for the initial argument list and grows by one for every crawl that
// is started. The loop ends once every pending send has been received.
for n := 1; n > 0; n-- {
        list := <-worklist
        for _, link := range list {
                if !seen[link] {
                        seen[link] = true
                        n++
                        go func(link string) {
                                worklist <- crawl(link)
                        }(link)
                }
        }
}

爬虫 4

worklist := make(chan []string)  // lists of URLs, may have duplicates
unseenLinks := make(chan string) // de-duplicated URLs

// Add command-line arguments to worklist.
go func() { worklist <- os.Args[1:] }()

// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
        go func() {
                for link := range unseenLinks {
                        foundLinks := crawl(link)
                        // Send from a separate goroutine so this crawler can
                        // return to receiving from unseenLinks without waiting
                        // for the main loop to accept the result.
                        go func() { worklist <- foundLinks }()
                }
        }()
}

// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
// Neither channel is closed in this snippet, so the loop below does not
// end on its own.
seen := make(map[string]bool)
for list := range worklist {
        for _, link := range list {
                if !seen[link] {
                        seen[link] = true
                        unseenLinks <- link
                }
        }
}

du

du 1

// filesizeList carries one size value per file found by the walk.
filesizeList := make(chan int64)
walkDir(roots, filesizeList)

// walkDir walks every directory in roots from a background goroutine,
// which closes filesizeList when all roots are done. The calling
// goroutine counts and sums the sizes it receives and prints the totals
// once the channel is closed.
func walkDir(roots []string, filesizeList chan int64) {
        // Producer: visit the roots one after another, then signal the end.
        go func() {
                for i := range roots {
                        walkEachDir(roots[i], filesizeList)
                }
                close(filesizeList)
        }()

        // Consumer: accumulate until the channel reports it is closed.
        var files, nbytes int64
        for {
                size, open := <-filesizeList
                if !open {
                        break
                }
                files++
                nbytes += size
        }

        printDIskUsage(files, nbytes)
}

// walkEachDir sends the size of every non-directory entry under dir on
// filesizeList, descending into subdirectories recursively. The error
// from ioutil.ReadDir is discarded.
func walkEachDir(dir string, filesizeList chan<- int64) {
        entries, _ := ioutil.ReadDir(dir)
        for _, entry := range entries {
                if !entry.IsDir() {
                        filesizeList <- entry.Size()
                        continue
                }
                walkEachDir(filepath.Join(dir, entry.Name()), filesizeList)
        }
}

// printDIskUsage prints the file count num and the byte total, the
// latter divided by 1e6 and shown with two decimals.
func printDIskUsage(num int64, total int64) {
        megabytes := float64(total) / 1e6
        fmt.Printf("%v files, %.2f M\n", num, megabytes)
}

du 2

// filesizeList carries one size value per file found by the walk.
filesizeList := make(chan int64)
walkDir(roots, filesizeList)

// walkDir walks every directory in roots from a background goroutine,
// which closes filesizeList when all roots are done. The calling
// goroutine accumulates the received sizes, prints a progress line once
// per second, and prints the final totals when the channel is closed.
func walkDir(roots []string, filesizeList chan int64) {
        go func() {
                for _, root := range roots {
                        walkEachDir(root, filesizeList)
                }
                close(filesizeList)
        }()

        var num, total int64

        // An explicit Ticker (rather than time.Tick) so it can be stopped
        // when the walk is finished.
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

loop:
        for {
                select {
                case <-ticker.C:
                        printDIskUsage(num, total)
                case f, ok := <-filesizeList:
                        if !ok {
                                break loop // channel closed: the walk is complete
                        }
                        num++
                        total += f
                }
        }

        printDIskUsage(num, total)
}

// walkEachDir sends the size of every non-directory entry under dir on
// filesizeList, descending into subdirectories recursively. The error
// from ioutil.ReadDir is discarded.
func walkEachDir(dir string, filesizeList chan<- int64) {
        entries, _ := ioutil.ReadDir(dir)
        for _, entry := range entries {
                if !entry.IsDir() {
                        filesizeList <- entry.Size()
                        continue
                }
                walkEachDir(filepath.Join(dir, entry.Name()), filesizeList)
        }
}

// printDIskUsage prints the file count num and the byte total, the
// latter divided by 1e6 and shown with two decimals.
func printDIskUsage(num int64, total int64) {
        megabytes := float64(total) / 1e6
        fmt.Printf("%v files, %.2f M\n", num, megabytes)
}

du 3

// sema is a buffered channel of capacity 10; walkEachDir sends to it on
// entry and receives from it on exit, so it acts as a counting semaphore.
var sema = make(chan bool, 10)

// filesizeList carries one size value per file found by the walk.
filesizeList := make(chan int64)
walkDir(roots, filesizeList)

// walkDir starts one walkEachDir goroutine per root, closes filesizeList
// once the WaitGroup shows that every walker has finished, and meanwhile
// accumulates the received sizes, printing a progress line once per
// second and the final totals at the end.
func walkDir(roots []string, filesizeList chan int64) {
        var wg sync.WaitGroup

        for _, root := range roots {
                wg.Add(1)
                go walkEachDir(root, &wg, filesizeList)
        }

        // Closer: ends the receive loop below once all walkers are done.
        go func() {
                wg.Wait()
                close(filesizeList)
        }()

        var num, total int64

        // An explicit Ticker (rather than time.Tick) so it can be stopped
        // when the walk is finished.
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

loop:
        for {
                select {
                case <-ticker.C:
                        printDIskUsage(num, total)
                case f, ok := <-filesizeList:
                        if !ok {
                                break loop // channel closed: the walk is complete
                        }
                        num++
                        total += f
                }
        }

        printDIskUsage(num, total)
}

// walkEachDir reports the size of every non-directory entry of dir on
// filesizeList and starts a new goroutine for each subdirectory, adding
// it to wg first. It holds a slot of sema from entry until it returns and
// calls wg.Done on the way out. The error from ioutil.ReadDir is
// discarded.
func walkEachDir(dir string, wg *sync.WaitGroup, filesizeList chan<- int64) {
        defer wg.Done()

        sema <- true
        defer func() { <-sema }()

        entries, _ := ioutil.ReadDir(dir)
        for _, entry := range entries {
                if !entry.IsDir() {
                        filesizeList <- entry.Size()
                        continue
                }
                wg.Add(1)
                go walkEachDir(filepath.Join(dir, entry.Name()), wg, filesizeList)
        }
}

// printDIskUsage prints the file count num and the byte total, the
// latter divided by 1e6 and shown with two decimals.
func printDIskUsage(num int64, total int64) {
        megabytes := float64(total) / 1e6
        fmt.Printf("%v files, %.2f M\n", num, megabytes)
}

du 4

// sema is a buffered channel of capacity 10; walkEachDir sends to it
// before reading a directory and receives from it when it returns.
var sema = make(chan bool, 10)

// done is closed to announce cancellation; nothing is ever sent on it.
var done = make(chan bool)

// filesizeList carries one size value per file found by the walk.
filesizeList := make(chan int64)
// Cancel as soon as a read from standard input returns.
go func() {
        os.Stdin.Read(make([]byte, 1))
        close(done)
}()
walkDir(roots, filesizeList)

// cancelled reports whether done has been closed (or has a value ready
// to receive), without blocking. A nil or still-open, empty channel
// yields false.
//
// The parameter was previously unnamed and ignored in favour of a
// package-level variable; it is now the channel that is actually polled.
func cancelled(done chan bool) bool {
        select {
        case <-done:
                return true
        default:
                return false
        }
}

// walkDir starts one walkEachDir goroutine per root and closes
// filesizeList once the WaitGroup shows that every walker has finished.
// While walking it accumulates the received sizes and prints a progress
// line once per second. If done is closed first, it drains filesizeList
// so the walkers can finish and returns without printing totals;
// otherwise it prints the final totals when the channel is closed.
func walkDir(roots []string, filesizeList chan int64) {
        var wg sync.WaitGroup

        for _, root := range roots {
                wg.Add(1)
                go walkEachDir(root, &wg, filesizeList)
        }

        // Closer: ends the receive loops below once all walkers are done.
        go func() {
                wg.Wait()
                close(filesizeList)
        }()

        var num, total int64

        // An explicit Ticker (rather than time.Tick) so it is stopped on
        // both the normal and the cancelled return path.
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

loop:
        for {
                select {
                case <-done:
                        // Keep receiving so walkers blocked on a send can exit.
                        for range filesizeList {
                        }
                        return
                case <-ticker.C:
                        printDIskUsage(num, total)
                case f, ok := <-filesizeList:
                        if !ok {
                                break loop // channel closed: the walk is complete
                        }
                        num++
                        total += f
                }
        }

        printDIskUsage(num, total)
}

// walkEachDir reports the size of every non-directory entry of dir on
// filesizeList and starts a new goroutine for each subdirectory, adding
// it to wg first. It always calls wg.Done on the way out. The error from
// ioutil.ReadDir is discarded.
//
// Cancellation is observed at every point where the function could
// otherwise wait: on entry, while acquiring a slot of sema, and while
// sending a size. Previously only the entry check existed, so a walker
// queued on the semaphore or in the middle of a large directory kept
// working after done was closed.
func walkEachDir(dir string, wg *sync.WaitGroup, filesizeList chan<- int64) {
        defer wg.Done()
        if cancelled(done) {
                return
        }

        // Acquire a semaphore slot unless cancellation arrives first.
        select {
        case sema <- true:
        case <-done:
                return
        }
        defer func() { <-sema }()

        entries, _ := ioutil.ReadDir(dir)
        for _, v := range entries {
                path := filepath.Join(dir, v.Name())
                if v.IsDir() {
                        wg.Add(1)
                        go walkEachDir(path, wg, filesizeList)
                        continue
                }

                select {
                case filesizeList <- v.Size():
                case <-done:
                        return
                }
        }
}

// printDIskUsage prints the file count num and the byte total, the
// latter divided by 1e6 and shown with two decimals.
func printDIskUsage(num int64, total int64) {
        megabytes := float64(total) / 1e6
        fmt.Printf("%v files, %.2f M\n", num, megabytes)
}

chat

// client is the send-only channel on which one connection is handed the
// messages that the broadcaster distributes.
type client chan<- string

// Unbuffered channels connecting the per-connection handlers to the
// broadcaster.
var (
        messages = make(chan string) // chat lines read from any connection
        enters   = make(chan client) // clients announcing their arrival
        leaves   = make(chan client) // clients announcing their departure
)

// main listens on TCP port 8080, starts the broadcaster, and serves each
// accepted connection in its own handler goroutine.
//
// Errors from Listen and Accept are now reported instead of ignored:
// previously a failed Listen left listener nil and a failed Accept passed
// a nil connection to handler, both of which end in a nil dereference.
func main() {
        listener, err := net.Listen("tcp", ":8080")
        if err != nil {
                fmt.Println(err)
                return
        }

        go broadcaster()

        for {
                conn, err := listener.Accept()
                if err != nil {
                        // Skip this attempt and keep serving other clients.
                        fmt.Println(err)
                        continue
                }
                go handler(conn)
        }
}

// broadcaster owns the set of connected clients. It loops forever,
// adding clients received on enters, removing clients received on leaves,
// and forwarding every string received on messages to each client
// currently in the set. Each event is also printed to standard output.
func broadcaster() {
        members := map[client]bool{}

        for {
                select {
                case c := <-enters:
                        members[c] = true
                        fmt.Printf("%v enter\n", c)

                case c := <-leaves:
                        delete(members, c)
                        fmt.Printf("%v leave\n", c)

                case text := <-messages:
                        fmt.Printf("%v\n", text)
                        for c := range members {
                                c <- text
                        }
                }
        }
}

// handler serves one chat connection. It registers a fresh client channel
// with the broadcaster, copies broadcast messages to the connection,
// forwards each line read from the connection to messages prefixed with
// the peer address, and disconnects the client after 3 seconds without
// input or as soon as the peer stops sending.
//
// Compared with the earlier version this one cleans up after itself:
//   - the client channel is closed after the broadcaster has accepted the
//     departure, so the writer goroutine terminates;
//   - the reader goroutine can no longer block forever announcing input
//     to a handler that has already timed out;
//   - end of input is noticed immediately rather than at the next timeout.
func handler(conn net.Conn) {
        who := conn.RemoteAddr().String()
        cli := make(chan string)

        // Writer: copy broadcast messages to the connection until cli closes.
        go func(conn net.Conn, ch <-chan string) {
                for msg := range ch {
                        fmt.Fprintln(conn, msg)
                }
        }(conn, cli)

        enters <- cli

        hasInput := make(chan bool)
        quit := make(chan struct{}) // closed when the handler stops listening

        // Reader: announce activity, then publish the line.
        go func() {
                defer close(hasInput) // tells the loop below that input has ended
                input := bufio.NewScanner(conn)
                for input.Scan() {
                        select {
                        case hasInput <- true:
                        case <-quit:
                                return
                        }
                        msg := who + ": " + input.Text()
                        messages <- msg
                }
        }()

loop:
        for {
                select {
                case <-time.After(3 * time.Second):
                        break loop // idle for too long
                case _, ok := <-hasInput:
                        if !ok {
                                break loop // peer closed the connection or read failed
                        }
                }
        }

        close(quit)
        conn.Close()

        // The send completes only when the broadcaster has received it, and
        // the broadcaster removes the client before handling anything else,
        // so nothing is sent on cli afterwards and closing it is safe.
        leaves <- cli
        close(cli)
}