diff options
Diffstat (limited to 'fs/operations/multithread.go')
| -rw-r--r-- | fs/operations/multithread.go | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go new file mode 100644 index 0000000..9abd5f7 --- /dev/null +++ b/fs/operations/multithread.go @@ -0,0 +1,380 @@ +package operations + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/lib/atexit" + "github.com/rclone/rclone/lib/multipart" + "github.com/rclone/rclone/lib/pool" + "golang.org/x/sync/errgroup" +) + +const ( + multithreadChunkSize = 64 << 10 +) + +// Return a boolean as to whether we should use multi thread copy for +// this transfer +func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool { + ci := fs.GetConfig(ctx) + + // Disable multi thread if... + + // ...it isn't configured + if ci.MultiThreadStreams <= 1 { + return false + } + // ...if the source doesn't support it + if src.Fs().Features().NoMultiThreading { + return false + } + // ...size of object is less than cutoff + if src.Size() < int64(ci.MultiThreadCutoff) { + return false + } + // ...destination doesn't support it + dstFeatures := f.Features() + if dstFeatures.OpenChunkWriter == nil && dstFeatures.OpenWriterAt == nil { + return false + } + // ...if --multi-thread-streams not in use and source and + // destination are both local + if !ci.MultiThreadSet && dstFeatures.IsLocal && src.Fs().Features().IsLocal { + return false + } + return true +} + +// state for a multi-thread copy +type multiThreadCopyState struct { + ctx context.Context + partSize int64 + size int64 + src fs.Object + acc *accounting.Account + numChunks int + noBuffering bool // set to read the input without buffering +} + +// Copy a single chunk into place +func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer fs.ChunkWriter) (err error) { + defer func() { + if err != nil { + fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d failed: %v", chunk+1, mc.numChunks, err) + } + }() + start := int64(chunk) * mc.partSize + if start >= mc.size { + return nil + } + end := min(start+mc.partSize, mc.size) + size := end - start + + // Reserve the memory first so we don't open the source and wait for memory buffers for ages + var rw *pool.RW + if !mc.noBuffering { + rw = multipart.NewRW().Reserve(size) + defer fs.CheckClose(rw, &err) + } + + fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d (%d-%d) size %v starting", chunk+1, mc.numChunks, start, end, fs.SizeSuffix(size)) + + rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1}) + if err != nil { + return fmt.Errorf("multi-thread copy: failed to open source: %w", err) + } + defer fs.CheckClose(rc, &err) + + var rs io.ReadSeeker + if mc.noBuffering { + // Read directly if we are sure we aren't going to seek + // and account with accounting + rc.SetAccounting(mc.acc.AccountRead) + rs = rc + } else { + // Read the chunk into buffered reader + _, err = io.CopyN(rw, rc, size) + if err != nil { + return fmt.Errorf("multi-thread copy: failed to read chunk: %w", err) + } + // Account as we go + rw.SetAccounting(mc.acc.AccountRead) + rs = rw + } + + // Write the chunk + bytesWritten, err := writer.WriteChunk(ctx, chunk, rs) + if err != nil { + return fmt.Errorf("multi-thread copy: failed to write chunk: %w", err) + } + + fs.Debugf(mc.src, "multi-thread copy: chunk %d/%d (%d-%d) size %v finished", chunk+1, mc.numChunks, start, end, fs.SizeSuffix(bytesWritten)) + return nil +} + +// Given a file size and a chunkSize +// it returns the number of chunks, so that chunkSize * numChunks >= size +func calculateNumChunks(size int64, chunkSize int64) int { + numChunks := size / chunkSize + if size%chunkSize != 0 { + numChunks++ + } + return int(numChunks) +} + +// Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature +// and if that's not available it creates an adapter using OpenWriterAt +func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption) (newDst fs.Object, err error) { + openChunkWriter := f.Features().OpenChunkWriter + ci := fs.GetConfig(ctx) + noBuffering := false + usingOpenWriterAt := false + if openChunkWriter == nil { + openWriterAt := f.Features().OpenWriterAt + if openWriterAt == nil { + return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported") + } + openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f) + // If we are using OpenWriterAt we don't seek the chunks so don't need to buffer + fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt") + noBuffering = true + usingOpenWriterAt = true + } else if src.Fs().Features().IsLocal { + // If the source fs is local we don't need to buffer + fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk") + noBuffering = true + } else if f.Features().ChunkWriterDoesntSeek { + // If the destination Fs promises not to seek its chunks + // (except for retries) then we don't need buffering. + fs.Debugf(src, "multi-thread copy: disabling buffering because destination has set ChunkWriterDoesntSeek") + noBuffering = true + } + + if src.Size() < 0 { + return nil, fmt.Errorf("multi-thread copy: can't copy unknown sized file") + } + if src.Size() == 0 { + return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file") + } + + info, chunkWriter, err := openChunkWriter(ctx, remote, src, options...) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err) + } + + uploadCtx, cancel := context.WithCancel(ctx) + defer cancel() + uploadedOK := false + defer atexit.OnError(&err, func() { + cancel() + if info.LeavePartsOnError || uploadedOK { + return + } + fs.Debugf(src, "multi-thread copy: cancelling transfer on exit") + abortErr := chunkWriter.Abort(ctx) + if abortErr != nil { + fs.Debugf(src, "multi-thread copy: abort failed: %v", abortErr) + } + })() + + if info.ChunkSize > src.Size() { + fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(info.ChunkSize), fs.SizeSuffix(src.Size())) + info.ChunkSize = src.Size() + } + + // Use the backend concurrency if it is higher than --multi-thread-streams or if --multi-thread-streams wasn't set explicitly + if !ci.MultiThreadSet || info.Concurrency > concurrency { + fs.Debugf(src, "multi-thread copy: using backend concurrency of %d instead of --multi-thread-streams %d", info.Concurrency, concurrency) + concurrency = info.Concurrency + } + + numChunks := calculateNumChunks(src.Size(), info.ChunkSize) + if concurrency > numChunks { + fs.Debugf(src, "multi-thread copy: number of streams %d was bigger than number of chunks %d", concurrency, numChunks) + concurrency = numChunks + } + + if concurrency < 1 { + concurrency = 1 + } + + g, gCtx := errgroup.WithContext(uploadCtx) + g.SetLimit(concurrency) + + mc := &multiThreadCopyState{ + ctx: gCtx, + size: src.Size(), + src: src, + partSize: info.ChunkSize, + numChunks: numChunks, + noBuffering: noBuffering, + } + + // Make accounting + mc.acc = tr.Account(gCtx, nil) + + fs.Debugf(src, "Starting multi-thread copy with %d chunks of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), concurrency) + for chunk := range mc.numChunks { + // Fail fast, in case an errgroup managed function returns an error + if gCtx.Err() != nil { + break + } + chunk := chunk + g.Go(func() error { + return mc.copyChunk(gCtx, chunk, chunkWriter) + }) + } + + err = g.Wait() + if err != nil { + return nil, err + } + err = chunkWriter.Close(ctx) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to close object after copy: %w", err) + } + uploadedOK = true // file is definitely uploaded OK so no need to abort + + obj, err := f.NewObject(ctx, remote) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err) + } + + // OpenWriterAt doesn't set metadata so we need to set it on completion + if usingOpenWriterAt { + setModTime := true + if ci.Metadata { + do, ok := obj.(fs.SetMetadataer) + if ok { + meta, err := fs.GetMetadataOptions(ctx, f, src, options) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to read metadata from source object: %w", err) + } + if _, foundMeta := meta["mtime"]; !foundMeta { + meta.Set("mtime", src.ModTime(ctx).Format(time.RFC3339Nano)) + } + err = do.SetMetadata(ctx, meta) + if err != nil { + return nil, fmt.Errorf("multi-thread copy: failed to set metadata: %w", err) + } + setModTime = false + } else { + fs.Errorf(obj, "multi-thread copy: can't set metadata as SetMetadata isn't implemented in: %v", f) + } + } + if setModTime { + err = obj.SetModTime(ctx, src.ModTime(ctx)) + switch err { + case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete: + default: + return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err) + } + } + } + + fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.numChunks, fs.SizeSuffix(mc.partSize)) + return obj, nil +} + +// writerAtChunkWriter converts a WriterAtCloser into a ChunkWriter +type writerAtChunkWriter struct { + remote string + size int64 + writerAt fs.WriterAtCloser + chunkSize int64 + chunks int + writeBufferSize int64 + f fs.Fs + closed bool +} + +// WriteChunk writes chunkNumber from reader +func (w *writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { + fs.Debugf(w.remote, "writing chunk %v", chunkNumber) + + bytesToWrite := w.chunkSize + if chunkNumber == (w.chunks-1) && w.size%w.chunkSize != 0 { + bytesToWrite = w.size % w.chunkSize + } + + var writer io.Writer = io.NewOffsetWriter(w.writerAt, int64(chunkNumber)*w.chunkSize) + if w.writeBufferSize > 0 { + writer = bufio.NewWriterSize(writer, int(w.writeBufferSize)) + } + n, err := io.Copy(writer, reader) + if err != nil { + return -1, err + } + if n != bytesToWrite { + return -1, fmt.Errorf("expected to write %v bytes for chunk %v, but wrote %v bytes", bytesToWrite, chunkNumber, n) + } + // if we were buffering, flush to disk + switch w := writer.(type) { + case *bufio.Writer: + err = w.Flush() + if err != nil { + return -1, fmt.Errorf("multi-thread copy: flush failed: %w", err) + } + } + return n, nil +} + +// Close the chunk writing +func (w *writerAtChunkWriter) Close(ctx context.Context) error { + if w.closed { + return nil + } + w.closed = true + return w.writerAt.Close() +} + +// Abort the chunk writing +func (w *writerAtChunkWriter) Abort(ctx context.Context) error { + err := w.Close(ctx) + if err != nil { + fs.Errorf(w.remote, "multi-thread copy: failed to close file before aborting: %v", err) + } + obj, err := w.f.NewObject(ctx, w.remote) + if err != nil { + return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err) + } + return obj.Remove(ctx) +} + +// openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize +func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize int64, writeBufferSize int64, f fs.Fs) fs.OpenChunkWriterFn { + return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { + ci := fs.GetConfig(ctx) + + writerAt, err := openWriterAt(ctx, remote, src.Size()) + if err != nil { + return info, nil, err + } + + if writeBufferSize > 0 { + fs.Debugf(src.Remote(), "multi-thread copy: write buffer set to %v", writeBufferSize) + } + + chunkWriter := &writerAtChunkWriter{ + remote: remote, + size: src.Size(), + chunkSize: chunkSize, + chunks: calculateNumChunks(src.Size(), chunkSize), + writerAt: writerAt, + writeBufferSize: writeBufferSize, + f: f, + } + info = fs.ChunkWriterInfo{ + ChunkSize: chunkSize, + Concurrency: ci.MultiThreadStreams, + } + return info, chunkWriter, nil + } +} |
