专栏名称: GoCN
最具规模和生命力的 Go 开发者社区
目录
相关文章推荐
环球物理  ·  【物理笔记】学霸笔记力学篇全部汇总 ·  9 小时前  
环球物理  ·  【中考物理】总复习知识清单 ·  昨天  
中国国家地理  ·  这里才是江南水乡之夜! ·  昨天  
51好读  ›  专栏  ›  GoCN

Go 程序如何实现优雅退出?来看看 K8s 是怎么做的——下篇

GoCN  · 公众号  ·  · 2024-08-27 08:00

正文

本文带大家一起来详细学习下 Go 中的优雅退出,由于文章过长,拆分成上下两篇,本文为下篇。

上篇: 《Go 程序如何实现优雅退出?来看看 K8s 是怎么做的——上篇》


K8s 的优雅退出

现在,我们已经掌握了 Go 中 HTTP Server 程序如何实现优雅退出,是时候看一看 K8s 中提供的一种更为优雅的优雅退出退出方案了😄。

这要从 K8s API Server 启动入口说起:

https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/apiserver.go

func main() {
 command := app.NewAPIServerCommand()
 code := cli.Run(command)
 os.Exit(code)
}

K8s API Server 启动入口代码非常简单,我们可以进入 app.NewAPIServerCommand() 查看更多细节:

https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/app/server.go#L122

// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
 ...
 cmd := &cobra.Command{
  ...
  RunE: func(cmd *cobra.Command, args []string) error {
   ...
   return Run(cmd.Context(), completedOptions)
  },
  ...
 }
 cmd.SetContext(genericapiserver.SetupSignalContext())

 ...
 return cmd
}

NewAPIServerCommand 函数中,我们要关注的核心代码只有两行:

一行是 cmd.SetContext(genericapiserver.SetupSignalContext()) ,这是在为 cmd 对象设置 ctx 属性。

另一行是 RunE 属性中最后一行代码 Run(cmd.Context(), completedOptions) ,这里是启动程序,并使用了 cmd 对象的 ctx 属性。

很明显,K8s 使用了 Go 语言中流行的 Cobra 命令行框架作为程序的启动框架,Cobra 提供了如下两个方法可以设置和获取 Context

func (c *Command) Context() context.Context {
 return c.ctx
}

func (c *Command) SetContext(ctx context.Context) {
 c.ctx = ctx
}

NOTE: 如果你对 Cobra 不太熟悉,可以参考我的另一篇文章 《万字长文:Go 语言现代命令行框架 Cobra 详解》

这里的 ctx 就是串联起 K8s 实现优雅退出的核心对象。

首先通过 genericapiserver.SetupSignalContext() 获取到一个 context.Context 对象,根据函数名称可以猜测到它可能跟信号有关。

对于 Run(cmd.Context(), completedOptions) 方法的调用,由于嵌套层级比较深,逻辑比较复杂,我就不把整个代码调用链都贴出来讲了。总之,这个启动过程最终可以定位到 preparedGenericAPIServer.RunWithContext 这个方法的执行。在 RunWithContext 方法内部的第一行代码 stopCh := ctx.Done() 是重点,它拿到了一个控制程序退出时机的 channel (这跟我们前文讲解的优雅退出示例中 quit := make(chan os.Signal, 1) 变量作用相同),而这个 ctx 实际上就是 genericapiserver.SetupSignalContext() 的返回值,如果你感兴趣可以详细研究下这个 stopCh 的使用过程。

我们直接去分析 genericapiserver.SetupSignalContext() 的实现:

https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal.go

package server

import (
 "context"
 "os"
 "os/signal"
)

var onlyOneSignalHandler = make(chan struct{})
var shutdownHandler chan os.Signal

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() chan struct{} {
 return SetupSignalContext().Done()
}

// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
 close(onlyOneSignalHandler) // panics when called twice

 shutdownHandler = make(chan os.Signal, 2)

 ctx, cancel := context.WithCancel(context.Background())
 signal.Notify(shutdownHandler, shutdownSignals...)
 go func() {
    cancel()
    os.Exit(1// second signal. Exit directly.
 }()

 return ctx
}

// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)
// This returns whether a handler was notified
func RequestShutdown() bool {
 if shutdownHandler != nil {
  select {
  case shutdownHandler 0]:
   return true
  default:
  }
 }

 return false
}

