#!/usr/bin/python3

# TODO: option to print out full path name; most useful in the .json output format

VERSION = "0.4"

import sys, argparse

# global mutables
answer_json = []        # collects output entries when --json is requested
verbose = debug = 0     # set from the command line in main()
db = None
xattr_comment = "user.xdg.comment"
xattr_author  = "user.xdg.comment.author"
xattr_date    = "user.xdg.comment.date"
mode          = "db"    # "db" or "xattr"; selects which comment store has priority

#======= debugging/verbose ===========

def print_d(*a):
    """Print the arguments prefixed with '>>', but only when the global debug flag is set."""
    if debug:
        print('>>', *a)

def errorBox(*a):
    """Report a problem to the user; in this command-line program it is a plain print."""
    print(*a)

# >>> snip here <<<

#============ the DnDataBase, UiHelper and FileObj code is shared with other dirnotes programs

import getpass, time, stat, shutil, sqlite3, json, os, math

DEFAULT_CONFIG_FILE = "~/.config/dirnotes/dirnotes.conf" # or /etc/dirnotes.conf
# config
#    we could store the config in the database, in a second table
#    or in a .json file
DEFAULT_CONFIG = {
    "xattr_tag": "user.xdg.comment",
    "database": "~/.local/share/dirnotes/dirnotes.db",
    "start_mode": "xattr",
    "options for database": ("~/.local/share/dirnotes/dirnotes.db", "~/.dirnotes.db", "/etc/dirnotes.db"),
    "options for start_mode": ("db", "xattr"),
}


class ConfigLoader:    # singleton
    """Load the dirnotes settings from a JSON config file.

    After construction the object exposes:
      dbName         the database path, with ~ expanded
      mode           the start mode ("db" or "xattr"); the command line may over-rule it
      xattr_comment  the xattr tag name taken from the config

    If the file is missing, the defaults are used and an attempt is made to
    write them to that path. If the file is not valid JSON the defaults are used.
    Keys missing from the file fall back to DEFAULT_CONFIG.
    """

    def __init__(self, configFile):
        configFile = os.path.expanduser(configFile)
        try:
            with open(configFile, "r") as f:
                config = json.load(f)
        except json.JSONDecodeError:
            errorBox(f"problem reading config file {configFile}; check the JSON syntax")
            config = DEFAULT_CONFIG
        except FileNotFoundError:
            errorBox(f"config file {configFile} not found; using the default settings")
            config = DEFAULT_CONFIG
            try:
                configDir = os.path.dirname(configFile)
                if configDir:       # os.makedirs('') raises, so skip it for a bare file name
                    os.makedirs(configDir, exist_ok=True)
                with open(configFile, "w") as f:
                    json.dump(config, f, indent=4)
            except OSError:
                errorBox(f"problem creating the config file {configFile}")
        if not isinstance(config, dict):
            # valid JSON, but not an object (e.g. a list); treat it like a syntax problem
            errorBox(f"problem reading config file {configFile}; check the JSON syntax")
            config = DEFAULT_CONFIG
        # a partial config file is acceptable: missing keys fall back to the defaults
        config = {**DEFAULT_CONFIG, **config}
        self.dbName = os.path.expanduser(config["database"])
        self.mode = config["start_mode"]    # can get over-ruled by the command line options
        self.xattr_comment = config["xattr_tag"]


