Coverage for glotter/source.py: 100%

67 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-01 21:54 +0000

1from functools import lru_cache 

2 

3from glotter_core.source import CoreSource, categorize_sources 

4 

5from glotter.containerfactory import get_container_factory 

6from glotter.settings import get_settings 

7from glotter.utils import error_and_exit 

8 

9BAD_SOURCES = "__bad_sources__" 

10 

11 

12class Source(CoreSource): 

13 """Metadata about a source file""" 

14 

15 def __repr__(self): 

16 return f"Source(name: {self.name}, path: {self.path})" 

17 

18 def build(self, params=""): 

19 if self.test_info.container_info.build is not None: 

20 command = f"{self.test_info.container_info.build} {params}" 

21 result = self._container_exec(command) 

22 if result[0] != 0: 

23 raise RuntimeError( 

24 f'unable to build using cmd "{self.test_info.container_info.build} {params}":\n' 

25 f"{result[1].decode('utf-8')}" 

26 ) 

27 

28 def run(self, params=None): 

29 """ 

30 Run the source and return the output 

31 

32 :param params: input passed to the source as it's run 

33 :return: the output of running the source 

34 """ 

35 params = params or "" 

36 command = f"{self.test_info.container_info.cmd} {params}" 

37 result = self._container_exec(command) 

38 return result[1].decode("utf-8") 

39 

40 def exec(self, command): 

41 """ 

42 Run a command inside the container for a source 

43 

44 :param command: command to run 

45 :return: the output of the command as a string 

46 """ 

47 result = self._container_exec(command) 

48 return result[1].decode("utf-8") 

49 

50 def _container_exec(self, command): 

51 """ 

52 Run a command inside the container for a source 

53 

54 :param command: command to run 

55 :return: the exit code and output of the command 

56 """ 

57 container = get_container_factory().get_container(self) 

58 return container.exec_run( 

59 cmd=command, 

60 detach=False, 

61 workdir="/src", 

62 ) 

63 

64 def cleanup(self): 

65 get_container_factory().cleanup(self) 

66 

67 

68@lru_cache 

69def get_sources(path, check_bad_sources=False): 

70 """ 

71 Walk through a directory and create Source objects 

72 

73 :param path: path to the directory through which to walk 

74 :param check_bad_source: if True, check for bad source filenames. Default is False 

75 :return: a dict where the key is the ProjectType and the value is a list of all the 

76 Source objects of that project. If check_bad_source is True, 

77 the BAD_SOURCES key contains a list of invalid paths relative to the current 

78 working directory 

79 """ 

80 

81 categories = categorize_sources(path, get_settings().projects, Source) 

82 sources = categories.testable_by_project 

83 if check_bad_sources: 

84 sources[BAD_SOURCES] = categories.bad_sources 

85 

86 return sources 

87 

88 

89def filter_sources(args, sources): 

90 """ 

91 Filter sources based language, project, and/or source 

92 

93 :param args: Arguments indicating what to filter on 

94 :param sources: a dict where the key is the ProjectType and the value is a list of all the Source objects of that project 

95 :return: a dict where the key is the ProjectType and the value is a list of all the Source objects of that project 

96 that match the filter 

97 """ 

98 

99 if args.project: 

100 if args.project not in sources: 

101 error_and_exit(f'No valid sources found for project: "{args.project}"') 

102 

103 sources = {args.project: sources[args.project]} 

104 

105 filtered_sources_by_type = {} 

106 for project_type, sources_by_type in sources.items(): 

107 filtered_sources = [source for source in sources_by_type if _matches_source(args, source)] 

108 if filtered_sources: 

109 filtered_sources_by_type[project_type] = filtered_sources 

110 

111 if not filtered_sources_by_type: 

112 errors = [] 

113 if args.project: 

114 errors.append(f'project "{args.project}"') 

115 

116 if args.language: 

117 if isinstance(args.language, set): 

118 errors.append( 

119 "languages " + ", ".join(f'"{language}"' for language in sorted(args.language)) 

120 ) 

121 else: 

122 errors.append(f'language "{args.language}"') 

123 

124 if args.source: 

125 errors.append(f'source "{args.source}"') 

126 

127 if errors: 

128 error_msg = ", ".join(errors) 

129 error_and_exit(f"No valid sources found for the following combination: {error_msg}") 

130 

131 return filtered_sources_by_type 

132 

133 

134def _matches_source(args, source): 

135 if args.language: 

136 if isinstance(args.language, set): 

137 if source.language.lower() not in args.language: 

138 return False 

139 elif source.language.lower() != args.language.lower(): 

140 return False 

141 

142 return not args.source or f"{source.name}{source.extension}".lower() == args.source.lower()