psparser.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
  1. #!/usr/bin/env python3
  2. import io
  3. import logging
  4. import re
  5. from typing import (
  6. Any,
  7. BinaryIO,
  8. Dict,
  9. Generic,
  10. Iterator,
  11. List,
  12. Optional,
  13. Tuple,
  14. Type,
  15. TypeVar,
  16. Union,
  17. )
  18. from pdfminer import psexceptions, settings
  19. from pdfminer.utils import choplist
  # Module-level logger, named after this module.
  log = logging.getLogger(__name__)

  # Aliases for the exception classes that live in ``psexceptions``, kept so
  # that code referring to them through this module keeps working
  # (backwards compatibility).
  PSException = psexceptions.PSException
  PSEOF = psexceptions.PSEOF
  PSSyntaxError = psexceptions.PSSyntaxError
  PSTypeError = psexceptions.PSTypeError
  PSValueError = psexceptions.PSValueError
  class PSObject:
      """Common ancestor of every PostScript / PDF data type in this module."""

      pass
  class PSLiteral(PSObject):
      """A PostScript literal (a name object).

      Literals serve as identifiers: variable names, property names and
      dictionary keys. They are case sensitive and are written with a
      leading slash (e.g. "/Name").

      Note: Do not instantiate PSLiteral yourself.
      Always obtain instances through PSLiteralTable.intern().
      """

      # A literal's name is text when it could be decoded, raw bytes otherwise.
      NameType = Union[str, bytes]

      def __init__(self, name: NameType) -> None:
          self.name = name

      def __repr__(self) -> str:
          # Slash prefix followed by the repr of the stored name.
          return f"/{self.name!r}"
  class PSKeyword(PSObject):
      """A PostScript keyword.

      Keywords are a small set of predefined words. They express the
      commands and directives of PostScript and also mark content
      boundaries.

      Note: Do not instantiate PSKeyword yourself.
      Always obtain instances through PSKeywordTable.intern().
      """

      def __init__(self, name: bytes) -> None:
          self.name = name

      def __repr__(self) -> str:
          # Same rendering as PSLiteral: a slash followed by the repr of the name.
          return f"/{self.name!r}"
  _SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)


  class PSSymbolTable(Generic[_SymbolT]):
      """Intern table for PSLiteral / PSKeyword objects.

      Each distinct name maps to exactly one object, so interned symbols
      can be compared by identity with the "is" operator.
      """

      def __init__(self, klass: Type[_SymbolT]) -> None:
          self.klass: Type[_SymbolT] = klass
          self.dict: Dict[PSLiteral.NameType, _SymbolT] = {}

      def intern(self, name: PSLiteral.NameType) -> _SymbolT:
          """Return the unique symbol for ``name``, creating it on first use."""
          try:
              return self.dict[name]
          except KeyError:
              pass
          # Type confusion issue: PSKeyword always takes bytes as name
          # PSLiteral uses either str or bytes
          symbol = self.klass(name)  # type: ignore[arg-type]
          self.dict[name] = symbol
          return symbol
  # Module-wide intern tables: one for literals, one for keywords.
  PSLiteralTable = PSSymbolTable(PSLiteral)
  PSKeywordTable = PSSymbolTable(PSKeyword)
  # Short aliases for interning a literal / a keyword by name.
  LIT = PSLiteralTable.intern
  KWD = PSKeywordTable.intern
  # Interned delimiter keywords. Because they are interned, tokens produced
  # by KWD() for the same bytes are these very objects.
  KEYWORD_PROC_BEGIN = KWD(b"{")
  KEYWORD_PROC_END = KWD(b"}")
  KEYWORD_ARRAY_BEGIN = KWD(b"[")
  KEYWORD_ARRAY_END = KWD(b"]")
  KEYWORD_DICT_BEGIN = KWD(b"<<")
  KEYWORD_DICT_END = KWD(b">>")
  def literal_name(x: Any) -> str:
      """Return the name of the literal ``x`` as text.

      A ``str`` name is returned unchanged; a ``bytes`` name is decoded as
      UTF-8, falling back to ``str()`` of the raw name when decoding fails.

      :raises PSTypeError: if ``x`` is not a PSLiteral and ``settings.STRICT``
          is set. Outside strict mode ``str(x)`` is returned instead.
      """
      if not isinstance(x, PSLiteral):
          if settings.STRICT:
              raise PSTypeError(f"Literal required: {x!r}")
          return str(x)

      raw = x.name
      if isinstance(raw, str):
          return raw
      try:
          return str(raw, "utf-8")
      except UnicodeDecodeError:
          return str(raw)
  def keyword_name(x: Any) -> Any:
      """Return the name of the keyword ``x`` as text.

      The keyword's bytes name is decoded as UTF-8, dropping undecodable
      bytes.

      :param x: expected to be a PSKeyword.
      :return: the decoded name; outside strict mode a non-keyword ``x`` is
          returned unchanged.
      :raises PSTypeError: if ``x`` is not a PSKeyword and ``settings.STRICT``
          is set.
      """
      if not isinstance(x, PSKeyword):
          if settings.STRICT:
              # An f-string is used instead of "%r" % x: with %-formatting a
              # tuple argument is unpacked as the format arguments, which
              # raises a plain TypeError (or formats only its element)
              # instead of reporting the offending object.
              raise PSTypeError(f"Keyword required: {x!r}")
          return x
      return str(x.name, "utf-8", "ignore")
  # Byte-oriented patterns used by the tokenizer.
  # A single end-of-line byte (CR or LF).
  EOL = re.compile(rb"[\r\n]")
  # A single whitespace byte / a single non-whitespace byte.
  SPC = re.compile(rb"\s")
  NONSPC = re.compile(rb"\S")
  # A single hexadecimal digit.
  HEX = re.compile(rb"[0-9a-fA-F]")
  # First byte that ends a literal name: a delimiter, "#", or whitespace.
  END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")
  # First byte that is neither whitespace nor a hex digit.
  END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")
  # Two hex digits, or failing that any single byte.
  HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")
  # First byte that is not a decimal digit.
  END_NUMBER = re.compile(rb"[^0-9]")
  # First byte that ends a keyword (same character class as END_LITERAL).
  END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")
  # Bytes that need handling inside a literal string: "(", ")" and the
  # backslash (written as octal \134).
  END_STRING = re.compile(rb"[()\134]")
  # A single octal digit.
  OCT_STRING = re.compile(rb"[0-7]")
  # Byte following a backslash in a literal string -> value of the byte it
  # stands for.
  ESC_STRING = {
      b"b": 8,
      b"t": 9,
      b"n": 10,
      b"f": 12,
      b"r": 13,
      b"(": 40,
      b")": 41,
      b"\\": 92,
  }
  PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]


  class PSBaseParser:
      """Most basic PostScript parser that performs only tokenization.

      Tokenizing is driven by a small state machine: ``self._parse1`` always
      refers to the ``_parse_*`` method of the current state. Each state
      method receives a buffer ``s`` and an offset ``i`` into it, consumes
      what it can, and returns the offset at which parsing should resume.
      Finished tokens are queued in ``self._tokens`` as ``(position, token)``
      pairs and handed out by :meth:`nexttoken`.
      """

      # Number of bytes read from the file per chunk.
      BUFSIZ = 4096

      def __init__(self, fp: BinaryIO) -> None:
          self.fp = fp
          self.eof = False
          self.seek(0)

      def __repr__(self) -> str:
          return "<%s: %r, bufpos=%d>" % (self.__class__.__name__, self.fp, self.bufpos)

      def flush(self) -> None:
          """Do nothing; present so that :meth:`close` has something to call."""
          pass

      def close(self) -> None:
          """Call :meth:`flush`. The underlying ``fp`` is not closed here."""
          self.flush()

      def tell(self) -> int:
          """Return the parser's current absolute position."""
          return self.bufpos + self.charpos

      def poll(self, pos: Optional[int] = None, n: int = 80) -> None:
          """Log (at debug level) ``n`` bytes found at ``pos``.

          The position of ``fp`` is restored afterwards.

          :param pos: absolute offset to look at; ``None`` means the parser's
              current position.
          :param n: number of bytes to read.
          """
          pos0 = self.fp.tell()
          # Compare against None explicitly: offset 0 is a valid position and
          # must not be mistaken for "not given".
          if pos is None:
              pos = self.bufpos + self.charpos
          self.fp.seek(pos)
          log.debug("poll(%d): %r", pos, self.fp.read(n))
          self.fp.seek(pos0)

      def seek(self, pos: int) -> None:
          """Seeks the parser to the given position."""
          log.debug("seek: %r", pos)
          self.fp.seek(pos)
          # reset the status for nextline()
          self.bufpos = pos
          self.buf = b""
          self.charpos = 0
          # reset the status for nexttoken()
          self._parse1 = self._parse_main
          self._curtoken = b""
          self._curtokenpos = 0
          self._tokens: List[Tuple[int, PSBaseParserToken]] = []
          self.eof = False

      def fillbuf(self) -> None:
          """Read the next chunk from ``fp`` once the buffer is used up.

          :raises PSEOF: if ``fp`` returns no more data.
          """
          if self.charpos < len(self.buf):
              return
          # fetch next chunk.
          self.bufpos = self.fp.tell()
          self.buf = self.fp.read(self.BUFSIZ)
          if not self.buf:
              raise PSEOF("Unexpected EOF")
          self.charpos = 0

      def nextline(self) -> Tuple[int, bytes]:
          """Fetches a next line that ends either with \\r or \\n.

          :return: ``(position, line)``; the line includes its terminator
              (``\\r``, ``\\n`` or ``\\r\\n``).
          :raises PSEOF: if the data runs out before the line is complete.
          """
          linebuf = b""
          linepos = self.bufpos + self.charpos
          eol = False
          while True:
              self.fillbuf()
              if eol:
                  # The previous byte was b'\r'; it may be the first half of
                  # b'\r\n', possibly split across two chunks.
                  c = self.buf[self.charpos : self.charpos + 1]
                  # handle b'\r\n'
                  if c == b"\n":
                      linebuf += c
                      self.charpos += 1
                  break
              m = EOL.search(self.buf, self.charpos)
              if m:
                  linebuf += self.buf[self.charpos : m.end(0)]
                  self.charpos = m.end(0)
                  if linebuf[-1:] == b"\r":
                      eol = True
                  else:
                      break
              else:
                  # No terminator in this chunk: take all of it and read on.
                  linebuf += self.buf[self.charpos :]
                  self.charpos = len(self.buf)
          log.debug("nextline: %r, %r", linepos, linebuf)
          return (linepos, linebuf)

      def revreadlines(self) -> Iterator[bytes]:
          """Fetches a next line backward.

          This is used to locate the trailers at the end of a file.

          Each yielded item starts at an end-of-line byte and runs up to the
          next one. Whatever precedes the first end-of-line byte of the file
          is not yielded.
          """
          self.fp.seek(0, io.SEEK_END)
          pos = self.fp.tell()
          buf = b""
          while pos > 0:
              prevpos = pos
              pos = max(0, pos - self.BUFSIZ)
              self.fp.seek(pos)
              s = self.fp.read(prevpos - pos)
              if not s:
                  break
              while True:
                  n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                  if n == -1:
                      # No line break left in this chunk: keep the remainder
                      # to be joined with the preceding chunk.
                      buf = s + buf
                      break
                  yield s[n:] + buf
                  s = s[:n]
                  buf = b""

      def _parse_main(self, s: bytes, i: int) -> int:
          """Skip whitespace and pick the next state from the first byte."""
          m = NONSPC.search(s, i)
          if not m:
              return len(s)
          j = m.start(0)
          c = s[j : j + 1]
          self._curtokenpos = self.bufpos + j
          if c == b"%":
              self._curtoken = b"%"
              self._parse1 = self._parse_comment
              return j + 1
          elif c == b"/":
              self._curtoken = b""
              self._parse1 = self._parse_literal
              return j + 1
          elif c in b"-+" or c.isdigit():
              self._curtoken = c
              self._parse1 = self._parse_number
              return j + 1
          elif c == b".":
              self._curtoken = c
              self._parse1 = self._parse_float
              return j + 1
          elif c.isalpha():
              self._curtoken = c
              self._parse1 = self._parse_keyword
              return j + 1
          elif c == b"(":
              self._curtoken = b""
              # Nesting depth of parentheses inside the string.
              self.paren = 1
              self._parse1 = self._parse_string
              return j + 1
          elif c == b"<":
              self._curtoken = b""
              self._parse1 = self._parse_wopen
              return j + 1
          elif c == b">":
              self._curtoken = b""
              self._parse1 = self._parse_wclose
              return j + 1
          elif c == b"\x00":
              # NUL bytes are skipped.
              return j + 1
          else:
              # Any other single byte becomes a one-byte keyword.
              self._add_token(KWD(c))
              return j + 1

      def _add_token(self, obj: PSBaseParserToken) -> None:
          """Queue ``obj`` together with the position where it started."""
          self._tokens.append((self._curtokenpos, obj))

      def _parse_comment(self, s: bytes, i: int) -> int:
          """Consume a comment up to (not including) the end-of-line byte."""
          m = EOL.search(s, i)
          if not m:
              self._curtoken += s[i:]
              return len(s)
          j = m.start(0)
          self._curtoken += s[i:j]
          self._parse1 = self._parse_main
          # We ignore comments.
          # self._tokens.append(self._curtoken)
          return j

      def _parse_literal(self, s: bytes, i: int) -> int:
          """Consume a literal name; "#" switches to hex-escape parsing."""
          m = END_LITERAL.search(s, i)
          if not m:
              self._curtoken += s[i:]
              return len(s)
          j = m.start(0)
          self._curtoken += s[i:j]
          c = s[j : j + 1]
          if c == b"#":
              self.hex = b""
              self._parse1 = self._parse_literal_hex
              return j + 1
          # Names are stored as text when they decode as UTF-8, else as bytes.
          try:
              name: Union[str, bytes] = str(self._curtoken, "utf-8")
          except Exception:
              name = self._curtoken
          self._add_token(LIT(name))
          self._parse1 = self._parse_main
          return j

      def _parse_literal_hex(self, s: bytes, i: int) -> int:
          """Collect up to two hex digits after "#" and append the byte."""
          c = s[i : i + 1]
          if HEX.match(c) and len(self.hex) < 2:
              self.hex += c
              return i + 1
          if self.hex:
              self._curtoken += bytes((int(self.hex, 16),))
          self._parse1 = self._parse_literal
          return i

      def _parse_number(self, s: bytes, i: int) -> int:
          """Consume digits; a "." hands over to float parsing."""
          m = END_NUMBER.search(s, i)
          if not m:
              self._curtoken += s[i:]
              return len(s)
          j = m.start(0)
          self._curtoken += s[i:j]
          c = s[j : j + 1]
          if c == b".":
              self._curtoken += c
              self._parse1 = self._parse_float
              return j + 1
          try:
              self._add_token(int(self._curtoken))
          except ValueError:
              # Not a valid integer (e.g. a lone sign): no token is produced.
              pass
          self._parse1 = self._parse_main
          return j

      def _parse_float(self, s: bytes, i: int) -> int:
          """Consume the digits of a number that contains a "."."""
          m = END_NUMBER.search(s, i)
          if not m:
              self._curtoken += s[i:]
              return len(s)
          j = m.start(0)
          self._curtoken += s[i:j]
          try:
              self._add_token(float(self._curtoken))
          except ValueError:
              # Not a valid float (e.g. a lone "."): no token is produced.
              pass
          self._parse1 = self._parse_main
          return j

      def _parse_keyword(self, s: bytes, i: int) -> int:
          """Consume a keyword; "true" and "false" become Python booleans."""
          m = END_KEYWORD.search(s, i)
          if m:
              j = m.start(0)
              self._curtoken += s[i:j]
          else:
              self._curtoken += s[i:]
              return len(s)
          if self._curtoken == b"true":
              token: Union[bool, PSKeyword] = True
          elif self._curtoken == b"false":
              token = False
          else:
              token = KWD(self._curtoken)
          self._add_token(token)
          self._parse1 = self._parse_main
          return j

      def _parse_string(self, s: bytes, i: int) -> int:
          """Consume a literal string, tracking nested parentheses."""
          m = END_STRING.search(s, i)
          if not m:
              self._curtoken += s[i:]
              return len(s)
          j = m.start(0)
          self._curtoken += s[i:j]
          c = s[j : j + 1]
          if c == b"\\":
              self.oct = b""
              self._parse1 = self._parse_string_1
              return j + 1
          if c == b"(":
              self.paren += 1
              self._curtoken += c
              return j + 1
          if c == b")":
              self.paren -= 1
              if self.paren:
                  # WTF, they said balanced parens need no special treatment.
                  self._curtoken += c
                  return j + 1
          # The outermost ")" was reached: the string is complete.
          self._add_token(self._curtoken)
          self._parse1 = self._parse_main
          return j + 1

      def _parse_string_1(self, s: bytes, i: int) -> int:
          """Parse literal strings

          PDF Reference 3.2.3

          Handles what follows a backslash: up to three octal digits, one of
          the single-character escapes in ``ESC_STRING``, or a line break
          (which is dropped).
          """
          c = s[i : i + 1]
          if OCT_STRING.match(c) and len(self.oct) < 3:
              self.oct += c
              return i + 1
          elif self.oct:
              # Three octal digits can encode up to 0o777. The PDF
              # specification says high-order overflow is ignored, so keep
              # only the low eight bits instead of failing on such input.
              chrcode = int(self.oct, 8) & 0xFF
              self._curtoken += bytes((chrcode,))
              self._parse1 = self._parse_string
              return i
          elif c in ESC_STRING:
              self._curtoken += bytes((ESC_STRING[c],))
          elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
              # If current and next character is \r\n skip both because enters
              # after a \ are ignored
              i += 1
          # default action
          self._parse1 = self._parse_string
          return i + 1

      def _parse_wopen(self, s: bytes, i: int) -> int:
          """After "<": a second "<" opens a dictionary, else a hex string."""
          c = s[i : i + 1]
          if c == b"<":
              self._add_token(KEYWORD_DICT_BEGIN)
              self._parse1 = self._parse_main
              i += 1
          else:
              self._parse1 = self._parse_hexstring
          return i

      def _parse_wclose(self, s: bytes, i: int) -> int:
          """After ">": a second ">" closes a dictionary.

          A single ">" produces no token.
          """
          c = s[i : i + 1]
          if c == b">":
              self._add_token(KEYWORD_DICT_END)
              i += 1
          self._parse1 = self._parse_main
          return i

      def _parse_hexstring(self, s: bytes, i: int) -> int:
          """Consume a hex string and queue the decoded bytes."""
          m = END_HEX_STRING.search(s, i)
          if not m:
              self._curtoken += s[i:]
              return len(s)
          j = m.start(0)
          self._curtoken += s[i:j]
          # Strip whitespace, then turn every pair of hex digits (or a
          # trailing single digit) into one byte.
          token = HEX_PAIR.sub(
              lambda m: bytes((int(m.group(0), 16),)),
              SPC.sub(b"", self._curtoken),
          )
          self._add_token(token)
          self._parse1 = self._parse_main
          return j

      def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
          """Return the next ``(position, token)`` pair.

          :raises PSEOF: when the input is exhausted and no token is pending.
          """
          if self.eof:
              # It's not really unexpected, come on now...
              raise PSEOF("Unexpected EOF")
          while not self._tokens:
              try:
                  self.fillbuf()
                  self.charpos = self._parse1(self.buf, self.charpos)
              except PSEOF:
                  # If we hit EOF in the middle of a token, try to parse
                  # it by tacking on whitespace, and delay raising PSEOF
                  # until next time around
                  self.charpos = self._parse1(b"\n", 0)
                  self.eof = True
                  # Oh, so there wasn't actually a token there? OK.
                  if not self._tokens:
                      raise
          token = self._tokens.pop(0)
          log.debug("nexttoken: %r", token)
          return token
  453. # Stack slots may be occupied by any of:
  454. # * the name of a literal
  455. # * the PSBaseParserToken types
  456. # * list (via KEYWORD_ARRAY)
  457. # * dict (via KEYWORD_DICT)
  458. # * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
  ExtraT = TypeVar("ExtraT")

  PSStackType = Union[str, float, bool, PSLiteral, bytes, List, Dict, ExtraT]

  PSStackEntry = Tuple[int, PSStackType[ExtraT]]


  class PSStackParser(PSBaseParser, Generic[ExtraT]):
      """Parser that assembles tokens into objects using a stack.

      Tokens from :meth:`nexttoken` are pushed on ``curstack``. The array,
      dictionary and procedure delimiters open and close nested stacks, whose
      contents become Python lists and dicts. Other keywords are passed to
      :meth:`do_keyword`. Finished objects are collected in ``results``.
      """

      def __init__(self, fp: BinaryIO) -> None:
          PSBaseParser.__init__(self, fp)
          self.reset()

      def reset(self) -> None:
          """Discard all stack state and pending results."""
          # Saved (position, type, stack) of each enclosing, still-open container.
          self.context: List[Tuple[int, Optional[str], List[PSStackEntry[ExtraT]]]] = []
          # Type code of the innermost open container, or None at top level.
          self.curtype: Optional[str] = None
          self.curstack: List[PSStackEntry[ExtraT]] = []
          self.results: List[PSStackEntry[ExtraT]] = []

      def seek(self, pos: int) -> None:
          """Seek the underlying parser to ``pos`` and reset the stack state."""
          PSBaseParser.seek(self, pos)
          self.reset()

      def push(self, *objs: PSStackEntry[ExtraT]) -> None:
          """Append ``objs`` to the current stack."""
          self.curstack.extend(objs)

      def pop(self, n: int) -> List[PSStackEntry[ExtraT]]:
          """Remove and return the top ``n`` entries of the current stack.

          :param n: number of entries to take; if fewer are available, all of
              them are returned.
          :return: the removed entries, oldest first.
          """
          # ``curstack[-0:]`` is the whole list, so zero needs its own case:
          # popping nothing must leave the stack untouched.
          if n == 0:
              return []
          objs = self.curstack[-n:]
          self.curstack[-n:] = []
          return objs

      def popall(self) -> List[PSStackEntry[ExtraT]]:
          """Remove and return every entry of the current stack."""
          objs = self.curstack
          self.curstack = []
          return objs

      def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
          """Append ``objs`` to the results handed out by :meth:`nextobject`."""
          try:
              log.debug("add_results: %r", objs)
          except Exception:
              log.debug("add_results: (unprintable object)")
          self.results.extend(objs)

      def start_type(self, pos: int, type: str) -> None:
          """Open a nested container of the given type code at ``pos``.

          The current type and stack are saved on ``context`` and a fresh,
          empty stack becomes current.
          """
          self.context.append((pos, self.curtype, self.curstack))
          (self.curtype, self.curstack) = (type, [])
          log.debug("start_type: pos=%r, type=%r", pos, type)

      def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
          """Close the innermost container and restore the enclosing one.

          :return: ``(position, objects)`` where the position is the one given
              to the matching :meth:`start_type` and the objects are the
              container's contents without their positions.
          :raises PSTypeError: if the innermost open container is not of
              type ``type``.
          """
          if self.curtype != type:
              raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
          objs = [obj for (_, obj) in self.curstack]
          (pos, self.curtype, self.curstack) = self.context.pop()
          log.debug("end_type: pos=%r, type=%r, objs=%r", pos, type, objs)
          return (pos, objs)

      def do_keyword(self, pos: int, token: PSKeyword) -> None:
          """Handle a keyword that is not a delimiter; does nothing here."""
          pass

      def nextobject(self) -> PSStackEntry[ExtraT]:
          """Return the next object from ``results`` as ``(position, object)``.

          Tokens are consumed until ``results`` is non-empty.
          Arrays and dictionaries are represented as Python lists and
          dictionaries.

          :return: keywords, literals, strings, numbers, arrays and dictionaries.
          :raises PSSyntaxError: if a dictionary has an odd number of items.
          :raises PSTypeError: in strict mode, if a closing delimiter does not
              match the innermost open container.
          """
          while not self.results:
              (pos, token) = self.nexttoken()
              if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                  # normal token
                  self.push((pos, token))
              elif token == KEYWORD_ARRAY_BEGIN:
                  # begin array
                  self.start_type(pos, "a")
              elif token == KEYWORD_ARRAY_END:
                  # end array
                  try:
                      self.push(self.end_type("a"))
                  except PSTypeError:
                      # Outside strict mode an unmatched delimiter is ignored.
                      if settings.STRICT:
                          raise
              elif token == KEYWORD_DICT_BEGIN:
                  # begin dictionary
                  self.start_type(pos, "d")
              elif token == KEYWORD_DICT_END:
                  # end dictionary
                  try:
                      (pos, objs) = self.end_type("d")
                      if len(objs) % 2 != 0:
                          error_msg = "Invalid dictionary construct: %r" % objs
                          raise PSSyntaxError(error_msg)
                      # Pair the items up as key/value; pairs whose value is
                      # None are left out.
                      d = {
                          literal_name(k): v
                          for (k, v) in choplist(2, objs)
                          if v is not None
                      }
                      self.push((pos, d))
                  except PSTypeError:
                      if settings.STRICT:
                          raise
              elif token == KEYWORD_PROC_BEGIN:
                  # begin proc
                  self.start_type(pos, "p")
              elif token == KEYWORD_PROC_END:
                  # end proc
                  try:
                      self.push(self.end_type("p"))
                  except PSTypeError:
                      if settings.STRICT:
                          raise
              elif isinstance(token, PSKeyword):
                  log.debug(
                      "do_keyword: pos=%r, token=%r, stack=%r",
                      pos,
                      token,
                      self.curstack,
                  )
                  self.do_keyword(pos, token)
              else:
                  log.error(
                      "unknown token: pos=%r, token=%r, stack=%r",
                      pos,
                      token,
                      self.curstack,
                  )
                  self.do_keyword(pos, token)
                  raise PSException
              if self.context:
                  # Still inside an open container: keep reading tokens.
                  continue
              else:
                  self.flush()
          obj = self.results.pop(0)
          try:
              log.debug("nextobject: %r", obj)
          except Exception:
              log.debug("nextobject: (unprintable object)")
          return obj