class DnDataBase:
    ''' the database is flat
        fileName: fully qualified name
        st_mtime: a float
        size: a long
        comment: a string
        comment_time: a float, the time of the comment save
        author: the username that created the comment

        this object: 1) finds or creates the database
          2) determine if it's readonly

        attributes: db (the sqlite3 connection), writable (bool)

        TODO: the database is usually associated with a user, in $XDG_DATA_HOME (~/.local/share/)
        TODO: if the database is not found, create it in $XDG_DATA_DIRS (/usr/local/share/)
          make it 0666 permissions (rw-rw-rw-)
    '''

    def __init__(self, dbFile):
        '''try to open the database; if not found, create it

        Re-raises sqlite3.OperationalError / PermissionError when the database
        can neither be opened nor created.
        '''
        try:
            self.db = sqlite3.connect(dbFile)
        except sqlite3.OperationalError:
            print_d(f"Database {dbFile} not found")
            try:
                os.makedirs(os.path.dirname(dbFile) or ".", exist_ok=True)
                self.db = sqlite3.connect(dbFile)
            except (sqlite3.OperationalError, PermissionError):
                print_d(f"Failed to create {dbFile}, aborting")
                raise
        # create new table if it doesn't exist
        try:
            self.db.execute("select * from dirnotes")
        except sqlite3.OperationalError:
            self.db.execute("create table dirnotes (name TEXT, date DATETIME, size INTEGER, comment TEXT, comment_date DATETIME, author TEXT)")
            self.db.execute("create index dirnotes_i on dirnotes(name)")
            print_d(f"Table dirnotes created")
            # at this point, if a shared database is required, somebody needs to set perms to 0o666
        # probe for write access by writing the header field user_version back
        # with its current value, so the stored value is never clobbered
        self.writable = True
        try:
            version = self.db.execute("pragma user_version").fetchone()[0]
            self.db.execute(f"pragma user_version={int(version)}")
        except sqlite3.OperationalError:
            self.writable = False


DATE_FORMAT   = "%Y-%m-%d %H:%M:%S"
YEAR          = 365 * 24 * 60 * 60    # seconds; dates older than this are shown with the year


class UiHelper:
    """Formatting helpers for dates and sizes."""

    @staticmethod
    def epochToDb(epoch):
        """Convert epoch seconds to a local-time string in DATE_FORMAT."""
        return time.strftime(DATE_FORMAT, time.localtime(epoch))

    @staticmethod
    def DbToEpoch(dbTime):
        """Convert a DATE_FORMAT string (local time) to epoch seconds."""
        return time.mktime(time.strptime(dbTime, DATE_FORMAT))

    @staticmethod
    def getShortDate(longDate):
        """Format epoch seconds compactly: with the year if older than YEAR, else with hh:mm."""
        now = time.time()
        diff = now - longDate
        if diff > YEAR:
            fmt = "%b %e  %Y"
        else:
            fmt = "%b %e %H:%M"
        return time.strftime(fmt, time.localtime(longDate))

    @staticmethod
    def getShortSize(fo):
        """Return a 7-character size column for a FileObj-like object.

        Directories and symlinks get a fixed tag; other files get up to five
        digits, scaled by powers of 1000 with a K/M/G/T/P/E suffix.
        """
        if fo.isDir():
            return " <DIR> "
        elif fo.isLink():
            return " <LINK>"
        size = fo.getSize()
        suffixes = " KMGTPE"
        # the -2 lets five digits print before stepping to the next suffix
        log = int((math.log10(size + 1) - 2) / 3)
        log = min(log, len(suffixes) - 1)
        s = suffixes[log]
        base = int(size / math.pow(10, log * 3))
        return f"{base}{s}".strip().rjust(7)


