Coverage for glotter/source.py: 100%
106 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-12 02:25 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-12 02:25 +0000
1import os
2from functools import lru_cache
4import yaml
6from glotter import testinfo
7from glotter.settings import Settings
8from glotter.containerfactory import ContainerFactory
9from glotter.utils import error_and_exit
# Key in the dict returned by get_sources() under which unexpected filenames are
# collected when it is called with check_bad_sources=True.
BAD_SOURCES = "__bad_sources__"
class Source:
    """Metadata about a source file, plus helpers to build and run it in its container"""

    def __init__(self, name, language, path, test_info_string):
        """Initialize source

        :param name: filename including extension
        :param language: the language of the source
        :param path: path to the file excluding name
        :param test_info_string: a string in yaml format containing testinfo for a directory
        """
        self._name = name
        self._language = language
        self._path = path

        # The TestInfo parser receives this (partially initialized) source as well
        self._test_info = testinfo.TestInfo.from_string(test_info_string, self)

    @property
    def full_path(self):
        """Returns the full path to the source including filename and extension"""
        return os.path.join(self._path, self._name)

    @property
    def path(self):
        """Returns the path to the source excluding name"""
        return self._path

    @property
    def name(self):
        """Returns the name of the source excluding the extension"""
        return os.path.splitext(self._name)[0]

    @property
    def language(self):
        """Returns the language of the source"""
        return self._language

    @property
    def extension(self):
        """Returns the extension of the source (including the leading dot)"""
        return os.path.splitext(self._name)[1]

    @property
    def test_info(self):
        """Returns parsed TestInfo object"""
        return self._test_info

    def __repr__(self):
        return f"Source(name: {self.name}, path: {self.path})"

    def build(self, params=""):
        """
        Build the source inside its container

        Does nothing when the test info defines no build command.

        :param params: extra arguments appended to the build command; None is
            treated like an empty string
        :raises RuntimeError: if the build command exits with a non-zero code
        """
        build_command = self.test_info.container_info.build
        if build_command is None:
            return

        # Same normalization as run(): never send a literal "None" to the container
        if params is None:
            params = ""

        command = f"{build_command} {params}"
        result = self._container_exec(command)
        if result[0] != 0:
            raise RuntimeError(
                f'unable to build using cmd "{command}":\n'
                f'{result[1].decode("utf-8")}'
            )

    def run(self, params=None):
        """
        Run the source and return the output

        The exit code of the command is not inspected.

        :param params: input passed to the source as it's run
        :return: the output of running the source
        """
        params = params or ""
        command = f"{self.test_info.container_info.cmd} {params}"
        result = self._container_exec(command)
        return result[1].decode("utf-8")

    def exec(self, command):
        """
        Run a command inside the container for a source

        :param command: command to run
        :return: the output of the command as a string
        """
        result = self._container_exec(command)
        return result[1].decode("utf-8")

    def _container_exec(self, command):
        """
        Run a command inside the container for a source

        The command is executed with "/src" as the working directory.

        :param command: command to run
        :return: the exit code and output of the command
        """
        container = ContainerFactory().get_container(self)
        return container.exec_run(
            cmd=command,
            detach=False,
            workdir="/src",
        )

    def cleanup(self):
        """Ask the ContainerFactory to clean up whatever it holds for this source"""
        ContainerFactory().cleanup(self)
