关于gRPC和golang实现微服务的记录(二)

关于gRPC和golang实现微服务的记录(二)

四月 16, 2019

go-kit和gRPC结合使用


什么是Go Kit

Go kit is a collection of Go (golang) packages (libraries) that help you build robust, reliable, maintainable microservices. It was originally conceived as a toolkit to help larger (so-called modern enterprise) organizations adopt Go as an implementation language. But it very quickly “grew downward”, and now serves smaller startups and organizations just as well. For more about the origins of Go kit, see Go kit: Go in the modern enterprise.

Go Kit是使用golang编写的一个微服务工具包,帮助开发者能够快速展开微服务开发的工作

Transports 传输层

Go Kit 可以自由的去选择使用HTTP或者gRPC作为传输方式,你也可以在一个项目中使用HTTP API和RPC服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 使用gRPC
import (
. . .

gRPCtransport "github.com/go-kit/kit/transport/grpc"

. . .
)

// 使用HTTP
import (
. . .

httptransport "github.com/go-kit/kit/transport/http"

. . .
)

Endpoints 端点层

类似MVC设计模式下的Controller中的Action Handler,如果你使用了两个传输方式,你也需要两个方法来请求同一个端点(endpoint)

1
2
3
4
5
6
7
8
9
10
11
// endpoint
// RPC endpoint 服务请求响应适配器
func makeBookInfoEndpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.BookInfoRequest)
b := new(pb.BookInfoResponse)
b.BookId = req.BookId
b.BookName = "人人都是产品经理"
return b, nil
}
}

Services 服务层

实现所有业务逻辑的地方,将多个端点沾合在一起,Go Kit中,服务通常被定义为interface接口,这些接口包含需要被实现的业务(业务逻辑应该不了解端点或特别是传输域概念:您的服务不应该知道有关HTTP头或gRPC错误代码的任何信息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 服务接口
type StringService interface {
Uppercase(string) (string, error)
Count(string) int
}

// 接口实现
type stringService struct{}

func (stringService) Uppercase(s string) (string, error) {
if s == "" {
return "", ErrEmpty
}
return strings.ToUpper(s), nil
}

func (stringService) Count(s string) int {
return len(s)
}

Middlewares 中间件

目前在学习中了解到了使用etcd作为注册中心,Go Kit提供的负载均衡,使用DelayingLimiter限流器。熔断机制和服务请求追踪还未去接触

使用Etcd注册中心 & 负载均衡 & 限流器的使用

关于etcd存储的文章学习:https://www.cnblogs.com/softidea/p/6517959.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 下载最近的etcd存储仓库:https://github.com/etcd-io/etcd/releases & https://www.linuxidc.com/Linux/2015-02/112763.htm
// 设置为环境变量,使用 $ etcd,将它启起来,监听2379端口

var (
// 服务实例地址
instanceAddr = "127.0.0.1:50051"
// etcd服务地址
etcdAddr = "127.0.0.1:2379"
// 服务的信息目录
prefix = "/services/book/"
// 服务实例注册的路径
key = prefix + instanceAddr
// 服务实例注册的val
value = instanceAddr
ctx = context.Background()
// 服务监听的端口
serviceAddr = ":50051"
)

func main() {
// etcd的连接参数
options := etcdv3.ClientOptions{
DialTimeout: time.Second * 3,
DialKeepAlive: time.Second * 3,
}
// 创建etcd连接
client, err := etcdv3.NewClient(ctx, []string{etcdAddr}, options)
if err != nil {
grpclog.Fatalln(err)
}

// 创建注册器
registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
Key: key,
Value: value,
}, log.NewNopLogger())

// 注册器启动注册
registrar.Register()

server := new(BookService)

