utils.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import base64
  2. from pathlib import Path
  3. from typing import Optional, Union
  4. from pydantic import BaseModel, Field
  5. class FieldPath(BaseModel):
  6. current: str
  7. tail: Optional[list["FieldPath"]] = Field(default=None)
  8. def as_str_list(self) -> list[str]:
  9. """
  10. >>> FieldPath(current='a', tail=[FieldPath(current='b', tail=[FieldPath(current='c'), FieldPath(current='d')])]).as_str_list()
  11. ['a.b.c', 'a.b.d']
  12. """
  13. # Recursive function to collect all paths
  14. def collect_paths(path: FieldPath, prefix: str = "") -> list[str]:
  15. current_path = prefix + path.current
  16. if not path.tail:
  17. return [current_path]
  18. else:
  19. paths = []
  20. for sub_path in path.tail:
  21. paths.extend(collect_paths(sub_path, current_path + "."))
  22. return paths
  23. # Collect all paths starting from this object
  24. return collect_paths(self)
  25. def convert_paths(paths: list[str]) -> list[FieldPath]:
  26. """Convert string paths into FieldPath objects
  27. Paths which share the same root are grouped together.
  28. Args:
  29. paths: List[str]: List of str paths containing "." as separator
  30. Returns:
  31. List[FieldPath]: List of FieldPath objects
  32. """
  33. sorted_paths = sorted(paths)
  34. prev_root = None
  35. converted_paths = []
  36. for path in sorted_paths:
  37. parts = path.split(".")
  38. root = parts[0]
  39. if root != prev_root:
  40. converted_paths.append(FieldPath(current=root))
  41. prev_root = root
  42. current = converted_paths[-1]
  43. for part in parts[1:]:
  44. if current.tail is None:
  45. current.tail = []
  46. found = False
  47. for tail in current.tail:
  48. if tail.current == part:
  49. current = tail
  50. found = True
  51. break
  52. if not found:
  53. new_tail = FieldPath(current=part)
  54. assert current.tail is not None
  55. current.tail.append(new_tail)
  56. current = new_tail
  57. return converted_paths
  58. def read_base64(file_path: Union[str, Path]) -> str:
  59. """Convert a file path to a base64 encoded string."""
  60. path = Path(file_path)
  61. if not path.exists():
  62. raise FileNotFoundError(f"The file {path} does not exist.")
  63. with open(path, "rb") as file:
  64. file_content = file.read()
  65. return base64.b64encode(file_content).decode("utf-8")