import os
import re
import codecs
import mimetypes
import zipfile
import tarfile
import urlparse
from lxml import etree
from xml.sax.saxutils import escape
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from cheshire3.baseObjects import DocumentFactory
from cheshire3.document import StringDocument
from cheshire3.record import SaxRecord
from cheshire3.utils import elementType, getFirstData
from cheshire3.utils import flattenTexts, getShellResult
from cheshire3.workflow import CachingWorkflow
from cheshire3.xpathProcessor import SimpleXPathProcessor
from cheshire3.exceptions import (
FileDoesNotExistException,
ConfigFileException,
PermissionException
)
from cheshire3.internal import CONFIG_NS
mimetypes.add_type('application/marc', '.marc')
sliceRe = re.compile(r'^(.*)\[([0-9]+):([-0-9]+)\]$')
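# A stream location may carry a byte-range suffix, which restricts reading to
# the given byte range. A sketch of how sliceRe parses such a location (the
# file name here is hypothetical):
#
#     m = sliceRe.match("myfile.xml[0:2048]")
#     (path, start, end) = m.groups()
#     # path == "myfile.xml", start == "0", end == "2048"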
# NB:
# cache = 0: yield, no caching
# cache = 1: step through, cache positions in stream
# cache = 2: step through, cache full documents
# other cache values undefined
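# For example (illustrative sketch; ``stream`` stands for any of the
# DocumentStream classes below and ``session`` is assumed to exist):
#
#     for doc in stream.find_documents(session, cache=0):
#         handle(doc)            # Documents yielded one at a time
#     # cache == 1: exhaust the generator, then use stream.locations
#     # cache == 2: exhaust the generator, then use stream.documents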
class BaseDocumentStream:
streamLocation = ""
format = ""
tagName = ""
codec = ""
factory = None
filterRe = None
stream = None
locations = []
documents = []
length = 0
startOffset = 0
endOffset = -1
    def __init__(self, session, stream, format_,
                 tagName="", codec=None, factory=None):
        self.startOffset = 0
        self.endOffset = -1
        # Use per-instance caches; as class attributes these lists would be
        # shared between all instances
        self.locations = []
        self.documents = []
        self.length = 0
        self.factory = factory
        self.format = format_
        if isinstance(tagName, unicode):
            self.tagName = tagName.encode('utf-8')
        else:
            self.tagName = tagName
        self.codec = codec
        self.stream = self.open_stream(stream)
def open_stream(self, stream):
u"""Perform any operations needed before data stream can be read."""
if hasattr(stream, 'read') and hasattr(stream, 'seek'):
# Is a stream
self.streamLocation = "UNKNOWN"
return stream
else:
exists = os.path.exists(stream)
m = sliceRe.match(stream)
if not exists and m:
(stream, start, end) = m.groups()
exists = os.path.exists(stream)
self.startOffset = int(start)
self.endOffset = int(end)
else:
self.startOffset = 0
self.endOffset = -1
if exists:
# is a file
self.streamLocation = stream
if not os.path.isdir(stream):
if self.codec:
return codecs.open(self.streamLocation,
'r',
self.codec)
else:
return file(self.streamLocation)
else:
return stream
else:
# is a string
self.streamLocation = "STRING"
return StringIO(stream)
def fetch_document(self, idx):
if self.length and idx >= self.length:
try:
self.stream.close()
except:
# If loaded with cache == 2 will already have been closed
pass
raise StopIteration
if self.documents:
return self.documents[idx]
elif self.locations:
self.stream.seek(self.locations[idx][0])
data = self.stream.read(self.locations[idx][1])
return StringDocument(data,
filename=self.streamLocation,
byteOffset=self.locations[idx][0],
byteCount=self.locations[idx][1])
else:
raise StopIteration
def find_documents(self, session, cache=0):
raise NotImplementedError
class FileDocumentStream(BaseDocumentStream):
u"""Reads in a single file."""
def find_documents(self, session, cache=0):
doc = StringDocument(self.stream.read(),
filename=self.streamLocation
)
# Attempt to guess the mime-type
mimetype = mimetypes.guess_type(self.streamLocation, 0)
if mimetype[0]:
doc.mimeType = mimetype[0]
if mimetype[1]:
doc.compression = mimetype[1]
# Return/Yield Document
if cache == 0:
yield doc
elif cache == 2:
self.documents = [doc]
class TermHashDocumentStream(BaseDocumentStream):
u"""Given data as a hash of terms, treat each term as a Document."""
def open_stream(self, stream):
# is a hash...
self.streamLocation = "TERM-STRING"
return stream.keys()
def find_documents(self, session, cache=0):
# step through terms
if cache == 0:
for k in self.stream:
yield StringDocument(k)
raise StopIteration
elif cache == 2:
documents = []
for k in self.stream:
documents.append(StringDocument(k))
self.documents = documents
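# Example (sketch; the term hash below is hypothetical):
#
#     ths = TermHashDocumentStream(session, {'apple': 1, 'pear': 1},
#                                  'termHash')
#     docs = list(ths.find_documents(session, cache=0))
#     # Two StringDocuments, 'apple' and 'pear', in arbitrary order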
class XmlDocumentStream(BaseDocumentStream):
start = None
endtag = ""
def __init__(self, session, stream, format_,
tagName="", codec="", factory=None):
BaseDocumentStream.__init__(self, session, stream, format_,
tagName, codec, factory)
        if not tagName:
            tagregex = r"<([-a-zA-Z0-9_.]+:)?([-a-zA-Z0-9_.]+)[\s>]"
            self.start = re.compile(tagregex)
            self.endtag = ""
        else:
            self.start = re.compile(r"<%s[\s>]" % tagName)
            self.endtag = "</" + tagName + ">"
        if factory is not None:
            self.maxGarbageBytes = factory.get_setting(session,
                                                       'maxGarbageBytes',
                                                       10000)
        else:
            # No factory supplied; fall back to the documented default
            self.maxGarbageBytes = 10000
def _getStreamLen(self):
# irods
# irods tell: strm.getPosition()
if hasattr(self.stream, 'getSize'):
return self.stream.getSize()
else:
orig = self.stream.tell()
self.stream.seek(0, os.SEEK_END)
fl = self.stream.tell()
self.stream.seek(orig, os.SEEK_SET)
return fl
def find_documents(self, session, cache=0):
docs = []
locs = []
endtag = self.endtag
let = len(endtag)
myTell = 0
xpi = ""
line = ""
offOffset = 0
filelen = self._getStreamLen()
if self.startOffset > 0:
self.stream.seek(self.startOffset, os.SEEK_SET)
else:
self.stream.seek(0, os.SEEK_SET)
while True:
ol = len(line)
# Should exit if self.maxGarbageBytes (default 10000) bytes of
# garbage between docs
if self.tagName or ol < self.maxGarbageBytes:
line += self.stream.read(1024)
else:
msg = ("Exiting from XML Document Stream before end of "
"stream ({0}), reached maximum garbage bytes "
"({1})".format(self.streamLocation,
self.maxGarbageBytes))
self.factory.log_critical(session, msg)
break
pi = line.find("<?xml ")
if (pi > -1):
# Store info
endpi = line.find("?>")
xpi = line[pi:endpi + 2] + "\n"
xpi = ""
m = self.start.search(line)
if m:
if not self.endtag:
endtag = "</%s>" % m.group()[1:-1]
let = len(endtag)
s = m.start()
line = line[s:]
myTell += s
start = myTell
end = -1
strStart = 0
while end == -1:
if strStart:
# allow for end tag to be broken across reads
end = line.find(endtag, strStart - let)
else:
end = line.find(endtag)
if end > 0:
tlen = end + len(endtag)
txt = line[:tlen]
line = line[tlen:]
myTell += tlen
                    try:
                        byteCount = len(txt.encode('utf-8'))
                    except UnicodeError:
                        # Non-ASCII byte string; len() is already a byte count
                        byteCount = tlen
if (
self.endOffset > 0 and
myTell >= (self.endOffset - self.startOffset)
):
if cache == 0:
self.stream.close()
raise StopIteration
else:
break
doc = StringDocument(xpi + txt,
mimeType="text/xml",
tagName=self.tagName,
byteCount=byteCount,
byteOffset=start + offOffset,
filename=self.streamLocation)
if cache == 0:
yield doc
elif cache == 1:
locs.append((start, tlen))
elif cache == 2:
docs.append(doc)
offOffset += (byteCount - tlen)
else:
strStart = len(line)
# Can we get by without using 'tell()' or similar?
# eg for stream APIs that don't support it
try:
tll = self.stream.tell()
except AttributeError:
tll = self.stream.getPosition()
# Check we have at least 1024 to read
if tll == filelen:
# we've got nuffink!
if cache == 0:
self.stream.close()
raise StopIteration
else:
break
if tll + 1024 < filelen:
line += self.stream.read(1024)
else:
line += self.stream.read()
if len(line) == ol and not m:
if cache == 0:
self.stream.close()
raise StopIteration
else:
break
if cache == 2:
        # If cache == 1, we'll need the file open later to actually read
# Documents from the identified offsets
self.stream.close()
self.locations = locs
self.documents = docs
self.length = max(len(locs), len(docs))
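# Example (illustrative sketch; the file name and tag name are hypothetical,
# and ``session``/``factory`` come from a configured Cheshire3 environment):
#
#     xds = XmlDocumentStream(session, 'records.xml', 'xml',
#                             tagName='record', factory=factory)
#     for doc in xds.find_documents(session, cache=0):
#         print doc.get_raw(session)  # one <record>...</record> per Document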
class MarcDocumentStream(BaseDocumentStream):
def find_documents(self, session, cache=0):
docs = []
locs = []
data = self.stream.read(1536)
myTell = 0
while data:
rt = data.find("\x1D")
while (rt > -1):
txt = data[:rt + 1]
tlen = len(txt)
if cache == 0:
yield StringDocument(txt, mimeType="application/marc")
elif cache == 1:
locs.append((myTell, tlen))
elif cache == 2:
docs.append(StringDocument(txt,
mimeType="application/marc"))
data = data[rt + 1:]
myTell += tlen
rt = data.find("\x1D")
dlen = len(data)
data += self.stream.read(1536)
if (len(data) == dlen):
# Junk at end of file
data = ""
if cache == 2:
        # If cache == 1, we'll need the file open later to actually read
# Documents from the identified offsets
self.stream.close()
self.locations = locs
self.documents = docs
self.length = max(len(locs), len(docs))
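# Example (sketch; the file name is hypothetical): MARC records are stored
# back to back, each ending in the record terminator byte 0x1D, so a file
# holding three records yields three Documents:
#
#     mds = MarcDocumentStream(session, 'records.marc', 'marc')
#     docs = list(mds.find_documents(session, cache=0))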
# XmlTapeDocStream
# ArcFileDocStream
# MetsDocStream
class MultipleDocumentStream(BaseDocumentStream):
def __init__(self, session, stream, format_,
tagName=None, codec=None, factory=None):
BaseDocumentStream.__init__(self, session, stream, format_,
tagName, codec, factory)
filterStr = factory.get_setting(session,
'filterRegexp',
"\.([a-zA-Z0-9]+|tar.gz|tar.bz2)$")
if filterStr:
self.filterRe = re.compile(filterStr)
else:
self.filterRe = None
def _fetchStream(self, path):
return self.open_stream(path)
def _fetchName(self, item):
return item
def _processFile(self, session, item):
name = self._fetchName(item)
if self.filterRe:
m = self.filterRe.search(name)
if not m:
return None
mimetype = mimetypes.guess_type(name, 0)
if (mimetype[0] in ['text/sgml',
'text/xml',
'application/sgml',
'application/xml']):
if mimetype[1] == 'gzip':
msg = '''\
XML files compressed using gzip are not yet supported. \
You could try using zip.'''
raise NotImplementedError(msg)
trip = ('stream', XmlDocumentStream, 'xml')
elif (mimetype[0] == 'application/x-tar'):
if mimetype[1] == 'gzip':
trip = ('stream', TarDocumentStream, 'tar.gz')
elif mimetype[1] == 'bzip2':
trip = ('stream', TarDocumentStream, 'tar.bz2')
else:
trip = ('stream', TarDocumentStream, 'tar')
elif (mimetype[0] == 'application/zip'):
trip = ('stream', ZipDocumentStream, 'zip')
elif (mimetype[0] == 'application/marc'):
trip = ('stream', MarcDocumentStream, 'marc')
else:
if self.tagName:
trip = ('stream', XmlDocumentStream, 'xml')
else:
trip = ('document', None, mimetype[0])
s = self._fetchStream(item)
if trip[0] == 'stream':
cls = trip[1]
nstream = cls(session, s, format_=trip[2],
tagName=self.tagName,
codec=self.codec,
factory=self.factory)
            # Record the item's name as streamLocation so that it is copied
            # into each Document
            nstream.streamLocation = name
return ('stream', nstream)
elif trip[0] == 'document':
data = s.read()
s.close()
doc = StringDocument(data, mimeType=trip[2], filename=name)
if mimetype[1]:
doc.compression = mimetype[1]
return ('document', doc)
    def _processFiles(self, session, items, cache=0):
        docs = []
        locs = []
        for item in items:
            # Look for records in these places
            stuff = self._processFile(session, item)
            if not stuff:
                # None means skip object
                continue
            (dtype, obj) = stuff
            if dtype == 'stream':
                gen = obj.find_documents(session, cache=cache)
                if cache == 0:
                    # Will yield its documents, yield back up
                    for g in gen:
                        yield g
                elif cache == 1:
                    try:
                        gen.next()
                    except StopIteration:
                        pass
                    # Remember where the sub-stream found its Documents
                    locs.append((obj.streamLocation, obj.locations))
                elif cache == 2:
                    try:
                        gen.next()
                    except StopIteration:
                        pass
                    docs.extend(obj.documents)
            elif dtype == 'document':
                if cache == 0:
                    yield obj
                elif cache == 1:
                    raise NotImplementedError
                elif cache == 2:
                    docs.append(obj)
        self.locations = locs
        self.documents = docs
class DirectoryDocumentStream(MultipleDocumentStream):
def find_documents(self, session, cache=0):
if not os.path.isabs(self.streamLocation):
self.streamLocation = os.path.join(
self.factory.get_path(session, 'defaultPath'),
self.streamLocation
)
for root, dirs, files in os.walk(self.streamLocation):
for d in dirs:
if os.path.islink(os.path.join(root, d)):
for root2, dirs2, files2 in os.walk(os.path.join(root,
d)):
# Sort for intuitive processing order
files2.sort()
files2 = [os.path.join(root2, x) for x in files2]
for f in self._processFiles(session, files2, cache):
yield f
files.sort()
files = [os.path.join(root, x) for x in files]
# files = map(lambda x: os.path.join(root, x), files)
for f in self._processFiles(session, files, cache):
yield f
class TarDocumentStream(MultipleDocumentStream):
u"""Unpacks a tar file."""
def open_stream(self, stream):
if self.format in ['tar.gz', 'tgz']:
modeSuf = "gz"
elif self.format == 'tar.bz2':
modeSuf = "bz2"
else:
modeSuf = ""
if hasattr(stream, 'read'):
return tarfile.open(fileobj=stream, mode="r|%s" % modeSuf)
elif os.path.exists(stream):
# Transparent
return tarfile.open(stream, mode="r")
else:
s = StringIO(stream)
return tarfile.open(fileobj=s, mode="r|%s" % modeSuf)
def _processFile(self, session, item):
name = self._fetchName(item)
if name[-1] == "/":
return None
else:
return MultipleDocumentStream._processFile(self, session, item)
def _fetchStream(self, path):
return self.stream.extractfile(path)
def _fetchName(self, item):
return item.name
def find_documents(self, session, cache=0):
# NB can't reverse in stream, send each in turn
for tarinfo in self.stream:
for doc in self._processFiles(session, [tarinfo], cache):
yield doc
self.stream.close()
class ZipDocumentStream(DirectoryDocumentStream):
u"""Unzips a ZIP file."""
def open_stream(self, stream):
if hasattr(stream, 'read') or os.path.exists(stream):
return zipfile.ZipFile(stream, mode="r")
else:
s = StringIO(stream)
return zipfile.ZipFile(s, mode="r")
def _fetchStream(self, path):
return StringIO(self.stream.read(path))
def _fetchName(self, item):
return item
def find_documents(self, session, cache=0):
# For info in self.stream.infolist():
for info in self.stream.namelist():
for doc in self._processFiles(session, [info], cache):
yield doc
self.stream.close()
# RarDocStream
class LocateDocumentStream(DirectoryDocumentStream):
u"""Find files whose name matches the data argument to 'load'."""
def find_documents(self, session, cache=0):
        # The data argument was wrapped in a StringIO by open_stream();
        # read it back to recover the filename pattern
        pattern = self.stream.read()
        fl = getShellResult("locate {0} | grep {0}$".format(pattern))
        docs = fl.split('\n')
        while docs and docs[0][:8] == "warning:":
            docs.pop(0)
        for doc in self._processFiles(session, docs, cache):
            yield doc
class ClusterDocumentStream(BaseDocumentStream):
u"""Takes a raw cluster file, create documents from it."""
def open_stream(self, stream):
# stream must be the filename
# And we don't really care about it until after sorting
if os.path.exists(stream):
self.streamLocation = stream
else:
# FIXME: API: Required None for session to get_path()
dfp = self.factory.get_path(None, 'defaultPath')
# TODO: testme
# dfp = self.factory.get_path(self.factory.loadSession,
# 'defaultPath')
abspath = os.path.join(dfp, stream)
if os.path.exists(abspath):
self.streamLocation = abspath
else:
raise FileDoesNotExistException(stream)
def find_documents(self, session, cache=0):
if cache == 1:
# Can't store offsets as there's no file to offset to.
raise NotImplementedError
data = self.streamLocation
sortx = self.factory.get_path(session, 'sortPath', None)
if sortx is None:
sortx = getShellResult('which sort')
sortedFn = data + "_SORT"
os.spawnl(os.P_WAIT, sortx, sortx, data, '-o', sortedFn)
# Now construct cluster documents.
doc = ["<cluster>"]
f = file(sortedFn)
l = f.readline()
# Term docid recstore occs (line, posn)*
currKey = ""
        while l:
docdata = {}
ldata = l.split('\x00')
key = ldata[0]
if (not key):
# Data from records with no key
l = f.readline()
l = l[:-1]
continue
doc.append("<key>%s</key>\n" % (key))
ldata = ldata[1:-1]
for bit in range(len(ldata) / 2):
d = docdata.get(ldata[bit * 2], [])
d.append(ldata[bit * 2 + 1])
docdata[ldata[bit * 2]] = d
l = f.readline()
l = l[:-1]
ldata2 = l.split('\x00')
key2 = ldata2[0]
while key == key2:
ldata2 = ldata2[1:-1]
for bit in range(len(ldata2) / 2):
d = docdata.get(ldata2[bit * 2], [])
d.append(ldata2[bit * 2 + 1])
docdata[ldata2[bit * 2]] = d
l = f.readline()
l = l[:-1]
ldata2 = l.split('\x00')
key2 = ldata2[0]
for k in docdata.keys():
doc.append("<%s>" % (k))
for i in docdata[k]:
doc.append("%s" % i)
doc.append("</%s>" % (k))
doc.append("</cluster>")
sdoc = StringDocument(" ".join(doc))
if cache == 0:
yield sdoc
else:
self.documents.append(sdoc)
doc = ["<cluster>"]
l = f.readline()
l = l[:-1]
f.close()
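# Example of the intermediate cluster file handled above (sketch; the field
# names and values are hypothetical). Each line is NUL-delimited, beginning
# with the cluster key and followed by alternating field-name/value pairs:
#
#     subject\x00title\x00Hamlet\x00date\x001603\x00
#
# After sorting, consecutive lines that share a key are merged into one
# <cluster> Document containing <key>, <title> and <date> elements.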
class ComponentDocumentStream(BaseDocumentStream):
u"""Accepts a record, and componentize."""
sources = []
def __init__(self, session, stream, format_,
tagName=None, codec=None, factory=None):
BaseDocumentStream.__init__(self, session, stream, format_,
tagName, codec, factory)
self.sources = factory.sources
def open_stream(self, stream):
return stream
def _make_startTag(self, element, addText=True):
# Return a string representing the start tag for this element
if not isinstance(element, etree._Element):
raise TypeError("called _make_startTag on non-etree element")
if addText and element.text:
text = element.text
else:
text = ""
# Serialize attributes
attrs = ' '.join(['%s="%s"' % x
for x in element.attrib.items()])
if attrs:
attrs = ' ' + attrs
if element.nsmap:
            ns = element.tag[1:element.tag.find('}')]
tag = element.tag[element.tag.find('}') + 1:]
for prefix, namespace in element.nsmap.iteritems():
if ns == namespace:
break
if prefix is None:
return "<{0}{1}>{2}".format(tag, attrs, text)
else:
return "<{0}:{1}{2}>{3}".format(prefix, tag, attrs, text)
else:
return "<{0}{1}>{2}".format(element.tag,
attrs,
text
)
def _make_endTag(self, element, addTail=True):
# Return a string representing the end tag for this element
if not isinstance(element, etree._Element):
raise TypeError("called _make_endTag on non-etree element")
if addTail and element.tail:
tail = element.tail
else:
tail = ""
if element.nsmap:
            ns = element.tag[1:element.tag.find('}')]
tag = element.tag[element.tag.find('}') + 1:]
for prefix, namespace in element.nsmap.iteritems():
if ns == namespace:
break
if prefix is None:
return "</{0}>{1}".format(tag, tail)
else:
return "</{0}:{1}>{2}".format(prefix, tag, tail)
else:
return "</{0}>{1}".format(element.tag,
tail
)
def find_documents(self, session, cache=0):
# Should extract records by xpath or span and store as X/SGML
if cache == 1:
# Nothing to offset into
raise NotImplementedError
rec = self.stream
        hasNsRe = re.compile('<([a-zA-Z0-9_-]+:[a-zA-Z0-9_-]+)[ >]')
for src in self.sources:
raw = src.process_record(session, rec)
for xp in raw:
if (
isinstance(xp, tuple) and
len(xp) == 2 and
isinstance(xp[0], etree._Element)
):
# Result of a SpanXPathSelector: (startNode, endNode)
startNode, endNode = xp
# Find common ancestor
sancs = list(startNode.iterancestors())
eancs = list(endNode.iterancestors())
# Common ancestor must exist in the shorter of the 2 lists
# Trim both to this size
sancs.reverse()
eancs.reverse()
minlen = min(len(sancs), len(eancs))
sancs = sancs[:minlen]
eancs = eancs[:minlen]
# Iterate through both, simultaneously
for sanc, eanc in zip(sancs, eancs):
if sanc == eanc:
common_ancestor = sanc
break
# Should include start and end tags
includeStartTag = self.factory.get_setting(session,
"keepStart",
1)
includeEndTag = self.factory.get_setting(session,
"keepEnd",
1)
recording = False
doc = []
opened = []
closed = []
# Walk events (start element, end element etc.)
for evt, el in etree.iterwalk(common_ancestor,
events=('start', 'end',
'start-ns',
'end-ns')):
if (el == startNode and not recording):
if (
includeStartTag and
evt in ['start', 'start-ns']
):
recording = True
elif (not includeStartTag and
evt in ['end', 'end-ns']):
# No start node
# Append tail and skip to next node
recording = True
if el.tail:
doc.append(el.tail)
continue
elif (
el == endNode and
evt in ['start', 'start-ns']
):
if includeEndTag:
doc.append(self._make_startTag(el))
doc.append(self._make_endTag(el,
addTail=False))
break
if recording and isinstance(el.tag, str):
if evt in ['start', 'start-ns']:
doc.append(self._make_startTag(el))
opened.append(el)
else:
doc.append(self._make_endTag(el))
if el in opened:
opened.remove(el)
else:
closed.append(el)
# Close opened things
opened.reverse()
for el in opened:
doc.append(self._make_endTag(el, addTail=False))
# Open closed things
for el in closed:
doc.insert(0, self._make_startTag(el, addText=False))
docstr = ''.join(doc)
r = startNode
tree = r.getroottree()
path = tree.getpath(r)
if (r.nsmap):
namespaceList = []
for (pref, ns) in r.nsmap.iteritems():
if pref is None:
namespaceList.append("xmlns=\"%s\"" % (ns))
else:
namespaceList.append("xmlns:%s=\"%s\"" % (pref,
ns))
namespaces = " ".join(namespaceList)
docstr = """\
<c3:component xmlns:c3="http://www.cheshire3.org/schemas/component/" \
%s parent="%r" xpath="%s">%s</c3:component>""" % (namespaces,
rec,
path,
docstr
)
else:
docstr = """\
<c3component parent="%r" xpath="%s">%s</c3component>""" % (rec, path, docstr)
doc = StringDocument(docstr)
if cache == 0:
yield doc
else:
self.documents.append(doc)
continue
# Iterate through selected data
for r in xp:
if isinstance(r, list):
tempRec = SaxRecord(r)
docstr = tempRec.get_xml(session)
hasNs = hasNsRe.search(docstr)
saxid = r[-1][r[-1].rfind(' ') + 1:]
if hasNs:
docstr = """\
<c3:component xmlns:c3=\"http://www.cheshire3.org/schemas/component/\" \
parent=\"%r\" event=\"%s\">%s</c3:component>""" % (rec, saxid, docstr)
else:
docstr = """\
<c3component parent=\"%r\" event=\"%s\">%s</c3component>""" % (rec,
saxid,
docstr)
elif isinstance(r, basestring):
docstr = """\
<c3component parent=\"%r\"><data>%s</data></c3component>""" % (rec, escape(r))
else:
if isinstance(r, etree._Element):
# Lxml Record
docstr = etree.tostring(r)
tree = r.getroottree()
path = tree.getpath(r)
if (r.nsmap):
namespaceList = []
for (pref, ns) in r.nsmap.iteritems():
namespaceList.append(
'xmlns:%s="%s"' % (pref, ns)
)
namespaces = " ".join(namespaceList)
docstr = """\
<c3:component xmlns:c3="http://www.cheshire3.org/schemas/component/" \
%s parent="%r" xpath="%s">%s</c3component>""" % (namespaces, rec, path, docstr)
else:
docstr = """\
<c3component parent="%r" xpath="%s">%s</c3component>""" % (rec, path, docstr)
else:
raise ValueError("Unknown Record Type")
doc = StringDocument(docstr)
if cache == 0:
yield doc
else:
self.documents.append(doc)
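# Example of the span logic above (sketch; element names are hypothetical):
# given a Record '<text><head/><p>a</p><p>b</p><tail/></text>' and a
# SpanXPathSelector returning (head, tail), the walk emits a Document whose
# body is '<head></head><p>a</p><p>b</p><tail></tail>' when keepStart and
# keepEnd are both set, wrapped in a c3component envelope.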
class DocumentFactoryIter(object):
factory = None
session = None
def __init__(self, factory):
self.factory = factory
self.session = factory.loadSession
def next(self):
try:
return self.factory.get_document(self.session)
except IndexError:
raise StopIteration
streamHash = {"xml": XmlDocumentStream,
"marc": MarcDocumentStream,
"dir": DirectoryDocumentStream,
"tar": TarDocumentStream,
"zip": ZipDocumentStream,
"cluster": ClusterDocumentStream,
"locate": LocateDocumentStream,
"component": ComponentDocumentStream,
"termHash": TermHashDocumentStream,
"file": FileDocumentStream
}
class SimpleDocumentFactory(DocumentFactory):
    cache = 0
format = ""
tagName = ""
codec = ""
dataPath = ""
previousIdx = -1
streamHash = {}
docStream = None
generator = None
loadSession = None
_possibleDefaults = {
'cache': {
'docs': "Default value for cache parameter for load()",
'type': int,
'options': "0|1|2"
},
'format': {
'docs': "Default value for format parameter"
},
'tagName': {
'docs': "Default value for tagName parameter"
},
'codec': {
'docs': "Default value for codec parameter"
},
'data': {
'docs': "Default value for data parameter"
}
}
_possibleSettings = {
'filterRegexp': {
'docs': ("Filename filter for files to attempt to load in a "
"multiple document stream (eg from a directory)")
},
'googleKey': {
'docs': ("Key supplied by Google for using their web service "
"interface")
},
'osdUrl': {
'docs': "URL to the OpenSearch description document"
},
'linkedItem': {
'docs': ("Should the factory return the RSS/ATOM item, or the "
"item which it is linked to.")
},
'maxGarbageBytes': {
'docs': ('Number of bytes of non document content after which to '
'exit'),
'type': int
}
}
def __init__(self, session, config, parent):
DocumentFactory.__init__(self, session, config, parent)
self.docStream = None
self.generator = None
self.cache = int(self.get_default(session, 'cache', 0))
self.format = self.get_default(session, 'format', '').encode('utf-8')
self.tagName = self.get_default(session, 'tagName', '')
self.codec = self.get_default(session, 'codec', "")
self.dataPath = self.get_default(session, 'data', '')
self.previousIdx = -1
@classmethod
def register_stream(cls, format_, streamClass):
cls.streamHash[format_] = streamClass
def load(self, session, data=None, cache=None,
format_=None, tagName=None, codec=None):
self.loadSession = session
if data is None:
data = self.dataPath
if format_ is None:
format_ = self.format
if cache is None:
cache = self.cache
if tagName is None:
tagName = self.tagName
if codec is None:
codec = self.codec
# Some laziness checking
if not format_:
if os.path.exists(data):
if data.endswith('.zip'):
format_ = 'zip'
elif data.endswith('.tar'):
format_ = 'tar'
elif data.endswith('.xml'):
format_ = 'xml'
elif data.endswith('.marc'):
format_ = 'marc'
elif os.path.isdir(data):
format_ = 'dir'
else:
if data.startswith("ftp://"):
format_ = 'ftp'
elif data.startswith("srb://"):
format_ = 'srb'
elif data.startswith(("irods://", "rods://")):
format_ = 'irods'
elif data.startswith(("http://", "https://")):
if hasattr(data, '_formatter_parser'):
# RDF URIRef
data = str(data)
format_ = "http"
if data.find('?') > -1:
# Parse url and extract param names
bits = urlparse.urlsplit(data)
plist = [x.split('=')[0] for x in bits[3].split('&')]
if 'verb' in plist and 'metadataPrefix' in plist:
format_ = 'oai'
elif ('operation' in plist and
'version' in plist and
'query' in plist):
format_ = 'sru'
try:
cls = self.streamHash[format_]
except KeyError:
# Just assume single binary data file path
cls = self.streamHash['file']
ds = cls(session, data, format_, tagName, codec, self)
# Store and call generator on first ping
self.docStream = ds
self.generator = ds.find_documents(session, cache=cache)
if cache:
# Need to run generator to completion to actually find the
# documents. Do this now rather than when 1st document requested
for doc in self.generator:
# Nothing to do, just populate df.locations or df.documents
pass
self.previousIdx = -1
self.cache = cache
# Return self for workflows, mostly can ignore
return self
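    # Example use (illustrative sketch; the object identifier and file name
    # are hypothetical, and ``db``/``session`` come from a configured
    # Cheshire3 environment):
    #
    #     df = db.get_object(session, 'defaultDocumentFactory')
    #     df.load(session, 'data/records.xml', cache=0, format_='xml',
    #             tagName='record')
    #     for doc in df:
    #         print doc.get_raw(session)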
def __iter__(self):
return DocumentFactoryIter(self)
def get_document(self, session, n=-1):
if n == -1:
self.previousIdx += 1
idx = self.previousIdx
if self.cache == 0:
# gen will yield, return
return self.generator.next()
else:
return self.docStream.fetch_document(idx)
for (k, v) in streamHash.items():
SimpleDocumentFactory.register_stream(k, v)
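# Example (sketch): additional formats can be registered with custom stream
# classes; CsvDocumentStream here is hypothetical:
#
#     SimpleDocumentFactory.register_stream('csv', CsvDocumentStream)
#     factory.load(session, 'table.csv', format_='csv')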
class ComponentDocumentFactory(SimpleDocumentFactory):
_possibleSettings = {
'keepStart': {
'docs': ("Should the factory include the starting element of the "
"selected span component in the output Document. "
"Default: yes"),
'type': int,
'options': "0|1"
},
'keepEnd': {
'docs': ("Should the factory include the ending element of the "
"selected span component in the output Document. "
"Default: yes"),
'type': int,
'options': "0|1"
},
}
def _handleConfigNode(self, session, node):
# Source
if (node.localName == "source"):
xpaths = []
for child in node.childNodes:
if child.nodeType == elementType:
if child.localName == "xpath":
# add XPath
ref = child.getAttributeNS(None, 'ref')
if ref:
xp = self.get_object(session, ref)
else:
xp = SimpleXPathProcessor(session, node, self)
xp._handleConfigNode(session, node)
self.sources.append(xp)
def _handleLxmlConfigNode(self, session, node):
# Source
if node.tag in ["source", '{%s}source' % CONFIG_NS]:
xpaths = []
for child in node.iterchildren(tag=etree.Element):
if child.tag in ["xpath", '{%s}xpath' % CONFIG_NS]:
# add XPath
ref = child.attrib.get(
'ref',
child.attrib.get('{%s}ref' % CONFIG_NS, '')
)
if ref:
xp = self.get_object(session, ref)
else:
xp = SimpleXPathProcessor(session, node, self)
xp._handleLxmlConfigNode(session, node)
self.sources.append(xp)
def __init__(self, session, config, parent):
self.sources = []
SimpleDocumentFactory.__init__(self, session, config, parent)
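# Example configuration source for a ComponentDocumentFactory (sketch; the
# XPath value is hypothetical):
#
#     <source>
#         <xpath>//persName</xpath>
#     </source>
#
# Each node matched in an incoming Record becomes one component Document,
# wrapped in a c3component (or namespaced c3:component) envelope.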
class AccumulatingStream(BaseDocumentStream):
def __init__(self, session, stream, format_,
tagName=None, codec=None, factory=None):
self.factory = factory
self.format = format_
self.tagName = tagName
self.codec = codec
# And call accumulate to record stream
self.accumulate(session, stream, format_, tagName, codec, factory)
def accumulate(self, session, stream, format_,
tagName=None, codec=None, factory=None):
raise NotImplementedError
class AccTransformerStream(AccumulatingStream):
"""Call a transformer on each input record and concatenate results.
Transformer should return a string.
"""
def __init__(self, session, stream, format_,
tagName=None, codec=None, factory=None):
if not factory:
msg = """\
Cannot build transformer stream without associated documentFactory"""
raise ValueError(msg)
self.transformer = factory.get_path(session,
'accumulatingTransformer',
None)
if not self.transformer:
msg = """\
DocumentFactory does not have 'accumulatingTransformer' path \
for AccTransformerStream"""
raise ConfigFileException(msg)
self.data = []
# now init the AccStream after discovering txr
AccumulatingStream.__init__(self, session, stream, format_,
tagName, codec, factory)
def accumulate(self, session, stream, format_,
tagName=None, codec=None, factory=None):
# stream should be record instance
doc = self.transformer.process_record(session, stream)
self.data.append(doc.get_raw(session))
def find_documents(self, session, cache=0):
yield StringDocument(''.join(self.data))
class AccVectorTransformerStream(AccumulatingStream):
"""Accumulate data to be fed to DM, via a vector transformer."""
def __init__(self, session, stream, format_,
tagName=None, codec=None, factory=None):
if not factory:
msg = """\
Cannot build transformer stream without associated documentFactory"""
raise ValueError(msg)
self.transformer = factory.get_path(session,
'accumulatingTransformer',
None)
if not self.transformer:
msg = """\
DocumentFactory does not have 'accumulatingTransformer' path \
for AccTransformerStream"""
raise ConfigFileException()
self.classes = []
self.vectors = []
self.totalAttributes = 0
# now init the AccStream after discovering txr
AccumulatingStream.__init__(self, session, stream, format_,
tagName, codec, factory)
def accumulate(self, session, stream, format_,
tagName=None, codec=None, factory=None):
        # stream should be a Record instance
doc = self.transformer.process_record(session, stream)
raw = doc.get_raw(session)
if type(raw) == list:
# multiple from proxVector (etc)
for (l, v) in raw:
self.classes.append(l)
self.vectors.append(v)
self.totalAttributes += len(v.keys())
else:
# we're a tuple
self.classes.append(raw[0])
self.vectors.append(raw[1])
self.totalAttributes += len(raw[1].keys())
def find_documents(self, session, cache=0):
doc = StringDocument([self.classes, self.vectors])
doc.totalAttributes = self.totalAttributes
yield doc
class AccumulatingDocumentFactory(SimpleDocumentFactory):
"""Accumulate data across multiple .load() calls to produce 1+ documents.
Call load() repeatedly before fetching document(s)
"""
_possiblePaths = {
'accumulatingTransformer': {
'docs': ("Transformer through which to pass records before "
"accumulating."
)
}
}
def __init__(self, session, config, parent):
SimpleDocumentFactory.__init__(self, session, config, parent)
def loadMany(self, session, data=None, cache=None,
format_=None, tagName=None, codec=None):
for item in data:
self.load(session, item, cache, format_, tagName, codec)
# Return self for workflows, mostly can ignore
return self
def load(self, session, data=None, cache=None,
format_=None, tagName=None, codec=None):
self.loadSession = session
if data is None:
data = self.dataPath
if format_ is None:
format_ = self.format
if cache is None:
cache = self.cache
if tagName is None:
tagName = self.tagName
if codec is None:
codec = self.codec
# Some laziness checking
if not format_:
if os.path.exists(data):
                if data.endswith('.zip'):
                    format_ = 'zip'
                elif data.endswith('.tar'):
                    format_ = 'tar'
                elif data.endswith('.xml'):
                    format_ = 'xml'
                elif data.endswith('.marc'):
                    format_ = 'marc'
                elif os.path.isdir(data):
                    format_ = 'dir'
            else:
                if data.startswith("ftp://"):
                    format_ = 'ftp'
                elif data.startswith("srb://"):
                    format_ = 'srb'
                elif data.startswith(("http://", "https://")):
format_ = "http"
if data.find('?') > -1:
# parse url and extract param names
bits = urlparse.urlsplit(data)
plist = [x.split('=')[0] for x in bits[3].split('&')]
if 'verb' in plist and 'metadataPrefix' in plist:
format_ = 'oai'
elif ('operation' in plist and
'version' in plist and
'query' in plist):
format_ = 'sru'
if not self.docStream:
cls = self.streamHash[format_]
self.docStream = cls(session, data, format_, tagName, codec, self)
else:
self.docStream.accumulate(session, data, format_,
tagName, codec, self)
self.previousIdx = -1
self.cache = cache
# Return self for workflows, mostly can ignore
return self
def get_document(self, session, n=-1):
if self.previousIdx == -1:
# call find docs for real
self.generator = self.docStream.find_documents(session,
cache=self.cache)
return SimpleDocumentFactory.get_document(self, session, n)
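    # Example use (illustrative sketch; identifiers are hypothetical and
    # ``db``/``session``/``recordStore`` are assumed to exist):
    #
    #     adf = db.get_object(session, 'accumulatingDocumentFactory')
    #     for rec in recordStore:
    #         adf.load(session, rec, format_='transformer')
    #     doc = adf.get_document(session)   # single accumulated Document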
AccumulatingDocumentFactory.register_stream('transformer',
AccTransformerStream)
AccumulatingDocumentFactory.register_stream('vectorTransformer',
AccVectorTransformerStream)
class ClusterExtractionDocumentFactory(AccumulatingDocumentFactory):