Golang网络编程学习笔记

学习的课程:【马士兵】-GoLang进阶-网络通信(共17H33Mins)

net包介绍

Go语言的net包是标准库中用于网络编程的重要包之一。它提供了一组功能,用于创建和管理网络连接、实现各种网络协议,以及进行基本的网络通信。以下是net包的一些主要功能和类型:

  1. TCP和UDP网络编程:

    • net.Dial:用于建立TCP或UDP连接。
    • net.Listen:用于创建TCP或UDP服务器监听器。
    • net.ResolveTCPAddrnet.ResolveUDPAddr:用于解析TCP和UDP地址。
  2. 网络连接:

    • net.Conn:代表通用的网络连接,可以用于读写数据。
    • net.TCPConn:代表TCP连接,具有额外的TCP特定方法。
    • net.UDPConn:代表UDP连接。
  3. IP地址处理:

    • net.IP:代表IPv4或IPv6地址。
    • net.ResolveIPAddr:用于解析IP地址。
  4. URL解析:

    • net/url 包:允许解析URL并提取主机、路径和查询参数等。
  5. HTTP客户端和服务器:

    • net/http 包:用于构建HTTP客户端和服务器。它构建在net包之上,提供更高级的HTTP功能。
  6. Unix域套接字:

    • net.UnixConn:用于Unix域套接字连接。
    • net.UnixListener:用于Unix域套接字服务器监听。
  7. 解析主机名:

    • net.LookupHost:用于解析主机名为IP地址。
    • net.LookupPort:用于查找服务的端口号。
  8. 套接字选项:

    • net.Dialer:允许设置各种套接字选项,例如超时、本地地址等。
  9. 网络错误处理:

    • net.Error:代表网络错误,例如连接超时、地址不可达等。

这些是net包的一些核心功能。您可以使用这些功能来创建网络客户端、服务器,进行基本的网络通信,解析URL和IP地址,以及处理各种网络错误。如果您有特定的问题或需要更详细的信息,请随时提出。

TCP编程

TCP协议介绍

TCP(Transmission Control Protocol,传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议。它是Internet协议套件的一部分,与IP(Internet Protocol,互联网协议)一起构成了TCP/IP协议栈的基础。

以下是TCP协议的主要特性和工作原理:

  1. 面向连接: 在通信双方建立连接之后,数据的传输是可靠的、有序的。连接的建立和关闭过程包括三次握手和四次挥手。

  2. 可靠性: TCP通过使用序号、确认号、重传机制等来确保数据的可靠传输。如果一个数据包未能被正确接收,TCP将重新发送该数据包,直到对方正确接收。

  3. 字节流: TCP传输的是字节流而不是消息边界。这意味着应用层发送的数据被TCP看作是一连串的字节,而TCP负责将这些字节流划分为适当的数据包进行传输。

  4. 流量控制: TCP使用滑动窗口机制来进行流量控制,确保发送方发送的数据不会超过接收方处理的能力。

  5. 拥塞控制: TCP通过检测网络拥塞的迹象并采取相应的措施来避免网络拥塞的发生,如降低发送速率。

  6. 半关闭: TCP连接的一端可以先关闭,而不影响另一端继续发送数据。

  7. 端口: TCP使用端口号来标识不同的应用程序或服务。源和目标端口号包含在TCP头部中。

  8. 可插拔: TCP是一种通用的协议,可以在不同的网络环境中使用。它提供了一种可插拔的机制,可以支持不同的网络层协议,例如IPv4和IPv6。

  9. 安全性: 虽然TCP本身不提供加密功能,但可以与其他安全协议(如TLS/SSL)结合使用,以确保数据在传输过程中的安全性。

总体而言,TCP是一种可靠的协议,适用于需要高可靠性和顺序传输的应用场景,如文件传输、电子邮件和网页浏览。然而,由于其面向连接和较强的可靠性特性,TCP的通信延迟相对较高,因此在对实时性要求较高的应用中,可能会选择使用UDP等其他协议。

tcp的全双工性质

全双工通信意味着在TCP连接中,数据可以同时在两个方向上传输,而且不会相互干扰。以下是如何理解TCP全双工的一些关键点:

  1. 双向通信:TCP连接允许双方之间的双向通信。这意味着两个端点(通常是客户端和服务器)可以同时发送和接收数据。客户端可以发送请求,服务器可以发送响应,而这两个操作可以同时进行而不会冲突。

  2. 独立的数据流:在TCP连接中,每个方向的数据传输是独立的。数据从客户端到服务器的传输不会干扰从服务器到客户端的传输。这是因为TCP使用序列号和确认机制来确保数据的可靠性和完整性。

  3. 同时性:全双工通信意味着数据可以在两个方向上同时传输。这意味着不需要等待一个方向的传输完成才能开始另一个方向的传输。数据可以同时流动,这可以提高通信的效率。

  4. 独立的缓冲区:TCP连接通常维护独立的接收和发送缓冲区,以处理来自两个方向的数据。这确保了数据在两个方向上可以同时进行处理而不会混淆。

总之,TCP的全双工性质允许双方在连接上同时进行双向通信,不会相互干扰,提供高效的数据传输。这对于许多应用程序,尤其是网络通信和互联网服务来说是非常重要的。全双工连接在Web浏览、文件传输、视频流等许多应用中都得到了广泛应用。

tcp连接示例

服务器端

  • net.Listen()函数:用于创建一个网络服务监听器,以侦听客户端连接请求。

    1.创建监听器Listen 函数会根据指定的网络类型(例如,“tcp” 或 “udp”)和地址(IP地址和端口号)创建一个网络监听器。这监听器会监听指定地址和端口,等待客户端连接请求。 2.返回监听器对象Listen 函数返回一个实现了 net.Listener 接口的监听器对象,您可以使用该对象来接受客户端连接。这个监听器对象通常是 net.TCPListenernet.UDPConn 的实例,具体取决于您所选择的网络类型。

  • Accept 方法:它是Listener接口的方法,用于从网络监听器接受客户端连接请求并创建一个新的网络连接。它是用于构建服务器程序的关键部分,允许服务器接受多个客户端连接。

    1.等待客户端连接Accept 会在没有客户端连接时阻塞,直到有客户端尝试连接到服务器。一旦客户端连接请求到达,Accept 将返回并创建一个新的网络连接对象,该对象可用于与客户端通信。 2.返回连接对象Accept 返回一个实现了 net.Conn 接口的连接对象,这个对象代表与客户端的通信通道。通过这个连接对象,您可以进行数据的读取和写入,实现与客户端的交互。

  • conn.Read 方法:它是net.Conn 接口的方法,用于从连接中读取数据,并将数据存储在 buffer 中。请注意,conn.Read 是一个阻塞的操作,如果没有可用的数据,它将等待数据的到来。Read 方法的返回值包括读取的字节数和可能的错误。

客户端

  • net.Dial函数 :功能是与远程服务器建立连接,以便客户端可以与服务器进行通信。

    1.建立连接net.Dial 用于建立到指定网络地址的连接,可以是TCP、UDP等各种协议。 2.指定网络类型:您可以指定要使用的网络协议类型,如 “tcp” 或 “udp”。 3.指定远程地址:您需要提供远程服务器的地址和端口号,以便建立连接。 4.返回连接对象net.Dial 返回一个实现了 net.Conn 接口的连接对象,您可以使用该对象来进行数据的读取和写入。

  • conn.Write方法:用于将数据写入到网络连接。这个方法允许客户端或服务器向连接中发送数据。

    1.向连接写入数据Write 方法用于将字节数据写入到与 net.Conn 关联的网络连接。 2.阻塞式写入:默认情况下,Write 方法是阻塞的,它会等待直到数据被成功写入或出现错误。 3.返回写入字节数和错误Write 方法返回写入的字节数和可能的错误。您可以根据返回值来确定写入是否成功。

示例代码

服务器端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func main() {
    fmt.Println("服务器端开始监听8888端口...")

    host := "localhost"
    port := 8888

    // 服务器端开始监听端口
    listen, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
    if err != nil {
       fmt.Println("listen err=", err)
       return
    }
    defer listen.Close()
    fmt.Println("listen=", listen)

    // 循环等待客户端的连接
    for {
       fmt.Println("等待客户端连接...")
       conn, err := listen.Accept() // 等待客户端连接
       if err != nil {
          fmt.Println("Accept() err=", err)
       } else {
          fmt.Printf("Accept() suc= %v 客户端ip=%v time=%s\n", conn, conn.RemoteAddr().String(), time.Now().Format("2006-01-02 15:04:05"))
       }
       go process(conn)
    }
}
func process(conn net.Conn) {
    // 循环接收客户端发送的数据
    defer conn.Close() // 关闭conn

    for {
       // 创建新的切片
       buf := make([]byte, 1024)
       fmt.Printf("服务器在等待客户端 %s 发送信息\n", conn.RemoteAddr().String())
       n, err := conn.Read(buf) // 从conn读取,如果客户端没发送任何数据,则一直在此等待
       if err == io.EOF {
          //fmt.Println("服务器端Read err=", err)
          fmt.Println("客户端已关闭,其process服务也将关闭!")
          return // 如果客户端已经关闭了,则本协程也关闭掉
       }
       // 显示客户端发送到服务器端的内容
       fmt.Printf(string(buf[:n])) // 注意,这里只取长度为n的内容,才是实际读到的内容
    }
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func main() {
    fmt.Println("客户端开始连接...")
    host := "localhost"
    port := 8888

    // 与服务器端建立连接
    conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
    if err != nil {
       fmt.Println("client dial err=", err)
       return
    }
    defer conn.Close()
    fmt.Printf("conn 成功=%v time=%s\n", conn, time.Now().Format("2006-01-02 15:04:05"))

    reader := bufio.NewReader(os.Stdin)

    // 客户端可以一直向服务器端写入数据,每次写入一行。当输入exit时,关闭连接。
    for {
       line, err := reader.ReadString('\n')
       if err != nil {
          fmt.Println("readString err=", err)
       }

       if line == "exit\r\n" { // 注意,这里要加上\r\n
          fmt.Println("客户端退出!")
          break
       }
       n, err := conn.Write([]byte(line)) // 通过连接向服务器端写入数据
       if err != nil {
          fmt.Println("conn.Write err=", err)
       }
       fmt.Printf("%s 客户端发送了 %d 字节的数据\n", time.Now().Format("2006-01-02 15:04:05"), n)
    }
}

Read和Write

Write特点

  • 写成功, err == nil && writeLen == len(data) 表示写入成功。
  • 写阻塞,当无法继续写时,Write会进入阻塞状态。无法继续写,通常意味着TCP的窗口已满。
  • 已关闭的连接不能继续写入。
  • 可以使用如下方法控制Write的超时时长:
    • SetDeadline(t time.Time) error
    • SetWriteDeadline(t time.Time) error

Read特点

  • 当conn中无数据时,Read处于阻塞状态。

  • 当conn中有足够数据时,Read读满buf,并返回读取长度,需要循环读取,才可以读取全部内容。

  • 当conn中有部分数据时,Read读部分数据,并返回读取长度。

  • 当conn已经关闭时,通常会返回EOF error。

  • 可以使用如下方法控制Read的超时时长:

    • SetDeadline(t time.Time) error

    • SetReadDeadline(t time.Time) error

并发读写

并发读写,指的是两方面:

  • 读操作和写操作是并发执行的;
  • 可能出现多个Goroutine同时写或读。

因此在Go中,要使用Goroutine完成。同一个连接的并发读或写操作是Goroutine并发安全的。指的是同时存在多个Goroutine并发的读写,之间是不会相互影响的,这个在实操中,主要针对Write操作。conn.Write()是通过锁来实现的。

Write的底层源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// GOROOT\src\internal\poll\fd_windows.go
// Write implements io.Writer.
func (fd *FD) Write(buf []byte) (int, error) {
    if err := fd.writeLock(); err != nil {
       return 0, err
    }
    defer fd.writeUnlock()
    if fd.isFile {
       fd.l.Lock()
       defer fd.l.Unlock()
    }
    //...
}

注意,一次Write操作,表示一个原子的业务单元,不能再分。否则在Goroutine调度时不能保证连续性。

示例

下面的代码对客户端和服务器端都适用,二者均可以进行并发的读写操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func ConcurrentRW(conn net.Conn) {
    // 循环接收客户端发送的数据
    defer conn.Close() // 关闭conn
    wg := sync.WaitGroup{}
    // 并发地写
    wg.Add(1)
    go Write(conn, &wg)
    // 并发地读
    wg.Add(1)
    go Read(conn, &wg)

    wg.Wait()
}

func Write(conn net.Conn, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
       writeLen, err := conn.Write([]byte("send some data from server\n"))
       if err != nil {
          log.Println(err)
       }
       log.Printf("server write len is %d\n", writeLen)
       time.Sleep(time.Second * 2)
    }
}

func Read(conn net.Conn, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
       buf := make([]byte, 1024)
       readLen, err := conn.Read(buf)
       if err != nil {
          log.Println(err)
       }
       log.Printf("received from client data is: %s", string(buf[:readLen]))
    }
}