bookListEndpoint := makeBookListEndpoint()
// 创建限流器 1r/s
bookListLimiter := rate.NewLimiter(rate.Every(time.Second * 1), 1)
// 通过DelayingLimiter中间件,在bookListEndpoint的外层再包裹一层限流的endpoint
bookListEndpoint = ratelimit.NewDelayingLimiter(bookListLimiter)(bookListEndpoint)
// 创建handler
bookListHandler := gRPCtransport.NewServer(
bookListEndpoint,
decodeReq,
encodeRes,
)
// go-kik 处理逻辑
server.bookListHandler = bookListHandler

bookInfoEndpoint := makeBookInfoEndpoint()
bookInfoLimiter := rate.NewLimiter(rate.Every(time.Second * 1), 1)
bookInfoEndpoint = ratelimit.NewDelayingLimiter(bookInfoLimiter)(bookInfoEndpoint)
bookInfoHandler := gRPCtransport.NewServer(
bookInfoEndpoint,
decodeReq,
encodeRes,
)
server.bookInfoHandler = bookInfoHandler

// 启动gRPC服务
listen, err := net.Listen("tcp", serviceAddr)
if err != nil {
grpclog.Fatalf("failed to listen: \n%v", err)
}
gRPCserver := grpc.NewServer(grpc.UnaryInterceptor(gRPCtransport.Interceptor))
pb.RegisterBookServiceServer(gRPCserver, server)

fmt.Println("Listen port on ", serviceAddr)
gRPCserver.Serve(listen)
}

测试负载均衡,只需在新建个server_1.go文件,将服务端代码复制进去,修改监听端口(50052),客服端进行轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/etcdv3"
"github.com/go-kit/kit/sd/lb"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"io"
pb "microservice-demo/gRPC/book/proto"
"time"
)

var (
addr = "127.0.0.1:50051"
// etcd服务地址 注册中心地址
etcdAddr = "127.0.0.1:2379"
// 监听的服务前缀
prefix = "/services/book/"
ctx = context.Background()
)

func main() {
options := etcdv3.ClientOptions{
DialTimeout: time.Second * 3,
DialKeepAlive: time.Second * 3,
}
// 连接注册中心
client, err := etcdv3.NewClient(ctx, []string{etcdAddr}, options)
if err != nil {
grpclog.Fatalln("client error: ", err)
}
logger := log.NewNopLogger()
// 实例管理器
// 监听etcd中prefix的目录变化更新缓存的实例数据
instancer, err := etcdv3.NewInstancer(client, prefix, logger)
if err != nil {
grpclog.Fatalln("instancer error: ", err)
}

// endpoint管理器
// 根据Factory和监听得到实例创建endpoint并订阅实例管理器的变化动态更新Factory创建的endpoint
endpointer := sd.NewEndpointer(instancer, reqFactory, logger)
// 负载均衡器
balancer := lb.NewRoundRobin(endpointer)

// 1.可以通过负载均衡器直接获取请求的endpoint,发起请求
// 2.也可以通过retry定义尝试次数进行请求
reqEndpoint := lb.Retry(3, time.Second * 3, balancer)

// 通过endpoint发起请求
var req interface{}
// 轮询
for i := 0; i < 8; i++ {
fmt.Println("请求服务: ", addr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
if _, err := reqEndpoint(ctx, req); err != nil {
grpclog.Errorln("reqEndpoint error: ", err)
}
}
}

// 根据实例地址 创建对应请求的endpoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
fmt.Println("请求服务:", instanceAddr)
conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
if err != nil {
grpclog.Errorln("connect error: ", err)
}
defer conn.Close()
bookClient := pb.NewBookServiceClient(conn)
bookInfo, _ := bookClient.GetBookInfo(context.Background(), &pb.BookInfoRequest{ BookId: 1 })
fmt.Println("书籍详情")
fmt.Println("bookId: 1", " => ", "bookName: ", bookInfo.BookName)

bookList, _ := bookClient.GetBookList(context.Background(), &pb.BookListRequest{ Page: 1, Limit: 10 })
fmt.Println("书籍列表")
for _, item := range bookList.BookList {
fmt.Println("bookId: ", item.BookId, " => ", "bookName: ", item.BookName)
}
return nil, nil
}, nil, nil
}