update vendor csi-lib-utils@v0.6.1

Signed-off-by: Andrew Sy Kim <kiman@vmware.com>
This commit is contained in:
Andrew Sy Kim
2019-03-29 15:14:11 -04:00
parent 54a21f108e
commit 4e3f4a86ec
31 changed files with 2651 additions and 365 deletions

View File

@@ -0,0 +1,110 @@
/*
Copyright 2019 Kubernetes Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"context"
"net"
"sync"
"google.golang.org/grpc/reflection"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
)
// CSIDriverControllerServer is the Controller service component of the driver.
type CSIDriverControllerServer struct {
Controller csi.ControllerServer
Identity csi.IdentityServer
}
// CSIDriverController is the CSI Driver Controller backend.
type CSIDriverController struct {
listener net.Listener
server *grpc.Server
controllerServer *CSIDriverControllerServer
wg sync.WaitGroup
running bool
lock sync.Mutex
creds *CSICreds
}
func NewCSIDriverController(controllerServer *CSIDriverControllerServer) *CSIDriverController {
return &CSIDriverController{
controllerServer: controllerServer,
}
}
func (c *CSIDriverController) goServe(started chan<- bool) {
goServe(c.server, &c.wg, c.listener, started)
}
func (c *CSIDriverController) Address() string {
return c.listener.Addr().String()
}
func (c *CSIDriverController) Start(l net.Listener) error {
c.lock.Lock()
defer c.lock.Unlock()
// Set listener.
c.listener = l
// Create a new grpc server.
c.server = grpc.NewServer(
grpc.UnaryInterceptor(c.callInterceptor),
)
if c.controllerServer.Controller != nil {
csi.RegisterControllerServer(c.server, c.controllerServer.Controller)
}
if c.controllerServer.Identity != nil {
csi.RegisterIdentityServer(c.server, c.controllerServer.Identity)
}
reflection.Register(c.server)
waitForServer := make(chan bool)
c.goServe(waitForServer)
<-waitForServer
c.running = true
return nil
}
func (c *CSIDriverController) Stop() {
stop(&c.lock, &c.wg, c.server, c.running)
}
func (c *CSIDriverController) Close() {
c.server.Stop()
}
func (c *CSIDriverController) IsRunning() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.running
}
func (c *CSIDriverController) SetDefaultCreds() {
setDefaultCreds(c.creds)
}
func (c *CSIDriverController) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return callInterceptor(ctx, c.creds, req, info, handler)
}

View File

@@ -0,0 +1,109 @@
/*
Copyright 2019 Kubernetes Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
context "context"
"net"
"sync"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// CSIDriverNodeServer is the Node service component of the driver.
type CSIDriverNodeServer struct {
Node csi.NodeServer
Identity csi.IdentityServer
}
// CSIDriverNode is the CSI Driver Node backend.
type CSIDriverNode struct {
listener net.Listener
server *grpc.Server
nodeServer *CSIDriverNodeServer
wg sync.WaitGroup
running bool
lock sync.Mutex
creds *CSICreds
}
func NewCSIDriverNode(nodeServer *CSIDriverNodeServer) *CSIDriverNode {
return &CSIDriverNode{
nodeServer: nodeServer,
}
}
func (c *CSIDriverNode) goServe(started chan<- bool) {
goServe(c.server, &c.wg, c.listener, started)
}
func (c *CSIDriverNode) Address() string {
return c.listener.Addr().String()
}
func (c *CSIDriverNode) Start(l net.Listener) error {
c.lock.Lock()
defer c.lock.Unlock()
// Set listener.
c.listener = l
// Create a new grpc server.
c.server = grpc.NewServer(
grpc.UnaryInterceptor(c.callInterceptor),
)
if c.nodeServer.Node != nil {
csi.RegisterNodeServer(c.server, c.nodeServer.Node)
}
if c.nodeServer.Identity != nil {
csi.RegisterIdentityServer(c.server, c.nodeServer.Identity)
}
reflection.Register(c.server)
waitForServer := make(chan bool)
c.goServe(waitForServer)
<-waitForServer
c.running = true
return nil
}
func (c *CSIDriverNode) Stop() {
stop(&c.lock, &c.wg, c.server, c.running)
}
func (c *CSIDriverNode) Close() {
c.server.Stop()
}
func (c *CSIDriverNode) IsRunning() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.running
}
func (c *CSIDriverNode) SetDefaultCreds() {
setDefaultCreds(c.creds)
}
func (c *CSIDriverNode) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return callInterceptor(ctx, c.creds, req, info, handler)
}

View File

@@ -41,6 +41,8 @@ var (
ErrAuthFailed = errors.New("authentication failed")
)
// CSIDriverServers is a unified driver component with both Controller and Node
// services.
type CSIDriverServers struct {
Controller csi.ControllerServer
Identity csi.IdentityServer
@@ -54,14 +56,15 @@ const secretField = "secretKey"
// secrets. This mock driver has a single string secret with secretField as the
// key.
type CSICreds struct {
CreateVolumeSecret string
DeleteVolumeSecret string
ControllerPublishVolumeSecret string
ControllerUnpublishVolumeSecret string
NodeStageVolumeSecret string
NodePublishVolumeSecret string
CreateSnapshotSecret string
DeleteSnapshotSecret string
CreateVolumeSecret string
DeleteVolumeSecret string
ControllerPublishVolumeSecret string
ControllerUnpublishVolumeSecret string
NodeStageVolumeSecret string
NodePublishVolumeSecret string
CreateSnapshotSecret string
DeleteSnapshotSecret string
ControllerValidateVolumeCapabilitiesSecret string
}
type CSIDriver struct {
@@ -81,15 +84,7 @@ func NewCSIDriver(servers *CSIDriverServers) *CSIDriver {
}
func (c *CSIDriver) goServe(started chan<- bool) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
started <- true
err := c.server.Serve(c.listener)
if err != nil {
panic(err.Error())
}
}()
goServe(c.server, &c.wg, c.listener, started)
}
func (c *CSIDriver) Address() string {
@@ -128,15 +123,7 @@ func (c *CSIDriver) Start(l net.Listener) error {
}
func (c *CSIDriver) Stop() {
c.lock.Lock()
defer c.lock.Unlock()
if !c.running {
return
}
c.server.Stop()
c.wg.Wait()
stop(&c.lock, &c.wg, c.server, c.running)
}
func (c *CSIDriver) Close() {
@@ -152,20 +139,56 @@ func (c *CSIDriver) IsRunning() bool {
// SetDefaultCreds sets the default secrets for CSI creds.
func (c *CSIDriver) SetDefaultCreds() {
c.creds = &CSICreds{
CreateVolumeSecret: "secretval1",
DeleteVolumeSecret: "secretval2",
ControllerPublishVolumeSecret: "secretval3",
ControllerUnpublishVolumeSecret: "secretval4",
NodeStageVolumeSecret: "secretval5",
NodePublishVolumeSecret: "secretval6",
CreateSnapshotSecret: "secretval7",
DeleteSnapshotSecret: "secretval8",
}
setDefaultCreds(c.creds)
}
func (c *CSIDriver) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
err := c.authInterceptor(req)
return callInterceptor(ctx, c.creds, req, info, handler)
}
// goServe starts a grpc server.
func goServe(server *grpc.Server, wg *sync.WaitGroup, listener net.Listener, started chan<- bool) {
wg.Add(1)
go func() {
defer wg.Done()
started <- true
err := server.Serve(listener)
if err != nil {
panic(err.Error())
}
}()
}
// stop stops a grpc server.
func stop(lock *sync.Mutex, wg *sync.WaitGroup, server *grpc.Server, running bool) {
lock.Lock()
defer lock.Unlock()
if !running {
return
}
server.Stop()
wg.Wait()
}
// setDefaultCreds sets the default credentials, given a CSICreds instance.
func setDefaultCreds(creds *CSICreds) {
creds = &CSICreds{
CreateVolumeSecret: "secretval1",
DeleteVolumeSecret: "secretval2",
ControllerPublishVolumeSecret: "secretval3",
ControllerUnpublishVolumeSecret: "secretval4",
NodeStageVolumeSecret: "secretval5",
NodePublishVolumeSecret: "secretval6",
CreateSnapshotSecret: "secretval7",
DeleteSnapshotSecret: "secretval8",
ControllerValidateVolumeCapabilitiesSecret: "secretval9",
}
}
func callInterceptor(ctx context.Context, creds *CSICreds, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
err := authInterceptor(creds, req)
if err != nil {
logGRPC(info.FullMethod, req, nil, err)
return nil, err
@@ -175,9 +198,9 @@ func (c *CSIDriver) callInterceptor(ctx context.Context, req interface{}, info *
return rsp, err
}
func (c *CSIDriver) authInterceptor(req interface{}) error {
if c.creds != nil {
authenticated, authErr := isAuthenticated(req, c.creds)
func authInterceptor(creds *CSICreds, req interface{}) error {
if creds != nil {
authenticated, authErr := isAuthenticated(req, creds)
if !authenticated {
if authErr == ErrNoCredentials {
return status.Error(codes.InvalidArgument, authErr.Error())
@@ -227,6 +250,8 @@ func isAuthenticated(req interface{}, creds *CSICreds) (bool, error) {
return authenticateCreateSnapshot(r, creds)
case *csi.DeleteSnapshotRequest:
return authenticateDeleteSnapshot(r, creds)
case *csi.ValidateVolumeCapabilitiesRequest:
return authenticateControllerValidateVolumeCapabilities(r, creds)
default:
return true, nil
}
@@ -264,6 +289,10 @@ func authenticateDeleteSnapshot(req *csi.DeleteSnapshotRequest, creds *CSICreds)
return credsCheck(req.GetSecrets(), creds.DeleteSnapshotSecret)
}
func authenticateControllerValidateVolumeCapabilities(req *csi.ValidateVolumeCapabilitiesRequest, creds *CSICreds) (bool, error) {
return credsCheck(req.GetSecrets(), creds.ControllerValidateVolumeCapabilitiesSecret)
}
func credsCheck(secrets map[string]string, secretVal string) (bool, error) {
if len(secrets) == 0 {
return false, ErrNoCredentials

View File

@@ -96,6 +96,19 @@ func (m *MockControllerServer) EXPECT() *MockControllerServerMockRecorder {
return m.recorder
}
// ControllerExpandVolume mocks base method
func (m *MockControllerServer) ControllerExpandVolume(arg0 context.Context, arg1 *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
ret := m.ctrl.Call(m, "ControllerExpandVolume", arg0, arg1)
ret0, _ := ret[0].(*csi.ControllerExpandVolumeResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ControllerExpandVolume indicates an expected call of ControllerExpandVolume
func (mr *MockControllerServerMockRecorder) ControllerExpandVolume(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ControllerExpandVolume", reflect.TypeOf((*MockControllerServer)(nil).ControllerExpandVolume), arg0, arg1)
}
// ControllerGetCapabilities mocks base method
func (m *MockControllerServer) ControllerGetCapabilities(arg0 context.Context, arg1 *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
ret := m.ctrl.Call(m, "ControllerGetCapabilities", arg0, arg1)
@@ -262,6 +275,19 @@ func (m *MockNodeServer) EXPECT() *MockNodeServerMockRecorder {
return m.recorder
}
// NodeExpandVolume mocks base method
func (m *MockNodeServer) NodeExpandVolume(arg0 context.Context, arg1 *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
ret := m.ctrl.Call(m, "NodeExpandVolume", arg0, arg1)
ret0, _ := ret[0].(*csi.NodeExpandVolumeResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// NodeExpandVolume indicates an expected call of NodeExpandVolume
func (mr *MockNodeServerMockRecorder) NodeExpandVolume(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NodeExpandVolume", reflect.TypeOf((*MockNodeServer)(nil).NodeExpandVolume), arg0, arg1)
}
// NodeGetCapabilities mocks base method
func (m *MockNodeServer) NodeGetCapabilities(arg0 context.Context, arg1 *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
ret := m.ctrl.Call(m, "NodeGetCapabilities", arg0, arg1)

View File

@@ -46,9 +46,9 @@ func NewMockCSIDriver(servers *MockCSIDriverServers) *MockCSIDriver {
}
}
func (m *MockCSIDriver) Start() error {
// Listen on a port assigned by the net package
l, err := net.Listen("tcp", "127.0.0.1:0")
// StartOnAddress starts a new gRPC server listening on given address.
func (m *MockCSIDriver) StartOnAddress(network, address string) error {
l, err := net.Listen(network, address)
if err != nil {
return err
}
@@ -61,6 +61,12 @@ func (m *MockCSIDriver) Start() error {
return nil
}
// Start starts a new gRPC server listening on a random TCP loopback port.
func (m *MockCSIDriver) Start() error {
// Listen on a port assigned by the net package
return m.StartOnAddress("tcp", "127.0.0.1:0")
}
func (m *MockCSIDriver) Nexus() (*grpc.ClientConn, error) {
// Start server
err := m.Start()