issue 关联

client: dubbo-go: v3.2.0-rc2
server: dubbo v3.3.0
registry: zookeeper

问题 1:biStream 客户端没有关闭长连接的接口;如果服务端不调用 onCompleted,那么 Receive 会永久 block

  • 客户端应当具有主动 Close 能力才对,底层使用的是 grpc,而 grpc 客户端是能够主动关闭连接的。

问题 2:Java 服务端调用 onCompleted() 后,biStream.Receive 返回 EOF

  • 但是和服务端的 TCP 连接仍然存在;我知道 dubbo client 内部是连接池,但是我测试了一下,TCP 连接似乎并没有被复用

源码分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func TestBiDiStream2(svc greet.GreetService) error {  
    fmt.Printf("start to test triple bidi stream 2\n")  
    stream, err := svc.GreetStream(context.Background())  
    if err != nil {  
       return err  
    }  
    if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "stream client!"}); sendErr != nil {  
       return err  
    }  
  
    resp, err := stream.Recv()  
    if err != nil {  
       return err  
    }  
    fmt.Printf("triple bidi stream2 resp: %s\n", resp.Greeting)  
    if err := stream.CloseRequest(); err != nil {  
       return err  
    }  
    if err := stream.CloseResponse(); err != nil {  
       return err  
    }  
    fmt.Printf("========>TestBiDiStream end, close stream...\n")  
    return nil  
}

在其中

1
2
3
4
5
6
if err := stream.CloseRequest(); err != nil {  
       return err  
    }  
    if err := stream.CloseResponse(); err != nil {  
       return err  
    } 
  1. 客户端调用 CloseRequest() 关闭请求部分,表示不再发送更多的请求。

  2. 客户端调用 CloseResponse() 关闭响应部分,表示不再接收更多的响应。

下面从closeRequest( )开始进行源码分析

closeRequest( )

 CloseRequest 方法的作用:关闭流的发送端。在双向流通信中,流的发送端用于客户端向服务器发送数据,关闭发送端意味着客户端不再发送更多的数据。

1
2
3
4
5
6
7
// CloseRequest closes the send side of the stream.
func (b *BidiStreamForClient) CloseRequest() error {
	if b.err != nil {
		return b.err
	}
	return b.conn.CloseRequest()
}

步入 b.conn.CloseRequest()

1
2
3
func (cc *errorTranslatingClientConn) CloseRequest() error {  
    return cc.fromWire(cc.StreamingClientConn.CloseRequest())  
}

这个结构体的作用:

  • 它是一个包装器(wrapper),用于包装 StreamingClientConn

  • 它的主要目的是确保从客户端返回的错误是经过编码的(coded errors),即错误信息是结构化的、可识别的,而不是原始的底层错误。

  • 它通常用于协议实现中,可能是为了在协议层对错误进行统一处理。

接着步入就来到了

1
2
3
func (cc *grpcClientConn) CloseRequest() error {  
    return cc.duplexCall.CloseWrite()  
}

开始观察核心代码 duplexCall (双工通信)

duplex_http_call.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package triple_protocol

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
)

// duplexHTTPCall 是一个全双工的 HTTP 调用,允许客户端和服务器之间双向通信。
// 请求体是客户端到服务器的流,响应体是服务器到客户端的流。
type duplexHTTPCall struct {
	ctx              context.Context          // 上下文,用于控制调用的生命周期
	httpClient       HTTPClient               // HTTP 客户端,用于发送请求
	streamType       StreamType               // 流类型(如单向流、双向流等)
	validateResponse func(*http.Response) *Error // 用于验证响应的函数

	// 使用 io.Pipe 作为请求体。我们将读端交给 net/http,写端用于写入数据。
	// 两端可以安全地并发使用。
	requestBodyReader *io.PipeReader // 请求体的读端
	requestBodyWriter *io.PipeWriter // 请求体的写端

	sendRequestOnce sync.Once // 确保请求只发送一次
	responseReady   chan struct{} // 用于通知响应已准备好
	request         *http.Request // HTTP 请求
	response        *http.Response // HTTP 响应

	errMu sync.Mutex // 保护 err 的互斥锁
	err   error      // 存储调用过程中发生的错误
}