格式化消息

在发送或接收消息时,需要对消息进行格式化处理,才能在应用程序中保证消息具有逻辑含义。前面的例子,我们采用的是字符串传递消息,也是一种格式,但能够包含的数据字段有限。

典型编程时,我们会将两端处理好的数据,使用特定格式进行发送。典型的有两类:

  • 文本编码,例如JSON,YAML,CSV等
  • 二进制编码,例如GOB (Go Binary) ,Protocol Buffer等

格式化消息的典型流程,如图:

image-20231028170800603

发送端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 定义需要发送的数据
type Message struct {
    ID      uint   `json:"id,omitempty"`
    Code    string `json:"code,omitempty"`
    Content string `json:"content,omitempty"`
}

message := Message{
    ID:      uint(rand.Int()),
    Code:    "SERVER-STANDARD",
    Content: "message from server",
}

encoder := json.NewEncoder(conn)                // 创建编码器
if err := encoder.Encode(message); err != nil { // 利用编码器进行编码,encode成功后,会写入到conn,也就是完成了conn.Write()
    if err == io.EOF {
       fmt.Println("客户端已关闭,服务器端也将停止...")
       break
    }
    log.Println(err)
    continue
}
fmt.Printf("message %d was send!\n", count)

json.NewEncoder()传入建立好的连接,得到一个编码器,然后利用编码器将需要传输的内容进行编码。

接收端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type Message struct {
    ID      uint   `json:"id,omitempty"`
    Code    string `json:"code,omitempty"`
    Content string `json:"content,omitempty"`
}
message := Message{}

// 接收数据,接收到数据后,先解码
// 1.josn解码
decoder := json.NewDecoder(conn)                 // 创建解码器
if err := decoder.Decode(&message); err != nil { // 解码操作,从conn中读取内容,将解码结果放入message中
    log.Println(err)
    if err == io.EOF {
       fmt.Println("服务器端已关闭,客户端也将停止...")
       break
    }
    continue
}
log.Println(message)

json.NewDecoder()传入建立好的连接,返回一个解码器,解码器从conn中读取内容,通过Decode()方法将读取到的内容存放到指定变量中。

如果要换成其他编码,将编码和解码时的包替换为指定编码的包即可,如:gobxml

参考:https://pkg.go.dev/encoding

目前学习到第2.7节。

短连接和长连接

2023.11.01

介绍

短连接(Short Connection)和长连接(Long Connection)是计算机网络通信中两种不同的连接方式,它们在数据传输和通信效率方面有很大的区别。下面是对这两种连接方式的详细介绍:

  1. 短连接(Short Connection):

    • 短连接是一种瞬时性的连接方式,通常在数据传输完成后立即断开连接。
    • 每个短连接都需要建立和终止连接的开销,包括握手(handshake)和挥手(teardown)过程,这些过程会消耗一定的时间和资源。
    • 短连接适用于一次性的数据传输或请求-响应模型,其中客户端发送请求,服务器响应后即断开连接。例如,HTTP/1.0中的非持久连接就是短连接的一个示例。
    • 短连接适用于需要即时释放资源以响应其他连接请求的情况,但在高频率请求下,建立和断开连接的开销可能会影响性能。
  2. 长连接(Long Connection):

    • 长连接是一种持续保持连接状态的通信方式,允许多次数据传输和通信事件在同一连接上进行。
    • 长连接减少了建立和断开连接的开销,因为连接在一段时间内保持打开状态,不需要频繁的握手和挥手操作。
    • 长连接适用于需要频繁通信的场景,例如实时通信应用、在线游戏、聊天应用等。HTTP/1.1引入了持久连接,允许在同一连接上发送多个HTTP请求和响应,以减少延迟和提高性能。
    • 长连接需要管理连接的状态和资源,确保连接的稳定性,避免资源泄漏和过度消耗。

总结: 短连接适用于一次性的数据传输,需要即时释放资源,但可能会有建立和断开连接的性能开销。长连接适用于频繁通信的场景,可以减少连接开销,但需要管理连接状态和资源。选择连接方式取决于具体的应用需求和性能要求。

长连接的心跳检测

在使用长连接时,通常需要使用规律性的发送数据包,以维持在线状态,称为心跳检测。一旦心跳检测不能正确响应,那么就意味着对方(或者己方)不在线,关闭连接。心跳检测用来解决半连接问题。 测试:将连接建立后,关闭客户端或服务器,查看另一端的状态。 发送心跳检测的发送端:

  • 可以是客户端
  • 也可以是服务端
  • 甚至是两端都发

典型的有两种发送策略:

  1. 建立连接后,就使用固定的率发送;

  2. 一段时间没有接收到数据后,发送检测包。(TCP层的KeepAlive就是该策略)

心跳检测包的数据内容:

  • 可以无数据
  • 可以携带数据,例如做时钟同步,业务状态同步
  • 典型的ping pong结构

心跳检测包是否需要响应?

  • 可以不响应,发送成功即可
  • 可以响应,通常用于同步数据

总而言之,都是业务来决定。

示例,ping pong模式,在连接建立后持续心跳:

  • 定时心跳;
  • 判断是否接收到正确心跳响应;
  • 当N次心跳检测失败后,断开连接。

示例代码:

服务器端

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
// 长连接的心跳检测,服务器端发送心跳,客户端响应心跳。
func main() {
    TCPServer()
}

func TCPServer() {
    fmt.Println("服务器端开始监听8888端口...")

    host := "localhost"
    port := 8888

    listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
    if err != nil {
       fmt.Println("listener err=", err)
       return
    }
    defer listener.Close()
    for {
       conn, err := listener.Accept()
       if err != nil {
          log.Println(err)
       }
       go HandlerConn(conn) // 进行连接的读写操作
    }
}

func HandlerConn(conn net.Conn) {
    defer func() {
       fmt.Println("connection closed...")
       conn.Close()
    }()
    wg := sync.WaitGroup{}

    // 独立的goroutine,在建立连接后,周期性发送ping
    wg.Add(1)
    go ServerSendPing(conn, &wg)

    wg.Wait()
}

func ServerSendPing(conn net.Conn, wg *sync.WaitGroup) {
    defer wg.Done()
    // ping失败的最大次数
    const maxPingNum = 3
    pingErrCounter := 0

    ctx, cancel := context.WithCancel(context.Background())

    go ServerReceivePong(conn, ctx)  // 使用context来控制ServerReceivePong协程,当ServerSendPing结束时,ServerReceivePong也随之结束

    type Message struct {
       ID   uint      `json:"id,omitempty"`
       Code string    `json:"code,omitempty"`
       Time time.Time `json:"time,omitempty"`
    }

    // 周期性地发送
    ticker := time.NewTicker(2 * time.Second) // 循环定时器,每隔2秒自动写入一个数据到管道中
    for t := range ticker.C {
       pingMsg := Message{
          ID:   uint(rand.Int()),
          Code: "PING-SERVER",
          Time: t,
       }

       log.Println("ping send to", conn.RemoteAddr())
       encoder := gob.NewEncoder(conn)
       if err := encoder.Encode(pingMsg); err != nil {
          log.Println(err)
          pingErrCounter++ // 累加错误计数器
          log.Printf("pingErrCounter:%d\n\n", pingErrCounter)
          if pingErrCounter == maxPingNum { // 判断是否达到次数上限
             cancel()
             log.Println("ping错误达到3次,自动关闭连接!")
             return // 心跳失败,同时,也需要终止pong的处理
          }
       }
    }
}

