add prune and remove unused packages
This commit is contained in:
184
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go
generated
vendored
184
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go
generated
vendored
@@ -1,184 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestItemExponentialFailureRateLimiter(t *testing.T) {
|
||||
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
|
||||
|
||||
if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 4*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 8*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 16*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 1*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, limiter.NumRequeues("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
if e, a := 0, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
|
||||
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second)
|
||||
for i := 0; i < 5; i++ {
|
||||
limiter.When("one")
|
||||
}
|
||||
if e, a := 32*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
limiter.When("overflow1")
|
||||
}
|
||||
if e, a := 1000*time.Second, limiter.When("overflow1"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter = NewItemExponentialFailureRateLimiter(1*time.Minute, 1000*time.Hour)
|
||||
for i := 0; i < 2; i++ {
|
||||
limiter.When("two")
|
||||
}
|
||||
if e, a := 4*time.Minute, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
limiter.When("overflow2")
|
||||
}
|
||||
if e, a := 1000*time.Hour, limiter.When("overflow2"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestItemFastSlowRateLimiter(t *testing.T) {
|
||||
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, limiter.NumRequeues("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
if e, a := 0, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMaxOfRateLimiter(t *testing.T) {
|
||||
limiter := NewMaxOfRateLimiter(
|
||||
NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3),
|
||||
NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second),
|
||||
)
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 3*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 3*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, limiter.NumRequeues("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
if e, a := 0, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
255
vendor/k8s.io/client-go/util/workqueue/delaying_queue_test.go
generated
vendored
255
vendor/k8s.io/client-go/util/workqueue/delaying_queue_test.go
generated
vendored
@@ -1,255 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
func TestSimpleQueue(t *testing.T) {
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
q := newDelayingQueue(fakeClock, "")
|
||||
|
||||
first := "foo"
|
||||
|
||||
q.AddAfter(first, 50*time.Millisecond)
|
||||
if err := waitForWaitingQueueToFill(q); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
|
||||
fakeClock.Step(60 * time.Millisecond)
|
||||
|
||||
if err := waitForAdded(q, 1); err != nil {
|
||||
t.Errorf("should have added")
|
||||
}
|
||||
item, _ := q.Get()
|
||||
q.Done(item)
|
||||
|
||||
// step past the next heartbeat
|
||||
fakeClock.Step(10 * time.Second)
|
||||
|
||||
err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
|
||||
if q.Len() > 0 {
|
||||
return false, fmt.Errorf("added to queue")
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
if err != wait.ErrWaitTimeout {
|
||||
t.Errorf("expected timeout, got: %v", err)
|
||||
}
|
||||
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeduping(t *testing.T) {
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
q := newDelayingQueue(fakeClock, "")
|
||||
|
||||
first := "foo"
|
||||
|
||||
q.AddAfter(first, 50*time.Millisecond)
|
||||
if err := waitForWaitingQueueToFill(q); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
q.AddAfter(first, 70*time.Millisecond)
|
||||
if err := waitForWaitingQueueToFill(q); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
|
||||
// step past the first block, we should receive now
|
||||
fakeClock.Step(60 * time.Millisecond)
|
||||
if err := waitForAdded(q, 1); err != nil {
|
||||
t.Errorf("should have added")
|
||||
}
|
||||
item, _ := q.Get()
|
||||
q.Done(item)
|
||||
|
||||
// step past the second add
|
||||
fakeClock.Step(20 * time.Millisecond)
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
|
||||
// test again, but this time the earlier should override
|
||||
q.AddAfter(first, 50*time.Millisecond)
|
||||
q.AddAfter(first, 30*time.Millisecond)
|
||||
if err := waitForWaitingQueueToFill(q); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
|
||||
fakeClock.Step(40 * time.Millisecond)
|
||||
if err := waitForAdded(q, 1); err != nil {
|
||||
t.Errorf("should have added")
|
||||
}
|
||||
item, _ = q.Get()
|
||||
q.Done(item)
|
||||
|
||||
// step past the second add
|
||||
fakeClock.Step(20 * time.Millisecond)
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddTwoFireEarly(t *testing.T) {
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
q := newDelayingQueue(fakeClock, "")
|
||||
|
||||
first := "foo"
|
||||
second := "bar"
|
||||
third := "baz"
|
||||
|
||||
q.AddAfter(first, 1*time.Second)
|
||||
q.AddAfter(second, 50*time.Millisecond)
|
||||
if err := waitForWaitingQueueToFill(q); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
|
||||
fakeClock.Step(60 * time.Millisecond)
|
||||
|
||||
if err := waitForAdded(q, 1); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
item, _ := q.Get()
|
||||
if !reflect.DeepEqual(item, second) {
|
||||
t.Errorf("expected %v, got %v", second, item)
|
||||
}
|
||||
|
||||
q.AddAfter(third, 2*time.Second)
|
||||
|
||||
fakeClock.Step(1 * time.Second)
|
||||
if err := waitForAdded(q, 1); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
item, _ = q.Get()
|
||||
if !reflect.DeepEqual(item, first) {
|
||||
t.Errorf("expected %v, got %v", first, item)
|
||||
}
|
||||
|
||||
fakeClock.Step(2 * time.Second)
|
||||
if err := waitForAdded(q, 1); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
item, _ = q.Get()
|
||||
if !reflect.DeepEqual(item, third) {
|
||||
t.Errorf("expected %v, got %v", third, item)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopyShifting(t *testing.T) {
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
q := newDelayingQueue(fakeClock, "")
|
||||
|
||||
first := "foo"
|
||||
second := "bar"
|
||||
third := "baz"
|
||||
|
||||
q.AddAfter(first, 1*time.Second)
|
||||
q.AddAfter(second, 500*time.Millisecond)
|
||||
q.AddAfter(third, 250*time.Millisecond)
|
||||
if err := waitForWaitingQueueToFill(q); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
if q.Len() != 0 {
|
||||
t.Errorf("should not have added")
|
||||
}
|
||||
|
||||
fakeClock.Step(2 * time.Second)
|
||||
|
||||
if err := waitForAdded(q, 3); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
actualFirst, _ := q.Get()
|
||||
if !reflect.DeepEqual(actualFirst, third) {
|
||||
t.Errorf("expected %v, got %v", third, actualFirst)
|
||||
}
|
||||
actualSecond, _ := q.Get()
|
||||
if !reflect.DeepEqual(actualSecond, second) {
|
||||
t.Errorf("expected %v, got %v", second, actualSecond)
|
||||
}
|
||||
actualThird, _ := q.Get()
|
||||
if !reflect.DeepEqual(actualThird, first) {
|
||||
t.Errorf("expected %v, got %v", first, actualThird)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
|
||||
r := rand.New(rand.NewSource(time.Now().Unix()))
|
||||
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
q := newDelayingQueue(fakeClock, "")
|
||||
|
||||
// Add items
|
||||
for n := 0; n < b.N; n++ {
|
||||
data := fmt.Sprintf("%d", n)
|
||||
q.AddAfter(data, time.Duration(r.Int63n(int64(10*time.Minute))))
|
||||
}
|
||||
|
||||
// Exercise item removal as well
|
||||
fakeClock.Step(11 * time.Minute)
|
||||
for n := 0; n < b.N; n++ {
|
||||
_, _ = q.Get()
|
||||
}
|
||||
}
|
||||
|
||||
func waitForAdded(q DelayingInterface, depth int) error {
|
||||
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||
if q.Len() == depth {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
func waitForWaitingQueueToFill(q DelayingInterface) error {
|
||||
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||
if len(q.(*delayingType).waitingForAddCh) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
293
vendor/k8s.io/client-go/util/workqueue/metrics_test.go
generated
vendored
293
vendor/k8s.io/client-go/util/workqueue/metrics_test.go
generated
vendored
@@ -1,293 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
type testMetrics struct {
|
||||
added, gotten, finished int64
|
||||
|
||||
updateCalled chan<- struct{}
|
||||
}
|
||||
|
||||
func (m *testMetrics) add(item t) { m.added++ }
|
||||
func (m *testMetrics) get(item t) { m.gotten++ }
|
||||
func (m *testMetrics) done(item t) { m.finished++ }
|
||||
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
|
||||
|
||||
func TestMetricShutdown(t *testing.T) {
|
||||
ch := make(chan struct{})
|
||||
m := &testMetrics{
|
||||
updateCalled: ch,
|
||||
}
|
||||
c := clock.NewFakeClock(time.Now())
|
||||
q := newQueue(c, m, time.Millisecond)
|
||||
for !c.HasWaiters() {
|
||||
// Wait for the go routine to call NewTicker()
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
c.Step(time.Millisecond)
|
||||
<-ch
|
||||
q.ShutDown()
|
||||
|
||||
c.Step(time.Hour)
|
||||
select {
|
||||
default:
|
||||
return
|
||||
case <-ch:
|
||||
t.Errorf("Unexpected update after shutdown was called.")
|
||||
}
|
||||
}
|
||||
|
||||
type testMetric struct {
|
||||
inc int64
|
||||
dec int64
|
||||
set float64
|
||||
|
||||
observedValue float64
|
||||
observedCount int
|
||||
|
||||
notifyCh chan<- struct{}
|
||||
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (m *testMetric) Inc() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.inc++
|
||||
m.notify()
|
||||
}
|
||||
|
||||
func (m *testMetric) Dec() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.dec++
|
||||
m.notify()
|
||||
}
|
||||
|
||||
func (m *testMetric) Set(f float64) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.set = f
|
||||
m.notify()
|
||||
}
|
||||
|
||||
func (m *testMetric) Observe(f float64) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
m.observedValue = f
|
||||
m.observedCount++
|
||||
m.notify()
|
||||
}
|
||||
|
||||
func (m *testMetric) gaugeValue() float64 {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if m.set != 0 {
|
||||
return m.set
|
||||
}
|
||||
return float64(m.inc - m.dec)
|
||||
}
|
||||
|
||||
func (m *testMetric) observationValue() float64 {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
return m.observedValue
|
||||
}
|
||||
|
||||
func (m *testMetric) observationCount() int {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
return m.observedCount
|
||||
}
|
||||
|
||||
func (m *testMetric) notify() {
|
||||
if m.notifyCh != nil {
|
||||
m.notifyCh <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
type testMetricsProvider struct {
|
||||
depth testMetric
|
||||
adds testMetric
|
||||
latency testMetric
|
||||
duration testMetric
|
||||
unfinished testMetric
|
||||
longest testMetric
|
||||
retries testMetric
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
|
||||
return &m.depth
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
|
||||
return &m.adds
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
|
||||
return &m.latency
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
|
||||
return &m.duration
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
|
||||
return &m.unfinished
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
|
||||
return &m.longest
|
||||
}
|
||||
|
||||
func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
|
||||
return &m.retries
|
||||
}
|
||||
|
||||
func TestSinceInMicroseconds(t *testing.T) {
|
||||
mp := testMetricsProvider{}
|
||||
c := clock.NewFakeClock(time.Now())
|
||||
mf := queueMetricsFactory{metricsProvider: &mp}
|
||||
m := mf.newQueueMetrics("test", c)
|
||||
dqm := m.(*defaultQueueMetrics)
|
||||
|
||||
for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} {
|
||||
n := c.Now()
|
||||
c.Step(time.Duration(i) * time.Microsecond)
|
||||
if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetrics(t *testing.T) {
|
||||
mp := testMetricsProvider{}
|
||||
t0 := time.Unix(0, 0)
|
||||
c := clock.NewFakeClock(t0)
|
||||
mf := queueMetricsFactory{metricsProvider: &mp}
|
||||
m := mf.newQueueMetrics("test", c)
|
||||
q := newQueue(c, m, time.Millisecond)
|
||||
defer q.ShutDown()
|
||||
for !c.HasWaiters() {
|
||||
// Wait for the go routine to call NewTicker()
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
q.Add("foo")
|
||||
if e, a := 1.0, mp.adds.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
c.Step(50 * time.Microsecond)
|
||||
|
||||
// Start processing
|
||||
i, _ := q.Get()
|
||||
if i != "foo" {
|
||||
t.Errorf("Expected %v, got %v", "foo", i)
|
||||
}
|
||||
|
||||
if e, a := 50.0, mp.latency.observationValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1, mp.latency.observationCount(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
// Add it back while processing; multiple adds of the same item are
|
||||
// de-duped.
|
||||
q.Add(i)
|
||||
q.Add(i)
|
||||
q.Add(i)
|
||||
q.Add(i)
|
||||
q.Add(i)
|
||||
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
// One thing remains in the queue
|
||||
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
c.Step(25 * time.Microsecond)
|
||||
|
||||
// Finish it up
|
||||
q.Done(i)
|
||||
|
||||
if e, a := 25.0, mp.duration.observationValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1, mp.duration.observationCount(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
// One thing remains in the queue
|
||||
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
// It should be back on the queue
|
||||
i, _ = q.Get()
|
||||
if i != "foo" {
|
||||
t.Errorf("Expected %v, got %v", "foo", i)
|
||||
}
|
||||
|
||||
if e, a := 25.0, mp.latency.observationValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, mp.latency.observationCount(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
// use a channel to ensure we don't look at the metric before it's
|
||||
// been set.
|
||||
ch := make(chan struct{}, 1)
|
||||
mp.unfinished.notifyCh = ch
|
||||
c.Step(time.Millisecond)
|
||||
<-ch
|
||||
mp.unfinished.notifyCh = nil
|
||||
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1000.0, mp.longest.gaugeValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
// Finish that one up
|
||||
q.Done(i)
|
||||
if e, a := 1000.0, mp.duration.observationValue(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, mp.duration.observationCount(); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
161
vendor/k8s.io/client-go/util/workqueue/queue_test.go
generated
vendored
161
vendor/k8s.io/client-go/util/workqueue/queue_test.go
generated
vendored
@@ -1,161 +0,0 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue_test
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
)
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
// If something is seriously wrong this test will never complete.
|
||||
q := workqueue.New()
|
||||
|
||||
// Start producers
|
||||
const producers = 50
|
||||
producerWG := sync.WaitGroup{}
|
||||
producerWG.Add(producers)
|
||||
for i := 0; i < producers; i++ {
|
||||
go func(i int) {
|
||||
defer producerWG.Done()
|
||||
for j := 0; j < 50; j++ {
|
||||
q.Add(i)
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Start consumers
|
||||
const consumers = 10
|
||||
consumerWG := sync.WaitGroup{}
|
||||
consumerWG.Add(consumers)
|
||||
for i := 0; i < consumers; i++ {
|
||||
go func(i int) {
|
||||
defer consumerWG.Done()
|
||||
for {
|
||||
item, quit := q.Get()
|
||||
if item == "added after shutdown!" {
|
||||
t.Errorf("Got an item added after shutdown.")
|
||||
}
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
t.Logf("Worker %v: begin processing %v", i, item)
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
t.Logf("Worker %v: done processing %v", i, item)
|
||||
q.Done(item)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
producerWG.Wait()
|
||||
q.ShutDown()
|
||||
q.Add("added after shutdown!")
|
||||
consumerWG.Wait()
|
||||
}
|
||||
|
||||
func TestAddWhileProcessing(t *testing.T) {
|
||||
q := workqueue.New()
|
||||
|
||||
// Start producers
|
||||
const producers = 50
|
||||
producerWG := sync.WaitGroup{}
|
||||
producerWG.Add(producers)
|
||||
for i := 0; i < producers; i++ {
|
||||
go func(i int) {
|
||||
defer producerWG.Done()
|
||||
q.Add(i)
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Start consumers
|
||||
const consumers = 10
|
||||
consumerWG := sync.WaitGroup{}
|
||||
consumerWG.Add(consumers)
|
||||
for i := 0; i < consumers; i++ {
|
||||
go func(i int) {
|
||||
defer consumerWG.Done()
|
||||
// Every worker will re-add every item up to two times.
|
||||
// This tests the dirty-while-processing case.
|
||||
counters := map[interface{}]int{}
|
||||
for {
|
||||
item, quit := q.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
counters[item]++
|
||||
if counters[item] < 2 {
|
||||
q.Add(item)
|
||||
}
|
||||
q.Done(item)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
producerWG.Wait()
|
||||
q.ShutDown()
|
||||
consumerWG.Wait()
|
||||
}
|
||||
|
||||
func TestLen(t *testing.T) {
|
||||
q := workqueue.New()
|
||||
q.Add("foo")
|
||||
if e, a := 1, q.Len(); e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
q.Add("bar")
|
||||
if e, a := 2, q.Len(); e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
q.Add("foo") // should not increase the queue length.
|
||||
if e, a := 2, q.Len(); e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReinsert(t *testing.T) {
|
||||
q := workqueue.New()
|
||||
q.Add("foo")
|
||||
|
||||
// Start processing
|
||||
i, _ := q.Get()
|
||||
if i != "foo" {
|
||||
t.Errorf("Expected %v, got %v", "foo", i)
|
||||
}
|
||||
|
||||
// Add it back while processing
|
||||
q.Add(i)
|
||||
|
||||
// Finish it up
|
||||
q.Done(i)
|
||||
|
||||
// It should be back on the queue
|
||||
i, _ = q.Get()
|
||||
if i != "foo" {
|
||||
t.Errorf("Expected %v, got %v", "foo", i)
|
||||
}
|
||||
|
||||
// Finish that one up
|
||||
q.Done(i)
|
||||
|
||||
if a := q.Len(); a != 0 {
|
||||
t.Errorf("Expected queue to be empty. Has %v items", a)
|
||||
}
|
||||
}
|
75
vendor/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go
generated
vendored
75
vendor/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go
generated
vendored
@@ -1,75 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
func TestRateLimitingQueue(t *testing.T) {
|
||||
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
|
||||
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
delayingQueue := &delayingType{
|
||||
Interface: New(),
|
||||
clock: fakeClock,
|
||||
heartbeat: fakeClock.NewTicker(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingForAddCh: make(chan *waitFor, 1000),
|
||||
metrics: newRetryMetrics(""),
|
||||
}
|
||||
queue.DelayingInterface = delayingQueue
|
||||
|
||||
queue.AddRateLimited("one")
|
||||
waitEntry := <-delayingQueue.waitingForAddCh
|
||||
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
queue.AddRateLimited("one")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, queue.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
queue.AddRateLimited("two")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
queue.AddRateLimited("two")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
queue.Forget("one")
|
||||
if e, a := 0, queue.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
queue.AddRateLimited("one")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user