| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 |
- from __future__ import annotations
- import abc
- import atexit
- import contextlib
- import logging
- import os
- import pathlib
- import random
- import tempfile
- import time
- import typing
- import warnings
- from . import constants, exceptions, portalocker, types
- from .types import Filename, Mode
- logger = logging.getLogger(__name__)
- DEFAULT_TIMEOUT = 5
- DEFAULT_CHECK_INTERVAL = 0.25
- DEFAULT_FAIL_WHEN_LOCKED = False
- LOCK_METHOD = constants.LockFlags.EXCLUSIVE | constants.LockFlags.NON_BLOCKING
- __all__ = [
- 'Lock',
- 'open_atomic',
- ]
- def coalesce(*args: typing.Any, test_value: typing.Any = None) -> typing.Any:
- """Simple coalescing function that returns the first value that is not
- equal to the `test_value`. Or `None` if no value is valid. Usually this
- means that the last given value is the default value.
- Note that the `test_value` is compared using an identity check
- (i.e. `value is not test_value`) so changing the `test_value` won't work
- for all values.
- >>> coalesce(None, 1)
- 1
- >>> coalesce()
- >>> coalesce(0, False, True)
- 0
- >>> coalesce(0, False, True, test_value=0)
- False
- # This won't work because of the `is not test_value` type testing:
- >>> coalesce([], dict(spam='eggs'), test_value=[])
- []
- """
- return next((arg for arg in args if arg is not test_value), None)
- @contextlib.contextmanager
- def open_atomic(
- filename: Filename,
- binary: bool = True,
- ) -> typing.Iterator[types.IO]:
- """Open a file for atomic writing. Instead of locking this method allows
- you to write the entire file and move it to the actual location. Note that
- this makes the assumption that a rename is atomic on your platform which
- is generally the case but not a guarantee.
- http://docs.python.org/library/os.html#os.rename
- >>> filename = 'test_file.txt'
- >>> if os.path.exists(filename):
- ... os.remove(filename)
- >>> with open_atomic(filename) as fh:
- ... written = fh.write(b'test')
- >>> assert os.path.exists(filename)
- >>> os.remove(filename)
- >>> import pathlib
- >>> path_filename = pathlib.Path('test_file.txt')
- >>> with open_atomic(path_filename) as fh:
- ... written = fh.write(b'test')
- >>> assert path_filename.exists()
- >>> path_filename.unlink()
- """
- # `pathlib.Path` cast in case `path` is a `str`
- path: pathlib.Path
- if isinstance(filename, pathlib.Path):
- path = filename
- else:
- path = pathlib.Path(filename)
- assert not path.exists(), f'{path!r} exists'
- # Create the parent directory if it doesn't exist
- path.parent.mkdir(parents=True, exist_ok=True)
- with tempfile.NamedTemporaryFile(
- mode=(binary and 'wb') or 'w',
- dir=str(path.parent),
- delete=False,
- ) as temp_fh:
- yield temp_fh
- temp_fh.flush()
- os.fsync(temp_fh.fileno())
- try:
- os.rename(temp_fh.name, path)
- finally:
- with contextlib.suppress(Exception):
- os.remove(temp_fh.name)
- class LockBase(abc.ABC): # pragma: no cover
- #: timeout when trying to acquire a lock
- timeout: float
- #: check interval while waiting for `timeout`
- check_interval: float
- #: skip the timeout and immediately fail if the initial lock fails
- fail_when_locked: bool
- def __init__(
- self,
- timeout: float | None = None,
- check_interval: float | None = None,
- fail_when_locked: bool | None = None,
- ) -> None:
- self.timeout = coalesce(timeout, DEFAULT_TIMEOUT)
- self.check_interval = coalesce(check_interval, DEFAULT_CHECK_INTERVAL)
- self.fail_when_locked = coalesce(
- fail_when_locked,
- DEFAULT_FAIL_WHEN_LOCKED,
- )
- @abc.abstractmethod
- def acquire(
- self,
- timeout: float | None = None,
- check_interval: float | None = None,
- fail_when_locked: bool | None = None,
- ) -> typing.IO[typing.AnyStr]: ...
- def _timeout_generator(
- self,
- timeout: float | None,
- check_interval: float | None,
- ) -> typing.Iterator[int]:
- f_timeout = coalesce(timeout, self.timeout, 0.0)
- f_check_interval = coalesce(check_interval, self.check_interval, 0.0)
- yield 0
- i = 0
- start_time = time.perf_counter()
- while start_time + f_timeout > time.perf_counter():
- i += 1
- yield i
- # Take low lock checks into account to stay within the interval
- since_start_time = time.perf_counter() - start_time
- time.sleep(max(0.001, (i * f_check_interval) - since_start_time))
- @abc.abstractmethod
- def release(self) -> None: ...
- def __enter__(self) -> typing.IO[typing.AnyStr]:
- return self.acquire()
- def __exit__(
- self,
- exc_type: type[BaseException] | None,
- exc_value: BaseException | None,
- traceback: typing.Any, # Should be typing.TracebackType
- ) -> bool | None:
- self.release()
- return None
- def __delete__(self, instance: LockBase) -> None:
- instance.release()
- class Lock(LockBase):
- """Lock manager with built-in timeout
- Args:
- filename: filename
- mode: the open mode, 'a' or 'ab' should be used for writing. When mode
- contains `w` the file will be truncated to 0 bytes.
- timeout: timeout when trying to acquire a lock
- check_interval: check interval while waiting
- fail_when_locked: after the initial lock failed, return an error
- or lock the file. This does not wait for the timeout.
- **file_open_kwargs: The kwargs for the `open(...)` call
- fail_when_locked is useful when multiple threads/processes can race
- when creating a file. If set to true than the system will wait till
- the lock was acquired and then return an AlreadyLocked exception.
- Note that the file is opened first and locked later. So using 'w' as
- mode will result in truncate _BEFORE_ the lock is checked.
- """
- fh: types.IO | None
- filename: str
- mode: str
- truncate: bool
- timeout: float
- check_interval: float
- fail_when_locked: bool
- flags: constants.LockFlags
- file_open_kwargs: dict[str, typing.Any]
- def __init__(
- self,
- filename: Filename,
- mode: Mode = 'a',
- timeout: float | None = None,
- check_interval: float = DEFAULT_CHECK_INTERVAL,
- fail_when_locked: bool = DEFAULT_FAIL_WHEN_LOCKED,
- flags: constants.LockFlags = LOCK_METHOD,
- **file_open_kwargs: typing.Any,
- ) -> None:
- if 'w' in mode:
- truncate = True
- mode = typing.cast(Mode, mode.replace('w', 'a'))
- else:
- truncate = False
- if timeout is None:
- timeout = DEFAULT_TIMEOUT
- elif not (flags & constants.LockFlags.NON_BLOCKING):
- warnings.warn(
- 'timeout has no effect in blocking mode',
- stacklevel=1,
- )
- self.fh = None
- self.filename = str(filename)
- self.mode = mode
- self.truncate = truncate
- self.flags = flags
- self.file_open_kwargs = file_open_kwargs
- super().__init__(timeout, check_interval, fail_when_locked)
- def acquire(
- self,
- timeout: float | None = None,
- check_interval: float | None = None,
- fail_when_locked: bool | None = None,
- ) -> typing.IO[typing.AnyStr]:
- """Acquire the locked filehandle"""
- fail_when_locked = coalesce(fail_when_locked, self.fail_when_locked)
- if (
- not (self.flags & constants.LockFlags.NON_BLOCKING)
- and timeout is not None
- ):
- warnings.warn(
- 'timeout has no effect in blocking mode',
- stacklevel=1,
- )
- # If we already have a filehandle, return it
- fh = self.fh
- if fh:
- # Due to type invariance we need to cast the type
- return typing.cast(typing.IO[typing.AnyStr], fh)
- # Get a new filehandler
- fh = self._get_fh()
- def try_close() -> None: # pragma: no cover
- # Silently try to close the handle if possible, ignore all issues
- if fh is not None:
- with contextlib.suppress(Exception):
- fh.close()
- exception = None
- # Try till the timeout has passed
- for _ in self._timeout_generator(timeout, check_interval):
- exception = None
- try:
- # Try to lock
- fh = self._get_lock(fh)
- break
- except exceptions.LockException as exc:
- # Python will automatically remove the variable from memory
- # unless you save it in a different location
- exception = exc
- # We already tried to the get the lock
- # If fail_when_locked is True, stop trying
- if fail_when_locked:
- try_close()
- raise exceptions.AlreadyLocked(exception) from exc
- except Exception as exc:
- # Something went wrong with the locking mechanism.
- # Wrap in a LockException and re-raise:
- try_close()
- raise exceptions.LockException(exc) from exc
- # Wait a bit
- if exception:
- try_close()
- # We got a timeout... reraising
- raise exception
- # Prepare the filehandle (truncate if needed)
- fh = self._prepare_fh(fh)
- self.fh = fh
- return typing.cast(typing.IO[typing.AnyStr], fh)
- def __enter__(self) -> typing.IO[typing.AnyStr]:
- return self.acquire()
- def release(self) -> None:
- """Releases the currently locked file handle"""
- if self.fh:
- portalocker.unlock(self.fh)
- self.fh.close()
- self.fh = None
- def _get_fh(self) -> types.IO:
- """Get a new filehandle"""
- return typing.cast(
- types.IO,
- open( # noqa: SIM115
- self.filename,
- self.mode,
- **self.file_open_kwargs,
- ),
- )
- def _get_lock(self, fh: types.IO) -> types.IO:
- """
- Try to lock the given filehandle
- returns LockException if it fails"""
- portalocker.lock(fh, self.flags)
- return fh
- def _prepare_fh(self, fh: types.IO) -> types.IO:
- """
- Prepare the filehandle for usage
- If truncate is a number, the file will be truncated to that amount of
- bytes
- """
- if self.truncate:
- fh.seek(0)
- fh.truncate(0)
- return fh
- class RLock(Lock):
- """
- A reentrant lock, functions in a similar way to threading.RLock in that it
- can be acquired multiple times. When the corresponding number of release()
- calls are made the lock will finally release the underlying file lock.
- """
- def __init__(
- self,
- filename: Filename,
- mode: Mode = 'a',
- timeout: float = DEFAULT_TIMEOUT,
- check_interval: float = DEFAULT_CHECK_INTERVAL,
- fail_when_locked: bool = False,
- flags: constants.LockFlags = LOCK_METHOD,
- ) -> None:
- super().__init__(
- filename,
- mode,
- timeout,
- check_interval,
- fail_when_locked,
- flags,
- )
- self._acquire_count = 0
- def acquire(
- self,
- timeout: float | None = None,
- check_interval: float | None = None,
- fail_when_locked: bool | None = None,
- ) -> typing.IO[typing.AnyStr]:
- fh: typing.IO[typing.AnyStr]
- if self._acquire_count >= 1:
- fh = typing.cast(typing.IO[typing.AnyStr], self.fh)
- else:
- fh = super().acquire(timeout, check_interval, fail_when_locked)
- self._acquire_count += 1
- assert fh is not None
- return fh
- def release(self) -> None:
- if self._acquire_count == 0:
- raise exceptions.LockException(
- 'Cannot release more times than acquired',
- )
- if self._acquire_count == 1:
- super().release()
- self._acquire_count -= 1
- class TemporaryFileLock(Lock):
- def __init__(
- self,
- filename: str = '.lock',
- timeout: float = DEFAULT_TIMEOUT,
- check_interval: float = DEFAULT_CHECK_INTERVAL,
- fail_when_locked: bool = True,
- flags: constants.LockFlags = LOCK_METHOD,
- ) -> None:
- super().__init__(
- filename=filename,
- mode='w',
- timeout=timeout,
- check_interval=check_interval,
- fail_when_locked=fail_when_locked,
- flags=flags,
- )
- atexit.register(self.release)
- def release(self) -> None:
- Lock.release(self)
- if os.path.isfile(self.filename): # pragma: no branch
- os.unlink(self.filename)
- class BoundedSemaphore(LockBase):
- """
- Bounded semaphore to prevent too many parallel processes from running
- This method is deprecated because multiple processes that are completely
- unrelated could end up using the same semaphore. To prevent this,
- use `NamedBoundedSemaphore` instead. The
- `NamedBoundedSemaphore` is a drop-in replacement for this class.
- >>> semaphore = BoundedSemaphore(2, directory='')
- >>> str(semaphore.get_filenames()[0])
- 'bounded_semaphore.00.lock'
- >>> str(sorted(semaphore.get_random_filenames())[1])
- 'bounded_semaphore.01.lock'
- """
- lock: Lock | None
- def __init__(
- self,
- maximum: int,
- name: str = 'bounded_semaphore',
- filename_pattern: str = '{name}.{number:02d}.lock',
- directory: str = tempfile.gettempdir(),
- timeout: float | None = DEFAULT_TIMEOUT,
- check_interval: float | None = DEFAULT_CHECK_INTERVAL,
- fail_when_locked: bool | None = True,
- ) -> None:
- self.maximum = maximum
- self.name = name
- self.filename_pattern = filename_pattern
- self.directory = directory
- self.lock: Lock | None = None
- super().__init__(
- timeout=timeout,
- check_interval=check_interval,
- fail_when_locked=fail_when_locked,
- )
- if not name or name == 'bounded_semaphore':
- warnings.warn(
- '`BoundedSemaphore` without an explicit `name` '
- 'argument is deprecated, use NamedBoundedSemaphore',
- DeprecationWarning,
- stacklevel=1,
- )
- def get_filenames(self) -> typing.Sequence[pathlib.Path]:
- return [self.get_filename(n) for n in range(self.maximum)]
- def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
- filenames = list(self.get_filenames())
- random.shuffle(filenames)
- return filenames
- def get_filename(self, number: int) -> pathlib.Path:
- return pathlib.Path(self.directory) / self.filename_pattern.format(
- name=self.name,
- number=number,
- )
- def acquire( # type: ignore[override]
- self,
- timeout: float | None = None,
- check_interval: float | None = None,
- fail_when_locked: bool | None = None,
- ) -> Lock | None:
- assert not self.lock, 'Already locked'
- filenames = self.get_filenames()
- for n in self._timeout_generator(timeout, check_interval): # pragma:
- logger.debug('trying lock (attempt %d) %r', n, filenames)
- # no branch
- if self.try_lock(filenames): # pragma: no branch
- return self.lock # pragma: no cover
- if fail_when_locked := coalesce(
- fail_when_locked,
- self.fail_when_locked,
- ):
- raise exceptions.AlreadyLocked()
- return None
- def try_lock(self, filenames: typing.Sequence[Filename]) -> bool:
- filename: Filename
- for filename in filenames:
- logger.debug('trying lock for %r', filename)
- self.lock = Lock(filename, fail_when_locked=True)
- try:
- self.lock.acquire()
- except exceptions.AlreadyLocked:
- self.lock = None
- else:
- logger.debug('locked %r', filename)
- return True
- return False
- def release(self) -> None: # pragma: no cover
- if self.lock is not None:
- self.lock.release()
- self.lock = None
- class NamedBoundedSemaphore(BoundedSemaphore):
- """
- Bounded semaphore to prevent too many parallel processes from running
- It's also possible to specify a timeout when acquiring the lock to wait
- for a resource to become available. This is very similar to
- `threading.BoundedSemaphore` but works across multiple processes and across
- multiple operating systems.
- Because this works across multiple processes it's important to give the
- semaphore a name. This name is used to create the lock files. If you
- don't specify a name, a random name will be generated. This means that
- you can't use the same semaphore in multiple processes unless you pass the
- semaphore object to the other processes.
- >>> semaphore = NamedBoundedSemaphore(2, name='test')
- >>> str(semaphore.get_filenames()[0])
- '...test.00.lock'
- >>> semaphore = NamedBoundedSemaphore(2)
- >>> 'bounded_semaphore' in str(semaphore.get_filenames()[0])
- True
- """
- def __init__(
- self,
- maximum: int,
- name: str | None = None,
- filename_pattern: str = '{name}.{number:02d}.lock',
- directory: str = tempfile.gettempdir(),
- timeout: float | None = DEFAULT_TIMEOUT,
- check_interval: float | None = DEFAULT_CHECK_INTERVAL,
- fail_when_locked: bool | None = True,
- ) -> None:
- if name is None:
- name = f'bounded_semaphore.{random.randint(0, 1000000):d}'
- super().__init__(
- maximum,
- name,
- filename_pattern,
- directory,
- timeout,
- check_interval,
- fail_when_locked,
- )
|