Update k8s apis to release-1.14 and update all of vendor

This commit is contained in:
Michelle Au
2019-03-11 11:12:01 -07:00
parent 08e735383b
commit bb5f13e6bb
622 changed files with 85535 additions and 27177 deletions

View File

@@ -19,6 +19,7 @@
package grpc
import (
"context"
"errors"
"fmt"
"math"
@@ -29,7 +30,6 @@ import (
"sync/atomic"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/codes"
@@ -38,6 +38,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -84,7 +86,7 @@ var (
// with other individual Transport Credentials.
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
// errTransportCredentialsMissing indicates that users want to transmit security
// information (e.g., oauth2 token) which requires secure connection on an insecure
// information (e.g., OAuth2 token) which requires secure connection on an insecure
// connection.
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
@@ -123,12 +125,13 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
@@ -182,7 +185,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return dialContext(ctx, network, addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
)
}
@@ -234,9 +237,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
if cc.dopts.resolverBuilder == nil {
// If resolver builder is still nil, the parse target's scheme is
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original unparsed target.
// the original target.
grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
@@ -285,19 +288,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
// Build the resolver.
cc.resolverWrapper, err = newCCResolverWrapper(cc)
rWrapper, err := newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
// Start the resolver wrapper goroutine after resolverWrapper is created.
//
// If the goroutine is started before resolverWrapper is ready, the
// following may happen: The goroutine sends updates to cc. cc forwards
// those to balancer. Balancer creates new addrConn. addrConn fails to
// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
// resolverWrapper.
cc.resolverWrapper.start()
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
@@ -306,7 +304,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
terr, ok := err.(interface{ Temporary() bool })
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
@@ -384,13 +384,13 @@ type ClientConn struct {
csMgr *connectivityStateManager
balancerBuildOpts balancer.BuildOptions
resolverWrapper *ccResolverWrapper
blockingpicker *pickerWrapper
mu sync.RWMutex
sc ServiceConfig
scRaw string
conns map[*addrConn]struct{}
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc ServiceConfig
scRaw string
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
@@ -399,6 +399,8 @@ type ClientConn struct {
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
czData *channelzData
}
@@ -434,7 +436,7 @@ func (cc *ClientConn) scWatcher() {
}
cc.mu.Lock()
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
// We may revisit this decision in the future.
cc.sc = sc
cc.scRaw = ""
cc.mu.Unlock()
@@ -444,6 +446,25 @@ func (cc *ClientConn) scWatcher() {
}
}
// waitForResolvedAddrs blocks until the resolver has provided addresses or the
// context expires. Returns nil unless the context expires first; otherwise
// returns a status error based on the context.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
// This is on the RPC path, so we use a fast path to avoid the
// more-expensive "select" below after the resolver has returned once.
if cc.firstResolveEvent.HasFired() {
return nil
}
select {
case <-cc.firstResolveEvent.Done():
return nil
case <-ctx.Done():
return status.FromContextError(ctx.Err()).Err()
case <-cc.ctx.Done():
return ErrClientConnClosing
}
}
func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -457,6 +478,7 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
}
cc.curAddresses = addrs
cc.firstResolveEvent.Fire()
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
@@ -569,13 +591,12 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
resetBackoff: make(chan struct{}),
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -657,11 +678,10 @@ func (ac *addrConn) connect() error {
return nil
}
ac.updateConnectivityState(connectivity.Connecting)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
go ac.resetTransport(false)
go ac.resetTransport()
return nil
}
@@ -680,6 +700,12 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return true
}
// Unless we're busy reconnecting already, let's reconnect from the top of
// the list.
if ac.state != connectivity.Ready {
return false
}
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@@ -690,7 +716,6 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
}
return curAddrFound
@@ -715,6 +740,12 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
return m
}
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
cc.mu.RLock()
defer cc.mu.RUnlock()
return cc.sc.healthCheckConfig
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
hdr, _ := metadata.FromOutgoingContext(ctx)
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
@@ -877,10 +908,13 @@ type addrConn struct {
acbw balancer.SubConn
scopts balancer.NewSubConnOptions
// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
// health checking may require server to report healthy to set ac to READY), and is reset
// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
// is received, transport is closed, ac has been torn down).
transport transport.ClientTransport // The current transport.
mu sync.Mutex
addrIdx int // The index in addrs list to start reconnecting from.
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
@@ -889,31 +923,28 @@ type addrConn struct {
tearDownErr error // The reason this addrConn is torn down.
backoffIdx int
// backoffDeadline is the time until which resetTransport needs to
// wait before increasing backoffIdx count.
backoffDeadline time.Time
// connectDeadline is the time by which all connection
// negotiations must complete.
connectDeadline time.Time
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
channelzID int64 // channelz unique identification number
channelzID int64 // channelz unique identification number.
czData *channelzData
successfulHandshake bool
}
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
if ac.state == s {
return
}
updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
ac.state = s
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
Desc: updateMsg,
Severity: channelz.CtINFO,
})
}
ac.cc.handleSubConnStateChange(ac.acbw, s)
}
// adjustParams updates parameters used to create transports upon
@@ -930,175 +961,221 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
}
}
// resetTransport makes sure that a healthy ac.transport exists.
//
// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
//
// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
func (ac *addrConn) resetTransport(resolveNow bool) {
for {
// If this is the first in a line of resets, we want to resolve immediately. The only other time we
// want to reset is if we have tried all the addresses handed to us.
if resolveNow {
ac.mu.Lock()
func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
tryNextAddrFromStart := grpcsync.NewEvent()
ac.mu.Lock()
if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.mu.Unlock()
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
// If the connection is READY, a failure must have occurred.
// Otherwise, we'll consider this is a transient failure when:
// We've exhausted all addresses
// We're in CONNECTING
// And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
ac.updateConnectivityState(connectivity.TransientFailure)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.transport = nil
ac.mu.Unlock()
if err := ac.nextAddr(); err != nil {
return
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
backoffIdx := ac.backoffIdx
backoffFor := ac.dopts.bs.Backoff(backoffIdx)
addrs := ac.addrs
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if backoffFor > dialDuration {
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time.Now()
connectDeadline := start.Add(dialDuration)
ac.backoffDeadline = start.Add(backoffFor)
ac.connectDeadline = connectDeadline
connectDeadline := time.Now().Add(dialDuration)
ac.mu.Unlock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
addrLoop:
for _, addr := range addrs {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.Connecting)
ac.transport = nil
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
copts := ac.dopts.copts
if ac.scopts.CredsBundle != nil {
copts.CredsBundle = ac.scopts.CredsBundle
}
hctx, hcancel := context.WithCancel(ac.ctx)
defer hcancel()
ac.mu.Unlock()
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
Severity: channelz.CtINFO,
})
}
reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
if err == nil {
ac.mu.Lock()
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
} else {
hcancel()
if err == errConnClosing {
return
}
if tryNextAddrFromStart.HasFired() {
break addrLoop
}
continue
}
ac.mu.Lock()
reqHandshake := ac.dopts.reqHandshake
ac.mu.Unlock()
<-reconnect.Done()
hcancel()
if reqHandshake == envconfig.RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <-prefaceReceived:
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
default:
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()
if tryNextAddrFromStart.HasFired() {
break addrLoop
}
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}
}
// After exhausting all addresses, or after need to reconnect after a
// READY, the addrConn enters TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
if ac.state != connectivity.Connecting {
ac.updateConnectivityState(connectivity.Connecting)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
addr := ac.addrs[ac.addrIdx]
copts := ac.dopts.copts
if ac.scopts.CredsBundle != nil {
copts.CredsBundle = ac.scopts.CredsBundle
}
// Backoff.
b := ac.resetBackoff
timer := time.NewTimer(backoffFor)
acctx := ac.ctx
ac.mu.Unlock()
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
Severity: channelz.CtINFO,
})
select {
case <-timer.C:
ac.mu.Lock()
ac.backoffIdx++
ac.mu.Unlock()
case <-b:
timer.Stop()
case <-acctx.Done():
timer.Stop()
return
}
if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
continue
}
return
}
}
// createTransport creates a connection to one of the backends in addrs.
func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
oneReset := sync.Once{}
skipReset := make(chan struct{})
allowedToReset := make(chan struct{})
prefaceReceived := make(chan struct{})
// createTransport creates a connection to one of the backends in addrs. It
// sets ac.transport in the success case, or it returns an error if it was
// unable to successfully create a transport.
//
// If waitForHandshake is enabled, it blocks until server preface arrives.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
onCloseCalled := make(chan struct{})
var prefaceMu sync.Mutex
var serverPrefaceReceived bool
var clientPrefaceWrote bool
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
select {
case <-skipReset: // The outer resetTransport loop will handle reconnection.
return
case <-allowedToReset: // We're in the clear to reset.
go oneReset.Do(func() { ac.resetTransport(false) })
}
}
prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
onClose := func() {
close(onCloseCalled)
prefaceTimer.Stop()
select {
case <-skipReset: // The outer resetTransport loop will handle reconnection.
return
case <-allowedToReset: // We're in the clear to reset.
oneReset.Do(func() { ac.resetTransport(false) })
}
}
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
prefaceTimer := time.NewTimer(time.Until(connectDeadline))
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
reconnect.Fire()
}
onClose := func() {
close(onCloseCalled)
prefaceTimer.Stop()
reconnect.Fire()
}
onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
ac.mu.Lock()
prefaceMu.Lock()
serverPrefaceReceived = true
if clientPrefaceWrote {
ac.successfulHandshake = true
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.addrIdx = 0
ac.backoffIdx = 0
}
prefaceMu.Unlock()
ac.mu.Unlock()
}
// Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
@@ -1106,14 +1183,7 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err == nil {
prefaceMu.Lock()
clientPrefaceWrote = true
if serverPrefaceReceived {
ac.successfulHandshake = true
}
prefaceMu.Unlock()
if ac.dopts.waitForHandshake {
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
@@ -1123,10 +1193,9 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
close(allowedToReset)
return nil
return nil, errors.New("connection closed")
}
} else {
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
select {
case <-prefaceTimer.C:
@@ -1143,99 +1212,76 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
if err != nil {
// newTr is either nil, or closed.
cancel()
ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
return errConnClosing
return nil, errConnClosing
}
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
return err
return nil, err
}
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
newTr.Close()
return errConnClosing
return nil, errConnClosing
}
ac.updateConnectivityState(connectivity.Ready)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.transport = newTr
ac.curAddr = addr
ac.mu.Unlock()
// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
// goroutine failing races with all the code in this method that sets the connection to "ready".
close(allowedToReset)
return nil
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
return nil, errConnClosing
}
ac.mu.Unlock()
return newTr, nil
}
// nextAddr increments the addrIdx if there are more addresses to try. If
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
// increment the backoffIdx.
//
// nextAddr must be called without ac.mu being held.
func (ac *addrConn) nextAddr() error {
ac.mu.Lock()
// If a handshake has been observed, we expect the counters to have manually
// been reset so we'll just return, since we want the next usage to start
// at index 0.
if ac.successfulHandshake {
ac.successfulHandshake = false
ac.mu.Unlock()
return nil
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
// Set up the health check helper functions
newStream := func() (interface{}, error) {
return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
}
if ac.addrIdx < len(ac.addrs)-1 {
ac.addrIdx++
ac.mu.Unlock()
return nil
firstReady := true
reportHealth := func(ok bool) {
ac.mu.Lock()
defer ac.mu.Unlock()
if ac.transport != newTr {
return
}
if ok {
if firstReady {
firstReady = false
ac.curAddr = addr
}
ac.updateConnectivityState(connectivity.Ready)
} else {
ac.updateConnectivityState(connectivity.TransientFailure)
}
}
ac.addrIdx = 0
ac.backoffIdx++
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
Severity: channelz.CtError,
})
}
grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
}
ac.cc.resolveNow(resolver.ResolveNowOption{})
backoffDeadline := ac.backoffDeadline
b := ac.resetBackoff
ac.mu.Unlock()
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
select {
case <-timer.C:
case <-b:
timer.Stop()
case <-ac.ctx.Done():
timer.Stop()
return ac.ctx.Err()
}
return nil
}
func (ac *addrConn) resetConnectBackoff() {
@@ -1279,21 +1325,22 @@ func (ac *addrConn) tearDown(err error) {
ac.mu.Unlock()
return
}
curTr := ac.transport
ac.transport = nil
// We have to set the state to Shutdown before anything else to prevent races
// between setting the state and logic that waits on context cancelation / etc.
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.tearDownErr = err
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
if err == errConnDrain && ac.transport != nil {
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac.mu.Unlock()
ac.transport.GracefulClose()
curTr.GracefulClose()
ac.mu.Lock()
}
if channelz.IsOn() {