Coverage for glotter/containerfactory.py: 91%
56 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-09-13 19:09 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-09-13 19:09 +0000
1import os
2import shutil
3import tempfile
4from datetime import datetime, timedelta
5from uuid import uuid4 as uuid
7import docker
9from glotter.singleton import Singleton
12class ContainerFactory(metaclass=Singleton):
13 def __init__(self, docker_client=None):
14 """
15 Initialize a ContainerFactory. This class is a singleton.
17 :param docker_client: optionally set the docker client. Defaults to setting from the environment
18 """
19 self._containers = {}
20 self._volume_dis = {}
21 self._client = docker_client or docker.from_env()
22 self._api_client = self._client.api
24 def get_container(self, source):
25 """
26 Returns a running container for a give source. This will return an existing container if one exists
27 or create a new one if necessary
29 :param source: the source to use inside the container
30 :return: a running container specific to the source
31 """
32 key = source.full_path
34 tmp_dir = tempfile.mkdtemp()
35 os.chmod(tmp_dir, 0o777)
36 shutil.copy(source.full_path, tmp_dir)
37 self._volume_dis[key] = tmp_dir
39 image = self.get_image(source.test_info.container_info)
40 volume_info = {tmp_dir: {"bind": "/src", "mode": "rw"}}
41 if key not in self._containers:
42 self._containers[key] = self._client.containers.run(
43 image=image,
44 name=f"{source.name}_{uuid().hex}",
45 command="sleep 1h",
46 working_dir="/src",
47 volumes=volume_info,
48 detach=True,
49 entrypoint="",
50 )
51 return self._containers[key]
53 def get_image(self, container_info, quiet=False, parallel=False):
54 """
55 Pull a docker image
57 :param container_info: metadata about the image to pull
58 :param quiet: whether to print output while downloading
59 :param parallel: whether image download is occurring in parallel
60 :return: a docker image
61 """
62 images = self._client.images.list(name=f"{container_info.image}:{container_info.tag!s}")
63 if len(images) == 1:
64 return images[0]
65 if not quiet:
66 end_char = "\n" if parallel else ""
67 print(
68 f"Pulling {container_info.image}:{container_info.tag}... ",
69 end=end_char,
70 flush=True,
71 )
72 last_update = datetime.now()
73 for _ in self._api_client.pull(
74 repository=container_info.image,
75 tag=str(container_info.tag),
76 stream=True,
77 decode=True,
78 ):
79 if not quiet and not parallel and datetime.now() - last_update > timedelta(seconds=5): 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 print("... ", end="", flush=True)
81 last_update = datetime.now()
82 if not quiet:
83 if parallel: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 print(
85 f"... done pulling {container_info.image}:{container_info.tag}",
86 flush=True,
87 )
88 else:
89 print("done", flush=True)
91 images = self._client.images.list(name=f"{container_info.image}:{container_info.tag!s}")
92 if len(images) == 1: 92 ↛ 95line 92 didn't jump to line 95 because the condition on line 92 was always true
93 return images[0]
95 return None
97 def remove_image(self, container_info):
98 """
99 Remove a docker image
101 :param container_info: metadata about the image to remove
102 """
104 image_name = f"{container_info.image}:{container_info.tag!s}"
105 images = self._client.images.list(name=f"{container_info.image}:{container_info.tag!s}")
106 if len(images) == 1:
107 print(f"Removing {image_name}", flush=True)
108 self._client.images.remove(image=image_name, force=True)
110 def cleanup(self, source):
111 """
112 Cleanup docker container and temporary folder. Also remove both from their
113 respective dictionaries
115 :param source: source for determining what to cleanup
116 """
117 key = source.full_path
119 self._containers[key].remove(v=True, force=True)
120 shutil.rmtree(self._volume_dis[key], ignore_errors=True)
122 del self._volume_dis[key]
123 del self._containers[key]