aboutsummaryrefslogtreecommitdiff
path: root/fs/operations/check.go
diff options
context:
space:
mode:
Diffstat (limited to 'fs/operations/check.go')
-rw-r--r--fs/operations/check.go626
1 files changed, 626 insertions, 0 deletions
diff --git a/fs/operations/check.go b/fs/operations/check.go
new file mode 100644
index 0000000..d4d1eb3
--- /dev/null
+++ b/fs/operations/check.go
@@ -0,0 +1,626 @@
+package operations
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "github.com/rclone/rclone/fs"
+ "github.com/rclone/rclone/fs/accounting"
+ "github.com/rclone/rclone/fs/filter"
+ "github.com/rclone/rclone/fs/fserrors"
+ "github.com/rclone/rclone/fs/hash"
+ "github.com/rclone/rclone/fs/march"
+ "github.com/rclone/rclone/lib/readers"
+ "golang.org/x/text/unicode/norm"
+)
+
+// checkFn is the type of the checking function used in CheckFn()
+//
+// It should check the two objects (a, b) and return if they differ
+// and whether the hash was used.
+//
+// If there are differences then this should Errorf the difference and
+// the reason but return with err = nil. It should not CountError in
+// this case.
+type checkFn func(ctx context.Context, a, b fs.Object) (differ bool, noHash bool, err error)
+
+// CheckOpt contains options for the Check functions
+type CheckOpt struct {
+ Fdst, Fsrc fs.Fs // fses to check
+ Check checkFn // function to use for checking
+ OneWay bool // one way only?
+ Combined io.Writer // a file with file names with leading sigils
+ MissingOnSrc io.Writer // files only in the destination
+ MissingOnDst io.Writer // files only in the source
+ Match io.Writer // matching files
+ Differ io.Writer // differing files
+ Error io.Writer // files with errors of some kind
+}
+
+// checkMarch is used to march over two Fses in the same way as
+// sync/copy
+type checkMarch struct {
+ ctx context.Context
+ ioMu sync.Mutex
+ wg sync.WaitGroup
+ tokens chan struct{}
+ differences atomic.Int32
+ noHashes atomic.Int32
+ srcFilesMissing atomic.Int32
+ dstFilesMissing atomic.Int32
+ matches atomic.Int32
+ opt CheckOpt
+}
+
+// report outputs the fileName to out if required and to the combined log
+func (c *checkMarch) report(o fs.DirEntry, out io.Writer, sigil rune) {
+ c.reportFilename(o.String(), out, sigil)
+}
+
+func (c *checkMarch) reportFilename(filename string, out io.Writer, sigil rune) {
+ if out != nil {
+ SyncFprintf(out, "%s\n", filename)
+ }
+ if c.opt.Combined != nil {
+ SyncFprintf(c.opt.Combined, "%c %s\n", sigil, filename)
+ }
+}
+
+// DstOnly have an object which is in the destination only
+func (c *checkMarch) DstOnly(dst fs.DirEntry) (recurse bool) {
+ switch dst.(type) {
+ case fs.Object:
+ if c.opt.OneWay {
+ return false
+ }
+ err := fmt.Errorf("file not in %v", c.opt.Fsrc)
+ fs.Errorf(dst, "%v", err)
+ _ = fs.CountError(c.ctx, err)
+ c.differences.Add(1)
+ c.srcFilesMissing.Add(1)
+ c.report(dst, c.opt.MissingOnSrc, '-')
+ case fs.Directory:
+ // Do the same thing to the entire contents of the directory
+ if c.opt.OneWay {
+ return false
+ }
+ return true
+ default:
+ panic("Bad object in DirEntries")
+ }
+ return false
+}
+
+// SrcOnly have an object which is in the source only
+func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) {
+ switch src.(type) {
+ case fs.Object:
+ err := fmt.Errorf("file not in %v", c.opt.Fdst)
+ fs.Errorf(src, "%v", err)
+ _ = fs.CountError(c.ctx, err)
+ c.differences.Add(1)
+ c.dstFilesMissing.Add(1)
+ c.report(src, c.opt.MissingOnDst, '+')
+ case fs.Directory:
+ // Do the same thing to the entire contents of the directory
+ return true
+ default:
+ panic("Bad object in DirEntries")
+ }
+ return false
+}
+
+// check to see if two objects are identical using the check function
+func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) {
+ ci := fs.GetConfig(ctx)
+ tr := accounting.Stats(ctx).NewCheckingTransfer(src, "checking")
+ defer func() {
+ tr.Done(ctx, err)
+ }()
+ if sizeDiffers(ctx, src, dst) {
+ err = fmt.Errorf("sizes differ")
+ fs.Errorf(src, "%v", err)
+ return true, false, nil
+ }
+ if ci.SizeOnly {
+ return false, false, nil
+ }
+ return c.opt.Check(ctx, dst, src)
+}
+
+// Match is called when src and dst are present, so sync src to dst
+func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse bool) {
+ switch srcX := src.(type) {
+ case fs.Object:
+ dstX, ok := dst.(fs.Object)
+ if ok {
+ if SkipDestructive(ctx, src, "check") {
+ return false
+ }
+ c.wg.Add(1)
+ c.tokens <- struct{}{} // put a token to limit concurrency
+ go func() {
+ defer func() {
+ <-c.tokens // get the token back to free up a slot
+ c.wg.Done()
+ }()
+ differ, noHash, err := c.checkIdentical(ctx, dstX, srcX)
+ if err != nil {
+ fs.Errorf(src, "%v", err)
+ _ = fs.CountError(ctx, err)
+ c.report(src, c.opt.Error, '!')
+ } else if differ {
+ c.differences.Add(1)
+ err := errors.New("files differ")
+ // the checkFn has already logged the reason
+ _ = fs.CountError(ctx, err)
+ c.report(src, c.opt.Differ, '*')
+ } else {
+ c.matches.Add(1)
+ c.report(src, c.opt.Match, '=')
+ if noHash {
+ c.noHashes.Add(1)
+ fs.Debugf(dstX, "OK - could not check hash")
+ } else {
+ fs.Debugf(dstX, "OK")
+ }
+ }
+ }()
+ } else {
+ err := fmt.Errorf("is file on %v but directory on %v", c.opt.Fsrc, c.opt.Fdst)
+ fs.Errorf(src, "%v", err)
+ _ = fs.CountError(ctx, err)
+ c.differences.Add(1)
+ c.dstFilesMissing.Add(1)
+ c.report(src, c.opt.MissingOnDst, '+')
+ }
+ case fs.Directory:
+ // Do the same thing to the entire contents of the directory
+ _, ok := dst.(fs.Directory)
+ if ok {
+ return true
+ }
+ err := fmt.Errorf("is file on %v but directory on %v", c.opt.Fdst, c.opt.Fsrc)
+ fs.Errorf(dst, "%v", err)
+ _ = fs.CountError(ctx, err)
+ c.differences.Add(1)
+ c.srcFilesMissing.Add(1)
+ c.report(dst, c.opt.MissingOnSrc, '-')
+
+ default:
+ panic("Bad object in DirEntries")
+ }
+ return false
+}
+
+// CheckFn checks the files in fsrc and fdst according to Size and
+// hash using checkFunction on each file to check the hashes.
+//
+// checkFunction sees if dst and src are identical
+//
+// it returns true if differences were found
+// it also returns whether it couldn't be hashed
+func CheckFn(ctx context.Context, opt *CheckOpt) error {
+ ci := fs.GetConfig(ctx)
+ if opt.Check == nil {
+ return errors.New("internal error: nil check function")
+ }
+ c := &checkMarch{
+ ctx: ctx,
+ tokens: make(chan struct{}, ci.Checkers),
+ opt: *opt,
+ }
+
+ // set up a march over fdst and fsrc
+ m := &march.March{
+ Ctx: ctx,
+ Fdst: c.opt.Fdst,
+ Fsrc: c.opt.Fsrc,
+ Dir: "",
+ Callback: c,
+ NoTraverse: ci.NoTraverse,
+ NoUnicodeNormalization: ci.NoUnicodeNormalization,
+ }
+ fs.Debugf(c.opt.Fdst, "Waiting for checks to finish")
+ err := m.Run(ctx)
+ c.wg.Wait() // wait for background go-routines
+
+ return c.reportResults(ctx, err)
+}
+
+func (c *checkMarch) reportResults(ctx context.Context, err error) error {
+ if c.dstFilesMissing.Load() > 0 {
+ fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing.Load())
+ }
+ if c.srcFilesMissing.Load() > 0 {
+ entity := "files"
+ if c.opt.Fsrc == nil {
+ entity = "hashes"
+ }
+ fs.Logf(c.opt.Fsrc, "%d %s missing", c.srcFilesMissing.Load(), entity)
+ }
+
+ fs.Logf(c.opt.Fdst, "%d differences found", c.differences.Load())
+ if errs := accounting.Stats(ctx).GetErrors(); errs > 0 {
+ fs.Logf(c.opt.Fdst, "%d errors while checking", errs)
+ }
+ if c.noHashes.Load() > 0 {
+ fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes.Load())
+ }
+ if c.matches.Load() > 0 {
+ fs.Logf(c.opt.Fdst, "%d matching files", c.matches.Load())
+ }
+ if err != nil {
+ return err
+ }
+ if c.differences.Load() > 0 {
+ // Return an already counted error so we don't double count this error too
+ err = fserrors.FsError(fmt.Errorf("%d differences found", c.differences.Load()))
+ fserrors.Count(err)
+ return err
+ }
+ return nil
+}
+
+// Check the files in fsrc and fdst according to Size and hash
+func Check(ctx context.Context, opt *CheckOpt) error {
+ optCopy := *opt
+ optCopy.Check = func(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) {
+ same, ht, err := CheckHashes(ctx, src, dst)
+ if err != nil {
+ return true, false, err
+ }
+ if ht == hash.None {
+ return false, true, nil
+ }
+ if !same {
+ err = fmt.Errorf("%v differ", ht)
+ fs.Errorf(src, "%v", err)
+ return true, false, nil
+ }
+ return false, false, nil
+ }
+
+ return CheckFn(ctx, &optCopy)
+}
+
+// CheckEqualReaders checks to see if in1 and in2 have the same
+// content when read.
+//
+// it returns true if no differences were found
+func CheckEqualReaders(in1, in2 io.Reader) (equal bool, err error) {
+ const bufSize = 64 * 1024
+ buf1 := make([]byte, bufSize)
+ buf2 := make([]byte, bufSize)
+ for {
+ n1, err1 := readers.ReadFill(in1, buf1)
+ n2, err2 := readers.ReadFill(in2, buf2)
+ // check errors
+ if err1 != nil && err1 != io.EOF {
+ return false, err1
+ } else if err2 != nil && err2 != io.EOF {
+ return false, err2
+ }
+ // err1 && err2 are nil or io.EOF here
+ // process the data
+ if n1 != n2 || !bytes.Equal(buf1[:n1], buf2[:n2]) {
+ return false, nil
+ }
+ // if both streams finished the we have finished
+ if err1 == io.EOF && err2 == io.EOF {
+ break
+ }
+ }
+ return true, nil
+}
+
+// CheckIdenticalDownload checks to see if dst and src are identical
+// by reading all their bytes if necessary.
+//
+// it returns true if no differences were found
+func CheckIdenticalDownload(ctx context.Context, src, dst fs.Object) (equal bool, err error) {
+ ci := fs.GetConfig(ctx)
+ err = Retry(ctx, src, ci.LowLevelRetries, func() error {
+ equal, err = checkIdenticalDownload(ctx, src, dst)
+ return err
+ })
+ return equal, err
+}
+
+// Does the work for CheckIdenticalDownload
+func checkIdenticalDownload(ctx context.Context, src, dst fs.Object) (equal bool, err error) {
+ var in1, in2 io.ReadCloser
+ in1, err = Open(ctx, dst)
+ if err != nil {
+ return false, fmt.Errorf("failed to open %q: %w", dst, err)
+ }
+ tr1 := accounting.Stats(ctx).NewTransfer(dst, nil)
+ defer func() {
+ tr1.Done(ctx, nil) // error handling is done by the caller
+ }()
+ in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer
+
+ in2, err = Open(ctx, src)
+ if err != nil {
+ return false, fmt.Errorf("failed to open %q: %w", src, err)
+ }
+ tr2 := accounting.Stats(ctx).NewTransfer(dst, nil)
+ defer func() {
+ tr2.Done(ctx, nil) // error handling is done by the caller
+ }()
+ in2 = tr2.Account(ctx, in2).WithBuffer() // account and buffer the transfer
+
+ // To assign err variable before defer.
+ equal, err = CheckEqualReaders(in1, in2)
+ return
+}
+
+// CheckDownload checks the files in fsrc and fdst according to Size
+// and the actual contents of the files.
+func CheckDownload(ctx context.Context, opt *CheckOpt) error {
+ optCopy := *opt
+ optCopy.Check = func(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) {
+ same, err := CheckIdenticalDownload(ctx, src, dst)
+ if err != nil {
+ return true, true, fmt.Errorf("failed to download: %w", err)
+ }
+ if !same {
+ err = errors.New("contents differ")
+ fs.Errorf(src, "%v", err)
+ return true, false, nil
+ }
+ return false, false, nil
+ }
+ return CheckFn(ctx, &optCopy)
+}
+
+// ApplyTransforms handles --no-unicode-normalization and --ignore-case-sync for CheckSum
+// so that it matches behavior of Check (where it's handled by March)
+func ApplyTransforms(ctx context.Context, s string) string {
+ ci := fs.GetConfig(ctx)
+ return ToNormal(s, !ci.NoUnicodeNormalization, ci.IgnoreCaseSync)
+}
+
+// ToNormal normalizes case and unicode form and returns the transformed string.
+// It is similar to ApplyTransforms but does not use a context.
+// If normUnicode == true, s will be transformed to NFC.
+// If normCase == true, s will be transformed to lowercase.
+// If both are true, both transformations will be performed.
+func ToNormal(s string, normUnicode, normCase bool) string {
+ if normUnicode {
+ s = norm.NFC.String(s)
+ }
+ if normCase {
+ s = strings.ToLower(s)
+ }
+ return s
+}
+
+// CheckSum checks filesystem hashes against a SUM file
+func CheckSum(ctx context.Context, fsrc, fsum fs.Fs, sumFile string, hashType hash.Type, opt *CheckOpt, download bool) error {
+ var options CheckOpt
+ if opt != nil {
+ options = *opt
+ } else {
+ // default options for hashsum -c
+ options.Combined = os.Stdout
+ }
+ // CheckSum treats Fsrc and Fdst specially:
+ options.Fsrc = nil // no file system here, corresponds to the sum list
+ options.Fdst = fsrc // denotes the file system to check
+ opt = &options // override supplied argument
+
+ if !download && (hashType == hash.None || !opt.Fdst.Hashes().Contains(hashType)) {
+ return fmt.Errorf("%s: hash type is not supported by file system: %s", hashType, opt.Fdst)
+ }
+
+ if sumFile == "" {
+ return fmt.Errorf("not a sum file: %s", fsum)
+ }
+ sumObj, err := fsum.NewObject(ctx, sumFile)
+ if err != nil {
+ return fmt.Errorf("cannot open sum file: %w", err)
+ }
+ hashes, err := ParseSumFile(ctx, sumObj)
+ if err != nil {
+ return fmt.Errorf("failed to parse sum file: %w", err)
+ }
+
+ ci := fs.GetConfig(ctx)
+ c := &checkMarch{
+ ctx: ctx,
+ tokens: make(chan struct{}, ci.Checkers),
+ opt: *opt,
+ }
+ lastErr := ListFn(ctx, opt.Fdst, func(obj fs.Object) {
+ c.checkSum(ctx, obj, download, hashes, hashType)
+ })
+ c.wg.Wait() // wait for background go-routines
+
+ // make census of unhandled sums
+ fi := filter.GetConfig(ctx)
+ for filename, hash := range hashes {
+ if hash == "" { // the sum has been successfully consumed
+ continue
+ }
+ if !fi.IncludeRemote(filename) { // the file was filtered out
+ continue
+ }
+ // filesystem missed the file, sum wasn't consumed
+ err := fmt.Errorf("file not in %v", opt.Fdst)
+ fs.Errorf(filename, "%v", err)
+ _ = fs.CountError(ctx, err)
+ if lastErr == nil {
+ lastErr = err
+ }
+ c.dstFilesMissing.Add(1)
+ c.reportFilename(filename, opt.MissingOnDst, '+')
+ }
+
+ return c.reportResults(ctx, lastErr)
+}
+
+// checkSum checks single object against golden hashes
+func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool, hashes HashSums, hashType hash.Type) {
+ normalizedRemote := ApplyTransforms(ctx, obj.Remote())
+ c.ioMu.Lock()
+ sumHash, sumFound := hashes[normalizedRemote]
+ hashes[normalizedRemote] = "" // mark sum as consumed
+ c.ioMu.Unlock()
+
+ if !sumFound && c.opt.OneWay {
+ return
+ }
+
+ var err error
+ tr := accounting.Stats(ctx).NewCheckingTransfer(obj, "hashing")
+ defer tr.Done(ctx, err)
+
+ if !sumFound {
+ err = errors.New("sum not found")
+ _ = fs.CountError(ctx, err)
+ fs.Errorf(obj, "%v", err)
+ c.differences.Add(1)
+ c.srcFilesMissing.Add(1)
+ c.report(obj, c.opt.MissingOnSrc, '-')
+ return
+ }
+
+ if !download {
+ var objHash string
+ objHash, err = obj.Hash(ctx, hashType)
+ c.matchSum(ctx, sumHash, objHash, obj, err, hashType)
+ return
+ }
+
+ c.wg.Add(1)
+ c.tokens <- struct{}{} // put a token to limit concurrency
+ go func() {
+ var (
+ objHash string
+ err error
+ in io.ReadCloser
+ )
+ defer func() {
+ c.matchSum(ctx, sumHash, objHash, obj, err, hashType)
+ <-c.tokens // get the token back to free up a slot
+ c.wg.Done()
+ }()
+ if in, err = Open(ctx, obj); err != nil {
+ return
+ }
+ tr := accounting.Stats(ctx).NewTransfer(obj, nil)
+ in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer
+ defer func() {
+ tr.Done(ctx, nil) // will close the stream
+ }()
+ hashVals, err2 := hash.StreamTypes(in, hash.NewHashSet(hashType))
+ if err2 != nil {
+ err = err2 // pass to matchSum
+ return
+ }
+ objHash = hashVals[hashType]
+ }()
+}
+
+// matchSum sums up the results of hashsum matching for an object
+func (c *checkMarch) matchSum(ctx context.Context, sumHash, objHash string, obj fs.Object, err error, hashType hash.Type) {
+ switch {
+ case err != nil:
+ _ = fs.CountError(ctx, err)
+ fs.Errorf(obj, "Failed to calculate hash: %v", err)
+ c.report(obj, c.opt.Error, '!')
+ case sumHash == "":
+ err = errors.New("duplicate file")
+ _ = fs.CountError(ctx, err)
+ fs.Errorf(obj, "%v", err)
+ c.report(obj, c.opt.Error, '!')
+ case objHash == "":
+ fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash)
+ fs.Debugf(obj, "%v - could not check hash (%v)", hashType, c.opt.Fdst)
+ c.noHashes.Add(1)
+ c.matches.Add(1)
+ c.report(obj, c.opt.Match, '=')
+ case objHash == sumHash:
+ fs.Debugf(obj, "%v = %s OK", hashType, sumHash)
+ c.matches.Add(1)
+ c.report(obj, c.opt.Match, '=')
+ default:
+ err = errors.New("files differ")
+ _ = fs.CountError(ctx, err)
+ fs.Debugf(nil, "%v = %s (sum)", hashType, sumHash)
+ fs.Debugf(obj, "%v = %s (%v)", hashType, objHash, c.opt.Fdst)
+ fs.Errorf(obj, "%v", err)
+ c.differences.Add(1)
+ c.report(obj, c.opt.Differ, '*')
+ }
+}
+
+// HashSums represents a parsed SUM file
+type HashSums map[string]string
+
+// ParseSumFile parses a hash SUM file and returns hashes as a map
+func ParseSumFile(ctx context.Context, sumFile fs.Object) (HashSums, error) {
+ rd, err := Open(ctx, sumFile)
+ if err != nil {
+ return nil, err
+ }
+ parser := bufio.NewReader(rd)
+
+ const maxWarn = 3
+ numWarn := 0
+
+ re := regexp.MustCompile(`^([^ ]+) [ *](.+)$`)
+ hashes := HashSums{}
+ for lineNo := 0; true; lineNo++ {
+ lineBytes, _, err := parser.ReadLine()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+ line := string(lineBytes)
+ if line == "" {
+ continue
+ }
+
+ fields := re.FindStringSubmatch(ApplyTransforms(ctx, line))
+ if fields == nil {
+ numWarn++
+ if numWarn <= maxWarn {
+ fs.Logf(sumFile, "improperly formatted checksum line %d", lineNo)
+ }
+ continue
+ }
+
+ sum, file := fields[1], fields[2]
+ if hashes[file] != "" {
+ numWarn++
+ if numWarn <= maxWarn {
+ fs.Logf(sumFile, "duplicate file on checksum line %d", lineNo)
+ }
+ continue
+ }
+
+ // We've standardised on lower case checksums in rclone internals.
+ hashes[file] = strings.ToLower(sum)
+ }
+
+ if numWarn > maxWarn {
+ fs.Logf(sumFile, "%d warning(s) suppressed...", numWarn-maxWarn)
+ }
+ if err = rd.Close(); err != nil {
+ return nil, err
+ }
+ return hashes, nil
+}