func ServerReceivePong(conn net.Conn, ctx context.Context) {
    type Message struct {
       ID   uint      `json:"id,omitempty"`
       Code string    `json:"code,omitempty"`
       Time time.Time `json:"time,omitempty"`
    }

    for {
       select {
       case <-ctx.Done():
          return
       default:
          message := Message{}
          decoder := gob.NewDecoder(conn)
          if err := decoder.Decode(&message); err != nil { // 解码操作
              log.Println(err)
              break
          }
          if message.Code == "PONG-CLIENT" {
             log.Println("receive pong from", conn.RemoteAddr())
          }
       }
    }
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func main() {
    TCPClient()
}

func TCPClient() {
    log.Println("客户端开始连接...")
    host := "localhost"
    port := 8888

    // 与服务器端建立连接
    conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
    if err != nil {
       log.Println("client dial err=", err)
       return
    }
    defer conn.Close()
    log.Printf("conn 成功=%v time=%s\n", conn, time.Now().Format("2006-01-02 15:04:05"))
    wg := sync.WaitGroup{}
    wg.Add(1)
    go ClientReceivePing(conn, &wg)

    wg.Wait()
}

func ClientReceivePing(conn net.Conn, wg *sync.WaitGroup) {
    defer wg.Done()

    type Message struct {
       ID   uint      `json:"id,omitempty"`
       Code string    `json:"code,omitempty"`
       Time time.Time `json:"time,omitempty"`
    }
    message := Message{}

    for {
       decoder := gob.NewDecoder(conn)
       if err := decoder.Decode(&message); err != nil { // 解码操作
          log.Println(err)
          if errors.Is(err, io.EOF) {
             log.Println("连接已关闭,读取结束!")
             break
          }
       }
       log.Println(message)
       if message.Code == "PING-SERVER" {
          ClientSendPong(conn)
       }
    }
}

func ClientSendPong(conn net.Conn) {

    type Message struct {
       ID   uint      `json:"id,omitempty"`
       Code string    `json:"code,omitempty"`
       Time time.Time `json:"time,omitempty"`
    }
    pongMsg := Message{
       ID:   uint(rand.Int()),
       Code: "PONG-CLIENT",
       Time: time.Now(),
    }

    encoder := gob.NewEncoder(conn)                 // 创建编码器,将前面的json包改为gob包即可
    if err := encoder.Encode(pongMsg); err != nil { // 利用编码器进行编码
       log.Println(err)
       return
    }
    log.Println("pong was send to", conn.RemoteAddr())
    return
}

连接池

TCP连接的每次建立,都要进行三次握手,为了避免频繁创建销毁,复用连接的话,通常使用连接池技术。

连接池基本操作

  • 客户端(连接发起端),通过连接池获取连接,Get操作;
  • 当暂时使用完毕后,将连接归还连接池,Put操作;
  • 其他客户端,需要连接同样去池中获取,Get操作,只要连接没有被其他客户端占用,就可以重复使用;
  • 少量长链接,维护大量客户端的目的。否则,每个客户端,就需要1个连接。

典型的:数据库连接池,消息队列连接池等。

连接池的必要功能:

  • New,初始化连接池
  • Get,获取连接
  • Put,放回连接

示例接口如下:

1
2
3
4
5
6
type Pool interface {
	Get() (net.Conn, error)  //获取连接
	Put(conn net.Conn) error //放回连接
	Release() error          //释放池(全部连接)
	Len() int                //有效连接个数
}

除此之外,连接池还应该有能力创建新的连接。在Get操作时,若没有空闲可用的连接,在数量允许的情况下,会创造新的连接,该方法成为为连接工厂。示例接结构为:

1
2
3
4
5
type ConnFactory interface {
    Factory() (net.Conn, error) // 构造连接
    Close(conn net.Conn) error  // 关闭连接
    Ping(conn net.Conn) error   // 检查连接是否是有效的
}

除了必要的操作,连接池典型的配置有:

  • 初始连接数,池初始化时的连接数;
  • 最大连接数,池中最多支持多少连接;
  • 最大空闲连接数,池中最多有多少可用的连接;
  • 空闲连接超时时间,多久后空闲连接会被释放。

示例配置结构如下:

1
2
3
4
5
6
7
type PoolConfig struct {
	InitConnNum int           // 初始化连接数
	MaxConnNum  int           // 最大连接数
	MaxIdleNum  int           // 最大空闲连接数
	IdleTimeout time.Duration // 空闲连接超时时间
	Factory     ConnFactory   // 连接工厂
}

由于需要判断连接的空闲时间,因此,需要记录连接被放入到连接池中的时间,我们封装连接结构:

1
2
3
4
5
// 空闲连接结构
type IdleConn struct {
	conn    net.Conn  //连接本身
	putTime time.Time //放回时间(用于判断是否超过了IdleTimeout)
}

有了基本操作和典型配置后,连接池的结构设计如下:

  • 要实现TcpPool接口;
  • 可以找到Factory;
  • 记录当前池信息,例如当前正在使用的连接数量,空闲的连接队列等。
1
2
3
4
5
6
7
8
// 连接池的结构
type TcpPool struct {
    config         *PoolConfig    // 相关配置
    openingConnNum int            // 开放使用的连接数
    idleList       chan *IdleConn // 空闲的连接队列
    addr           string         // 连接地址
    mut            sync.RWMutex   // 并发安全锁
}

生产工厂的实现

工厂类型,实现ConnFactory接口,创建的对象用在配置中。实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type TcpConnFactory struct{}

func (*TcpConnFactory) Factory(addr string) (net.Conn, error) {
    // 校验参数的合理性
    if addr == "" {
       return nil, errors.New("addr is empty")
    }
    // 建立连接
    conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
    if err != nil {
       return nil, err
    }
    // 返回连接对象
    return conn, nil
}
func (*TcpConnFactory) Close(conn net.Conn) error {
    return conn.Close()
}
func (*TcpConnFactory) Ping(net.Conn) error {
    return nil
}

完善连接池基本结构

先依据Pool接口,将TcpPool的方法集实现完整。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (*TcpPool) Get() (net.Conn, error) {
    return nil, nil
}
func (*TcpPool) Put(conn net.Conn) error {
    return nil
}
func (*TcpPool) Release() error {
    return nil
}
func (*TcpPool) Len() int {
    return 0
}

创建连接池函数

定义函数New,用于创建TcpPool,具体的功能有如下几步:

  1. 校验参数;
  2. 初始化TcpPool;
  3. 初始化连接,关键步骤;
  4. 响应。

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
host := "localhost"
port := 8888

// 1.建立连接池
pool, err := NewTcpPool(fmt.Sprintf("%s:%d", host, port), PoolConfig{
    InitConnNum: 10,
    MaxIdleNum:  20,
    Factory:     &TcpConnFactory{},
})
if err != nil {
    log.Println(err)
    return
}

从连接中获取连接

编码实现TcpPool.Get方法,其核心步骤为:

  1. 并发安全锁;
  2. 利用for+select结构从chan *IdleConn中获取空闲连接;
  3. 判断连接的超时状态;
  4. 若不存在空闲连接,则利用工厂新建连接;
  5. 记录使用的连接数量;
  6. 最大连接数上限的错误处理。

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (pool *TcpPool) Get() (net.Conn, error) {
    // 1.加锁
    pool.mut.Lock()
    defer pool.mut.Unlock()

    // 2.获取空闲连接,若没有则创建连接
    for {
       select {
       // 获取空闲连接
       case idleConn, ok := <-pool.idleList:
          // 判断channel是否被关闭
          if !ok {
             return nil, errors.New("idle list closed")
          }
          // 判断连接是否超时(需要比较pool.config.IdleTimeout和idleConn.putTime)
          if pool.config.IdleTimeout > 0 { // 设置了超时时间(默认值0表示未设置超时时间)
             // 当前时间减去idleConn.putTime是否大于了超时时间(pool.config.IdleTimeout)
             if time.Now().Sub(idleConn.putTime) > pool.config.IdleTimeout {
                // 关闭连接,继续查找下一个连接
                _ = pool.config.Factory.Close(idleConn.conn)
                continue
             }
          }
          // 判断连接是否可用
          if err := pool.config.Factory.Ping(idleConn.conn); err != nil {
             // ping失败,连接不可用 ---> 关闭连接,继续查找
             _ = pool.config.Factory.Close(idleConn.conn)
             continue
          }
          // 找到了可用的空闲连接 ---> 返回连接
          log.Println("get from idle")
          pool.openingConnNum++
          return idleConn.conn, nil
       // 创建连接
       default:
          // 判断是否还可以继续创建(基于开放的连接数是否已经到达了连接池的最大连接数)
          if pool.openingConnNum >= pool.config.MaxConnNum {
             return nil, errors.New("max opening connection")
             // 另一种方案:
             //continue
          }
          // 创建连接
          conn, err := pool.config.Factory.Factory(pool.addr)
          if err != nil {
             return nil, err
          }

          // 正确地创建了可用连接
          log.Println("get from factory")
          pool.openingConnNum++ // 连接计数
          return conn, nil      // 返回连接
       }
    }
}

将连接放回到连接池

编码实现TcpPoolPut方法,其核心步骤为:

  1. 并发安全锁;
  2. 利用select结构向chan *IdleConn中发送连接;
  3. 若空闲连接队列已满,则关闭连接;
  4. 更新开放的连接数量;
  5. 做一些校验:
    1. channel是否可用;
    2. 连接是否可用。

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (pool *TcpPool) Put(conn net.Conn) error {
    // 1.加锁
    pool.mut.Lock()
    defer pool.mut.Unlock()

    // 2.校验
    if conn == nil { // 传入的连接是否存在
       return errors.New("conn is not exists")
    }
    if pool.idleList == nil { // 空闲连接列表是否存在
       // 关闭连接
       _ = pool.config.Factory.Close(conn)
       return errors.New("idle list is not exists")
    }

    // 3.放回连接
    select {
    case pool.idleList <- &IdleConn{ // 放回连接
       conn:    conn,
       putTime: time.Now(),
    }:
       // 更新开放的连接数量
       pool.openingConnNum--
       return nil // 只要可以发送成功,任务完成

    default: // 空闲连接列表已满,直接关闭该连接
       _ = pool.config.Factory.Close(conn)
       return nil
    }
}

释放连接池

当连接池不再使用,需要将池中的全部连接关闭,该操作称为释放连接池操作。

核心步骤:

  1. 关闭Idle List;
  2. 将Idle List中的全部连接关闭;
  3. 配合Put操作,关闭全部连接。

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (pool *TcpPool) Release() error {
    log.Println("release all conn")

    // 1.加锁(防止其他协程继续操作连接池)
    pool.mut.Lock()
    defer pool.mut.Unlock()

    // 2.确定连接池是否被释放
    if pool.idleList == nil {
       return nil
    }

    // 3.关闭IdleList
    close(pool.idleList)

    // 4.释放全部空闲连接
    // 可以继续接收已关闭channel中的元素
    for idleConn := range pool.idleList {
       _ = pool.config.Factory.Close(idleConn.conn)
    }
    return nil
}

获取连接池长度

也就是获取 pool.idelList 的长度。

示例代码:

1
2
3
func (pool *TcpPool) Len() int {
    return len(pool.idleList)
}

总结

  • 连接池的作用:复用连接;
  • 设计池与生产隔离
    • 池的管理;
    • 生产连接管理;
    • 适用于任何资源的池。
  • 编码
    • channel
    • select
    • sync.Mutex sunc.RWMutex
    • Interface
  • 使用连接池
    • 使用多goroutine并发模拟使用

TCP黏包

2023.11.08

粘包现象

指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。如图:

粘包

其实TCP是面向字节流的协议,就是没有界限的一串数据,本没有“包”的概念,包可以当作一个逻辑上的数据单元。“粘包“和“拆包”是逻辑上的概念。

代码示例:

服务器端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func TCPServerSticky() {
    log.Println("服务器端开始监听8888端口...")

    host := "localhost"
    port := 8888

    listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
    if err != nil {
       log.Println("listener err=", err)
       return
    }
    defer listener.Close()
    for {
       conn, err := listener.Accept()
       if err != nil {
          log.Println(err)
       }
       go HandleConnSticky(conn)
    }
}

func HandleConnSticky(conn net.Conn) {
    log.Printf("accept conn from %s\n", conn.RemoteAddr())
    defer func() {
       log.Println("conn be closed")
       conn.Close()
    }()

    // 连续发送数据
    data := "package data."
    for i := 0; i < 50; i++ {
       _, err := conn.Write([]byte(data))
       if err != nil {
          log.Println(err)
       }
    }
}

客户端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func TCPClientSticky() {
    host := "localhost"
    port := 8888

    // 与服务器端建立连接
    conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
    if err != nil {
       fmt.Println("client dial err=", err)
       return
    }
    defer conn.Close()
    fmt.Printf("conn 成功=%v time=%s\n", conn, time.Now().Format("2006-01-02 15:04:05"))

    buf := make([]byte, 1024)
    for {
       readLen, err := conn.Read(buf)
       if err != nil {
          log.Println(err)
          break
       }
       log.Printf("received data: %s", string(buf[:readLen]))
    }
}

测试代码:

1
2
3
4
5
6
7
func TestTCPServerSticky(t *testing.T) {
    TCPServerSticky()
}

func TestTCPClientSticky(t *testing.T) {
    TCPClientSticky()
}

运行结果:

1
2
3
4
5
6
=== RUN   TestTCPClientSticky
conn 成功=&{{0xc00018c000}} time=2023-11-08 10:07:07
  2023/11/08 10:07:07 received data: package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.package data.
2023/11/08 10:07:07 EOF
--- PASS: TestTCPClientSticky (0.03s)
PASS

从结果上看,读取到的数据连在一起了,称为粘包。

粘包原因

  • 发送端:TCP使用Nagle算法来减少传输的报文数量,下面两个原因引发发送粘包问题:
    1. 前一个分组确认,发送下一个分组;
    2. 收集多个分组,收到确认后一起发送。
  • 接收端:TCP将接收到的数据包保存在接收缓存里,然后应用程序主动从缓存读取收到的分组。应用程序不能及时接收到发送的数据。

当发送的多个数据包之间需要逻辑隔离,那么就需要处理粘包问题。反之若发送的数据本身就是一个连续的整体,那么不需要处理粘包问题。

解决方案

数据包连着发送这个是不能改变的。我们需要在数据包层面标注包与包的分离方案,来解决粘包现象带来的问题。典型的方案有:

  • 每个包都封装成固定的长度。读取到内容时,依据长度进行分割即可

  • 数据包使用特定分隔符。读取到内容时,依据分隔符分割数据即可,例如HTTP,FTP协议的\r\n。

  • 将消息封装为Header+Body形式,Header通常时固定长度,Header中包含Body的长度信息。读取到期待长度时,才表示成功。

不论何种方案,在编码实现时,通常采用自定义编解码器的方案来实现。就类似JSON和GOB编码。

示例编码,以方案三,Header+Body的模式为例

Header的长度为4字节,用于表示Body的长度。

先定义编码解码器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// CusEncoder 定义编码器
type CusEncoder struct {
    w io.Writer
}

// NewEncoder 创建编码器函数
func NewEncoder(w io.Writer) *CusEncoder {
    return &CusEncoder{
       w: w,
    }
}

// Encode 编码,将编码的结果写入到w(io.Writer)
func (encoder CusEncoder) Encode(message string) error {
    // 1.获取message的长度
    l := int32(len(message))   // 假设message的长度不超过4字节

    buf := new(bytes.Buffer) // 构建一个数据包缓冲区

    // 2.在数据包中写入长度
    // 需要二进制的写入操作,需要将数据以bit的形式写入
    if err := binary.Write(buf, binary.LittleEndian, l); err != nil {
       return err
    }

    // 3.将数据主体body写入
    // 方式1:
    // if err := binary.Write(buf, binary.LittleEndian, []byte(message)); err != nil {
    //     return err
    // }
    // 方式2:
    if _, err := buf.Write([]byte(message)); err != nil {
       return err
    }

    // 4. 利用io.Writer发送数据
    if _, err := encoder.w.Write(buf.Bytes()); err != nil {
       return err
    }
    return nil
}

再利用编解码器,完成读写操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// CusDecoder 定义解码器
type CusDecoder struct {
    r io.Reader
}

// NewDecoder 创建Decoder解码器
func NewDecoder(r io.Reader) *CusDecoder {
    return &CusDecoder{
       r: r,
    }
}

// Decode 解码
func (decoder *CusDecoder) Decode(message *string) error {
    // 1.读取前4个字节
    header := make([]byte, 4)
    headerLen, err := decoder.r.Read(header)
    if err != nil {
       return err
    }
    if headerLen != 4 {
       return errors.New("header is not enough")
    }

    // 2.将前4个字节转换为int32类型,确定了body的长度
    var l int32
    headerBuf := bytes.NewBuffer(header)
    if err := binary.Read(headerBuf, binary.LittleEndian, &l); err != nil {
       return err
    }

    // 3.读取body
    body := make([]byte, l)
    bodyLen, err := decoder.r.Read(body)
    if err != nil {
       return err
    }
    if int32(bodyLen) != l {
       return errors.New("body is not enough")
    }

    // 4.设置message
    *message = string(body)
    return nil
}

server发送端,编码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func HandleConnEncoder(conn net.Conn) {
    log.Printf("accept conn from %s\n", conn.RemoteAddr())
    defer func() {
       log.Println("conn be closed")
       conn.Close()
    }()

    // 连续发送数据
    data := []string{
       "package data1: hello",
       "package data2: user name is tom",
       "package data3: password is 123456",
       "package data4: over",
    }
    encoder := NewEncoder(conn) // 创建自定义编码器
    index := 0
    for i := 0; i < 50; i++ {
       if err := encoder.Encode(data[index]); err != nil { // 利用编码器进行编码
          log.Println(err)
       }
       index = (index + 1) % 4
    }
}

接收端client,解码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
data := ""
decoder := NewDecoder(conn) // 创建解码器
count := 0
for {
    if err := decoder.Decode(&data); err != nil { // 解码操作
        log.Println(err)
        break
    }
    log.Println(count, "received data:", data)
    count++
}

TCP专用方法

除了通用的Listen,Accept,Dial外,net包还提供了专门的TCP方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Listen
func Listen(network, address string) (Listener, error)
func ListenTcP(network string, laddr *TCPAddr)(*TCPListener,error)

// Accept
func (Listener) Accept()(Connerror)
func (l *TCPListener) AcceptTCP()(*TCPConn,error)

// Dial
func Dial(network, address string)(Conn,error)
func DialTcP(network string, laddrraddr *TCPAddr)(*TCPConnerror)

其中,TCP特定的类型:

1
2
3
*TCPAddr
*TCPListene
*TCPConn

服务端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 服务器端开始监听端口
laddr, err := net.ResolveTCPAddr("tcp", ":8888")
if err != nil {
    log.Fatalln("listen err=", err)
}
tcpListener, err := net.ListenTCP("tcp", laddr)
if err != nil {
    log.Fatalln("listen err=", err)
}

// 循环等待客户端的连接
for {
    conn, err := tcpListener.AcceptTCP() // 等待客户端连接
    if err != nil {
        log.Println("Accept() err=", err)
    } else {
        log.Printf("Accept() suc= %v 客户端ip=%v time=%s\n", conn, conn.RemoteAddr().String(), time.Now().Format("2006-01-02 15:04:05"))
    }
    go HandleTcpConnSpecial(conn)
}

客户端代码:

1
2
3
4
5
6
laddr, err := net.ResolveTCPAddr("tcp", ":8888")
conn, err := net.DialTCP("tcp", laddr, laddr)
if err != nil {
    log.Println("client dial err=", err)
    return
}

注意,几个建立连接的相关方法即可。 建立连接后,传输数据的操作是通用的。

使用TCPConn的目的,是需要对TCP连接的特定属性进行配置,例如:

1
2
3
4
5
6
// 设置连接属性
tcpConn.SetKeepAlive(true)

// SetKeepAlive sets whether the operating system should send
// keep-alive messages on the connection.
func (c *TCPConn) SetKeepAlive(keepalive bool) error

TCP连接属性设置

*net.Tcpconn 提供如下几个设置连接熟悉的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 设置读写操作的Deadline (截至时间)
func (c *conn) SetDeadline(t time.Time) error
func (c *conn) SetReadDeadline(t time.Time) error
func (c *conn) SetWriteDeadline(t time.Time) error

// 设置读缓冲尺寸
func (c *conn) SetReadBuffer(bytes int) error
// 设置写缓存尺寸
func (c *conn) SetWriteBuffer(bytes int) error

// 设置TCP连接关闭后的逗留时间
func (c *TCPConn) SetLinger(sec int) error
// 设置是否开启KeepAlive,在一定时间段内(7200s,取决于os),连接上没有数据传输,会发送测试数据以用来探测对方的在线状态
func (c *TCPConn) SetKeepAlive(keepalive bool) error
// 设置KeepAlive的空闲时间
func (c *TCPConn) SetKeepAlivePeriod(d time.Duration) error
// 设置是否不延迟。默认false,表示有延迟,其实就是使用Nagle算法。true为无延迟
func (c *TCPConn) SetNoDelay(noDelay bool) error

缓冲示例图:

image-20231112094111992

延迟和不延迟:

image-20231112094204374

conn.(*net.TCPConn)可以将Conn接口断言为*net.TCPConn类型,使用其特定的方法。

1
2
3
4
5
6
// 断言为TCPConn即可
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
    log.PrintIn("not tcp connection")
}
tcpConn.SetNoDelay(true)

