Commit 7d0c97c6 authored by David Danier's avatar David Danier

Merge branch 'master' into autocreate-external-networks

parents a282e798 d005d633
stages:
- test
pytest:
image: python:3.7
stage: test
script:
- pip install pipenv
- pipenv install --system --dev
- pytest b5/tests -v
......@@ -12,6 +12,8 @@ packaging = ">=16.0"
[dev-packages]
"e1839a8" = {path = ".", editable = true}
pylint = ">=2.3.1"
pytest = ">=5.3.2"
[requires]
python_version = "3.7"
This diff is collapsed.
import argparse
import os
import termcolor
import sys
from .lib.state import State
from .lib.module import load_module
import termcolor
from .lib.argumentparser import ExecuteArgumentParser
from .exceptions import B5ExecutionError
from .lib.module import load_module
from .lib.state import State
def main():
try:
parser = argparse.ArgumentParser(
prog='b5-execute',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description='b5-execute is not intended to be called directly!',
)
parser.add_argument(
'--state-file', nargs='?',
dest='state_file',
)
parser.add_argument(
'--module', nargs='?',
dest='module',
)
parser.add_argument(
'--method', nargs='?',
dest='method',
)
parser.add_argument(
'--args', nargs=argparse.REMAINDER,
dest='args'
)
args = parser.parse_args()
parser = ExecuteArgumentParser('b5-execute', 'b5-execute is not intended to be called directly!')
parser.add_arguments()
args = parser.parse()
if not args.state_file or not args.module or not args.method:
raise B5ExecutionError('b5-execute is not intended to be called directly!')
state = State.load(open(args.state_file, 'rb'))
if not 'modules' in state.config:
if 'modules' not in state.config:
raise B5ExecutionError('No modules defined')
if not args.module in state.config['modules']:
if args.module not in state.config['modules']:
raise B5ExecutionError('Module not available')
module = load_module(state, args.module)
......@@ -49,6 +31,6 @@ def main():
os.chdir(state.run_path)
method(state, args.args)
except B5ExecutionError as e:
termcolor.cprint(str(e), 'red')
except B5ExecutionError as error:
termcolor.cprint(str(error), 'red')
sys.exit(1)
import argparse
import os
import shutil
import termcolor
import subprocess
import sys
import re
import termcolor
from .lib.skeleton import Skeleton
from .lib.argumentparser import InitArgumentParser
from .exceptions import B5ExecutionError
NON_URL_SKELETON = re.compile('^[A-Za-z0-9_-]+$')
def _run_cmd(cmd, error='Command execution failed, see above'):
try:
subprocess.run(
......@@ -26,40 +23,21 @@ def _run_cmd(cmd, error='Command execution failed, see above'):
def main():
try:
parser = argparse.ArgumentParser(
prog='b5-init',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description='b5-init might be used to setup new projects',
)
parser.add_argument(
'-s', '--skeleton', nargs='?',
dest='skeleton', default='basic'
)
parser.add_argument(
'-b', '--branch', nargs='?',
dest='branch',
)
parser.add_argument(
dest='path'
)
args = parser.parse_args()
parser = InitArgumentParser('b5-init', 'b5-init might be used to setup new projects')
parser.add_arguments()
args = parser.parse()
skeleton = args.skeleton
skeleton = Skeleton(args.skeleton)
branch = args.branch
path = args.path
if NON_URL_SKELETON.match(skeleton):
skeleton_url = 'https://git.team23.de/build/b5-skel-{skeleton}.git'.format(skeleton=skeleton)
else:
skeleton_url = skeleton
full_path = os.path.realpath(os.path.join(os.getcwd(), path))
os.makedirs(full_path, exist_ok=True)
if len(os.listdir(full_path)) > 0:
if os.listdir(full_path):
raise B5ExecutionError('Cannot init an existing directory if not empty')
_run_cmd(['git', 'clone', skeleton_url, full_path], 'Could not clone skeleton repository, see above')
_run_cmd(['git', 'clone', skeleton.get_url(), full_path], 'Could not clone skeleton repository, see above')
os.chdir(full_path)
if not branch is None:
_run_cmd(['git', 'checkout', branch], 'Could not checkout required branch, see above')
......@@ -78,8 +56,8 @@ def main():
shutil.rmtree(init_path)
# _run_cmd(['git', 'add', '-A'])
termcolor.cprint('Successful initialized {path}'.format(path=path), 'green')
termcolor.cprint(' skeleton used: {skeleton_url}'.format(skeleton_url=skeleton_url), 'green')
termcolor.cprint(' skeleton used: {skeleton_url}'.format(skeleton_url=skeleton.get_url()), 'green')
termcolor.cprint(' project path: {full_path}'.format(full_path=full_path), 'green')
except B5ExecutionError as e:
termcolor.cprint(str(e), 'red')
except B5ExecutionError as error:
termcolor.cprint(str(error), 'red')
sys.exit(1)
import argparse
from .detect import DETECT
class ArgumentParser:
def __init__(self, prog='b5', description='', formatter_class=argparse.ArgumentDefaultsHelpFormatter):
self.parser = argparse.ArgumentParser(
prog=prog,
formatter_class=formatter_class,
description=description
)
self.defaults = {}
def parse(self, arguments=None, help_as_default=False):
if help_as_default:
if not arguments:
arguments = ['help']
args = self.parser.parse_args(arguments)
for key, value in self.defaults.items():
if not getattr(args, key):
setattr(args, key, value)
return args
def set_default(self, key, value):
self.defaults.update({key: value})
class MainArgumentParser(ArgumentParser):
"""
class for parsing all the b5 command line arguments.
Possible command line argument:
--config -c Path to config (inside run path)
--taskfile -t Path to Taskfile (inside run path)
--project-path -p Project path if not part of parent paths, normally b5 tries to get the project path by itself
--run-path -r Path inside the project b5 will execute in (cd into)
--detect -d Version Control System detection (git or hg)
--shell -s Shell to run the generated script in (should be bash)
--quiet -q Determine if the script should print out stuff
"""
def add_arguments(self):
self.__add_argument_config()
self.__add_argument_taskfile()
self.__add_argument_project_path()
self.__add_argument_run_path()
self.__add_argument_detect()
self.__add_argument_shell()
self.__add_argument_quiet()
self.__add_argument_command()
def __add_argument_config(self):
self.parser.add_argument(
'-c', '--config',
nargs='?',
action='append',
help='Path to config (inside run path)',
dest='configfiles',
)
def __add_argument_taskfile(self):
self.parser.add_argument(
'-t', '--taskfile',
nargs='?',
action='append',
help='Path to Taskfile (inside run path)',
dest='taskfiles',
)
def __add_argument_project_path(self):
self.parser.add_argument(
'-p', '--project-path',
nargs='?',
help='Project path if not part of parent paths, normally b5 tries to get the project path by itself',
dest='project_path',
)
def __add_argument_run_path(self):
self.parser.add_argument(
'-r', '--run-path',
nargs='?',
help='Path inside the project b5 will execute in (cd into)',
dest='run_path',
default='build',
)
def __add_argument_detect(self):
self.parser.add_argument(
'-d', '--detect',
nargs='?',
help='Project detection',
choices=DETECT,
dest='detect',
default='git',
)
def __add_argument_shell(self):
self.parser.add_argument(
'-s', '--shell', nargs='?',
help='Shell to run the generated script in (should be bash)',
dest='shell',
default='/bin/bash',
)
def __add_argument_quiet(self):
self.parser.add_argument(
'-q', '--quiet',
action='store_true',
dest='quiet',
default=False,
)
self.parser.set_defaults()
def __add_argument_command(self):
self.parser.add_argument('command')
self.parser.add_argument('command_args', nargs=argparse.REMAINDER)
class InitArgumentParser(ArgumentParser):
def add_arguments(self):
self.__add_argument_skeleton()
self.__add_argument_branch()
self.__add_argument_path()
def __add_argument_skeleton(self):
self.parser.add_argument(
'-s', '--skeleton',
nargs='?',
dest='skeleton',
default='basic',
)
def __add_argument_branch(self):
self.parser.add_argument(
'-b', '--branch',
nargs='?',
dest='branch',
)
def __add_argument_path(self):
self.parser.add_argument(
dest='path'
)
class ExecuteArgumentParser(ArgumentParser):
def add_arguments(self):
self.__add_argument_state_file()
self.__add_argument_module()
self._add_argument_method()
self.__add_argument_args()
def __add_argument_state_file(self):
self.parser.add_argument(
'--state-file', nargs='?',
dest='state_file',
)
def __add_argument_module(self):
self.parser.add_argument(
'--module', nargs='?',
dest='module',
)
def _add_argument_method(self):
self.parser.add_argument(
'--method', nargs='?',
dest='method',
)
def __add_argument_args(self):
self.parser.add_argument(
'--args',
nargs=argparse.REMAINDER,
dest='args'
)
class TemplateArgumentParser(ArgumentParser):
def add_arguments(self):
self.__add_argument_overwrite()
self.__add_argument_template_file()
self.__add_argument_output_file()
def __add_argument_overwrite(self):
self.parser.add_argument(
'-o', '--overwrite', nargs='?',
help='Control if existing files should be overwritten',
dest='overwrite', default='ask',
choices=['yes', 'if-older', 'no', 'ask', 'ask-if-older']
)
def __add_argument_template_file(self):
self.parser.add_argument('template_file')
def __add_argument_output_file(self):
self.parser.add_argument('output_file', nargs='?')
import yaml
import os
import yaml
from .version import ensure_config_version
from ..exceptions import B5ExecutionError
......@@ -15,8 +16,8 @@ def find_configs(state, configs):
'config': config,
'path': config_path,
})
#if not found_configs:
# raise B5ExecutionError('No config found, tried %s inside %s' % (', '.join(configs), run_path))
# if not found_configs:
# raise B5ExecutionError('No config found, tried %s inside %s' % (', '.join(configs), run_path))
return found_configs
......@@ -40,7 +41,6 @@ def merge_config(cur_config, new_config):
return result_config
def validate_config(config):
if 'version' in config:
ensure_config_version(config['version'])
......@@ -51,8 +51,8 @@ def load_config(state):
configfiles = state.configfiles
config = {}
for configfile in configfiles:
fh = open(configfile['path'], 'r')
file_config = yaml.safe_load(fh)
file_handle = open(configfile['path'], 'r')
file_config = yaml.safe_load(file_handle)
if not isinstance(file_config, dict):
file_config = {}
config = merge_config(config, file_config)
......
......@@ -4,14 +4,14 @@ from importlib import import_module
def import_string(dotted_path):
try:
module_path, class_name = dotted_path.rsplit('.', 1)
except ValueError as err:
except ValueError:
raise ImportError("%s doesn't look like a module path" % dotted_path)
module = import_module(module_path)
try:
return getattr(module, class_name)
except AttributeError as err:
except AttributeError:
raise ImportError('Module "%s" does not define a "%s" attribute/class' % (
module_path, class_name)
)
\ No newline at end of file
)
from ..modules import MODULES
from ..exceptions import B5ExecutionError
from .importutils import import_string
from ..exceptions import B5ExecutionError
from ..modules import MODULES
def load_module(state, module_key):
if not 'modules' in state.config or not module_key in state.config['modules']:
if 'modules' not in state.config or module_key not in state.config['modules']:
raise RuntimeError('Module %s is not defined in config' % module_key)
module_config = state.config['modules'][module_key]
......@@ -17,8 +17,10 @@ def load_module(state, module_key):
module_import_path = module_class_key
if module_class_key in MODULES:
module_import_path = MODULES[module_class_key]
if not '.' in module_import_path:
raise B5ExecutionError('Module seems not to be valid (key=%s/import=%s), please check config' % (module_key, module_import_path))
if '.' not in module_import_path:
raise B5ExecutionError('Module seems not to be valid (key=%s/import=%s), please check config' %
(module_key, module_import_path)
)
try:
module_class = import_string(module_import_path)
......
import os
import re
import shlex
import tempfile
import os
from .module import load_module
RE_KEY_ESCAPE = re.compile('[^a-zA-Z0-9]+')
CONFIG_SUB = '%s_%s'
CONFIG_KEYS = '%s_KEYS'
def modules_script_source(state):
script = []
if 'modules' in state.config:
......@@ -15,12 +21,6 @@ def modules_script_source(state):
def config_script_source(config, prefix='CONFIG'):
import re
RE_KEY_ESCAPE = re.compile('[^a-zA-Z0-9]+')
CONFIG_SUB = '%s_%s'
CONFIG_KEYS = '%s_KEYS'
def _gen_config(config_node, prefix):
script = []
......@@ -85,7 +85,6 @@ def construct_script_source(state):
# run_path and parse Taskfile's
script.append('cd %s\n' % shlex.quote(state.run_path))
for taskfile in state.taskfiles:
#script.append('source %s\n' % shlex.quote(taskfile['path']))
script.append(open(taskfile['path'], 'r').read())
return '\n'.join(script)
......@@ -108,16 +107,16 @@ fi
)
class StoredScriptSource(object):
class StoredScriptSource:
def __init__(self, state, source):
self.state = state
self.source = source
self.fh = tempfile.NamedTemporaryFile(suffix='b5-compiled', delete=False)
self.fh.write(self.source.encode('utf-8'))
self.fh.close()
self.file_handle = tempfile.NamedTemporaryFile(suffix='b5-compiled', delete=False)
self.file_handle.write(self.source.encode('utf-8'))
self.file_handle.close()
def close(self):
os.unlink(self.fh.name)
os.unlink(self.file_handle.name)
def __enter__(self):
return self
......@@ -127,4 +126,4 @@ class StoredScriptSource(object):
@property
def name(self):
return self.fh.name
return self.file_handle.name
import os
import re
import urllib.request
from urllib.error import URLError
class Skeleton:
NON_URL_SKELETON = re.compile('^[A-Za-z0-9_-]+$')
def __init__(self, skeleton_identified):
self._skeleton_identified = skeleton_identified
def get_url(self):
try:
return self._url
except AttributeError:
skeleton_url = self._skeleton_identified
if self.NON_URL_SKELETON.match(skeleton_url):
skeleton_url = 'https://git.team23.de/build/b5-skel-{skeleton}.git'.format(skeleton=self._skeleton_identified)
# If it's not a public repository, clone using ssh in order to allow ssh key file auth
if not self.__is_public_repository(skeleton_url):
skeleton_url = 'git@git.team23.de:build/b5-skel-{skeleton}.git'.format(skeleton=self._skeleton_identified)
self._url = skeleton_url
return self._url
def __is_public_repository(self, url):
request = urllib.request.urlopen(url)
request_url = request.geturl()
if url == request_url or url.rsplit('.', 1)[0] == request_url:
try:
if request.getcode() == 200:
return True
except URLError:
pass
return False
import yaml
import tempfile
import os
import tempfile
import yaml
class StoredState(object):
class StoredState:
def __init__(self, state):
self.state = state
if not self.state.stored_name is None:
if self.state.stored_name is not None:
raise RuntimeError('You may only store the state once')
self.fh = tempfile.NamedTemporaryFile(suffix='b5-state', mode='w', encoding='utf-8', delete=False)
self.file_handle = tempfile.NamedTemporaryFile(suffix='b5-state', mode='w', encoding='utf-8', delete=False)
self.state.stored_name = self.name
yaml.dump({
key: getattr(self.state, key)
for key in state.KEYS
}, self.fh, default_flow_style=False)
self.fh.close()
}, self.file_handle, default_flow_style=False)
self.file_handle.close()
def close(self):
os.unlink(self.fh.name)
os.unlink(self.file_handle.name)
self.state.stored_name = None
def __enter__(self):
......@@ -29,10 +30,10 @@ class StoredState(object):
@property
def name(self):
return self.fh.name
return self.file_handle.name
class State(object):
class State:
KEYS = ('project_path', 'run_path', 'taskfiles', 'configfiles', 'config', 'args', 'stored_name')
taskfiles = []
......@@ -44,7 +45,7 @@ class State(object):
if not hasattr(self, key):
setattr(self, key, None)
for key in kwargs:
if not key in self.KEYS:
if key not in self.KEYS:
raise RuntimeError('Key %s is not a valid state attribute' % key)
setattr(self, key, kwargs[key])
......@@ -52,5 +53,5 @@ class State(object):
return StoredState(self)
@classmethod
def load(cls, fh):
return cls(**yaml.safe_load(fh))
def load(cls, file_handle):
return cls(**yaml.safe_load(file_handle))
......@@ -16,4 +16,3 @@ def find_taskfiles(state, taskfiles):
if not found_taskfiles:
raise B5ExecutionError('No Taskfiles found, tried %s inside %s' % (', '.join(taskfiles), run_path))
return found_taskfiles
import argparse
import os
import subprocess
import sys
import termcolor
from .lib.argumentparser import MainArgumentParser
from . import VERSION
from .exceptions import B5ExecutionError
from .lib.config import load_config
from .lib.detect import detect_project_path, DETECT
from .lib.taskfile import find_taskfiles
from .lib.config import find_configs
from .lib.config import load_config
from .lib.detect import detect_project_path
from .lib.script import StoredScriptSource, construct_script_source, construct_script_run
from .lib.state import State
from . import VERSION
from .lib.taskfile import find_taskfiles
def main():
try:
# Parse all arguments
parser = argparse.ArgumentParser(
prog='b5',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
'-q', '--quiet',
action='store_true',
dest='quiet',
)
parser.add_argument(
'-p', '--project-path', nargs='?',
help='Project path if not part of parent paths, normally b5 tries to get the project path by itself',
dest='project_path',
)
parser.add_argument(
'-r', '--run-path', nargs='?',
help='Path inside the project b5 will execute in (cd into)',
dest='run_path', default='build',
)
parser.add_argument(
'-d', '--detect', nargs='?',
help='Project detection',
default='git', choices=DETECT,
dest='detect',
)
parser.add_argument(
'-c', '--config', nargs='?', action='append',
help='Path to config (inside run path)',
dest='configfiles',
)
parser.add_argument(
'-t', '--taskfile', nargs='?', action='append',
help='Path to Taskfile (inside run path)',
dest='taskfiles',
)
parser.add_argument(
'-s', '--shell', nargs='?',
help='Shell to run the generated script in (should be bash)',
dest='shell',
default='/bin/bash',
)
parser.add_argument('command')
parser.add_argument('command_args', nargs=argparse.REMAINDER)
parser.set_defaults(quiet=False)
sys_args = sys.argv[1:]
if not sys_args: