Source code for stouputils.lock.queue


# Imports
from __future__ import annotations

import os
import time
import uuid
from typing import IO, TYPE_CHECKING, Any, cast

from ..decorators import abstract

if TYPE_CHECKING:
    import redis


[docs] class BaseTicketQueue: """ Base API for ticket queues. """
[docs] @abstract def register(self) -> tuple[int, str]: raise NotImplementedError
[docs] @abstract def is_head(self, ticket: int) -> bool: raise NotImplementedError
[docs] @abstract def remove(self, member: str) -> None: raise NotImplementedError
[docs] @abstract def cleanup_stale(self) -> None: raise NotImplementedError
[docs] @abstract def is_empty(self) -> bool: """ Return True if the queue currently has no waiting members. Implementations should consider the concrete storage details (e.g. on filesystem the "seq" file is not considered a queue member). """ raise NotImplementedError
[docs] @abstract def maybe_cleanup(self) -> None: """ Attempt to remove any on-disk or remote artifacts when the queue is empty. This should be a best-effort no-op if other clients are concurrently active. Implementations should handle errors internally and not raise. """ raise NotImplementedError
[docs] class FileTicketQueue(BaseTicketQueue): """ File-system backed ticket queue. Tickets are assigned using a small ``seq`` file protected by an exclusive lock (via ``fcntl`` on POSIX). Each waiter creates a ticket file named ``{ticket:020d}.{pid}.{uuid}`` in the queue directory. The head of the sorted directory listing is considered the current owner. Examples: >>> # Basic filesystem queue behaviour and cleanup >>> import tempfile, os, time >>> tmp = tempfile.mkdtemp() >>> qd = tmp + "/q" >>> q = FileTicketQueue(qd, stale_timeout=0.01) >>> t1, m1 = q.register() >>> t2, m2 = q.register() >>> q.is_head(t1) True >>> q.remove(m1) >>> q.is_head(t2) True >>> # Make the remaining ticket appear stale and cleanup >>> p = os.path.join(qd, m2) >>> os.utime(p, (0, 0)) >>> q.cleanup_stale() >>> q.is_empty() True >>> q.maybe_cleanup() >>> os.path.exists(qd) False """ def __init__(self, queue_dir: str, stale_timeout: float | None = None) -> None: self.queue_dir: str = queue_dir self.stale_timeout: float | None = stale_timeout os.makedirs(queue_dir, exist_ok=True) def _get_ticket(self) -> int: seq_path: str = os.path.join(self.queue_dir, "seq") # Ensure queue directory exists os.makedirs(self.queue_dir, exist_ok=True) def _inc_seq_in_file(f: IO[Any]) -> int: """Read, increment and persist the sequence in the given open file.""" f.seek(0) data: str = f.read().decode().strip() seq: int = int(data) if data else 0 seq += 1 f.seek(0) f.truncate(0) f.write(str(seq).encode()) f.flush() return seq # Try POSIX advisory lock via fcntl when available try: import fcntl with open(seq_path, "a+b") as f: fcntl.flock(f, fcntl.LOCK_EX) # type: ignore try: seq = _inc_seq_in_file(f) finally: try: fcntl.flock(f, fcntl.LOCK_UN) # type: ignore except Exception: pass return seq except Exception: # fallthrough to try Windows locking pass # Try Windows locking via msvcrt try: import msvcrt with open(seq_path, "a+b") as f: fd = f.fileno() # Lock first byte of the file (blocking) locked = False try: msvcrt.locking(fd, msvcrt.LK_LOCK, 1) # type: ignore locked = True except Exception: # Fallback to non-blocking lock if needed try: msvcrt.locking(fd, msvcrt.LK_NBLCK, 1) # type: ignore locked = True except Exception: locked = False try: if locked: seq = _inc_seq_in_file(f) else: # If locking failed, still attempt a best-effort increment seq = _inc_seq_in_file(f) finally: try: if locked: msvcrt.locking(fd, msvcrt.LK_UNLCK, 1) # type: ignore except Exception: pass return seq except Exception: # Fallback to timestamp + random suffix to reduce collisions import random return int(time.time() * 1e6) * 1000000 + random.getrandbits(48) def register(self) -> tuple[int, str]: ticket: int = self._get_ticket() fname: str = f"{ticket:020d}.{os.getpid()}.{uuid.uuid4().hex}" p: str = os.path.join(self.queue_dir, fname) # Create our ticket file with open(p, "w") as f: f.write(str(time.time())) return ticket, fname def is_head(self, ticket: int) -> bool: try: files: list[str] = sorted(os.listdir(self.queue_dir)) except FileNotFoundError: return False if not files: return False try: head_ticket: int = int(files[0].split(".")[0]) except Exception: return False return head_ticket == ticket def remove(self, member: str) -> None: try: p: str = os.path.join(self.queue_dir, member) if os.path.exists(p): os.remove(p) except Exception: pass
[docs] def cleanup_stale(self) -> None: """ Remove stale head ticket if its mtime exceeds the stale timeout. """ stale: float | None = self.stale_timeout if stale is None: return try: files: list[str] = sorted(os.listdir(self.queue_dir)) if not files: return head: str = files[0] p: str = os.path.join(self.queue_dir, head) try: mtime: float = os.path.getmtime(p) except Exception: return if time.time() - mtime >= stale: try: os.remove(p) except Exception: pass except Exception: pass
[docs] def is_empty(self) -> bool: """Return True if the queue directory contains no ticket files. The sequence file ``seq`` is ignored when determining emptiness. """ try: files: list[str] = sorted(os.listdir(self.queue_dir)) except Exception: return True # Exclude the seq file which is used to allocate tickets members = [f for f in files if f != "seq"] return len(members) == 0
[docs] def maybe_cleanup(self) -> None: """ Try to remove sequence file and queue dir if the queue is empty. This is a best-effort operation: if other clients are active or a race occurs, the function simply returns without raising. """ try: if not self.is_empty(): return # Remove seq file if present seq_path: str = os.path.join(self.queue_dir, "seq") try: if os.path.exists(seq_path): os.remove(seq_path) except Exception: pass # Attempt to remove directory if empty try: os.rmdir(self.queue_dir) except Exception: pass except Exception: pass
[docs] class RedisTicketQueue(BaseTicketQueue): """ Redis-backed ticket queue using INCR + ZADD. Member format: ``{ticket}:{token}:{ts_ms}`` where ``ts_ms`` is the insertion timestamp in milliseconds. The ZSET score is the ticket number which provides ordering. This class performs stale head cleanup based on the provided stale timeout. Examples: >>> # Redis queue examples; run only on non-Windows environments >>> def _redis_ticket_queue_doctest(): ... import time, redis ... client = redis.Redis() ... name = "doctest:rq" ... # Ensure clean start ... _ = client.delete(f"{name}:queue") ... _ = client.delete(f"{name}:seq") ... q = RedisTicketQueue(name, client, stale_timeout=0.01) ... t1, m1 = q.register() ... t2, m2 = q.register() ... q.is_head(t1) ... True ... q.remove(m1) ... q.is_head(t2) ... True ... q.remove(m2) ... q.maybe_cleanup() ... print(client.exists(f"{name}:queue") == 0 and client.exists(f"{name}:seq") == 0) >>> import os >>> if os.name != 'nt': ... _redis_ticket_queue_doctest() ... else: ... print("True") True """ def __init__(self, name: str, client: redis.Redis | None = None, stale_timeout: float | None = None) -> None: self.name: str = name self.client: redis.Redis | None = client self.stale_timeout: float | None = stale_timeout def ensure_client(self) -> redis.Redis: if self.client is None: import redis self.client = redis.Redis() return self.client def register(self) -> tuple[int, str]: client: redis.Redis = self.ensure_client() # redis-py may have a partly unknown return type; cast to int for Pylance ticket: int = cast(int, client.incr(f"{self.name}:seq")) ts_ms: int = int(time.monotonic() * 1000) token: str = uuid.uuid4().hex member: str = f"{ticket}:{token}:{ts_ms}" client.zadd(f"{self.name}:queue", {member: ticket}) return ticket, member def is_head(self, ticket: int) -> bool: client: redis.Redis = self.ensure_client() # zrange may return an Awaitable or a list of bytes; cast to list[bytes] head = cast(list[bytes], client.zrange(f"{self.name}:queue", 0, 0)) # type: ignore[reportUnknownMemberType] if not head: return False head_member: str = head[0].decode() try: head_ticket: int = int(head_member.split(":")[0]) except Exception: return False return head_ticket == ticket def remove(self, member: str) -> None: try: client: redis.Redis = self.ensure_client() client.zrem(f"{self.name}:queue", member) except Exception: pass def cleanup_stale(self) -> None: stale: float | None = self.stale_timeout if stale is None: return try: client: redis.Redis = self.ensure_client() # zrange may return an Awaitable or a list of bytes; cast to list[bytes] head = cast(list[bytes], client.zrange(f"{self.name}:queue", 0, 0)) # type: ignore[reportUnknownMemberType] if not head: return head_member: str = head[0].decode() parts: list[str] = head_member.split(":" ) if len(parts) < 3: return ts_ms: int = int(parts[2]) age: float = (time.monotonic() * 1000) - ts_ms if age >= (stale * 1000): try: client.zrem(f"{self.name}:queue", head_member) except Exception: pass except Exception: pass def is_empty(self) -> bool: try: client: redis.Redis = self.ensure_client() cnt = cast(int, client.zcard(f"{self.name}:queue")) return cnt == 0 except Exception: # On error assume non-empty to avoid aggressive cleanup return False
[docs] def maybe_cleanup(self) -> None: """Attempt to remove Redis keys used by the queue when it is empty. This is best effort: if concurrent clients are active the operation may be a no-op. """ try: if not self.is_empty(): return client: redis.Redis = self.ensure_client() try: client.delete(f"{self.name}:queue") client.delete(f"{self.name}:seq") except Exception: pass except Exception: pass