Update k8s dependencies to v1.22.0-rc.0

This commit is contained in:
Chris Henzie
2021-07-27 17:45:59 -07:00
parent e6e14c1601
commit 59725e39fe
708 changed files with 25967 additions and 20268 deletions

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package credentials
import (
"context"
)
// requestInfoKey is a struct to be used as the key to store RequestInfo in a
// context.
type requestInfoKey struct{}
// NewRequestInfoContext creates a context with ri.
func NewRequestInfoContext(ctx context.Context, ri interface{}) context.Context {
return context.WithValue(ctx, requestInfoKey{}, ri)
}
// RequestInfoFromContext extracts the RequestInfo from ctx.
func RequestInfoFromContext(ctx context.Context) interface{} {
return ctx.Value(requestInfoKey{})
}
// clientHandshakeInfoKey is a struct used as the key to store
// ClientHandshakeInfo in a context.
type clientHandshakeInfoKey struct{}
// ClientHandshakeInfoFromContext extracts the ClientHandshakeInfo from ctx.
func ClientHandshakeInfoFromContext(ctx context.Context) interface{} {
return ctx.Value(clientHandshakeInfoKey{})
}
// NewClientHandshakeInfoContext creates a context with chi.
func NewClientHandshakeInfoContext(ctx context.Context, chi interface{}) context.Context {
return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
}

View File