UDP编程

2023.11.12

UDP协议介绍

UDP(User Datagram Protocol)是一种无连接的、轻量级的传输层协议。它与TCP(Transmission Control Protocol)一样,位于 OSI 模型的传输层,但是与TCP不同,UDP是无连接的,它不建立持久的连接,而是尽力交付数据。以下是UDP协议的一些关键特点:

  1. 无连接性(Connectionless)

    • UDP是无连接的协议,这意味着在传输数据之前不需要建立连接。每个数据包(数据报)都是独立的,不依赖于之前的数据包。
  2. 不可靠性(Unreliable)

    • UDP不提供可靠的数据传输。它不保证数据的完整性、顺序性或可靠性,因此,数据包可能会丢失或以不同的顺序到达。
  3. 轻量级(Lightweight)

    • UDP头部相对较小,仅包含必需的信息。这使得UDP成为一种更轻量级的协议,适用于对实时性要求较高的应用。
  4. 不拥塞控制(No Congestion Control)

    • UDP不提供拥塞控制机制,因此,在网络拥塞时,UDP数据包可能会丢失。这使得UDP适用于实时应用,如语音和视频传输。
  5. 广播和多播支持(Broadcast and Multicast Support)

    • UDP允许数据包通过广播和多播方式发送到多个目标地址。
  6. 适用场景

    • UDP适用于需要快速传输的应用,如实时音视频传输、在线游戏等。由于其无连接的特性,它通常用于那些能够容忍一些数据包丢失的应用场景。
  7. 端口号

    • UDP使用端口号来标识不同的应用程序。发送端和接收端通过端口号来确定对应的应用程序。

UDP的简洁和高效性使得它在某些特定场景下非常有用,但在其他需要可靠性和顺序性的场景中,更常使用TCP。UDP主要用于实时应用,如VoIP、视频流、在线游戏等。在设计网络应用时,需要根据应用的要求选择适当的传输协议。

基本示例

由于UDP是“无连接”的,所以服务器端不需要创建监听套接字,只需要监听地址,等待客户端与之建立连接,即可通信。net包支持的典型UDP函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 解析UDPAddr
func ResolveUDPAddr(network address string) (*UDPAddr, error)
// 监听UDP地址
func ListenUDP(network string, laddr *UDPAddr) (*UDPConn, error)
// 连接UDP服务器
func DialUDP(network string laddr, raddr *UDPAddr) (*UDPConn, error)
// UDP读
func (c *UDPConn) ReadFromUDP(b []byte) (n int, addr *UDPAddr, err error)
// UDP写
func (c *UDPConn) WriteToUDP(b []byte, addr *UDPAddr) (int, error)

编写示例,一次读写操作:

服务端流程:

  • 解析UDP地址

  • 监听UDP

  • 读内容

  • 写内容

客户端流程:

  • 建立连接
  • 写操作
  • 读操作

示例代码:

