Coverage for glotter/containerfactory.py: 91%

56 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-12 02:25 +0000

1import os 

2import shutil 

3import tempfile 

4from datetime import datetime, timedelta 

5from uuid import uuid4 as uuid 

6 

7import docker 

8 

9from glotter.singleton import Singleton 

10 

11 

class ContainerFactory(metaclass=Singleton):
    """
    Create, cache, and clean up docker containers used to run sources.

    One container and one temporary bind-mount directory are tracked per source,
    keyed by the source's full path. This class is a singleton.
    """

    def __init__(self, docker_client=None):
        """
        Initialize a ContainerFactory. This class is a singleton.

        :param docker_client: optionally set the docker client. Defaults to setting from the environment
        """
        # Both dictionaries are keyed by ``source.full_path``.
        self._containers = {}
        self._volume_dis = {}
        self._client = docker_client or docker.from_env()
        self._api_client = self._client.api

    @staticmethod
    def _image_name(container_info):
        """
        Build the ``image:tag`` reference for a container_info.

        :param container_info: metadata about the image
        :return: the image reference as a string
        """
        return f"{container_info.image}:{str(container_info.tag)}"

    def _find_image(self, container_info):
        """
        Look up an image by its ``image:tag`` reference.

        :param container_info: metadata about the image to look up
        :return: the image if exactly one image matches, otherwise None
        """
        images = self._client.images.list(name=self._image_name(container_info))
        return images[0] if len(images) == 1 else None

    def get_container(self, source):
        """
        Returns a running container for a given source. This will return an existing container if one exists
        or create a new one if necessary

        :param source: the source to use inside the container
        :return: a running container specific to the source
        """
        key = source.full_path

        # Reuse the cached container before touching the filesystem; creating a
        # fresh temporary directory here would orphan the one that is actually
        # mounted into the existing container.
        if key in self._containers:
            return self._containers[key]

        tmp_dir = tempfile.mkdtemp()
        try:
            # World-writable so the user inside the container can write to /src.
            os.chmod(tmp_dir, 0o777)
            shutil.copy(source.full_path, tmp_dir)

            image = self.get_image(source.test_info.container_info)
            container = self._client.containers.run(
                image=image,
                name=f"{source.name}_{uuid().hex}",
                command="sleep 1h",
                working_dir="/src",
                volumes={tmp_dir: {"bind": "/src", "mode": "rw"}},
                detach=True,
                entrypoint="",
            )
        except BaseException:
            # Nothing was registered yet, so cleanup() would never find this
            # directory; remove it here and let the error propagate.
            shutil.rmtree(tmp_dir, ignore_errors=True)
            raise

        self._volume_dis[key] = tmp_dir
        self._containers[key] = container
        return container

    def get_image(self, container_info, quiet=False, parallel=False):
        """
        Pull a docker image

        :param container_info: metadata about the image to pull
        :param quiet: whether to print output while downloading
        :param parallel: whether image download is occurring in parallel
        :return: a docker image, or None if the image cannot be found after pulling
        """
        image = self._find_image(container_info)
        if image is not None:
            return image

        image_name = self._image_name(container_info)
        if not quiet:
            # In parallel mode each message gets its own line so that output
            # from concurrent pulls does not share a line.
            end_char = "\n" if parallel else ""
            print(f"Pulling {image_name}... ", end=end_char, flush=True)

        last_update = datetime.now()
        for _ in self._api_client.pull(
            repository=container_info.image,
            tag=str(container_info.tag),
            stream=True,
            decode=True,
        ):
            # Print a progress marker at most once every five seconds.
            if (
                not quiet
                and not parallel
                and datetime.now() - last_update > timedelta(seconds=5)
            ):
                print("... ", end="", flush=True)
                last_update = datetime.now()

        if not quiet:
            if parallel:
                print(f"... done pulling {image_name}", flush=True)
            else:
                print("done", flush=True)

        return self._find_image(container_info)

    def remove_image(self, container_info):
        """
        Remove a docker image

        :param container_info: metadata about the image to remove
        """
        image_name = self._image_name(container_info)
        if self._find_image(container_info) is not None:
            print(f"Removing {image_name}", flush=True)
            self._client.images.remove(image=image_name, force=True)

    def cleanup(self, source):
        """
        Cleanup docker container and temporary folder. Also remove both from their
        respective dictionaries

        Calling this for a source that has nothing registered is a no-op.

        :param source: source for determining what to cleanup
        """
        key = source.full_path

        # Drop the bookkeeping first so a failure while removing the container
        # cannot leave stale entries behind.
        container = self._containers.pop(key, None)
        tmp_dir = self._volume_dis.pop(key, None)

        try:
            if container is not None:
                container.remove(v=True, force=True)
        finally:
            # Always remove the temporary directory, even if container removal raised.
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)