Source code for cheshire3.extractor

"""Cheshire3 Extractor Implementations."""

import re
import types
import string
import copy

from lxml import etree, sax

from cheshire3.baseObjects import Extractor
from cheshire3.record import SaxContentHandler


class SimpleExtractor(Extractor):
    """Base extractor, extracts exact text.

    Every ``process_*`` method returns a dictionary keyed by the extracted
    text, whose values are dictionaries with the keys ``text``,
    ``occurences`` and ``proxLoc``.
    """

    _possibleSettings = {
        'extraSpaceElements': {
            'docs': ("Space separated list of elements after which to append "
                     "a space so as to not run words together.")
        },
        'prox': {
            'docs': ''
        },
        'parent': {
            "docs": ("Should the parent element's identifier be used instead "
                     "of the current element.")
        },
        'reversable': {
            "docs": ("Use a hopefully reversable identifier even when the "
                     "record is a DOM tree. 1 = Yes (expensive), 0 = No "
                     "(default)"),
            'type': int,
            'options': '0|1'
        },
        'stripWhitespace': {
            'docs': ('Should the extracter strip leading/trailing whitespace '
                     'from extracted text. 1 = Yes, 0 = No (default)'),
            'type': int,
            'options': '0|1'
        },
    }

    def __init__(self, session, config, parent):
        """Read the extractor settings and initialise the node cache."""
        Extractor.__init__(self, session, config, parent)
        self.spaceRe = re.compile(r'\s+')
        extraSpaceElems = self.get_setting(session, 'extraSpaceElements', '')
        self.extraSpaceElems = extraSpaceElems.split()
        self.strip = self.get_setting(session, 'stripWhitespace', 0)
        # Cache used by _getProxLocNode when 'reversable' is set: the root
        # most recently numbered, and the element -> number map built for it.
        self.cachedRoot = None
        self.cachedElems = {}

    def _mergeHash(self, a, b):
        """Merge hash ``b`` into hash ``a`` and return the result.

        If either argument is empty the other one is returned as is.
        Otherwise ``a`` is modified in place: for keys present in both, the
        ``occurences`` counts are summed and the ``positions`` lists (or,
        failing that, the ``proxLoc`` lists) are concatenated.  Keys that
        cannot be merged that way are simply taken from ``b``.
        """
        if not a:
            return b
        if not b:
            return a
        for key in b:
            incoming = b[key]
            try:
                existing = a[key]
                existing['occurences'] += incoming['occurences']
            except (KeyError, TypeError):
                # Not in a yet (or not mergeable): take b's entry wholesale
                a[key] = incoming
                continue
            # Prefer 'positions', fall back to 'proxLoc'.  Entries with
            # neither are non-prox and need nothing more than the count.
            for field in ('positions', 'proxLoc'):
                try:
                    existing[field].extend(incoming[field])
                except (KeyError, TypeError, AttributeError):
                    continue
                break
        return a

    def _flattenTexts(self, elem):
        """Return the concatenated text content of ``elem``.

        Objects with a ``childNodes`` attribute (minidom/4suite style) are
        walked recursively; anything else is treated as an
        elementTree/lxml element and iterated.  An object offering neither
        ``getiterator()`` nor ``iter()`` (e.g. an lxml smart string) is
        returned unchanged.  A single space is inserted after elements
        named in the 'extraSpaceElements' setting.
        """
        texts = []
        if hasattr(elem, 'childNodes'):
            # minidom/4suite
            # Local import: the DOM node type constants are only needed on
            # this branch and were not otherwise available in this module.
            from xml.dom import Node
            for child in elem.childNodes:
                if child.nodeType == Node.TEXT_NODE:
                    texts.append(child.data)
                elif child.nodeType == Node.ELEMENT_NODE:
                    # Recurse
                    texts.append(self._flattenTexts(child))
                    if child.localName in self.extraSpaceElems:
                        texts.append(' ')
        else:
            # elementTree/lxml
            try:
                walker = elem.getiterator()
            except AttributeError:
                # lxml 1.3 or later
                try:
                    walker = elem.iter()
                except AttributeError:
                    # lxml smart string object
                    return elem
            for c in walker:
                if c.text:
                    texts.append(c.text)
                if c.tag in self.extraSpaceElems:
                    texts.append(' ')
                # The tail of elem itself lies outside elem, so skip it
                if c.tail and c != elem:
                    texts.append(c.tail)
                    if c.tag in self.extraSpaceElems:
                        texts.append(' ')
        return ''.join(texts)

    def process_string(self, session, data):
        """Accept just text and return appropriate data structure."""
        if self.strip:
            data = data.strip()
        return {data: {'text': data, 'occurences': 1, 'proxLoc': [-1]}}

    def _getProxLocNode(self, session, node):
        """Return an integer identifying the location of ``node``.

        With the 'reversable' setting the identifier is the position of the
        node in an iteration over its whole tree; the numbering is cached
        for the most recently seen root.  Otherwise it is the absolute
        value of the hash of the node's path in the tree.
        """
        try:
            tree = node.getroottree()
        except AttributeError:
            # lxml smart string result?
            node = node.getparent()
            tree = node.getroottree()
        if not self.get_setting(session, 'reversable', 0):
            return abs(hash(tree.getpath(node)))
        root = tree.getroot()
        if root != self.cachedRoot:
            # New document: number every node once and remember the result
            self.cachedRoot = root
            self.cachedElems = {}
            try:
                walker = tree.getiterator()
            except AttributeError:
                # lxml 1.3 or later
                walker = tree.iter()
            for position, n in enumerate(walker):
                self.cachedElems[n] = position
        return self.cachedElems[node]

    def process_node(self, session, data):
        """Walk a DOM structure, extract and return."""
        txt = self._flattenTexts(data)
        # We MUST turn newlines into space or can't index
        txt = txt.replace('\n', ' ')
        txt = txt.replace('\r', ' ')
        if self.strip:
            txt = txt.strip()
        if self.get_setting(session, 'prox', 0):
            lno = self._getProxLocNode(session, data)
        else:
            lno = -1
        return {txt: {'text': txt, 'occurences': 1, 'proxLoc': [lno]}}

    def _getProxLocEventList(self, session, events):
        """Return the integer location stored in a serialized event list.

        With the 'parent' setting it is the third from last whitespace
        separated field of the first event, otherwise the last field of
        the last event.
        """
        if self.get_setting(session, 'parent'):
            return int(events[0].split()[-3])
        return int(events[-1].split()[-1])

    def process_eventList(self, session, data):
        """Process a list of SAX events serialized in C3 internal format.

        Only events whose first character is "3" contribute; their text
        starts at offset 2.  When 'stripWhitespace' is set, runs of
        whitespace in the result are collapsed to a single space.
        """
        txt = []
        for e in data:
            if e[0] != "3":
                continue
            # txt[-1] is tested for emptiness, and e[2:3] is used instead
            # of e[2], so that an empty text chunk cannot raise IndexError.
            # NOTE(review): repr() wraps the character in quotes, so the
            # isalnum() test looks like it can never succeed; left as is
            # because enabling it would change the extracted text - confirm
            # the intent.
            if (
                txt and
                txt[-1] and
                txt[-1][-1] != ' ' and
                repr(e[2:3]).isalnum()
            ):
                txt.append(' ')
            txt.append(e[2:])
        txt = ''.join(txt)
        if self.strip:
            txt = self.spaceRe.sub(' ', txt)
        if self.get_setting(session, 'prox', 0):
            lno = self._getProxLocEventList(session, data)
        else:
            lno = -1
        return {txt: {'text': txt, 'occurences': 1, 'proxLoc': [lno]}}

    def process_xpathResult(self, session, data):
        """Process the result of an XPath expression.

        Convenience function to wrap the other process_* functions and do
        type checking.  ``data`` is an iterable of iterables of results;
        the hashes extracted from all of them are merged into one.
        """
        new = {}
        for xp in data:
            for d in xp:
                if isinstance(d, list):
                    # SAX event
                    extracted = self.process_eventList(session, d)
                elif (type(d) in types.StringTypes or
                      type(d) in [int, long, float, bool]):
                    # Attribute content
                    # The exact type match is deliberate: subclasses of the
                    # string types fall through to process_node below.
                    extracted = self.process_string(session, d)
                else:
                    # DOM nodes
                    extracted = self.process_node(session, d)
                new = self._mergeHash(new, extracted)
        return new