这里代码不多,但却相当精妙,可以一窥 K8s 设计之优雅。

我们从 SetupSignalContext 函数开始分析。

SetupSignalContext 函数第一行代码,通过调用 close(onlyOneSignalHandler) 来确保在整个程序中只调用一次 SetupSignalContext 函数,调用多次则直接 panic 。这能强制调用方写出正确的代码,避免出现意料之外的情况。

shutdownHandler 是一个包含了两个缓冲区的 channel ,而不像我们定义的 quit := make(chan os.Signal, 1) 那样只有一个缓冲区大小。

我们前文讲过,通过 signal.Notify(c chan 函数注册所关注的信号后, signal 包在给 c 发送信号时不会阻塞。因为我们要接收两次退出信号,所以 shutdownHandler 缓冲区大小为 2

这也是 SetupSignalContext 函数的精髓所在,它实现了收到一次 SIGINT/SIGTERM 信号,程序优雅退出,收到两次 SIGINT/SIGTERM 信号,程序强制退出的功能。

代码片段如下:

ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
  cancel()
  os.Exit(1 // second signal. Exit directly.
}()

这里使用一个带有取消功能的 Context ,当第一次收到信号时,就调用 cancel() 取消这个 ctx 。而这个 ctx 会作为函数返回值返给调用方,调用方拿到它,就可以在需要的地方调用 来等待退出信号了。这就是 preparedGenericAPIServer.RunWithContext 方法中调用 stopCh := ctx.Done() 拿到 channel ,然后等待 退出信号的逻辑了。

这里用到的 shutdownSignals 变量,定义在 signal_posix.go 文件中:

https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal_posix.go

package server

import (
 "os"
 "syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

shutdownSignals 是一个保存了两个信号的切片对象。

os.Interrupt 实际上是一个变量,它的值等于 syscall.SIGINT

// The only signal values guaranteed to be present in the os package on all
// systems are os.Interrupt (send the process an interrupt) and os.Kill (force
// the process to exit). On Windows, sending os.Interrupt to a process with
// os.Process.Signal is not implemented; it will return an error instead of
// sending a signal.
var (
 Interrupt Signal = syscall.SIGINT
 Kill      Signal = syscall.SIGKILL
)

这里为实现优雅退出,监控了两个信号 SIGINT SIGTERM ,并没有监控 SIGQUIT 信号。不过这已经足够用了,根据我的经验,绝大多数情况下我们都会使用 Ctrl + C 终止程序,而非使用 Ctrl + \

SetupSignalHandler 函数内部调用了 SetupSignalContext 函数,它唯一的作用就是直接返回给调用方 ctx.Done() 所返回的 channel ,以此来方便调用方。

RequestShutdown 函数可以主动触发退出事件信号( SIGTERM/SIGINT ),返回值表示是否触发成功。

现在将 K8s 优雅退出方案集成进我们的 net/http 优雅退出示例程序中:

package main

import (
 "context"
 "errors"
 "log"
 "net/http"
 "time"

 genericapiserver "k8s.io/apiserver/pkg/server"
)

func main()  {
 srv := &http.Server{
  Addr: ":8000",
 }

 http.HandleFunc("/sleep"func(w http.ResponseWriter, r *http.Request) {
  duration, err := time.ParseDuration(r.FormValue("duration"))
  if err != nil {
   http.Error(w, err.Error(), 400)
   return
  }

  time.Sleep(duration)
  _, _ = w.Write([]byte("Welcome HTTP Server"))
 })

 go func() {
  if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
   log.Fatalf("HTTP server error: %v", err)
  }
  log.Println("Stopped serving new connections")
 }()

 // NOTE: 只需要替换这 3 行代码,Gin 版本同理
 // quit := make(chan os.Signal, 1)
 // signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 // 

 // 可以直接丢弃,context.Context.Done() 返回的就是普通空结构体
 
 log.Println("Shutdown Server...")

 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()

 // We received an SIGINT/SIGTERM signal, shut down.
 if err := srv.Shutdown(ctx); err != nil {
  // Error from closing listeners, or context timeout:
  log.Printf("HTTP server Shutdown: %v", err)
 }
 log.Println("HTTP server graceful shutdown completed")
}

我们只需要将如下 3 行代码:

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

替换成 K8s 提供的 SetupSignalHandler 函数调用即可:

其他代码都不用修改。

执行示例程序,按一次 Ctrl + C 测试优雅退出:

$ go build -o main main.go && ./main
^C2024/08/22 09:24:46 Shutdown Server...
2024/08/22 09:24:46 Stopped serving new connections
2024/08/22 09:24:49 HTTP server graceful shutdown completed
echo $?
0
$ curl "http://localhost:8000/sleep?duration=5s"
Welcome HTTP Server

执行示例程序,按两次 Ctrl + C 测试强制退出:

$ go build -o main main.go && ./main
^C2024/08/22 09:25:28 Shutdown Server...
2024/08/22 09:25:28 Stopped serving new connections
^C
echo $?                                       
1
$ curl "http://localhost:8000/sleep?duration=5s"
curl: (52) Empty reply from server

完美,K8s 为我们提供了优雅退出的新思路。这样在开发环境,为了方便调试,我们可以无需等待优雅退出,只要连续发送两次 SIGTERM/SIGINT 即可强制退出程序。在生产环境发送一次 SIGTERM/SIGINT 信号等待优雅退出。

使用 Gin 框架开发的 Web 程序也可以这样修改,你可以自行尝试。

gRPC 的优雅退出

gRPC Server 优雅退出

接下来我们一起看下 gRPC Server 程序如何实现优雅退出。

示例程序目录结构如下:

$ tree grpc
grpc
├── Makefile
├── client
│   └── main.go
├── pb
│   ├── helloworld.pb.go
│   ├── helloworld.proto
│   └── helloworld_grpc.pb.go
└── server
    └── main.go

helloworld.proto 中定义了 gRPC Server 支持的服务接口:

syntax = "proto3";

option go_package = ".;pb";

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
string duration = 2;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

server/main.go 中 Server 端代码如下:

// Package main implements a server for Greeter service.
package main

import (
 "context"
 "flag"
 "fmt"
 "log"
 "net"
 "time"

 "google.golang.org/grpc"
 genericapiserver "k8s.io/apiserver/pkg/server"

 "github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb"
)

var (
 port = flag.Int("port"50051"The server port")
)

// server is used to implement helloworld.GreeterServer.
type server struct {
 pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
 log.Printf("Received: %v", in.GetName())

 duration, _ := time.ParseDuration(in.GetDuration())
 time.Sleep(duration)

 return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
 flag.Parse()
 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
 if err != nil {
  log.Fatalf("failed to listen: %v", err)
 }

 s := grpc.NewServer()
 pb.RegisterGreeterServer(s, &server{})
 log.Printf("server listening at %v", lis.Addr())

 go func() {
  if err := s.Serve(lis); err != nil {
   log.Fatalf("failed to serve: %v", err)
  }
 }()

  log.Printf("Shutdown Server..." )
 s.GracefulStop()
 log.Println("gRPC server graceful shutdown completed")
}

这与 HTTP Server 的优雅退出逻辑基本相同,同样由 grpc 包提供了优雅退出方法 GracefulStop

在接收到退出信号以后,调用 s.GracefulStop() 方法即可实现优雅退出。可以发现,这其实是一个优雅退出的套路。

client/main.go 中 Client 端代码如下:

// Package main implements a client for Greeter service.
package main

import (
 "context"
 "flag"
 "log"
 "time"

 "google.golang.org/grpc"
 "google.golang.org/grpc/credentials/insecure"

 "github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb"
)

const (
 defaultName = "world"
)

var (
 addr = flag.String("addr""localhost:50051""the address to connect to")
 name = flag.String("name", defaultName, "Name to greet")
)

func main() {
 flag.Parse()
 // Set up a connection to the server.
 conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 if err != nil {
  log.Fatalf("did not connect: %v", err)
 }
 defer conn.Close()
 c := pb.NewGreeterClient(conn)

 // Contact the server and print out its response.
 ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
 defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name, Duration: "10s"})
 if err != nil {
  log.Fatalf("could not greet: %v", err)
 }
 log.Printf("Greeting: %s", r.GetMessage())
}

