1
0
Fork 0
This commit is contained in:
Lucidiot 2020-02-05 06:50:21 +01:00
parent 3711938565
commit 0112c4a9b3
Signed by: lucidiot
GPG Key ID: 3358C1CA6906FB8D
6 changed files with 254 additions and 78 deletions

View File

@ -1,78 +1 @@
from abc import ABCMeta, abstractmethod
import json
from website_generator.utils import Registry
class ActionMetaclass(ABCMeta):
def __new__(cls, name, *args, **kwargs):
newclass = ABCMeta.__new__(cls, name, *args, **kwargs)
if name != 'Action':
register(newclass)
return newclass
class Action(metaclass=ActionMetaclass):
def __init__(self, **params):
for name, value in params.items():
setattr(self, name, value)
@abstractmethod
def __call__(self):
pass
class ActionRegistry(Registry):
def check_key(self, key):
assert isinstance(key, str), 'Action type must be a string.'
def check_value(self, value):
assert callable(value), 'Action must be a callable.'
def register(self, key, value=None):
"""
Register a generator action.
May be used a register(action) if the action has a defined type
(using the action.type attribute), or as register(type, action)
or register(action, type) to set a custom type.
"""
if value:
try:
# Try register(type, action)
return super().register(key, value)
except (KeyError, ValueError):
# Try register(action, type)
try:
return super().register(value, key)
except (KeyError, ValueError):
pass
raise
if not getattr(key, 'type', None):
raise KeyError(
'Action does not have a type. '
'Set the type attribute or use register(type, action)'
)
return super().register(key.type, key)
registry = ActionRegistry()
register = registry.register
unregister = registry.unregister
class DebugAction(Action):
type = 'debug'
def __call__(self):
def _default(obj):
if hasattr(obj, '__dict__'):
return vars(obj)
try:
return dict(obj)
except Exception:
return str(obj)
print(json.dumps(vars(self), default=_default, indent=4))
from website_generator.actions.base import * # noqa: F401, F403

View File

@ -0,0 +1,82 @@
from abc import ABCMeta, abstractmethod
import json
from website_generator.utils import Registry
class ActionMetaclass(ABCMeta):
def __new__(cls, name, *args, **kwargs):
newclass = ABCMeta.__new__(cls, name, *args, **kwargs)
if name != 'Action':
register(newclass)
return newclass
class Action(metaclass=ActionMetaclass):
def __init__(self, **params):
for name, value in params.items():
setattr(self, name, value)
@abstractmethod
def __call__(self):
pass
class ActionRegistry(Registry):
def check_key(self, key):
assert isinstance(key, str), 'Action type must be a string.'
def check_value(self, value):
assert callable(value), 'Action must be a callable.'
def register(self, key, value=None):
"""
Register a generator action.
May be used a register(action) if the action has a defined type
(using the action.type attribute), or as register(type, action)
or register(action, type) to set a custom type.
"""
if value:
try:
# Try register(type, action)
return super().register(key, value)
except (KeyError, ValueError):
# Try register(action, type)
try:
return super().register(value, key)
except (KeyError, ValueError):
pass
raise
if not getattr(key, 'type', None):
raise KeyError(
'Action does not have a type. '
'Set the type attribute or use register(type, action)'
)
return super().register(key.type, key)
registry = ActionRegistry()
register = registry.register
unregister = registry.unregister
class DebugAction(Action):
"""
A simple action that prints its execution context,
for debugging purposes.
"""
type = 'debug'
def __call__(self):
def _default(obj):
if hasattr(obj, '__dict__'):
return vars(obj)
try:
return dict(obj)
except Exception:
return str(obj)
print(json.dumps(vars(self), default=_default, indent=4))

View File

@ -0,0 +1,29 @@
from website_generator.fields import PathMatcherField, PathField
from website_generator.config import ActionConfig
from website_generator.actions import Action
import shutil
import typesystem
class CopyActionConfig(ActionConfig):
src = PathMatcherField()
dest = PathField(allow_null=True, must_exist=False, allow_folders=True)
overwite = typesystem.Boolean(default=True)
class CopyAction(Action):
type = 'copy'
def __init__(self, **params):
config = CopyActionConfig.validate(params)
params['src'] = config.src
params['dest'] = config.dest
params['overwrite'] = config.overwrite
super().__init__(**params)
def __call__(self):
for source_path in self.src.all():
if source_path.is_dir():
shutil.copytree(str(source_path), str(self.dest))
else:
shutil.copy2(str(source_path), str(self.dest))