// newDuplexHTTPCall 创建一个新的 duplexHTTPCall 实例。
func newDuplexHTTPCall(
	ctx context.Context,
	httpClient HTTPClient,
	url *url.URL,
	spec Spec,
	header http.Header,
) *duplexHTTPCall {
	// 复制 URL,避免外部修改影响内部逻辑
	url = cloneURL(url)
	// 创建一个 io.Pipe,用于请求体的读写
	pipeReader, pipeWriter := io.Pipe()

	// 创建一个 HTTP 请求,使用 POST 方法,并将请求体设置为 pipeReader
	request := (&http.Request{
		Method:     http.MethodPost, // 使用 POST 方法
		URL:        url,             // 请求的 URL
		Header:     header,          // 请求头
		Proto:      "HTTP/1.1",      // 协议版本
		ProtoMajor: 1,               // 主版本号
		ProtoMinor: 1,               // 次版本号
		Body:       pipeReader,      // 请求体
		Host:       url.Host,        // 请求的主机
	}).WithContext(ctx) // 将上下文绑定到请求

	// 返回一个新的 duplexHTTPCall 实例
	return &duplexHTTPCall{
		ctx:               ctx,
		httpClient:        httpClient,
		streamType:        spec.StreamType,
		requestBodyReader: pipeReader,
		requestBodyWriter: pipeWriter,
		request:           request,
		responseReady:     make(chan struct{}), // 初始化 responseReady 通道
	}
}

// Write 向请求体写入数据。如果 SetError 被调用,返回一个包装了 io.EOF 的错误。
func (d *duplexHTTPCall) Write(data []byte) (int, error) {
	// 确保请求已经发送
	d.ensureRequestMade()
	// 检查上下文是否已取消
	if err := d.ctx.Err(); err != nil {
		d.SetError(err) // 设置错误
		return 0, wrapIfContextError(err) // 返回上下文错误
	}
	// 向请求体写入数据
	bytesWritten, err := d.requestBodyWriter.Write(data)
	if err != nil && errors.Is(err, io.ErrClosedPipe) {
		// 如果管道已关闭,返回 io.EOF 而不是 io.ErrClosedPipe
		return bytesWritten, io.EOF
	}
	return bytesWritten, err
}

// CloseWrite 关闭请求体的写端。在使用 HTTP/1.x 时,调用者必须在 Read 之前调用 CloseWrite。
func (d *duplexHTTPCall) CloseWrite() error {
	// 确保请求已经发送
	d.ensureRequestMade()
	// 关闭请求体的写端
	return d.requestBodyWriter.Close()
}

// URL 返回请求的 URL。
func (d *duplexHTTPCall) URL() *url.URL {
	return d.request.URL
}

// Read 从响应体读取数据。返回第一个通过 SetError 设置的错误。
func (d *duplexHTTPCall) Read(data []byte) (int, error) {
	// 等待响应准备好
	d.BlockUntilResponseReady()
	// 检查是否有错误
	if err := d.getError(); err != nil {
		return 0, err // 返回错误
	}
	// 检查上下文是否已取消
	if err := d.ctx.Err(); err != nil {
		d.SetError(err) // 设置错误
		return 0, wrapIfContextError(err) // 返回上下文错误
	}
	if d.response == nil {
		return 0, fmt.Errorf("nil response from %v", d.request.URL) // 返回错误
	}
	// 从响应体读取数据
	n, err := d.response.Body.Read(data)
	return n, wrapIfRSTError(err) // 返回读取结果
}

// CloseRead 关闭响应体的读端。
func (d *duplexHTTPCall) CloseRead() error {
	d.BlockUntilResponseReady()
	if d.response == nil {
		return nil
	}
	// 丢弃响应体的剩余数据
	if err := discard(d.response.Body); err != nil {
		return wrapIfRSTError(err)
	}
	// 如果上下文中设置了 outgoing 数据,将 trailers 存入 incoming 上下文
	if ExtractFromOutgoingContext(d.ctx) != nil {
		newIncomingContext(d.ctx, d.ResponseTrailer())
	}
	// 关闭响应体
	return wrapIfRSTError(d.response.Body.Close())
}

// BlockUntilResponseReady 阻塞直到响应准备好。
func (d *duplexHTTPCall) BlockUntilResponseReady() {
	<-d.responseReady
}

// ensureRequestMade 确保请求已发送。
func (d *duplexHTTPCall) ensureRequestMade() {
	d.sendRequestOnce.Do(func() {
		go d.makeRequest() // 在后台发送请求
	})
}

// makeRequest 发送 HTTP 请求并处理响应。
func (d *duplexHTTPCall) makeRequest() {
	defer close(d.responseReady) // 确保 responseReady 通道被关闭

	// 发送请求并获取响应
	response, err := d.httpClient.Do(d.request)
	if err != nil {
		// 处理错误
		err = wrapIfContextError(err)
		err = wrapIfLikelyH2CNotConfiguredError(d.request, err)
		err = wrapIfLikelyWithGRPCNotUsedError(err)
		err = wrapIfRSTError(err)
		if _, ok := asError(err); !ok {
			err = NewError(CodeUnavailable, err)
		}
		d.SetError(err) // 设置错误
		return
	}
	d.response = response
	// 验证响应
	if err := d.validateResponse(response); err != nil {
		d.SetError(err)
		return
	}
	// 检查是否为双向流且 HTTP 版本低于 2
	if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
		d.SetError(errorf(
			CodeUnimplemented,
			"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
			d.request.URL,
			response.ProtoMajor,
			response.ProtoMinor,
		))
	}
}