执行示例程序,测试优雅退出逻辑:

# 执行服务端代码
$ go build -o main main.go && ./main
2024/08/22 09:26:17 server listening at [::]:50051
2024/08/22 09:26:24 Received: world
^C2024/08/22 09:26:26 Shutdown Server...
2024/08/22 09:26:34 gRPC server graceful shutdown completed
echo $?
0
# 执行客户端代码
$ go build -o main main.go && ./main
2024/08/22 09:26:34 Greeting: Hello world

优雅退出生效。

既然 gRPC Server 中的优雅退出方案已经介绍完了,同讲解 HTTP Server 优雅退出一样,接下来我再带你一起深入了解一下 GracefulStop 的源码是如何实现的。

GracefulStop 源码

GracefulStop 方法源码如下:

https://github.com/grpc/grpc-go/blob/v1.65.0/server.go#L1882

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
 s.stop(true)
}

func (s *Server) stop(graceful bool) {
 s.quit.Fire()
 defer s.done.Fire()

 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
 s.mu.Lock()
 s.closeListenersLocked()
 // Wait for serving threads to be ready to exit.  Only then can we be sure no
 // new conns will be created.
 s.mu.Unlock()
 s.serveWG.Wait()

 s.mu.Lock()
 defer s.mu.Unlock()

 if graceful {
  s.drainAllServerTransportsLocked()
 } else {
  s.closeServerTransportsLocked()
 }

 for len(s.conns) != 0 {
  s.cv.Wait()
 }
 s.conns = nil

 if s.opts.numServerWorkers > 0 {
  // Closing the channel (only once, via grpcsync.OnceFunc) after all the
  // connections have been closed above ensures that there are no
  // goroutines executing the callback passed to st.HandleStreams (where
  // the channel is written to).
  s.serverWorkerChannelClose()
 }

 if graceful || s.opts.waitForHandlers {
  s.handlersWG.Wait()
 }

 if s.events != nil {
  s.events.Finish()
  s.events = nil
 }
}