View File

@ -0,0 +1,44 @@
from website_generator.fields import PathMatcherField, PathField
from website_generator.config import ActionConfig
from website_generator.actions import Action
from jinja2 import Environment, FilesystemLoader, select_autoescape
import typesystem
class JinjaActionConfig(ActionConfig):
template = PathMatcherField()
dest = PathField(allow_null=True, must_exist=False, allow_folders=True)
environment = typesystem.Object(additional_properties=True, default=dict)
class JinjaAction(Action):
type = 'jinja2'
def __init__(self, **params):
config = JinjaActionConfig.validate(params)
params['template'] = config.template
params['dest'] = config.dest
super().__init__(**params)
def build_environment(self):
"""
Build a Jinja2 environment for templates.
"""
return Environment(
loader=FilesystemLoader(self.main_config.content_root),
)
def render_template(self, env, template_path, dest_path):
template = env.get_template(template_path)
def __call__(self):
env = self.build_environment()
if self.dest.is_dir() or str(self.dest).endswith('/'):
self.dest.mkdir(parents=True, exist_ok=True)
for template_path in self.src.all():
self.render_template(env, template_path, self.dest / template_path.name)
else:
template_path =
template = env.get_template(template_path)

View File

@ -1,4 +1,5 @@
from pathlib import Path
from website_generator.utils import PathMatcher
import typesystem
@ -52,3 +53,38 @@ class PathField(typesystem.String):
self.validation_error("no_dirs")
return value
class PathMatcherField(typesystem.Union):
def __init__(self, *, **kwargs):
super().__init__([
typesystem.String(),
typesystem.Array(
items=typesystem.String(),
min_items=1,
),
typesystem.Object(
properties={
'include': typesystem.Union([
typesystem.String(),
typesystem.Array(
items=typesystem.String(),
min_items=1,
),
]),
'exclude': typesystem.Union([
typesystem.String(),
typesystem.Array(
items=typesystem.String(),
min_items=1,
),
]),
},
min_properties=1,
required=['include'],
),
], **kwargs)
def validate(self, *args, **kwargs):
return PathMatcher(super().validate(*args, **kwargs))

View File

@ -1,3 +1,7 @@
from glob import glob
import fnmatch
class Registry(dict):
unregister = dict.__delitem__
@ -29,3 +33,61 @@ class Registry(dict):
def __setitem__(self, key, value):
self.check(key, value)
super().__setitem__(key, value)
class PathMatcher(object):
"""
A configurable glob.
"""
def __init__(self, config):
if isinstance(config, str):
self.include = [config, ]
elif isinstance(config, list):
self.include = config
elif not isinstance(config, dict):
raise ValueError('Invalid pattern configuration')
if not config.get('include'):
raise ValueError('At least one inclusion pattern required')
elif isinstance(config['include'], str):
self.include = [config['include'], ]
elif isinstance(config['include'], list):
self.include = config['include']
else:
raise ValueError(
'"include" must be a pattern or list of patterns'
)
config.setdefault('exclude', [])
if isinstance(config['exclude'], str):
self.exclude = [config['exclude'], ]
elif isinstance(config['exclude'], list):
self.exclude = config['exclude']
else:
raise ValueError(
'"exclude" must be a pattern or list of patterns'
)
self._found = None
def all(self):
if self._found is not None:
return self._found
self._found = set()
for include_pattern in self.include:
self._found |= set(glob(include_pattern))
for exclude_pattern in self.exclude:
self._found -= set(fnmatch.filter(self._found, exclude_pattern))
self._found = list(map(Path, self._found))
return self._found
def get(self):
if not self.all():
raise ValueError('No matching paths found')
if len(self.all()) != 1:
raise ValueError('More than one matching path found')
return self.all()[0]