// cloneURL 复制一个 url.URL 对象。
func cloneURL(oldURL *url.URL) *url.URL {
	if oldURL == nil {
		return nil
	}
	newURL := new(url.URL)
	*newURL = *oldURL
	if oldURL.User != nil {
		newURL.User = new(url.Userinfo)
		*newURL.User = *oldURL.User
	}
	return newURL
}

分析方法 Write(data []byte) 来研究发送的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (d *duplexHTTPCall) Write(data []byte) (int, error) {  
    // ensure stream has been initialized  
    d.ensureRequestMade()  
    // Before we send any data, check if the context has been canceled.  
    if err := d.ctx.Err(); err != nil {  
       d.SetError(err)  
       return 0, wrapIfContextError(err)  
    }  
    // It's safe to write to this side of the pipe while net/http concurrently  
    // reads from the other side.    
    bytesWritten, err := d.requestBodyWriter.Write(data)  
    if err != nil && errors.Is(err, io.ErrClosedPipe) {  
	    return bytesWritten, io.EOF  
    }  
    return bytesWritten, err  
}

其中,d.requestBodyWriter.Write(data) 是给 pipe 写入数据。

d.ensureRequestMade()httpClient 用来读数据并发送请求的,这段代码在写数据前是因为, pipe 是无缓冲的通道,所以在读数据时会阻塞直到写入数据。

数据流动过程
步骤 1:初始化
  • 创建 duplexHTTPCall 实例,初始化 io.Pipe

  • 将 d.request.Body 设置为 pipeReader

步骤 2:调用 ensureRequestMade
  • 在 Write 方法中,首先调用 d.ensureRequestMade()

  • ensureRequestMade 通过 sync.Once 确保 d.httpClient.Do(d.request) 只调用一次。

  • d.httpClient.Do(d.request) 开始执行,并尝试从 pipeReader 读取数据。

步骤 3:写入数据
  • 客户端调用 Write 方法,向 pipeWriter 写入数据。

  • 写入的数据会立即被 pipeReader 读取,并作为请求体发送到服务器。

步骤 4:HTTP 客户端发送请求
  • HTTP 客户端从 pipeReader 读取数据,并将其封装到 HTTP 请求体中。

  • 请求被发送到服务器。

步骤 5:服务器处理请求
  • 服务器从请求体中读取客户端发送的数据。

  • 服务器处理数据后,通过响应体返回结果。

步骤 6:客户端读取响应
  • 客户端通过 Read 方法从响应体中读取服务器返回的数据。

io.Pipe()

pipeReader, pipeWriter := io.Pipe() 是 Go 语言中用于创建一个同步的内存管道的代码。这个管道可以用于在两个 goroutine 之间传递数据,其中一个 goroutine 负责写入数据(通过 pipeWriter),另一个 goroutine 负责读取数据(通过 pipeReader)。它的核心特点是阻塞式读写,即写入和读取操作是同步的,写入时会阻塞直到数据被读取,读取时也会阻塞直到有数据可读。

1. io.Pipe() 的作用

io.Pipe() 返回一对 *io.PipeReader 和 *io.PipeWriter,它们分别代表管道的读端和写端。这两个对象是紧密关联的:

  • pipeWriter:用于向管道写入数据。

  • pipeReader:用于从管道读取数据。

写入到 pipeWriter 的数据会立即被 pipeReader 读取,反之亦然。如果没有数据可读,pipeReader 会阻塞;如果没有空间可写,pipeWriter 也会阻塞。

2. io.Pipe() 的特点
  • 同步性io.Pipe 是同步的,写入和读取操作是阻塞的。写入操作会等待数据被读取,读取操作会等待数据被写入。

  • 无缓冲io.Pipe 没有内部缓冲区,数据直接从写端传递到读端。

  • 线程安全io.Pipe 是线程安全的,多个 goroutine 可以安全地并发读写。

  • 单向流:数据只能从 pipeWriter 流向 pipeReader,不能反向流动。

3. io.Pipe() 的典型使用场景

io.Pipe 通常用于以下场景:

  • 流式数据处理:例如将一个流的数据实时传递给另一个流,而不需要中间存储。

  • HTTP 请求和响应:例如在 HTTP 请求中将请求体数据流式写入,同时在另一个 goroutine 中读取响应数据。

  • 测试和模拟:在测试中模拟一个流式数据源或目标。

双向流通信的核心机制与资源管理要点解析