服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func UDPServerBasic() {
    // 1.解析地址
    laddr, err := net.ResolveUDPAddr("udp", ":9876")
    if err != nil {
       log.Fatalln(err)
    }

    // 2.监听
    udpConn, err := net.ListenUDP("udp", laddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("%s server is listening on %s", "UDP", udpConn.LocalAddr().String())
    defer udpConn.Close()

    // 3.读
    buf := make([]byte, 1024)
    readLen, raddr, err := udpConn.ReadFromUDP(buf)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("received  %s from %s\n", string(buf[:readLen]), raddr.String())

    // 4.写
    data := []byte("received:" + string(buf[:readLen]))
    writeLen, err := udpConn.WriteToUDP(data, raddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("send  %s(%d) to %s\n", string(data), writeLen, raddr.String())
}

客户端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func UDPClientBasic() {
    // 1.建立连接
    raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
    if err != nil {
       log.Fatalln(err)
    }
    udpConn, err := net.DialUDP("udp", nil, raddr)
    if err != nil {
       log.Fatalln(err)
    }

    // 2.写
    data := []byte("Go UDP Program")
    writeLen, err := udpConn.Write(data)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("send  %s(%d) to %s\n", string(data), writeLen, raddr.String())

    // 3.读
    buf := make([]byte, 1024)
    readLen, raddr, err := udpConn.ReadFromUDP(buf)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("received  %s from %s\n", string(buf[:readLen]), raddr.String())
}

connected和unconnected

UDP的连接分为:

  • 已连接,connected,使用方法DialUDP建立的连接,称为已连接,pre-connected。
  • 未连接,unconnected,使用方法 ListenUDP获得的连接,称为未连接,not connected。

如果*UDPConnconnected ,读写方法 Readwrite 。 如果 *UDPConnunconnected ,读写方法 ReadFromUDPWriteToUDP

主要的差异在写操作上。读操作如果使用混乱,不会影响读操作本身,但一些参数细节上要注意:

示例:获取远程地址,conn.RemoteAddr()

unconnected,ListenUDP:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// UDPServerConnect 测试无连接的UDP是否能获取到远程地址
func UDPServerConnect() {
    // 1.解析地址
    laddr, err := net.ResolveUDPAddr("udp", ":9876")
    if err != nil {
       log.Fatalln(err)
    }

    // 2.监听
    udpConn, err := net.ListenUDP("udp", laddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("%s server is listening on %s", "UDP", udpConn.LocalAddr().String())
    defer udpConn.Close()

    // 测试输出远程地址
    log.Println(udpConn.RemoteAddr())
}

返回结果:udpConn.RemoteAddr()nil,说明无连接的方式无法获取到远程地址。

connected,DialUDP:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// UDPClientConnect 测试已连接的UDP是否能获取到远程地址
func UDPClientConnect() {
    // 1.建立连接
    raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
    if err != nil {
       log.Fatalln(err)
    }
    udpConn, err := net.DialUDP("udp", nil, raddr)
    if err != nil {
       log.Fatalln(err)
    }

    // 测试输出远程地址
    log.Println(udpConn.RemoteAddr())
}

返回结果:udpConn.RemoteAddr()127.0.0.1:9876,说明已连接的方式可以获取到远程地址。

示例:connected+WriteToUDP错误:

1
2
udpConn, err := net.DialUDP("udp", nil, raddr)
wn, err := udpConn.WriteToUDP(data, raddr)

错误信息:

1
2
127.0.0.1:52787->127.0.0.1:9876: use of WriteTo with pre-connected
connection

示例:unconnected+Write错误:

1
2
udpConn, err := net.ListenUDP("udp", laddr)
wn, err := udpConn.Write(data)

错误信息:

1
write udp [::]:9876: wsasend: A request to send or receive data was disallowed because the socket is notconnected and (when sending on a datagram socket using a sendto call) no address was supplied.

Read的使用尽量遵循原则,但语法上可以混用,内部有兼容处理。

对等服务端和客户端

函数 func ListenUDP(network string, laddr *UDPAddr) (*UDPConn, error)

可以直接返回UDPConn,是unconnected状态。在编程时,我们的客户端和服务端都可以使用该函数建立UDP连接。而不是一定要在客户端使用DialUDP函数。

这样创建的客户端和服务端时对等的关系。适用于例如广播类的网络通信项目中。

示例代码:

服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func UDPServerPeer() {
    // 1.解析地址
    laddr, err := net.ResolveUDPAddr("udp", ":9876")
    if err != nil {
       log.Fatalln(err)
    }

    // 2.监听
    udpConn, err := net.ListenUDP("udp", laddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("%s server is listening on %s", "UDP", udpConn.LocalAddr().String())
    defer udpConn.Close()

    // 远程地址
    raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6789")
    if err != nil {
       log.Fatalln(err)
    }

    // 3.读
    buf := make([]byte, 1024)
    readLen, raddr, err := udpConn.ReadFromUDP(buf)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("received  %s from %s\n", string(buf[:readLen]), raddr.String())

    // 4.写
    data := []byte("received:" + string(buf[:readLen]))
    writeLen, err := udpConn.WriteToUDP(data, raddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("send  %s(%d) to %s\n", string(data), writeLen, raddr.String())
}

客户端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func UDPClientPeer() {
    // 1.解析地址
    laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6789")
    if err != nil {
       log.Fatalln(err)
    }

    // 2.监听
    udpConn, err := net.ListenUDP("udp", laddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("%s server is listening on %s", "UDP", udpConn.LocalAddr().String())
    defer udpConn.Close()

    // 远程地址
    raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:9876")
    if err != nil {
       log.Fatalln(err)
    }

    // 3.写
    data := []byte("Go UDP Program")
    writeLen, err := udpConn.WriteToUDP(data, raddr)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("send  %s(%d) to %s\n", string(data), writeLen, raddr.String())

    // 4.读
    buf := make([]byte, 1024)
    readLen, raddr, err := udpConn.ReadFromUDP(buf)
    if err != nil {
       log.Fatalln(err)
    }
    log.Printf("received  %s from %s\n", string(buf[:readLen]), raddr.String())
}

多播编程

多播(Multicast)方式的数据传输是基于 UDP 完成的。与 UDP 服务器端/客户端的单播方式不同,区别是,单播数据传输以单一目标进行,而多播数据同时传递到加入 (注册)特定组的大量主机。换言之,采用多播方式时,可以同时向多个主机传递数据。

多播的特点如下:

  • 多播发送端针对特定多播组;
  • 发送端发送 1 次数据,但该组内的所有接收端都会接收数据;
  • 多播组数可以在 IP 地址范围内任意增加。

如图所示:

image-20231112112931815

多播组是 D 类IP地址 (224.0.0.0~239.255.255.255) :

  • 224.0.0.0 ~ 224.0.0.255为预留的组播地址(永久组地址),地址224.0.0.0保留不做分配,其它地址供路由协议使用;
  • 224.0.1.0~ 224.0.1.255是公用组播地址,Internetwork Control Block;
  • 224.0.2.0~ 238.255.255.255为用户可用的组播地址 (临时组地址),全网范围内有效;
  • 239.0.0.0~239.255.255.255为本地管理组播地址,仅在特定的本地范围内有效。

Go的标准库net支持多播编程,主要的函数:

1
func ListenMulticastuDP(network string, ifi *Interface, gaddr *UDPAddr) (*UDPConn, error)

用于创建多播的UDP连接。

示例:多播通信

接收端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// UDPReceiverMulticast 多播接收端
func UDPReceiverMulticast() {
	// 1.多播监听地址
	address := "224.1.1.2:6789"
	gaddr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Fatalln(err)
	}

	// 2.多播监听
	// 获取本地网络接口
	ifi, err := net.InterfaceByName("WLAN")
	if err != nil {
		log.Fatalln(err)
	}

	udpConn, err := net.ListenMulticastUDP("udp", ifi, gaddr)
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("%s server is listening on %s\n", "UDP", udpConn.LocalAddr().String())
	//defer udpConn.Close()

	// 3.接收数据
	// 循环接收
	buf := make([]byte, 1024)
	for {
		n, raddr, err := udpConn.ReadFromUDP(buf)
		if err != nil {
			log.Println(err)
		}
		log.Printf("received \"%s\" from %s\n", string(buf[:n]), raddr.String())
	}
}

发送端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// UDPSenderMulticast 多播发送端
func UDPSenderMulticast() {
	// 1.建立UDP多播组连接
	address := "192.168.1.255:6789"
	raddr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Fatalln(err)
	}

	udpConn, err := net.DialUDP("udp", nil, raddr)
	if err != nil {
		log.Fatalln(err)
	}

	// 2.发送内容
	// 循环发送
	for {
		data := fmt.Sprintf("[%s]: %s",
			time.Now().Format("03:04:05.000"), "hello!")
		wn, err := udpConn.Write([]byte(data))
		if err != nil {
			log.Println(err)
		}
		log.Printf("send \"%s\"(%d) to %s\n", data, wn, raddr.String())
		time.Sleep(time.Second)
	}
}

广播编程

2023.11.16

广播地址,Broadcast,指的是将消息发送到在同一广播网络上的每个主机。

例如对于网络:

1
2
3
# ip a
ens33: <BROADCAST,MULTICAST,UPLOWER UP>
inet 192.168.5.130/24 brd 192.168.5.255

来说,IP ADDR 就是 192.168.50.130/24,广播地址就是 192.168.50.255。

意味着,只要发送数据包的目标地址(接收地址)为192.168.50.255时,那么该数据会发送给该网段上的所有计算机。如图:

image-20231116102847383

编码实现:

编码时数据发的发送端,同样使用ListenUDp方法建立UDP连接,调用WriteToUDP完成数据的发送。就是上面的对等服务端和客户端模式。

接收端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// UDPReceiverBroadcast 广播的接收端
func UDPReceiverBroadcast() {
    // 1.广播监听地址
    laddr, err := net.ResolveUDPAddr("udp", ":6789")
    if err != nil {
       log.Fatalln(err)
    }

    // 2.广播监听
    udpConn, err := net.ListenUDP("udp", laddr)
    if err != nil {
       log.Fatalln(err)
    }

    // 3.接收数据
    // 4.处理数据
    buf := make([]byte, 1024)
    for {
       n, raddr, err := udpConn.ReadFromUDP(buf)
       if err != nil {
          log.Println(err)
       }
       log.Printf("received \"%s\" from %s\n", string(buf[:n]), raddr.String())
    }
}

发送端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// UDPSenderBroadcast 广播的发送端
func UDPSenderBroadcast() {
    // 1.处理监听地址
    laddr, err := net.ResolveUDPAddr("udp", ":9876")
    if err != nil {
       log.Fatalln(err)
    }

    // 2.建立连接
    udpConn, err := net.ListenUDP("udp", laddr)

    // 3.发送数据
    // 广播地址
    rAddress := "192.168.1.255:6789"
    raddr, err := net.ResolveUDPAddr("udp", rAddress)
    if err != nil {
       log.Fatalln(err)
    }

    // 循环发送
    for {
       data := fmt.Sprintf("[%s]: %s",
          time.Now().Format("03:04:05.000"), "hello!")
       wn, err := udpConn.WriteToUDP([]byte(data), raddr)
       if err != nil {
          log.Println(err)
       }
       log.Printf("send \"%s\"(%d) to %s\n", data, wn, raddr)
       time.Sleep(time.Second)
    }
}

文件传输案例

案例说明 UDP协议在传输数据时,由于不能保证稳定性传输,因此比较适合多媒体通信领域,例如直播、视频、音频即时播放,即时通信等领域。

本案例使用文件传输为例。

客户端设计:

  • 发送文件mp3(注意类型都ok)
  • 发送文件名

服务端设计:

  • 接收文件
  • 存储为客户端发送的名字
  • 接收文件内容
  • 写入到具体的文件中

编码实现

文件上传

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// UDPFileUpload 文件上传
func UDPFileUpload() {
    // 1.获取文件信息
    // 打开文件
    filename := "./data/music.mp3"
    file, err := os.Open(filename)
    if err != nil {
       log.Fatalln(err)
    }
    defer file.Close() // 关闭文件
    fileInfo, err := file.Stat()
    if err != nil {
       log.Fatalln(err)
    }
    // 主要用到的两个文件信息:fileInfo.Size()  fileInfo.Name()
    log.Println("send file size:", fileInfo.Size())

    // 2.连接服务器
    rAddress := "192.168.1.6:5678"
    raddr, err := net.ResolveUDPAddr("udp", rAddress)
    if err != nil {
       log.Fatalln(err)
    }
    udpConn, err := net.DialUDP("udp", nil, raddr)
    defer udpConn.Close()

    // 3.发送文件名
    if _, err := udpConn.Write([]byte(fileInfo.Name())); err != nil {
       log.Fatalln(err)
    }

    // 4.服务器确认
    buf := make([]byte, 16*1024)
    rn, err := udpConn.Read(buf)
    if err != nil {
       log.Fatalln(err)
    }
    if "filename ok" != string(buf[:rn]) {
       log.Fatalln(errors.New("server not ready"))
    }

    // 5.发送文件内容
    // 读取文件内容,利用连接发送到接收端
    // file.Read()
    i := 0
    for {
       // 读取文件内容
       rn, err := file.Read(buf)
       if err != nil {
          // io.EOF错误表示文件读取完毕
          if err == io.EOF {
             break
          }
          log.Fatalln(err)
       }
       // 发送到接收端
       if _, err := udpConn.Write(buf[:rn]); err != nil {
          log.Fatalln(err)
       }
       i++
    }
    // 文件发送完成
    log.Println("transfer times:", i)
    log.Println("file send complete.")
    time.Sleep(time.Second) // 可能存在接收端还没有接收完,发送端提前退出的情况
}

文件下载

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// UDPFileDownload 文件下载
func UDPFileDownload() {
    // 1.建立UDP连接
    lAddress := ":5678"
    laddr, err := net.ResolveUDPAddr("udp", lAddress)
    if err != nil {
       log.Fatalln(err)
    }
    udpConn, err := net.ListenUDP("udp", laddr)
    if err != nil {
       log.Fatalln(err)
    }
    defer udpConn.Close()
    log.Printf("%s server is listening on %s", "UDP", udpConn.LocalAddr().String())

    // 2.接收文件名,并确认
    buf := make([]byte, 16*1024)
    rn, raddr, err := udpConn.ReadFromUDP(buf)
    if err != nil {
       log.Fatalln(err)
    }
    Filename := string(buf[:rn])
    if _, err := udpConn.WriteToUDP([]byte("filename ok"), raddr); err != nil {
       log.Fatalln(err)
    }

    // 3.接收文件内容,并写入文件
    // 创建文件
    file, err := os.Create(Filename)
    if err != nil {
       log.Fatalln(err)
    }
    defer file.Close()

    // 读取数据
    i := 0
    for {
       rn, _, err := udpConn.ReadFromUDP(buf)
       if err != nil {
          log.Fatalln(err)
       }
       log.Println("read length:", rn)
       if _, err := file.Write(buf[:rn]); err != nil {
          log.Fatalln(err)
       }
       i++
       log.Println(i, "file write some content.")
    }
}

文件上传成功,但文件内容未完整接收,注意这个UDP内容传输的特点(劣势)。

小结

  • UDP,User Datagram Protocol,用户数据报协议,是一个简单的面向数据报(package-oriented)的传输层协议
  • 单播,点对点
  • 多播,组内,使用多播 (组播) 地址
  • 广播,网段内,使用广播地址
  • udp连接
    • connected — net.DialUDP — Read, Write
    • unconnected — net.ListenUDP — ReadFromUDP, WriteToUDP
  • 场景:
    • 实时性要求高
    • 完整性要求不高

网络轮询器 NetPoller

网络轮询器是 Go 语言运行时用来处理 I/O 操作的关键组件,它使用了操作系统提供的 I/O 多路复用机制增强程序的并发处理能力。网络轮询器不仅用于监控网络 I/O,还能用于监控文件的 I/O ,它利用了操作系统提供的 I/O 多路复用模型来提升 I/O 设备的利用率以及程序的性能。

image-20231116222102513

I/O模型

操作系统中包含:

  • 阻塞I/O模型:Blocking I/O Model
  • 非阻塞I/O模型,Non-Blocking I/O Model
  • 信号驱动I/O模型
  • 异步I/O模型
  • I/O多路复用模型

五种I/O模型。

在 Unix 和类 Unix 操作系统中,文件描述符(File descriptor,FD) 是用于访问文件或者其他 I/O 资源的抽象柄,例如:管道或者网络套接字。而不同的 I/O 模型会使用不同的方式操作文件描述符。

阻塞I/O模型

当发出 I/O 读写的系统调用时,应用程序被阻寒。是最常见的 I/O 模型。

系统调用syscall,I/O 操作,需要与操作系统交换(文件,网络属于操作系统资源),这类操作称为系统调用。

如图:

image-20231116223049221

编码时,常用到的也是阻塞I/O。

网络阻塞I/O示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// BIONet 网络IO的阻塞
func BIONet() {
    addr := "127.0.0.1:5678"
    wg := sync.WaitGroup{}

    // 1.模拟读,体会读的阻塞状态
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       conn, _ := net.Dial("tcp", addr)
       defer conn.Close()
       buf := make([]byte, 1024)
       log.Println(time.Now().Format("03:04:05.000"), "start read.")
       n, _ := conn.Read(buf) // 当发送端没有发送内容到buf中时,Read()操作就处于阻塞状态
       log.Println(time.Now().Format("03:04:05.000"), "content:", string(buf[:n]))
    }(&wg)

    // 2.模拟写
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       l, _ := net.Listen("tcp", addr)
       defer l.Close()

       for {
          conn, _ := l.Accept()
          go func(conn net.Conn) {
             defer conn.Close()
             log.Println("connected.")

             // 阻塞时长
             time.Sleep(2 * time.Second)
             conn.Write([]byte("Blocking I/O"))
          }(conn)
       }
    }(&wg)

    wg.Wait()
}

Channel阻塞I/O示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// BIOChannel channel的阻塞
func BIOChannel() {
    // 0.初始化数据
    wg := sync.WaitGroup{}
    ch := make(chan struct{}) // IO channel

    // 1.模拟读
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       log.Println(time.Now().Format("03:04:05.000"), "start read.")
       content := <-ch // IO Read
       log.Println(time.Now().Format("03:04:05.000"), "content:", content)
    }(&wg)

    // 2.模拟写
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       time.Sleep(time.Second) // 阻塞时长
       ch <- struct{}{}
    }(&wg)

    wg.Wait()
}

