1. update clientset, deepcopy using code-generator
2. add a dummy file tools.go to force "go mod vendor" to see code-generator as dependencies 3. add a script to update CRD 4. add a README to document CRD updating steps run go mod tidy update README
This commit is contained in:
46
vendor/google.golang.org/grpc/internal/balancerload/load.go
generated
vendored
Normal file
46
vendor/google.golang.org/grpc/internal/balancerload/load.go
generated
vendored
Normal file
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// Package balancerload defines APIs to parse server loads in trailers. The
|
||||
// parsed loads are sent to balancers in DoneInfo.
|
||||
package balancerload
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// Parser converts loads from metadata into a concrete type.
|
||||
type Parser interface {
|
||||
// Parse parses loads from metadata.
|
||||
Parse(md metadata.MD) interface{}
|
||||
}
|
||||
|
||||
var parser Parser
|
||||
|
||||
// SetParser sets the load parser.
|
||||
//
|
||||
// Not mutex-protected, should be called before any gRPC functions.
|
||||
func SetParser(lr Parser) {
|
||||
parser = lr
|
||||
}
|
||||
|
||||
// Parse calls parser.Read().
|
||||
func Parse(md metadata.MD) interface{} {
|
||||
if parser == nil {
|
||||
return nil
|
||||
}
|
||||
return parser.Parse(md)
|
||||
}
|
30
vendor/google.golang.org/grpc/internal/channelz/funcs.go
generated
vendored
30
vendor/google.golang.org/grpc/internal/channelz/funcs.go
generated
vendored
@@ -24,6 +24,7 @@
|
||||
package channelz
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -95,9 +96,14 @@ func (d *dbWrapper) get() *channelMap {
|
||||
|
||||
// NewChannelzStorage initializes channelz data storage and id generator.
|
||||
//
|
||||
// This function returns a cleanup function to wait for all channelz state to be reset by the
|
||||
// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
|
||||
// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
|
||||
// to remove some entity just register by the new test, since the id space is the same.
|
||||
//
|
||||
// Note: This function is exported for testing purpose only. User should not call
|
||||
// it in most cases.
|
||||
func NewChannelzStorage() {
|
||||
func NewChannelzStorage() (cleanup func() error) {
|
||||
db.set(&channelMap{
|
||||
topLevelChannels: make(map[int64]struct{}),
|
||||
channels: make(map[int64]*channel),
|
||||
@@ -107,6 +113,28 @@ func NewChannelzStorage() {
|
||||
subChannels: make(map[int64]*subChannel),
|
||||
})
|
||||
idGen.reset()
|
||||
return func() error {
|
||||
var err error
|
||||
cm := db.get()
|
||||
if cm == nil {
|
||||
return nil
|
||||
}
|
||||
for i := 0; i < 1000; i++ {
|
||||
cm.mu.Lock()
|
||||
if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
|
||||
cm.mu.Unlock()
|
||||
// all things stored in the channelz map have been cleared.
|
||||
return nil
|
||||
}
|
||||
cm.mu.Unlock()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
cm.mu.Lock()
|
||||
err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
|
||||
cm.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
|
||||
|
40
vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
generated
vendored
40
vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
generated
vendored
@@ -25,47 +25,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
prefix = "GRPC_GO_"
|
||||
retryStr = prefix + "RETRY"
|
||||
requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
|
||||
)
|
||||
|
||||
// RequireHandshakeSetting describes the settings for handshaking.
|
||||
type RequireHandshakeSetting int
|
||||
|
||||
const (
|
||||
// RequireHandshakeHybrid (default, deprecated) indicates to not wait for
|
||||
// handshake before considering a connection ready, but wait before
|
||||
// considering successful.
|
||||
RequireHandshakeHybrid RequireHandshakeSetting = iota
|
||||
// RequireHandshakeOn (default after the 1.17 release) indicates to wait
|
||||
// for handshake before considering a connection ready/successful.
|
||||
RequireHandshakeOn
|
||||
// RequireHandshakeOff indicates to not wait for handshake before
|
||||
// considering a connection ready/successful.
|
||||
RequireHandshakeOff
|
||||
prefix = "GRPC_GO_"
|
||||
retryStr = prefix + "RETRY"
|
||||
)
|
||||
|
||||
var (
|
||||
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
|
||||
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
|
||||
// RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
|
||||
// environment variable.
|
||||
//
|
||||
// Will be removed after the 1.18 release.
|
||||
RequireHandshake RequireHandshakeSetting
|
||||
)
|
||||
|
||||
func init() {
|
||||
switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
|
||||
case "on":
|
||||
fallthrough
|
||||
default:
|
||||
RequireHandshake = RequireHandshakeOn
|
||||
case "off":
|
||||
RequireHandshake = RequireHandshakeOff
|
||||
case "hybrid":
|
||||
// Will be removed after the 1.17 release.
|
||||
RequireHandshake = RequireHandshakeHybrid
|
||||
}
|
||||
}
|
||||
|
19
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
19
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
@@ -23,6 +23,8 @@ package internal
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/connectivity"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -37,10 +39,25 @@ var (
|
||||
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
|
||||
// default, but tests may wish to set it lower for convenience.
|
||||
KeepaliveMinPingTime = 10 * time.Second
|
||||
// ParseServiceConfig is a function to parse JSON service configs into
|
||||
// opaque data structures.
|
||||
ParseServiceConfig func(sc string) (interface{}, error)
|
||||
// StatusRawProto is exported by status/status.go. This func returns a
|
||||
// pointer to the wrapped Status proto for a given status.Status without a
|
||||
// call to proto.Clone(). The returned Status proto should not be mutated by
|
||||
// the caller.
|
||||
StatusRawProto interface{} // func (*status.Status) *spb.Status
|
||||
)
|
||||
|
||||
// HealthChecker defines the signature of the client-side LB channel health checking function.
|
||||
type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
|
||||
//
|
||||
// The implementation is expected to create a health checking RPC stream by
|
||||
// calling newStream(), watch for the health status of serviceName, and report
|
||||
// it's health back by calling setConnectivityState().
|
||||
//
|
||||
// The health checking protocol is defined at:
|
||||
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
|
||||
type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
|
||||
|
||||
const (
|
||||
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
|
||||
|
14
vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
generated
vendored
14
vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
generated
vendored
@@ -22,18 +22,24 @@ package syscall
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
func init() {
|
||||
grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.")
|
||||
var once sync.Once
|
||||
|
||||
func log() {
|
||||
once.Do(func() {
|
||||
grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.")
|
||||
})
|
||||
}
|
||||
|
||||
// GetCPUTime returns the how much CPU time has passed since the start of this process.
|
||||
// It always returns 0 under non-linux or appengine environment.
|
||||
func GetCPUTime() int64 {
|
||||
log()
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -42,22 +48,26 @@ type Rusage struct{}
|
||||
|
||||
// GetRusage is a no-op function under non-linux or appengine environment.
|
||||
func GetRusage() (rusage *Rusage) {
|
||||
log()
|
||||
return nil
|
||||
}
|
||||
|
||||
// CPUTimeDiff returns the differences of user CPU time and system CPU time used
|
||||
// between two Rusage structs. It a no-op function for non-linux or appengine environment.
|
||||
func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) {
|
||||
log()
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
// SetTCPUserTimeout is a no-op function under non-linux or appengine environments
|
||||
func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error {
|
||||
log()
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTCPUserTimeout is a no-op function under non-linux or appengine environments
|
||||
// a negative return value indicates the operation is not supported
|
||||
func GetTCPUserTimeout(conn net.Conn) (int, error) {
|
||||
log()
|
||||
return -1, nil
|
||||
}
|
||||
|
84
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
84
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
@@ -84,12 +85,24 @@ func (il *itemList) isEmpty() bool {
|
||||
// the control buffer of transport. They represent different aspects of
|
||||
// control tasks, e.g., flow control, settings, streaming resetting, etc.
|
||||
|
||||
// maxQueuedTransportResponseFrames is the most queued "transport response"
|
||||
// frames we will buffer before preventing new reads from occurring on the
|
||||
// transport. These are control frames sent in response to client requests,
|
||||
// such as RST_STREAM due to bad headers or settings acks.
|
||||
const maxQueuedTransportResponseFrames = 50
|
||||
|
||||
type cbItem interface {
|
||||
isTransportResponseFrame() bool
|
||||
}
|
||||
|
||||
// registerStream is used to register an incoming stream with loopy writer.
|
||||
type registerStream struct {
|
||||
streamID uint32
|
||||
wq *writeQuota
|
||||
}
|
||||
|
||||
func (*registerStream) isTransportResponseFrame() bool { return false }
|
||||
|
||||
// headerFrame is also used to register stream on the client-side.
|
||||
type headerFrame struct {
|
||||
streamID uint32
|
||||
@@ -102,6 +115,10 @@ type headerFrame struct {
|
||||
onOrphaned func(error) // Valid on client-side
|
||||
}
|
||||
|
||||
func (h *headerFrame) isTransportResponseFrame() bool {
|
||||
return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
|
||||
}
|
||||
|
||||
type cleanupStream struct {
|
||||
streamID uint32
|
||||
rst bool
|
||||
@@ -109,6 +126,8 @@ type cleanupStream struct {
|
||||
onWrite func()
|
||||
}
|
||||
|
||||
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
|
||||
|
||||
type dataFrame struct {
|
||||
streamID uint32
|
||||
endStream bool
|
||||
@@ -119,27 +138,41 @@ type dataFrame struct {
|
||||
onEachWrite func()
|
||||
}
|
||||
|
||||
func (*dataFrame) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type incomingWindowUpdate struct {
|
||||
streamID uint32
|
||||
increment uint32
|
||||
}
|
||||
|
||||
func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type outgoingWindowUpdate struct {
|
||||
streamID uint32
|
||||
increment uint32
|
||||
}
|
||||
|
||||
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
|
||||
return false // window updates are throttled by thresholds
|
||||
}
|
||||
|
||||
type incomingSettings struct {
|
||||
ss []http2.Setting
|
||||
}
|
||||
|
||||
func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
|
||||
|
||||
type outgoingSettings struct {
|
||||
ss []http2.Setting
|
||||
}
|
||||
|
||||
func (*outgoingSettings) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type incomingGoAway struct {
|
||||
}
|
||||
|
||||
func (*incomingGoAway) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type goAway struct {
|
||||
code http2.ErrCode
|
||||
debugData []byte
|
||||
@@ -147,15 +180,21 @@ type goAway struct {
|
||||
closeConn bool
|
||||
}
|
||||
|
||||
func (*goAway) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type ping struct {
|
||||
ack bool
|
||||
data [8]byte
|
||||
}
|
||||
|
||||
func (*ping) isTransportResponseFrame() bool { return true }
|
||||
|
||||
type outFlowControlSizeRequest struct {
|
||||
resp chan uint32
|
||||
}
|
||||
|
||||
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type outStreamState int
|
||||
|
||||
const (
|
||||
@@ -238,6 +277,14 @@ type controlBuffer struct {
|
||||
consumerWaiting bool
|
||||
list *itemList
|
||||
err error
|
||||
|
||||
// transportResponseFrames counts the number of queued items that represent
|
||||
// the response of an action initiated by the peer. trfChan is created
|
||||
// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
|
||||
// closed and nilled when transportResponseFrames drops below the
|
||||
// threshold. Both fields are protected by mu.
|
||||
transportResponseFrames int
|
||||
trfChan atomic.Value // *chan struct{}
|
||||
}
|
||||
|
||||
func newControlBuffer(done <-chan struct{}) *controlBuffer {
|
||||
@@ -248,12 +295,24 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controlBuffer) put(it interface{}) error {
|
||||
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
|
||||
// controlbuf.
|
||||
func (c *controlBuffer) throttle() {
|
||||
ch, _ := c.trfChan.Load().(*chan struct{})
|
||||
if ch != nil {
|
||||
select {
|
||||
case <-*ch:
|
||||
case <-c.done:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controlBuffer) put(it cbItem) error {
|
||||
_, err := c.executeAndPut(nil, it)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
|
||||
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
|
||||
var wakeUp bool
|
||||
c.mu.Lock()
|
||||
if c.err != nil {
|
||||
@@ -271,6 +330,15 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{
|
||||
c.consumerWaiting = false
|
||||
}
|
||||
c.list.enqueue(it)
|
||||
if it.isTransportResponseFrame() {
|
||||
c.transportResponseFrames++
|
||||
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
||||
// We are adding the frame that puts us over the threshold; create
|
||||
// a throttling channel.
|
||||
ch := make(chan struct{})
|
||||
c.trfChan.Store(&ch)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
if wakeUp {
|
||||
select {
|
||||
@@ -304,7 +372,17 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
|
||||
return nil, c.err
|
||||
}
|
||||
if !c.list.isEmpty() {
|
||||
h := c.list.dequeue()
|
||||
h := c.list.dequeue().(cbItem)
|
||||
if h.isTransportResponseFrame() {
|
||||
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
|
||||
// We are removing the frame that put us over the
|
||||
// threshold; close and clear the throttling channel.
|
||||
ch := c.trfChan.Load().(*chan struct{})
|
||||
close(*ch)
|
||||
c.trfChan.Store((*chan struct{})(nil))
|
||||
}
|
||||
c.transportResponseFrames--
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return h, nil
|
||||
}
|
||||
|
3
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
3
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
@@ -149,6 +149,7 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
|
||||
n = uint32(math.MaxInt32)
|
||||
}
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
|
||||
// can send without a window update.
|
||||
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
|
||||
@@ -169,10 +170,8 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
|
||||
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
|
||||
f.delta = n
|
||||
}
|
||||
f.mu.Unlock()
|
||||
return f.delta
|
||||
}
|
||||
f.mu.Unlock()
|
||||
return 0
|
||||
}
|
||||
|
||||
|
34
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
34
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
@@ -24,6 +24,7 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -63,9 +64,6 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
|
||||
if _, ok := w.(http.Flusher); !ok {
|
||||
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
|
||||
}
|
||||
if _, ok := w.(http.CloseNotifier); !ok {
|
||||
return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier")
|
||||
}
|
||||
|
||||
st := &serverHandlerTransport{
|
||||
rw: w,
|
||||
@@ -176,17 +174,11 @@ func (a strAddr) String() string { return string(a) }
|
||||
|
||||
// do runs fn in the ServeHTTP goroutine.
|
||||
func (ht *serverHandlerTransport) do(fn func()) error {
|
||||
// Avoid a panic writing to closed channel. Imperfect but maybe good enough.
|
||||
select {
|
||||
case <-ht.closedCh:
|
||||
return ErrConnClosing
|
||||
default:
|
||||
select {
|
||||
case ht.writes <- fn:
|
||||
return nil
|
||||
case <-ht.closedCh:
|
||||
return ErrConnClosing
|
||||
}
|
||||
case ht.writes <- fn:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -237,7 +229,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
if ht.stats != nil {
|
||||
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
|
||||
}
|
||||
close(ht.writes)
|
||||
}
|
||||
ht.Close()
|
||||
return err
|
||||
@@ -315,19 +306,13 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
ctx, cancel = context.WithCancel(ctx)
|
||||
}
|
||||
|
||||
// requestOver is closed when either the request's context is done
|
||||
// or the status has been written via WriteStatus.
|
||||
// requestOver is closed when the status has been written via WriteStatus.
|
||||
requestOver := make(chan struct{})
|
||||
|
||||
// clientGone receives a single value if peer is gone, either
|
||||
// because the underlying connection is dead or because the
|
||||
// peer sends an http2 RST_STREAM.
|
||||
clientGone := ht.rw.(http.CloseNotifier).CloseNotify()
|
||||
go func() {
|
||||
select {
|
||||
case <-requestOver:
|
||||
case <-ht.closedCh:
|
||||
case <-clientGone:
|
||||
case <-ht.req.Context().Done():
|
||||
}
|
||||
cancel()
|
||||
ht.Close()
|
||||
@@ -363,7 +348,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
ht.stats.HandleRPC(s.ctx, inHeader)
|
||||
}
|
||||
s.trReader = &transportReader{
|
||||
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
|
||||
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
|
||||
windowHandler: func(int) {},
|
||||
}
|
||||
|
||||
@@ -377,7 +362,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
for buf := make([]byte, readSize); ; {
|
||||
n, err := req.Body.Read(buf)
|
||||
if n > 0 {
|
||||
s.buf.put(recvMsg{data: buf[:n:n]})
|
||||
s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
|
||||
buf = buf[n:]
|
||||
}
|
||||
if err != nil {
|
||||
@@ -407,10 +392,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
func (ht *serverHandlerTransport) runStream() {
|
||||
for {
|
||||
select {
|
||||
case fn, ok := <-ht.writes:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
case fn := <-ht.writes:
|
||||
fn()
|
||||
case <-ht.closedCh:
|
||||
return
|
||||
|
109
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
109
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@@ -117,6 +117,8 @@ type http2Client struct {
|
||||
|
||||
onGoAway func(GoAwayReason)
|
||||
onClose func()
|
||||
|
||||
bufferPool *bufferPool
|
||||
}
|
||||
|
||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
|
||||
@@ -249,6 +251,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
||||
onGoAway: onGoAway,
|
||||
onClose: onClose,
|
||||
keepaliveEnabled: keepaliveEnabled,
|
||||
bufferPool: newBufferPool(),
|
||||
}
|
||||
t.controlBuf = newControlBuffer(t.ctxDone)
|
||||
if opts.InitialWindowSize >= defaultWindowSize {
|
||||
@@ -367,6 +370,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
closeStream: func(err error) {
|
||||
t.CloseStream(s, err)
|
||||
},
|
||||
freeBuffer: t.bufferPool.put,
|
||||
},
|
||||
windowHandler: func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
@@ -437,6 +441,15 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
|
||||
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
|
||||
var k string
|
||||
for k, vv := range md {
|
||||
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
|
||||
if isReservedHeader(k) {
|
||||
continue
|
||||
}
|
||||
for _, v := range vv {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
}
|
||||
for _, vv := range added {
|
||||
for i, v := range vv {
|
||||
if i%2 == 0 {
|
||||
@@ -450,15 +463,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
}
|
||||
for k, vv := range md {
|
||||
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
|
||||
if isReservedHeader(k) {
|
||||
continue
|
||||
}
|
||||
for _, v := range vv {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
}
|
||||
}
|
||||
if md, ok := t.md.(*metadata.MD); ok {
|
||||
for k, vv := range *md {
|
||||
@@ -489,6 +493,9 @@ func (t *http2Client) createAudience(callHdr *CallHdr) string {
|
||||
}
|
||||
|
||||
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
|
||||
if len(t.perRPCCreds) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
authData := map[string]string{}
|
||||
for _, c := range t.perRPCCreds {
|
||||
data, err := c.GetRequestMetadata(ctx, audience)
|
||||
@@ -509,7 +516,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
|
||||
}
|
||||
|
||||
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
|
||||
callAuthData := map[string]string{}
|
||||
var callAuthData map[string]string
|
||||
// Check if credentials.PerRPCCredentials were provided via call options.
|
||||
// Note: if these credentials are provided both via dial options and call
|
||||
// options, then both sets of credentials will be applied.
|
||||
@@ -521,6 +528,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "transport: %v", err)
|
||||
}
|
||||
callAuthData = make(map[string]string, len(data))
|
||||
for k, v := range data {
|
||||
// Capital header names are illegal in HTTP/2
|
||||
k = strings.ToLower(k)
|
||||
@@ -549,10 +557,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
s.write(recvMsg{err: err})
|
||||
close(s.done)
|
||||
// If headerChan isn't closed, then close it.
|
||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
||||
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||
close(s.headerChan)
|
||||
}
|
||||
|
||||
}
|
||||
hdr := &headerFrame{
|
||||
hf: headerFields,
|
||||
@@ -713,7 +720,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||
s.write(recvMsg{err: err})
|
||||
}
|
||||
// If headerChan isn't closed, then close it.
|
||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
||||
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||
s.noHeaders = true
|
||||
close(s.headerChan)
|
||||
}
|
||||
@@ -765,6 +772,9 @@ func (t *http2Client) Close() error {
|
||||
t.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
// Call t.onClose before setting the state to closing to prevent the client
|
||||
// from attempting to create new streams ASAP.
|
||||
t.onClose()
|
||||
t.state = closing
|
||||
streams := t.activeStreams
|
||||
t.activeStreams = nil
|
||||
@@ -785,7 +795,6 @@ func (t *http2Client) Close() error {
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
t.onClose()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -794,21 +803,21 @@ func (t *http2Client) Close() error {
|
||||
// stream is closed. If there are no active streams, the transport is closed
|
||||
// immediately. This does nothing if the transport is already draining or
|
||||
// closing.
|
||||
func (t *http2Client) GracefulClose() error {
|
||||
func (t *http2Client) GracefulClose() {
|
||||
t.mu.Lock()
|
||||
// Make sure we move to draining only from active.
|
||||
if t.state == draining || t.state == closing {
|
||||
t.mu.Unlock()
|
||||
return nil
|
||||
return
|
||||
}
|
||||
t.state = draining
|
||||
active := len(t.activeStreams)
|
||||
t.mu.Unlock()
|
||||
if active == 0 {
|
||||
return t.Close()
|
||||
t.Close()
|
||||
return
|
||||
}
|
||||
t.controlBuf.put(&incomingGoAway{})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
||||
@@ -946,9 +955,10 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||
// Can this copy be eliminated?
|
||||
if len(f.Data()) > 0 {
|
||||
data := make([]byte, len(f.Data()))
|
||||
copy(data, f.Data())
|
||||
s.write(recvMsg{data: data})
|
||||
buffer := t.bufferPool.get()
|
||||
buffer.Reset()
|
||||
buffer.Write(f.Data())
|
||||
s.write(recvMsg{buffer: buffer})
|
||||
}
|
||||
}
|
||||
// The server has closed the stream without sending trailers. Record that
|
||||
@@ -973,9 +983,9 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
statusCode = codes.Unknown
|
||||
}
|
||||
if statusCode == codes.Canceled {
|
||||
// Our deadline was already exceeded, and that was likely the cause of
|
||||
// this cancelation. Alter the status code accordingly.
|
||||
if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
|
||||
if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
|
||||
// Our deadline was already exceeded, and that was likely the cause
|
||||
// of this cancelation. Alter the status code accordingly.
|
||||
statusCode = codes.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
@@ -1080,11 +1090,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
default:
|
||||
t.setGoAwayReason(f)
|
||||
close(t.goAway)
|
||||
t.state = draining
|
||||
t.controlBuf.put(&incomingGoAway{})
|
||||
|
||||
// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
|
||||
// Notify the clientconn about the GOAWAY before we set the state to
|
||||
// draining, to allow the client to stop attempting to create streams
|
||||
// before disallowing new streams on this connection.
|
||||
t.onGoAway(t.goAwayReason)
|
||||
t.state = draining
|
||||
}
|
||||
// All streams with IDs greater than the GoAwayId
|
||||
// and smaller than the previous GoAway ID should be killed.
|
||||
@@ -1140,16 +1151,26 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
endStream := frame.StreamEnded()
|
||||
atomic.StoreUint32(&s.bytesReceived, 1)
|
||||
var state decodeState
|
||||
if err := state.decodeHeader(frame); err != nil {
|
||||
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
|
||||
// Something wrong. Stops reading even when there is remaining.
|
||||
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
|
||||
|
||||
if !initialHeader && !endStream {
|
||||
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
|
||||
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
|
||||
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
|
||||
return
|
||||
}
|
||||
|
||||
endStream := frame.StreamEnded()
|
||||
var isHeader bool
|
||||
state := &decodeState{}
|
||||
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
|
||||
state.data.isGRPC = !initialHeader
|
||||
if err := state.decodeHeader(frame); err != nil {
|
||||
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
|
||||
return
|
||||
}
|
||||
|
||||
isHeader := false
|
||||
defer func() {
|
||||
if t.statsHandler != nil {
|
||||
if isHeader {
|
||||
@@ -1167,29 +1188,33 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
}
|
||||
}
|
||||
}()
|
||||
// If headers haven't been received yet.
|
||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
||||
|
||||
// If headerChan hasn't been closed yet
|
||||
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||
if !endStream {
|
||||
// Headers frame is not actually a trailers-only frame.
|
||||
// HEADERS frame block carries a Response-Headers.
|
||||
isHeader = true
|
||||
// These values can be set without any synchronization because
|
||||
// stream goroutine will read it only after seeing a closed
|
||||
// headerChan which we'll close after setting this.
|
||||
s.recvCompress = state.encoding
|
||||
if len(state.mdata) > 0 {
|
||||
s.header = state.mdata
|
||||
s.recvCompress = state.data.encoding
|
||||
if len(state.data.mdata) > 0 {
|
||||
s.header = state.data.mdata
|
||||
}
|
||||
} else {
|
||||
// HEADERS frame block carries a Trailers-Only.
|
||||
s.noHeaders = true
|
||||
}
|
||||
close(s.headerChan)
|
||||
}
|
||||
|
||||
if !endStream {
|
||||
return
|
||||
}
|
||||
|
||||
// if client received END_STREAM from server while stream was still active, send RST_STREAM
|
||||
rst := s.getState() == streamActive
|
||||
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
|
||||
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
|
||||
}
|
||||
|
||||
// reader runs as a separate goroutine in charge of reading data from network
|
||||
@@ -1220,6 +1245,7 @@ func (t *http2Client) reader() {
|
||||
|
||||
// loop to keep reading incoming messages on this transport.
|
||||
for {
|
||||
t.controlBuf.throttle()
|
||||
frame, err := t.framer.fr.ReadFrame()
|
||||
if t.keepaliveEnabled {
|
||||
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
||||
@@ -1307,6 +1333,7 @@ func (t *http2Client) keepalive() {
|
||||
timer.Reset(t.kp.Time)
|
||||
continue
|
||||
}
|
||||
infof("transport: closing client transport due to idleness.")
|
||||
t.Close()
|
||||
return
|
||||
case <-t.ctx.Done():
|
||||
@@ -1356,6 +1383,8 @@ func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
|
||||
return &s
|
||||
}
|
||||
|
||||
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
|
||||
|
||||
func (t *http2Client) IncrMsgSent() {
|
||||
atomic.AddInt64(&t.czData.msgSent, 1)
|
||||
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
|
||||
|
179
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
179
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@@ -35,9 +35,11 @@ import (
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
|
||||
spb "google.golang.org/genproto/googleapis/rpc/status"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@@ -55,6 +57,9 @@ var (
|
||||
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
|
||||
// than the limit set by peer.
|
||||
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
|
||||
// statusRawProto is a function to get to the raw status proto wrapped in a
|
||||
// status.Status without a proto.Clone().
|
||||
statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
|
||||
)
|
||||
|
||||
// http2Server implements the ServerTransport interface with HTTP2.
|
||||
@@ -119,6 +124,7 @@ type http2Server struct {
|
||||
// Fields below are for channelz metric collection.
|
||||
channelzID int64 // channelz unique identification number
|
||||
czData *channelzData
|
||||
bufferPool *bufferPool
|
||||
}
|
||||
|
||||
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
|
||||
@@ -220,6 +226,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
kep: kep,
|
||||
initialWindowSize: iwz,
|
||||
czData: new(channelzData),
|
||||
bufferPool: newBufferPool(),
|
||||
}
|
||||
t.controlBuf = newControlBuffer(t.ctxDone)
|
||||
if dynamicWindow {
|
||||
@@ -286,7 +293,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
// operateHeader takes action on the decoded headers.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
|
||||
streamID := frame.Header().StreamID
|
||||
state := decodeState{serverSide: true}
|
||||
state := &decodeState{
|
||||
serverSide: true,
|
||||
}
|
||||
if err := state.decodeHeader(frame); err != nil {
|
||||
if se, ok := status.FromError(err); ok {
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
@@ -305,16 +314,16 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
st: t,
|
||||
buf: buf,
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
recvCompress: state.encoding,
|
||||
method: state.method,
|
||||
contentSubtype: state.contentSubtype,
|
||||
recvCompress: state.data.encoding,
|
||||
method: state.data.method,
|
||||
contentSubtype: state.data.contentSubtype,
|
||||
}
|
||||
if frame.StreamEnded() {
|
||||
// s is just created by the caller. No lock needed.
|
||||
s.state = streamReadDone
|
||||
}
|
||||
if state.timeoutSet {
|
||||
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
|
||||
if state.data.timeoutSet {
|
||||
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
|
||||
} else {
|
||||
s.ctx, s.cancel = context.WithCancel(t.ctx)
|
||||
}
|
||||
@@ -327,19 +336,19 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
}
|
||||
s.ctx = peer.NewContext(s.ctx, pr)
|
||||
// Attach the received metadata to the context.
|
||||
if len(state.mdata) > 0 {
|
||||
s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
|
||||
if len(state.data.mdata) > 0 {
|
||||
s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
|
||||
}
|
||||
if state.statsTags != nil {
|
||||
s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
|
||||
if state.data.statsTags != nil {
|
||||
s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
|
||||
}
|
||||
if state.statsTrace != nil {
|
||||
s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
|
||||
if state.data.statsTrace != nil {
|
||||
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
|
||||
}
|
||||
if t.inTapHandle != nil {
|
||||
var err error
|
||||
info := &tap.Info{
|
||||
FullMethodName: state.method,
|
||||
FullMethodName: state.data.method,
|
||||
}
|
||||
s.ctx, err = t.inTapHandle(s.ctx, info)
|
||||
if err != nil {
|
||||
@@ -403,9 +412,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
|
||||
s.trReader = &transportReader{
|
||||
reader: &recvBufferReader{
|
||||
ctx: s.ctx,
|
||||
ctxDone: s.ctxDone,
|
||||
recv: s.buf,
|
||||
ctx: s.ctx,
|
||||
ctxDone: s.ctxDone,
|
||||
recv: s.buf,
|
||||
freeBuffer: t.bufferPool.put,
|
||||
},
|
||||
windowHandler: func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
@@ -426,6 +436,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
||||
defer close(t.readerDone)
|
||||
for {
|
||||
t.controlBuf.throttle()
|
||||
frame, err := t.framer.fr.ReadFrame()
|
||||
atomic.StoreUint32(&t.activity, 1)
|
||||
if err != nil {
|
||||
@@ -435,7 +446,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
s := t.activeStreams[se.StreamID]
|
||||
t.mu.Unlock()
|
||||
if s != nil {
|
||||
t.closeStream(s, true, se.Code, nil, false)
|
||||
t.closeStream(s, true, se.Code, false)
|
||||
} else {
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
streamID: se.StreamID,
|
||||
@@ -577,7 +588,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
}
|
||||
if size > 0 {
|
||||
if err := s.fc.onData(size); err != nil {
|
||||
t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
|
||||
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
|
||||
return
|
||||
}
|
||||
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
||||
@@ -589,9 +600,10 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||
// Can this copy be eliminated?
|
||||
if len(f.Data()) > 0 {
|
||||
data := make([]byte, len(f.Data()))
|
||||
copy(data, f.Data())
|
||||
s.write(recvMsg{data: data})
|
||||
buffer := t.bufferPool.get()
|
||||
buffer.Reset()
|
||||
buffer.Write(f.Data())
|
||||
s.write(recvMsg{buffer: buffer})
|
||||
}
|
||||
}
|
||||
if f.Header().Flags.Has(http2.FlagDataEndStream) {
|
||||
@@ -602,11 +614,18 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
}
|
||||
|
||||
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
s, ok := t.getStream(f)
|
||||
if !ok {
|
||||
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
|
||||
if s, ok := t.getStream(f); ok {
|
||||
t.closeStream(s, false, 0, false)
|
||||
return
|
||||
}
|
||||
t.closeStream(s, false, 0, nil, false)
|
||||
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
streamID: f.Header().StreamID,
|
||||
rst: false,
|
||||
rstCode: 0,
|
||||
onWrite: func() {},
|
||||
})
|
||||
}
|
||||
|
||||
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
||||
@@ -748,6 +767,10 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *http2Server) setResetPingStrikes() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
|
||||
func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
@@ -762,15 +785,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
streamID: s.id,
|
||||
hf: headerFields,
|
||||
endStream: false,
|
||||
onWrite: func() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
},
|
||||
onWrite: t.setResetPingStrikes,
|
||||
})
|
||||
if !success {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
|
||||
t.closeStream(s, true, http2.ErrCodeInternal, false)
|
||||
return ErrHeaderListSizeLimitViolation
|
||||
}
|
||||
if t.stats != nil {
|
||||
@@ -808,7 +829,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
|
||||
|
||||
if p := st.Proto(); p != nil && len(p.Details) > 0 {
|
||||
if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
|
||||
stBytes, err := proto.Marshal(p)
|
||||
if err != nil {
|
||||
// TODO: return error instead, when callers are able to handle it.
|
||||
@@ -824,9 +845,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
streamID: s.id,
|
||||
hf: headerFields,
|
||||
endStream: true,
|
||||
onWrite: func() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
},
|
||||
onWrite: t.setResetPingStrikes,
|
||||
}
|
||||
s.hdrMu.Unlock()
|
||||
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
|
||||
@@ -834,10 +853,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
|
||||
t.closeStream(s, true, http2.ErrCodeInternal, false)
|
||||
return ErrHeaderListSizeLimitViolation
|
||||
}
|
||||
t.closeStream(s, false, 0, trailingHeader, true)
|
||||
// Send a RST_STREAM after the trailers if the client has not already half-closed.
|
||||
rst := s.getState() == streamActive
|
||||
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
|
||||
if t.stats != nil {
|
||||
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
|
||||
}
|
||||
@@ -849,6 +870,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
if !s.isHeaderSent() { // Headers haven't been written yet.
|
||||
if err := t.WriteHeader(s, nil); err != nil {
|
||||
if _, ok := err.(ConnectionError); ok {
|
||||
return err
|
||||
}
|
||||
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
|
||||
return status.Errorf(codes.Internal, "transport: %v", err)
|
||||
}
|
||||
@@ -873,12 +897,10 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
|
||||
hdr = append(hdr, data[:emptyLen]...)
|
||||
data = data[emptyLen:]
|
||||
df := &dataFrame{
|
||||
streamID: s.id,
|
||||
h: hdr,
|
||||
d: data,
|
||||
onEachWrite: func() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
},
|
||||
streamID: s.id,
|
||||
h: hdr,
|
||||
d: data,
|
||||
onEachWrite: t.setResetPingStrikes,
|
||||
}
|
||||
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
|
||||
select {
|
||||
@@ -944,6 +966,7 @@ func (t *http2Server) keepalive() {
|
||||
select {
|
||||
case <-maxAge.C:
|
||||
// Close the connection after grace period.
|
||||
infof("transport: closing server transport due to maximum connection age.")
|
||||
t.Close()
|
||||
// Resetting the timer so that the clean-up doesn't deadlock.
|
||||
maxAge.Reset(infinity)
|
||||
@@ -957,6 +980,7 @@ func (t *http2Server) keepalive() {
|
||||
continue
|
||||
}
|
||||
if pingSent {
|
||||
infof("transport: closing server transport due to idleness.")
|
||||
t.Close()
|
||||
// Resetting the timer so that the clean-up doesn't deadlock.
|
||||
keepalive.Reset(infinity)
|
||||
@@ -1006,15 +1030,17 @@ func (t *http2Server) Close() error {
|
||||
|
||||
// deleteStream deletes the stream s from transport's active streams.
|
||||
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
t.mu.Lock()
|
||||
if _, ok := t.activeStreams[s.id]; !ok {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
|
||||
delete(t.activeStreams, s.id)
|
||||
if len(t.activeStreams) == 0 {
|
||||
t.idle = time.Now()
|
||||
t.mu.Lock()
|
||||
if _, ok := t.activeStreams[s.id]; ok {
|
||||
delete(t.activeStreams, s.id)
|
||||
if len(t.activeStreams) == 0 {
|
||||
t.idle = time.Now()
|
||||
}
|
||||
}
|
||||
t.mu.Unlock()
|
||||
|
||||
@@ -1027,51 +1053,36 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// closeStream clears the footprint of a stream when the stream is not needed
|
||||
// any more.
|
||||
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
// Mark the stream as done
|
||||
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
|
||||
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
oldState := s.swapState(streamDone)
|
||||
if oldState == streamDone {
|
||||
// If the stream was already done, return.
|
||||
return
|
||||
}
|
||||
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
hdr.cleanup = &cleanupStream{
|
||||
streamID: s.id,
|
||||
rst: rst,
|
||||
rstCode: rstCode,
|
||||
onWrite: func() {
|
||||
t.deleteStream(s, eosReceived)
|
||||
},
|
||||
}
|
||||
t.controlBuf.put(hdr)
|
||||
}
|
||||
|
||||
// Deletes the stream from active streams
|
||||
// closeStream clears the footprint of a stream when the stream is not needed any more.
|
||||
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
|
||||
s.swapState(streamDone)
|
||||
t.deleteStream(s, eosReceived)
|
||||
|
||||
cleanup := &cleanupStream{
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
streamID: s.id,
|
||||
rst: rst,
|
||||
rstCode: rstCode,
|
||||
onWrite: func() {},
|
||||
}
|
||||
|
||||
// No trailer. Puts cleanupFrame into transport's control buffer.
|
||||
if hdr == nil {
|
||||
t.controlBuf.put(cleanup)
|
||||
return
|
||||
}
|
||||
|
||||
// We do the check here, because of the following scenario:
|
||||
// 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
|
||||
// is put to control buffer.
|
||||
// 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
|
||||
// some point. So loopy can't act on trailer
|
||||
// 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
|
||||
// the result of the received RST_STREAM.
|
||||
// If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
|
||||
// response to received RST_STREAM into the control buffer and outStream in loopy writer will
|
||||
// never get cleaned up.
|
||||
|
||||
// If the stream is already done, don't send the trailer.
|
||||
if oldState == streamDone {
|
||||
return
|
||||
}
|
||||
|
||||
hdr.cleanup = cleanup
|
||||
t.controlBuf.put(hdr)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *http2Server) RemoteAddr() net.Addr {
|
||||
|
167
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
167
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
@@ -78,7 +78,8 @@ var (
|
||||
codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
|
||||
codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
|
||||
}
|
||||
httpStatusConvTab = map[int]codes.Code{
|
||||
// HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
|
||||
HTTPStatusConvTab = map[int]codes.Code{
|
||||
// 400 Bad Request - INTERNAL.
|
||||
http.StatusBadRequest: codes.Internal,
|
||||
// 401 Unauthorized - UNAUTHENTICATED.
|
||||
@@ -98,9 +99,7 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// Records the states during HPACK decoding. Must be reset once the
|
||||
// decoding of the entire headers are finished.
|
||||
type decodeState struct {
|
||||
type parsedHeaderData struct {
|
||||
encoding string
|
||||
// statusGen caches the stream status received from the trailer the server
|
||||
// sent. Client side only. Do not access directly. After all trailers are
|
||||
@@ -120,8 +119,30 @@ type decodeState struct {
|
||||
statsTags []byte
|
||||
statsTrace []byte
|
||||
contentSubtype string
|
||||
|
||||
// isGRPC field indicates whether the peer is speaking gRPC (otherwise HTTP).
|
||||
//
|
||||
// We are in gRPC mode (peer speaking gRPC) if:
|
||||
// * We are client side and have already received a HEADER frame that indicates gRPC peer.
|
||||
// * The header contains valid a content-type, i.e. a string starts with "application/grpc"
|
||||
// And we should handle error specific to gRPC.
|
||||
//
|
||||
// Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we
|
||||
// are in HTTP fallback mode, and should handle error specific to HTTP.
|
||||
isGRPC bool
|
||||
grpcErr error
|
||||
httpErr error
|
||||
contentTypeErr string
|
||||
}
|
||||
|
||||
// decodeState configures decoding criteria and records the decoded data.
|
||||
type decodeState struct {
|
||||
// whether decoding on server side or not
|
||||
serverSide bool
|
||||
|
||||
// Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS
|
||||
// frame once decodeHeader function has been invoked and returned.
|
||||
data parsedHeaderData
|
||||
}
|
||||
|
||||
// isReservedHeader checks whether hdr belongs to HTTP2 headers
|
||||
@@ -202,11 +223,11 @@ func contentType(contentSubtype string) string {
|
||||
}
|
||||
|
||||
func (d *decodeState) status() *status.Status {
|
||||
if d.statusGen == nil {
|
||||
if d.data.statusGen == nil {
|
||||
// No status-details were provided; generate status using code/msg.
|
||||
d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg)
|
||||
d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
|
||||
}
|
||||
return d.statusGen
|
||||
return d.data.statusGen
|
||||
}
|
||||
|
||||
const binHdrSuffix = "-bin"
|
||||
@@ -244,113 +265,146 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
|
||||
if frame.Truncated {
|
||||
return status.Error(codes.Internal, "peer header list size exceeded limit")
|
||||
}
|
||||
|
||||
for _, hf := range frame.Fields {
|
||||
if err := d.processHeaderField(hf); err != nil {
|
||||
return err
|
||||
d.processHeaderField(hf)
|
||||
}
|
||||
|
||||
if d.data.isGRPC {
|
||||
if d.data.grpcErr != nil {
|
||||
return d.data.grpcErr
|
||||
}
|
||||
if d.serverSide {
|
||||
return nil
|
||||
}
|
||||
if d.data.rawStatusCode == nil && d.data.statusGen == nil {
|
||||
// gRPC status doesn't exist.
|
||||
// Set rawStatusCode to be unknown and return nil error.
|
||||
// So that, if the stream has ended this Unknown status
|
||||
// will be propagated to the user.
|
||||
// Otherwise, it will be ignored. In which case, status from
|
||||
// a later trailer, that has StreamEnded flag set, is propagated.
|
||||
code := int(codes.Unknown)
|
||||
d.data.rawStatusCode = &code
|
||||
}
|
||||
}
|
||||
|
||||
if d.serverSide {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If grpc status exists, no need to check further.
|
||||
if d.rawStatusCode != nil || d.statusGen != nil {
|
||||
return nil
|
||||
// HTTP fallback mode
|
||||
if d.data.httpErr != nil {
|
||||
return d.data.httpErr
|
||||
}
|
||||
|
||||
// If grpc status doesn't exist and http status doesn't exist,
|
||||
// then it's a malformed header.
|
||||
if d.httpStatus == nil {
|
||||
return status.Error(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
|
||||
}
|
||||
var (
|
||||
code = codes.Internal // when header does not include HTTP status, return INTERNAL
|
||||
ok bool
|
||||
)
|
||||
|
||||
if *(d.httpStatus) != http.StatusOK {
|
||||
code, ok := httpStatusConvTab[*(d.httpStatus)]
|
||||
if d.data.httpStatus != nil {
|
||||
code, ok = HTTPStatusConvTab[*(d.data.httpStatus)]
|
||||
if !ok {
|
||||
code = codes.Unknown
|
||||
}
|
||||
return status.Error(code, http.StatusText(*(d.httpStatus)))
|
||||
}
|
||||
|
||||
// gRPC status doesn't exist and http status is OK.
|
||||
// Set rawStatusCode to be unknown and return nil error.
|
||||
// So that, if the stream has ended this Unknown status
|
||||
// will be propagated to the user.
|
||||
// Otherwise, it will be ignored. In which case, status from
|
||||
// a later trailer, that has StreamEnded flag set, is propagated.
|
||||
code := int(codes.Unknown)
|
||||
d.rawStatusCode = &code
|
||||
return nil
|
||||
return status.Error(code, d.constructHTTPErrMsg())
|
||||
}
|
||||
|
||||
// constructErrMsg constructs error message to be returned in HTTP fallback mode.
|
||||
// Format: HTTP status code and its corresponding message + content-type error message.
|
||||
func (d *decodeState) constructHTTPErrMsg() string {
|
||||
var errMsgs []string
|
||||
|
||||
if d.data.httpStatus == nil {
|
||||
errMsgs = append(errMsgs, "malformed header: missing HTTP status")
|
||||
} else {
|
||||
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
|
||||
}
|
||||
|
||||
if d.data.contentTypeErr == "" {
|
||||
errMsgs = append(errMsgs, "transport: missing content-type field")
|
||||
} else {
|
||||
errMsgs = append(errMsgs, d.data.contentTypeErr)
|
||||
}
|
||||
|
||||
return strings.Join(errMsgs, "; ")
|
||||
}
|
||||
|
||||
func (d *decodeState) addMetadata(k, v string) {
|
||||
if d.mdata == nil {
|
||||
d.mdata = make(map[string][]string)
|
||||
if d.data.mdata == nil {
|
||||
d.data.mdata = make(map[string][]string)
|
||||
}
|
||||
d.mdata[k] = append(d.mdata[k], v)
|
||||
d.data.mdata[k] = append(d.data.mdata[k], v)
|
||||
}
|
||||
|
||||
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
|
||||
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
|
||||
switch f.Name {
|
||||
case "content-type":
|
||||
contentSubtype, validContentType := contentSubtype(f.Value)
|
||||
if !validContentType {
|
||||
return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
|
||||
d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
|
||||
return
|
||||
}
|
||||
d.contentSubtype = contentSubtype
|
||||
d.data.contentSubtype = contentSubtype
|
||||
// TODO: do we want to propagate the whole content-type in the metadata,
|
||||
// or come up with a way to just propagate the content-subtype if it was set?
|
||||
// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
|
||||
// in the metadata?
|
||||
d.addMetadata(f.Name, f.Value)
|
||||
d.data.isGRPC = true
|
||||
case "grpc-encoding":
|
||||
d.encoding = f.Value
|
||||
d.data.encoding = f.Value
|
||||
case "grpc-status":
|
||||
code, err := strconv.Atoi(f.Value)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
|
||||
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
|
||||
return
|
||||
}
|
||||
d.rawStatusCode = &code
|
||||
d.data.rawStatusCode = &code
|
||||
case "grpc-message":
|
||||
d.rawStatusMsg = decodeGrpcMessage(f.Value)
|
||||
d.data.rawStatusMsg = decodeGrpcMessage(f.Value)
|
||||
case "grpc-status-details-bin":
|
||||
v, err := decodeBinHeader(f.Value)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
|
||||
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
|
||||
return
|
||||
}
|
||||
s := &spb.Status{}
|
||||
if err := proto.Unmarshal(v, s); err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
|
||||
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
|
||||
return
|
||||
}
|
||||
d.statusGen = status.FromProto(s)
|
||||
d.data.statusGen = status.FromProto(s)
|
||||
case "grpc-timeout":
|
||||
d.timeoutSet = true
|
||||
d.data.timeoutSet = true
|
||||
var err error
|
||||
if d.timeout, err = decodeTimeout(f.Value); err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
|
||||
if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
|
||||
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
|
||||
}
|
||||
case ":path":
|
||||
d.method = f.Value
|
||||
d.data.method = f.Value
|
||||
case ":status":
|
||||
code, err := strconv.Atoi(f.Value)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
|
||||
d.data.httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
|
||||
return
|
||||
}
|
||||
d.httpStatus = &code
|
||||
d.data.httpStatus = &code
|
||||
case "grpc-tags-bin":
|
||||
v, err := decodeBinHeader(f.Value)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
|
||||
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
|
||||
return
|
||||
}
|
||||
d.statsTags = v
|
||||
d.data.statsTags = v
|
||||
d.addMetadata(f.Name, string(v))
|
||||
case "grpc-trace-bin":
|
||||
v, err := decodeBinHeader(f.Value)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
|
||||
d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
|
||||
return
|
||||
}
|
||||
d.statsTrace = v
|
||||
d.data.statsTrace = v
|
||||
d.addMetadata(f.Name, string(v))
|
||||
default:
|
||||
if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
|
||||
@@ -359,11 +413,10 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
|
||||
v, err := decodeMetadataHeader(f.Name, f.Value)
|
||||
if err != nil {
|
||||
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
|
||||
return nil
|
||||
return
|
||||
}
|
||||
d.addMetadata(f.Name, v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type timeoutUnit uint8
|
||||
|
86
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
86
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@@ -22,6 +22,7 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -39,10 +40,32 @@ import (
|
||||
"google.golang.org/grpc/tap"
|
||||
)
|
||||
|
||||
type bufferPool struct {
|
||||
pool sync.Pool
|
||||
}
|
||||
|
||||
func newBufferPool() *bufferPool {
|
||||
return &bufferPool{
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return new(bytes.Buffer)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *bufferPool) get() *bytes.Buffer {
|
||||
return p.pool.Get().(*bytes.Buffer)
|
||||
}
|
||||
|
||||
func (p *bufferPool) put(b *bytes.Buffer) {
|
||||
p.pool.Put(b)
|
||||
}
|
||||
|
||||
// recvMsg represents the received msg from the transport. All transport
|
||||
// protocol specific info has been removed.
|
||||
type recvMsg struct {
|
||||
data []byte
|
||||
buffer *bytes.Buffer
|
||||
// nil: received some data
|
||||
// io.EOF: stream is completed. data is nil.
|
||||
// other non-nil error: transport failure. data is nil.
|
||||
@@ -117,8 +140,9 @@ type recvBufferReader struct {
|
||||
ctx context.Context
|
||||
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
|
||||
recv *recvBuffer
|
||||
last []byte // Stores the remaining data in the previous calls.
|
||||
last *bytes.Buffer // Stores the remaining data in the previous calls.
|
||||
err error
|
||||
freeBuffer func(*bytes.Buffer)
|
||||
}
|
||||
|
||||
// Read reads the next len(p) bytes from last. If last is drained, it tries to
|
||||
@@ -128,10 +152,13 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
|
||||
if r.err != nil {
|
||||
return 0, r.err
|
||||
}
|
||||
if r.last != nil && len(r.last) > 0 {
|
||||
if r.last != nil {
|
||||
// Read remaining data left in last call.
|
||||
copied := copy(p, r.last)
|
||||
r.last = r.last[copied:]
|
||||
copied, _ := r.last.Read(p)
|
||||
if r.last.Len() == 0 {
|
||||
r.freeBuffer(r.last)
|
||||
r.last = nil
|
||||
}
|
||||
return copied, nil
|
||||
}
|
||||
if r.closeStream != nil {
|
||||
@@ -157,6 +184,19 @@ func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
|
||||
// r.readAdditional acts on that message and returns the necessary error.
|
||||
select {
|
||||
case <-r.ctxDone:
|
||||
// Note that this adds the ctx error to the end of recv buffer, and
|
||||
// reads from the head. This will delay the error until recv buffer is
|
||||
// empty, thus will delay ctx cancellation in Recv().
|
||||
//
|
||||
// It's done this way to fix a race between ctx cancel and trailer. The
|
||||
// race was, stream.Recv() may return ctx error if ctxDone wins the
|
||||
// race, but stream.Trailer() may return a non-nil md because the stream
|
||||
// was not marked as done when trailer is received. This closeStream
|
||||
// call will mark stream as done, thus fix the race.
|
||||
//
|
||||
// TODO: delaying ctx error seems like a unnecessary side effect. What
|
||||
// we really want is to mark the stream as done, and return ctx error
|
||||
// faster.
|
||||
r.closeStream(ContextErr(r.ctx.Err()))
|
||||
m := <-r.recv.get()
|
||||
return r.readAdditional(m, p)
|
||||
@@ -170,8 +210,13 @@ func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error
|
||||
if m.err != nil {
|
||||
return 0, m.err
|
||||
}
|
||||
copied := copy(p, m.data)
|
||||
r.last = m.data[copied:]
|
||||
copied, _ := m.buffer.Read(p)
|
||||
if m.buffer.Len() == 0 {
|
||||
r.freeBuffer(m.buffer)
|
||||
r.last = nil
|
||||
} else {
|
||||
r.last = m.buffer
|
||||
}
|
||||
return copied, nil
|
||||
}
|
||||
|
||||
@@ -204,8 +249,8 @@ type Stream struct {
|
||||
// is used to adjust flow control, if needed.
|
||||
requestRead func(int)
|
||||
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
|
||||
// hdrMu protects header and trailer metadata on the server-side.
|
||||
hdrMu sync.Mutex
|
||||
@@ -266,6 +311,14 @@ func (s *Stream) waitOnHeader() error {
|
||||
}
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
// We prefer success over failure when reading messages because we delay
|
||||
// context error in stream.Read(). To keep behavior consistent, we also
|
||||
// prefer success here.
|
||||
select {
|
||||
case <-s.headerChan:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
return ContextErr(s.ctx.Err())
|
||||
case <-s.headerChan:
|
||||
return nil
|
||||
@@ -327,8 +380,7 @@ func (s *Stream) TrailersOnly() (bool, error) {
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// if !headerDone, some other connection error occurred.
|
||||
return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
|
||||
return s.noHeaders, nil
|
||||
}
|
||||
|
||||
// Trailer returns the cached trailer metedata. Note that if it is not called
|
||||
@@ -579,9 +631,12 @@ type ClientTransport interface {
|
||||
// is called only once.
|
||||
Close() error
|
||||
|
||||
// GracefulClose starts to tear down the transport. It stops accepting
|
||||
// new RPCs and wait the completion of the pending RPCs.
|
||||
GracefulClose() error
|
||||
// GracefulClose starts to tear down the transport: the transport will stop
|
||||
// accepting new RPCs and NewStream will return error. Once all streams are
|
||||
// finished, the transport will close.
|
||||
//
|
||||
// It does not block.
|
||||
GracefulClose()
|
||||
|
||||
// Write sends the data for the given stream. A nil stream indicates
|
||||
// the write is to be performed on the transport as a whole.
|
||||
@@ -611,6 +666,9 @@ type ClientTransport interface {
|
||||
// GetGoAwayReason returns the reason why GoAway frame was received.
|
||||
GetGoAwayReason() GoAwayReason
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
RemoteAddr() net.Addr
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
|
Reference in New Issue
Block a user