stouputils.lock.queue module#
- class BaseTicketQueue[source]#
Bases:
objectBase API for ticket queues.
- register() tuple[int, str][source]#
Function ‘register()’ is abstract and must be implemented by a subclass
- is_head(ticket: int) bool[source]#
Function ‘is_head()’ is abstract and must be implemented by a subclass
- remove(member: str) None[source]#
Function ‘remove()’ is abstract and must be implemented by a subclass
- cleanup_stale() None[source]#
Function ‘cleanup_stale()’ is abstract and must be implemented by a subclass
- class FileTicketQueue(
- queue_dir: str,
- stale_timeout: float | None = None,
Bases:
BaseTicketQueueFile-system backed ticket queue.
Tickets are assigned using a small
seqfile protected by an exclusive lock (viafcntlon POSIX). Each waiter creates a ticket file named{ticket:020d}.{pid}.{uuid}in the queue directory. The head of the sorted directory listing is considered the current owner.Examples
>>> # Basic filesystem queue behaviour and cleanup >>> import tempfile, os, time >>> tmp = tempfile.mkdtemp() >>> qd = tmp + "/q" >>> q = FileTicketQueue(qd, stale_timeout=0.01) >>> t1, m1 = q.register() >>> t2, m2 = q.register() >>> q.is_head(t1) True >>> q.remove(m1) >>> q.is_head(t2) True >>> # Make the remaining ticket appear stale and cleanup >>> p = os.path.join(qd, m2) >>> os.utime(p, (0, 0)) >>> q.cleanup_stale() >>> q.is_empty() True >>> q.maybe_cleanup() >>> os.path.exists(qd) False
- class RedisTicketQueue(
- name: str,
- client: redis.Redis | None = None,
- stale_timeout: float | None = None,
Bases:
BaseTicketQueueRedis-backed ticket queue using INCR + ZADD.
Member format:
{ticket}:{token}:{ts_ms}wherets_msis the insertion timestamp in milliseconds. The ZSET score is the ticket number which provides ordering. This class performs stale head cleanup based on the provided stale timeout.Examples
>>> # Redis queue examples; run only on non-Windows environments >>> def _redis_ticket_queue_doctest(): ... import time, redis ... client = redis.Redis() ... name = "doctest:rq" ... # Ensure clean start ... _ = client.delete(f"{name}:queue") ... _ = client.delete(f"{name}:seq") ... q = RedisTicketQueue(name, client, stale_timeout=0.01) ... t1, m1 = q.register() ... t2, m2 = q.register() ... q.is_head(t1) ... True ... q.remove(m1) ... q.is_head(t2) ... True ... q.remove(m2) ... q.maybe_cleanup() ... print(client.exists(f"{name}:queue") == 0 and client.exists(f"{name}:seq") == 0) >>> import os >>> if os.name != 'nt': ... _redis_ticket_queue_doctest() ... else: ... print("True") True