class TeiExtractor(SimpleExtractor):
    """Extractor giving special treatment to a handful of element names.

    While stepping through a serialized SAX event list:

    * text inside ``uc`` / ``lc`` is upper / lower cased;
    * text inside ``sic`` and ``abbr`` is skipped, and the value of the
      ``corr`` / ``expan`` attribute (when present) is emitted instead;
    * text inside ``figdesc`` is skipped;
    * a space is emitted at the start of each ``p``.
    """

    _possibleSettings = {
        'imageSections': {
            'docs': 'put in {{ at each new image section',
            'type': int
        }
    }

    # Picks (name, value, terminator) triples out of the serialized
    # attribute dictionary of a start-element event.  Compiled once here
    # rather than on every process_eventList() call.
    _attrRe = re.compile(
        r"""u?['"](.+?)['"]\)?: u?['"](.*?)['"](, |})"""
    )

    # Element name -> function applied to the text inside that element
    _caseTransforms = {
        "uc": lambda text: text.upper(),
        "lc": lambda text: text.lower(),
    }

    # Element name -> attribute whose value replaces the element's text
    _replacementAttrs = {
        "sic": 'corr',
        "abbr": 'expan',
    }

    def process_node(self, session, data):
        """Walk a DOM structure, extract and return.

        Turn into SAX and process_eventList() for the mean time.
        """
        handler = SaxContentHandler()
        sax.saxify(data, handler)
        saxl = handler.currentText
        return self.process_eventList(session, saxl)

    def process_eventList(self, session, data):
        """Process a list of SAX events serialized in C3 internal format.

        Events are dispatched on their first character: "1" and "4" open
        an element, "2" and "5" close one, "3" carries text.  After the
        pieces are joined, whitespace runs are collapsed to one space and
        every '- ' sequence is removed.
        """
        # Easy to find image sections
        includeBraces = self.get_setting(session, 'imageSections', 0)
        txt = []
        # Stack of (element name, fn) for the open elements needing special
        # handling.  None == skip element.  Otherwise fn to call on txt
        processStack = []
        # Step through a SAX event list and extract
        for e in data:
            kind = e[0]
            if kind in ("1", "4"):
                start = e.find("{")
                name = e[2:start - 1]
                sp = name.split(',')
                if len(sp) == 4:
                    # NOTE(review): presumably the namespaced form, with
                    # the bare name quoted in the second field - confirm
                    # against the serializer.
                    name = sp[1][2:-1]
                attrs = {}
                if e[start + 1] != '}':
                    for m in self._attrRe.findall(e[start:]):
                        attrs[unicode(m[0])] = unicode(m[1])
                if includeBraces and 'img.x' in attrs and name != "initial":
                    txt.append(' {{ ')
                if name in self._caseTransforms:
                    processStack.append((name, self._caseTransforms[name]))
                elif name in self._replacementAttrs:
                    # Replace contents with the attribute value
                    replacement = self._replacementAttrs[name]
                    if replacement in attrs:
                        txt.append(attrs[replacement])
                    processStack.append((name, None))
                elif name == "p":
                    txt.append(' ')
                elif name == "figdesc":
                    processStack.append((name, None))
            elif kind == "2" or kind == "5":
                # The element name starts at offset 2 in a "2" event and at
                # offset 9 in a "5" event.
                offset = 2 if kind == "2" else 9
                if processStack:
                    openName = processStack[-1][0]
                    if openName == e[offset:len(openName) + offset]:
                        processStack.pop()
            elif kind == "3":
                # e[2:3] rather than e[2] so that an event carrying no text
                # cannot raise IndexError.
                # NOTE(review): repr() wraps the character in quotes, so
                # the isalnum() test looks like it can never succeed; left
                # as is because enabling it would change the extracted
                # text - confirm the intent.
                if (
                    txt and
                    txt[-1] and
                    txt[-1][-1] != ' ' and
                    repr(e[2:3]).isalnum()
                ):
                    txt.append(' ')
                bit = e[2:]
                if processStack:
                    transform = processStack[-1][1]
                    if transform is None:
                        continue
                    bit = transform(bit)
                txt.append(bit)
        txt = ''.join(txt)
        txt = self.spaceRe.sub(' ', txt)
        txt = txt.replace('- ', '')
        if self.get_setting(session, 'prox', 0):
            lno = self._getProxLocEventList(session, data)
        else:
            lno = -1
        return {txt: {'text': txt, 'occurences': 1, 'proxLoc': [lno]}}