一、双向流通信的核心机制 在HTTP/2协议下,双向流的本质是通过d.httpClient.Do(d.request)建立持久化连接实现的。该接口会创建底层TCP连接,同时维护请求/响应双工通道,其中:

  1. 请求端管理:
  • 通过requestBodyWriter *io.PipeWriter进行请求体写入

  • CloseRequest()方法的核心职责:

    • 关闭管道写入端:d.requestBodyWriter.Close()

    • 向服务端发送流结束信号

    • 触发服务端onCompleted回调(Java示例中的响应终止处理)

你可以在 java 的 server 端进行如下操作:

1
2
3
4
5
@Override  
public void onCompleted() {  
      responseObserver.onCompleted();  
      System.out.println("biStream completed");  
}
  1. 响应端管理:
  • 通过response.Body进行响应体读取

  • CloseResponse()的本质操作:d.response.Body.Close()

二、客户端实现的最佳实践 测试代码展示了符合生产级要求的实现模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func TestBiDiStream2(svc greet.GreetService) error {  
    // 初始化流通道  
    stream, err := svc.GreetStream(context.Background())  
    if err != nil {  
        return err  
    }  
  
    // 异步响应处理协程  
    waitc := make(chan struct{})  
    go func() {  
        defer close(waitc)  
        for {  
            in, err := stream.Recv()  
            if err != nil { // 捕获EOF或其他错误  
                fmt.Printf("Recv terminal: %v\n", err)  
                return  
            }  
            fmt.Printf("Response: %s\n", in.Greeting)  
        }  
    }()  
  
    // 同步发送阶段  
    for i := 0; i < 5; i++ {  
        if err := stream.Send(&greet.GreetStreamRequest{  
            Name: "stream client!"  
        }); err != nil {  
            return err  
        }  
    }  
  
    // 优雅关闭流程  
    stream.CloseRequest()  // 主动终止发送  
    <-waitc               // 等待响应处理完成  
    defer stream.CloseResponse() // 最终资源清理  
  
    fmt.Println("Stream closed properly")  
    return nil  
}

三、资源管理的必要性

为什么需要stream.CloseResponse(),我在相关文章找到了如下解释

为什么需要response.Body.Close()

resp.Body.Close() 做了什么?

如果返回值res的主体未关闭,client 下层的 RoundTripper 接口(一般为 Transport 类型)可能无法重用 res 主体下层保持的 TCP 连接去执行之后的请求。所以它的作用就是用来确保body读干净,释放出该连接

为什么这样做?

连接复用

如果不这么做会发生什么?

第一则是:无法重新使用与服务器的持久 TCP 连接来进行后续的“保持活动”请求,在下次发起HTTP请求的时候,就会重新建立TCP连接

第二如果不关闭当前请求,readLoop 和 writeLoop 两个 goroutine 在 写入请求并获取 response 返回后,并没有跳出 for 循环,而继续阻塞在下一次 for 循环的 select 语句里面,goroutine 一直无法被回收,cpu 和 memory 全部打满。发生goroutine内存泄漏

第三如果请求完成后,对端关闭了连接(对端的HTTP服务器向我发送了FIN),如果这边不调用response.Body.Close(),那么可以看到与这个请求相关的TCP连接的状态一直处于CLOSE_WAIT状态,态,不会被系统回收,则文件描述符不会被释放,出现资源泄漏。

关于客户端管理与TCP连接复用的澄清说明

一、命名误解与技术本质

  1. 客户端创建的核心逻辑 NewGreetService 的命名虽可能引起误解,但其技术本质是创建客户端实例而非直接建立TCP连接。通过源码分析可见:
1
2
3
4
svc, err := greet.NewGreetService(cli)    
// 实际调用链:    
// => newClientManager(url)    
//    => 初始化 transport (http.RoundTripper) 

每个客户端实例内部维护独立的连接池(Transport),负责管理底层TCP连接的复用。

  1. 与gRPC设计的对标性 gRPC的经典实现方式与当前方案高度一致:
1
2
3
conn := grpc.NewClient(addr)          // 物理连接管理    
    client := pb.NewRouteGuideClient(conn) // 逻辑客户端    
    runRouteChat(client)                  // 复用连接  

二者的核心差异仅体现在API抽象层级,而非连接管理机制。

二、连接复用的实现机制

  1. Transport 的核心作用

    • 作为 http.RoundTripper 接口实现,管理HTTP/2连接池

    • 自动复用空闲TCP连接,减少三次握手开销

    • 通过 MaxIdleConns 等参数控制连接池行为

  2. 客户端生命周期管理

1
2
3
4
5
// 正确用法:单例客户端复用    
    client := NewGreetService()    
    for i := 0; i < 10; i++ {    
        runBiDiStream(client) // 复用同一Transport    
    }  

关键准则:避免重复创建客户端实例,防止产生冗余连接池。

todo:探究连接池问题