## one for each file
## and a special one for ".." parent directory
class FileObj:
    """  The FileObj knows about both kinds of comments.

    One comment lives in the sqlite table 'dirnotes' (keyed by absolute path),
    the other in the file's extended attributes. 'db' is the sqlite3 connection.
    """

    def __init__(self, fileName, db):
        """ raises FileNotFoundError (from os.lstat) if the file does not exist """
        self.fileName = os.path.abspath(fileName)     # full path; dirs end WITHOUT a terminal /
        self.stat = os.lstat(self.fileName)
        self.displayName = os.path.split(fileName)[1] # base name; dirs end with a /
        if self.isDir():
            if not self.displayName.endswith('/'):
                self.displayName += '/'
        self.date = self.stat.st_mtime
        self.size = self.stat.st_size
        self.db = db

    def getName(self):
        """ returns the absolute pathname """
        return self.fileName

    def getDisplayName(self):
        """ returns just the basename of the file; dirs end in / """
        return self.displayName

    def getDbData(self):
        """ returns (comment, author, comment_date) for the newest database record;
            all three are None when there is no record; the result is cached """
        if not hasattr(self, 'dbCommentAuthorDate'):
            cad = self.db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(self.fileName,)).fetchone()
            self.dbCommentAuthorDate = cad if cad else (None, None, None)
        return self.dbCommentAuthorDate

    def getDbComment(self):
        """ returns the database comment, or None """
        return self.getDbData()[0]

    def getXattrData(self):
        """ returns (comment, author, comment_date) from the xattrs; an attribute
            that cannot be read stays None; the result is cached """
        if not hasattr(self, 'xattrCommentAuthorDate'):
            c = a = d = None
            try:
                c = os.getxattr(self.fileName, xattr_comment, follow_symlinks=False).decode()
                a = os.getxattr(self.fileName, xattr_author, follow_symlinks=False).decode()
                d = os.getxattr(self.fileName, xattr_date, follow_symlinks=False).decode()
            except (OSError, UnicodeDecodeError):  # no (readable) xattr comment
                pass
            self.xattrCommentAuthorDate = c, a, d
        return self.xattrCommentAuthorDate

    def getXattrComment(self):
        """ returns the xattr comment, or None """
        return self.getXattrData()[0]

    def setDbComment(self, newComment):
        """ insert a new comment record (history is kept) and refresh the cache """
        # how are we going to hook this?
        #if not self.db.writable:
        #  errorBox("The database is readonly; you cannot add or edit comments")
        #  return
        s = os.lstat(self.fileName)
        try:
            print_d(f"setDbComment db {self.db}, file: {self.fileName}")
            self.db.execute("insert into dirnotes (name,date,size,comment,comment_date,author) values (?,datetime(?,'unixepoch','localtime'),?,?,datetime(?,'unixepoch','localtime'),?)",
                (self.fileName, s.st_mtime, s.st_size,
                str(newComment), time.time(), getpass.getuser()))
            self.db.commit()
            self.dbCommentAuthorDate = newComment, getpass.getuser(), UiHelper.epochToDb(time.time())
        except sqlite3.OperationalError:
            print_d("database is locked or unwritable")
            errorBox("the database that stores comments is locked or unwritable")

    def setXattrComment(self, newComment):
        """ write comment, author and date xattrs; returns True on success,
            False (after reporting a likely cause, when one is recognised) on failure """
        print_d(f"set comment {newComment} on file {self.fileName}")
        try:
            os.setxattr(self.fileName,xattr_comment,bytes(newComment,'utf8'),follow_symlinks=False)
            os.setxattr(self.fileName,xattr_author,bytes(getpass.getuser(),'utf8'),follow_symlinks=False)
            os.setxattr(self.fileName,xattr_date,bytes(time.strftime(DATE_FORMAT),'utf8'),follow_symlinks=False)
            self.xattrCommentAuthorDate = newComment, getpass.getuser(), time.strftime(DATE_FORMAT)
            return True
        # we need to move these cases out to a handler
        except Exception as e:
            if self.isLink():
                errorBox("Linux does not allow xattr comments on symlinks; comment is stored in database")
            elif self.isSock():
                errorBox("Linux does not allow comments on sockets; comment is stored in database")
            elif not os.access(self.fileName, os.W_OK):
                errorBox(f"you don't appear to have write permissions on this file: {self.fileName}")
                # change the listbox background to yellow
            elif "Errno 95" in str(e):
                errorBox("is this a VFAT or EXFAT volume? these don't allow comments")
            return False

    def getComment(self, mode):
        """ returns the comment for the given mode """
        return self.getDbComment() if mode == "db" else self.getXattrComment()

    def getOtherComment(self, mode):
        """ returns the comment for the 'other' mode """
        return self.getDbComment() if mode == "xattr" else self.getXattrComment()

    def getData(self, mode):
        """ returns (comment, author, comment_date) for the given mode """
        return self.getDbData() if mode == "db" else self.getXattrData()

    def getOtherData(self, mode):
        """ returns (comment, author, comment_date) for the 'other' mode """
        return self.getDbData() if mode == "xattr" else self.getXattrData()

    def getDate(self):
        """ returns st_mtime captured at construction """
        return self.date

    def getSize(self):
        """ returns st_size captured at construction """
        return self.size

    def isDir(self):
        return stat.S_ISDIR(self.stat.st_mode)

    def isLink(self):
        return stat.S_ISLNK(self.stat.st_mode)

    def isSock(self):
        return stat.S_ISSOCK(self.stat.st_mode)

    def copyFile(self, dest, doMove = False):
        """ dest is either a FQ filename or a FQ directory, to be expanded with same basename """
        # NOTE: this method copies the xattr (comment + old author + old date)
        #       but creates new db (comment + this author + new date)
        if os.path.isdir(dest):
            # fileName is absolute with no trailing '/', so basename is the plain name
            dest = os.path.join(dest, os.path.basename(self.fileName))
        try:
            print_d("try copy from",self.fileName,"to",dest)
            # shutil methods preserve dates & chmod/chown & xattr
            if doMove:
                shutil.move(self.fileName, dest)
            else:
                shutil.copy2(self.fileName, dest)
        except OSError:
            # covers FileNotFoundError, PermissionError, IsADirectoryError and
            # shutil.Error / shutil.SameFileError (both are OSError subclasses)
            errorBox(f"file copy/move to <{dest}> failed; check permissions")
            return
        # and copy the database record, if there is one; without this guard a
        # file with no comment would get the literal text "None" as its comment
        comment = self.getDbComment()
        if comment is not None:
            f = FileObj(dest, self.db)
            f.setDbComment(comment)

    def moveFile(self, dest):
        """ dest is either a FQ filename or a FQ directory, to be expanded with same basename """
        self.copyFile(dest, doMove = True)

