diff options
Diffstat (limited to 'fs/operations/copy.go')
| -rw-r--r-- | fs/operations/copy.go | 422 |
1 files changed, 422 insertions, 0 deletions
diff --git a/fs/operations/copy.go b/fs/operations/copy.go new file mode 100644 index 0000000..f7871d2 --- /dev/null +++ b/fs/operations/copy.go @@ -0,0 +1,422 @@ +// This file implements operations.Copy +// +// This is probably the most important operation in rclone. + +package operations + +import ( + "context" + "errors" + "fmt" + "hash/crc32" + "io" + "path" + "strings" + "time" + "unicode/utf8" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/fserrors" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/atexit" + "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/transform" +) + +// State of the copy +type copy struct { + f fs.Fs // destination fs.Fs + dstFeatures *fs.Features // Features() for fs.Fs + dst fs.Object // destination object to update, may be nil + remote string // destination path, used if dst is nil + src fs.Object // source object + ci *fs.ConfigInfo // current config + maxTries int // max number of tries to do the copy + doUpdate bool // whether we are updating an existing file or not + hashType hash.Type // common hash to use + hashOption *fs.HashesOption // open option for the common hash + tr *accounting.Transfer // accounting for the transfer + inplace bool // set if we are updating inplace and not using a partial name + remoteForCopy string // the name used for the transfer, either remote or remote+".partial" +} + +// Used to remove a failed copy +func (c *copy) removeFailedCopy(ctx context.Context, o fs.Object) { + if o == nil { + return + } + fs.Infof(o, "Removing failed copy") + err := o.Remove(ctx) + if err != nil { + fs.Infof(o, "Failed to remove failed copy: %s", err) + } +} + +// Used to remove a failed partial copy +func (c *copy) removeFailedPartialCopy(ctx context.Context, f fs.Fs, remote string) { + o, err := f.NewObject(ctx, remote) + if errors.Is(err, fs.ErrorObjectNotFound) { + // Assume object has been deleted + return + } + if err != nil { + fs.Infof(remote, "Failed to remove failed partial copy: %s", err) + return + } + c.removeFailedCopy(ctx, o) +} + +// TruncateString s to n bytes. +// +// If s is valid UTF-8 then this may truncate to fewer than n bytes to +// make the returned string also valid UTF-8. +func TruncateString(s string, n int) string { + truncated := s[:n] + if !utf8.ValidString(s) { + // If input string wasn't valid UTF-8 then just return the truncation + return truncated + } + for len(truncated) > 0 { + if utf8.ValidString(truncated) { + return truncated + } + // Remove 1 byte until valid + truncated = truncated[:len(truncated)-1] + } + return truncated +} + +// Check to see if we should be using a partial name and return the name for the copy and the inplace flag +func (c *copy) checkPartial(ctx context.Context) (remoteForCopy string, inplace bool, err error) { + remoteForCopy = c.remote + if c.ci.Inplace || c.dstFeatures.Move == nil || !c.dstFeatures.PartialUploads || strings.HasSuffix(c.remote, ".rclonelink") { + return remoteForCopy, true, nil + } + if len(c.ci.PartialSuffix) > 16 { + return remoteForCopy, true, fmt.Errorf("expecting length of --partial-suffix to be not greater than %d but got %d", 16, len(c.ci.PartialSuffix)) + } + // Avoid making the leaf name longer if it's already lengthy to avoid + // trouble with file name length limits. + + // generate a stable random suffix by hashing the filename and fingerprint + hasher := crc32.New(crc32.IEEETable) + _, _ = hasher.Write([]byte(c.remote)) + _, _ = hasher.Write([]byte(fs.Fingerprint(ctx, c.src, true))) + hash := hasher.Sum32() + + suffix := fmt.Sprintf(".%08x%s", hash, c.ci.PartialSuffix) + base := path.Base(remoteForCopy) + if len(base) > 100 { + remoteForCopy = TruncateString(remoteForCopy, len(remoteForCopy)-len(suffix)) + suffix + } else { + remoteForCopy += suffix + } + return remoteForCopy, false, nil +} + +// Check to see if we have hit max transfer limits +func (c *copy) checkLimits(ctx context.Context) (err error) { + if c.ci.MaxTransfer < 0 { + return nil + } + var bytesSoFar int64 + if c.ci.CutoffMode == fs.CutoffModeCautious { + bytesSoFar = accounting.Stats(ctx).GetBytesWithPending() + c.src.Size() + } else { + bytesSoFar = accounting.Stats(ctx).GetBytes() + } + if bytesSoFar >= int64(c.ci.MaxTransfer) { + if c.ci.CutoffMode == fs.CutoffModeHard { + return accounting.ErrorMaxTransferLimitReachedFatal + } + return accounting.ErrorMaxTransferLimitReachedGraceful + } + return nil +} + +// Server side copy c.src to (c.f, c.remoteForCopy) if possible or return fs.ErrorCantCopy if not +func (c *copy) serverSideCopy(ctx context.Context) (actionTaken string, newDst fs.Object, err error) { + doCopy := c.dstFeatures.Copy + serverSideCopyOK := false + if doCopy == nil { + serverSideCopyOK = false + } else if SameConfig(c.src.Fs(), c.f) { + serverSideCopyOK = true + } else if SameRemoteType(c.src.Fs(), c.f) { + serverSideCopyOK = c.dstFeatures.ServerSideAcrossConfigs || c.ci.ServerSideAcrossConfigs + } + if !serverSideCopyOK { + return actionTaken, nil, fs.ErrorCantCopy + } + in := c.tr.Account(ctx, nil) // account the transfer + in.ServerSideTransferStart() + newDst, err = doCopy(ctx, c.src, c.remoteForCopy) + if err == nil { + in.ServerSideCopyEnd(newDst.Size()) // account the bytes for the server-side transfer + } + _ = in.Close() + if errors.Is(err, fs.ErrorCantCopy) { + c.tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy + } + actionTaken = "Copied (server-side copy)" + return actionTaken, newDst, err +} + +// Copy c.src to (c.f, c.remoteForCopy) using multiThreadCopy +func (c *copy) multiThreadCopy(ctx context.Context, uploadOptions []fs.OpenOption) (actionTaken string, newDst fs.Object, err error) { + newDst, err = multiThreadCopy(ctx, c.f, c.remoteForCopy, c.src, c.ci.MultiThreadStreams, c.tr, uploadOptions...) + if c.doUpdate { + actionTaken = "Multi-thread Copied (replaced existing)" + } else { + actionTaken = "Multi-thread Copied (new)" + } + return actionTaken, newDst, err +} + +// Copy the stream from in to (c.f, c.remoteForCopy) and close it +// +// Use Rcat to handle both remotes supporting and not supporting PutStream. +func (c *copy) rcat(ctx context.Context, in io.ReadCloser) (actionTaken string, newDst fs.Object, err error) { + // Make any metadata to pass to rcat + var meta fs.Metadata + if c.ci.Metadata { + meta, err = fs.GetMetadata(ctx, c.src) + if err != nil { + fs.Errorf(c.src, "Failed to read metadata: %v", err) + } + } + + // NB Rcat closes in0 + fsrc, ok := c.src.Fs().(fs.Fs) + if !ok { + fsrc = nil + } + newDst, err = rcatSrc(ctx, c.f, c.remoteForCopy, in, c.src.ModTime(ctx), meta, fsrc) + if c.doUpdate { + actionTaken = "Copied (Rcat, replaced existing)" + } else { + actionTaken = "Copied (Rcat, new)" + } + return actionTaken, newDst, err +} + +// Copy the stream from in to (c.f, c.remoteForCopy) and close it +func (c *copy) updateOrPut(ctx context.Context, in io.ReadCloser, uploadOptions []fs.OpenOption) (actionTaken string, newDst fs.Object, err error) { + // account and buffer the transfer + inAcc := c.tr.Account(ctx, in).WithBuffer() + var wrappedSrc fs.ObjectInfo = c.src + + // We try to pass the original object if possible + if c.src.Remote() != c.remoteForCopy { + wrappedSrc = fs.NewOverrideRemote(c.src, c.remoteForCopy) + } + if c.doUpdate && c.inplace { + err = c.dst.Update(ctx, inAcc, wrappedSrc, uploadOptions...) + // Make sure newDst is c.dst since we updated it + if err == nil { + newDst = c.dst + } + } else { + newDst, err = c.f.Put(ctx, inAcc, wrappedSrc, uploadOptions...) + } + closeErr := inAcc.Close() + if err == nil { + err = closeErr + } + if c.doUpdate { + actionTaken = "Copied (replaced existing)" + } else { + actionTaken = "Copied (new)" + } + return actionTaken, newDst, err +} + +// Do a manual copy by reading the bytes and writing them +func (c *copy) manualCopy(ctx context.Context) (actionTaken string, newDst fs.Object, err error) { + // Remove partial files on premature exit + if !c.inplace { + defer atexit.Unregister(atexit.Register(func() { + ctx := context.Background() + c.removeFailedPartialCopy(ctx, c.f, c.remoteForCopy) + })) + } + + // Options for the upload + uploadOptions := []fs.OpenOption{c.hashOption} + for _, option := range c.ci.UploadHeaders { + uploadOptions = append(uploadOptions, option) + } + if c.ci.MetadataSet != nil { + uploadOptions = append(uploadOptions, fs.MetadataOption(c.ci.MetadataSet)) + } + + // Options for the download + downloadOptions := []fs.OpenOption{c.hashOption} + for _, option := range c.ci.DownloadHeaders { + downloadOptions = append(downloadOptions, option) + } + + if doMultiThreadCopy(ctx, c.f, c.src) { + return c.multiThreadCopy(ctx, uploadOptions) + } + + var in io.ReadCloser + in, err = Open(ctx, c.src, downloadOptions...) + if err != nil { + return actionTaken, nil, fmt.Errorf("failed to open source object: %w", err) + } + + // Note that c.rcat and c.updateOrPut close in + if c.src.Size() == -1 { + return c.rcat(ctx, in) + } + return c.updateOrPut(ctx, in, uploadOptions) +} + +// Verify the copy +func (c *copy) verify(ctx context.Context, newDst fs.Object) (err error) { + // Verify sizes are the same after transfer + if sizeDiffers(ctx, c.src, newDst) { + return fmt.Errorf("corrupted on transfer: sizes differ src(%s) %d vs dst(%s) %d", c.src.Fs(), c.src.Size(), newDst.Fs(), newDst.Size()) + } + // Verify hashes are the same after transfer - ignoring blank hashes + if c.hashType != hash.None { + // checkHashes has logs and counts errors + equal, _, srcSum, dstSum, _ := checkHashes(ctx, c.src, newDst, c.hashType) + if !equal { + return fmt.Errorf("corrupted on transfer: %v hashes differ src(%s) %q vs dst(%s) %q", c.hashType, c.src.Fs(), srcSum, newDst.Fs(), dstSum) + } + } + return nil +} + +// copy src object to dst or f if nil. If dst is nil then it uses +// remote as the name of the new object. +// +// It returns the destination object if possible. Note that this may +// be nil. +func (c *copy) copy(ctx context.Context) (newDst fs.Object, err error) { + var actionTaken string + retry := true + for tries := 0; retry && tries < c.maxTries; tries++ { + // Check we haven't hit any accounting limits + err = c.checkLimits(ctx) + if err != nil { + return nil, err + } + + // Try server side copy + actionTaken, newDst, err = c.serverSideCopy(ctx) + + // If can't server-side copy, do it manually + if errors.Is(err, fs.ErrorCantCopy) { + actionTaken, newDst, err = c.manualCopy(ctx) + } + + // End if ctx is in error + if fserrors.ContextError(ctx, &err) { + break + } + + // Retry if err returned a retry error + retry = false + if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { + retry = true + } else if t, ok := pacer.IsRetryAfter(err); ok { + fs.Debugf(c.src, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err) + time.Sleep(t) + retry = true + } + if retry { + fs.Debugf(c.src, "Received error: %v - low level retry %d/%d", err, tries, c.maxTries) + c.tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry + continue + } + } + if err != nil { + err = fs.CountError(ctx, err) + fs.Errorf(c.src, "Failed to copy: %v", err) + if !c.inplace { + c.removeFailedPartialCopy(ctx, c.f, c.remoteForCopy) + } + return newDst, err + } + + // Verify the copy + err = c.verify(ctx, newDst) + if err != nil { + fs.Errorf(newDst, "%v", err) + err = fs.CountError(ctx, err) + c.removeFailedCopy(ctx, newDst) + return nil, err + } + + // Move the copied file to its real destination. + if !c.inplace && c.remoteForCopy != c.remote { + movedNewDst, err := c.dstFeatures.Move(ctx, newDst, c.remote) + if err != nil { + fs.Errorf(newDst, "partial file rename failed: %v", err) + err = fs.CountError(ctx, err) + c.removeFailedCopy(ctx, newDst) + return nil, err + } + fs.Debugf(newDst, "renamed to: %s", c.remote) + newDst = movedNewDst + } + + // Log what we have done + if newDst != nil && c.src.String() != newDst.String() { + actionTaken = fmt.Sprintf("%s to: %s", actionTaken, newDst.String()) + } + fs.Infof(c.src, "%s%s", actionTaken, fs.LogValueHide("size", fs.SizeSuffix(c.src.Size()))) + + return newDst, nil +} + +// Copy src object to dst or f if nil. If dst is nil then it uses +// remote as the name of the new object. +// +// It returns the destination object if possible. Note that this may +// be nil. +func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { + ci := fs.GetConfig(ctx) + tr := accounting.Stats(ctx).NewTransfer(src, f) + defer func() { + tr.Done(ctx, err) + }() + if SkipDestructive(ctx, src, "copy") { + in := tr.Account(ctx, nil) + in.DryRun(src.Size()) + return newDst, nil + } + c := ©{ + f: f, + dstFeatures: f.Features(), + dst: dst, + remote: transform.Path(ctx, remote, false), + src: src, + ci: ci, + tr: tr, + maxTries: ci.LowLevelRetries, + doUpdate: dst != nil, + } + c.hashType, c.hashOption = CommonHash(ctx, f, src.Fs()) + if c.dst != nil { + c.remote = transform.Path(ctx, c.dst.Remote(), false) + } + // Are we using partials? + // + // If so set the flag and update the name we use for the copy + c.remoteForCopy, c.inplace, err = c.checkPartial(ctx) + if err != nil { + return nil, err + } + // Do the copy now everything is set up + return c.copy(ctx) +} + +// CopyFile moves a single file possibly to a new name +func CopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName string, srcFileName string) (err error) { + return moveOrCopyFile(ctx, fdst, fsrc, dstFileName, srcFileName, true, false) +} |