GracefulStop 方法直接调用了 s.stop(true) 方法。

stop 方法的 graceful 参数用来决定是否启用优雅退出,传递 true 表示优雅退出,传递 false 表示强制退出。

stop 方法第一段代码逻辑如下:

s.quit.Fire()
defer  s.done.Fire()

Server 对象的 quit done 属性类型都为 *grpcsync.Event ,前者用来标记 gRPC Server 正在执行退出流程,后者标记退出完成。

Event 定义如下:

https://github.com/grpc/grpc-go/blob/v1.65.0/internal/grpcsync/event.go

// Event represents a one-time event that may occur in the future.
type Event struct {
 fired int32
 c     chan struct{}
 o     sync.Once
}

// Fire causes e to complete.  It is safe to call multiple times, and
// concurrently.  It returns true iff this call to Fire caused the signaling
// channel returned by Done to close.
func (e *Event) Fire() bool {
 ret := false
 e.o.Do(func() {
  atomic.StoreInt32(&e.fired, 1)
  close(e.c)
  ret = true
 })
 return ret
}

// Done returns a channel that will be closed when Fire is called.
func (e *Event) Done() chan struct{} {
 return e.c
}

// HasFired returns true if Fire has been called.
func (e *Event) HasFired() bool {
 return atomic.LoadInt32(&e.fired) == 1
}

// NewEvent returns a new, ready-to-use Event.
func NewEvent() *Event {
 return &Event{c: make(chan struct{})}
}

可以发现, *Event 对象的 Fire 方法就是将 fired 字段值置为 1 ,并且关闭类型为 channel 的字段 c ,所以其实只要调用了 Fire ,那么调用 Done 方法将立即返回,调用 HasFired 方法就是在判断 fired 字段值是否为 1 (即是否调用过 Fire 方法)。

s.quit.Fire() 代码被调用以后, Serve 方法就能够感知到当前服务正在退出,接下来就不会再接收新的请求进来了。

