Coverage for glotter/source.py: 100%

106 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-12 02:25 +0000

1import os 

2from functools import lru_cache 

3 

4import yaml 

5 

6from glotter import testinfo 

7from glotter.settings import Settings 

8from glotter.containerfactory import ContainerFactory 

9from glotter.utils import error_and_exit 

10 

11BAD_SOURCES = "__bad_sources__" 

12 

13 

14class Source: 

15 """Metadata about a source file""" 

16 

17 def __init__(self, name, language, path, test_info_string): 

18 """Initialize source 

19 

20 :param name: filename including extension 

21 :param path: path to the file excluding name 

22 :param language: the language of the source 

23 :param test_info_string: a string in yaml format containing testinfo for a directory 

24 """ 

25 self._name = name 

26 self._language = language 

27 self._path = path 

28 

29 self._test_info = testinfo.TestInfo.from_string(test_info_string, self) 

30 

31 @property 

32 def full_path(self): 

33 """Returns the full path to the source including filename and extension""" 

34 return os.path.join(self._path, self._name) 

35 

36 @property 

37 def path(self): 

38 """Returns the path to the source excluding name""" 

39 return self._path 

40 

41 @property 

42 def name(self): 

43 """Returns the name of the source excluding the extension""" 

44 return os.path.splitext(self._name)[0] 

45 

46 @property 

47 def language(self): 

48 """Returns the language of the source""" 

49 return self._language 

50 

51 @property 

52 def extension(self): 

53 """Returns the extension of the source""" 

54 return os.path.splitext(self._name)[1] 

55 

56 @property 

57 def test_info(self): 

58 """Returns parsed TestInfo object""" 

59 return self._test_info 

60 

61 def __repr__(self): 

62 return f"Source(name: {self.name}, path: {self.path})" 

63 

64 def build(self, params=""): 

65 if self.test_info.container_info.build is not None: 

66 command = f"{self.test_info.container_info.build} {params}" 

67 result = self._container_exec(command) 

68 if result[0] != 0: 

69 raise RuntimeError( 

70 f'unable to build using cmd "{self.test_info.container_info.build} {params}":\n' 

71 f'{result[1].decode("utf-8")}' 

72 ) 

73 

74 def run(self, params=None): 

75 """ 

76 Run the source and return the output 

77 

78 :param params: input passed to the source as it's run 

79 :return: the output of running the source 

80 """ 

81 params = params or "" 

82 command = f"{self.test_info.container_info.cmd} {params}" 

83 result = self._container_exec(command) 

84 return result[1].decode("utf-8") 

85 

86 def exec(self, command): 

87 """ 

88 Run a command inside the container for a source 

89 

90 :param command: command to run 

91 :return: the output of the command as a string 

92 """ 

93 result = self._container_exec(command) 

94 return result[1].decode("utf-8") 

95 

96 def _container_exec(self, command): 

97 """ 

98 Run a command inside the container for a source 

99 

100 :param command: command to run 

101 :return: the exit code and output of the command 

102 """ 

103 container = ContainerFactory().get_container(self) 

104 return container.exec_run( 

105 cmd=command, 

106 detach=False, 

107 workdir="/src", 

108 ) 

109 

110 def cleanup(self): 

111 ContainerFactory().cleanup(self) 

112 

113 

114@lru_cache 

115def get_sources(path, check_bad_sources=False): 

116 """ 

117 Walk through a directory and create Source objects 

118 

119 :param path: path to the directory through which to walk 

120 :param check_bad_source: if True, check for bad source filenames. Default is False 

121 :return: a dict where the key is the ProjectType and the value is a list of all the 

122 Source objects of that project. If check_bad_source is True, 

123 the BAD_SOURCES key contains a list of invalid paths relative to the current 

124 working directory 

125 """ 

126 sources = {k: [] for k in Settings().projects} 

127 orig_path = path 

128 if check_bad_sources: 

129 sources[BAD_SOURCES] = [] 

130 

131 for root, _, files in os.walk(path): 

132 path = os.path.abspath(root) 

133 if "testinfo.yml" in files: 

134 with open( 

135 os.path.join(path, "testinfo.yml"), "r", encoding="utf-8" 

136 ) as file: 

137 test_info_string = file.read() 

138 folder_info = testinfo.FolderInfo.from_dict( 

139 yaml.safe_load(test_info_string)["folder"] 

140 ) 

141 folder_project_names = folder_info.get_project_mappings( 

142 include_extension=True 

143 ) 

144 for project_type, project_name in folder_project_names.items(): 

145 if project_name in files: 

146 source = Source( 

147 project_name, os.path.basename(path), path, test_info_string 

148 ) 

149 sources[project_type].append(source) 

150 

151 if check_bad_sources: 

152 invalid_filenames = set(files) - ( 

153 set(folder_project_names.values()) | {"testinfo.yml", "README.md"} 

154 ) 

155 sources[BAD_SOURCES] += [ 

156 os.path.join(os.path.relpath(path, orig_path), filename) 

157 for filename in invalid_filenames 

158 ] 

159 

160 return sources 

161 

162 

163def filter_sources(args, sources): 

164 """ 

165 Filter sources based language, project, and/or source 

166 

167 :param args: Arguments indicating what to filter on 

168 :param sources: a dict where the key is the ProjectType and the value is a list of all the Source objects of that project 

169 :return: a dict where the key is the ProjectType and the value is a list of all the Source objects of that project 

170 that match the filter 

171 """ 

172 

173 if args.project: 

174 if args.project not in sources: 

175 error_and_exit(f'No valid sources found for project: "{args.project}"') 

176 

177 sources = {args.project: sources[args.project]} 

178 

179 filtered_sources_by_type = {} 

180 for project_type, sources_by_type in sources.items(): 

181 filtered_sources = [ 

182 source for source in sources_by_type if _matches_source(args, source) 

183 ] 

184 if filtered_sources: 

185 filtered_sources_by_type[project_type] = filtered_sources 

186 

187 if not filtered_sources_by_type: 

188 errors = [] 

189 if args.project: 

190 errors.append(f'project "{args.project}"') 

191 

192 if args.language: 

193 if isinstance(args.language, set): 

194 errors.append( 

195 "languages " 

196 + ", ".join(f'"{language}"' for language in sorted(args.language)) 

197 ) 

198 else: 

199 errors.append(f'language "{args.language}"') 

200 

201 if args.source: 

202 errors.append(f'source "{args.source}"') 

203 

204 if errors: 

205 error_msg = ", ".join(errors) 

206 error_and_exit( 

207 f"No valid sources found for the following combination: {error_msg}" 

208 ) 

209 

210 return filtered_sources_by_type 

211 

212 

213def _matches_source(args, source): 

214 if args.language: 

215 if isinstance(args.language, set): 

216 if source.language.lower() not in args.language: 

217 return False 

218 elif source.language.lower() != args.language.lower(): 

219 return False 

220 

221 return ( 

222 not args.source 

223 or f"{source.name}{source.extension}".lower() == args.source.lower() 

224 )