Go channels 进阶 — 从初学者到初学者+ ;)
Go 语言的并发模型是其最吸引人的特性之一,而通道(channels)作为实现并发通信的关键机制,是 Go 语言中的重要组成部分。本文将深入研究通道,探索其在并发编程中的作用、使用方式以及一些高级特性。
Go 语言的并发模型是其最吸引人的特性之一,而通道(channels)作为实现并发通信的关键机制,是 Go 语言中的重要组成部分。本文将深入研究通道,探索其在并发编程中的作用、使用方式以及一些高级特性。
1. 什么是Go Channel ?
Channel是允许 goroutines(轻量级线程或协程)相互通信并同步它们的执行的通道。它们是类型安全的,确保在 goroutines 之间发送的数据符合指定的类型。
从概念上来说,可以将Channel视为管道,其中数据可以从一端发送,从另一端接收。
下面代码是创建一个新的Channel
ch := make(chan int) // declare a channel of type int
2. 基本的Channel操作
发送和接收
向Channel发送一个值
ch <- 5 // Send value 5 into the channel
从Channel接收一个值
value := <-ch // Receive a value from the channel
下面的例子演示了如何使用Channel从两个goroutine里同步数据
package main
import (
"fmt"
"time"
)
func main() {
message := make(chan string)
go func() { // sending goroutine
time.Sleep(time.Second * 2)
message <- "Hello from goroutine!"
}()
msg := <-message // receiving in main goroutine
fmt.Println(msg)
}
关闭Channel
当发送者不再发送值时,应该由发送者主动关闭Channel。
close(ch)
3. 高级Channel技术
Buffered Channels
Channel默认情况下是无缓冲(unbuffered)的,这意味着在发送时会阻塞,除非有相应的接收操作。Buffered Channels允许您在阻塞之前发送多个值。
Buffered Channels具有容量,允许它在阻塞发送者之前保留多个值。
ch := make(chan int, 3) // A buffered channel with capacity 3
ch <- 1
ch <- 2
// ch <- 3 would block
在下面的示例中,我们将:
- 创建一个容量为3的Buffered Channel
- 启动5个运行任务的 goroutine。
- 使用Channel确保一次只有3个任务正在进行中。
package main
import (
"fmt"
"time"
)
func worker(id int, ch chan int) {
// Pretend we're doing some work
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished\n", id)
<-ch // Signal we're done
}
func main() {
// Create a buffered channel
bufferedChannel := make(chan int, 3)
// Start 5 workers
for i := 1; i <= 5; i++ {
go worker(i, bufferedChannel)
bufferedChannel <- i // Fill the channel slot to represent a worker in-progress
}
// Wait for all workers to finish
// This is a simple way to wait; in real-world scenarios, you might use sync.WaitGroup or similar
time.Sleep(7 * time.Second)
}
在运行上述代码时,您会注意到只有3个任务立即启动。每当一个任务完成时,另一个就会启动,直到所有5个工作程序都执行完毕。这种行为受到Buffered Channel的控制,它一次只允许3个“任务”(代表正在进行中的任务)存在。
Buffered Channel的工作原理取决了只有3个任务立即被执行。 它可以在不阻塞的情况下容纳指定数量的值。当您尝试向Channel发送数据或从空Channel接收数据时,它将分别阻塞,直到有空间可用或值可用为止。
在上面的示例中:
bufferedChannel := make(chan int, 3)
我们创建了一个容量为3的Channel。这意味着它可以容纳最多3个值而不会阻塞。
然后我们循环并启动5个任务:
for i := 1; i <= 5; i++ {
go worker(i, bufferedChannel)
bufferedChannel <- i
}
对于循环的每次迭代,我们:
- 启动一个执行任务的 goroutine。
- 向Channel发送一个值(i)。
如果Channel已满,则Send操作(bufferedChannel <- i)会阻塞。在这种情况下,由于通道的容量为3,一旦我们启动了3个任务并向Channel发送了3个值,Send操作就会阻塞。
在启动了前3个任务之后,主 goroutine 将在第四次发送操作上阻塞,因为Channel已满。它将保持阻塞,直到其中一个任务完成并从Channel读取一个值,释放一个插槽(slot)。
在任务函数内部,完成其任务后:
<-ch
任务从Channel中读取(或“接收”)一个值,因此移除了一个项目,为下一个任务腾出了空间。
总而言之:只有3个任务会立即启动,因为我们使用Buffered Channel作为信号量来控制并发性。通道的容量决定了可以同时运行的任务数量。在这种情况下,它设置为3,因此只有三个工作程序可以立即启动。随着每个任务完成并在通道中释放一个插槽,另一个任务开始执行。
遍历Channel
遍历Channel允许我们接收值,直到通道被关闭。为了让这个方法好用,发送者必须关闭Channel以表示不会再发送更多数据。
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch)
for v := range ch {
fmt.Println(v)
}
Select语法
用于处理多个Channel的情况,Select允许一个 goroutine 等待多个通信操作。Select语法类似于 switch 语句,但用于Channel:
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
time.Sleep(time.Second * 1)
ch1 <- 1
}()
go func() {
time.Sleep(time.Second * 2)
ch2 <- 2
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
}
}
在 select 语句中的默认情况在没有任何Channel可以立即发送或接收时执行。
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No message received.")
}
检查Channel是否已被关闭
如果从Channel接收数据,并且该Channel已关闭,则还将接收到一个布尔值,指示Channel是否仍然处于打开状态。
v, open := <-ch
if !open {
fmt.Println("Channel is closed!")
}
4. 现实中使用Channel的场景和例子
任务队列
Go 语言的Channel可以用于实现任务队列,多个Producer可以添加任务,多个Consumer可以并发处理它们。
发布者/订阅者模式
Channel可以用于创建发布者/订阅者模型,多个订阅者可以监听发布者发出的事件或消息。
package main
import (
"fmt"
"sync"
)
// PubSub is the main structure for our simple pub-sub system.
type PubSub struct {
// mu is a RWMutex that allows multiple goroutines to read from the subscribers map,
// but only one goroutine to write to it at any given time.
mu sync.RWMutex
// subscribers is a map where the key is a topic (as a string) and the value is
// a slice of channels. Each channel corresponds to a subscriber listening to that topic.
subscribers map[string][]chan int
}
// NewPubSub creates a new PubSub instance and initializes its subscribers map.
func NewPubSub() *PubSub {
return &PubSub{
subscribers: make(map[string][]chan int),
}
}
// Subscribe allows a subscriber to get updates for a specific topic.
// It returns a channel on which the subscriber will receive these updates.
func (ps *PubSub) Subscribe(topic string) <-chan int {
// Lock the map for writing.
ps.mu.Lock()
defer ps.mu.Unlock()
// Create a new channel for this subscriber.
ch := make(chan int, 1)
// Append this subscriber's channel to the slice of channels for the given topic.
ps.subscribers[topic] = append(ps.subscribers[topic], ch)
// Return the channel to the subscriber.
return ch
}
// Publish sends the given value to all subscribers of a specific topic.
func (ps *PubSub) Publish(topic string, value int) {
// Lock the map for reading.
ps.mu.RLock()
defer ps.mu.RUnlock()
// Iterate over all channels (subscribers) for this topic and send the value.
for _, subscriber := range ps.subscribers[topic] {
subscriber <- value
}
}
// Close removes a specific subscriber from a topic and closes its channel.
func (ps *PubSub) Close(topic string, subCh <-chan int) {
// Lock the map for writing.
ps.mu.Lock()
defer ps.mu.Unlock()
// Find the subscriber's channel in the slice for the given topic.
subscribers, found := ps.subscribers[topic]
if !found {
return
}
for i, subscriber := range subscribers {
if subscriber == subCh {
// Close the channel.
close(subscriber)
// Remove this channel from the slice.
ps.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
break
}
}
}
// Main function demonstrating the pub-sub functionality.
func main() {
// Create a new pub-sub instance.
ps := NewPubSub()
// A subscriber subscribes to "topic1".
subscriber := ps.Subscribe("topic1")
// A publisher publishes the value 42 to "topic1".
go func() {
ps.Publish("topic1", 42)
}()
// The subscriber receives the value.
value := <-subscriber
fmt.Println("Received value:", value) // Expected: Received value: 42
// Close the subscriber's channel and remove it from the topic's subscribers.
ps.Close("topic1", subscriber)
}
这个简单的发布-订阅实现演示了如何使用 Go 的Channel和 goroutines 来创建并发系统。注释提供了关于代码各部分目的和功能的见解。
工作原理:
- PubSub 结构体包含了每个主题的订阅者映射。
- Subscribe 方法让订阅者获取一个Channel,通过该Channel他们将接收特定主题的事件。
- Publish 方法将给定的值发送到该主题的所有订阅者。
- Close 方法移除订阅者的通道并关闭它。
错误处理:
在并发应用程序中,Channel可以用来传递和处理来自 goroutine 的错误,将其传递回主程序。
5. 使用Channel的窍门和技巧
- 避免全局Channel(Global Channel):与其使用全局Channel,最好将Channel作为函数参数传递。这样有助于更好地封装。
- 检测死锁:Go 的运行时可以检测到某些死锁,并在遇到死锁时引发 panic。
- Nil Channel:对Nil Channel进行发送操作会永久阻塞,而从空通道接收将始终阻塞。
- 避免“忙等待”:与其频繁轮询Channel,不如利用 select 或 range 有效地等待值的到来。
Conclusion
Channel是 Go 语言并发模型的核心。深入理解它们可以释放 Go 中并发编程的真正力量。从基本的发送/接收操作到高级模式如发布/订阅,通道为构建高效的并发应用程序提供了丰富的选择。
无论您是初学者刚刚开始学习 Go,还是经验丰富的专业人士想要完善自己的理解,Channel仍然是必须掌握的关键构造。拥抱它们,尝试它们,让 Go 的并发性推动您的下一个应用程序。