@@ -41,16 +41,37 @@ func split2(s, sep string) (string, string, bool) {
// not parse "unix:[path]" cases. This should be true in cases where a custom
// dialer is present, to prevent a behavior change.
//
// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
// target}.
// If target is not a valid scheme://authority/endpoint as specified in
// https://github.com/grpc/grpc/blob/master/doc/naming.md,
// it returns {Endpoint: target}.
func ParseTarget(target string, skipUnixColonParsing bool) (ret resolver.Target) {
var ok bool
if strings.HasPrefix(target, "unix-abstract:") {
if strings.HasPrefix(target, "unix-abstract://") {
// Maybe, with Authority specified, try to parse it
var remain string
ret.Scheme, remain, _ = split2(target, "://")
ret.Authority, ret.Endpoint, ok = split2(remain, "/")
if !ok {
// No Authority, add the "//" back
ret.Endpoint = "//" + remain
} else {
// Found Authority, add the "/" back
ret.Endpoint = "/" + ret.Endpoint
}
} else {
// Without Authority specified, split target on ":"
ret.Scheme, ret.Endpoint, _ = split2(target, ":")
}
return ret
}
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
if strings.HasPrefix(target, "unix:") && !skipUnixColonParsing {
// Handle the "unix:[path]" case, because splitting on :// only
// handles the "unix://[/absolute/path]" case. Only handle if the
// dialer is nil, to avoid a behavior change with custom dialers.
// Handle the "unix:[local/path]" and "unix:[/absolute/path]" cases,
// because splitting on :// only handles the
// "unix://[/absolute/path]" case. Only handle if the dialer is nil,
// to avoid a behavior change with custom dialers.
return resolver.Target{Scheme: "unix", Endpoint: target[len("unix:"):]}
}
return resolver.Target{Endpoint: target}
@@ -61,7 +82,7 @@ func ParseTarget(target string, skipUnixColonParsing bool) (ret resolver.Target)
}
if ret.Scheme == "unix" {
// Add the "/" back in the unix case, so the unix resolver receives the
// actual endpoint.
// actual endpoint in the "unix://[/absolute/path]" case.
ret.Endpoint = "/" + ret.Endpoint
}
return ret

View File

@@ -38,12 +38,6 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
// NewRequestInfoContext creates a new context based on the argument context attaching
// the passed in RequestInfo to the new context.
NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
// NewClientHandshakeInfoContext returns a copy of the input context with
// the passed in ClientHandshakeInfo struct added to it.
NewClientHandshakeInfoContext interface{} // func(context.Context, credentials.ClientHandshakeInfo) context.Context
// ParseServiceConfigForTesting is for creating a fake
// ClientConn for resolver testing only
ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
@@ -60,7 +54,16 @@ var (
// GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo
// stored in the passed in attributes. This is set by
// credentials/xds/xds.go.
GetXDSHandshakeInfoForTesting interface{} // func (attr *attributes.Attributes) *xds.HandshakeInfo
GetXDSHandshakeInfoForTesting interface{} // func (*attributes.Attributes) *xds.HandshakeInfo
// GetServerCredentials returns the transport credentials configured on a
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
)
// HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@@ -0,0 +1,50 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package metadata contains functions to set and get metadata from addresses.
//
// This package is experimental.
package metadata
import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
type mdKeyType string
const mdKey = mdKeyType("grpc.internal.address.metadata")
// Get returns the metadata of addr.
func Get(addr resolver.Address) metadata.MD {
attrs := addr.Attributes
if attrs == nil {
return nil
}
md, _ := attrs.Value(mdKey).(metadata.MD)
return md
}
// Set sets (overrides) the metadata in addr.
//
// When a SubConn is created with this address, the RPCs sent on it will all
// have this metadata.
func Set(addr resolver.Address, md metadata.MD) resolver.Address {
addr.Attributes = addr.Attributes.WithValues(mdKey, md)
return addr
}

View File

@@ -24,13 +24,16 @@ import (
"sync"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// ConfigSelector controls what configuration to use for every RPC.
type ConfigSelector interface {
// Selects the configuration for the RPC.
SelectConfig(RPCInfo) *RPCConfig
// Selects the configuration for the RPC, or terminates it using the error.
// This error will be converted by the gRPC library to a status error with
// code UNKNOWN if it is not returned as a status error.
SelectConfig(RPCInfo) (*RPCConfig, error)
}
// RPCInfo contains RPC information needed by a ConfigSelector.
@@ -49,6 +52,74 @@ type RPCConfig struct {
Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
Interceptor ClientInterceptor
}
// ClientStream is the same as grpc.ClientStream, but defined here for circular
// dependency reasons.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m interface{}) error
}
// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream produces a ClientStream for an RPC which may optionally use
// the provided function to produce a stream for delegation. Note:
// RPCInfo.Context should not be used (will be nil).
//
// done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations. done must never be nil when called.
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
// ServerInterceptor is unimplementable; do not use.
type ServerInterceptor interface {
notDefined()
}
type csKeyType string
@@ -86,7 +157,7 @@ func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
}
// SelectConfig defers to the current ConfigSelector in scs.
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) *RPCConfig {
func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
scs.mu.RLock()
defer scs.mu.RUnlock()
return scs.cs.SelectConfig(r)

View File

@@ -34,6 +34,7 @@ import (
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
@@ -46,6 +47,13 @@ var EnableSRVLookups = false
var logger = grpclog.Component("dns")
// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
// single variable for testing the resolver?
var (
newTimer = time.NewTimer
newTimerDNSResRate = time.NewTimer
)
func init() {
resolver.Register(NewBuilder())
}
@@ -143,7 +151,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
@@ -201,28 +208,38 @@ func (d *dnsResolver) Close() {
func (d *dnsResolver) watcher() {
defer d.wg.Done()
backoffIndex := 1
for {
select {
case <-d.ctx.Done():
return
case <-d.rn:
}
state, err := d.lookup()
if err != nil {
// Report error to the underlying grpc.ClientConn.
d.cc.ReportError(err)
} else {
d.cc.UpdateState(*state)
err = d.cc.UpdateState(*state)
}
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
t := time.NewTimer(minDNSResRate)
var timer *time.Timer
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
// to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
backoffIndex++
}
select {
case <-t.C:
case <-d.ctx.Done():
t.Stop()
timer.Stop()
return
case <-timer.C:
}
}
}

View File

@@ -20,21 +20,34 @@
package unix
import (
"fmt"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
)
const scheme = "unix"
const unixScheme = "unix"
const unixAbstractScheme = "unix-abstract"
type builder struct{}
type builder struct {
scheme string
}
func (*builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(resolver.Address{Addr: target.Endpoint}, "unix")}})
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
if target.Authority != "" {
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.Authority)
}
addr := resolver.Address{Addr: target.Endpoint}
if b.scheme == unixAbstractScheme {
// prepend "\x00" to address for unix-abstract
addr.Addr = "\x00" + addr.Addr
}
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
return &nopResolver{}, nil
}
func (*builder) Scheme() string {
return scheme
func (b *builder) Scheme() string {
return b.scheme
}
type nopResolver struct {
@@ -45,5 +58,6 @@ func (*nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*nopResolver) Close() {}
func init() {
resolver.Register(&builder{})
resolver.Register(&builder{scheme: unixScheme})
resolver.Register(&builder{scheme: unixAbstractScheme})
}

View File

@@ -46,6 +46,22 @@ type BalancerConfig struct {
type intermediateBalancerConfig []map[string]json.RawMessage
// MarshalJSON implements the json.Marshaler interface.
//
// It marshals the balancer and config into a length-1 slice
// ([]map[string]config).
func (bc *BalancerConfig) MarshalJSON() ([]byte, error) {
if bc.Config == nil {
// If config is nil, return empty config `{}`.
return []byte(fmt.Sprintf(`[{%q: %v}]`, bc.Name, "{}")), nil
}
c, err := json.Marshal(bc.Config)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf(`[{%q: %s}]`, bc.Name, c)), nil
}
// UnmarshalJSON implements the json.Unmarshaler interface.
//
// ServiceConfig contains a list of loadBalancingConfigs, each with a name and

View File

@@ -44,25 +44,23 @@ func GetCPUTime() int64 {
}
// Rusage is an alias for syscall.Rusage under linux environment.
type Rusage syscall.Rusage
type Rusage = syscall.Rusage
// GetRusage returns the resource usage of current process.
func GetRusage() (rusage *Rusage) {
rusage = new(Rusage)
syscall.Getrusage(syscall.RUSAGE_SELF, (*syscall.Rusage)(rusage))
return
func GetRusage() *Rusage {
rusage := new(Rusage)
syscall.Getrusage(syscall.RUSAGE_SELF, rusage)
return rusage
}
// CPUTimeDiff returns the differences of user CPU time and system CPU time used
// between two Rusage structs.
func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) {
f := (*syscall.Rusage)(first)
l := (*syscall.Rusage)(latest)
var (
utimeDiffs = l.Utime.Sec - f.Utime.Sec
utimeDiffus = l.Utime.Usec - f.Utime.Usec
stimeDiffs = l.Stime.Sec - f.Stime.Sec
stimeDiffus = l.Stime.Usec - f.Stime.Usec
utimeDiffs = latest.Utime.Sec - first.Utime.Sec
utimeDiffus = latest.Utime.Usec - first.Utime.Usec
stimeDiffs = latest.Stime.Sec - first.Stime.Sec
stimeDiffus = latest.Stime.Usec - first.Stime.Usec
)
uTimeElapsed := float64(utimeDiffs) + float64(utimeDiffus)*1.0e-6

View File

@@ -50,7 +50,7 @@ func GetCPUTime() int64 {
type Rusage struct{}
// GetRusage is a no-op function under non-linux or appengine environment.
func GetRusage() (rusage *Rusage) {
func GetRusage() *Rusage {
log()
return nil
}

View File

@@ -20,13 +20,17 @@ package transport
import (
"bytes"
"errors"
"fmt"
"runtime"
"strconv"
"sync"
"sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
)
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
@@ -128,6 +132,14 @@ type cleanupStream struct {
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type earlyAbortStream struct {
streamID uint32
contentSubtype string
status *status.Status
}
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
type dataFrame struct {
streamID uint32
endStream bool
@@ -749,6 +761,24 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return nil
}
func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if l.side == clientSide {
return errors.New("earlyAbortStream not handled on client")
}
headerFields := []hpack.HeaderField{
{Name: ":status", Value: "200"},
{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
}
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
return err
}
return nil
}
func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
@@ -787,6 +817,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *earlyAbortStream:
return l.earlyAbortStreamHandler(i)
case *incomingGoAway:
return l.incomingGoAwayHandler(i)
case *dataFrame:

View File

@@ -32,14 +32,14 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -60,7 +60,7 @@ type http2Client struct {
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
md interface{}
md metadata.MD
conn net.Conn // underlying communication channel
loopy *loopyWriter
remoteAddr net.Addr
@@ -115,6 +115,9 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
// goAwayDebugMessage contains a detailed human readable string about a
// GoAway frame, useful for error messages.
goAwayDebugMessage string
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
@@ -139,17 +142,26 @@ type http2Client struct {
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
address := addr.Addr
networkType, ok := networktype.Get(addr)
if fn != nil {
return fn(ctx, addr.Addr)
if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
// For backward compatibility, if the user dialed "unix:///path",
// the passthrough resolver would be used and the user's custom
// dialer would see "unix:///path". Since the unix resolver is used
// and the address is now "/path", prepend "unix://" so the user's
// custom dialer sees the same address.
return fn(ctx, "unix://"+address)
}
return fn(ctx, address)
}
networkType := "tcp"
if n, ok := networktype.Get(addr); ok {
networkType = n
if !ok {
networkType, address = parseDialTarget(address)
}
if networkType == "tcp" && useProxy {
return proxyDial(ctx, addr.Addr, grpcUA)
return proxyDial(ctx, address, grpcUA)
}
return (&net.Dialer{}).DialContext(ctx, networkType, addr.Addr)
return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
func isTemporary(err error) bool {
@@ -228,8 +240,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
contextWithHandshakeInfo := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context)
connectCtx = contextWithHandshakeInfo(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
@@ -268,7 +279,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -296,6 +306,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
} else if md := imetadata.Get(addr); md != nil {
t.md = md
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
@@ -332,12 +348,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting
@@ -355,14 +373,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}
@@ -399,6 +419,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
buf: newRecvBuffer(),
headerChan: make(chan struct{}),
contentSubtype: callHdr.ContentSubtype,
doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
@@ -438,7 +459,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
Method: callHdr.Method,
AuthInfo: t.authInfo,
}
ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil {
return nil, err
@@ -512,14 +533,12 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
for k, vv := range t.md {
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
return headerFields, nil
@@ -819,6 +838,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write.
close(s.done)
if s.doneFunc != nil {
s.doneFunc()
}
}
// Close kicks off the shutdown process of the transport. This should be called
@@ -828,12 +850,12 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
t.mu.Unlock()
return nil
return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
@@ -849,13 +871,19 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
if len(goAwayDebugMessage) > 0 {
err = fmt.Errorf("closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, err.Error()), nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@@ -863,7 +891,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return err
}
// GracefulClose sets the state to draining, which prevents new streams from
@@ -882,7 +909,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
@@ -1128,9 +1155,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
}
}
id := f.LastStreamID
if id > 0 && id%2 != 1 {
if id > 0 && id%2 == 0 {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
@@ -1148,7 +1175,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
@@ -1178,7 +1205,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}
@@ -1194,12 +1221,13 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings
}
}
t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %v", f.ErrCode, string(f.DebugData()))
}
func (t *http2Client) GetGoAwayReason() GoAwayReason {
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
t.mu.Lock()
defer t.mu.Unlock()
return t.goAwayReason
return t.goAwayReason, t.goAwayDebugMessage
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -1296,7 +1324,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
@@ -1305,7 +1334,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return
}
t.onPrefaceReceipt()
@@ -1341,7 +1371,7 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
t.Close()
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
@@ -1400,7 +1430,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()

View File

@@ -26,6 +26,7 @@ import (
"io"
"math"
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"
@@ -355,26 +356,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if state.data.statsTrace != nil {
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
}
if t.inTapHandle != nil {
var err error
info := &tap.Info{
FullMethodName: state.data.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: true,
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@@ -402,6 +383,39 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return true
}
t.maxStreamID = streamID
if state.data.httpMethod != http.MethodPost {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod)
}
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
s.cancel()
return false
}
if t.inTapHandle != nil {
var err error
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: state.data.method}); err != nil {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
stat, ok := status.FromError(err)
if !ok {
stat = status.New(codes.PermissionDenied, err.Error())
}
t.controlBuf.put(&earlyAbortStream{
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
})
return false
}
}
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}

View File

@@ -27,6 +27,7 @@ import (
"math"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@@ -110,6 +111,7 @@ type parsedHeaderData struct {
timeoutSet bool
timeout time.Duration
method string
httpMethod string
// key-value metadata map from the peer.
mdata map[string][]string
statsTags []byte
@@ -362,6 +364,8 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
}
d.data.statsTrace = v
d.addMetadata(f.Name, string(v))
case ":method":
d.data.httpMethod = f.Value
default:
if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
break
@@ -598,3 +602,31 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"
m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")
// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
return n, target[m1+1:]
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr := t.Path
if scheme == "unix" {
if addr == "" {
addr = t.Host
}
return scheme, addr
}
}
return net, target
}

View File

@@ -241,6 +241,7 @@ type Stream struct {
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
doneFunc func() // invoked at the end of stream on client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
recvCompress string
@@ -611,6 +612,8 @@ type CallHdr struct {
ContentSubtype string
PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
DoneFunc func() // called when the stream is finished
}
// ClientTransport is the common interface for all gRPC client-side transport
@@ -619,7 +622,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
Close(err error)
// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
@@ -653,8 +656,9 @@ type ClientTransport interface {
// HTTP/2).
GoAway() <-chan struct{}
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
// GetGoAwayReason returns the reason why GoAway frame was received, along
// with a human readable string with debug info.
GetGoAwayReason() (GoAwayReason, string)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package internal
import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)
// handshakeClusterNameKey is the type used as the key to store cluster name in
// the Attributes field of resolver.Address.
type handshakeClusterNameKey struct{}
// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field
// is updated with the cluster name.
func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address {
addr.Attributes = addr.Attributes.WithValues(handshakeClusterNameKey{}, clusterName)
return addr
}
// GetXDSHandshakeClusterName returns cluster name stored in attr.
func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) {
v := attr.Value(handshakeClusterNameKey{})
name, ok := v.(string)
return name, ok
}