| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665 |
- #!/usr/bin/env python3
- import io
- import logging
- import re
- from typing import (
- Any,
- BinaryIO,
- Dict,
- Generic,
- Iterator,
- List,
- Optional,
- Tuple,
- Type,
- TypeVar,
- Union,
- )
- from pdfminer import psexceptions, settings
- from pdfminer.utils import choplist
- log = logging.getLogger(__name__)
- # Adding aliases for these exceptions for backwards compatibility
- PSException = psexceptions.PSException
- PSEOF = psexceptions.PSEOF
- PSSyntaxError = psexceptions.PSSyntaxError
- PSTypeError = psexceptions.PSTypeError
- PSValueError = psexceptions.PSValueError
- class PSObject:
- """Base class for all PS or PDF-related data types."""
- class PSLiteral(PSObject):
- """A class that represents a PostScript literal.
- Postscript literals are used as identifiers, such as
- variable names, property names and dictionary keys.
- Literals are case sensitive and denoted by a preceding
- slash sign (e.g. "/Name")
- Note: Do not create an instance of PSLiteral directly.
- Always use PSLiteralTable.intern().
- """
- NameType = Union[str, bytes]
- def __init__(self, name: NameType) -> None:
- self.name = name
- def __repr__(self) -> str:
- name = self.name
- return "/%r" % name
- class PSKeyword(PSObject):
- """A class that represents a PostScript keyword.
- PostScript keywords are a dozen of predefined words.
- Commands and directives in PostScript are expressed by keywords.
- They are also used to denote the content boundaries.
- Note: Do not create an instance of PSKeyword directly.
- Always use PSKeywordTable.intern().
- """
- def __init__(self, name: bytes) -> None:
- self.name = name
- def __repr__(self) -> str:
- name = self.name
- return "/%r" % name
- _SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)
- class PSSymbolTable(Generic[_SymbolT]):
- """A utility class for storing PSLiteral/PSKeyword objects.
- Interned objects can be checked its identity with "is" operator.
- """
- def __init__(self, klass: Type[_SymbolT]) -> None:
- self.dict: Dict[PSLiteral.NameType, _SymbolT] = {}
- self.klass: Type[_SymbolT] = klass
- def intern(self, name: PSLiteral.NameType) -> _SymbolT:
- if name in self.dict:
- lit = self.dict[name]
- else:
- # Type confusion issue: PSKeyword always takes bytes as name
- # PSLiteral uses either str or bytes
- lit = self.klass(name) # type: ignore[arg-type]
- self.dict[name] = lit
- return lit
- PSLiteralTable = PSSymbolTable(PSLiteral)
- PSKeywordTable = PSSymbolTable(PSKeyword)
- LIT = PSLiteralTable.intern
- KWD = PSKeywordTable.intern
- KEYWORD_PROC_BEGIN = KWD(b"{")
- KEYWORD_PROC_END = KWD(b"}")
- KEYWORD_ARRAY_BEGIN = KWD(b"[")
- KEYWORD_ARRAY_END = KWD(b"]")
- KEYWORD_DICT_BEGIN = KWD(b"<<")
- KEYWORD_DICT_END = KWD(b">>")
- def literal_name(x: Any) -> str:
- if isinstance(x, PSLiteral):
- if isinstance(x.name, str):
- return x.name
- try:
- return str(x.name, "utf-8")
- except UnicodeDecodeError:
- return str(x.name)
- else:
- if settings.STRICT:
- raise PSTypeError(f"Literal required: {x!r}")
- return str(x)
- def keyword_name(x: Any) -> Any:
- if not isinstance(x, PSKeyword):
- if settings.STRICT:
- raise PSTypeError("Keyword required: %r" % x)
- else:
- name = x
- else:
- name = str(x.name, "utf-8", "ignore")
- return name
- EOL = re.compile(rb"[\r\n]")
- SPC = re.compile(rb"\s")
- NONSPC = re.compile(rb"\S")
- HEX = re.compile(rb"[0-9a-fA-F]")
- END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")
- END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")
- HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")
- END_NUMBER = re.compile(rb"[^0-9]")
- END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")
- END_STRING = re.compile(rb"[()\134]")
- OCT_STRING = re.compile(rb"[0-7]")
- ESC_STRING = {
- b"b": 8,
- b"t": 9,
- b"n": 10,
- b"f": 12,
- b"r": 13,
- b"(": 40,
- b")": 41,
- b"\\": 92,
- }
- PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]
- class PSBaseParser:
- """Most basic PostScript parser that performs only tokenization."""
- BUFSIZ = 4096
- def __init__(self, fp: BinaryIO) -> None:
- self.fp = fp
- self.eof = False
- self.seek(0)
- def __repr__(self) -> str:
- return "<%s: %r, bufpos=%d>" % (self.__class__.__name__, self.fp, self.bufpos)
- def flush(self) -> None:
- pass
- def close(self) -> None:
- self.flush()
- def tell(self) -> int:
- return self.bufpos + self.charpos
- def poll(self, pos: Optional[int] = None, n: int = 80) -> None:
- pos0 = self.fp.tell()
- if not pos:
- pos = self.bufpos + self.charpos
- self.fp.seek(pos)
- log.debug("poll(%d): %r", pos, self.fp.read(n))
- self.fp.seek(pos0)
- def seek(self, pos: int) -> None:
- """Seeks the parser to the given position."""
- log.debug("seek: %r", pos)
- self.fp.seek(pos)
- # reset the status for nextline()
- self.bufpos = pos
- self.buf = b""
- self.charpos = 0
- # reset the status for nexttoken()
- self._parse1 = self._parse_main
- self._curtoken = b""
- self._curtokenpos = 0
- self._tokens: List[Tuple[int, PSBaseParserToken]] = []
- self.eof = False
- def fillbuf(self) -> None:
- if self.charpos < len(self.buf):
- return
- # fetch next chunk.
- self.bufpos = self.fp.tell()
- self.buf = self.fp.read(self.BUFSIZ)
- if not self.buf:
- raise PSEOF("Unexpected EOF")
- self.charpos = 0
- def nextline(self) -> Tuple[int, bytes]:
- """Fetches a next line that ends either with \\r or \\n."""
- linebuf = b""
- linepos = self.bufpos + self.charpos
- eol = False
- while 1:
- self.fillbuf()
- if eol:
- c = self.buf[self.charpos : self.charpos + 1]
- # handle b'\r\n'
- if c == b"\n":
- linebuf += c
- self.charpos += 1
- break
- m = EOL.search(self.buf, self.charpos)
- if m:
- linebuf += self.buf[self.charpos : m.end(0)]
- self.charpos = m.end(0)
- if linebuf[-1:] == b"\r":
- eol = True
- else:
- break
- else:
- linebuf += self.buf[self.charpos :]
- self.charpos = len(self.buf)
- log.debug("nextline: %r, %r", linepos, linebuf)
- return (linepos, linebuf)
- def revreadlines(self) -> Iterator[bytes]:
- """Fetches a next line backword.
- This is used to locate the trailers at the end of a file.
- """
- self.fp.seek(0, io.SEEK_END)
- pos = self.fp.tell()
- buf = b""
- while pos > 0:
- prevpos = pos
- pos = max(0, pos - self.BUFSIZ)
- self.fp.seek(pos)
- s = self.fp.read(prevpos - pos)
- if not s:
- break
- while 1:
- n = max(s.rfind(b"\r"), s.rfind(b"\n"))
- if n == -1:
- buf = s + buf
- break
- yield s[n:] + buf
- s = s[:n]
- buf = b""
- def _parse_main(self, s: bytes, i: int) -> int:
- m = NONSPC.search(s, i)
- if not m:
- return len(s)
- j = m.start(0)
- c = s[j : j + 1]
- self._curtokenpos = self.bufpos + j
- if c == b"%":
- self._curtoken = b"%"
- self._parse1 = self._parse_comment
- return j + 1
- elif c == b"/":
- self._curtoken = b""
- self._parse1 = self._parse_literal
- return j + 1
- elif c in b"-+" or c.isdigit():
- self._curtoken = c
- self._parse1 = self._parse_number
- return j + 1
- elif c == b".":
- self._curtoken = c
- self._parse1 = self._parse_float
- return j + 1
- elif c.isalpha():
- self._curtoken = c
- self._parse1 = self._parse_keyword
- return j + 1
- elif c == b"(":
- self._curtoken = b""
- self.paren = 1
- self._parse1 = self._parse_string
- return j + 1
- elif c == b"<":
- self._curtoken = b""
- self._parse1 = self._parse_wopen
- return j + 1
- elif c == b">":
- self._curtoken = b""
- self._parse1 = self._parse_wclose
- return j + 1
- elif c == b"\x00":
- return j + 1
- else:
- self._add_token(KWD(c))
- return j + 1
- def _add_token(self, obj: PSBaseParserToken) -> None:
- self._tokens.append((self._curtokenpos, obj))
- def _parse_comment(self, s: bytes, i: int) -> int:
- m = EOL.search(s, i)
- if not m:
- self._curtoken += s[i:]
- return len(s)
- j = m.start(0)
- self._curtoken += s[i:j]
- self._parse1 = self._parse_main
- # We ignore comments.
- # self._tokens.append(self._curtoken)
- return j
- def _parse_literal(self, s: bytes, i: int) -> int:
- m = END_LITERAL.search(s, i)
- if not m:
- self._curtoken += s[i:]
- return len(s)
- j = m.start(0)
- self._curtoken += s[i:j]
- c = s[j : j + 1]
- if c == b"#":
- self.hex = b""
- self._parse1 = self._parse_literal_hex
- return j + 1
- try:
- name: Union[str, bytes] = str(self._curtoken, "utf-8")
- except Exception:
- name = self._curtoken
- self._add_token(LIT(name))
- self._parse1 = self._parse_main
- return j
- def _parse_literal_hex(self, s: bytes, i: int) -> int:
- c = s[i : i + 1]
- if HEX.match(c) and len(self.hex) < 2:
- self.hex += c
- return i + 1
- if self.hex:
- self._curtoken += bytes((int(self.hex, 16),))
- self._parse1 = self._parse_literal
- return i
- def _parse_number(self, s: bytes, i: int) -> int:
- m = END_NUMBER.search(s, i)
- if not m:
- self._curtoken += s[i:]
- return len(s)
- j = m.start(0)
- self._curtoken += s[i:j]
- c = s[j : j + 1]
- if c == b".":
- self._curtoken += c
- self._parse1 = self._parse_float
- return j + 1
- try:
- self._add_token(int(self._curtoken))
- except ValueError:
- pass
- self._parse1 = self._parse_main
- return j
- def _parse_float(self, s: bytes, i: int) -> int:
- m = END_NUMBER.search(s, i)
- if not m:
- self._curtoken += s[i:]
- return len(s)
- j = m.start(0)
- self._curtoken += s[i:j]
- try:
- self._add_token(float(self._curtoken))
- except ValueError:
- pass
- self._parse1 = self._parse_main
- return j
- def _parse_keyword(self, s: bytes, i: int) -> int:
- m = END_KEYWORD.search(s, i)
- if m:
- j = m.start(0)
- self._curtoken += s[i:j]
- else:
- self._curtoken += s[i:]
- return len(s)
- if self._curtoken == b"true":
- token: Union[bool, PSKeyword] = True
- elif self._curtoken == b"false":
- token = False
- else:
- token = KWD(self._curtoken)
- self._add_token(token)
- self._parse1 = self._parse_main
- return j
- def _parse_string(self, s: bytes, i: int) -> int:
- m = END_STRING.search(s, i)
- if not m:
- self._curtoken += s[i:]
- return len(s)
- j = m.start(0)
- self._curtoken += s[i:j]
- c = s[j : j + 1]
- if c == b"\\":
- self.oct = b""
- self._parse1 = self._parse_string_1
- return j + 1
- if c == b"(":
- self.paren += 1
- self._curtoken += c
- return j + 1
- if c == b")":
- self.paren -= 1
- if self.paren:
- # WTF, they said balanced parens need no special treatment.
- self._curtoken += c
- return j + 1
- self._add_token(self._curtoken)
- self._parse1 = self._parse_main
- return j + 1
- def _parse_string_1(self, s: bytes, i: int) -> int:
- """Parse literal strings
- PDF Reference 3.2.3
- """
- c = s[i : i + 1]
- if OCT_STRING.match(c) and len(self.oct) < 3:
- self.oct += c
- return i + 1
- elif self.oct:
- chrcode = int(self.oct, 8)
- assert chrcode < 256, "Invalid octal %s (%d)" % (repr(self.oct), chrcode)
- self._curtoken += bytes((chrcode,))
- self._parse1 = self._parse_string
- return i
- elif c in ESC_STRING:
- self._curtoken += bytes((ESC_STRING[c],))
- elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
- # If current and next character is \r\n skip both because enters
- # after a \ are ignored
- i += 1
- # default action
- self._parse1 = self._parse_string
- return i + 1
- def _parse_wopen(self, s: bytes, i: int) -> int:
- c = s[i : i + 1]
- if c == b"<":
- self._add_token(KEYWORD_DICT_BEGIN)
- self._parse1 = self._parse_main
- i += 1
- else:
- self._parse1 = self._parse_hexstring
- return i
- def _parse_wclose(self, s: bytes, i: int) -> int:
- c = s[i : i + 1]
- if c == b">":
- self._add_token(KEYWORD_DICT_END)
- i += 1
- self._parse1 = self._parse_main
- return i
- def _parse_hexstring(self, s: bytes, i: int) -> int:
- m = END_HEX_STRING.search(s, i)
- if not m:
- self._curtoken += s[i:]
- return len(s)
- j = m.start(0)
- self._curtoken += s[i:j]
- token = HEX_PAIR.sub(
- lambda m: bytes((int(m.group(0), 16),)),
- SPC.sub(b"", self._curtoken),
- )
- self._add_token(token)
- self._parse1 = self._parse_main
- return j
- def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
- if self.eof:
- # It's not really unexpected, come on now...
- raise PSEOF("Unexpected EOF")
- while not self._tokens:
- try:
- self.fillbuf()
- self.charpos = self._parse1(self.buf, self.charpos)
- except PSEOF:
- # If we hit EOF in the middle of a token, try to parse
- # it by tacking on whitespace, and delay raising PSEOF
- # until next time around
- self.charpos = self._parse1(b"\n", 0)
- self.eof = True
- # Oh, so there wasn't actually a token there? OK.
- if not self._tokens:
- raise
- token = self._tokens.pop(0)
- log.debug("nexttoken: %r", token)
- return token
- # Stack slots may by occupied by any of:
- # * the name of a literal
- # * the PSBaseParserToken types
- # * list (via KEYWORD_ARRAY)
- # * dict (via KEYWORD_DICT)
- # * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
- ExtraT = TypeVar("ExtraT")
- PSStackType = Union[str, float, bool, PSLiteral, bytes, List, Dict, ExtraT]
- PSStackEntry = Tuple[int, PSStackType[ExtraT]]
- class PSStackParser(PSBaseParser, Generic[ExtraT]):
- def __init__(self, fp: BinaryIO) -> None:
- PSBaseParser.__init__(self, fp)
- self.reset()
- def reset(self) -> None:
- self.context: List[Tuple[int, Optional[str], List[PSStackEntry[ExtraT]]]] = []
- self.curtype: Optional[str] = None
- self.curstack: List[PSStackEntry[ExtraT]] = []
- self.results: List[PSStackEntry[ExtraT]] = []
- def seek(self, pos: int) -> None:
- PSBaseParser.seek(self, pos)
- self.reset()
- def push(self, *objs: PSStackEntry[ExtraT]) -> None:
- self.curstack.extend(objs)
- def pop(self, n: int) -> List[PSStackEntry[ExtraT]]:
- objs = self.curstack[-n:]
- self.curstack[-n:] = []
- return objs
- def popall(self) -> List[PSStackEntry[ExtraT]]:
- objs = self.curstack
- self.curstack = []
- return objs
- def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
- try:
- log.debug("add_results: %r", objs)
- except Exception:
- log.debug("add_results: (unprintable object)")
- self.results.extend(objs)
- def start_type(self, pos: int, type: str) -> None:
- self.context.append((pos, self.curtype, self.curstack))
- (self.curtype, self.curstack) = (type, [])
- log.debug("start_type: pos=%r, type=%r", pos, type)
- def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
- if self.curtype != type:
- raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
- objs = [obj for (_, obj) in self.curstack]
- (pos, self.curtype, self.curstack) = self.context.pop()
- log.debug("end_type: pos=%r, type=%r, objs=%r", pos, type, objs)
- return (pos, objs)
- def do_keyword(self, pos: int, token: PSKeyword) -> None:
- pass
- def nextobject(self) -> PSStackEntry[ExtraT]:
- """Yields a list of objects.
- Arrays and dictionaries are represented as Python lists and
- dictionaries.
- :return: keywords, literals, strings, numbers, arrays and dictionaries.
- """
- while not self.results:
- (pos, token) = self.nexttoken()
- if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
- # normal token
- self.push((pos, token))
- elif token == KEYWORD_ARRAY_BEGIN:
- # begin array
- self.start_type(pos, "a")
- elif token == KEYWORD_ARRAY_END:
- # end array
- try:
- self.push(self.end_type("a"))
- except PSTypeError:
- if settings.STRICT:
- raise
- elif token == KEYWORD_DICT_BEGIN:
- # begin dictionary
- self.start_type(pos, "d")
- elif token == KEYWORD_DICT_END:
- # end dictionary
- try:
- (pos, objs) = self.end_type("d")
- if len(objs) % 2 != 0:
- error_msg = "Invalid dictionary construct: %r" % objs
- raise PSSyntaxError(error_msg)
- d = {
- literal_name(k): v
- for (k, v) in choplist(2, objs)
- if v is not None
- }
- self.push((pos, d))
- except PSTypeError:
- if settings.STRICT:
- raise
- elif token == KEYWORD_PROC_BEGIN:
- # begin proc
- self.start_type(pos, "p")
- elif token == KEYWORD_PROC_END:
- # end proc
- try:
- self.push(self.end_type("p"))
- except PSTypeError:
- if settings.STRICT:
- raise
- elif isinstance(token, PSKeyword):
- log.debug(
- "do_keyword: pos=%r, token=%r, stack=%r",
- pos,
- token,
- self.curstack,
- )
- self.do_keyword(pos, token)
- else:
- log.error(
- "unknown token: pos=%r, token=%r, stack=%r",
- pos,
- token,
- self.curstack,
- )
- self.do_keyword(pos, token)
- raise PSException
- if self.context:
- continue
- else:
- self.flush()
- obj = self.results.pop(0)
- try:
- log.debug("nextobject: %r", obj)
- except Exception:
- log.debug("nextobject: (unprintable object)")
- return obj
|