非阻塞I/O模型

Non-Blocking I/O Model,当FD为非阻塞时,I/O 操作,会立即返回,不会在未就绪时阻塞。当然资源就绪时是可以正确返回的,如图:

image-20231116231126039

非阻塞 I/O ,在并发编程时非常常用,可以将原本应该阻塞的goroutine (或者线程),来处理其他事件。网络非阻塞编码示例:

conn.Read()或 conn.Write() I/O 操作,目前不具备非阻塞操作,但可以通过设置截止时间来完成。

1
2
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) // 100毫秒之后,Read()一定要返回结果(可以设置在一个很短的时间之后,看上去好像是非阻塞的)
n, _ := conn.Read(buf)                                       // 当发送端没有发送内容到buf中时,Read()操作就处于阻塞状态

Channel配合Select的default子句,可以完成非阻塞操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// NIOChannel channel的阻塞
func NIOChannel() {
    // 0.初始化数据
    wg := sync.WaitGroup{}
    ch := make(chan struct{ id uint }) // IO channel

    // 1.模拟读,体会读的阻塞状态
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       log.Println(time.Now().Format("03:04:05.000"), "start read.")
       var content struct{ id uint }

       // 使用select的default子句,完成非阻塞操作
       select {
       case content = <-ch: // IO Read
       default:
       }

       log.Println(time.Now().Format("03:04:05.000"), "content:", content)
    }(&wg)

    // 2.模拟写
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       time.Sleep(time.Second) // 阻塞时长
       ch <- struct{ id uint }{42}
    }(&wg)

    wg.Wait()
}

【强调】以上的Go的程序的例子,都不是通过设置FD的属性实现的。而是通过外部技术实现的,截止时间和 Select语句。某些语言是支持在FD上设置属性的,如C语言:

1
2
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fdF_SETFLflags | O_NONBLOCK);

网络的 I/O 也可以配合select+channel完成非阻塞的操作,I/O 操作,将内容发送到Channel,外层select处理channel,在网络编程中,非常常用,示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// NIONetChannel 网络IO的非阻塞
func NIONetChannel() {
    addr := "127.0.0.1:5678"
    wg := sync.WaitGroup{}

    // 1.模拟读,体会读的阻塞状态
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       conn, _ := net.Dial("tcp", addr)
       defer conn.Close()
       log.Println(time.Now().Format("03:04:05.000"), "start read.")

       // 独立的goroutine,完成Read操作,将结果Send到channel中
       wgwg := sync.WaitGroup{}
       chRead := make(chan []byte)
       wgwg.Add(1)
       go func() {
          defer wgwg.Done()
          buf := make([]byte, 1024)
          n, _ := conn.Read(buf)
          chRead <- buf[:n]
       }()

       // 使用select+default实现非阻塞操作
       var data []byte
       select {
       case data = <-chRead:
       default:
       }

       log.Println(time.Now().Format("03:04:05.000"), "content:", string(data))
       wgwg.Wait()
    }(&wg)

    // 2.模拟写
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
       defer wg.Done()
       l, _ := net.Listen("tcp", addr)
       defer l.Close()

       for {
          conn, _ := l.Accept()
          go func(conn net.Conn) {
             defer conn.Close()
             log.Println("connected.")

             // 阻塞时长
             time.Sleep(2 * time.Second)
             conn.Write([]byte("Non-Blocking I/O"))
          }(conn)
       }
    }(&wg)

    wg.Wait()
}

信号驱动I/O模型

信号驱动I/O(signal-driven I/O),就是预先告知系统内核,当某个FD准备发生某件事情的时候,让内核发送一个信号通知应用进程。

如图:

image-20231117000332707

在等待信号的期间,应用程序不阻塞。

异步I/O模型

应用告知内核启动某个操作,并让内核在整个操作完成之后,通知应用。这种模型与信号驱动模型的主要区别在于,信号驱动 I/O 只是由内核通知我们合适可以开始下一个I/O操作,而异步I/O模型是由内核通知我们操作什么时候完成。

如图:

image-20231117000742117

多路复用I/O模型

ChatGPT对IO多路复用的介绍:

IO多路复用(Input/Output Multiplexing)是一种通过一种机制使单个进程能够监控多个文件描述符(通常是套接字或文件),并在其中任何一个文件描述符就绪时进行处理的技术。它允许一个进程同时处理多个输入/输出通道,而无需为每个通道创建一个单独的线程或进程。

在传统的阻塞式IO中,当一个IO操作进行时,程序会被阻塞,直到操作完成。而IO多路复用允许程序同时监控多个IO操作,只有在至少一个IO操作准备就绪时才执行实际的读取或写入操作,从而提高了程序的效率。

常见的IO多路复用机制有以下几种:

  1. select: select是Unix/Linux系统中最早引入的IO多路复用函数,它通过一个fd_set的数据结构来保存一组文件描述符,并在这组文件描述符上等待某种事件的发生。但是select的效率在文件描述符数量较大时较低。

  2. poll: poll是select的改进版本,使用poll数据结构代替了fd_set,解决了select中文件描述符数量受限的问题。

  3. epoll: epoll是Linux特有的IO多路复用机制,它使用事件驱动的方式,效率比select和poll更高。epoll提供了三个系统调用:epoll_create创建一个epoll实例,epoll_ctl用于控制epoll实例的行为,epoll_wait用于等待IO事件的发生。

IO多路复用在网络编程中特别有用,因为它可以让程序同时处理多个网络连接而不需要为每个连接创建一个线程,避免了线程创建和切换的开销。这种技术在高性能服务器和网络应用中得到广泛应用。

多路复用,Multiplexing,指的是监听一组FD,当FD的状态发生变化时(变为可读或可写),通知应用程序,应用程序完成对FD的操作。

如图所示:

image-20231117091740470

我们使用net包,完成TCP的服务器端,就是典型的多路复用:

  • 一个goroutine在持续监听是否有连接建立
  • 一旦建立连接,就获取了一个FD,交由一个独立的goorutine处理
  • 核心方法: net.Listen, net.Accept, net.Dial, conn.Read, conn.Write
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func() {
    l, _ := net.Listen("tcp", addr)
    defer l.Close()

    for {
       conn, _ := l.Accept()
       go func(conn net.Conn) {
          defer conn.Close()
          log.Println("connected.")

          // 阻塞时长
          time.Sleep(2 * time.Second)
          conn.Write([]byte("Blocking I/O"))
       }(conn)
    }
}()

Go语言中对10多路复用做了封装,底层同样是基于OS操作系统的多路复用特性,不同操作系统使用的多路复用实现不同,Go支持的列表如下:

1
2
3
4
5
6
7
8
9
// linux
src/runtime/netpoll_epoll.go
// macos
src/runtime/netpol1_kqueue.go
src/runtime/netpoll_solaris.go
// windows
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go

Go会基于当前的操作系统,选择对应的实现。所以我们经常说,go的net网络编程是基于epoll实现的,可见这主要针对于Linux来说的。

系统不同的I/O多路复用模型介绍:

  • select

    • 存在最大描述符数量的限制,通常为1024(底层实现为文件描述符数组)
    • 需要在内核开辟空间存储文件描述符
    • 良好的跨平台性能。会将所有的文件描述符都返回,需要程序自己去遍历区分是哪一个文件描述符
  • poll

    • 底层采用链表的方式实现,没有数量上的限制
      • 每一个节点上的node都是一个pollfd
      • 包含文件描述符、发生的事件
    • 同样需要程序自己去遍历获取发生变化的文件描述符
  • epoll

    • 没有描述符数量的限制
    • 使用红黑树的形式管理文件描述符以及对应的监听事件
    • 当触发对应事件的时候,进行回调,放入双向链表节点
    • 程序获取发生的变化的文件描述符的时候,只需要去检查双向链表节点中有没有即可,有的话,将事件以及事件数量返回给用户

网络轮询器

Go的网络轮询器是对 I/O 多路复用技术的封装,配合Groutine的GMP并发调度,实现Go语言层面的 I/O 多路复用。应用在文件 I/O 、网络 I/O 以及计时器操作中。

网络轮询器的核心操作有:

  1. 网络轮询器的初始化;
  2. 如何向网络轮询器加入待监听的FD;
  3. 如何从网络轮询器获取触发的事件;

三个核心操作。

Epoll核心概念

源码分析会使用 Linux 操作系统上的 epoll 实现作介绍,其他 I/O 多路复用模块的实现大同小异。在学习网络轮询器时,注意如何封装的epoll或其他实现。

核心概念:

  • epoll在Linux内核中构建了一个文件系统,该文件系统采用红黑树来构建。以为数据结构红黑树在增加和删除上面的效率高。
  • epoll提供了两种触发模式,水平触发(LT,Level Triggered)和边沿触发(ET,Edge Triggered)。
    • 水平触发:只要满足条件,就触发一个事件
    • 边沿触发:每当状态变化时,触发一个事件
  • epoll的工作流程
    • epoll_create,epoll初始化
    • epoll_ctl,epoll操作
    • epoll_wait,事件就绪等待

初始化

当使用文件 I/O、网络 I/O 以及计时器时:

  • internal/poll.pollDesc.init
    • net.netFD.init,初始化网络 I/O
    • os.newFile,初始化文件
  • runtime.doaddtimer, 增加新的计时器

