Fork me on GitHub

2023年3月

实战 APISIX 服务发现

APISIX 使用小记 中,我们通过 APISIX 官方提供的入门示例学习了 APISIX 的基本概念,并使用 Admin API 和 Dashboard 两种方法创建路由。在创建路由时,我们必须明确地知道服务的 IP 和端口,这给运维人员带来了一定的负担,因为服务的重启或扩缩容都可能会导致服务的 IP 和端口发生变化,当服务数量非常多的时候,维护成本将急剧升高。

APISIX 集成了多种服务发现机制来解决这个问题,通过服务注册中心,APISIX 可以动态地获取服务的实例信息,这样我们就不用在路由中写死固定的 IP 和端口了。

如下图所示,一个标准的服务发现流程大致包含了三大部分:

discovery-cn.png

  1. 服务启动时将自身的一些信息,比如服务名、IP、端口等信息上报到注册中心;各个服务与注册中心使用一定机制(例如心跳)通信,如果注册中心与服务长时间无法通信,就会注销该实例;当服务下线时,会删除注册中心的实例信息;
  2. 网关会准实时地从注册中心获取服务实例信息;
  3. 当用户通过网关请求服务时,网关从注册中心获取的实例列表中选择一个进行代理;

目前市面上流行着很多注册中心,比如 Eureka、Nacos、Consul 等,APISIX 内置了下面这些服务发现机制:

基于 Eureka 的服务发现

Eureka 是 Netflix 开源的一款注册中心服务,它也被称为 Spring Cloud Netflix,是 Spring Cloud 全家桶中的核心成员。本节将演示如何让 APISIX 通过 Eureka 来实现服务发现,动态地获取下游服务信息。

启动 Eureka Server

我们可以直接运行官方的示例代码 spring-cloud-samples/eureka 来启动一个 Eureka Server:

$ git clone https://github.com/spring-cloud-samples/eureka.git
$ cd eureka && ./gradlew bootRun

或者也可以直接使用官方制作好的镜像:

$ docker run -d -p 8761:8761 springcloud/eureka

启动之后访问 http://localhost:8761/ 看看 Eureka Server 是否已正常运行。

启动 Eureka Client

如果一切顺利,我们再准备一个简单的 Spring Boot 客户端程序,引入 spring-cloud-starter-netflix-eureka-client 依赖,再通过 @EnableEurekaClient 注解将服务信息注册到 Eureka Server:

@EnableEurekaClient
@SpringBootApplication
@RestController
public class EurekaApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }

    @RequestMapping("/")
    public String home() {
        return String.format("Hello, I'm eureka client.");
    }
}

在配置文件中设置服务名称和服务端口:

spring.application.name=eureka-client
server.port=8081

默认注册的 Eureka Server 地址是 http://localhost:8761/eureka/,可以通过下面的参数修改:

eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

默认情况下,Eureka Client 是将该服务所在的主机名注册到 Eureka Server,这在某些情况下可能会导致其他服务调不通该服务,我们可以通过下面的参数,让 Eureka Client 注册 IP 地址:

eureka.instance.prefer-ip-address=true
eureka.instance.ip-address=192.168.1.40

启动后,在 Eureka 页面的实例中可以看到我们注册的服务:

eureka-instances.png

APISIX 集成 Eureka 服务发现

接下来,我们要让 APISIX 通过 Eureka Server 找到我们的服务。首先,在 APISIX 的配置文件 config.yaml 中添加如下内容:

discovery:
  eureka:
    host:
      - "http://192.168.1.40:8761"
    prefix: /eureka/

然后重启 APISIX,接着向 APISIX 中添加如下路由:

$ curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/11 \
    -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -i -d '
{
    "methods": ["GET"],
    "uri": "/eureka",
    "plugins": {
        "proxy-rewrite" : {
            "regex_uri": ["/eureka", "/"]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "discovery_type": "eureka",
        "service_name": "EUREKA-CLIENT"
    }
}'

之前创建路由时,我们在 upstream 中通过 nodes 参数表示上游服务器节点,这里我们不再需要写死服务器节点信息,而是通过 "discovery_type": "eureka""service_name": "EUREKA-CLIENT" 来让 APISIX 使用 eureka 服务发现机制,上游的服务名称为 EUREKA-CLIENT

值得注意的是,虽然上面的 Eureka Client 的 spring.application.name 是小写,但是注册到 Eureka Server 的服务名称是大写,所以这里的 service_name 参数必须是大写。此外,这里我们还使用了 proxy-rewrite 插件,它相当于 Nginx 中的路径重写功能,当多个上游服务的接口地址相同时,通过路径重写可以将它们区分开来。

访问 APISIX 的 /eureka 地址验证一下:

$ curl http://127.0.0.1:9080/eureka
Hello, I'm eureka client.

我们成功通过 APISIX 访问到了我们的服务。

关于 APISIX 集成 Eureka 的更多信息,可以参考官方文档 集成服务发现注册中心 和官方博客 API 网关 Apache APISIX 集成 Eureka 作为服务发现

基于 Nacos 的服务发现

Nacos 是阿里开源的一款集服务发现、配置管理和服务管理于一体的管理平台,APISIX 同样支持 Nacos 的服务发现机制。

启动 Nacos Server

首先,我们需要准备一个 Nacos Server,Nacos 官网提供了多种部署方式,可以 通过源码或安装包安装通过 Docker 安装通过 Kubernetes 安装,我们这里直接使用 docker 命令启动一个本地模式的 Nacos Server:

$ docker run -e MODE=standalone -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:v2.2.0

不知道为什么,有时候启动会报这样的错误:com.alibaba.nacos.api.exception.runtime.NacosRuntimeException: errCode: 500, errMsg: load derby-schema.sql error,多启动几次又可以了。

启动成功后,访问 http://localhost:8848/nacos/ 进入 Nacos 管理页面,默认用户名和密码为 nacos/nacos

nacos-home.png

启动 Nacos Client

接下来,我们再准备一个简单的 Spring Boot 客户端程序,引入 nacos-discovery-spring-boot-starter 依赖,并通过它提供的 NameService 将服务信息注册到 Nacos Server:

@SpringBootApplication
@RestController
public class NacosApplication implements CommandLineRunner {

    @Value("${spring.application.name}")
    private String applicationName;

    @Value("${server.port}")
    private Integer serverPort;
    
    @NacosInjected
    private NamingService namingService;
    
    public static void main(String[] args) {
        SpringApplication.run(NacosApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        namingService.registerInstance(applicationName, "192.168.1.40", serverPort);
    }

    @RequestMapping("/")
    public String home() {
        return String.format("Hello, I'm nacos client.");
    }
}

在配置文件中设置服务名称和服务端口:

spring.application.name=nacos-client
server.port=8082

以及 Nacos Server 的地址:

nacos.discovery.server-addr=127.0.0.1:8848

启动后,在 Nacos 的服务管理页面中就可以看到我们注册的服务了:

nacos-service-management.png

APISIX 集成 Nacos 服务发现

接下来,我们要让 APISIX 通过 Nacos Server 找到我们的服务。首先,在 APISIX 的配置文件 config.yaml 中添加如下内容:

discovery:
  nacos:
    host:
      - "http://192.168.1.40:8848"
    prefix: "/nacos/v1/"

然后重启 APISIX,接着向 APISIX 中添加如下路由:

$ curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/22 \
    -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -i -d '
{
    "methods": ["GET"],
    "uri": "/nacos",
    "plugins": {
        "proxy-rewrite" : {
            "regex_uri": ["/nacos", "/"]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "discovery_type": "nacos",
        "service_name": "nacos-client"
    }
}'

和上面 Eureka 服务发现的例子一样,我们也使用 proxy-rewrite 插件实现了路径重写功能,访问 APISIX 的 /nacos 地址验证一下:

$ curl http://127.0.0.1:9080/nacos
Hello, I'm nacos client.

我们成功通过 APISIX 访问到了我们的服务。

关于 APISIX 集成 Nacos 的更多信息,可以参考官方文档 基于 Nacos 的服务发现 和官方博客 Nacos 在 API 网关中的服务发现实践

基于 Consul 的服务发现

Consul 是由 HashiCorp 开源的一套分布式系统的解决方案,同时也可以作为一套服务网格解决方案,提供了丰富的控制平面功能,包括:服务发现、健康检查、键值存储、安全服务通信、多数据中心等。

启动 Consul Server

Consul 使用 Go 语言编写,安装和部署都非常简单,官方提供了 Consul 的多种安装方式,包括 二进制安装Kubernetes 安装HCP 安装。这里我们使用最简单的二进制安装方式,这种方式只需要执行一个可执行文件即可,首先,我们从 Install Consul 页面找到对应操作系统的安装包并下载:

$ curl -LO https://releases.hashicorp.com/consul/1.15.1/consul_1.15.1_linux_amd64.zip
$ unzip consul_1.15.1_linux_amd64.zip

下载并解压之后,Consul 就算安装成功了,使用 consul version 命令进行验证:

$ ./consul version
Consul v1.15.1
Revision 7c04b6a0
Build Date 2023-03-07T20:35:33Z
Protocol 2 spoken by default, understands 2 to 3 (agent will automatically use protocol >2 when speaking to compatible agents)

Consul 安装完成后,就可以启动 Consul Agent 了,Consul Agent 有 -server-client 两种模式,-client 一般用于服务网格等场景,这里我们通过 -server 模式启动:

$ ./consul agent -server -ui -bootstrap-expect=1 -node=agent-one -bind=127.0.0.1 -client=0.0.0.0 -data-dir=./data_dir
==> Starting Consul agent...
              Version: '1.15.1'
           Build Date: '2023-03-07 20:35:33 +0000 UTC'
              Node ID: '8c1ccd5a-69b3-4c95-34c1-f915c19a3d08'
            Node name: 'agent-one'
           Datacenter: 'dc1' (Segment: '<all>')
               Server: true (Bootstrap: true)
          Client Addr: [0.0.0.0] (HTTP: 8500, HTTPS: -1, gRPC: -1, gRPC-TLS: 8503, DNS: 8600)
         Cluster Addr: 127.0.0.1 (LAN: 8301, WAN: 8302)
    Gossip Encryption: false
     Auto-Encrypt-TLS: false
            HTTPS TLS: Verify Incoming: false, Verify Outgoing: false, Min Version: TLSv1_2
             gRPC TLS: Verify Incoming: false, Min Version: TLSv1_2
     Internal RPC TLS: Verify Incoming: false, Verify Outgoing: false (Verify Hostname: false), Min Version: TLSv1_2

==> Log data will now stream in as it occurs:

其中 -ui 表示开启内置的 Web UI 管理界面,-bootstrap-expect=1 表示服务器希望以 bootstrap 模式启动,-node=agent-one 用于指定节点名称,-bind=127.0.0.1 这个地址用于 Consul 集群内通信,-client=0.0.0.0 这个地址用于 Consul 和客户端之间的通信,包括 HTTP 和 DNS 两种通信方式,-data-dir 参数用于设置数据目录。关于 consul agent 更多的命令行参数,可以参考 Agents OverviewAgents Command-line Reference

简单起见,我们也可以使用 -dev 参数以开发模式启动 Consul Agent:

$ ./consul agent -dev

如果 Consul Agent 启动成功,访问 http://localhost:8500/ 进入 Consul 的管理页面,在服务列表可以看到 consul 这个服务:

consul-services.png

在节点列表可以看到 agent-one 这个节点:

consul-nodes.png

启动 Consul Client

让我们继续编写 Consul Client 程序,引入 spring-cloud-starter-consul-discovery 依赖,并通过 @EnableDiscoveryClient 注解将服务信息注册到 Consul Server:

@EnableDiscoveryClient
@SpringBootApplication
@RestController
public class ConsulApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ConsulApplication.class, args);
    }

    @RequestMapping("/")
    public String home() {
        return String.format("Hello, I'm consul client.");
    }
}

可以看到和 Eureka Client 的代码几乎是完全一样的,不过有一点要注意,我们还需要在 pom.xml 文件中引入 spring-boot-starter-actuator 依赖,开启 Actuator 端点,因为 Consul 默认是通过 /actuator/health 接口来对程序做健康检查的。

在配置文件中设置服务名称和服务端口:

spring.application.name=consul-client
server.port=8083

以及 Consul 相关的配置:

spring.cloud.consul.host=127.0.0.1
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.service-name=${spring.application.name}
spring.cloud.consul.discovery.prefer-ip-address=true
spring.cloud.consul.discovery.ip-address=192.168.1.40

启动后,在 Consul 的服务管理页面中就可以看到我们注册的服务了:

consul-service-detail.png

APISIX 集成 Consul 服务发现

接下来,我们要让 APISIX 通过 Consul Server 找到我们的服务。首先,在 APISIX 的配置文件 config.yaml 中添加如下内容:

discovery:
  consul:
    servers:
      - "http://192.168.1.40:8500"

然后重启 APISIX,接着向 APISIX 中添加如下路由:

$ curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/33 \
    -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -i -d '
{
    "methods": ["GET"],
    "uri": "/consul",
    "plugins": {
        "proxy-rewrite" : {
            "regex_uri": ["/consul", "/"]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "discovery_type": "consul",
        "service_name": "consul-client"
    }
}'

访问 APISIX 的 /consul 地址验证一下:

$ curl http://127.0.0.1:9080/consul
Hello, I'm consul client.

关于 APISIX 集成 Consul 的更多信息,可以参考官方文档 基于 Consul 的服务发现

基于 Consul KV 的服务发现

Consul 还提供了分布式键值数据库的功能,这个功能和 Etcd、ZooKeeper 类似,主要用于存储配置参数和元数据。基于 Consul KV 我们也可以实现服务发现的功能。

首先准备 consul-kv-client 客户端程序,它的地址为 192.168.1.40:8084,我们通过 Consul KV 的 HTTP API 手工注册服务:

$ curl -X PUT http://127.0.0.1:8500/v1/kv/upstreams/consul-kv-client/192.168.1.40:8084 -d ' {"weight": 1, "max_fails": 2, "fail_timeout": 1}'

其中,/v1/kv/ 后的路径按照 {Prefix}/{Service Name}/{IP}:{Port} 的格式构成。可以在 Consul 的 Key/Value 管理页面看到我们注册的服务:

consul-key-value.png

然后在 APISIX 的配置文件 config.yaml 中添加如下内容:

discovery:
  consul_kv:
    servers:
      - "http://192.168.1.40:8500"
    prefix: "upstreams"

然后重启 APISIX,接着向 APISIX 中添加如下路由:

$ curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/44 \
    -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -i -d '
{
    "methods": ["GET"],
    "uri": "/consul_kv",
    "plugins": {
        "proxy-rewrite" : {
            "regex_uri": ["/consul_kv", "/"]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "discovery_type": "consul_kv",
        "service_name": "http://192.168.1.40:8500/v1/kv/upstreams/consul-kv-client/"
    }
}'

注意这里的 service_name 参数需要设置为 KV 的 URL 路径,访问 APISIX 的 /consul_kv 地址验证一下:

$ curl http://127.0.0.1:9080/consul_kv
Hello, I'm consul_kv client.

另一点需要注意的是,这种方式注册的服务没有健康检查机制,服务退出后需要手工删除对应的 KV:

$ curl -X DELETE http://127.0.0.1:8500/v1/kv/upstreams/consul-kv-client/192.168.1.40:8084

关于 APISIX 集成 Consul KV 的更多信息,可以参考官方文档 基于 Consul KV 的服务发现 和官方博客 Apache APISIX 集成 Consul KV,服务发现能力再升级

基于 DNS 的服务发现

Consul 不仅支持 HTTP API,而且还支持 DNS API,它内置了一个小型的 DNS 服务器,默认端口为 8600,我们以上面的 consul-client 为例,介绍如何在 APISIX 中集成 DNS 的服务发现。

注册到 Consul 中的服务默认会在 Consul DNS 中添加一条 <服务名>.service.consul 这样的域名记录,使用 dig 命令可以查询该域名的信息:

$ dig @192.168.1.40 -p 8600 consul-client.service.consul

; <<>> DiG 9.11.3-1ubuntu1.17-Ubuntu <<>> @192.168.1.40 -p 8600 consul-client.service.consul
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 32989
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;consul-client.service.consul.  IN      A

;; ANSWER SECTION:
consul-client.service.consul. 0 IN      A       192.168.1.40

;; Query time: 4 msec
;; SERVER: 192.168.1.40#8600(192.168.1.40)
;; WHEN: Tue Mar 21 07:17:40 CST 2023
;; MSG SIZE  rcvd: 73

上面的查询结果中只包含 A 记录,A 记录中只有 IP 地址,没有服务端口,如果用 A 记录来做服务发现,服务的端口必须得固定;好在 Consul 还支持 SRV 记录,SRV 记录中包含了服务的 IP 和端口信息:

$ dig @192.168.1.40 -p 8600 consul-client.service.consul SRV

; <<>> DiG 9.11.3-1ubuntu1.17-Ubuntu <<>> @192.168.1.40 -p 8600 consul-client.service.consul SRV
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 41141
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 3
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;consul-client.service.consul.  IN      SRV

;; ANSWER SECTION:
consul-client.service.consul. 0 IN      SRV     1 1 8083 c0a80128.addr.dc1.consul.

;; ADDITIONAL SECTION:
c0a80128.addr.dc1.consul. 0     IN      A       192.168.1.40
agent-one.node.dc1.consul. 0    IN      TXT     "consul-network-segment="

;; Query time: 3 msec
;; SERVER: 192.168.1.40#8600(192.168.1.40)
;; WHEN: Tue Mar 21 07:18:22 CST 2023
;; MSG SIZE  rcvd: 168

我们在 APISIX 的配置文件 config.yaml 中添加如下内容:

discovery:
  dns:
    servers:
      - "192.168.1.40:8600"

然后重启 APISIX,接着向 APISIX 中添加如下路由:

$ curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/55 \
    -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -i -d '
{
    "methods": ["GET"],
    "uri": "/dns",
    "plugins": {
        "proxy-rewrite" : {
            "regex_uri": ["/dns", "/"]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "discovery_type": "dns",
        "service_name": "consul-client.service.consul"
    }
}'

访问 APISIX 的 /dns 地址验证一下:

$ curl http://127.0.0.1:9080/dns
Hello, I'm consul client.

关于 Consul DNS 的更多信息,可以参考官方文档 Discover services with DNS,除了 Consul DNS,我们也可以使用其他的 DNS 服务器来做服务发现,比如 CoreDNS 就是 Kubernetes 环境下的服务发现默认实现。

关于 APISIX 集成 DNS 的更多信息,可以参考官方文档 基于 DNS 的服务发现 和官方博客 API 网关 Apache APISIX 携手 CoreDNS 打开服务发现新大门

基于 APISIX-Seed 架构的控制面服务发现

上面所介绍的所有服务发现机制都是在 APISIX 上进行的,我们需要修改 APISIX 的配置,并重启 APISIX 才能生效,这种直接在网关上实现的服务发现也被称为 数据面服务发现,APISIX 还支持另一种服务发现机制,称为 控制面服务发现

控制面服务发现不直接对 APISIX 进行修改,而是将服务发现结果保存到 Etcd 中,APISIX 实时监听 Etcd 的数据变化,从而实现服务发现:

control-plane-service-discovery.png

APISIX 通过 APISIX-Seed 项目实现了控制面服务发现,这样做有下面几个好处:

  • 简化了 APISIX 的网络拓扑,APISIX 只需要关注 Etcd 的数据变化即可,不再和每个注册中心保持网络连接;
  • APISIX 不用额外存储注册中心的服务数据,减小内存占用;
  • APISIX 的配置变得简单,更容易管理;

虽然如此,目前 APISIX-Seed 还只是一个实验性的项目,从 GitHub 上的活跃度来看,官方似乎对它的投入并不是很高,目前只支持 ZooKeeperNacos 两种服务发现,而且官方也没有提供 APISIX-Seed 安装包的下载,需要我们自己通过源码来构建:

$ git clone https://github.com/api7/apisix-seed.git
$ make build

构建完成后,可以得到一个 apisix-seed 可执行文件,然后我们以上面的 Nacos 为例,介绍如何通过 APISIX-Seed 来实现控制面服务发现。

首先,我们将 APISIX 的配置文件中所有服务发现相关的配置都删掉,并重启 APISIX,接着打开 conf/conf.yaml 配置文件,文件中已经提前配置好了 Etcd、ZooKeeper、Nacos 等相关的配置,我们对其做一点裁剪,只保留下面这些信息:

etcd:
  host:
    - "http://127.0.0.1:2379"
  prefix: /apisix
  timeout: 30
    
log:
  level: warn
  path: apisix-seed.log
  maxage: 168h
  maxsize: 104857600
  rotation_time: 1h

discovery:
  nacos:
    host:
      - "http://127.0.0.1:8848"
    prefix: /nacos

然后启动 apisix-seed

$ ./apisix-seed
panic: no discoverer with key: dns

goroutine 15 [running]:
github.com/api7/apisix-seed/internal/discoverer.GetDiscoverer(...)
        D:/code/apisix-seed/internal/discoverer/discovererhub.go:42
        D:/code/apisix-seed/internal/core/components/watcher.go:84 +0x1d4
created by github.com/api7/apisix-seed/internal/core/components.(*Watcher).Init
        D:/code/apisix-seed/internal/core/components/watcher.go:48 +0x2b6
panic: no discoverer with key: consul

goroutine 13 [running]:
github.com/api7/apisix-seed/internal/discoverer.GetDiscoverer(...)
        D:/code/apisix-seed/internal/discoverer/discovererhub.go:42
github.com/api7/apisix-seed/internal/core/components.(*Watcher).handleQuery(0x0?, 0xc000091200, 0x0?)
        D:/code/apisix-seed/internal/core/components/watcher.go:84 +0x1d4
created by github.com/api7/apisix-seed/internal/core/components.(*Watcher).Init
        D:/code/apisix-seed/internal/core/components/watcher.go:48 +0x2b6

不过由于上面我们在路由中添加了 dns、consul 这些服务发现类型,这些 APISIX-Seed 是不支持的,所以启动会报错,我们需要将这些路由删掉:

$ curl -X DELETE http://127.0.0.1:9180/apisix/admin/routes/11 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1'
$ curl -X DELETE http://127.0.0.1:9180/apisix/admin/routes/33 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1'
$ curl -X DELETE http://127.0.0.1:9180/apisix/admin/routes/44 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1'
$ curl -X DELETE http://127.0.0.1:9180/apisix/admin/routes/55 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1'

只保留一条 /nacos 的路由,然后重启 apisix-seed 即可:

$ ./apisix-seed
2023-03-22T07:49:53.849+0800    INFO    naming_client/push_receiver.go:80       udp server start, port: 55038

访问 APISIX 的 /nacos 地址验证一下:

$ curl http://127.0.0.1:9080/nacos
Hello, I'm nacos client.

关于 APISIX-Seed 的更多信息,可以参考官方文档 基于 APISIX-Seed 架构的控制面服务发现APISIX-Seed 项目文档

基于 Kubernetes 的服务发现

我们的服务还可能部署在 Kubernetes 集群中,这时,不用依赖外部的服务注册中心也可以实现服务发现,因为 Kubernetes 提供了强大而丰富的监听资源的接口,我们可以通过监听 Kubernetes 集群中 Services 或 Endpoints 等资源的实时变化来实现服务发现,APISIX 就是这样做的。

我们以 Kubernetes 使用小记 中的 kubernetes-bootcamp 为例,体验一下 APISIX 基于 Kubernetes 的服务发现。

首先在 Kubernetes 集群中创建对应的 Deployment 和 Service:

$ kubectl create deployment kubernetes-bootcamp --image=jocatalin/kubernetes-bootcamp:v1
deployment.apps/kubernetes-bootcamp created

$ kubectl expose deployment/kubernetes-bootcamp --type="NodePort" --port 8080
service/kubernetes-bootcamp exposed

通过 kubectl get svc 获取 NodePort 端口,并验证服务能正常访问:

$ kubectl get svc
NAME                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)          AGE
kubernetes            ClusterIP   10.96.0.1       <none>        443/TCP          115d
kubernetes-bootcamp   NodePort    10.101.31.128   <none>        8080:32277/TCP   59s

$ curl http://192.168.1.40:32277
Hello Kubernetes bootcamp! | Running on: kubernetes-bootcamp-857b45f5bb-jtzs4 | v=1

接下来,为了让 APISIX 能查询和监听 Kubernetes 的 Endpoints 资源变动,我们需要创建一个 ServiceAccount

kind: ServiceAccount
apiVersion: v1
metadata:
 name: apisix-test
 namespace: default

以及一个具有集群级查询和监听 Endpoints 资源权限的 ClusterRole

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: apisix-test
rules:
- apiGroups: [ "" ]
  resources: [ endpoints ]
  verbs: [ get,list,watch ]

再将这个 ServiceAccountClusterRole 关联起来:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: apisix-test
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: apisix-test
subjects:
  - kind: ServiceAccount
    name: apisix-test
    namespace: default

然后我们需要获取这个 ServiceAccount 的 token 值,如果 Kubernetes 是 v1.24 之前的版本,可以通过下面的方法获取 token 值:

$ kubectl get secrets | grep apisix-test
$ kubectl get secret apisix-test-token-c64cv -o jsonpath={.data.token} | base64 -d

Kubernetes 从 v1.24 版本开始,不能再通过 kubectl get secret 获取 token 了,需要使用 TokenRequest API 来获取,首先开启代理:

$ kubectl proxy --port=8001
Starting to serve on 127.0.0.1:8001

然后调用 TokenRequest API 生成一个 token:

$ curl 'http://127.0.0.1:8001/api/v1/namespaces/default/serviceaccounts/apisix-test/token' \
  -H "Content-Type:application/json" -X POST -d '{}'
{
  "kind": "TokenRequest",
  "apiVersion": "authentication.k8s.io/v1",
  "metadata": {
    "name": "apisix-test",
    "namespace": "default",
    "creationTimestamp": "2023-03-22T23:57:20Z",
    "managedFields": [
      {
        "manager": "curl",
        "operation": "Update",
        "apiVersion": "authentication.k8s.io/v1",
        "time": "2023-03-22T23:57:20Z",
        "fieldsType": "FieldsV1",
        "fieldsV1": {
          "f:spec": {
            "f:expirationSeconds": {}
          }
        },
        "subresource": "token"
      }
    ]
  },
  "spec": {
    "audiences": [
      "https://kubernetes.default.svc.cluster.local"
    ],
    "expirationSeconds": 3600,
    "boundObjectRef": null
  },
  "status": {
    "token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImtLdHRyVzFmNTRHWGFVUjVRS3hrLVJMSElNaXM4aENLMnpfSGk1SUJhbVkifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiXSwiZXhwIjoxNjc5NTMzMDQwLCJpYXQiOjE2Nzk1Mjk0NDAsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJkZWZhdWx0Iiwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImFwaXNpeC10ZXN0IiwidWlkIjoiMzVjZWJkYTEtNGZjNC00N2JlLWIxN2QtZDA4NWJlNzU5ODRlIn19LCJuYmYiOjE2Nzk1Mjk0NDAsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmFwaXNpeC10ZXN0In0.YexM_VoumpdwZNbSkwh6IbEu59PCtZrG1lkTnCqG24G-TC0U1sGxgbXf6AnUQ5ybh-CHWbJ7oewhkg_J4j7FiSAnV_yCcEygLkaCveGIQbWldB3phDlcJ52f8YDpHFtN2vdyVTm79ECwEInDsqKhn4n9tPY4pgTodI6D9j-lcK0ywUdbdlL5VHiOw9jlnS7b60fKWBwCPyW2uohX5X43gnUr3E1Wekgpo47vx8lahTZQqnORahTdl7bsPsu_apf7LMw40FLpspVO6wih-30Ke8CNBxjpORtX2n3oteE1fi2vxYHoyJSeh1Pro_Oykauch0InFUNyEVI4kJQ720glOw",
    "expirationTimestamp": "2023-03-23T00:57:20Z"
  }
}

默认的 token 有效期只有一个小时,可以通过参数改为一年:

$ curl 'http://127.0.0.1:8001/api/v1/namespaces/default/serviceaccounts/apisix-test/token' \
  -H "Content-Type:application/json" -X POST \
  -d '{"kind":"TokenRequest","apiVersion":"authentication.k8s.io/v1","metadata":{"name":"apisix-test","namespace":"default"},"spec":{"audiences":["https://kubernetes.default.svc.cluster.local"],"expirationSeconds":31536000}}'

我们在 APISIX 的配置文件 config.yaml 中添加如下内容( 将上面生成的 token 填写到 token 字段 ):

discovery:
  kubernetes:
    service:
      schema: https
      host: 127.0.0.1
      port: "6443"
    client:
      token: ...

这里有一个比较坑的地方,port 必须是字符串,否则会导致 APISIX 启动报错:

invalid discovery kubernetes configuration: object matches none of the required

然后重启 APISIX,接着向 APISIX 中添加如下路由:

$ curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/66 \
    -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -i -d '
{
    "methods": ["GET"],
    "uri": "/kubernetes",
    "plugins": {
        "proxy-rewrite" : {
            "regex_uri": ["/kubernetes", "/"]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "discovery_type": "kubernetes",
        "service_name": "kubernetes-bootcamp"
    }
}'

访问 APISIX 的 /kubernetes 地址验证一下:

$ curl http://127.0.0.1:9080/kubernetes

不过,如果你的 APISIX 运行在 Kubernetes 集群之外,大概率是访问不通的,因为 APISIX 监听的 Endpoints 地址是 Kubernetes 集群内的 Pod 地址:

$ kubectl describe endpoints/kubernetes-bootcamp
Name:         kubernetes-bootcamp
Namespace:    default
Labels:       app=kubernetes-bootcamp
Annotations:  endpoints.kubernetes.io/last-change-trigger-time: 2023-03-25T00:31:43Z
Subsets:
  Addresses:          10.1.5.12
  NotReadyAddresses:  <none>
  Ports:
    Name     Port  Protocol
    ----     ----  --------
    <unset>  8080  TCP

Events:  <none>

所以想使用基于 Kubernetes 的服务发现,最佳做法是将 APISIX 部署在 Kubernetes 中,或者使用 APISIX Ingress,关于 APISIX 集成 Kubernetes 的更多信息,可以参考官方文档 基于 Kubernetes 的服务发现 和官方博客 借助 APISIX Ingress,实现与注册中心的无缝集成

参考

更多

实现自定义服务发现

https://apisix.apache.org/zh/docs/apisix/discovery/

扫描二维码,在手机上阅读!

gRPC 快速入门

RPC 又被称为 远程过程调用,英文全称为 Remote Procedure Call,是一种服务间的通信规范,它可以让你像调用本地方法一样调用远程服务提供的方法,而不需要关心底层的通信细节。RPC 的概念早在上个世纪七八十年代就已经被提出,1984 年,Birrell 和 Nelson 在 ACM Transactions on Computer Systems 期刊上发表了一篇关于 RPC 的经典论文 Implementing remote procedure calls,论文中首次给出了实现 RPC 的基本框架:

rpc.png

从这个框架中可以看到很多现代 RPC 框架的影子,比如客户端和服务端的 Stub、序列化和反序列化等,事实上,所有后来的 RPC 框架几乎都是源自于这个原型。

不过在那个年代,RPC 的争议是非常大的,由于网络环境的不可靠性,RPC 永远都不可能做到像调用本地方法一样。大家提出了一堆问题,比如:故障恢复、请求重试、异步请求、服务寻址等,在那个互联网都还没有出现的年代,一堆大神们就已经在讨论分布式系统间的调用问题了,而他们讨论的问题焦点,基本上都演变成了 RPC 历史中永恒的话题。

为了解决这些问题,软件架构经历了一代又一代的发展和演进。1988 年,Sun 公司推出了第一个商业化的 RPC 库 Sun RPC ,并被定义为标准的 RPC 规范;1991 年,非营利性组织 OMG 发布 CORBA,它通过接口定义语言 IDL 中的抽象类型映射让异构环境之间的互操作成为了可能;不过由于其复杂性,很快就被微软推出的基于 XML 的 SOAP 技术所打败,随后 SOAP 作为 W3C 标准大大推动了 Web Service 概念的发展;像 SOAP 这种基于 XML 的 RPC 技术被称为 XML-RPC,它最大的问题是 XML 报文内容过于冗余,对 XML 的解析效率也很低,于是 JSON 应运而生,进而导致 RESTful 的盛行;不过无论是 XML 还是 JSON,都是基于文本传输,性能都无法让人满意,直到 2008 年,Google 开源 Protocol Buffers,这是一种高效的结构化数据存储格式,可以用于结构化数据的序列化,非常适合做数据存储或 RPC 数据交换格式;可能是由于微服务的流行,之后的 RPC 框架如雨后春笋般蓬勃发展,同年,Facebook 向 Apache 贡献了开源项目 Thrift,2009 年,Hadoop 之父 Doug Cutting 开发出 Avro,成为 Hadoop 的一个子项目,随后又脱离 Hadoop 成为 Apache 顶级项目;2011 年,阿里也开源了它自研的 RPC 框架 Dubbo,和前两个一样,最后也贡献给了 Apache;2015 年,Google 开源 gRPC 框架,开创性地使用 HTTP/2 作为传输协议,基于 HTTP/2 的多路复用和服务端推送技术,gRPC 支持双向流式通信,这使得 RPC 框架终于不再拘泥于万年不变的 C/S 模型了。

2017 年,gRPC 作为孵化项目成为 CNCF 的一员,不论是 Envoy 还是 Istio 等 Service Mesh 方案,都将 gRPC 作为一等公民,可以预见的是,谷歌正在将 gRPC 打造成云原生时代通信层事实上的标准。

Hello World 开始

这一节我们使用 Go 语言实现一个简单的 Hello World 服务,学习 gRPC 的基本概念。首先,我们通过 go mod init 初始化示例项目:

$ mkdir demo && cd demo
$ go mod init example.com/demo
go: creating new go.mod: module example.com/demo
go: to add module requirements and sums:
        go mod tidy

然后获取 grpc 依赖:

$ go get google.golang.org/grpc@latest
go: downloading golang.org/x/net v0.5.0
go: downloading golang.org/x/sys v0.4.0
go: downloading google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
go: downloading golang.org/x/text v0.6.0
go: added github.com/golang/protobuf v1.5.2
go: added golang.org/x/net v0.5.0
go: added golang.org/x/sys v0.4.0
go: added golang.org/x/text v0.6.0
go: added google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
go: added google.golang.org/grpc v1.53.0
go: added google.golang.org/protobuf v1.28.1

编写 .proto 文件

正如前文所述,Google 在 2009 年开源了一种高效的结构化数据存储格式 Protocol Buffers,这种格式非常适合用于 RPC 的数据交换,所以顺理成章的,Google 在开发 gRPC 时就采用了 Protocol Buffers 作为默认的数据格式。不过要注意的是 Protocol Buffers 不仅仅是一种数据格式,而且也是一种 IDL(Interface Description Language,接口描述语言),它通过一种中立的方式来描述接口和数据类型,从而实现跨语言和跨平台开发。

一般使用 .proto 后缀的文件来定义接口和数据类型,所以接下来,我们要创建一个 hello.proto 文件,我们将其放在 proto 目录下:

$ mkdir proto && cd proto
$ vim hello.proto

文件内容如下:

syntax = "proto3";

option go_package = "example.com/demo/proto";

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

我们在第一行指定使用 proto3 语法,这是目前推荐的版本,如果不指定,默认将使用 proto2,可能会导致一些版本兼容性的问题。随后我们用关键字 service 定义了一个 HelloService 服务,该服务包含一个 SayHello 方法,方法的入参为 HelloRequest,出参为 HelloResponse,这两个消息类型都在后面通过关键字 message 所定义。Protocol Buffers 的语法非常直观,也比较容易理解,这里只是使用了一些简单的语法,其他更复杂的语法可以参考 Protocol Buffers 的官方文档,另外这里有一份 中文语法指南 也可供参考。

编写好 hello.proto 文件之后,我们还需要一些工具将其转换为 Go 语言。这些工具包括:

  • protoc
  • protoc-gen-go
  • protoc-gen-go-grpc

protoc 是 Protocol Buffers 编译器,用于将 .proto 文件转换为其他编程语言,而不同语言的转换工作由不同语言的插件来实现。Go 语言的插件有两个:protoc-gen-goprotoc-gen-go-grpc,插件 protoc-gen-go 会生成一个后缀为 .pb.go 的文件,其中包含 .proto 文件中定义数据类型和其序列化方法;插件 protoc-gen-go-grpc 会生成一个后缀为 _grpc.pb.go 的文件,其中包含供客户端调用的服务方法和服务端要实现的接口类型。

protoc 可以从 Protocol Buffers 的 Release 页面 下载,下载后将 bin 目录添加到 PATH 环境变量即可:

$ curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v22.0/protoc-22.0-linux-x86_64.zip

protoc-gen-goprotoc-gen-go-grpc 两个插件可以通过 go install 命令直接安装:

$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0

安装完成后使用 --version 参数检测各个命令是否正常:

$ protoc --version
libprotoc 22.0

$ protoc-gen-go --version
protoc-gen-go v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.2.0

一切就绪后,就可以使用下面这行命令生成相应的 Go 代码了:

$ cd proto
$ protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    hello.proto

这个命令在当前目录下生成了 hello.pb.gohello_grpc.pb.go 两个文件。

实现服务端

在生成的 hello_grpc.pb.go 文件中,定义了一个 HelloServiceServer 接口:

// HelloServiceServer is the server API for HelloService service.
// All implementations must embed UnimplementedHelloServiceServer
// for forward compatibility
type HelloServiceServer interface {
    SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
    mustEmbedUnimplementedHelloServiceServer()
}

并且在接口的下面提供了一个默认实现:

type UnimplementedHelloServiceServer struct {
}

func (UnimplementedHelloServiceServer) SayHello(context.Context, *HelloRequest) (*HelloResponse, error) {
    return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func (UnimplementedHelloServiceServer) mustEmbedUnimplementedHelloServiceServer() {}

注意看 HelloServiceServer 的上面有一行注释:All implementations must embed UnimplementedHelloServiceServer for forward compatibility,为了保证向前兼容性,我们自己在实现这个接口时必须要嵌入 UnimplementedHelloServiceServer 这个默认实现,这篇文章 对此有一个简单的说明。

接下来我们创建一个 server 目录,并创建一个 main.go 文件:

$ mkdir server && cd server
$ vim main.go

定义 server 结构体,继承 UnimplementedHelloServiceServer 并重写 SayHello 方法:

type server struct {
    proto.UnimplementedHelloServiceServer
}

func (s *server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error) {
    log.Printf("Request recieved: %v\n", request.GetName())
    return &proto.HelloResponse{
        Message: "Hello " + request.GetName(),
    }, nil
}

然后在入口方法中,通过 proto.RegisterHelloServiceServer(s, &server{}) 将我们的实现注册到 grpc Server 中:

func main() {

    lis, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatalf("Server listen failed!")
    }
    log.Printf("Server listening at: %s", lis.Addr())

    s := grpc.NewServer()
    proto.RegisterHelloServiceServer(s, &server{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Server serve failed!")
    }
}

使用 go run 运行该代码:

$ go run ./server/main.go
2023/03/02 07:40:50 Server listening at: [::]:8080

一个 gRPC 的服务端就启动成功了!

实现客户端

接下来,我们来实现客户端。其实,在 hello_grpc.pb.go 文件中,protoc 也为我们定义了一个 HelloServiceClient 接口:

type HelloServiceClient interface {
    SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
}

并提供了该接口的默认实现:

type helloServiceClient struct {
    cc grpc.ClientConnInterface
}

func NewHelloServiceClient(cc grpc.ClientConnInterface) HelloServiceClient {
    return &helloServiceClient{cc}
}

func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
    out := new(HelloResponse)
    err := c.cc.Invoke(ctx, "/HelloService/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

HelloServiceServer 不同的是,这个客户端实现我们无需修改,可以直接使用。首先我们创建一个 client 目录,并创建一个 main.go 文件:

$ mkdir client && cd client
$ vim main.go

然后在入口方法中,通过 grpc.Dial 创建一个和服务端的连接:

conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
    log.Fatalf("Connect grpc server failed: %v", err)
}
defer conn.Close()

注意我们的服务端没有开启 TLS,连接是不安全的,所以我们需要加一个不安全证书的连接选项,否则连接的时候会报错:

Connect grpc server failed: grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)

然后使用 hello_grpc.pb.go 文件中提供的 NewHelloServiceClient 方法创建一个客户端实例:

c := proto.NewHelloServiceClient(conn)

同时使用 context 创建一个带超时的上下文:

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

使用创建的客户端调用 SayHello 方法:

r, err := c.SayHello(ctx, &proto.HelloRequest{Name: "zhangsan"})
if err != nil {
    log.Fatalf("Call SayHello failed: %v", err)
}
log.Printf("SayHello response: %s", r.GetMessage())

从调用的代码上看起来,确实和调用本地方法一样,传入 HelloRequest 请求,得到 HelloResponse 响应。至此,一个简单的客户端就编写完成了,使用 go run 运行该代码:

$ go run ./client/main.go
2023/03/03 07:03:34 SayHello response: Hello zhangsan

测试服务端

除了编写客户端,我们也可以使用其他的工具来测试服务端,对于 HTTP 服务,我们一般使用 curlPostman 之类的工具;而对于 gRPC 服务,也有类似的工具,比如 grpcurlgrpcui 等,这里整理了一份 关于 gRPC 工具的清单

这里使用 grpcurl 来对我们的服务端进行简单的测试。首先从它的 Release 页面 下载并安装 grpcurl

$ curl -LO https://github.com/fullstorydev/grpcurl/releases/download/v1.8.7/grpcurl_1.8.7_linux_x86_64.tar.gz

grpcurl 中最常使用的是 list 子命令,它可以列出服务端支持的所有服务:

$ grpcurl -plaintext localhost:8080 list
Failed to list services: server does not support the reflection API

不过这要求我们的服务端必须开启 反射 API,打开 server/main.go 文件,在其中加上下面这行代码:

reflection.Register(s)

这样我们就通过 Go 语言中提供的 reflection 包开启了反射 API,然后使用 grpcurllist 命令重新列出服务端的所有服务:

$ grpcurl -plaintext localhost:8080 list
HelloService
grpc.reflection.v1alpha.ServerReflection

如果服务端没有开启反射 API,grpc 也支持直接使用 Proto 文件Protoset 文件

我们还可以使用 list 命令继续列出 HelloService 服务的所有方法:

$ grpcurl -plaintext localhost:8080 list HelloService
HelloService.SayHello

如果要查看某个方法的详细信息,可以使用 describe 命令:

$ grpcurl -plaintext localhost:8080 describe HelloService.SayHello
HelloService.SayHello is a method:
rpc SayHello ( .HelloRequest ) returns ( .HelloResponse );

可以看出,这和我们在 proto 文件中的定义是一致的。最后,使用 grpcurl 来调用这个方法:

$ grpcurl -plaintext -d '{"name": "zhangsan"}' localhost:8080 HelloService.SayHello
{
  "message": "Hello zhangsan"
}

如果入参比较大,可以将其保存在一个文件中,使用下面的方法来调用:

$ cat input.json | grpcurl -plaintext -d @ localhost:8080 HelloService.SayHello
{
  "message": "Hello zhangsan"
}

gRPC 的四种形式

gRPC 支持四种不同的通信方式:

  • 简单 RPC(Simple RPC
  • 服务端流 RPC(Server-side streaming RPC
  • 客户端流 RPC(Client-side streaming RPC
  • 双向流 RPC(Bidirectional streaming RPC

上一节中的 SayHello 就是一个简单 RPC 的例子:

rpc SayHello (HelloRequest) returns (HelloResponse) {}

这种 RPC 有时候也被称为 Unary RPC,除此之外,gRPC 还支持三种流式通信方法,也即 Streaming RPC

服务端流 RPC(Server-side streaming RPC

第一种叫服务端流 RPC,它接受一个正常的请求,并以流的形式向客户端发送多个响应。在下面的例子中,客户端向服务端发送一个字符串,服务端对字符串进行分词,并将分词结果以流式返回给客户端。首先,我们在 .proto 文件中定义 Split 方法和相应的消息体:

rpc Split (SplitRequest) returns (stream SplitResponse) {}

然后,使用 protoc 生成服务端和客户端的代码,接着在 server/main.go 文件中添加服务端实现:

func (s *server) Split(request *proto.SplitRequest, stream proto.HelloService_SplitServer) error {
    log.Printf("Request recieved: %v\n", request.GetSentence())
    words := strings.Split(request.GetSentence(), " ")
    for _, word := range words {
        if err := stream.Send(&proto.SplitResponse{Word: word}); err != nil {
            return err
        }
    }
    return nil
}

和简单 RPC 的 SayHello 方法相比,服务端流 RPC 的 Split 方法在参数上有一些细微的差别,少了一个 ctx context.Context 参数,而多了一个 stream proto.HelloService_SplitServer 参数,这是 protoc 自动生成的一个接口:

type HelloService_SplitServer interface {
    Send(*SplitResponse) error
    grpc.ServerStream
}

这个接口继承自 grpc.ServerStream 接口,并具有一个 Send 方法,用来向客户端发送响应。

client/main.go 文件中添加客户端实现:

stream, err := c.Split(ctx, &proto.SplitRequest{Sentence: "Hello World"})
if err != nil {
    log.Fatalf("Call Split failed: %v", err)
}
for {
    r, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("%v.Split(_) = _, %v", c, err)
    }
    log.Printf("Split response: %s", r.GetWord())
}

和简单 RPC 的客户端代码相比,Split 方法不是直接返回 SplitResponse,而是返回一个 stream 流,它的类型为 HelloService_SplitClient 接口:

type HelloService_SplitClient interface {
    Recv() (*SplitResponse, error)
    grpc.ClientStream
}

这个接口继承自 grpc.ClientStream 接口,并具有一个 Recv 方法,用来接受服务端发送的响应,当服务端发送结束后,Recv 方法将返回 io.EOF 错误。

客户端流 RPC(Client-side streaming RPC

第二种叫客户端流 RPC,它以流的形式接受客户端发送来的多个请求,服务端处理之后返回一个正常的响应。在下面的例子中,客户端向服务端发送多个数字,服务端收集之后进行求和,并将求和结果返回给客户端。首先,我们在 .proto 文件中定义 Sum 方法和相应的消息体:

rpc Sum (stream SumRequest) returns (SumResponse) {}

然后,使用 protoc 生成服务端和客户端的代码,接着在 server/main.go 文件中添加服务端实现:

func (s *server) Sum(stream proto.HelloService_SumServer) error {
    var sum int32 = 0
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&proto.SumResponse{Sum: sum})
        }
        if err != nil {
            return err
        }
        sum = sum + r.GetNum()
    }
}

从上面的代码可以看到,Sum 方法没有了 request 参数,只有一个 stream 参数,请求参数通过 stream.Recv 以流的形式读取,当读取结束后,stream.Recv 方法将返回 io.EOF 错误,这时我们通过 stream.SendAndClose 将处理之后的结果返回给客户端,并关闭连接。

client/main.go 文件中添加客户端实现:

stream2, err := c.Sum(ctx)
if err != nil {
    log.Fatalf("%v.Sum(_) = _, %v", c, err)
}
nums := []int32{1, 2, 3, 4, 5, 6, 7}
for _, num := range nums {
    if err := stream2.Send(&proto.SumRequest{Num: num}); err != nil {
        log.Fatalf("%v.Send(%v) = %v", stream, num, err)
    }
}
response, err := stream2.CloseAndRecv()
if err != nil {
    log.Fatalf("%v.CloseAndRecv() failed: %v", stream2, err)
}
log.Printf("Sum response: %v", response.GetSum())

在上面的代码中,Sum 方法返回一个 stream 变量,然后通过 stream.Send 不断向服务端发送请求,当数据发送结束后,再通过 stream.CloseAndRecv 关闭连接,并接受服务端响应。

双向流 RPC(Bidirectional streaming RPC

第三种叫双向流 RPC,这有点像网络聊天,服务端和客户端双方以任意的顺序互相通信,服务端可以在每次接受客户端请求时就返回一次响应,也可以接受多个请求后再返回一次响应。首先,我们在 .proto 文件中定义 Chat 方法和相应的消息体:

rpc Chat (stream ChatRequest) returns (stream ChatResponse) {}

然后,使用 protoc 生成服务端和客户端的代码,接着在 server/main.go 文件中添加服务端实现:

func (s *server) Chat(stream proto.HelloService_ChatServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        if err = stream.Send(&proto.ChatResponse{Message: "Reply to " + r.GetMessage()}); err != nil {
            return err
        }
    }
}

上面的代码和客户端流 RPC 比较类似,只不过服务端的响应变得更及时,每次接受到客户端请求时都会响应,而不是等客户端请求结束后再响应。

client/main.go 文件中添加客户端实现:

stream3, err := c.Chat(ctx)
if err != nil {
    log.Fatalf("%v.Chat(_) = _, %v", c, err)
}
waitc := make(chan struct{})
go func() {
    for {
        in, err := stream3.Recv()
        if err == io.EOF {
            close(waitc)
            return
        }
        if err != nil {
            log.Fatalf("Failed to receive: %v", err)
        }
        log.Printf("Chat response: %s", in.GetMessage())
    }
}()

messages := []string{"Hello", "How're you?", "Bye"}
for _, message := range messages {
    if err := stream3.Send(&proto.ChatRequest{Message: message}); err != nil {
        log.Fatalf("Failed to send: %v", err)
    }
}
stream3.CloseSend()
<-waitc

双向流 RPC 的客户端实现要稍微复杂一点。首先,我们通过 stream.Send 来发送请求,由于发送和接受都是流式的,所以我们没法像客户端流 RPC 那样通过 stream.CloseAndRecv() 来获取响应,我们只能调用 stream.CloseSend() 告诉服务端发送结束,然后我们需要创建一个新的 goroutine 来接受响应,另外,我们创建了一个 channel,用于在响应接受结束后通知主线程,以便程序能正常退出。

参考

更多

gRPC 安全认证

根据官方的 gRPC 认证指南,gRPC 支持多种不同的认证方法,包括:SSL/TLS 认证ALTS 认证 以及一些基于 token 的认证方法,如 OAuth2、GCE 等。

除了这些原生的认证方法,我们也可以通过 Metadata 来传送认证信息,从而实现 gRPC 的认证功能;另外,gRPC 还支持拦截器特性,通过拦截器也可以实现安全认证,Go gRPC Middleware 提供了很多拦截器的例子。

扫描二维码,在手机上阅读!