filey/filey/preamble/filesystem4_2.py

562 lines
19 KiB
Python

"""
File management wrappers on os.path, shutil, and useful non-standard modules
"Apparent drive" refers to the term before the first os.sep in an object's given path.
if the given path is relative
then the ADrive may be ".." or the name of the File/Directory
else
the drive is the name/letter of the disk partition on which the content at the address is stored.
"Apparent Directory" similarly refers first term before the last os.sep in an object's given path
if the given path is relative
then the ADir may be ".." or the name of the File/Directory
else
the drive is the name/letter of the disk partition on which the content at the address is stored.
TODO
Add relative path support
Classes for specific mimes
"""
from pathlib import Path
from functools import reduce, cached_property
from itertools import chain
from typing import Iterable
import os, re, sys, shutil
import filetype as ft, audio_metadata as am
from send2trash import send2trash
from sl4ng import show, delevel, gather, nameSpacer, ffplay, commons, nopes
# from magnitudes import represent# as rp
gen = type(i for i in range(0))
func = type(ft.match)
forbiddens = r'\/:?*<>|"' # at least on windows
formats = {
'pics': "bmp png jpg jpeg tiff".split(),
'music': "mp3 m4a wav ogg wma flac aiff alac".split(),
'videos': "mp4 wmv".split(),
'docs': "doc docx pdf xlsx pptx ppt xls csv".split(),
}
formats['all'] = [*chain.from_iterable(formats.values())]
def trim(path:str, edge:str=os.sep) -> str:
out = path[:]
while out.startswith(edge):
out = out[1:]
while out.endswith(edge):
out = out[:-1]
return out
def normalize(path:str, relative:bool=False) -> str:
other = ''.join(i for i in '\/' if not i==os.sep)
if other in path:
terms = []
for term in path.split(os.sep):
if other in term:
for part in term.split(other):
terms.append(part)
else:
terms.append(term)
path = os.path.join(*terms)
if relative:
path = '.'+os.sep+path
return path
def hasdirs(path:str) -> bool:
return bool(re.search(re.escape(os.sep), normalize(path)))
def likeFile(path:str) -> bool:
path = normalize(path)
return bool(re.search('readme$|.+\.(\S)+$',path.split(os.sep)[-1],re.I))
def multisplit(splitters:Iterable[str], target:str) -> gen:
"""
Split a string by a the elements of a sequence
>>> list(multisplit(',`* ', 'wma,wmv mp3`vga*mp4 ,`* ogg'))
['wma', 'wmv', 'mp3', 'vga', 'mp4', 'ogg']
"""
splitters = iter(splitters)
result = target.split(next(splitters))
for splitter in splitters:
result = [*chain.from_iterable(i.split(splitter) for i in result)]
yield from filter(None, result)
class Address:
"""
Methods return self unless otherwise stated
"""
def __init__(self, path:str):
self.path = path = normalize(path)
# if not os.path.exists(path):
# print(Warning(f'Warning: Path "{path}" does not exist'))
def __str__(self):
return self.path
def __repr__(self):
return self.path
def __hash__(self):
return hash(self.path)
def create(self, content:[str, bytes, bytearray]=None, raw:bool=False):
"""
Create an entry in the file-system. If the address is not vacant no action will be taken.
The raw handle only works for Files and enables writing bytes/bytearrays
"""
if self.exists:
return self
elif content:
if raw:
if isinstance(content,(bytes,bytearray)):
fobj = open(self.path,'wb')
else:
fobj = open(self.path,'w')
fobj.write(content)
fobj.close()
else:
if isinstance(content,str):
content = bytes(content,encoding='utf-8')
else:
content = bytes(content)
with open(self.path,'wb') as fobj:
fobj.write(content)
else:
if likeFile(self.path):
with open(self.path,'x') as fobj:
pass
else:
os.makedirs(self.path,exist_ok=True)
return self
@property
def exists(self):
return os.path.exists(self.path)
@property
def isdir(self):
if self.exists:
return os.path.isdir(self.path)
return not likeFile(self.path)
@property
def isfile(self):
if self.exists:
return os.path.isdir(self.path)
return likeFile(self.path)
@property
def obj(self):
"""
Determine if self.path points to a file or folder and create the corresponding object
"""
if self.exists:
return File(self.path) if os.path.isfile(self.path) else Directory(self.path)
else:
return File(self.path) if likeFile(self.path) else Directory(self.path)
@property
def up(self):
"""
Return the ADir
"""
return Address(delevel(self.path)).obj
@property
def name(self):
"""
Return the name of the referent
"""
return os.path.split(self.path)[1]
@property
def ancestors(self):
"""
Return consecutive ADirs until the ADrive is reached
"""
level = []
p = self.path[:]
while p != delevel(p):
p = delevel(p)
level.append(p)
return tuple(Address(i).obj for i in level)[::-1]
@property
def colleagues(self):
"""
Every member of the same Directory whose type is the same as the referent
"""
return tuple(i for i in self.up if isinstance(i, type(self)))
@property
def neighbours(self):
"""
Everything in the same Directory
"""
return self.up.content
@property
def depth(self):
"""
Number of ancestors
"""
return len(self.ancestors)
@property
def top(self):
"""
The apparent drive. Will not be helpful if self.path is relative
"""
return self.ancestors[0]
@property
def stat(self):
"""
return os.stat(self.path)
"""
return os.stat(self.path)
def delevel(self, steps:int=1, path:bool=False) -> str:
"""
Go up some number of levels in the File system
"""
return delevel(self.path,steps) if path else Directory(delevel(self.path,steps))
def heritage(self):
"""
A fancy representation of the tree from the apparent drive up to the given path
"""
print(f'\nheritage({self.name.title()})')
ancs = list(self.ancestors)
ancs.append(self.path)
for i,anc in enumerate(ancs):
print('\t' + ('','.'*i)[i>0] + i*' ' + [i for i in str(anc).split(os.sep) if i][-1])
return self
def touch(self):
"""
Implements the unix command 'touch', which updates the 'date modified' of the content at the path
"""
p = self.path
pathlib.Path(p).touch()
self = Address(p).obj
return self
def erase(self, recycle:bool=True):
"""
Send a File to the trash, or remove it without recycling.
"""
send2trash(self.path) if recycle else os.remove(self.path)
return self
def copy(self, folder:str=None, cwd:bool=False, sep:str='_'):
"""
Returns a clone of the referent at a given Directory-path
The given path will be created if it doesn't exist
Will copy in the File's original folder if no path is given
The cwd switch will always copy to the current working Directory
"""
copier = (shutil.copy2, shutil.copytree)[self.isdir]
if folder:
new = os.path.join(folder, self.name)
elif cwd:
new = os.path.join(os.getcwd(), self.name)
else:
new = self.path
new = nameSpacer(new, sep=sep)
os.makedirs(delevel(new), exist_ok=True)
copier(self.path, new)
out = Address(new).obj
return out.touch() if touch else out
def move(self, folder:str=None, name:str=None, dodge:str=False, sep:str='_'):
"""
addy.move(folder) -> move to the given Directory ()
addy.move(name) -> move to the given path (relative paths will follow from the objects existing path)
addy.move(folder, name) -> move to the given Directory
The dodge handle enables automatic file-system collision evasion
"""
if folder and name:
new = os.path.join(folder, name)
elif folder:
new = os.path.join(folder, self.name)
elif name:
new = os.path.join(self.up.path, name)
else:
raise ValueError('The file couldn\'t be moved because "name" and "folder" are undefined')
new = nameSpacer(new, sep=sep) if dodge else new
os.rename(self.path, new)
self.path = new
return self
def rename(self, name:str):
"""
Change the name of the referent
"""
return self.move(name=name)
def expose(self):
"""
Reveal the referent in the system's file explorer (will open the containing Directory if the referent is a File)
"""
if self.isdir:
os.startFile(self.path)
else:
os.startFile(self.up.path)
return self
class File(Address):
def __init__(self,path:str):
path = os.path.abspath(trim(path))
super(File,self).__init__(path)
self._stream = None
def __enter__(self):
self._stream = open(str(self),'b')
return self._stream
def __exit__(self, exc_type, exc_value, traceback):
self._stream.close()
self._stream = None
@property
def size(self):
return size(os.stat(self.path).st_size)
@property
def mime(self):
return match.MIME if (match:=ft.guess(str(self))) else None
@property
def kind(self):
return self.mime.split('/')[0] if (match:=ft.guess(self.path)) else None
@property
def ext(self):
return os.path.splitext(self.name)[1]
@property
def title(self):
"""
return the File's name without the extension
"""
return os.path.splitext(self.name)[0]
def open(mode:str='r', scrape:bool=False):
"""
Return the File's byte or text stream.
Scrape splits the text at all whitespace and returns the content as a string
"""
if scrape:
with open(self.path, mode) as fobj:
return ' '.join(fobj.read().split())
with open(self.path, mode) as fobj:
return fobj
class Directory(Address):
def __init__(self, path:str):
path = os.path.abspath(trim(path))
self.path = normalize(path)
super(Directory, self).__init__(path)
self._ind = -1
def __len__(self):
return len(self.content)
def __bool__(self):
"""
Check if the Directory is empty or not
"""
return len(os.listdir(self.path))>0
def __iter__(self):
return self
def __next__(self):
if self._ind<len(self)-1:
self._ind += 1
return self.content[self._ind]
self._ind = -1
raise StopIteration
def __getitem__(self, item):
"""
Return an object whose name is an exact match for the given item
"""
if any(re.search(f'^{item}$', i.name, re.I) for i in self.content):
return Address(os.path.join(self.path, item)).obj
raise ValueError(f'The folder "{self.name}" does not contain anything called "{item}"')
def __truediv__(self, other):
if isinstance(other, str):
return Address(os.path.join(self.path, other)).obj
raise TypeError(f"Other must be a string")
def __call__(self, keyword:str, sort:bool=False, case:bool=False, **kwargs) -> Iterable:
"""
See help(self.search)
"""
return self.search(keyword, sort, case, **kwargs)
@property
def children(self):
"""
Return "os.listdir" but filtered for directories
"""
return tuple(addy.obj for i in os.listdir(self.path) if (addy:=Address(os.path.join(str(self),i))).isdir)
@property
def files(self):
"""
Return "os.listdir" but filtered for Files
"""
return tuple(addy.obj for i in os.listdir(self.path) if (addy:=Address(os.path.join(self.path,i))).isfile)
@property
def content(self):
"""
Return address-like objects from "os.listdir"
"""
return tuple(Address(os.path.join(self.path,i)).obj for i in os.listdir(self.path))
@property
def leaves(self):
"""
Return All Files from all branches
"""
return tuple(self.gather())
@property
def branches(self):
"""
Return Every Directory whose path contains "self.path"
"""
return tuple(set(i.delevel() for i in self.gather()))
@property
def size(self):
"""
Return Prettified version of _size
"""
return sum(File.size for File in self.leaves)
@property
def _size(self):
"""
Return sum of File sizes for all leaves
"""
return sum(File.size for File in self.leaves)
@property
def mimes(self):
"""
Return File mimes for all Files from all branches
"""
return tuple(set(File.mime for File in self.gather()))
@property
def kinds(self):
"""
Return File types for all Files from branches
"""
return tuple(set(m.split('/')[0] for m in self.mime))
@property
def exts(self):
"""
Return extensions for all Files from all branches
"""
return tuple(set(f.ext for f in self.gather()))
@property
def isroot(self):
"""
Return check if the Directory is at the highest level of the File system
"""
return not self.depth
def add(self, other, copy=False):
"""
Introduce new elements. Send an address-like object to self.
"""
if not other.exists:
raise OSError(f"{other.name.title()} could not be added to {self.name.title()} because it doesn't exist")
new = os.path.join(self.path, os.path.split(other.path)[-1])
other.rename(new)
return self
def enter(self):
"""
Set referent as current working Directory
"""
os.chdir(self.path)
def gather(self, titles:bool=False, walk:bool=True, ext:str='') -> gen:
"""
Generate an iterable of the Files rooted in a given folder
It is possible to search for multiple File extensions if you separate each one with a space, comma, asterisk, or tilde.
Only use one symbol per gathering though.
"""
folder = self.path
if walk:
if ext:
ext = ext.replace('.', '')
sep = [i for i in ',`* ' if i in ext]
pattern = '|'.join(f'\.{i}$' for i in ext.split(sep[0] if sep else None))
pat = re.compile(pattern, re.I)
for root, folders, names in os.walk(folder):
for name in names:
if os.path.isfile(p:=os.path.join(root, name)) and pat.search(name) and name!='NTUSER.DAT':
yield name if titles else p
else:
for root, folders, names in os.walk(folder):
for name in names:
if os.path.exists(p:=os.path.join(root, name)):
yield name if titles else p
else:
if ext:
ext = ext.replace('.', '')
sep = [i for i in ',`* ' if i in ext]
pattern = '|'.join(f'\.{i}$' for i in ext.split(sep[0] if sep else None))
pat = re.compile(pattern, re.I)
for name in os.listdir(folder):
if os.path.isfile(p:=os.path.join(folder, name)) and pat.search(name) and name!='NTUSER.DAT':
yield name if titles else p
else:
for name in os.listdir(folder):
if os.path.isfile(p:=os.path.join(folder, name)):
yield name if titles else p
def search(self, keyword:str, sort:bool=False, case:bool=False, prescape:bool=False, **kwargs) -> Iterable:
"""
Return an iterator of Files whose path match the given keyword within a Directory.
The search is linear and the sorting is based on the number of matches. If sorted, a list will be returned.
Case pertains to case-sensitivity
Prescape informs the method that kewords do not need to be escaped
For kwargs see help(self.gather)
"""
casesensitivity = (re.I, 0)[case]
escaper = (re.escape, id)[prescape]
if isinstance(keyword, str):
keyword = keyword.split()
if not isinstance(keyword, str):
keyword = '|'.join(map(escaper, keyword))
if sort:
return sorted(
filter(
# lambda x: re.search(keyword, x, casesensitivity),
lambda x: len([*re.finditer(keyword, x, casesensitivity)]) == len(keyword.split('|')),
self.gather(**kwargs),
),
key=lambda x: len([*re.finditer(keyword, x, casesensitivity)]),
reverse=True
)
else:
return filter(
# lambda x: re.finditer(keyword, x, casesensitivity),
lambda x: re.search(keyword, x, casesensitivity),
self.gather(**kwargs)
)
if __name__ == '__main__':
mock = 'drive: users user place subplace File.extension.secondextension'.split()
# mock = 'drive: users user place subplace readme'.split()
testPaths = [
''.join(mock),
os.path.join(*mock),
os.path.join(*mock[:4])+'/'+'/'.join(mock[4:]),
os.path.join(*mock[3:4])+'/'+'/'.join(mock[4:]),
os.path.join(*mock[:-1]),
'/'.join(mock[:-1]),
mock[-1]
]
show(zip(map(type,map(lambda x: Address(x).obj, testPaths)),testPaths))
# show(zip(testPaths,map(normalize,testPaths)))
# show(zip(testPaths,map(hasdirs,testPaths)))
# show(zip(testPaths,map(likeFile,testPaths)))
# ddemo = Address(os.path.join(*mock[:-1])).obj
# fdemo = Address(os.path.join(*mock)).obj
# ddemo = Directory(os.path.join(*mock[:-1]))
# fdemo = File(os.path.join(*mock))
fp = r'c:\users\kenneth\pyo_rec.wav'
dp = r'c:\users\kenneth\videos'
d = Directory(dp)
f = File(fp)
system = (d, f)
show(d('slowthai',sort=True))
show(d('alix melanie', sort=True))
print(formats['all'])
for v in formats.values():
print(' '.join(v))