以上调用会通过:netpollGenericInit 函数完成初始化:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// src/runtime/netpoll.go

func netpollGenericInit() {
    if netpollInited.Load() == 0 {
       lockInit(&netpollInitLock, lockRankNetpollInit)
       lock(&netpollInitLock)
       if netpollInited.Load() == 0 {
          // 初始化网络轮询器
          netpollinit()
          netpollInited.Store(1)
       }
       unlock(&netpollInitLock)
    }
}

注意函数:netpollnit() 会基于特定OS平台的多路复用技术来说实现。本例中,Linux上就是epoll。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// src/runtime/netpoll_epoll.go

var (
    epfd int32 = -1 // epoll descriptor

    netpollBreakRd, netpollBreakWr uintptr // for netpollBreak

    netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
)

func netpollinit() {
    var errno uintptr
    // 创建文件描述符FD
    epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
    if errno != 0 {
       println("runtime: epollcreate failed with", errno)
       throw("runtime: netpollinit failed")
    }
    // 创建非阻塞管道
    r, w, errpipe := nonblockingPipe()
    if errpipe != 0 {
       println("runtime: pipe failed with", -errpipe)
       throw("runtime: pipe failed")
    }
    ev := syscall.EpollEvent{
       Events: syscall.EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
    // 将epoll FD加入事件监听
    errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
       println("runtime: epollctl failed with", errno)
       throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

初始化的核心功能在以上函数中实现:

  • 是调用 syscall.EpollCreate1 创建一个新的 epoll 文件描述符,这个文件描述符会在整个程序的生命周期中使用
  • 通过 runtime.nonblockingPipe 创建一个用于通信的管道
  • 使用 epolct1 将用于读取数据的文件描述符打包成 epollevent 事件加入监听

轮询事件

在epoll的pollOpen实现中,使用EpollCtl,完成了注册具体事件监听的操作:

1
2
3
4
5
6
7
8
9
// src/runtime/netpoll_epoll.go

func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    var ev syscall.EpollEvent
    ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
    tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
    *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

事件循环

初始化epoll(操作系统的多路复用)后,就需要操作系统中的 I/O 多路复用机制和 Go 语言的运时联系起来,进而达到基于OS的Multiplexing来实现Go语言层面的IO多路复用编程的目的。

存在两个核心过程:

  • Go运行时调度Goroutine让出线程并等待读写事件
  • 当多路复用读写事件触发时唤醒Goroutine执行

该过程,称为事件循环(loopEvent)。

Goroutine让出线程并等待读写事件

当我们在FD上执行读写操作时,如果FD不可用,当前 Goroutine 会等待FD的可读或者可写:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/runtime/netpoll.go

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, AIX and wasip1 use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
		netpollarm(pd, mode)
	}
    // epoll阻塞
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return pollNoError
}

其中netpollblock就是Block阻塞。会调用 runtime.gopark() 让出当前线程M,将Goroutine转为阻塞休眠状态。

轮询网络

Go的runtime会在调度中轮询网络,轮询网络的核心过程是:

  • 计算 epol1 系统调用需要等待的时间;
  • 调用 epollwait 等待可读或者可写事件的发生;
  • 循环处理 epollevent 事件;

轮询网络:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// src/runtime/netpoll_epoll.go

func netpoll(delay int64) gList {
    if epfd == -1 {
       return gList{}
    }
    // 计算系统调用等待时间
    var waitms int32
    if delay < 0 {
       waitms = -1
    } else if delay == 0 {
       waitms = 0
    } else if delay < 1e6 {
       waitms = 1
    } else if delay < 1e15 {
       waitms = int32(delay / 1e6)
    } else {
       // An arbitrary cap on how long to wait for a timer.
       // 1e9 ms == ~11.5 days.
       waitms = 1e9
    }
    var events [128]syscall.EpollEvent
    
    // 等待可读或者可写事件的发生
retry:
    n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
    if errno != 0 {
       if errno != _EINTR {
          println("runtime: epollwait on fd", epfd, "failed with", errno)
          throw("runtime: netpoll failed")
       }
       // If a timed sleep was interrupted, just return to
       // recalculate how long we should sleep now.
       if waitms > 0 {
          return gList{}
       }
       goto retry
    }
    var toRun gList
    
    // 循环处理`epollevent`事件
    for i := int32(0); i < n; i++ {
       ev := events[i]
       if ev.Events == 0 {
          continue
       }

       if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
          if ev.Events != syscall.EPOLLIN {
             println("runtime: netpoll: break fd ready for", ev.Events)
             throw("runtime: netpoll: break fd ready for something unexpected")
          }
          if delay != 0 {
             // netpollBreak could be picked up by a
             // nonblocking poll. Only read the byte
             // if blocking.
             var tmp [16]byte
             read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
             netpollWakeSig.Store(0)
          }
          continue
       }

       var mode int32
       if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
          mode += 'r'
       }
       if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
          mode += 'w'
       }
       if mode != 0 {
          tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
          pd := (*pollDesc)(tp.pointer())
          tag := tp.tag()
          if pd.fdseq.Load() == tag {
             pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
             netpollready(&toRun, pd, mode)
          }
       }
    }
    return toRun
}

其中,在处理事件时,利用 netpollready 来处理IO事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// src/runtime/netpoll.go

//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
       rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
       wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
       toRun.push(rg)
    }
    if wg != nil {
       toRun.push(wg)
    }
}

其中,netpollunblock,会在读写事件发生时,取消Goroutine的阻塞,唤醒Goroutine的执行。

也就是netpollblock的逆向操作。

小结

  • IO模型
    • 阻塞I/O模型,Blocking I/O Model
    • 非阻塞I/O模型,Non-Blocking I/O Model
    • 信号驱动I/O模型,Signal-Driven I/O Model
    • 异步I/O模型,Asynchronous I/O Model
    • I/O多路复用模型,Multiplexing I/O Model
  • Go的网络轮询器实现Go层面的多路复用

gRPC通信

RPC介绍

ChatGPT的介绍

RPC(Remote Procedure Call)是一种通信协议,允许程序调用另一个地址空间(通常是网络上的另一台机器)的过程或函数,而不需要显式地处理细节,使得分布式计算更加方便。

以下是RPC的一些关键概念和步骤:

  1. 客户端调用: 客户端应用程序通过本地的调用方式调用远程服务的过程,就像调用本地函数一样。这个调用过程被称为"调用语义"。

  2. 通信层: 在RPC中,有一个通信层,负责在客户端和服务器之间传输数据。这个通信层可以基于不同的协议,如HTTP、TCP、UDP等。

  3. 序列化: 在RPC中,数据需要在客户端和服务器之间进行序列化和反序列化,以便在网络上传输。序列化是将数据结构或对象转换为字节流的过程,而反序列化则是将字节流还原为数据结构或对象。

  4. 接口定义语言(IDL): RPC系统通常使用IDL定义远程服务接口。IDL是一种中立的语言,描述了服务接口的方法、参数、返回值等信息。常见的IDL包括Protobuf、Thrift、IDL等。

  5. Stub(存根): 在客户端和服务器端,都有一个称为Stub(存根)的组件。客户端Stub负责将本地调用转换为远程调用,而服务器端Stub负责接收远程调用并将其转发到实际的服务实现。

  6. 远程过程调用: 一旦客户端发起远程调用,RPC系统会负责将调用传递给服务器,并等待服务器的响应。这个过程通常被称为远程过程调用(Remote Procedure Call)。

  7. 响应: 服务器执行完远程调用后,将结果返回给客户端。客户端Stub负责将返回的结果转换为本地调用的结果。

  8. 异步RPC: 有时候,RPC系统支持异步调用,允许客户端发起调用而不必等待服务器的响应。这在一些异步、并发或性能敏感的场景中很有用。

一些常见的RPC框架包括gRPC、Apache Thrift、Protocol Buffers、CORBA等。这些框架提供了自动生成Stub、IDL支持、序列化、通信等功能,简化了开发分布式系统的复杂性。不同的RPC框架在实现细节和性能上有所不同,开发者可以根据具体需求选择适合的框架。

RPC:Remote Procedure Call,远程过程调用。与 HTTP 一致,也是应用层协议。该协议的目标是实现:调用远程过程(方法、函数)就如用本地方法一致。

如图所示:

image-20231118173450175

说明:

  • ServiceA 需要调用 ServiceB 的 FuncOnB 函数,对于 ServiceA 来说 FuncOnB 就是远程过程。

  • RPC的目的是让 ServiceA 可以像调用 ServiceA 本地的函数一样调用远程函数 FuncOnB,也就是 ServieA

    上代码层面使用:serviceB.FuncOnB()即可完成调用。

  • RPC是C/S模式,调用方为 Client,远程方为 Server。

  • RPC 把整体的调用过程,数据打包、网络请求等,封装完毕,在 C、S 两端的 Stub 中。

  • Stub(代码存根)调用流程如下:

    1. ServiceA 将调回需求告知 Client Stub。
    2. Client Stub 将调用目标(Call ID)、参数数据(params)等调用信息进行打包(序列化),并将打包好的调用信息通过网络传输给 Server Stub。
    3. Server Stub 将根据调用信息,调用相应过程。期间涉及到数据的拆包(反序列化)等操作。
    4. 远程过程 FuncOnB 运行,并得到结果,将结果告知 Server Stub。

以上就是典型RPC的流程。

RPC 协议没有对网络层做规范,那也就意味着具体的 RPC 实现可以基于 TCP,也可以基于其他协议,例如HTTP,UDP 等。RPC 也没有对数据传输格式做规范,也就是逻辑层面,传输JSON、Text、protobuf 都可以。这些都要看具体的RPC产品的实现。广泛使用的 RPC产品有gRPC,Thrift等。

gRPC介绍

ChatGPT的介绍

gRPC(gRPC Remote Procedure Call)是由Google开发的一种高性能、开源的RPC框架,基于HTTP/2协议进行通信,使用Protocol Buffers(ProtoBuf)作为接口定义语言(IDL)。gRPC的设计目标是简单、高效、跨语言,适用于构建分布式系统中的客户端和服务端。

以下是gRPC的一些关键特点和组成部分:

  1. IDL与ProtoBuf: gRPC使用ProtoBuf作为接口定义语言。ProtoBuf是一种轻量级的数据序列化协议,支持多种语言,并提供了简洁的接口描述语言。通过定义.proto文件,开发者可以描述服务的方法、消息类型等。

  2. 多语言支持: gRPC支持多种编程语言,包括但不限于C++, Java, Python, Go, Ruby, C#, Node.js等。这使得开发者可以在不同语言之间更轻松地构建分布式系统。

  3. HTTP/2协议: gRPC基于HTTP/2协议进行通信,提供了一些优势,如多路复用、头部压缩、双向流等。这有助于提高性能,减少网络开销。

  4. 双向流: gRPC支持双向流,允许客户端和服务器之间同时发起多个消息。这种双向通信的特性使其在实时应用程序和流式处理中很有用。

  5. 自动代码生成: 基于.proto文件的接口定义,gRPC工具可以自动生成客户端和服务器端的代码,包括Stub(存根)和消息类。这简化了开发过程,减少了手动编写大量重复代码的工作。

  6. 拦截器: gRPC支持拦截器机制,允许开发者在请求到达目标服务之前或响应返回给客户端之前执行自定义的逻辑。这为实现认证、日志记录等提供了灵活性。

  7. 服务发现: gRPC可以集成服务发现工具,如etcd、Consul等,以实现服务的动态注册和发现。

  8. 安全性: gRPC提供了多种安全性特性,包括基于TLS的加密传输、认证机制等,确保通信的安全性。

总体而言,gRPC是一个强大的RPC框架,广泛用于构建高性能、可扩展、跨语言的分布式系统。由于其设计的特性,gRPC适用于各种场景,包括微服务架构、实时通信、大规模分布式系统等。

