update kubernetes dependencies to v1.25.0
Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
14
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
14
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@@ -133,9 +133,11 @@ type cleanupStream struct {
|
||||
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
|
||||
|
||||
type earlyAbortStream struct {
|
||||
httpStatus uint32
|
||||
streamID uint32
|
||||
contentSubtype string
|
||||
status *status.Status
|
||||
rst bool
|
||||
}
|
||||
|
||||
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
|
||||
@@ -771,9 +773,12 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
|
||||
if l.side == clientSide {
|
||||
return errors.New("earlyAbortStream not handled on client")
|
||||
}
|
||||
|
||||
// In case the caller forgets to set the http status, default to 200.
|
||||
if eas.httpStatus == 0 {
|
||||
eas.httpStatus = 200
|
||||
}
|
||||
headerFields := []hpack.HeaderField{
|
||||
{Name: ":status", Value: "200"},
|
||||
{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
|
||||
{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
|
||||
{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
|
||||
{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
|
||||
@@ -782,6 +787,11 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
|
||||
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if eas.rst {
|
||||
if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
4
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
4
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
@@ -136,12 +136,10 @@ type inFlow struct {
|
||||
|
||||
// newLimit updates the inflow window to a new value n.
|
||||
// It assumes that n is always greater than the old limit.
|
||||
func (f *inFlow) newLimit(n uint32) uint32 {
|
||||
func (f *inFlow) newLimit(n uint32) {
|
||||
f.mu.Lock()
|
||||
d := n - f.limit
|
||||
f.limit = n
|
||||
f.mu.Unlock()
|
||||
return d
|
||||
}
|
||||
|
||||
func (f *inFlow) maybeAdjust(n uint32) uint32 {
|
||||
|
133
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
133
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@@ -25,6 +25,7 @@ import (
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -131,7 +132,7 @@ type http2Client struct {
|
||||
kpDormant bool
|
||||
|
||||
// Fields below are for channelz metric collection.
|
||||
channelzID int64 // channelz unique identification number
|
||||
channelzID *channelz.Identifier
|
||||
czData *channelzData
|
||||
|
||||
onGoAway func(GoAwayReason)
|
||||
@@ -146,13 +147,20 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
|
||||
address := addr.Addr
|
||||
networkType, ok := networktype.Get(addr)
|
||||
if fn != nil {
|
||||
// Special handling for unix scheme with custom dialer. Back in the day,
|
||||
// we did not have a unix resolver and therefore targets with a unix
|
||||
// scheme would end up using the passthrough resolver. So, user's used a
|
||||
// custom dialer in this case and expected the original dial target to
|
||||
// be passed to the custom dialer. Now, we have a unix resolver. But if
|
||||
// a custom dialer is specified, we want to retain the old behavior in
|
||||
// terms of the address being passed to the custom dialer.
|
||||
if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
|
||||
// For backward compatibility, if the user dialed "unix:///path",
|
||||
// the passthrough resolver would be used and the user's custom
|
||||
// dialer would see "unix:///path". Since the unix resolver is used
|
||||
// and the address is now "/path", prepend "unix://" so the user's
|
||||
// custom dialer sees the same address.
|
||||
return fn(ctx, "unix://"+address)
|
||||
// Supported unix targets are either "unix://absolute-path" or
|
||||
// "unix:relative-path".
|
||||
if filepath.IsAbs(address) {
|
||||
return fn(ctx, "unix://"+address)
|
||||
}
|
||||
return fn(ctx, "unix:"+address)
|
||||
}
|
||||
return fn(ctx, address)
|
||||
}
|
||||
@@ -193,6 +201,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
}
|
||||
}()
|
||||
|
||||
// gRPC, resolver, balancer etc. can specify arbitrary data in the
|
||||
// Attributes field of resolver.Address, which is shoved into connectCtx
|
||||
// and passed to the dialer and credential handshaker. This makes it possible for
|
||||
// address specific arbitrary data to reach custom dialers and credential handshakers.
|
||||
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
|
||||
|
||||
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
|
||||
if err != nil {
|
||||
if opts.FailOnNonTempDialError {
|
||||
@@ -237,11 +251,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
}
|
||||
}
|
||||
if transportCreds != nil {
|
||||
// gRPC, resolver, balancer etc. can specify arbitrary data in the
|
||||
// Attributes field of resolver.Address, which is shoved into connectCtx
|
||||
// and passed to the credential handshaker. This makes it possible for
|
||||
// address specific arbitrary data to reach the credential handshaker.
|
||||
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
|
||||
rawConn := conn
|
||||
// Pull the deadline from the connectCtx, which will be used for
|
||||
// timeouts in the authentication protocol handshake. Can ignore the
|
||||
@@ -342,8 +351,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
|
||||
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if t.keepaliveEnabled {
|
||||
t.kpDormancyCond = sync.NewCond(&t.mu)
|
||||
@@ -579,7 +589,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
|
||||
return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
|
||||
}
|
||||
for k, v := range data {
|
||||
// Capital header names are illegal in HTTP/2.
|
||||
@@ -616,12 +626,21 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
|
||||
return callAuthData, nil
|
||||
}
|
||||
|
||||
// NewStreamError wraps an error and reports additional information.
|
||||
// NewStreamError wraps an error and reports additional information. Typically
|
||||
// NewStream errors result in transparent retry, as they mean nothing went onto
|
||||
// the wire. However, there are two notable exceptions:
|
||||
//
|
||||
// 1. If the stream headers violate the max header list size allowed by the
|
||||
// server. It's possible this could succeed on another transport, even if
|
||||
// it's unlikely, but do not transparently retry.
|
||||
// 2. If the credentials errored when requesting their headers. In this case,
|
||||
// it's possible a retry can fix the problem, but indefinitely transparently
|
||||
// retrying is not appropriate as it is likely the credentials, if they can
|
||||
// eventually succeed, would need I/O to do so.
|
||||
type NewStreamError struct {
|
||||
Err error
|
||||
|
||||
DoNotRetry bool
|
||||
PerformedIO bool
|
||||
AllowTransparentRetry bool
|
||||
}
|
||||
|
||||
func (e NewStreamError) Error() string {
|
||||
@@ -630,25 +649,11 @@ func (e NewStreamError) Error() string {
|
||||
|
||||
// NewStream creates a stream and registers it into the transport as "active"
|
||||
// streams. All non-nil errors returned will be *NewStreamError.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
nse, ok := err.(*NewStreamError)
|
||||
if !ok {
|
||||
nse = &NewStreamError{Err: err}
|
||||
}
|
||||
if len(t.perRPCCreds) > 0 || callHdr.Creds != nil {
|
||||
// We may have performed I/O in the per-RPC creds callback, so do not
|
||||
// allow transparent retry.
|
||||
nse.PerformedIO = true
|
||||
}
|
||||
err = nse
|
||||
}
|
||||
}()
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
|
||||
ctx = peer.NewContext(ctx, t.getPeer())
|
||||
headerFields, err := t.createHeaderFields(ctx, callHdr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
|
||||
}
|
||||
s := t.newStream(ctx, callHdr)
|
||||
cleanup := func(err error) {
|
||||
@@ -748,23 +753,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
return true
|
||||
}, hdr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Connection closed.
|
||||
return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
|
||||
}
|
||||
if success {
|
||||
break
|
||||
}
|
||||
if hdrListSizeErr != nil {
|
||||
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
|
||||
return nil, &NewStreamError{Err: hdrListSizeErr}
|
||||
}
|
||||
firstTry = false
|
||||
select {
|
||||
case <-ch:
|
||||
case <-s.ctx.Done():
|
||||
return nil, ContextErr(s.ctx.Err())
|
||||
case <-ctx.Done():
|
||||
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
|
||||
case <-t.goAway:
|
||||
return nil, errStreamDrain
|
||||
return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
|
||||
case <-t.ctx.Done():
|
||||
return nil, ErrConnClosing
|
||||
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
|
||||
}
|
||||
}
|
||||
if t.statsHandler != nil {
|
||||
@@ -893,9 +899,7 @@ func (t *http2Client) Close(err error) {
|
||||
t.controlBuf.finish()
|
||||
t.cancel()
|
||||
t.conn.Close()
|
||||
if channelz.IsOn() {
|
||||
channelz.RemoveEntry(t.channelzID)
|
||||
}
|
||||
channelz.RemoveEntry(t.channelzID)
|
||||
// Append info about previous goaways if there were any, since this may be important
|
||||
// for understanding the root cause for this connection to be closed.
|
||||
_, goAwayDebugMessage := t.GetGoAwayReason()
|
||||
@@ -1077,7 +1081,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
}
|
||||
// The server has closed the stream without sending trailers. Record that
|
||||
// the read direction is closed, and set the status appropriately.
|
||||
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
|
||||
if f.StreamEnded() {
|
||||
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
|
||||
}
|
||||
}
|
||||
@@ -1407,26 +1411,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
}
|
||||
|
||||
isHeader := false
|
||||
defer func() {
|
||||
if t.statsHandler != nil {
|
||||
if isHeader {
|
||||
inHeader := &stats.InHeader{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
Header: s.header.Copy(),
|
||||
Compression: s.recvCompress,
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inHeader)
|
||||
} else {
|
||||
inTrailer := &stats.InTrailer{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
Trailer: s.trailer.Copy(),
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inTrailer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// If headerChan hasn't been closed yet
|
||||
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||
@@ -1448,6 +1432,25 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
close(s.headerChan)
|
||||
}
|
||||
|
||||
if t.statsHandler != nil {
|
||||
if isHeader {
|
||||
inHeader := &stats.InHeader{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
Header: metadata.MD(mdata).Copy(),
|
||||
Compression: s.recvCompress,
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inHeader)
|
||||
} else {
|
||||
inTrailer := &stats.InTrailer{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
Trailer: metadata.MD(mdata).Copy(),
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.ctx, inTrailer)
|
||||
}
|
||||
}
|
||||
|
||||
if !endStream {
|
||||
return
|
||||
}
|
||||
@@ -1553,7 +1556,7 @@ func minTime(a, b time.Duration) time.Duration {
|
||||
return b
|
||||
}
|
||||
|
||||
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
|
||||
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
|
||||
func (t *http2Client) keepalive() {
|
||||
p := &ping{data: [8]byte{}}
|
||||
// True iff a ping has been sent, and no data has been received since then.
|
||||
|
216
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
216
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@@ -21,7 +21,6 @@ package transport
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
@@ -36,6 +35,7 @@ import (
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/internal/syscall"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
@@ -52,10 +52,10 @@ import (
|
||||
var (
|
||||
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
|
||||
// the stream's state.
|
||||
ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
|
||||
ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
|
||||
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
|
||||
// than the limit set by peer.
|
||||
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
|
||||
ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
|
||||
)
|
||||
|
||||
// serverConnectionCounter counts the number of connections a server has seen
|
||||
@@ -73,7 +73,6 @@ type http2Server struct {
|
||||
writerDone chan struct{} // sync point to enable testing.
|
||||
remoteAddr net.Addr
|
||||
localAddr net.Addr
|
||||
maxStreamID uint32 // max stream ID ever seen
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
inTapHandle tap.ServerInHandle
|
||||
framer *framer
|
||||
@@ -118,21 +117,42 @@ type http2Server struct {
|
||||
idle time.Time
|
||||
|
||||
// Fields below are for channelz metric collection.
|
||||
channelzID int64 // channelz unique identification number
|
||||
channelzID *channelz.Identifier
|
||||
czData *channelzData
|
||||
bufferPool *bufferPool
|
||||
|
||||
connectionID uint64
|
||||
|
||||
// maxStreamMu guards the maximum stream ID
|
||||
// This lock may not be taken if mu is already held.
|
||||
maxStreamMu sync.Mutex
|
||||
maxStreamID uint32 // max stream ID ever seen
|
||||
}
|
||||
|
||||
// NewServerTransport creates a http2 transport with conn and configuration
|
||||
// options from config.
|
||||
//
|
||||
// It returns a non-nil transport and a nil error on success. On failure, it
|
||||
// returns a non-nil transport and a nil-error. For a special case where the
|
||||
// returns a nil transport and a non-nil error. For a special case where the
|
||||
// underlying conn gets closed before the client preface could be read, it
|
||||
// returns a nil transport and a nil error.
|
||||
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
|
||||
var authInfo credentials.AuthInfo
|
||||
rawConn := conn
|
||||
if config.Credentials != nil {
|
||||
var err error
|
||||
conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
|
||||
if err != nil {
|
||||
// ErrConnDispatched means that the connection was dispatched away
|
||||
// from gRPC; those connections should be left open. io.EOF means
|
||||
// the connection was closed before handshaking completed, which can
|
||||
// happen naturally from probers. Return these errors directly.
|
||||
if err == credentials.ErrConnDispatched || err == io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
|
||||
}
|
||||
}
|
||||
writeBufSize := config.WriteBufferSize
|
||||
readBufSize := config.ReadBufferSize
|
||||
maxHeaderListSize := defaultServerMaxHeaderListSize
|
||||
@@ -211,18 +231,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
if kp.Timeout == 0 {
|
||||
kp.Timeout = defaultServerKeepaliveTimeout
|
||||
}
|
||||
if kp.Time != infinity {
|
||||
if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
|
||||
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
|
||||
}
|
||||
}
|
||||
kep := config.KeepalivePolicy
|
||||
if kep.MinTime == 0 {
|
||||
kep.MinTime = defaultKeepalivePolicyMinTime
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
t := &http2Server{
|
||||
ctx: context.Background(),
|
||||
ctx: setConnection(context.Background(), rawConn),
|
||||
done: done,
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
authInfo: config.AuthInfo,
|
||||
authInfo: authInfo,
|
||||
framer: framer,
|
||||
readerDone: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
@@ -254,12 +280,12 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
connBegin := &stats.ConnBegin{}
|
||||
t.stats.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
|
||||
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
|
||||
|
||||
t.framer.writer.Flush()
|
||||
|
||||
defer func() {
|
||||
@@ -273,10 +299,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
if _, err := io.ReadFull(t.conn, preface); err != nil {
|
||||
// In deployments where a gRPC server runs behind a cloud load balancer
|
||||
// which performs regular TCP level health checks, the connection is
|
||||
// closed immediately by the latter. Skipping the error here will help
|
||||
// reduce log clutter.
|
||||
// closed immediately by the latter. Returning io.EOF here allows the
|
||||
// grpc server implementation to recognize this scenario and suppress
|
||||
// logging to reduce spam.
|
||||
if err == io.EOF {
|
||||
return nil, nil
|
||||
return nil, io.EOF
|
||||
}
|
||||
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
|
||||
}
|
||||
@@ -316,6 +343,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
|
||||
// operateHeader takes action on the decoded headers.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
|
||||
// Acquire max stream ID lock for entire duration
|
||||
t.maxStreamMu.Lock()
|
||||
defer t.maxStreamMu.Unlock()
|
||||
|
||||
streamID := frame.Header().StreamID
|
||||
|
||||
// frame.Truncated is set to true when framer detects that the current header
|
||||
@@ -330,6 +361,15 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
return false
|
||||
}
|
||||
|
||||
if streamID%2 != 1 || streamID <= t.maxStreamID {
|
||||
// illegal gRPC stream id.
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
|
||||
}
|
||||
return true
|
||||
}
|
||||
t.maxStreamID = streamID
|
||||
|
||||
buf := newRecvBuffer()
|
||||
s := &Stream{
|
||||
id: streamID,
|
||||
@@ -337,7 +377,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
buf: buf,
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
}
|
||||
|
||||
var (
|
||||
// If a gRPC Response-Headers has already been received, then it means
|
||||
// that the peer is speaking gRPC and we are in gRPC mode.
|
||||
@@ -373,6 +412,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
if timeout, err = decodeTimeout(hf.Value); err != nil {
|
||||
headerError = true
|
||||
}
|
||||
// "Transports must consider requests containing the Connection header
|
||||
// as malformed." - A41
|
||||
case "connection":
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
|
||||
}
|
||||
headerError = true
|
||||
default:
|
||||
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
|
||||
break
|
||||
@@ -387,6 +433,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
}
|
||||
}
|
||||
|
||||
// "If multiple Host headers or multiple :authority headers are present, the
|
||||
// request must be rejected with an HTTP status code 400 as required by Host
|
||||
// validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
|
||||
// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
|
||||
// error, this takes precedence over a client not speaking gRPC.
|
||||
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
|
||||
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: %v", errMsg)
|
||||
}
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 400,
|
||||
streamID: streamID,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: status.New(codes.Internal, errMsg),
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return false
|
||||
}
|
||||
|
||||
if !isGRPC || headerError {
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
streamID: streamID,
|
||||
@@ -397,6 +463,19 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
return false
|
||||
}
|
||||
|
||||
// "If :authority is missing, Host must be renamed to :authority." - A41
|
||||
if len(mdata[":authority"]) == 0 {
|
||||
// No-op if host isn't present, no eventual :authority header is a valid
|
||||
// RPC.
|
||||
if host, ok := mdata["host"]; ok {
|
||||
mdata[":authority"] = host
|
||||
delete(mdata, "host")
|
||||
}
|
||||
} else {
|
||||
// "If :authority is present, Host must be discarded" - A41
|
||||
delete(mdata, "host")
|
||||
}
|
||||
|
||||
if frame.StreamEnded() {
|
||||
// s is just created by the caller. No lock needed.
|
||||
s.state = streamReadDone
|
||||
@@ -441,26 +520,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
s.cancel()
|
||||
return false
|
||||
}
|
||||
if streamID%2 != 1 || streamID <= t.maxStreamID {
|
||||
t.mu.Unlock()
|
||||
// illegal gRPC stream id.
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
|
||||
}
|
||||
s.cancel()
|
||||
return true
|
||||
}
|
||||
t.maxStreamID = streamID
|
||||
if httpMethod != http.MethodPost {
|
||||
t.mu.Unlock()
|
||||
errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
|
||||
logger.Infof("transport: %v", errMsg)
|
||||
}
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
streamID: streamID,
|
||||
rst: true,
|
||||
rstCode: http2.ErrCodeProtocol,
|
||||
onWrite: func() {},
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 405,
|
||||
streamID: streamID,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: status.New(codes.Internal, errMsg),
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
s.cancel()
|
||||
return false
|
||||
@@ -477,9 +548,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
stat = status.New(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: 200,
|
||||
streamID: s.id,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: stat,
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return false
|
||||
}
|
||||
@@ -717,7 +790,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
s.write(recvMsg{buffer: buffer})
|
||||
}
|
||||
}
|
||||
if f.Header().Flags.Has(http2.FlagDataEndStream) {
|
||||
if f.StreamEnded() {
|
||||
// Received the end of stream from the client.
|
||||
s.compareAndSwapState(streamActive, streamReadDone)
|
||||
s.write(recvMsg{err: io.EOF})
|
||||
@@ -861,11 +934,25 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *http2Server) streamContextErr(s *Stream) error {
|
||||
select {
|
||||
case <-t.done:
|
||||
return ErrConnClosing
|
||||
default:
|
||||
}
|
||||
return ContextErr(s.ctx.Err())
|
||||
}
|
||||
|
||||
// WriteHeader sends the header metadata md back to the client.
|
||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
if s.updateHeaderSent() || s.getState() == streamDone {
|
||||
if s.updateHeaderSent() {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
|
||||
if s.getState() == streamDone {
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
|
||||
s.hdrMu.Lock()
|
||||
if md.Len() > 0 {
|
||||
if s.header.Len() > 0 {
|
||||
@@ -876,7 +963,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
}
|
||||
if err := t.writeHeaderLocked(s); err != nil {
|
||||
s.hdrMu.Unlock()
|
||||
return err
|
||||
return status.Convert(err).Err()
|
||||
}
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
@@ -992,23 +1079,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||
if !s.isHeaderSent() { // Headers haven't been written yet.
|
||||
if err := t.WriteHeader(s, nil); err != nil {
|
||||
if _, ok := err.(ConnectionError); ok {
|
||||
return err
|
||||
}
|
||||
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
|
||||
return status.Errorf(codes.Internal, "transport: %v", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Writing headers checks for this condition.
|
||||
if s.getState() == streamDone {
|
||||
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
|
||||
s.cancel()
|
||||
select {
|
||||
case <-t.done:
|
||||
return ErrConnClosing
|
||||
default:
|
||||
}
|
||||
return ContextErr(s.ctx.Err())
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
}
|
||||
df := &dataFrame{
|
||||
@@ -1018,12 +1094,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
|
||||
onEachWrite: t.setResetPingStrikes,
|
||||
}
|
||||
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
|
||||
select {
|
||||
case <-t.done:
|
||||
return ErrConnClosing
|
||||
default:
|
||||
}
|
||||
return ContextErr(s.ctx.Err())
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
return t.controlBuf.put(df)
|
||||
}
|
||||
@@ -1146,9 +1217,7 @@ func (t *http2Server) Close() {
|
||||
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
|
||||
logger.Infof("transport: error closing conn during Close: %v", err)
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
channelz.RemoveEntry(t.channelzID)
|
||||
}
|
||||
channelz.RemoveEntry(t.channelzID)
|
||||
// Cancel all active streams.
|
||||
for _, s := range streams {
|
||||
s.cancel()
|
||||
@@ -1161,10 +1230,6 @@ func (t *http2Server) Close() {
|
||||
|
||||
// deleteStream deletes the stream s from transport's active streams.
|
||||
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
|
||||
t.mu.Lock()
|
||||
if _, ok := t.activeStreams[s.id]; ok {
|
||||
@@ -1186,6 +1251,11 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
|
||||
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
|
||||
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
|
||||
oldState := s.swapState(streamDone)
|
||||
if oldState == streamDone {
|
||||
// If the stream was already done, return.
|
||||
@@ -1205,6 +1275,11 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
|
||||
|
||||
// closeStream clears the footprint of a stream when the stream is not needed any more.
|
||||
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
|
||||
s.swapState(streamDone)
|
||||
t.deleteStream(s, eosReceived)
|
||||
|
||||
@@ -1235,20 +1310,23 @@ var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
|
||||
// Handles outgoing GoAway and returns true if loopy needs to put itself
|
||||
// in draining mode.
|
||||
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
t.maxStreamMu.Lock()
|
||||
t.mu.Lock()
|
||||
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
|
||||
t.mu.Unlock()
|
||||
t.maxStreamMu.Unlock()
|
||||
// The transport is closing.
|
||||
return false, ErrConnClosing
|
||||
}
|
||||
sid := t.maxStreamID
|
||||
if !g.headsUp {
|
||||
// Stop accepting more streams now.
|
||||
t.state = draining
|
||||
sid := t.maxStreamID
|
||||
if len(t.activeStreams) == 0 {
|
||||
g.closeConn = true
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.maxStreamMu.Unlock()
|
||||
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -1261,6 +1339,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.maxStreamMu.Unlock()
|
||||
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
|
||||
// Follow that with a ping and wait for the ack to come back or a timer
|
||||
// to expire. During this time accept new streams since they might have
|
||||
@@ -1345,3 +1424,18 @@ func getJitter(v time.Duration) time.Duration {
|
||||
j := grpcrand.Int63n(2*r) - r
|
||||
return time.Duration(j)
|
||||
}
|
||||
|
||||
type connectionKey struct{}
|
||||
|
||||
// GetConnection gets the connection from the context.
|
||||
func GetConnection(ctx context.Context) net.Conn {
|
||||
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
|
||||
return conn
|
||||
}
|
||||
|
||||
// SetConnection adds the connection to the context to be able to get
|
||||
// information about the destination ip and port for an incoming RPC. This also
|
||||
// allows any unary or streaming interceptors to see the connection.
|
||||
func setConnection(ctx context.Context, conn net.Conn) context.Context {
|
||||
return context.WithValue(ctx, connectionKey{}, conn)
|
||||
}
|
||||
|
2
vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/transport/networktype/networktype.go
generated
vendored
@@ -31,7 +31,7 @@ const key = keyType("grpc.internal.transport.networktype")
|
||||
|
||||
// Set returns a copy of the provided address with attributes containing networkType.
|
||||
func Set(address resolver.Address, networkType string) resolver.Address {
|
||||
address.Attributes = address.Attributes.WithValues(key, networkType)
|
||||
address.Attributes = address.Attributes.WithValue(key, networkType)
|
||||
return address
|
||||
}
|
||||
|
||||
|
4
vendor/google.golang.org/grpc/internal/transport/proxy.go
generated
vendored
4
vendor/google.golang.org/grpc/internal/transport/proxy.go
generated
vendored
@@ -37,7 +37,7 @@ var (
|
||||
httpProxyFromEnvironment = http.ProxyFromEnvironment
|
||||
)
|
||||
|
||||
func mapAddress(ctx context.Context, address string) (*url.URL, error) {
|
||||
func mapAddress(address string) (*url.URL, error) {
|
||||
req := &http.Request{
|
||||
URL: &url.URL{
|
||||
Scheme: "https",
|
||||
@@ -114,7 +114,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
|
||||
// connection.
|
||||
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
|
||||
newAddr := addr
|
||||
proxyURL, err := mapAddress(ctx, addr)
|
||||
proxyURL, err := mapAddress(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
15
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
15
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@@ -30,9 +30,11 @@ import (
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/resolver"
|
||||
@@ -518,7 +520,8 @@ const (
|
||||
// ServerConfig consists of all the configurations to establish a server transport.
|
||||
type ServerConfig struct {
|
||||
MaxStreams uint32
|
||||
AuthInfo credentials.AuthInfo
|
||||
ConnectionTimeout time.Duration
|
||||
Credentials credentials.TransportCredentials
|
||||
InTapHandle tap.ServerInHandle
|
||||
StatsHandler stats.Handler
|
||||
KeepaliveParams keepalive.ServerParameters
|
||||
@@ -527,7 +530,7 @@ type ServerConfig struct {
|
||||
InitialConnWindowSize int32
|
||||
WriteBufferSize int
|
||||
ReadBufferSize int
|
||||
ChannelzParentID int64
|
||||
ChannelzParentID *channelz.Identifier
|
||||
MaxHeaderListSize *uint32
|
||||
HeaderTableSize *uint32
|
||||
}
|
||||
@@ -561,7 +564,7 @@ type ConnectOptions struct {
|
||||
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
|
||||
ReadBufferSize int
|
||||
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
|
||||
ChannelzParentID int64
|
||||
ChannelzParentID *channelz.Identifier
|
||||
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
|
||||
MaxHeaderListSize *uint32
|
||||
// UseProxy specifies if a proxy should be used.
|
||||
@@ -739,6 +742,12 @@ func (e ConnectionError) Origin() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// Unwrap returns the original error of this connection error or nil when the
|
||||
// origin is nil.
|
||||
func (e ConnectionError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrConnClosing indicates that the transport is closing.
|
||||
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
|
||||
|
Reference in New Issue
Block a user