1
0
Fork 0

Base implementation

This commit is contained in:
Lucidiot 2019-08-25 05:08:13 +02:00
parent 96455d3816
commit 3711938565
Signed by: lucidiot
GPG Key ID: 3358C1CA6906FB8D
6 changed files with 315 additions and 0 deletions

29
website_generator/__main__.py Executable file
View File

@ -0,0 +1,29 @@
import argparse
from website_generator import WebsiteGenerator
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'config',
type=argparse.FileType(),
help='Path to a YAML configuration file.',
)
parser.add_argument(
'-t', '--tag',
type=str,
action='append',
help='Restrict execution of actions to one or more tags.',
dest='tags',
)
return parser.parse_args()
def main():
args = parse_args()
generator = WebsiteGenerator(args.config.read())
generator()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,78 @@
from abc import ABCMeta, abstractmethod
import json
from website_generator.utils import Registry
class ActionMetaclass(ABCMeta):
def __new__(cls, name, *args, **kwargs):
newclass = ABCMeta.__new__(cls, name, *args, **kwargs)
if name != 'Action':
register(newclass)
return newclass
class Action(metaclass=ActionMetaclass):
def __init__(self, **params):
for name, value in params.items():
setattr(self, name, value)
@abstractmethod
def __call__(self):
pass
class ActionRegistry(Registry):
def check_key(self, key):
assert isinstance(key, str), 'Action type must be a string.'
def check_value(self, value):
assert callable(value), 'Action must be a callable.'
def register(self, key, value=None):
"""
Register a generator action.
May be used a register(action) if the action has a defined type
(using the action.type attribute), or as register(type, action)
or register(action, type) to set a custom type.
"""
if value:
try:
# Try register(type, action)
return super().register(key, value)
except (KeyError, ValueError):
# Try register(action, type)
try:
return super().register(value, key)
except (KeyError, ValueError):
pass
raise
if not getattr(key, 'type', None):
raise KeyError(
'Action does not have a type. '
'Set the type attribute or use register(type, action)'
)
return super().register(key.type, key)
registry = ActionRegistry()
register = registry.register
unregister = registry.unregister
class DebugAction(Action):
type = 'debug'
def __call__(self):
def _default(obj):
if hasattr(obj, '__dict__'):
return vars(obj)
try:
return dict(obj)
except Exception:
return str(obj)
print(json.dumps(vars(self), default=_default, indent=4))

View File

@ -0,0 +1,36 @@
from typesystem.fields import Const
import typesystem
from website_generator.fields import PathField
from website_generator.actions import registry
definitions = typesystem.SchemaDefinitions()
class ActionConfig(typesystem.Schema, definitions=definitions):
name = typesystem.String()
type = typesystem.Choice(choices=registry.keys())
tags = typesystem.Array(
items=typesystem.String(),
unique_items=True,
allow_null=True,
default=list,
)
class Config(typesystem.Schema, definitions=definitions):
# TODO: Use a Version field to parse into distutils.version.LooseVersion
version = Const(const='0.1')
content_root = PathField(allow_files=False, allow_folders=True)
output_root = PathField(allow_files=False, allow_folders=True)
public_root = PathField(must_exist=False, allow_null=True, default='/')
actions = typesystem.Array(
items=typesystem.Reference(to='ActionConfig'),
min_items=1,
)
use = typesystem.Array(
items=typesystem.String(),
unique_items=True,
allow_null=True,
default=list,
)
logging = typesystem.Object(additional_properties=True, allow_null=True)

View File

@ -0,0 +1,54 @@
from pathlib import Path
import typesystem
class PathField(typesystem.String):
errors = {
'type': 'Must be a string or instance of pathlib.Path.',
'null': 'May not be null.',
'blank': 'Must not be blank.',
'max_length': 'Must have no more than {max_length} characters.',
'min_length': 'Must have at least {min_length} characters.',
'pattern': 'Must match the pattern /{pattern}/.',
'format': 'Must be a valid {format}.',
'not_found': 'Path does not exist.',
'no_files': 'Path must not point to a file.',
'no_dirs': 'Path must not point to a directory.',
'no_symlinks': 'Path must not point to a symbolic link.',
}
def __init__(self,
*,
must_exist=True,
allow_files=True,
allow_folders=False,
allow_symlinks=True,
**kwargs):
super().__init__(**kwargs)
self.must_exist = must_exist
self.allow_files = must_exist and allow_files
self.allow_folders = must_exist and allow_folders
self.allow_symlinks = must_exist and allow_symlinks
def validate(self, value, *, strict=False):
if isinstance(value, Path):
value = str(value)
value = super().validate(value, strict=strict)
if not value:
return value
value = Path(value)
if not self.allow_symlinks and value.is_symlink():
self.validation_error("no_symlinks")
value = value.expanduser().resolve()
if self.must_exist and not value.exists():
self.validation_error("not_found")
if not self.allow_files and value.is_file():
self.validation_error("no_files")
if not self.allow_folders and value.is_dir():
self.validation_error("no_dirs")
return value

View File

@ -0,0 +1,87 @@
from importlib import import_module
from itertools import chain
from time import time
from typing import Union
import logging.config
import typesystem
from website_generator.config import Config
from website_generator.actions import registry
DEFAULT_LOGGING = {
'version': 1,
'formatters': {
'print': {
'format': '%(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'print',
'level': 'INFO',
'stream': 'ext://sys.stdout',
},
},
'loggers': {
'website_generator': {
'level': 'INFO',
'propagate': True,
'handlers': ['console'],
},
},
}
logger = logging.getLogger(__name__)
class WebsiteGenerator(object):
def __init__(self, config: Union[str, Config]):
if not isinstance(config, Config):
config = typesystem.validate_yaml(
config,
validator=Config,
)
self.config = config
def __call__(self, tags=[]):
logging.config.dictConfig(self.config.logging or DEFAULT_LOGGING)
logger.debug('Configuration version {!s}'.format(self.config.version))
logger.debug('Content root: {!s}'.format(self.config.content_root))
logger.debug('Output root: {!s}'.format(self.config.output_root))
if self.config.use:
logger.debug('Loading modules…')
for name in self.config.use:
logger.debug('Loading {}'.format(name))
import_module(name)
action_configs = self.config.actions
if tags:
action_configs = list(filter(
lambda a: any((tag in a.tags for tag in tags)),
action_configs,
))
else:
tags = list(set(chain(*[
action.tags for action in action_configs
])))
logger.info('Running {} actions'.format(len(action_configs)))
start_time = time()
for action_config in action_configs:
logger.info(" {}".format(action_config.name))
action_class = registry[action_config.type]
action = action_class(
main_config=self.config,
**action_config,
)
action()
logger.info(" {}: done".format(action_config.name))
run_time = time() - start_time
logger.info('Completed {} actions in {:.1f} seconds'.format(
len(action_configs),
run_time,
))

View File

@ -0,0 +1,31 @@
class Registry(dict):
unregister = dict.__delitem__
def check_key(self, key):
pass
def check_value(self, value):
pass
def check(self, key, value):
try:
self.check_key(key)
except KeyError:
raise
except Exception as e:
raise KeyError(e.message)
try:
self.check_value(value)
except ValueError:
raise
except Exception as e:
raise ValueError(e.message)
def register(self, key, value):
self.__setitem__(key, value)
def __setitem__(self, key, value):
self.check(key, value)
super().__setitem__(key, value)