gPRC 官网(https://grpc.io/) 上的 Slogan 是: A high performance, open source universal RPC framework。就是:一个高性能、开源的通用 RPC框架。

支持多数主流语言:C#、C++、Dart、Go、Java、Kotlin、Node、Objective-C、PHP、Python、Ruby。其中Go支持Windows,Linux,Mac 上的 Go 1.13+ 版本

gRPC是一个Google 开源的高性能远程过程调用(RPC)框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持有效地连接数据中心内和跨数据中心的服务。它也适用于分布式计算的最后一步,将设备、移动应用程序和浏览器与后端服务接。

准备gRPC环境

使用gRPC需要:

  • Go
  • Protocol Buffer 编译器,protoc,推荐版本3
  • Go Plugin,用于 Protocol Buffer 编译器

安装 protoc:

可以使用 yum 或 apt 包管理器安装,但通常版本会比较滞后。因此更建议使用预译的二进制安装。 下载地址:

1
https://github.com/protocolbuffers/protobuf/releases

基于系统和版本找到合适的二进制下载并安装

CentOS 演示:

1
2
3
4
5
6
7
8
# 下载特定版本,当前(2022年08月)最新 21.4
$ curl -L0 https://github.com/protocolbuffers/protobuf/releases/download/v21.4/protoc-21.4-linux-x86_64.zip
# 解压到持定目录
$ sudo unzip protoc-21.4-linux-x86_64.zip -d /usr/local
# 如果特定目录中的bin不在环境变量path 中,手动加入 path
# 测试安装结果,注章版本应该是 3.x
$ protoc --version
libprotoc 3.21.4

Win 演示,下载,解压到指定目录,在 CMD 中运行:

1
2
3
4
# 解压到指定目录即可,要保证 protoc/bin 位于环境变量 path 中,可以随处调用

> protoc.exe --version
libprotoc 3.21.4

安装 Go Plugin:

1
2
3
4
5
6
7
#下载特定版本,当前(2022年08月)最新 v1.28.1
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
#下载特定版本,当前(2022年08月)最新 v1.2.0
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 安装完毕后,要保证 $GOPATH/bin 位于环境变量 path 中
# 测试安装结果
$ protoc-gen-go-grpc -version

protoc-gen-go-grpc 1.2.0 Protocol Buffer 的基础使用O 默认情况下,gRPC使用 Protocol Buffers,这是 Google 用于序列化结构化数据的成熟开源机制(尽管它可以与JSON等其他数据格式一起使用)。

Protocol Buffers 的文档: https://developers.google.com/protocol-buffers/docs/overview

使用 Protocol Buffers 的基本步骤是:

  1. 定义消息格式(.proto 文件): 创建一个 .proto 文件来定义你的消息格式。这个文件包括消息的字段和其数据类型。例如:

    1
    2
    3
    4
    5
    6
    7
    
    syntax = "proto3";
    
    message MyMessage {
      int32 id = 1;
      string name = 2;
      // 添加其他字段...
    }

    在这个例子中,我们定义了一个消息类型 MyMessage,包含两个字段:idname

  2. 编译 .proto 文件: 使用 Protocol Buffers 编译器(protoc)将 .proto 文件编译成相应语言的源代码。例如,如果你使用的是 Go 语言,你可以运行以下命令:

    1
    
    protoc --go_out=. yourfile.proto

    这将生成一个名为 yourfile.pb.go 的 Go 文件,其中包含 Protocol Buffers 生成的代码。

  3. 在代码中使用生成的类: 导入生成的类,并在代码中使用它们来创建、序列化和反序列化消息。例如,在 Go 中:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
    package main
    
    import (
      "fmt"
      "github.com/golang/protobuf/proto"
    )
    
    func main() {
      // 创建消息
      myMessage := &MyMessage{
        Id:   1,
        Name: "John Doe",
      }
    
      // 序列化消息
      data, err := proto.Marshal(myMessage)
      if err != nil {
        fmt.Println("Error marshaling:", err)
        return
      }
    
      // 反序列化消息
      newMessage := &MyMessage{}
      err = proto.Unmarshal(data, newMessage)
      if err != nil {
        fmt.Println("Error unmarshaling:", err)
        return
      }
    
      // 使用新消息
      fmt.Println(newMessage)
    }

    这个例子演示了如何创建、序列化和反序列化 MyMessage

  4. 其他操作(可选): Protocol Buffers 还支持一些其他功能,如枚举、服务定义等。你可以在 .proto 文件中定义它们,并在生成的代码中使用。

总体而言,这些步骤概括了使用 Protocol Buffers 的基本过程。具体的步骤可能会因所选择的编程语言而有所不同,因为每种语言都有自己的 Protocol Buffers 实现。

这部分的学习视频存在缺失,后面再找其他的视频学习这一部分的内容。(2023.11.18)

HTTP

介绍

HTTP(Hypertext Transfer Protocol)是一种用于在计算机之间传输超文本的协议。它是互联网上数据通信的基础,通常用于从Web服务器传输到浏览器的超文本文档,但也可以用于其他目的。HTTP遵循客户端-服务器模型,其中客户端通过发送HTTP请求与服务器进行通信,服务器则通过发送HTTP响应进行响应。

以下是HTTP协议的一些关键概念和详细介绍:

  1. 通信过程:

    • 客户端发送请求: 客户端(通常是浏览器)通过建立与服务器的TCP连接,并发送HTTP请求,其中包含请求方法(GET、POST等)、资源路径、协议版本、请求头部等信息。

    • 服务器处理请求: 服务器接收到HTTP请求后,根据请求的内容执行相应的处理。这可能涉及到从数据库中检索数据、生成动态内容等操作。

    • 服务器发送响应: 服务器将HTTP响应发送回客户端,响应包含一个状态码、响应头部和响应正文。状态码表示请求的成功或失败,响应头包含有关响应的信息,响应正文包含所请求资源的数据。

    • 连接关闭: 一旦响应发送完成,连接可以关闭(取决于是否使用持久连接)。

  2. HTTP请求方法:

    • GET: 从服务器获取资源。
    • POST: 向服务器提交数据,用于创建新资源。
    • PUT: 更新服务器上的资源。
    • DELETE: 从服务器删除资源。
    • HEAD: 与GET类似,但只返回头部信息而不返回正文。
    • OPTIONS: 获取服务器支持的HTTP方法。
    • PATCH: 部分更新服务器上的资源。
  3. HTTP响应状态码:

    • 1xx(信息性状态码): 服务器收到请求,需要请求者继续执行操作。
    • 2xx(成功状态码): 请求成功被服务器接收、理解、并接受。
    • 3xx(重定向状态码): 需要进一步的操作来完成请求。
    • 4xx(客户端错误状态码): 请求包含语法错误或无法完成请求。
    • 5xx(服务器错误状态码): 服务器在处理请求的过程中发生错误。
  4. HTTP头部:

    HTTP请求和响应都包含头部,头部包含与请求或响应相关的元信息。一些常见的头部包括:

    • Content-Type: 指定请求或响应中的MIME类型。
    • Content-Length: 指定请求或响应正文的长度。
    • Cache-Control: 控制缓存行为。
    • User-Agent: 标识发送请求的用户代理(通常是浏览器)。
  5. URL(Uniform Resource Locator):

    URL是用于标识和定位资源的统一资源定位符。它包含协议、主机名、端口、路径和查询参数等组成部分。

  6. 版本:

    目前广泛使用的版本是HTTP/1.1,但HTTP/2和HTTP/3也在逐渐推广使用。新版本通常引入性能改进、多路复用等特性。

各版本比较

HTTP(Hypertext Transfer Protocol)协议经历了多个版本的演变,其中最为重要和广泛使用的版本包括HTTP/1.0、HTTP/1.1、HTTP/2和HTTP/3。下面是它们之间的主要比较:

  1. HTTP/1.0:
  • 发布时间: 1996年。
  • 特点:
    • 非持久连接:每个请求/响应都需要单独的连接。
    • 每个连接只能处理一个请求。
    • 简单而直观,但在性能上存在一些问题,因为每个请求都需要建立新的连接。
    • 不支持请求和响应的分块传输(Chunked Transfer)。
    • 没有明确规定头部的缓存处理。
  1. HTTP/1.1:
  • 发布时间: 1997年。
  • 特点:
    • 引入持久连接:多个请求/响应可以共享一个连接,减少了连接建立和关闭的开销。
    • 引入管道机制(Pipeline):客户端可以在一个连接上发送多个请求而不等待响应。
    • 引入Host头部:允许一个物理服务器(IP地址)承载多个逻辑服务器(域名)。
    • 支持分块传输(Chunked Transfer):允许服务器逐块发送响应。
    • 引入缓存控制头部,如Cache-Control。
  1. HTTP/2:
  • 发布时间: 2015年。
  • 特点:
    • 二进制传输:将消息从文本格式改为二进制格式,提高了解析效率。
    • 多路复用:允许在一个连接上并发发送多个请求和响应,避免了"队头阻塞"问题。
    • 头部压缩:使用HPACK算法对头部进行压缩,减小了传输开销。
    • 优化了TLS的握手过程,提高了安全性。
    • 引入优先级和依赖关系,允许客户端指定资源的加载顺序。
  1. HTTP/3:
  • 发布时间: 2020年。
  • 特点:
    • 基于UDP传输:使用QUIC协议(Quick UDP Internet Connections),提供更低的延迟和更好的性能。
    • 多路复用:保留HTTP/2的多路复用特性。
    • 零RTT连接:允许在第一次连接时发送数据,减少了连接建立的时间。
    • 优化头部压缩和传输。
    • 支持快速握手和迁移。
  1. 性能比较:
  • HTTP/2相对于HTTP/1.1:

    • 多路复用显著减少了页面加载时间。
    • 二进制传输降低了解析开销。
    • Header Compression减小了头部大小。
    • 支持服务端推送,服务器可以主动推送资源给客户端。
  • HTTP/3相对于HTTP/2:

    • 使用UDP而不是TCP,减少了握手时间。
    • 实现了快速握手和迁移。
    • 零RTT连接进一步降低了延迟。

总体而言,每个新版本都试图解决前一个版本中存在的问题,并提供更好的性能、安全性和功能。HTTP/3作为最新版本,基于QUIC协议,通过引入UDP传输等特性,进一步提高了性能和安全性。

简单的HTTP服务器

函数:

1
2
// http.ListenAndServe
func ListenAndServe(addr string, handler Handler) error

用于启动HTTP服务器,监听addr,并使用handler来处理请求。返回启动错误。其中:

  • addr,TCP address,形式为 IP:port,IP省略表示监听全部网络接口;
  • handler,经常的被设置为nil,表示使用DefaultServeMux (默认服务复用器)来处理请求。
  • DefaultServeMux要使用以下两个函数来添加请求处理器:
    • func Handle(pattern string, handler Handler)
    • func HandleFunc(pattern string, handler func(ResponseWriter,*Request))

示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
    HttpServerSimple()
}

func HttpServerSimple() {
    // 1.设置不同路由,对于不同的处理器
    // /ping < - > pong
    http.HandleFunc("/ping", pingHandler)

    // 2.启动监听并提供服务
    addr := ":8888"
    log.Println("http server is listening on", addr)
    if err := http.ListenAndServe(addr, nil); err != nil {
       panic(err)
    }
}

// pingHandler 处理ping请求的函数
func pingHandler(w http.ResponseWriter, r *http.Request) {
    // http.ResponseWriter 响应Writer  *http.Request 请求对象,包含了请求信息
    // 向客户端发送响应
    fmt.Fprintln(w, "pong") // 将指定内容写入w中
}

复杂的HTTP服务器

定制性的HTTP服务器,通过 Server 类型进行设置。其定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/net/http/server

type Server struct {
	// TCP地址
	Addr string
	// 调用的handler,如果为nil,则使用http.DefaultServeMux
    Handler Handler
    // 可选地提供TLS配置,供ServeTLS和ListenAndServeTLS使用
	TLSConfig *tls.Config
	// 读请求超时时间
	ReadTimeout time.Duration
	// 读请求头超时时间
	ReadHeaderTimeout time.Duration
	// 写响应超时时间
	WriteTimeout time.Duration
	// 空闲超时时间
	IdleTimeout time.Duration
	// Header最大长度
	MaxHeaderBytes int

	// 其他字段略
}

该类型的 func (srv *Server) ListenAndServe() error 函数用于监听和服务。

示例代码:

0%