Serve 方法源码如下:

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
 s.mu.Lock()
 s.printf("serving")
 s.serve = true
 if s.lis == nil {
  // Serve called after Stop or GracefulStop.
  s.mu.Unlock()
  lis.Close()
  return ErrServerStopped
 }

 s.serveWG.Add(1)
 defer func() {
  s.serveWG.Done()
  // 判断当前服务是否正在退出
  if s.quit.HasFired() {
   // Stop or GracefulStop called; block until done and return nil.
     }
 }()

 ls := &listenSocket{
  Listener: lis,
  channelz: channelz.RegisterSocket(&channelz.Socket{
   SocketType:    channelz.SocketTypeListen,
   Parent:        s.channelz,
   RefName:       lis.Addr().String(),
   LocalAddr:     lis.Addr(),
   SocketOptions: channelz.GetSocketOption(lis)},
  ),
 }
 s.lis[ls] = true

 defer func() {
  s.mu.Lock()
  if s.lis != nil && s.lis[ls] {
   ls.Close()
   delete(s.lis, ls)
  }
  s.mu.Unlock()
 }()

 s.mu.Unlock()
 channelz.Info(logger, ls.channelz, "ListenSocket created")

 var tempDelay time.Duration // how long to sleep on accept failure
 for {
  rawConn, err := lis.Accept()
  if err != nil {
   if ne, ok := err.(interface {
    Temporary() bool
   }); ok && ne.Temporary() {
    if tempDelay == 0 {
     tempDelay = 5 * time.Millisecond
    } else {
     tempDelay *= 2
    }
    if max := 1 * time.Second; tempDelay > max {
     tempDelay = max
    }
    s.mu.Lock()
    s.printf("Accept error: %v; retrying in %v", err, tempDelay)
    s.mu.Unlock()
    timer := time.NewTimer(tempDelay)
    select {
    case     // 判断当前服务是否正在退出
    case      timer.Stop()
     return nil
    }
    continue
   }
   s.mu.Lock()
   s.printf("done serving; Accept = %v", err)
   s.mu.Unlock()

   // 判断当前服务是否正在退出
   if s.quit.HasFired() {
    return nil
   }
   return err
  }
  tempDelay = 0
  // Start a new goroutine to deal with rawConn so we don't stall this Accept
  // loop goroutine.
  //
  // Make sure we account for the goroutine so GracefulStop doesn't nil out
  // s.conns before this conn can be added.
  s.serveWG.Add(1)
  go func() {
   s.handleRawConn(lis.Addr().String(), rawConn)
   s.serveWG.Done()
  }()
 }
}

我们可以直接跳到 for 循环部分的代码段,每次通过 rawConn, err := lis.Accept() 接收到一个新的请求进来,都会使用 if s.quit.HasFired() 来判断当前服务是否正在退出,如果返回结果为 true ,则 Serve 方法直接退出。

此时 Serve 方法的 defer 语句开始执行,这里会再次使用 if s.quit.HasFired() 判断当前服务是否正在退出(之所以判断两次,因为 Serve 方法也可能由于其他原因导致退出,进入 defer 逻辑),如果是,则调用 阻塞在这里,直到 GracefulStop 优雅退出逻辑执行完成。

此外, for 循环内部的 select 语句中,有一个 case 调用了 ,也是在判断当前服务是否正在退出,如果是,则调用 timer.Stop() 清理定时器后, Serve 方法直接退出。

我们接着往下看:

s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
s.mu.Lock()
s.closeListenersLocked()
// Wait for serving threads to be ready to exit.  Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()

这里的 channelz 是 gRPC 的一个监控工具,用于跟踪 gRPC 的内部状态, RemoveEntry 会移除该服务器的监控数据。

s.closeListenersLocked() 方法是不是很熟悉,这与 net/http 包中的 Shutdown 方法命名都一样,作用也就不言而喻了。

定义如下:

// s.mu must be held by the caller.
func (s *Server) closeListenersLocked() {
 for lis := range s.lis {
  lis.Close()
 }
 s.lis = nil
}

对于 s.serveWG.Wait() 这行代码,根据这个操作的属性名和方法名可以猜到, serveWG 明显是 sync.WaitGroup 类型。

既然有 Wait() ,那就应该会有 Add(1) 操作。其正在前文中贴出 Serve 代码中。

刚进入 Serve 方法时,就会调用 s.serveWG.Add(1) Serve 方法退出时执行 s.serveWG.Done()

s.serveWG.Add(1)
defer func() {
 s.serveWG.Done()
 if s.quit.HasFired() {
  // Stop or GracefulStop called; block until done and return nil.
   }
}()

