Source code for pybdl.api.utils.quota_cache

"""Persistent cache for API quota usage."""

import asyncio
import json
import threading
from pathlib import Path
from typing import Any

from pybdl.utils.cache import get_cache_file_path


[docs] class PersistentQuotaCache: """ Persistent cache for API quota usage, stored on disk. This class provides thread-safe, persistent storage for quota usage data, allowing rate limiters to survive process restarts and share state between sessions. """ def __init__(self, enabled: bool = True) -> None: """ Initialize the persistent quota cache. Args: enabled: Whether to enable persistent caching. """ self.enabled = enabled cache_path = get_cache_file_path("quota_cache.json") self.cache_file: Path | str = Path(cache_path) if isinstance(cache_path, str) else cache_path self._lock = threading.Lock() self._async_lock: asyncio.Lock | None = None # Lazy initialization for async self._data: dict[str, Any] = {} if self.enabled: self._ensure_cache_dir() self._load() def _ensure_cache_dir(self) -> None: """Ensure cache directory exists.""" # Handle case where cache_file might be set as string (e.g., in tests) cache_path = Path(self.cache_file) if isinstance(self.cache_file, str) else self.cache_file cache_path.parent.mkdir(parents=True, exist_ok=True) def _load(self) -> None: """ Load quota data from the cache file. """ try: # Handle case where cache_file might be set as string (e.g., in tests) cache_path = Path(self.cache_file) if isinstance(self.cache_file, str) else self.cache_file if cache_path.exists(): with open(cache_path) as f: self._data = json.load(f) except (json.JSONDecodeError, OSError): # Log warning but continue with empty cache self._data = {} def _save(self) -> None: """ Save quota data to the cache file. """ if not self.enabled: return try: # Handle case where cache_file might be set as string (e.g., in tests) cache_path = Path(self.cache_file) if isinstance(self.cache_file, str) else self.cache_file # Write to temp file first, then rename (atomic operation) temp_file = cache_path.with_suffix(".tmp") with open(temp_file, "w") as f: json.dump(self._data, f) temp_file.replace(cache_path) except OSError: # Non-fatal: log but don't crash pass
[docs] def get(self, key: str) -> Any: """ Retrieve a cached value by key. Args: key: Cache key. Returns: Cached value, or [] if not found or disabled. """ if not self.enabled: return [] with self._lock: return self._data.get(key, [])
[docs] def set(self, key: str, value: Any) -> None: """ Set a cached value by key and persist it. Args: key: Cache key. value: Value to store. """ if not self.enabled: return with self._lock: self._data[key] = value self._save()
[docs] def try_append_if_under_limit( self, key: str, value: float, max_length: int, cleanup_older_than: float | None = None ) -> bool: """ Atomically try to append a value to a cached list if it wouldn't exceed max_length. This prevents race conditions when multiple limiters try to record calls simultaneously. The entire operation (get, check, append, save) happens atomically under the cache lock. Args: key: Cache key. value: Value to append (typically a timestamp). max_length: Maximum length allowed. cleanup_older_than: If provided, remove values older than this timestamp. Returns: True if append succeeded, False if it would exceed the limit. """ if not self.enabled: return True # If cache disabled, allow the append with self._lock: current = list(self._data.get(key, [])) # Cleanup old values if requested if cleanup_older_than is not None: current = [v for v in current if isinstance(v, (int, float)) and v > cleanup_older_than] # Check if we can append if len(current) >= max_length: return False # Append and save atomically current.append(value) self._data[key] = current self._save() return True