Coverage for glotter/source.py: 100%
67 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-01 21:54 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-01 21:54 +0000
1from functools import lru_cache
3from glotter_core.source import CoreSource, categorize_sources
5from glotter.containerfactory import get_container_factory
6from glotter.settings import get_settings
7from glotter.utils import error_and_exit
# Dictionary key under which get_sources() stores the list of bad sources
# when it is called with check_bad_sources=True.
9BAD_SOURCES = "__bad_sources__"
class Source(CoreSource):
    """Metadata about a source file, plus helpers to build and run it in its container"""

    def __repr__(self):
        return f"Source(name: {self.name}, path: {self.path})"

    def build(self, params=""):
        """
        Build the source inside its container

        Does nothing when no build command is configured
        (``test_info.container_info.build`` is None).

        :param params: extra arguments appended to the build command. None is
            treated the same as an empty string
        :raises RuntimeError: if the build command exits with a non-zero code
        """
        build_cmd = self.test_info.container_info.build
        if build_cmd is None:
            return

        # Mirror run(): a missing value must not end up as the literal text "None"
        # in the command line.
        if params is None:
            params = ""

        command = f"{build_cmd} {params}"
        result = self._container_exec(command)
        if result[0] != 0:
            # Decode leniently here: build output that is not valid UTF-8 must not
            # replace the RuntimeError with a UnicodeDecodeError.
            output = result[1].decode("utf-8", errors="replace")
            raise RuntimeError(f'unable to build using cmd "{command}":\n{output}')

    def run(self, params=None):
        """
        Run the source and return the output

        :param params: input passed to the source as it's run
        :return: the output of running the source, decoded as UTF-8
        """
        params = params or ""
        command = f"{self.test_info.container_info.cmd} {params}"
        result = self._container_exec(command)
        return result[1].decode("utf-8")

    def exec(self, command):
        """
        Run a command inside the container for a source

        :param command: command to run
        :return: the output of the command as a string, decoded as UTF-8
        """
        result = self._container_exec(command)
        return result[1].decode("utf-8")

    def _container_exec(self, command):
        """
        Run a command inside the container for a source

        The container is obtained from the container factory and the command is
        executed, not detached, with ``/src`` as the working directory.

        :param command: command to run
        :return: the exit code and output of the command; callers read index 0
            as the exit code and index 1 as the output bytes
        """
        container = get_container_factory().get_container(self)
        return container.exec_run(
            cmd=command,
            detach=False,
            workdir="/src",
        )

    def cleanup(self):
        """Ask the container factory to clean up whatever it holds for this source"""
        get_container_factory().cleanup(self)
@lru_cache
def get_sources(path, check_bad_sources=False):
    """
    Walk through a directory and create Source objects

    Results are memoised by ``lru_cache`` per argument combination, so repeated
    calls with the same arguments return the same dict object.

    :param path: path to the directory through which to walk
    :param check_bad_sources: if True, check for bad source filenames. Default is False
    :return: a dict where the key is the ProjectType and the value is a list of all the
        Source objects of that project. If check_bad_sources is True,
        the BAD_SOURCES key contains a list of invalid paths relative to the current
        working directory
    """
    categorized = categorize_sources(path, get_settings().projects, Source)
    by_project = categorized.testable_by_project

    if not check_bad_sources:
        return by_project

    # Bad sources are reported in the same dict, under their own reserved key.
    by_project[BAD_SOURCES] = categorized.bad_sources
    return by_project
def filter_sources(args, sources):
    """
    Filter sources based on language, project, and/or source

    :param args: Arguments indicating what to filter on. The ``project``,
        ``language`` and ``source`` attributes are read; a falsy value means
        "do not filter on this"
    :param sources: a dict where the key is the ProjectType and the value is a list of
        all the Source objects of that project
    :return: a dict where the key is the ProjectType and the value is a list of all the
        Source objects of that project that match the filter. Projects with no
        matching sources are left out
    """
    project = args.project
    if project:
        if project not in sources:
            error_and_exit(f'No valid sources found for project: "{project}"')

        # Narrow the search to the one requested project.
        sources = {project: sources[project]}

    matched = {}
    for project_type, candidates in sources.items():
        kept = [candidate for candidate in candidates if _matches_source(args, candidate)]
        if kept:
            matched[project_type] = kept

    if matched:
        return matched

    # Nothing matched: report every filter that was in effect.
    active_filters = []
    if project:
        active_filters.append(f'project "{project}"')

    language = args.language
    if language:
        if isinstance(language, set):
            quoted = ", ".join(f'"{name}"' for name in sorted(language))
            active_filters.append("languages " + quoted)
        else:
            active_filters.append(f'language "{language}"')

    if args.source:
        active_filters.append(f'source "{args.source}"')

    if active_filters:
        error_msg = ", ".join(active_filters)
        error_and_exit(f"No valid sources found for the following combination: {error_msg}")

    return matched
134def _matches_source(args, source):
135 if args.language:
136 if isinstance(args.language, set):
137 if source.language.lower() not in args.language:
138 return False
139 elif source.language.lower() != args.language.lower():
140 return False
142 return not args.source or f"{source.name}{source.extension}".lower() == args.source.lower()