aboutsummaryrefslogtreecommitdiff
path: root/fs/sync/pipe_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'fs/sync/pipe_test.go')
-rw-r--r--fs/sync/pipe_test.go290
1 files changed, 290 insertions, 0 deletions
diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go
new file mode 100644
index 0000000..94916aa
--- /dev/null
+++ b/fs/sync/pipe_test.go
@@ -0,0 +1,290 @@
+package sync
+
+import (
+ "container/heap"
+ "context"
+ "sync"
+ "sync/atomic"
+ "testing"
+
+ "github.com/rclone/rclone/fs"
+ "github.com/rclone/rclone/fstest/mockobject"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// Check interface satisfied
+var _ heap.Interface = (*pipe)(nil)
+
+func TestPipe(t *testing.T) {
+ var queueLength int
+ var queueSize int64
+ stats := func(n int, size int64) {
+ queueLength, queueSize = n, size
+ }
+
+ // Make a new pipe
+ p, err := newPipe("", stats, 10)
+ require.NoError(t, err)
+
+ checkStats := func(expectedN int, expectedSize int64) {
+ n, size := p.Stats()
+ assert.Equal(t, expectedN, n)
+ assert.Equal(t, expectedSize, size)
+ assert.Equal(t, expectedN, queueLength)
+ assert.Equal(t, expectedSize, queueSize)
+ }
+
+ checkStats(0, 0)
+
+ ctx := context.Background()
+
+ obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
+
+ pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
+ pairD := fs.ObjectPair{Src: obj1, Dst: obj1} // this object should not count to the stats
+
+ // Put an object
+ ok := p.Put(ctx, pair1)
+ assert.Equal(t, true, ok)
+ checkStats(1, 5)
+
+ // Put an object to be deleted
+ ok = p.Put(ctx, pairD)
+ assert.Equal(t, true, ok)
+ checkStats(2, 5)
+
+ // Close the pipe showing reading on closed pipe is OK
+ p.Close()
+
+ // Read from pipe
+ pair2, ok := p.Get(ctx)
+ assert.Equal(t, pair1, pair2)
+ assert.Equal(t, true, ok)
+ checkStats(1, 0)
+
+ // Read from pipe
+ pair2, ok = p.Get(ctx)
+ assert.Equal(t, pairD, pair2)
+ assert.Equal(t, true, ok)
+ checkStats(0, 0)
+
+ // Check read on closed pipe
+ pair2, ok = p.Get(ctx)
+ assert.Equal(t, fs.ObjectPair{}, pair2)
+ assert.Equal(t, false, ok)
+
+ // Check panic on write to closed pipe
+ assert.Panics(t, func() { p.Put(ctx, pair1) })
+
+ // Make a new pipe
+ p, err = newPipe("", stats, 10)
+ require.NoError(t, err)
+ ctx2, cancel := context.WithCancel(ctx)
+
+ // cancel it in the background - check read ceases
+ go cancel()
+ pair2, ok = p.Get(ctx2)
+ assert.Equal(t, fs.ObjectPair{}, pair2)
+ assert.Equal(t, false, ok)
+
+ // check we can't write
+ ok = p.Put(ctx2, pair1)
+ assert.Equal(t, false, ok)
+
+}
+
+// TestPipeConcurrent runs concurrent Get and Put to flush out any
+// race conditions and concurrency problems.
+func TestPipeConcurrent(t *testing.T) {
+ const (
+ N = 1000
+ readWriters = 10
+ )
+
+ stats := func(n int, size int64) {}
+
+ // Make a new pipe
+ p, err := newPipe("", stats, 10)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+ obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
+ pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
+ ctx := context.Background()
+ var count atomic.Int64
+
+ for range readWriters {
+ wg.Add(2)
+ go func() {
+ defer wg.Done()
+ for range N {
+ // Read from pipe
+ pair2, ok := p.Get(ctx)
+ assert.Equal(t, pair1, pair2)
+ assert.Equal(t, true, ok)
+ count.Add(-1)
+ }
+ }()
+ go func() {
+ defer wg.Done()
+ for range N {
+ // Put an object
+ ok := p.Put(ctx, pair1)
+ assert.Equal(t, true, ok)
+ count.Add(1)
+ }
+ }()
+ }
+ wg.Wait()
+
+ assert.Equal(t, int64(0), count.Load())
+}
+
+func TestPipeOrderBy(t *testing.T) {
+ var (
+ stats = func(n int, size int64) {}
+ ctx = context.Background()
+ obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone)
+ obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone)
+ pair1 = fs.ObjectPair{Src: obj1}
+ pair2 = fs.ObjectPair{Src: obj2}
+ )
+
+ for _, test := range []struct {
+ orderBy string
+ swapped1 bool
+ swapped2 bool
+ fraction int
+ }{
+ {"", false, true, -1},
+ {"size", false, false, -1},
+ {"name", true, true, -1},
+ {"modtime", false, true, -1},
+ {"size,ascending", false, false, -1},
+ {"name,asc", true, true, -1},
+ {"modtime,ascending", false, true, -1},
+ {"size,descending", true, true, -1},
+ {"name,desc", false, false, -1},
+ {"modtime,descending", true, false, -1},
+ {"size,mixed,50", false, false, 25},
+ {"size,mixed,51", true, true, 75},
+ } {
+ t.Run(test.orderBy, func(t *testing.T) {
+ p, err := newPipe(test.orderBy, stats, 10)
+ require.NoError(t, err)
+
+ readAndCheck := func(swapped bool) {
+ var readFirst, readSecond fs.ObjectPair
+ var ok1, ok2 bool
+ if test.fraction < 0 {
+ readFirst, ok1 = p.Get(ctx)
+ readSecond, ok2 = p.Get(ctx)
+ } else {
+ readFirst, ok1 = p.GetMax(ctx, test.fraction)
+ readSecond, ok2 = p.GetMax(ctx, test.fraction)
+ }
+ assert.True(t, ok1)
+ assert.True(t, ok2)
+
+ if swapped {
+ assert.True(t, readFirst == pair2 && readSecond == pair1)
+ } else {
+ assert.True(t, readFirst == pair1 && readSecond == pair2)
+ }
+ }
+
+ ok := p.Put(ctx, pair1)
+ assert.True(t, ok)
+ ok = p.Put(ctx, pair2)
+ assert.True(t, ok)
+
+ readAndCheck(test.swapped1)
+
+ // insert other way round
+
+ ok = p.Put(ctx, pair2)
+ assert.True(t, ok)
+ ok = p.Put(ctx, pair1)
+ assert.True(t, ok)
+
+ readAndCheck(test.swapped2)
+ })
+ }
+}
+
+func TestNewLess(t *testing.T) {
+ t.Run("blankOK", func(t *testing.T) {
+ less, _, err := newLess("")
+ require.NoError(t, err)
+ assert.Nil(t, less)
+ })
+
+ t.Run("tooManyParts", func(t *testing.T) {
+ _, _, err := newLess("size,asc,toomanyparts")
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "bad --order-by string")
+ })
+
+ t.Run("tooManyParts2", func(t *testing.T) {
+ _, _, err := newLess("size,mixed,50,toomanyparts")
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "bad --order-by string")
+ })
+
+ t.Run("badMixed", func(t *testing.T) {
+ _, _, err := newLess("size,mixed,32.7")
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "bad mixed fraction")
+ })
+
+ t.Run("unknownComparison", func(t *testing.T) {
+ _, _, err := newLess("potato")
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "unknown --order-by comparison")
+ })
+
+ t.Run("unknownSortDirection", func(t *testing.T) {
+ _, _, err := newLess("name,sideways")
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "unknown --order-by sort direction")
+ })
+
+ var (
+ obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone)
+ obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone)
+ pair1 = fs.ObjectPair{Src: obj1}
+ pair2 = fs.ObjectPair{Src: obj2}
+ )
+
+ for _, test := range []struct {
+ orderBy string
+ pair1LessPair2 bool
+ pair2LessPair1 bool
+ wantFraction int
+ }{
+ {"size", true, false, -1},
+ {"name", false, true, -1},
+ {"modtime", false, false, -1},
+ {"size,ascending", true, false, -1},
+ {"name,asc", false, true, -1},
+ {"modtime,ascending", false, false, -1},
+ {"size,descending", false, true, -1},
+ {"name,desc", true, false, -1},
+ {"modtime,descending", true, true, -1},
+ {"modtime,mixed", false, false, 50},
+ {"modtime,mixed,30", false, false, 30},
+ } {
+ t.Run(test.orderBy, func(t *testing.T) {
+ less, gotFraction, err := newLess(test.orderBy)
+ assert.Equal(t, test.wantFraction, gotFraction)
+ require.NoError(t, err)
+ require.NotNil(t, less)
+ pair1LessPair2 := less(pair1, pair2)
+ assert.Equal(t, test.pair1LessPair2, pair1LessPair2)
+ pair2LessPair1 := less(pair2, pair1)
+ assert.Equal(t, test.pair2LessPair1, pair2LessPair1)
+ })
+ }
+
+}