#
# Copyright (C) OpenCyphal Development Team <opencyphal.org>
# Copyright Amazon.com Inc. or its affiliates.
# SPDX-License-Identifier: MIT
#
"""
jinja-based :class:`~nunavut.generators.AbstractGenerator` implementation.
"""
import abc
import datetime
import io
import logging
import re
import shutil
from pathlib import Path
from typing import Any, Callable, Dict, Generator, Iterable, List, Mapping, Optional, TextIO, Tuple, Type, Union
import pydsdl
import nunavut.lang
from .._generators import AbstractGenerator
from .._postprocessors import FilePostProcessor, LinePostProcessor, PostProcessor
from .._utilities import TEMPLATE_SUFFIX, ResourceSearchPolicy, ResourceType, YesNoDefault
from .environment import CodeGenEnvironment, CodeGenEnvironmentBuilder
from .loaders import DEFAULT_TEMPLATE_PATH, DSDLSupportTemplateLoader, DSDLTemplateLoader
logger = logging.getLogger(__name__)
# +---------------------------------------------------------------------------+
# | JINJA : CodeGenerator
# +---------------------------------------------------------------------------+
[docs]
class CodeGenerator(AbstractGenerator):
"""
Abstract base class for all Generators that build source code using Jinja templates.
:param int resource_types: A bitfield of :class:`nunavut._utilities.ResourceType` for filtering the
types of resources this generator will emit.
:param nunavut.Namespace namespace: The top-level namespace to generates code
at and from.
:param YesNoDefault generate_namespace_types: Set to YES to emit files for namespaces.
NO will suppress namespace file generation and DEFAULT will
use the language's preference.
:param templates_dir: Directories containing jinja templates. These will be available along
with any built-in templates provided by the target language. The templates
at these paths will take precedence masking any built-in templates
where the names are the same. See :class:`jinja2.ChoiceLoader` for rules
on the lookup hierarchy.
:type templates_dir: Optional[Union[Path,List[Path]]]
:param Optional[Type[DSDLTemplateLoader]] template_loader:
If None uses an internal default implementation of :class:`DSDLTemplateLoader` otherwise instantiates the given
class with the arguments specified by the :class:`DSDLTemplateLoader` constructor.
:param bool followlinks: If True then symbolic links will be followed when
searching for templates.
:param bool trim_blocks: If this is set to True the first newline after a
block is removed (block, not variable tag!).
:param bool lstrip_blocks: If this is set to True leading spaces and tabs
are stripped from the start of a line to a block.
Defaults to False.
:param Dict[str, Callable] additional_filters: Optional jinja filters to add to the
global environment using the key as the filter name
and the callable as the filter.
:param Dict[str, Callable] additional_tests: Optional jinja tests to add to the
global environment using the key as the test name
and the callable as the test.
:param Dict[str, Any] additional_globals: Optional objects to add to the template
environment globals collection.
:param post_processors: A list of :class:`nunavut.postprocessors.PostProcessor`
:type post_processors: Optional[List[nunavut.postprocessors.PostProcessor]]
:param builtin_template_path: If provided overrides the folder name under which built-in templates are loaded from
within a target language's package (i.e. ignored if no target language is
specified). For example, if the target language is ``c`` and this parameter
was set to ``foo`` then built-in templates would be loaded from
``nunavut.lang.c.foo``.
:type builtin_template_path: str
:param search_policy: The policy to use when searching for templates.
:type search_policy: ResourceSearchPolicy
:param embed_auditing_info: If True then the generator will embed auditing information in the generated code.
:type embed_auditing_info: bool
:raises RuntimeError: If any additional filter or test attempts to replace a built-in
or otherwise already defined filter or test.
"""
@staticmethod
def __augment_post_processors_with_ln_limit_empty_lines(
post_processors: Optional[List[PostProcessor]], limit_empty_lines: int
) -> List[PostProcessor]:
"""
Subroutine of _handle_post_processors method.
"""
from nunavut._postprocessors import LimitEmptyLines # pylint: disable=import-outside-toplevel
if post_processors is None:
post_processors = [LimitEmptyLines(limit_empty_lines)]
else:
found_pp = False
for pp in post_processors:
if isinstance(pp, LimitEmptyLines):
found_pp = True
break
if not found_pp:
post_processors.append(LimitEmptyLines(limit_empty_lines))
return post_processors
@staticmethod
def __augment_post_processors_with_ln_trim_trailing_whitespace(
post_processors: Optional[List[PostProcessor]],
) -> List[PostProcessor]:
"""
Subroutine of _handle_post_processors method.
"""
from nunavut._postprocessors import TrimTrailingWhitespace # pylint: disable=import-outside-toplevel
if post_processors is None:
post_processors = [TrimTrailingWhitespace()]
else:
found_pp = False
for pp in post_processors:
if isinstance(pp, TrimTrailingWhitespace):
found_pp = True
break
if not found_pp:
post_processors.append(TrimTrailingWhitespace())
return post_processors
@classmethod
def _handle_post_processors(
cls,
target_language: "nunavut.lang.Language",
post_processors: Optional[List["PostProcessor"]],
) -> Optional[List["PostProcessor"]]:
"""
Used by constructor to process an optional list of post-processors and to augment or create this list
if needed to support language options.
"""
try:
limit_empty_lines = target_language.get_config_value("limit_empty_lines")
post_processors = cls.__augment_post_processors_with_ln_limit_empty_lines(
post_processors, int(limit_empty_lines)
)
except KeyError:
pass
if target_language.get_config_value_as_bool("trim_trailing_whitespace"):
post_processors = cls.__augment_post_processors_with_ln_trim_trailing_whitespace(post_processors)
return post_processors
def __init__(
self,
namespace: nunavut.Namespace,
resource_types: int = ResourceType.ANY.value,
generate_namespace_types: YesNoDefault = YesNoDefault.DEFAULT,
templates_dir: Optional[Union[Path, List[Path]]] = None,
template_loader: Optional[Type[DSDLTemplateLoader]] = None,
followlinks: bool = False,
trim_blocks: bool = False,
lstrip_blocks: bool = False,
additional_filters: Optional[Dict[str, Callable]] = None,
additional_tests: Optional[Dict[str, Callable]] = None,
additional_globals: Optional[Dict[str, Any]] = None,
post_processors: Optional[List["PostProcessor"]] = None,
builtin_template_path: str = DEFAULT_TEMPLATE_PATH,
search_policy: ResourceSearchPolicy = ResourceSearchPolicy.FIND_ALL,
embed_auditing_info: bool = False,
**kwargs: Any,
):
super().__init__(namespace, resource_types, generate_namespace_types, **kwargs)
if templates_dir is not None and not isinstance(templates_dir, list):
templates_dir = [templates_dir]
language_context = self._namespace.get_language_context()
target_language = language_context.get_target_language()
if template_loader is None:
template_loader = DSDLTemplateLoader
self._dsdl_template_loader = template_loader(
namespace=namespace,
resource_types=resource_types,
templates_dirs=templates_dir,
followlinks=followlinks,
builtin_template_path=builtin_template_path,
search_policy=search_policy,
**kwargs,
)
self._post_processors = self._handle_post_processors(target_language, post_processors)
env_builder = (
CodeGenEnvironmentBuilder(self._dsdl_template_loader)
.set_trim_blocks(trim_blocks)
.set_lstrip_blocks(lstrip_blocks)
)
if additional_filters is not None:
env_builder.add_filters(**additional_filters)
if additional_tests is not None:
env_builder.add_tests(**additional_tests)
if additional_globals is not None:
env_builder.add_globals(**additional_globals)
env_builder.set_embed_auditing_info(embed_auditing_info)
self._env = self._create_environment(env_builder, language_context)
def _create_environment(
self, env_builder: CodeGenEnvironmentBuilder, language_context: nunavut.lang.LanguageContext
) -> CodeGenEnvironment:
"""
Create the code generation environment for this generator.
This is a template method that subclasses can override to customize the environment
with generator-specific filters, tests, and globals. The base implementation creates
an environment with language-specific filters only.
:param env_builder: The environment builder with basic configuration already set.
:param language_context: The language context for this generator.
:return: A configured CodeGenEnvironment instance.
"""
return env_builder.create(language_context)
@property
def dsdl_loader(self) -> DSDLTemplateLoader:
"""
The template loader used by this generator.
"""
return self._dsdl_template_loader
@property
def language_context(self) -> nunavut.lang.LanguageContext:
"""
The language context used by this generator.
"""
return self._namespace.get_language_context()
@property
def environment(self) -> CodeGenEnvironment:
"""
The generator environment.
"""
return self._env
# +-----------------------------------------------------------------------+
# | PROTECTED
# +-----------------------------------------------------------------------+
def _handle_overwrite(self, output_path: Path, allow_overwrite: bool) -> None:
if output_path.exists():
if allow_overwrite:
output_path.chmod(output_path.stat().st_mode | 0o220)
else:
raise PermissionError("{output_path} exists and allow_overwrite is False.")
# +-----------------------------------------------------------------------+
# | AbstractGenerator
# +-----------------------------------------------------------------------+
[docs]
def get_templates(self) -> Iterable[Path]:
"""
Enumerate all templates found in the templates path.
:data:`~TEMPLATE_SUFFIX` as the suffix for the filename.
:return: A list of paths to all templates found by this Generator object.
"""
return self._dsdl_template_loader.get_templates()
[docs]
@abc.abstractmethod
def generate_all(
self,
is_dryrun: bool = False,
allow_overwrite: bool = True,
) -> Iterable[Path]:
raise NotImplementedError()
# +-----------------------------------------------------------------------+
# | PRIVATE
# +-----------------------------------------------------------------------+
@staticmethod
def _filter_and_write_line(
line_and_lineend: Tuple[str, str],
output_file: TextIO,
line_pps: List["LinePostProcessor"],
) -> None:
for line_pp in line_pps:
line_and_lineend = line_pp(line_and_lineend)
if line_and_lineend is None:
raise ValueError(
"line post processor must return a 2-tuple. To elide a line return a tuple of empty"
"strings. None is not a valid value."
)
output_file.write(line_and_lineend[0])
output_file.write(line_and_lineend[1])
@classmethod
def _generate_with_line_buffer(
cls,
output_file: TextIO,
template_gen: Generator[str, None, None],
line_pps: List["LinePostProcessor"],
) -> None:
newline_pattern = re.compile(r"\n|\r\n", flags=re.MULTILINE)
line_buffer = io.StringIO()
for part in template_gen:
search_pos = 0 # type: int
match_obj = newline_pattern.search(part, search_pos)
while True:
if search_pos < 0 or search_pos >= len(part):
break
if match_obj is None:
line_buffer.write(part[search_pos:])
break
# We have a newline
line_buffer.write(part[search_pos : match_obj.start()])
newline_chars = part[match_obj.start() : match_obj.end()]
line = line_buffer.getvalue() # type: str
line_buffer = io.StringIO()
cls._filter_and_write_line((line, newline_chars), output_file, line_pps)
search_pos = match_obj.end()
match_obj = newline_pattern.search(part, search_pos)
remainder = line_buffer.getvalue()
if len(remainder) > 0:
cls._filter_and_write_line((remainder, ""), output_file, line_pps)
def _generate_code(
self,
output_path: Path,
template_gen: Generator[str, None, None],
allow_overwrite: bool,
) -> None:
"""
Logic that should run from _generate_type iff is_dryrun is False.
"""
try:
self._env.now_utc = datetime.datetime.now(datetime.UTC) # type: ignore
except AttributeError:
self._env.now_utc = datetime.datetime.utcnow()
from ..lang._common import UniqueNameGenerator # pylint: disable=import-outside-toplevel
# reset the name generator state for this type
UniqueNameGenerator.reset()
# Predetermine the post processor types.
line_pps = [] # type: List['LinePostProcessor']
file_pps = [] # type: List['FilePostProcessor']
if self._post_processors is not None:
for pp in self._post_processors:
if isinstance(pp, LinePostProcessor):
line_pps.append(pp)
elif isinstance(pp, FilePostProcessor):
file_pps.append(pp)
else:
raise ValueError(f"PostProcessor type {type(pp)} is unknown.")
logger.debug("Using post-processors: %r %r", line_pps, file_pps)
self._handle_overwrite(output_path, allow_overwrite)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(str(output_path), "w", encoding="utf-8") as output_file:
if len(line_pps) > 0:
# The logic gets much more complex when doing line post-processing.
self._generate_with_line_buffer(output_file, template_gen, line_pps)
else:
for part in template_gen:
output_file.write(part)
for file_pp in file_pps:
output_path = file_pp(output_path)
# +---------------------------------------------------------------------------+
# | JINJA : DSDLCodeGenerator
# +---------------------------------------------------------------------------+
[docs]
class DSDLCodeGenerator(CodeGenerator):
"""
:class:`~CodeGenerator` implementation that generates code for a given set
of DSDL types.
"""
# +-----------------------------------------------------------------------+
# | JINJA : filters
# +-----------------------------------------------------------------------+
[docs]
@staticmethod
def filter_yamlfy(value: Any) -> str:
"""
Filter to, optionally, emit a dump of the dsdl input as a yaml document.
Available as ``yamlfy`` in all template environments.
Example::
/*
{{ T | yamlfy }}
*/
Result Example (truncated for brevity)::
/*
!!python/object:pydsdl.StructureType
_attributes:
- !!python/object:pydsdl.Field
_serializable: !!python/object:pydsdl.UnsignedIntegerType
_bit_length: 16
_cast_mode: &id001 !!python/object/apply:pydsdl.CastMode
- 0
_name: value
*/
:param value: The input value to parse as yaml.
:return: If a yaml parser is available, a pretty dump of the given value as yaml.
If a yaml parser is not available then an empty string is returned.
"""
from yaml import Dumper as YamlDumper # pylint: disable=import-outside-toplevel
from yaml import dump as yaml_dump # pylint: disable=import-outside-toplevel
return str(yaml_dump(value, Dumper=YamlDumper))
[docs]
def filter_type_to_template(self, value: Any) -> str:
"""
Template for type resolution as a filter. Available as ``type_to_template``
in all template environments.
Example::
{%- for attribute in T.attributes %}
{%* include attribute.data_type | type_to_template %}
{%- if not loop.last %},{% endif %}
{%- endfor %}
:param value: The input value to change into a template include path.
:return: A path to a template named for the type with :data:`TEMPLATE_SUFFIX`
"""
result = self.dsdl_loader.type_to_template(type(value))
if result is None:
raise RuntimeError(f"No template found for type {value}")
return result.name
[docs]
def filter_type_to_include_path(self, value: Any, resolve: bool = False) -> str:
"""
Emits an include path to the output target for a given type.
Example::
# include "{{ T.my_type | type_to_include_path }}"
Result Example:
# include "foo/bar/my_type.h"
:param Any value: The type to emit an include for.
:param bool resolve: If True the path returned will be absolute else the path will
be relative to the folder of the root namespace.
:return: A string path to output file for the type.
"""
include_path = self.namespace.find_output_path_for_type(value)
if resolve:
return include_path.resolve().as_posix()
else:
return include_path.relative_to(self.namespace.output_folder.parent).as_posix()
[docs]
@staticmethod
def filter_typename(value: Any) -> str:
"""
Filters a given token as its type name. Available as ``typename``
in all template environments.
This example supposes that ``T.some_value == "some string"``
Example::
{{ T.some_value | typename }}
Result Example::
str
:param value: The input value to filter into a type name.
:return: The ``__name__`` of the python type.
"""
return type(value).__name__
[docs]
@staticmethod
def filter_alignment_prefix(offset: pydsdl.BitLengthSet) -> str:
"""
Provides a string prefix based on a given :class:`pydsdl.BitLengthSet`.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
import pydsdl
.. code-block:: python
# Given
B = pydsdl.BitLengthSet(32)
# and
template = '{{ B | alignment_prefix }}'
# outputs
rendered = 'aligned'
.. invisible-code-block: python
jinja_filter_tester(DSDLCodeGenerator.filter_alignment_prefix, template, rendered, 'py', B=B)
.. code-block:: python
# Given
B = pydsdl.BitLengthSet(32)
B += 1
# and
template = '{{ B | alignment_prefix }}'
# outputs
rendered = 'unaligned'
.. invisible-code-block: python
jinja_filter_tester(DSDLCodeGenerator.filter_alignment_prefix, template, rendered, 'py', B=B)
:param pydsdl.BitLengthSet offset: A bit length set to test for alignment.
:return: 'aligned' or 'unaligned' based on the state of the ``offset`` argument.
"""
if isinstance(offset, pydsdl.BitLengthSet):
return "aligned" if offset.is_aligned_at_byte() else "unaligned"
else: # pragma: no cover
raise TypeError(f"Expected BitLengthSet, got {type(offset).__name__}")
[docs]
@staticmethod
def filter_bit_length_set(values: Optional[Union[Iterable[int], int]]) -> pydsdl.BitLengthSet:
"""
Convert an integer or a list of integers into a :class:`pydsdl.BitLengthSet`.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
import pydsdl
assert type(DSDLCodeGenerator.filter_bit_length_set(23)) == pydsdl.BitLengthSet
"""
return pydsdl.BitLengthSet(values)
[docs]
@staticmethod
def filter_remove_blank_lines(text: str) -> str:
"""
Remove blank lines from the supplied string.
Lines that contain only whitespace characters are also considered blank.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
import pydsdl
.. code-block:: python
# Given
text = '''123
456
\t
\v\f
789'''
# and
template = '{{ text | remove_blank_lines }}'
# then the black lines will be removed leaving...
rendered = '''123
456
789'''
.. invisible-code-block: python
jinja_filter_tester(DSDLCodeGenerator.filter_remove_blank_lines, template, rendered, 'c', text=text)
"""
return re.sub(r"\n([ \t\f\v]*\n)+", r"\n", text)
[docs]
@staticmethod
def filter_bits2bytes_ceil(n_bits: int) -> int:
"""
Implements ``int(ceil(x/8)) | x >= 0``.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
assert DSDLCodeGenerator.filter_bits2bytes_ceil(50) == 7
assert DSDLCodeGenerator.filter_bits2bytes_ceil(8) == 1
assert DSDLCodeGenerator.filter_bits2bytes_ceil(7) == 1
assert DSDLCodeGenerator.filter_bits2bytes_ceil(1) == 1
assert DSDLCodeGenerator.filter_bits2bytes_ceil(0) == 0
"""
if n_bits < 0:
raise ValueError("The number of bits cannot be negative")
return (int(n_bits) + 7) // 8
[docs]
@staticmethod
def filter_text_table(data: Dict, start_each_line: str, column_sep: str = " : ", line_end: str = "\n") -> str:
"""
Create a text table from a dictionary of data.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
import pydsdl
.. code-block:: python
# Given
table = {
"banana": "yellow",
"apple": "red",
"grape": "purple"
}
# and
template = '''
{{ table | text_table("// ", " | ", "\\n") }}'''
# then
rendered = '''
// banana | yellow
// apple | red
// grape | purple'''
.. invisible-code-block: python
jinja_filter_tester(DSDLCodeGenerator.filter_text_table, template, rendered, 'c', table=table)
"""
# Find the longest key to set the width of the first column
key_width = max(len(key) for key in data.keys())
output = []
for key, value in data.items():
output.append(f"{start_each_line}{key:<{key_width}}{column_sep}{value}".rstrip())
return line_end.join(output)
# +-----------------------------------------------------------------------+
# | JINJA : tests
# +-----------------------------------------------------------------------+
[docs]
@staticmethod
def is_None(value: Any) -> bool: # pylint: disable=invalid-name
"""
Tests if a value is ``None``
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
assert DSDLCodeGenerator.is_None(None) is True
assert DSDLCodeGenerator.is_None(1) is False
"""
return value is None
[docs]
@staticmethod
def is_saturated(t: pydsdl.PrimitiveType) -> bool:
"""
Tests if a type is a saturated type or not.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
from unittest.mock import MagicMock
import pydsdl
import pytest
saturated_mock = MagicMock(spec=pydsdl.PrimitiveType)
saturated_mock.cast_mode = pydsdl.PrimitiveType.CastMode.SATURATED
assert DSDLCodeGenerator.is_saturated(saturated_mock) is True
truncated_mock = MagicMock(spec=pydsdl.PrimitiveType)
truncated_mock.cast_mode = pydsdl.PrimitiveType.CastMode.TRUNCATED
assert DSDLCodeGenerator.is_saturated(truncated_mock) is False
with pytest.raises(TypeError):
DSDLCodeGenerator.is_saturated(MagicMock(spec=pydsdl.SerializableType))
"""
if isinstance(t, pydsdl.PrimitiveType):
return {
pydsdl.PrimitiveType.CastMode.SATURATED: True,
pydsdl.PrimitiveType.CastMode.TRUNCATED: False,
}[t.cast_mode]
else:
raise TypeError(f"Cast mode is not defined for {type(t).__name__}")
[docs]
@staticmethod
def is_service_request(instance: pydsdl.Any) -> bool:
"""
Tests if a type is request type of a service type.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
from unittest.mock import MagicMock
import pydsdl
service_request_mock = MagicMock(spec=pydsdl.SerializableType)
service_request_mock.has_parent_service = True
service_request_mock.full_name = 'foo.bar.Service_1_0.Request'
assert DSDLCodeGenerator.is_service_request(service_request_mock) is True
service_request_mock.has_parent_service = False
assert DSDLCodeGenerator.is_service_request(service_request_mock) is False
service_request_mock.has_parent_service = True
service_request_mock.full_name = 'foo.bar.Service_1_0.Response'
assert DSDLCodeGenerator.is_service_request(service_request_mock) is False
"""
return instance.has_parent_service and instance.full_name.split(".")[-1] == "Request" # type: ignore
[docs]
@staticmethod
def is_service_response(instance: pydsdl.Any) -> bool:
"""
Tests if a type is response type of a service type.
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
from unittest.mock import MagicMock
import pydsdl
service_request_mock = MagicMock(spec=pydsdl.SerializableType)
service_request_mock.has_parent_service = True
service_request_mock.full_name = 'foo.bar.Service_1_0.Response'
assert DSDLCodeGenerator.is_service_response(service_request_mock) is True
service_request_mock.has_parent_service = False
assert DSDLCodeGenerator.is_service_response(service_request_mock) is False
service_request_mock.has_parent_service = True
service_request_mock.full_name = 'foo.bar.Service_1_0.Request'
assert DSDLCodeGenerator.is_service_response(service_request_mock) is False
"""
return instance.has_parent_service and instance.full_name.split(".")[-1] == "Response" # type: ignore
[docs]
@staticmethod
def is_deprecated(instance: pydsdl.Any) -> bool:
"""
Tests if a type is marked as deprecated
.. invisible-code-block: python
from nunavut.jinja import DSDLCodeGenerator
from unittest.mock import MagicMock
import pydsdl
composite_type_mock = MagicMock(spec=pydsdl.CompositeType)
composite_type_mock.deprecated = True
assert DSDLCodeGenerator.is_deprecated(composite_type_mock) is True
array_type_mock = MagicMock(spec=pydsdl.ArrayType)
array_type_mock.element_type = composite_type_mock
assert DSDLCodeGenerator.is_deprecated(array_type_mock) is True
other_type_mock = MagicMock(spec=pydsdl.SerializableType)
assert DSDLCodeGenerator.is_deprecated(other_type_mock) is False
"""
if isinstance(instance, pydsdl.CompositeType):
return instance.deprecated # type: ignore
elif isinstance(instance, pydsdl.ArrayType) and isinstance(instance.element_type, pydsdl.CompositeType):
return instance.element_type.deprecated # type: ignore
else:
return False
# +-----------------------------------------------------------------------+
def __init__(self, namespace: nunavut.Namespace, resource_types: int = ResourceType.ANY.value, **kwargs: Any):
super().__init__(namespace, resource_types=resource_types, **kwargs)
def _create_environment(
self, env_builder: CodeGenEnvironmentBuilder, language_context: nunavut.lang.LanguageContext
) -> CodeGenEnvironment:
"""
Create the environment with DSDL-specific filters and tests.
This override adds DSDL-specific functionality like type_to_template, type_to_include_path,
and DSDL tests (is_service_request, is_service_response, etc.) that should only be
available in DSDL type templates, not in support templates.
:param env_builder: The environment builder with basic configuration already set.
:param language_context: The language context for this generator.
:return: A configured CodeGenEnvironment with DSDL-specific additions.
"""
env = super()._create_environment(env_builder, language_context)
# Add DSDL-specific tests (is_service_request, is_service_response, etc.)
for test_name, test in self._create_all_dsdl_tests().items():
env.add_test(test_name, test)
# Add DSDL-specific filters (type_to_template, type_to_include_path, etc.)
env.add_conventional_methods_to_environment(self)
return env
# +-----------------------------------------------------------------------+
# | AbstractGenerator
# +-----------------------------------------------------------------------+
[docs]
def get_templates(self) -> Iterable[Path]:
if (self.resource_types & ResourceType.ONLY.value) == ResourceType.ONLY.value:
# This generator doesn't generate resources and the "only resources" flag is set.
return []
return super().get_templates()
[docs]
def generate_all(
self,
is_dryrun: bool = False,
allow_overwrite: bool = True,
) -> Iterable[Path]:
generated = [] # type: List[Path]
if (self.resource_types & ResourceType.ONLY.value) == ResourceType.ONLY.value:
# This generator doesn't generate resources and the "only resources" flag is set.
# We'll set the dryrun flag to True to avoid generating anything but we'll still return the list of
# types that would have been generated.
is_dryrun = True
provider = self.namespace.get_all_types if self.generate_namespace_types else self.namespace.get_all_datatypes
for parsed_type, output_path in provider():
logger.info("Generating: %s", parsed_type)
generated.append(self._generate_type(parsed_type, output_path, is_dryrun, allow_overwrite))
generated.extend(self._generate_index_files(is_dryrun, allow_overwrite))
return generated
# +-----------------------------------------------------------------------+
# | PRIVATE
# +-----------------------------------------------------------------------+
@classmethod
def _create_instance_tests_for_type(cls, root: pydsdl.Any) -> Dict[str, Callable]:
tests = dict()
def _field_is_instance(field_or_datatype: pydsdl.Any) -> bool:
if isinstance(field_or_datatype, pydsdl.Attribute):
return isinstance(field_or_datatype.data_type, root)
else:
return isinstance(field_or_datatype, root)
tests[root.__name__] = _field_is_instance
root_name_lower = root.__name__.lower()
if len(root_name_lower) > 4 and root_name_lower.endswith("type"):
tests[root_name_lower[:-4]] = _field_is_instance
elif len(root_name_lower) > 5 and root_name_lower.endswith("field"):
tests[root_name_lower[:-5]] = _field_is_instance
else:
tests[root_name_lower] = _field_is_instance
for derived in root.__subclasses__():
tests.update(cls._create_instance_tests_for_type(derived))
return tests
@classmethod
def _create_all_dsdl_tests(cls) -> Mapping[str, Callable]:
"""
Create a collection of jinja tests for all base dsdl types.
.. invisible-code-block: python
import pydsdl
from unittest.mock import MagicMock
from nunavut.jinja import DSDLCodeGenerator
test_set = DSDLCodeGenerator._create_all_dsdl_tests()
def _do_pydsdl_instance_test_test(pydsdl_obj, test_name):
if not test_set[test_name](pydsdl_obj):
raise AssertionError(test_name)
def _do_pydsdl_instance_test_tests(pydsdl_type):
mock_instance = MagicMock(spec=pydsdl_type)
_do_pydsdl_instance_test_test(mock_instance, pydsdl_type.__name__)
if pydsdl_type.__name__.endswith('Type'):
_do_pydsdl_instance_test_test(mock_instance, pydsdl_type.__name__[:-4].lower())
if pydsdl_type.__name__.endswith('Field'):
_do_pydsdl_instance_test_test(mock_instance, pydsdl_type.__name__[:-5].lower())
mock_attribute = MagicMock(spec=pydsdl.Attribute)
mock_attribute.data_type = mock_instance
_do_pydsdl_instance_test_test(mock_attribute, pydsdl_type.__name__)
_do_pydsdl_instance_test_tests(pydsdl.SerializableType)
_do_pydsdl_instance_test_tests(pydsdl.PrimitiveType)
_do_pydsdl_instance_test_tests(pydsdl.IntegerType)
_do_pydsdl_instance_test_tests(pydsdl.ServiceType)
"""
all_tests = dict()
all_tests.update(cls._create_instance_tests_for_type(pydsdl.SerializableType))
all_tests.update(cls._create_instance_tests_for_type(pydsdl.Attribute))
return all_tests
def _generate_type(
self, input_type: pydsdl.CompositeType, output_path: Path, is_dryrun: bool, allow_overwrite: bool
) -> Path:
template_name = self.filter_type_to_template(input_type)
template = self._env.get_template(template_name)
template_gen = template.generate(T=input_type)
if not is_dryrun:
self._generate_code(output_path, template_gen, allow_overwrite)
return output_path
def _generate_index_files(
self,
is_dryrun: bool,
allow_overwrite: bool,
) -> List[Path]:
"""
Renders index files which are each given access to all namespaces as `N`.
"""
output_paths = []
index = self.namespace.get_index_namespace()
index_file_path = index.output_folder
target_extension = self.language_context.get_target_language().get_config_value(
nunavut.lang.Language.WKCV_DEFINITION_FILE_EXTENSION
)
for index_file in self.index_files:
template_name = self.dsdl_loader.index_file_to_template(index_file)
if template_name is None:
raise RuntimeError(f"No template found for index file {index_file}")
template = self._env.get_template(template_name.name)
template_gen = template.generate(N=index)
index_file_output = index_file_path / index_file
if len(index_file.suffix) == 0:
index_file_output = index_file_output.with_suffix(target_extension)
output_paths.append(index_file_output)
if not is_dryrun:
self._generate_code(index_file_output, template_gen, allow_overwrite)
return output_paths
# +---------------------------------------------------------------------------+
# | JINJA : SupportGenerator
# +---------------------------------------------------------------------------+
[docs]
class SupportGenerator(CodeGenerator):
"""
Generates output files by copying them from within the Nunavut package itself
for non templates but uses jinja to generate headers from templates with the
language environment provided but no ``T`` (DSDL type) global set.
This generator always copies files from those returned by the ``file_iterator``
to locations under :func:`nunavut.Namespace.get_support_output_folder()`
"""
def __init__(
self,
namespace: nunavut.Namespace,
resource_types: int,
generate_namespace_types: YesNoDefault = YesNoDefault.DEFAULT,
templates_dir: Optional[Union[Path, List[Path]]] = None,
**kwargs: Any,
):
super().__init__(
namespace,
resource_types,
generate_namespace_types=generate_namespace_types,
templates_dir=templates_dir,
builtin_template_path="support",
template_loader=DSDLSupportTemplateLoader,
**kwargs,
)
target_language = self.language_context.get_target_language()
# Create the sub-folder to copy-to based on the support namespace.
self._sub_folders = Path("")
for namespace_part in target_language.support_namespace:
self._sub_folders = self._sub_folders / Path(namespace_part)
# +-----------------------------------------------------------------------+
# | AbstractGenerator
# +-----------------------------------------------------------------------+
[docs]
def get_templates(self) -> Iterable[Path]:
if self.resource_types == 0:
# This generator only generates resources and the "no resources" flag is set.
return []
return super().get_templates()
[docs]
def generate_all(
self,
is_dryrun: bool = False,
allow_overwrite: bool = True,
) -> Iterable[Path]:
generated = [] # type: List[Path]
if self.resource_types == 0:
# This generator only generates resources and the "no resources" flag is set.
return generated
target_language = self.language_context.get_target_language()
target_path = Path(self.namespace.get_index_namespace().base_output_path) / self._sub_folders
line_pps: List[LinePostProcessor] = []
file_pps: List[FilePostProcessor] = []
if self._post_processors is not None:
for pp in self._post_processors:
if isinstance(pp, LinePostProcessor):
line_pps.append(pp)
elif isinstance(pp, FilePostProcessor):
file_pps.append(pp)
else:
raise ValueError(f"PostProcessor type {type(pp)} is unknown.")
for resource in self.get_templates():
target = (target_path / resource.name).with_suffix(target_language.extension)
logger.info("Generating support file: %s", target)
if resource.suffix == TEMPLATE_SUFFIX:
self._generate_header(resource, target, is_dryrun, allow_overwrite)
generated.append(target)
else:
self._copy_header(resource, target, is_dryrun, allow_overwrite, line_pps, file_pps)
generated.append(target)
return generated
# +-----------------------------------------------------------------------+
# | Private
# +-----------------------------------------------------------------------+
def _generate_header(self, template_path: Path, output_path: Path, is_dryrun: bool, allow_overwrite: bool) -> Path:
template = self._env.get_template(template_path.name)
template_gen = template.generate()
if not is_dryrun:
self._generate_code(output_path, template_gen, allow_overwrite)
return output_path
def _copy_header(
self,
resource: Path,
target: Path,
is_dryrun: bool,
allow_overwrite: bool,
line_pps: List["LinePostProcessor"],
file_pps: List["FilePostProcessor"],
) -> Path:
if not is_dryrun:
self._handle_overwrite(target, allow_overwrite)
target.parent.mkdir(parents=True, exist_ok=True)
if len(line_pps) == 0:
shutil.copy(str(resource), str(target))
else:
self._copy_header_using_line_pps(resource, target, line_pps)
for file_pp in file_pps:
target = file_pp(target)
return target
def _copy_header_using_line_pps(
self,
resource: Path,
target: Path,
line_pps: List["LinePostProcessor"],
) -> None:
with open(str(target), "w", encoding="utf-8") as target_file:
with open(str(resource), "r", encoding="utf-8") as resource_file:
for resource_line in resource_file:
if len(resource_line) > 1 and resource_line[-2] == "\r":
resource_line_tuple = (resource_line[0:-2], "\r\n")
else:
resource_line_tuple = (resource_line[0:-1], "\n")
for line_pp in line_pps:
resource_line_tuple = line_pp(resource_line_tuple)
target_file.write(resource_line_tuple[0])
target_file.write(resource_line_tuple[1])