constants.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. """
Locking constants

Lock types:

- `EXCLUSIVE` exclusive lock
- `SHARED` shared lock

Lock flags:

- `NON_BLOCKING` non-blocking

Manual unlock (only needed internally):

- `UNBLOCK` unlock
  10. """
  11. import enum
  12. import os
  13. # The actual tests will execute the code anyhow so the following code can
  14. # safely be ignored from the coverage tests
  15. if os.name == 'nt': # pragma: no cover
  16. import msvcrt
  17. #: exclusive lock
  18. LOCK_EX = 0x1
  19. #: shared lock
  20. LOCK_SH = 0x2
  21. #: non-blocking
  22. LOCK_NB = 0x4
  23. #: unlock
  24. LOCK_UN = msvcrt.LK_UNLCK # type: ignore[attr-defined]
  25. elif os.name == 'posix': # pragma: no cover
  26. import fcntl
  27. #: exclusive lock
  28. LOCK_EX = fcntl.LOCK_EX # type: ignore[attr-defined]
  29. #: shared lock
  30. LOCK_SH = fcntl.LOCK_SH # type: ignore[attr-defined]
  31. #: non-blocking
  32. LOCK_NB = fcntl.LOCK_NB # type: ignore[attr-defined]
  33. #: unlock
  34. LOCK_UN = fcntl.LOCK_UN # type: ignore[attr-defined]
  35. else: # pragma: no cover
  36. raise RuntimeError('PortaLocker only defined for nt and posix platforms')
  37. class LockFlags(enum.IntFlag):
  38. #: exclusive lock
  39. EXCLUSIVE = LOCK_EX
  40. #: shared lock
  41. SHARED = LOCK_SH
  42. #: non-blocking
  43. NON_BLOCKING = LOCK_NB
  44. #: unlock
  45. UNBLOCK = LOCK_UN