diff options
Diffstat (limited to 'fs/sync/pipe.go')
| -rw-r--r-- | fs/sync/pipe.go | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/fs/sync/pipe.go b/fs/sync/pipe.go new file mode 100644 index 0000000..124a077 --- /dev/null +++ b/fs/sync/pipe.go @@ -0,0 +1,237 @@ +package sync + +import ( + "context" + "fmt" + "math/bits" + "strconv" + "strings" + "sync" + + "github.com/aalpar/deheap" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/fserrors" +) + +// compare two items for order by +type lessFn func(a, b fs.ObjectPair) bool + +// pipe provides an unbounded channel like experience +// +// Note unlike channels these aren't strictly ordered. +type pipe struct { + mu sync.Mutex + c chan struct{} + queue []fs.ObjectPair + closed bool + totalSize int64 + stats func(items int, totalSize int64) + less lessFn + fraction int +} + +func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) { + if maxBacklog < 0 { + maxBacklog = (1 << (bits.UintSize - 1)) - 1 // largest positive int + } + less, fraction, err := newLess(orderBy) + if err != nil { + return nil, fserrors.FatalError(err) + } + p := &pipe{ + c: make(chan struct{}, maxBacklog), + stats: stats, + less: less, + fraction: fraction, + } + if p.less != nil { + deheap.Init(p) + } + return p, nil +} + +// Len satisfy heap.Interface - must be called with lock held +func (p *pipe) Len() int { + return len(p.queue) +} + +// Len satisfy heap.Interface - must be called with lock held +func (p *pipe) Less(i, j int) bool { + return p.less(p.queue[i], p.queue[j]) +} + +// Swap satisfy heap.Interface - must be called with lock held +func (p *pipe) Swap(i, j int) { + p.queue[i], p.queue[j] = p.queue[j], p.queue[i] +} + +// Push satisfy heap.Interface - must be called with lock held +func (p *pipe) Push(item any) { + p.queue = append(p.queue, item.(fs.ObjectPair)) +} + +// Pop satisfy heap.Interface - must be called with lock held +func (p *pipe) Pop() any { + old := p.queue + n := len(old) + item := old[n-1] + old[n-1] = fs.ObjectPair{} // avoid memory leak + p.queue = old[0 : n-1] + return item +} + +// Put a pair into the pipe +// +// It returns ok = false if the context was cancelled +// +// It will panic if you call it after Close() +// +// Note that pairs where src==dst aren't counted for stats +func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) { + if ctx.Err() != nil { + return false + } + p.mu.Lock() + if p.less == nil { + // no order-by + p.queue = append(p.queue, pair) + } else { + deheap.Push(p, pair) + } + size := pair.Src.Size() + if size > 0 && pair.Src != pair.Dst { + p.totalSize += size + } + p.stats(len(p.queue), p.totalSize) + p.mu.Unlock() + select { + case <-ctx.Done(): + return false + case p.c <- struct{}{}: + } + return true +} + +// Get a pair from the pipe +// +// If fraction is > the mixed fraction set in the pipe then it gets it +// from the other end of the heap if order-by is in effect +// +// It returns ok = false if the context was cancelled or Close() has +// been called. +func (p *pipe) GetMax(ctx context.Context, fraction int) (pair fs.ObjectPair, ok bool) { + if ctx.Err() != nil { + return + } + select { + case <-ctx.Done(): + return + case _, ok = <-p.c: + if !ok { + return + } + } + p.mu.Lock() + if p.less == nil { + // no order-by + pair = p.queue[0] + p.queue[0] = fs.ObjectPair{} // avoid memory leak + p.queue = p.queue[1:] + } else if p.fraction < 0 || fraction < p.fraction { + pair = deheap.Pop(p).(fs.ObjectPair) + } else { + pair = deheap.PopMax(p).(fs.ObjectPair) + } + size := pair.Src.Size() + if size > 0 && pair.Src != pair.Dst { + p.totalSize -= size + } + if p.totalSize < 0 { + p.totalSize = 0 + } + p.stats(len(p.queue), p.totalSize) + p.mu.Unlock() + return pair, true +} + +// Get a pair from the pipe +// +// It returns ok = false if the context was cancelled or Close() has +// been called. +func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { + return p.GetMax(ctx, -1) +} + +// Stats reads the number of items in the queue and the totalSize +func (p *pipe) Stats() (items int, totalSize int64) { + p.mu.Lock() + items, totalSize = len(p.queue), p.totalSize + p.mu.Unlock() + return items, totalSize +} + +// Close the pipe +// +// Writes to a closed pipe will panic as will double closing a pipe +func (p *pipe) Close() { + p.mu.Lock() + close(p.c) + p.closed = true + p.mu.Unlock() +} + +// newLess returns a less function for the heap comparison or nil if +// one is not required +func newLess(orderBy string) (less lessFn, fraction int, err error) { + fraction = -1 + if orderBy == "" { + return nil, fraction, nil + } + parts := strings.Split(strings.ToLower(orderBy), ",") + switch parts[0] { + case "name": + less = func(a, b fs.ObjectPair) bool { + return a.Src.Remote() < b.Src.Remote() + } + case "size": + less = func(a, b fs.ObjectPair) bool { + return a.Src.Size() < b.Src.Size() + } + case "modtime": + less = func(a, b fs.ObjectPair) bool { + ctx := context.Background() + return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx)) + } + default: + return nil, fraction, fmt.Errorf("unknown --order-by comparison %q", parts[0]) + } + descending := false + if len(parts) > 1 { + switch parts[1] { + case "ascending", "asc": + case "descending", "desc": + descending = true + case "mixed": + fraction = 50 + if len(parts) > 2 { + fraction, err = strconv.Atoi(parts[2]) + if err != nil { + return nil, fraction, fmt.Errorf("bad mixed fraction --order-by %q", parts[2]) + } + } + + default: + return nil, fraction, fmt.Errorf("unknown --order-by sort direction %q", parts[1]) + } + } + if (fraction >= 0 && len(parts) > 3) || (fraction < 0 && len(parts) > 2) { + return nil, fraction, fmt.Errorf("bad --order-by string %q", orderBy) + } + if descending { + oldLess := less + less = func(a, b fs.ObjectPair) bool { + return !oldLess(a, b) + } + } + return less, fraction, nil +} |