@lru_cache
def get_sources(path, check_bad_sources=False):
    """
    Walk through a directory and create Source objects

    Only directories that contain a "testinfo.yml" file are considered. The name of
    such a directory is used as the language of the sources found in it.

    The result is memoized by lru_cache, so repeated calls with the same arguments
    return the very same dict object; callers should treat it as read-only.

    :param path: path to the directory through which to walk
    :param check_bad_sources: if True, check for bad source filenames. Default is False
    :return: a dict where the key is the ProjectType and the value is a list of all the
        Source objects of that project. If check_bad_sources is True,
        the BAD_SOURCES key contains a list of invalid paths relative to ``path``,
        sorted by filename within each directory
    """
    sources = {project_type: [] for project_type in Settings().projects}
    if check_bad_sources:
        sources[BAD_SOURCES] = []

    for root, _, files in os.walk(path):
        if "testinfo.yml" not in files:
            continue

        folder_path = os.path.abspath(root)
        with open(
            os.path.join(folder_path, "testinfo.yml"), "r", encoding="utf-8"
        ) as file:
            test_info_string = file.read()

        folder_info = testinfo.FolderInfo.from_dict(
            yaml.safe_load(test_info_string)["folder"]
        )
        folder_project_names = folder_info.get_project_mappings(
            include_extension=True
        )
        language = os.path.basename(folder_path)
        for project_type, project_name in folder_project_names.items():
            if project_name in files:
                source = Source(project_name, language, folder_path, test_info_string)
                sources[project_type].append(source)

        if check_bad_sources:
            # Anything that is neither an expected project file nor one of the
            # two bookkeeping files is reported as a bad source.
            expected_filenames = set(folder_project_names.values()) | {
                "testinfo.yml",
                "README.md",
            }
            relative_folder = os.path.relpath(folder_path, path)
            # Sort: set iteration order is not stable between interpreter runs
            sources[BAD_SOURCES] += [
                os.path.join(relative_folder, filename)
                for filename in sorted(set(files) - expected_filenames)
            ]

    return sources
def filter_sources(args, sources):
    """
    Filter sources based on language, project, and/or source

    Exits with an error message (via error_and_exit) when a requested project is
    unknown, or when at least one filter was given and nothing matches it.

    :param args: Arguments indicating what to filter on (project, language, source)
    :param sources: a dict where the key is the ProjectType and the value is a list of
        all the Source objects of that project
    :return: a dict with the same layout as ``sources`` that only contains the Source
        objects matching the filter; project types without any match are omitted
    """
    requested_project = args.project
    if requested_project:
        if requested_project not in sources:
            error_and_exit(f'No valid sources found for project: "{requested_project}"')

        # Restrict the search to the single requested project
        sources = {requested_project: sources[requested_project]}

    matches_by_type = {}
    for project_type, candidates in sources.items():
        matching = [
            candidate for candidate in candidates if _matches_source(args, candidate)
        ]
        if matching:
            matches_by_type[project_type] = matching

    if matches_by_type:
        return matches_by_type

    # Nothing matched: describe every filter that was in effect
    criteria = []
    if args.project:
        criteria.append(f'project "{args.project}"')

    requested_language = args.language
    if requested_language:
        if isinstance(requested_language, set):
            quoted_languages = [
                f'"{language}"' for language in sorted(requested_language)
            ]
            criteria.append("languages " + ", ".join(quoted_languages))
        else:
            criteria.append(f'language "{requested_language}"')

    if args.source:
        criteria.append(f'source "{args.source}"')

    if criteria:
        error_msg = ", ".join(criteria)
        error_and_exit(
            f"No valid sources found for the following combination: {error_msg}"
        )

    return matches_by_type
def _matches_source(args, source):
    """
    Check whether a source satisfies the language and source filters

    All comparisons are case-insensitive. A falsy filter matches everything.

    :param args: Arguments indicating what to filter on. ``args.language`` may be a
        single language name or a set of language names; ``args.source`` is a
        filename including extension
    :param source: the Source object to check
    :return: True if the source matches every filter that is set, otherwise False
    """
    if args.language:
        source_language = source.language.lower()
        if isinstance(args.language, set):
            # Lowercase the requested languages too, so a set behaves exactly
            # like the single-language branch below.
            requested_languages = {language.lower() for language in args.language}
            if source_language not in requested_languages:
                return False
        elif source_language != args.language.lower():
            return False

    return (
        not args.source
        or f"{source.name}{source.extension}".lower() == args.source.lower()
    )