# >>> snip here <<<
#============= the functions that are called from the main.loop ===============

def file_copy(f,target,target_is_dir,is_copy,force):
    """Copy (is_copy true) or move one FileObj to target, asking before overwriting unless force."""
    print_d(f"call file_copy/move with args={target},{target_is_dir} and {force}")
    dest = target if not target_is_dir else os.path.join(target,f.getDisplayName())
    if os.path.exists(dest) and not force:
        go = input("The copy/move target <<" + dest + ">> exists. Overwrite? (y or n) ")
        if go != 'y' and go != 'Y':
            return
    print_d(f"copy/move from {f} to {dest}")
    if is_copy:
        f.copyFile(dest)
    else:
        f.moveFile(dest)

def file_zap(f,all_flag):
    """Delete all but the newest database comment: for f only, or for every file when all_flag."""
    db = f.db
    print_d(f"zapping the comment history of {f.getName()}")
    if all_flag:
        confirm = input("You requested a complete flush of the comment database history. Please hit 'Y' to confirm")
        if confirm == 'Y':
            print_d("zapping the entire database")
            db.execute("delete from dirnotes where comment_date < (select max(comment_date) from dirnotes d2 where d2.name = dirnotes.name)")
    else:
        db.execute("delete from dirnotes where name=? and comment_date < (select max(comment_date) from dirnotes where name=?)",(f.getName(),f.getName()))
    db.commit()

def file_modify_comment(f, create, append, erase):
    """Create, append to, or erase the comment on f, in both the xattr and the database.

    create and append are the comment text (or None); erase is a flag.
    Exits with status 10 if the file does not exist.
    """
    print_d(f"modify the comment on file {f} with extra={(create,append,erase)}")
    if not os.path.exists(f.getName()):
        print(f"the target file does not exist; please check the spelling of the file: {f}")
        sys.exit(10)
    if create:
        f.setXattrComment(create)
        f.setDbComment(create)
    elif append:
        c = f.getComment(mode)
        # with no existing comment there is nothing to append to; avoid "None; text"
        combined = f"{c}; {append}" if c else append
        f.setXattrComment(combined)
        f.setDbComment(combined)
    elif erase:
        f.setXattrComment('')
        f.setDbComment('')