并且,在 Serve 方法的 for 循环逻辑中,每次有新的请求进来, s.serveWG.Add(1) s.serveWG.Done() 也会被调用一次:

s.serveWG.Add(1)
go func() {
 s.handleRawConn(lis.Addr().String(), rawConn)
 s.serveWG.Done()
}()

所以,这里其实是在等待 Serve 方法执行完成并退出。

接下来的代码段是根据是否要进行优雅退出,执行不同的逻辑:

s.mu.Lock()
defer s.mu.Unlock()

if graceful {
    s.drainAllServerTransportsLocked()
else {
    s.closeServerTransportsLocked()
}

可以发现,接下来的全部操作都加锁处理。

优雅退出走 s.drainAllServerTransportsLocked() 逻辑:

// s.mu must be held by the caller.
func (s *Server) drainAllServerTransportsLocked() {
 if !s.drain {
  for _, conns := range s.conns {
   for st := range conns {
    st.Drain("graceful_stop")
   }
  }
  s.drain = true
 }
}

它的主要作用是在服务器优雅停止的过程中,让所有的服务器传输层(ServerTransports)停止接收新的请求,但继续处理现有的请求,直到它们完成。

这里有一行注释: s.mu must be held by the caller

说明了在调用 drainAllServerTransportsLocked 方法之前,调用者必须已经持有 s.mu 锁。这是为了确保在执行方法体时,服务器的状态不会被并发修改。

对于 if !s.drain 这个条件判断,其用于确保 drainAllServerTransportsLocked 方法只会执行一次。

s.conns 属性保存了所有连接,是 map[string]map[transport.ServerTransport]bool 类型。

通过嵌套的 for 循环遍历每个连接中的 ServerTransport 实例。 ServerTransport 是 gRPC 中的一个概念,它表示一个抽象的传输层实现,负责处理客户端和服务器之间的实际数据传输。

调用 st.Drain("graceful_stop") 方法的作用,是告诉传输层不要再接收新的请求或连接了,但允许继续处理现有的请求,直到它们完成。

这个方法会向客户端发送信号,表明服务器正在进行优雅关闭。它会给所有的客户端发送一个控制帧 GOAWAY (因为 gRPC 是基于 HTTP/2 的,所以才会这样处理),告诉客户端关闭 TCP 连接。

Drain 方法实现如下:

func (t *http2Server) Drain(debugData string) {
 t.mu.Lock()
 defer t.mu.Unlock()
 if t.drainEvent != nil {
  return
 }
 t.drainEvent = grpcsync.NewEvent()
 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}

在完成对所有连接的遍历和 Drain 操作后,将 s.drain 设置为 true ,表示服务器已经进入了 "drain" 状态,这样后续不会再次执行相同的操作。

结合之前持有锁的操作,这里不会重复执行。

相对来说,没有采用优雅退出的另一个方法 closeServerTransportsLocked 就要暴力一些:

// s.mu must be held by the caller.
func (s *Server) closeServerTransportsLocked() {
 for _, conns := range s.conns {
  for st := range conns {
   st.Close(errors.New("Server.Stop called"))
  }
 }
}

这里直接调用 Close 方法关闭连接,省略了控制帧 GOAWAY 的发送。

stop 函数接下来会等待所有现有的连接被安全关闭:

for len(s.conns) != 0 {
    s.cv.Wait()
}
s.conns = nil

继续往下执行:

if s.opts.numServerWorkers > 0 {
    // Closing the channel (only once, via grpcsync.OnceFunc) after all the
    // connections have been closed above ensures that there are no
    // goroutines executing the callback passed to st.HandleStreams (where
    // the channel is written to).
    s.serverWorkerChannelClose()
}

这段代码用于关闭工作线程的 channel ,确保所有处理程序都已经终止,不会再处理新的请求。

s.serverWorkerChannelClose 在初始化操作时被赋值:

// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
 s.serverWorkerChannel = make(chan func())
 s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
  close(s.serverWorkerChannel)
 })
 for i := uint32(0); i   go s.serverWorker()
 }
}

接下来,如果是优雅退出或者配置了 s.opts.waitForHandlers 选项,则代码会调用 s.handlersWG.Wait() ,等待所有的处理程序完成:







请到「今天看啥」查看全文