class SpanXPathExtractor(SimpleExtractor):
    """Select all text that occurs between a pair of selections."""

    def process_xpathResult(self, session, data):
        """Extract the text lying between each pair of nodes in ``data``.

        ``data`` is an iterable of ``(startNode, endNode)`` pairs.  For each
        pair the text from the start node up to (not including) the end
        node is gathered, newlines are turned into spaces, and the result
        is merged into the returned hash.  If the end node does not look
        like an element, or is never reached, collection runs to the end
        of the walked tree.
        """
        new = {}
        for xp in data:
            startNode, endNode = xp
            walkRoot = self._findWalkRoot(startNode, endNode)
            txt = self._collectSpanText(walkRoot, startNode, endNode)
            # We MUST turn newlines into space or can't index
            txt = txt.replace('\n', ' ')
            txt = txt.replace('\r', ' ')
            if self.strip:
                txt = txt.strip()
            if self.get_setting(session, 'prox', 0):
                lno = self._getProxLocNode(session, startNode)
            else:
                lno = -1
            new = self._mergeHash(
                new,
                {txt: {'text': txt, 'occurences': 1, 'proxLoc': [lno]}}
            )
        return new

    def _findWalkRoot(self, startNode, endNode):
        """Return an element whose subtree contains the span to extract.

        Falls back to ``startNode`` itself when no shared ancestor exists
        (e.g. the start node has no ancestors), so that the result is
        always defined and never carried over from a previous pair.
        """
        # iterancestors() runs from the parent outwards
        sancs = list(startNode.iterancestors())
        try:
            eancs = list(endNode.iterancestors())
        except AttributeError:
            # Maybe endNode not matched
            # Should continue to the end, so start from the outermost
            # ancestor.
            return sancs[-1] if sancs else startNode
        # Compare outermost first; zip() stops at the shorter list.
        # NOTE(review): this returns the first (outermost) shared ancestor,
        # exactly as before, so the whole tree below it is walked - confirm
        # whether the nearest shared ancestor was intended.
        sancs.reverse()
        eancs.reverse()
        for sanc, eanc in zip(sancs, eancs):
            if sanc == eanc:
                return sanc
        return startNode

    def _collectSpanText(self, walkRoot, startNode, endNode):
        """Join the text found between ``startNode`` and ``endNode``."""
        inrange = False
        text = []
        # Nodes directly following an 'extraSpaceElements' element; their
        # text is preceded by a space.
        extraSpaceNodes = []
        # Only element events are requested: the payload of namespace
        # events is not an element and has no tag, text or tail.
        for evt, el in etree.iterwalk(walkRoot, events=('start', 'end')):
            if el.tag in self.extraSpaceElems:
                following = next(el.itersiblings(), None)
                if following is not None:
                    extraSpaceNodes.append(following)
            if evt == 'start':
                if el == startNode:
                    inrange = True
                elif el == endNode:
                    break
                elif not inrange:
                    continue
                if el in extraSpaceNodes:
                    text.append(' ')
                if el.text is not None:
                    text.append(el.text)
            elif inrange and el.tail is not None:
                if el in extraSpaceNodes:
                    text.append(' ')
                text.append(el.tail)
        return ''.join(text)