入门案例
案例1
使用 rpcx 默认的数据序列化方案:golang 原生 struct + msgpack。
service
定义一个简单的服务和数据结构,用于演示如何使用 Go 语言中的 rpcx 框架实现远程过程调用(RPC)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// Package example defines data structure and services.
package example
type Args struct {
A int
B int
}
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A + args.B
fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
*reply = "hello " + *args
return nil
}
type Greeter struct{}
func (s *Greeter) Say(ctx context.Context, name *string, reply *string) error {
*reply = fmt.Sprintf("hello %s!", *name)
return nil
}
|
以上代码展示了如何通过 rpcx 框架实现了一个简单的 RPC 服务:
Arith
结构体定义了两个数学计算方法 Mul
和 Add
,用于计算乘积和加法。
Greeter
结构体实现了 Say
方法,用于向客户端返回一个包含问候语的字符串。
- 这些方法可以被远程客户端调用,并在调用时传递参数和接收结果。
server
使用 rpcx 框架来创建一个 RPC 服务器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package main
import (
example "RPCX_First_Proj/Test1"
"flag"
"github.com/smallnest/rpcx/server"
)
var addr = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
s := server.NewServer()
// s.RegisterName("Arith", new(example.Arith), "")
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}
|
这段代码的功能是使用 rpcx 框架创建一个 RPC 服务器,监听指定地址和端口,注册了一个名为 Arith
的服务,客户端可以通过该服务器调用 Arith
结构体中定义的方法。
client
创建一个 RPC 客户端,通过 rpcx 框架与远程的 Arith
服务进行通信。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package main
import (
"context"
"flag"
example "github.com/rpcxio/rpcx-examples"
"log"
"time"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/share"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
share.Trace = true
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(1e9)
}
}
|
这段代码演示了如何使用 rpcx 框架创建一个简单的 RPC 客户端,通过点对点发现机制连接到指定地址的服务器,并循环调用 Arith
服务的 Mul
方法。这种实现可以用于构建分布式系统中的客户端程序,实现远程过程调用和数据交互。
案例2
采用 Protocol Buffers 进行序列化。
为了实现一个使用rpcx框架和Protobuf的简单示例,你需要按照以下步骤来进行:
- 安装依赖
- 编写proto文件
- 编译proto文件
- 编写服务器代码
- 编写客户端代码
1.安装依赖
1
2
|
brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
|
2.编写proto文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// proto/greeter.proto
syntax = "proto3";
//-------------------注意!!!!!-------------------
//
//1. proto定义不能修改,不能删除,只能增加
//2. 不能改变原有字段的位置,否则会导致全局bug!!!!!!!
//3. 仔细考虑字段的扩展性,可有可无的字段都要定义为 optional
package example;
option go_package = "./proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
|
3.编译proto文件
1
|
protoc --go_out=. proto/greeter.proto
|
这样会在proto
目录下生成一个greeter.pb.go
文件。
4.编写服务器代码
使用 rpcx 框架创建的 RPC 服务器程序,实现一个简单的 Greeter
服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package main
import (
pb "RPCX_First_Proj/Test2/proto"
"context"
"github.com/smallnest/rpcx/server"
"log"
)
type Greeter struct{}
func (g *Greeter) SayHello(ctx context.Context, req *pb.HelloRequest, res *pb.HelloReply) error {
res.Message = "Hello, " + req.Name
return nil
}
func main() {
s := server.NewServer()
// 用于将服务注册到服务器中,使其能够处理客户端的请求
err := s.RegisterName("Greeter", new(Greeter), "")
if err != nil {
return
}
// 服务器启动并监听 RPC 请求,它被阻塞直到接收到来自客户端的连接
err = s.Serve("tcp", ":8972")
if err != nil {
log.Fatalf("failed to start server: %v", err)
}
}
|
这段代码实现了一个简单的 RPC 服务器,提供了一个 Greeter
服务,该服务包含一个 SayHello
方法,用于响应客户端发送的 HelloRequest
请求,并返回一个 HelloReply
响应。服务器启动后会在 8972
端口上监听来自客户端的连接,并处理其请求。
5.编写客户端代码
实现一个 rpcx 客户端,用于调用远程的 Greeter
服务中的 SayHello
方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package main
import (
pb "RPCX_First_Proj/Test2/proto"
"context"
"fmt"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/protocol"
"log"
)
func main() {
// 创建点对点发现机制,指向服务器地址
d, _ := client.NewPeer2PeerDiscovery("tcp@127.0.0.1:8972", "")
// 设置客户端选项使用Protobuf序列化
opt := client.Option{
SerializeType: protocol.ProtoBuffer,
}
// 创建XClient实例
xclient := client.NewXClient("Greeter", client.Failtry, client.RandomSelect, d, opt)
defer xclient.Close()
req := &pb.HelloRequest{Name: "World"}
res := &pb.HelloReply{}
// 远程调用SayHello方法
err := xclient.Call(context.Background(), "SayHello", req, res)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("Response: %s\n", res.Message)
}
|
这段代码实现了一个 rpcx 客户端,连接到本地 127.0.0.1:8972
端口上的 RPC 服务器,并调用 Greeter
服务的 SayHello
方法。客户端通过 Protobuf 进行数据序列化和反序列化,发送一个包含 Name
字段的请求,并接收一个包含 Message
字段的响应。最终,客户端打印出服务器返回的消息。
案例中用到的函数
客户端
NewPeer2PeerDiscovery()
client.NewPeer2PeerDiscovery
函数是 rpcx 框架中用于创建点对点(Peer-to-Peer)服务发现机制的方法。这个函数的参数说明如下:
1
|
func NewPeer2PeerDiscovery(pair string, metadata string) Discovery
|
参数说明
-
pair string
:
- 类型:
string
- 用途: 指定服务器的地址,包括协议、IP 地址和端口号。
- 格式:
protocol@ip:port
protocol
:通信协议,通常是 tcp
。
ip
:服务器的 IP 地址。
port
:服务器监听的端口号。
示例:
"tcp@127.0.0.1:8972"
:使用 TCP 协议连接到本地地址 127.0.0.1
的端口 8972
。
-
metadata string
:
- 类型:
string
- 用途: 附加的元数据,可以为空字符串。元数据可以用于传递额外的信息,这些信息可以用于服务发现、负载均衡或其他用途。
- 示例: 在简单的示例中,这个参数通常为空字符串
""
,表示没有附加的元数据。
返回值
- 类型:
Discovery
- 用途: 返回一个服务发现实例,用于客户端的服务发现机制。在点对点模式下,这个实例包含了指定的服务器地址。
client.NewXClient()
client.NewXClient
函数是 rpcx 框架中用于创建一个客户端 XClient
实例的方法。这个函数的参数说明如下:
1
|
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, d Discovery, opt Option) XClient
|
参数说明
-
servicePath string
:
- 类型:
string
- 用途: 指定要调用的服务路径或名称。在 rpcx 中,通常使用服务的路径或名称来标识要调用的服务。
-
failMode FailMode
:
- 类型:
FailMode
- 用途: 指定客户端的故障处理模式。定义了在调用失败时的处理策略。
- 枚举值:
Failover
: 失败自动切换到另一个节点。
Failtry
: 失败时重试,直到成功或达到最大重试次数。
Failfast
: 快速失败,只尝试一次。
Failbackup
: 失败后调用备份节点。
- 示例:
client.Failtry
表示在失败时进行重试。
-
selectMode SelectMode
:
- 类型:
SelectMode
- 用途: 指定客户端的负载均衡模式。定义了客户端选择服务节点的策略。
- 枚举值:
RandomSelect
: 随机选择一个节点。
RoundRobin
: 轮询选择节点。
WeightedRoundRobin
: 加权轮询选择节点。
ConsistentHash
: 一致性哈希选择节点。
Closest
: 选择最近的节点。
- 示例:
client.RandomSelect
表示随机选择一个节点进行服务调用。
-
d Discovery
:
- 类型:
Discovery
- 用途: 服务发现接口,用于客户端发现可用的服务节点。
- 示例: 可以使用
client.NewPeer2PeerDiscovery
或其他实现了 Discovery
接口的对象。
-
opt Option
:
- 类型:
Option
- 用途: 客户端选项,用于配置客户端的行为,比如序列化方式、超时设置等。
- 示例: 可以设置
SerializeType
为 protocol.ProtoBuffer
,表示使用 Protobuf 作为序列化方式。
返回值
- 类型:
XClient
- 用途: 返回一个客户端
XClient
实例,用于执行远程服务调用。
代码示例
结合前面提到的完整的客户端代码,这里是 NewXClient
函数的使用示例:
1
2
3
|
// 创建 XClient 实例
xclient := client.NewXClient("Greeter", client.Failtry, client.RandomSelect, d, option)
defer xclient.Close()
|
服务端
RegisterName()
在 rpcx 框架中,RegisterName
函数用于将服务注册到服务器中,使其能够处理客户端的请求。RegisterName
函数的定义如下:
1
|
func (s *Server) RegisterName(serviceName string, rcvr interface{}, metadata string) error
|
参数说明
-
serviceName string
:
- 类型:
string
- 用途: 指定服务的名称。客户端调用服务时会使用这个名称来标识具体的服务。在分布式系统中,服务名称用于在多个服务之间进行区分。这个名称应该是唯一的,以避免冲突。
-
rcvr interface{}
:
- 类型:
interface{}
- 用途: 这是一个实现了服务方法的接收者对象。这个接收者对象包含了所有的服务方法,当服务器接收到相应的请求时,会调用这个对象的方法来处理请求。这个接收者对象通常是一个结构体实例,结构体的方法会被注册为远程可调用的方法。
-
metadata string
:
- 类型:
string
- 用途: 附加的元数据,可以为空字符串。在某些场景下,元数据用于传递一些额外的信息,这些信息可以用于服务发现、负载均衡或其他目的。通常在简单的示例中,这个参数可以为空。
-
返回值 error
:
- 类型:
error
- 用途: 如果注册过程中发生错误,返回一个
error
对象。否则返回 nil
,表示服务成功注册。
总结
RegisterName
函数用于将服务注册到服务器中,使其能够处理客户端的请求。
serviceName
参数指定服务名称,rcvr
参数指定服务实例,metadata
参数用于附加元数据。
- 返回值
error
用于捕获和处理可能发生的错误。