Go channels 进阶 — 从初学者到初学者+ ;)

Go 语言的并发模型是其最吸引人的特性之一,而通道(channels)作为实现并发通信的关键机制,是 Go 语言中的重要组成部分。本文将深入研究通道,探索其在并发编程中的作用、使用方式以及一些高级特性。

Go channels 进阶 — 从初学者到初学者+ ;)

Go 语言的并发模型是其最吸引人的特性之一,而通道(channels)作为实现并发通信的关键机制,是 Go 语言中的重要组成部分。本文将深入研究通道,探索其在并发编程中的作用、使用方式以及一些高级特性。

1. 什么是Go Channel ?

Channel是允许 goroutines(轻量级线程或协程)相互通信并同步它们的执行的通道。它们是类型安全的,确保在 goroutines 之间发送的数据符合指定的类型。

从概念上来说,可以将Channel视为管道,其中数据可以从一端发送,从另一端接收。

下面代码是创建一个新的Channel

ch := make(chan int) // declare a channel of type int

2. 基本的Channel操作

发送和接收

向Channel发送一个值

ch <- 5 // Send value 5 into the channel

从Channel接收一个值

value := <-ch // Receive a value from the channel

下面的例子演示了如何使用Channel从两个goroutine里同步数据

package main 
import ( 
 "fmt" 
 "time" 
) 
func main() { 
 message := make(chan string) 
 go func() { // sending goroutine 
  time.Sleep(time.Second * 2) 
  message <- "Hello from goroutine!" 
 }() 
 msg := <-message // receiving in main goroutine 
 fmt.Println(msg) 
}

关闭Channel

当发送者不再发送值时,应该由发送者主动关闭Channel。

close(ch)

3. 高级Channel技术

Buffered Channels

Channel默认情况下是无缓冲(unbuffered)的,这意味着在发送时会阻塞,除非有相应的接收操作。Buffered Channels允许您在阻塞之前发送多个值。

Buffered Channels具有容量,允许它在阻塞发送者之前保留多个值。

ch := make(chan int, 3) // A buffered channel with capacity 3 
ch <- 1 
ch <- 2 
// ch <- 3 would block

在下面的示例中,我们将:

  1. 创建一个容量为3的Buffered Channel
  2. 启动5个运行任务的 goroutine。
  3. 使用Channel确保一次只有3个任务正在进行中。
package main 
 
import ( 
 "fmt" 
 "time" 
) 
 
func worker(id int, ch chan int) { 
 // Pretend we're doing some work 
 fmt.Printf("Worker %d started\n", id) 
 time.Sleep(time.Second) 
 fmt.Printf("Worker %d finished\n", id) 
 <-ch // Signal we're done 
} 
 
func main() { 
 // Create a buffered channel 
 bufferedChannel := make(chan int, 3) 
 
 // Start 5 workers 
 for i := 1; i <= 5; i++ { 
  go worker(i, bufferedChannel) 
  bufferedChannel <- i // Fill the channel slot to represent a worker in-progress 
 } 
 
 // Wait for all workers to finish 
 // This is a simple way to wait; in real-world scenarios, you might use sync.WaitGroup or similar 
 time.Sleep(7 * time.Second) 
}

在运行上述代码时,您会注意到只有3个任务立即启动。每当一个任务完成时,另一个就会启动,直到所有5个工作程序都执行完毕。这种行为受到Buffered Channel的控制,它一次只允许3个“任务”(代表正在进行中的任务)存在。

Buffered Channel的工作原理取决了只有3个任务立即被执行。 它可以在不阻塞的情况下容纳指定数量的值。当您尝试向Channel发送数据或从空Channel接收数据时,它将分别阻塞,直到有空间可用或值可用为止。

在上面的示例中:

bufferedChannel := make(chan int, 3)

我们创建了一个容量为3的Channel。这意味着它可以容纳最多3个值而不会阻塞。

然后我们循环并启动5个任务:

for i := 1; i <= 5; i++ { 
 go worker(i, bufferedChannel) 
 bufferedChannel <- i  
}

对于循环的每次迭代,我们:

  1. 启动一个执行任务的 goroutine。
  2. 向Channel发送一个值(i)。

如果Channel已满,则Send操作(bufferedChannel <- i)会阻塞。在这种情况下,由于通道的容量为3,一旦我们启动了3个任务并向Channel发送了3个值,Send操作就会阻塞。

在启动了前3个任务之后,主 goroutine 将在第四次发送操作上阻塞,因为Channel已满。它将保持阻塞,直到其中一个任务完成并从Channel读取一个值,释放一个插槽(slot)。

在任务函数内部,完成其任务后:

<-ch

任务从Channel中读取(或“接收”)一个值,因此移除了一个项目,为下一个任务腾出了空间。

总而言之:只有3个任务会立即启动,因为我们使用Buffered Channel作为信号量来控制并发性。通道的容量决定了可以同时运行的任务数量。在这种情况下,它设置为3,因此只有三个工作程序可以立即启动。随着每个任务完成并在通道中释放一个插槽,另一个任务开始执行。

遍历Channel

遍历Channel允许我们接收值,直到通道被关闭。为了让这个方法好用,发送者必须关闭Channel以表示不会再发送更多数据。

ch := make(chan int, 3) 
ch <- 1 
ch <- 2 
ch <- 3 
close(ch) 
 
for v := range ch { 
 fmt.Println(v) 
}

Select语法

用于处理多个Channel的情况,Select允许一个 goroutine 等待多个通信操作。Select语法类似于 switch 语句,但用于Channel:

ch1 := make(chan int) 
ch2 := make(chan int) 
 
go func() { 
 time.Sleep(time.Second * 1) 
 ch1 <- 1 
}() 
 
