|
4 | 4 | import json |
5 | 5 | import os |
6 | 6 | import re |
| 7 | +import shutil |
7 | 8 | import subprocess |
| 9 | +import tempfile |
8 | 10 | import yaml |
9 | 11 | from datetime import datetime, timezone |
10 | 12 | from pathlib import Path |
@@ -175,155 +177,181 @@ def run_ansible_in_environment( |
175 | 177 | # This ensures Ansible's Python process flushes stdout immediately |
176 | 178 | env["PYTHONUNBUFFERED"] = "1" |
177 | 179 |
|
178 | | - # handle sub environments |
179 | | - if "." in environment: |
180 | | - sub_name = environment.split(".")[1] |
181 | | - env["SUB"] = environment |
182 | | - environment = environment.split(".")[0] |
183 | | - logger.info( |
184 | | - f"worker = {worker}, environment = {environment}, sub = {sub_name}, role = {role}" |
185 | | - ) |
186 | | - else: |
187 | | - logger.info(f"worker = {worker}, environment = {environment}, role = {role}") |
188 | | - |
189 | | - env["ENVIRONMENT"] = environment |
190 | | - |
191 | | - # NOTE: This is a first step to make Ansible Vault usable via OSISM workers. |
192 | | - # It's not ready in that form yet. |
193 | | - ansible_vault_password = utils.redis.get("ansible_vault_password") |
194 | | - if ansible_vault_password: |
195 | | - env["VAULT"] = "/ansible-vault.py" |
196 | | - |
197 | | - # Log play execution start |
198 | | - log_play_execution( |
199 | | - request_id=request_id, |
200 | | - worker=worker, |
201 | | - environment=environment, |
202 | | - role=role, |
203 | | - hosts=None, # Hosts will be empty at start, filled at completion |
204 | | - arguments=joined_arguments, |
205 | | - result="started", |
206 | | - ) |
207 | | - |
208 | | - # NOTE: Consider arguments in the future |
209 | | - if locking: |
210 | | - lock = utils.create_redlock( |
211 | | - key=f"lock-ansible-{environment}-{role}", |
212 | | - auto_release_time=auto_release_time, |
213 | | - ) |
214 | | - |
215 | | - # NOTE: use python interface in the future, something with ansible-runner and the fact cache is |
216 | | - # not working out of the box |
| 180 | + # Use a unique SSH ControlPath directory per task to prevent race conditions |
| 181 | + # when multiple Celery workers connect to the same host simultaneously. |
| 182 | + # Without this, concurrent Ansible runs share the same ControlMaster socket, |
| 183 | + # which can cause intermittent "Permission denied (publickey)" errors. |
| 184 | + ssh_control_dir = tempfile.mkdtemp(prefix=f".ansible-ssh-{request_id}-") |
| 185 | + logger.debug(f"Using per-task SSH ControlPath directory: {ssh_control_dir}") |
| 186 | + env["ANSIBLE_SSH_CONTROL_PATH_DIR"] = ssh_control_dir |
217 | 187 |
|
218 | | - # Use stdbuf for line-buffered output to ensure immediate streaming |
219 | | - # stdbuf -oL sets stdout to line-buffered mode at the OS level |
220 | | - stdbuf_prefix = "stdbuf -oL " |
221 | | - |
222 | | - # execute roles from kolla-ansible |
223 | | - if worker == "kolla-ansible": |
224 | | - if locking: |
225 | | - lock.acquire() |
226 | | - |
227 | | - if role in ["mariadb-backup", "mariadb_backup"]: |
228 | | - action = "backup" |
229 | | - role = "mariadb" |
230 | | - # Hacky workaround. The handling of kolla_action will be revised in the future. |
231 | | - joined_arguments = re.sub( |
232 | | - r"-e kolla_action=(bootstrap|config|deploy|precheck|pull|reconfigure|refresh-containers|start|stop|upgrade)", |
233 | | - r"-e kolla_action=backup", |
234 | | - joined_arguments, |
| 188 | + try: |
| 189 | + # handle sub environments |
| 190 | + if "." in environment: |
| 191 | + sub_name = environment.split(".")[1] |
| 192 | + env["SUB"] = environment |
| 193 | + environment = environment.split(".")[0] |
| 194 | + logger.info( |
| 195 | + f"worker = {worker}, environment = {environment}, sub = {sub_name}, role = {role}" |
235 | 196 | ) |
236 | 197 | else: |
237 | | - action = "deploy" |
238 | | - |
239 | | - command = f"{stdbuf_prefix}/run.sh {action} {role} {joined_arguments}" |
240 | | - logger.info(f"RUN {command}") |
241 | | - p = subprocess.Popen( |
242 | | - command, |
243 | | - stdout=subprocess.PIPE, |
244 | | - stderr=subprocess.STDOUT, |
245 | | - shell=True, |
246 | | - env=env, |
247 | | - ) |
248 | | - |
249 | | - # execute roles from osism-kubernetes |
250 | | - elif worker == "osism-kubernetes": |
251 | | - if locking: |
252 | | - lock.acquire() |
253 | | - |
254 | | - command = f"{stdbuf_prefix}/run.sh {role} {joined_arguments}" |
255 | | - logger.info(f"RUN {command}") |
256 | | - p = subprocess.Popen( |
257 | | - command, |
258 | | - stdout=subprocess.PIPE, |
259 | | - stderr=subprocess.STDOUT, |
260 | | - shell=True, |
261 | | - env=env, |
262 | | - ) |
| 198 | + logger.info( |
| 199 | + f"worker = {worker}, environment = {environment}, role = {role}" |
| 200 | + ) |
263 | 201 |
|
264 | | - # execute roles from ceph-ansible |
265 | | - elif worker == "ceph-ansible": |
266 | | - if locking: |
267 | | - lock.acquire() |
268 | | - |
269 | | - command = f"{stdbuf_prefix}/run.sh {role} {joined_arguments}" |
270 | | - logger.info(f"RUN {command}") |
271 | | - p = subprocess.Popen( |
272 | | - command, |
273 | | - stdout=subprocess.PIPE, |
274 | | - stderr=subprocess.STDOUT, |
275 | | - shell=True, |
276 | | - env=env, |
| 202 | + env["ENVIRONMENT"] = environment |
| 203 | + |
| 204 | + # NOTE: This is a first step to make Ansible Vault usable via OSISM workers. |
| 205 | + # It's not ready in that form yet. |
| 206 | + ansible_vault_password = utils.redis.get("ansible_vault_password") |
| 207 | + if ansible_vault_password: |
| 208 | + env["VAULT"] = "/ansible-vault.py" |
| 209 | + |
| 210 | + # Log play execution start |
| 211 | + log_play_execution( |
| 212 | + request_id=request_id, |
| 213 | + worker=worker, |
| 214 | + environment=environment, |
| 215 | + role=role, |
| 216 | + hosts=None, # Hosts will be empty at start, filled at completion |
| 217 | + arguments=joined_arguments, |
| 218 | + result="started", |
277 | 219 | ) |
278 | 220 |
|
279 | | - # execute all other roles |
280 | | - else: |
| 221 | + # NOTE: Consider arguments in the future |
| 222 | + lock = None |
281 | 223 | if locking: |
282 | | - lock.acquire() |
283 | | - |
284 | | - command = f"{stdbuf_prefix}/run-{environment}.sh {role} {joined_arguments}" |
285 | | - logger.info(f"RUN {command}") |
286 | | - p = subprocess.Popen( |
287 | | - command, |
288 | | - stdout=subprocess.PIPE, |
289 | | - stderr=subprocess.STDOUT, |
290 | | - shell=True, |
291 | | - env=env, |
292 | | - ) |
293 | | - |
294 | | - while p.poll() is None: |
295 | | - line = p.stdout.readline().decode("utf-8") |
296 | | - |
297 | | - # Extract hosts from Ansible output |
298 | | - match = HOST_PATTERN.match(line.strip()) |
299 | | - if match: |
300 | | - hostname = match.group(2) |
301 | | - extracted_hosts.add(hostname) # Local set (automatic deduplication) |
| 224 | + lock = utils.create_redlock( |
| 225 | + key=f"lock-ansible-{environment}-{role}", |
| 226 | + auto_release_time=auto_release_time, |
| 227 | + ) |
302 | 228 |
|
303 | | - if publish: |
304 | | - utils.push_task_output(request_id, line) |
305 | | - result += line |
| 229 | + # NOTE: use python interface in the future, something with ansible-runner and the fact cache is |
| 230 | + # not working out of the box |
| 231 | + |
| 232 | + # Use stdbuf for line-buffered output to ensure immediate streaming |
| 233 | + # stdbuf -oL sets stdout to line-buffered mode at the OS level |
| 234 | + stdbuf_prefix = "stdbuf -oL " |
| 235 | + |
| 236 | + # execute roles from kolla-ansible |
| 237 | + if worker == "kolla-ansible": |
| 238 | + if lock: |
| 239 | + lock.acquire() |
| 240 | + |
| 241 | + if role in ["mariadb-backup", "mariadb_backup"]: |
| 242 | + action = "backup" |
| 243 | + role = "mariadb" |
| 244 | + # Hacky workaround. The handling of kolla_action will be revised in the future. |
| 245 | + joined_arguments = re.sub( |
| 246 | + r"-e kolla_action=(bootstrap|config|deploy|precheck|pull|reconfigure|refresh-containers|start|stop|upgrade)", |
| 247 | + r"-e kolla_action=backup", |
| 248 | + joined_arguments, |
| 249 | + ) |
| 250 | + else: |
| 251 | + action = "deploy" |
| 252 | + |
| 253 | + command = f"{stdbuf_prefix}/run.sh {action} {role} {joined_arguments}" |
| 254 | + logger.info(f"RUN {command}") |
| 255 | + p = subprocess.Popen( |
| 256 | + command, |
| 257 | + stdout=subprocess.PIPE, |
| 258 | + stderr=subprocess.STDOUT, |
| 259 | + shell=True, |
| 260 | + env=env, |
| 261 | + ) |
306 | 262 |
|
307 | | - rc = p.wait(timeout=60) |
| 263 | + # execute roles from osism-kubernetes |
| 264 | + elif worker == "osism-kubernetes": |
| 265 | + if lock: |
| 266 | + lock.acquire() |
| 267 | + |
| 268 | + command = f"{stdbuf_prefix}/run.sh {role} {joined_arguments}" |
| 269 | + logger.info(f"RUN {command}") |
| 270 | + p = subprocess.Popen( |
| 271 | + command, |
| 272 | + stdout=subprocess.PIPE, |
| 273 | + stderr=subprocess.STDOUT, |
| 274 | + shell=True, |
| 275 | + env=env, |
| 276 | + ) |
308 | 277 |
|
309 | | - # Log play execution result |
310 | | - log_play_execution( |
311 | | - request_id=request_id, |
312 | | - worker=worker, |
313 | | - environment=environment, |
314 | | - role=role, |
315 | | - hosts=sorted(list(extracted_hosts)), # Direct pass of extracted hosts |
316 | | - arguments=joined_arguments, |
317 | | - result="success" if rc == 0 else "failure", |
318 | | - ) |
| 278 | + # execute roles from ceph-ansible |
| 279 | + elif worker == "ceph-ansible": |
| 280 | + if lock: |
| 281 | + lock.acquire() |
| 282 | + |
| 283 | + command = f"{stdbuf_prefix}/run.sh {role} {joined_arguments}" |
| 284 | + logger.info(f"RUN {command}") |
| 285 | + p = subprocess.Popen( |
| 286 | + command, |
| 287 | + stdout=subprocess.PIPE, |
| 288 | + stderr=subprocess.STDOUT, |
| 289 | + shell=True, |
| 290 | + env=env, |
| 291 | + ) |
319 | 292 |
|
320 | | - if publish: |
321 | | - utils.finish_task_output(request_id, rc=rc) |
| 293 | + # execute all other roles |
| 294 | + else: |
| 295 | + if lock: |
| 296 | + lock.acquire() |
| 297 | + |
| 298 | + command = f"{stdbuf_prefix}/run-{environment}.sh {role} {joined_arguments}" |
| 299 | + logger.info(f"RUN {command}") |
| 300 | + p = subprocess.Popen( |
| 301 | + command, |
| 302 | + stdout=subprocess.PIPE, |
| 303 | + stderr=subprocess.STDOUT, |
| 304 | + shell=True, |
| 305 | + env=env, |
| 306 | + ) |
322 | 307 |
|
323 | | - if locking: |
324 | | - lock.release() |
| 308 | + try: |
| 309 | + while p.poll() is None: |
| 310 | + line = p.stdout.readline().decode("utf-8") |
| 311 | + |
| 312 | + # Extract hosts from Ansible output |
| 313 | + match = HOST_PATTERN.match(line.strip()) |
| 314 | + if match: |
| 315 | + hostname = match.group(2) |
| 316 | + extracted_hosts.add(hostname) # Local set (automatic deduplication) |
| 317 | + |
| 318 | + if publish: |
| 319 | + utils.push_task_output(request_id, line) |
| 320 | + result += line |
| 321 | + |
| 322 | + rc = p.wait(timeout=60) |
| 323 | + |
| 324 | + # Log play execution result |
| 325 | + log_play_execution( |
| 326 | + request_id=request_id, |
| 327 | + worker=worker, |
| 328 | + environment=environment, |
| 329 | + role=role, |
| 330 | + hosts=sorted(list(extracted_hosts)), # Direct pass of extracted hosts |
| 331 | + arguments=joined_arguments, |
| 332 | + result="success" if rc == 0 else "failure", |
| 333 | + ) |
325 | 334 |
|
326 | | - return result |
| 335 | + if publish: |
| 336 | + utils.finish_task_output(request_id, rc=rc) |
| 337 | + |
| 338 | + return result |
| 339 | + finally: |
| 340 | + if lock: |
| 341 | + try: |
| 342 | + lock.release() |
| 343 | + except Exception as e: |
| 344 | + logger.warning( |
| 345 | + f"Failed to release lock for {environment}-{role}: {e}" |
| 346 | + ) |
| 347 | + finally: |
| 348 | + # Clean up per-task SSH ControlPath directory |
| 349 | + try: |
| 350 | + shutil.rmtree(ssh_control_dir) |
| 351 | + except Exception as e: |
| 352 | + logger.warning( |
| 353 | + f"Failed to clean up SSH ControlPath directory {ssh_control_dir}: {e}" |
| 354 | + ) |
327 | 355 |
|
328 | 356 |
|
329 | 357 | def run_command( |
|
0 commit comments