external-snapshotter should use CSI connection lib
This commit is contained in:
@@ -52,7 +52,7 @@ const (
|
|||||||
var (
|
var (
|
||||||
snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot content for snapshot that requests a VolumeSnapshotClass with a snapshotter field set equal to this name.")
|
snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot content for snapshot that requests a VolumeSnapshotClass with a snapshotter field set equal to this name.")
|
||||||
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
|
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
|
||||||
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
|
connectionTimeout = flag.Duration("connection-timeout", 0, "The --connection-timeout flag is deprecated")
|
||||||
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
|
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
|
||||||
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
|
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
|
||||||
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
|
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
|
||||||
@@ -76,6 +76,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
glog.Infof("Version: %s", version)
|
glog.Infof("Version: %s", version)
|
||||||
|
|
||||||
|
if *connectionTimeout != 0 {
|
||||||
|
glog.Warning("--connection-timeout is deprecated and will have no effect")
|
||||||
|
}
|
||||||
|
|
||||||
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
|
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
|
||||||
config, err := buildConfig(*kubeconfig)
|
config, err := buildConfig(*kubeconfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -116,7 +120,7 @@ func main() {
|
|||||||
snapshotscheme.AddToScheme(scheme.Scheme)
|
snapshotscheme.AddToScheme(scheme.Scheme)
|
||||||
|
|
||||||
// Connect to CSI.
|
// Connect to CSI.
|
||||||
csiConn, err := connection.New(*csiAddress, *connectionTimeout)
|
csiConn, err := connection.New(*csiAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error(err.Error())
|
glog.Error(err.Error())
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@@ -19,17 +19,14 @@ package connection
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/golang/protobuf/ptypes"
|
"github.com/golang/protobuf/ptypes"
|
||||||
"github.com/golang/protobuf/ptypes/timestamp"
|
"github.com/golang/protobuf/ptypes/timestamp"
|
||||||
|
"github.com/kubernetes-csi/csi-lib-utils/connection"
|
||||||
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
|
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/connectivity"
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -73,8 +70,8 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// New returns a CSI connection object.
|
// New returns a CSI connection object.
|
||||||
func New(address string, timeout time.Duration) (CSIConnection, error) {
|
func New(address string) (CSIConnection, error) {
|
||||||
conn, err := connect(address, timeout)
|
conn, err := connection.Connect(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -83,39 +80,6 @@ func New(address string, timeout time.Duration) (CSIConnection, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
|
|
||||||
glog.V(2).Infof("Connecting to %s", address)
|
|
||||||
dialOptions := []grpc.DialOption{
|
|
||||||
grpc.WithInsecure(),
|
|
||||||
grpc.WithBackoffMaxDelay(time.Second),
|
|
||||||
grpc.WithUnaryInterceptor(logGRPC),
|
|
||||||
}
|
|
||||||
if strings.HasPrefix(address, "/") {
|
|
||||||
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
|
||||||
return net.DialTimeout("unix", addr, timeout)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
conn, err := grpc.Dial(address, dialOptions...)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
||||||
defer cancel()
|
|
||||||
for {
|
|
||||||
if !conn.WaitForStateChange(ctx, conn.GetState()) {
|
|
||||||
glog.V(4).Infof("Connection timed out")
|
|
||||||
// subsequent GetPluginInfo will show the real connection error
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
if conn.GetState() == connectivity.Ready {
|
|
||||||
glog.V(3).Infof("Connected")
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
glog.V(4).Infof("Still trying, connection is %s", conn.GetState())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
|
func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
|
||||||
client := csi.NewIdentityClient(c.conn)
|
client := csi.NewIdentityClient(c.conn)
|
||||||
|
|
||||||
|
@@ -50,7 +50,7 @@ func createMockServer(t *testing.T) (*gomock.Controller, *driver.MockCSIDriver,
|
|||||||
|
|
||||||
// Create a client connection to it
|
// Create a client connection to it
|
||||||
addr := drv.Address()
|
addr := drv.Address()
|
||||||
csiConn, err := New(addr, 10)
|
csiConn, err := New(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, nil, err
|
return nil, nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user