diff options
Diffstat (limited to 'fs/sync/pipe_test.go')
| -rw-r--r-- | fs/sync/pipe_test.go | 290 |
1 files changed, 290 insertions, 0 deletions
diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go new file mode 100644 index 0000000..94916aa --- /dev/null +++ b/fs/sync/pipe_test.go @@ -0,0 +1,290 @@ +package sync + +import ( + "container/heap" + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fstest/mockobject" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Check interface satisfied +var _ heap.Interface = (*pipe)(nil) + +func TestPipe(t *testing.T) { + var queueLength int + var queueSize int64 + stats := func(n int, size int64) { + queueLength, queueSize = n, size + } + + // Make a new pipe + p, err := newPipe("", stats, 10) + require.NoError(t, err) + + checkStats := func(expectedN int, expectedSize int64) { + n, size := p.Stats() + assert.Equal(t, expectedN, n) + assert.Equal(t, expectedSize, size) + assert.Equal(t, expectedN, queueLength) + assert.Equal(t, expectedSize, queueSize) + } + + checkStats(0, 0) + + ctx := context.Background() + + obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) + + pair1 := fs.ObjectPair{Src: obj1, Dst: nil} + pairD := fs.ObjectPair{Src: obj1, Dst: obj1} // this object should not count to the stats + + // Put an object + ok := p.Put(ctx, pair1) + assert.Equal(t, true, ok) + checkStats(1, 5) + + // Put an object to be deleted + ok = p.Put(ctx, pairD) + assert.Equal(t, true, ok) + checkStats(2, 5) + + // Close the pipe showing reading on closed pipe is OK + p.Close() + + // Read from pipe + pair2, ok := p.Get(ctx) + assert.Equal(t, pair1, pair2) + assert.Equal(t, true, ok) + checkStats(1, 0) + + // Read from pipe + pair2, ok = p.Get(ctx) + assert.Equal(t, pairD, pair2) + assert.Equal(t, true, ok) + checkStats(0, 0) + + // Check read on closed pipe + pair2, ok = p.Get(ctx) + assert.Equal(t, fs.ObjectPair{}, pair2) + assert.Equal(t, false, ok) + + // Check panic on write to closed pipe + assert.Panics(t, func() { p.Put(ctx, pair1) }) + + // Make a new pipe + p, err = newPipe("", stats, 10) + require.NoError(t, err) + ctx2, cancel := context.WithCancel(ctx) + + // cancel it in the background - check read ceases + go cancel() + pair2, ok = p.Get(ctx2) + assert.Equal(t, fs.ObjectPair{}, pair2) + assert.Equal(t, false, ok) + + // check we can't write + ok = p.Put(ctx2, pair1) + assert.Equal(t, false, ok) + +} + +// TestPipeConcurrent runs concurrent Get and Put to flush out any +// race conditions and concurrency problems. +func TestPipeConcurrent(t *testing.T) { + const ( + N = 1000 + readWriters = 10 + ) + + stats := func(n int, size int64) {} + + // Make a new pipe + p, err := newPipe("", stats, 10) + require.NoError(t, err) + + var wg sync.WaitGroup + obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) + pair1 := fs.ObjectPair{Src: obj1, Dst: nil} + ctx := context.Background() + var count atomic.Int64 + + for range readWriters { + wg.Add(2) + go func() { + defer wg.Done() + for range N { + // Read from pipe + pair2, ok := p.Get(ctx) + assert.Equal(t, pair1, pair2) + assert.Equal(t, true, ok) + count.Add(-1) + } + }() + go func() { + defer wg.Done() + for range N { + // Put an object + ok := p.Put(ctx, pair1) + assert.Equal(t, true, ok) + count.Add(1) + } + }() + } + wg.Wait() + + assert.Equal(t, int64(0), count.Load()) +} + +func TestPipeOrderBy(t *testing.T) { + var ( + stats = func(n int, size int64) {} + ctx = context.Background() + obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone) + obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone) + pair1 = fs.ObjectPair{Src: obj1} + pair2 = fs.ObjectPair{Src: obj2} + ) + + for _, test := range []struct { + orderBy string + swapped1 bool + swapped2 bool + fraction int + }{ + {"", false, true, -1}, + {"size", false, false, -1}, + {"name", true, true, -1}, + {"modtime", false, true, -1}, + {"size,ascending", false, false, -1}, + {"name,asc", true, true, -1}, + {"modtime,ascending", false, true, -1}, + {"size,descending", true, true, -1}, + {"name,desc", false, false, -1}, + {"modtime,descending", true, false, -1}, + {"size,mixed,50", false, false, 25}, + {"size,mixed,51", true, true, 75}, + } { + t.Run(test.orderBy, func(t *testing.T) { + p, err := newPipe(test.orderBy, stats, 10) + require.NoError(t, err) + + readAndCheck := func(swapped bool) { + var readFirst, readSecond fs.ObjectPair + var ok1, ok2 bool + if test.fraction < 0 { + readFirst, ok1 = p.Get(ctx) + readSecond, ok2 = p.Get(ctx) + } else { + readFirst, ok1 = p.GetMax(ctx, test.fraction) + readSecond, ok2 = p.GetMax(ctx, test.fraction) + } + assert.True(t, ok1) + assert.True(t, ok2) + + if swapped { + assert.True(t, readFirst == pair2 && readSecond == pair1) + } else { + assert.True(t, readFirst == pair1 && readSecond == pair2) + } + } + + ok := p.Put(ctx, pair1) + assert.True(t, ok) + ok = p.Put(ctx, pair2) + assert.True(t, ok) + + readAndCheck(test.swapped1) + + // insert other way round + + ok = p.Put(ctx, pair2) + assert.True(t, ok) + ok = p.Put(ctx, pair1) + assert.True(t, ok) + + readAndCheck(test.swapped2) + }) + } +} + +func TestNewLess(t *testing.T) { + t.Run("blankOK", func(t *testing.T) { + less, _, err := newLess("") + require.NoError(t, err) + assert.Nil(t, less) + }) + + t.Run("tooManyParts", func(t *testing.T) { + _, _, err := newLess("size,asc,toomanyparts") + require.Error(t, err) + assert.Contains(t, err.Error(), "bad --order-by string") + }) + + t.Run("tooManyParts2", func(t *testing.T) { + _, _, err := newLess("size,mixed,50,toomanyparts") + require.Error(t, err) + assert.Contains(t, err.Error(), "bad --order-by string") + }) + + t.Run("badMixed", func(t *testing.T) { + _, _, err := newLess("size,mixed,32.7") + require.Error(t, err) + assert.Contains(t, err.Error(), "bad mixed fraction") + }) + + t.Run("unknownComparison", func(t *testing.T) { + _, _, err := newLess("potato") + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown --order-by comparison") + }) + + t.Run("unknownSortDirection", func(t *testing.T) { + _, _, err := newLess("name,sideways") + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown --order-by sort direction") + }) + + var ( + obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone) + obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone) + pair1 = fs.ObjectPair{Src: obj1} + pair2 = fs.ObjectPair{Src: obj2} + ) + + for _, test := range []struct { + orderBy string + pair1LessPair2 bool + pair2LessPair1 bool + wantFraction int + }{ + {"size", true, false, -1}, + {"name", false, true, -1}, + {"modtime", false, false, -1}, + {"size,ascending", true, false, -1}, + {"name,asc", false, true, -1}, + {"modtime,ascending", false, false, -1}, + {"size,descending", false, true, -1}, + {"name,desc", true, false, -1}, + {"modtime,descending", true, true, -1}, + {"modtime,mixed", false, false, 50}, + {"modtime,mixed,30", false, false, 30}, + } { + t.Run(test.orderBy, func(t *testing.T) { + less, gotFraction, err := newLess(test.orderBy) + assert.Equal(t, test.wantFraction, gotFraction) + require.NoError(t, err) + require.NotNil(t, less) + pair1LessPair2 := less(pair1, pair2) + assert.Equal(t, test.pair1LessPair2, pair1LessPair2) + pair2LessPair1 := less(pair2, pair1) + assert.Equal(t, test.pair2LessPair1, pair2LessPair1) + }) + } + +} |