go func() { 
 time.Sleep(time.Second * 2) 
 ch2 <- 2 
}() 
 
for i := 0; i < 2; i++ { 
 select { 
 case msg1 := <-ch1: 
  fmt.Println("Received from ch1:", msg1) 
 case msg2 := <-ch2: 
  fmt.Println("Received from ch2:", msg2) 
 } 
}

在 select 语句中的默认情况在没有任何Channel可以立即发送或接收时执行。

select { 
case msg := <-ch: 
 fmt.Println("Received:", msg) 
default: 
 fmt.Println("No message received.") 
}

检查Channel是否已被关闭

如果从Channel接收数据,并且该Channel已关闭,则还将接收到一个布尔值,指示Channel是否仍然处于打开状态。

v, open := <-ch 
if !open { 
 fmt.Println("Channel is closed!") 
}

4. 现实中使用Channel的场景和例子

任务队列

Go 语言的Channel可以用于实现任务队列,多个Producer可以添加任务,多个Consumer可以并发处理它们。

发布者/订阅者模式

Channel可以用于创建发布者/订阅者模型,多个订阅者可以监听发布者发出的事件或消息。

package main 
 
import ( 
 "fmt" 
 "sync" 
) 
 
// PubSub is the main structure for our simple pub-sub system. 
type PubSub struct { 
 // mu is a RWMutex that allows multiple goroutines to read from the subscribers map, 
 // but only one goroutine to write to it at any given time. 
 mu sync.RWMutex 
 
 // subscribers is a map where the key is a topic (as a string) and the value is 
 // a slice of channels. Each channel corresponds to a subscriber listening to that topic. 
 subscribers map[string][]chan int 
} 
 
// NewPubSub creates a new PubSub instance and initializes its subscribers map. 
func NewPubSub() *PubSub { 
 return &PubSub{ 
  subscribers: make(map[string][]chan int), 
 } 
} 
 
// Subscribe allows a subscriber to get updates for a specific topic.  
// It returns a channel on which the subscriber will receive these updates. 
func (ps *PubSub) Subscribe(topic string) <-chan int { 
 // Lock the map for writing. 
 ps.mu.Lock() 
 defer ps.mu.Unlock() 
 
 // Create a new channel for this subscriber. 
 ch := make(chan int, 1) 
  
 // Append this subscriber's channel to the slice of channels for the given topic. 
 ps.subscribers[topic] = append(ps.subscribers[topic], ch) 
  
 // Return the channel to the subscriber. 
 return ch 
} 
 
// Publish sends the given value to all subscribers of a specific topic. 
func (ps *PubSub) Publish(topic string, value int) { 
 // Lock the map for reading. 
 ps.mu.RLock() 
 defer ps.mu.RUnlock() 
 
 // Iterate over all channels (subscribers) for this topic and send the value. 
 for _, subscriber := range ps.subscribers[topic] { 
  subscriber <- value 
 } 
} 
 
// Close removes a specific subscriber from a topic and closes its channel. 
func (ps *PubSub) Close(topic string, subCh <-chan int) { 
 // Lock the map for writing. 
 ps.mu.Lock() 
 defer ps.mu.Unlock() 
 
 // Find the subscriber's channel in the slice for the given topic. 
 subscribers, found := ps.subscribers[topic] 
 if !found { 
  return 
 } 
 
 for i, subscriber := range subscribers { 
  if subscriber == subCh { 
   // Close the channel. 
   close(subscriber) 
    
   // Remove this channel from the slice. 
   ps.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...) 
   break 
  } 
 } 
} 
 
// Main function demonstrating the pub-sub functionality. 
func main() { 
 // Create a new pub-sub instance. 
 ps := NewPubSub() 
 
 // A subscriber subscribes to "topic1". 
 subscriber := ps.Subscribe("topic1") 
 
 // A publisher publishes the value 42 to "topic1". 
 go func() { 
  ps.Publish("topic1", 42) 
 }() 
 
 // The subscriber receives the value. 
 value := <-subscriber 
 fmt.Println("Received value:", value) // Expected: Received value: 42 
 
 // Close the subscriber's channel and remove it from the topic's subscribers. 
 ps.Close("topic1", subscriber) 
}

这个简单的发布-订阅实现演示了如何使用 Go 的Channel和 goroutines 来创建并发系统。注释提供了关于代码各部分目的和功能的见解。

工作原理:

  1. PubSub 结构体包含了每个主题的订阅者映射。
  2. Subscribe 方法让订阅者获取一个Channel,通过该Channel他们将接收特定主题的事件。
  3. Publish 方法将给定的值发送到该主题的所有订阅者。
  4. Close 方法移除订阅者的通道并关闭它。

错误处理:

在并发应用程序中,Channel可以用来传递和处理来自 goroutine 的错误,将其传递回主程序。

5. 使用Channel的窍门和技巧

  • 避免全局Channel(Global Channel):与其使用全局Channel,最好将Channel作为函数参数传递。这样有助于更好地封装。
  • 检测死锁:Go 的运行时可以检测到某些死锁,并在遇到死锁时引发 panic。
  • Nil Channel:对Nil Channel进行发送操作会永久阻塞,而从空通道接收将始终阻塞。
  • 避免“忙等待”:与其频繁轮询Channel,不如利用 select 或 range 有效地等待值的到来。

Conclusion

Channel是 Go 语言并发模型的核心。深入理解它们可以释放 Go 中并发编程的真正力量。从基本的发送/接收操作到高级模式如发布/订阅,通道为构建高效的并发应用程序提供了丰富的选择。

无论您是初学者刚刚开始学习 Go,还是经验丰富的专业人士想要完善自己的理解,Channel仍然是必须掌握的关键构造。拥抱它们,尝试它们,让 Go 的并发性推动您的下一个应用程序。