Coverage for glotter/containerfactory.py: 95%
59 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-01 21:54 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-01 21:54 +0000
1import os
2import shutil
3import tempfile
4from datetime import datetime, timedelta
5from functools import cache
6from uuid import uuid4 as uuid
8import docker
@cache
def get_container_factory():
    """
    Return the shared ContainerFactory instance.

    The result is memoized with ``functools.cache``, so the factory is built on
    the first call and that same object is handed back on every later call.
    """
    factory = ContainerFactory()
    return factory
class ContainerFactory:
    """
    Create, cache and dispose of docker containers used to run sources.

    Containers and the temporary directories mounted into them are tracked in
    two dictionaries, both keyed by ``source.full_path``.
    """

    def __init__(self):
        """
        Initialize a ContainerFactory
        """
        self._containers = {}
        self._volume_dis = {}
        self._client = docker.from_env()
        self._api_client = self._client.api

    def get_container(self, source):
        """
        Returns a running container for a given source. This will return an existing container if one exists
        or create a new one if necessary

        :param source: the source to use inside the container
        :return: a running container specific to the source
        """
        key = source.full_path

        # Reuse the cached container before doing any work. Creating a fresh
        # temporary directory here would orphan the directory that is actually
        # mounted in the existing container, and cleanup() would later delete
        # the wrong one.
        if key in self._containers:
            return self._containers[key]

        tmp_dir = tempfile.mkdtemp()
        try:
            # World-writable, presumably so that any user inside the container
            # can write to the mounted directory.
            os.chmod(tmp_dir, 0o777)
            shutil.copy(source.full_path, tmp_dir)

            image = self.get_image(source.test_info.container_info)
            volume_info = {tmp_dir: {"bind": "/src", "mode": "rw"}}
            container = self._client.containers.run(
                image=image,
                name=f"{source.name}_{uuid().hex}",
                command="sleep 1h",
                working_dir="/src",
                volumes=volume_info,
                detach=True,
                entrypoint="",
            )
        except Exception:
            # Nothing has been registered yet, so cleanup() could never find
            # this directory; remove it here instead of leaking it.
            shutil.rmtree(tmp_dir, ignore_errors=True)
            raise

        # Register both entries together so the dictionaries stay in step.
        self._volume_dis[key] = tmp_dir
        self._containers[key] = container
        return container

    def _find_local_image(self, image_name):
        """
        Look up an image that is already available locally

        :param image_name: image reference in ``image:tag`` form
        :return: the image if exactly one match is listed, otherwise None
        """
        images = self._client.images.list(name=image_name)
        if len(images) == 1:
            return images[0]
        return None

    def get_image(self, container_info, quiet=False, parallel=False):
        """
        Pull a docker image

        :param container_info: metadata about the image to pull
        :param quiet: whether to print output while downloading
        :param parallel: whether image download is occurring in parallel
        :return: a docker image, or None if the image cannot be found after pulling
        """
        image_name = f"{container_info.image}:{container_info.tag!s}"

        image = self._find_local_image(image_name)
        if image is not None:
            return image

        if not quiet:
            # In parallel mode each message gets its own line; otherwise the
            # progress dots and the final "done" continue on the same line.
            end_char = "\n" if parallel else ""
            print(
                f"Pulling {container_info.image}:{container_info.tag}... ",
                end=end_char,
                flush=True,
            )

        last_update = datetime.now()
        # The streamed events themselves are ignored; iterating the stream is
        # what drives the pull to completion.
        for _ in self._api_client.pull(
            repository=container_info.image,
            tag=str(container_info.tag),
            stream=True,
            decode=True,
        ):
            # Print a progress marker at most once every five seconds.
            if not quiet and not parallel and datetime.now() - last_update > timedelta(seconds=5):
                print("... ", end="", flush=True)
                last_update = datetime.now()

        if not quiet:
            if parallel:
                print(
                    f"... done pulling {container_info.image}:{container_info.tag}",
                    flush=True,
                )
            else:
                print("done", flush=True)

        return self._find_local_image(image_name)

    def remove_image(self, container_info):
        """
        Remove a docker image

        :param container_info: metadata about the image to remove
        """
        image_name = f"{container_info.image}:{container_info.tag!s}"
        images = self._client.images.list(name=image_name)
        if len(images) == 1:
            print(f"Removing {image_name}", flush=True)
            self._client.images.remove(image=image_name, force=True)

    def cleanup(self, source):
        """
        Cleanup docker container and temporary folder. Also remove both from their
        respective dictionaries

        :param source: source for determining what to cleanup
        :raises KeyError: if no container is registered for the source
        """
        key = source.full_path

        self._containers[key].remove(v=True, force=True)
        shutil.rmtree(self._volume_dis[key], ignore_errors=True)

        del self._volume_dis[key]
        del self._containers[key]