def file_display(f, listall, json, minimal):
    """Print the comment on f, or queue it in answer_json when the json flag is set.

    Files without a comment are skipped unless listall. A '*' (or "diffFlag"
    in the JSON entry) marks a comment that differs from the other store's.
    """
    fn = f.getDisplayName()
    print_d(f"list file details {fn}")
    c,a,d = f.getData(mode)
    c1,a1,d1 = f.getOtherData(mode)
    diffFlag = '*' if c and (c != c1) else ''
    if c or listall:
        if not json:
            if minimal:
                print(f"{c}{diffFlag}")
            elif verbose:
                print(f"{f.getName()}: {repr(c)}{diffFlag}, {repr(a)}, {repr(d)}")
            else:
                print(f"{fn}: {repr(c)}{diffFlag}")
        else:
            entry = {"file":fn, "comment":c}
            if verbose:
                entry.update({"file":f.getName(),"author":a, "date": d})
            if diffFlag:
                entry["diffFlag"] = True
            answer_json.append(entry)

def file_history(f,json):
    """Print every database comment for f, newest first, or queue them in answer_json."""
    db = f.db
    c = db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(f.getName(),))
    if not json:
        print(f"file: \t\t{f.getName()}\n")
    else:
        answer_json.append ( {"file":f.getName()} )
    for a in c.fetchall():
        if not json:
            print(f"comment: \t{a[0]}\nauthor: \t{a[1]}\t\tdate: \t\t{a[2]}\n")
        else:
            answer_json.append( {"comment":a[0],"author":a[1],"date":a[2]} )

def main(args):
    """Parse the command line (args is sys.argv-style: args[0] is the program name) and run it."""
    parser = argparse.ArgumentParser(description="Display or add comments to files",
        epilog="Some options conflict. Use only one of: -l -c -a -H -e -z -Z and one of -d -x")
    parser.add_argument('-V',"--version", action="version",   version=f"dncli ver:{VERSION}")
    parser.add_argument('-v',"--verbose", action='count',     help="verbose output (include comment author & date)",default=0)
    parser.add_argument('-D',"--debug",   action='store_true',help="include debugging output; do not use in scripts",default=0)
    parser.add_argument('-j',"--json",    action="store_true",help="output in JSON format")
    pars_m = parser.add_mutually_exclusive_group()
    pars_m.add_argument('-l',"--listall", action="store_true",help="list all files, including those without comments")
    parser.add_argument('-d',"--db",      action="store_true",help="list comments from database")
    parser.add_argument('-x',"--xattr",   action="store_true",help="list comments from xattr")
    parser.add_argument('-n',"--minimal", action="store_true",help="output only comments; useful in scripting")
    parser.add_argument('-H',"--history", action="store_true",help="output the history of database comments for a file")
    pars_m.add_argument('-c',"--create",  metavar="comment",  help="add a comment to a file")
    pars_m.add_argument('-a',"--append",  metavar="comment",  help="append to a comment on a file, separator=';'")
    pars_m.add_argument('-C',"--copy",    action="store_true",help="copy a file with its comments")
    pars_m.add_argument('-M',"--move",    action="store_true",help="move a file with its comments")
    parser.add_argument('-y',"--cp_force",action="store_true",help="copy over existing files")
    pars_m.add_argument('-e',"--erase",   action="store_true",help="erase the comment on a file")
    pars_m.add_argument('-z',"--zap",     action="store_true",help="clear the database comment history on a file")
    pars_m.add_argument('-Z',"--zapall",  action="store_true",help="clear the comment history in the entire database")
    parser.add_argument(     "--config",  dest="config_file", help="use config file (default ~/.config/dirnotes/dirnotes.conf)")
    parser.add_argument('file_list',nargs='*',help="file(s); list commands may omit this")
    # honour the argument vector we were handed (minus the program name)
    args = parser.parse_args(args[1:])

    # default is to display all files that have comments
    # major modes are: display (<none> -l -H), add-comment (-a -c -e), clear-history(-z -Z), copy (-C), move (-M)
    # determine the major mode, then apply an appropriate function over the file_list

    # --move must count as a non-display mode; otherwise an empty file list
    # would be auto-filled from the current directory and then moved
    args.display = not (args.create or args.append or args.copy or args.move
                        or args.erase or args.zap or args.zapall)
    if args.cp_force and not args.copy:
        print("the -y/--cp_force options can only be used with the -C/--copy command")
        sys.exit(3)
    if args.json and not args.display:
        print("the -j/--json option can only be used with the display modes")
        sys.exit(4)
    if args.minimal and not args.display:
        print("the -n/--minimal option only applies to the display modes")
        sys.exit(5)
    if args.history and not args.display:
        print("the -H/--history option only applies to the display modes")
        sys.exit(5)
    if args.xattr and (args.zap or args.zapall):
        print("the -x/--xattr option doesn't apply to the -z/--zap and -Z/--zapall commands")
        sys.exit(7)

    global verbose, debug
    verbose = args.verbose
    debug = args.debug

    config = ConfigLoader(args.config_file or DEFAULT_CONFIG_FILE)
    global mode
    mode = config.mode
    mode = "xattr" if args.xattr else ("db" if args.db else mode)
    db = DnDataBase(config.dbName).db

    #====== 1) build the file list =============
    files = args.file_list
    # for the list commands, auto-fill the file list with the current directory
    if not files and args.display:
        files = os.listdir(".")
        files.sort()
    # other command require explicity file lists
    # (except -Z, which works on the whole database and replaces the list below)
    if not files and not args.zapall:
        print("please specify a file or files to use")
        sys.exit(10)
    print_d("got the files:", files)

    #======= 2) build the function
    if args.create or args.append or args.erase:
        print_d(f"create/append/erase: {args.create} . {args.append} . {args.erase}")
        func = file_modify_comment
        loop_args = (args.create, args.append, args.erase)
    elif args.zap or args.zapall:
        print_d(f"got a zap command {args.zap} . {args.zapall}")
        func = file_zap
        loop_args = (args.zapall,)
        if args.zapall:
            files = ('.',)      # run file_zap exactly once
    elif args.copy or args.move:
        print_d(f"got a copy/move command to copy={args.copy}, move={args.move}")
        # the last item on the file list is the target
        n_files = len(files)
        if n_files < 2:
            print("the copy/move command requires at least two arguments, the last one is the destination")
            sys.exit(1)
        files, target = files[:-1], files[-1]
        target_is_directory = os.path.isdir(target)
        print_d(f"copy/move from {files} to {target}")
        if n_files > 2 and not target_is_directory:
            print("multiple copy/move files must go to a target directory")
            sys.exit(3)
        func = file_copy
        loop_args = (target, target_is_directory, args.copy, args.cp_force)
    elif args.history:
        func = file_history
        loop_args = (args.json,)
    else:
        assert args.display
        print_d(f"list files using {mode} priority")
        print_d(f"display command with option: {args.listall} and {args.history} and {args.json} and {args.minimal}")
        loop_args = (args.listall, args.json, args.minimal)
        func = file_display

    #====== 3) loop on the list, execute the function =============
    for name in files:
        try:
            fileObj = FileObj(name, db)
        except FileNotFoundError:
            # FileObj() stats the file, so a bad name fails here, not inside func
            print(f"the target file does not exist; please check the spelling of the file: {name}")
            sys.exit(10)
        func(fileObj, *loop_args)

    if answer_json:
        print(json.dumps(answer_json))

if __name__ == "__main__":
    main(sys.argv)