"""Cheshire3 Workflow Implementations."""
import sys
import traceback
from types import MethodType
from lxml import etree
from cheshire3.baseObjects import Workflow, Server
from cheshire3.configParser import C3Object
from cheshire3.utils import elementType, flattenTexts
from cheshire3.exceptions import C3Exception, ConfigFileException,\
ObjectDoesNotExistException
from cheshire3.internal import CONFIG_NS
class WorkflowException(C3Exception):
pass
[docs]class SimpleWorkflow(Workflow):
"""Default workflow implementation.
Translates XML to python and compiles it on object instantiation.
"""
code = None
splitN = 0
splitCode = {}
def __init__(self, session, config, parent):
self.splitN = 0
self.splitCode = {}
self.fnHash = {
u'preParser': 'process_document',
u'parser': 'process_document',
u'transformer': 'process_record',
u'extractor': 'process_xpathResult',
u'normalizer': 'process_hash',
u'xpathProcessor': 'process_record',
u'selector': 'process_record',
u'documentFactory': 'load',
u'logger': 'log',
u'documentStore': 'create_document',
u'recordStore': 'create_record',
u'authStore': 'create_record',
u'configStore': 'create_record',
u'queryStore': 'create_query',
u'resultSetStore': 'create_resultSet',
u'indexStore': 'store_terms',
u'queryStore': 'create_query',
u'workflow': 'process',
u'index': 'store_terms',
u'database': 'search',
u'tokenMerger': 'process_hash',
u'tokenizer': 'process_hash'
}
self.singleFunctions = [
u'begin_indexing', u'commit_indexing',
u'begin_storing', u'commit_storing',
u'begin_logging', u'commit_logging',
u'commit_metadata', u'shutdown'
]
self.singleInputFunctions = [
u'get_indexes',
u'commit_parallelIndexing',
u'get_idChunkGenerator',
u'get_dom', u'get_sax', u'get_xml', # Records
u'get_raw' # Documents
]
Workflow.__init__(self, session, config, parent)
# Somewhere at the top there must be a server
self.server = parent
def _handleLxmlConfigNode(self, session, node):
if node.tag in ['workflow', '{%s}workflow' % CONFIG_NS]:
code = ['def handler(self, session, input=None):']
sub = self._handleLxmlGlobals(node)
for s in sub:
code.append(" " + s)
sub = self._handleLxmlFlow(node)
for s in sub:
code.append(" " + s)
code.append(' return input')
self.code = "\n".join(code)
exec(self.code)
setattr(self,
'process',
MethodType(locals()['handler'], self, self.__class__))
def _handleConfigNode(self, session, node):
# <workflow>
if node.localName == "workflow":
# Nummy.
code = ['def handler(self, session, input=None):']
sub = self._handleGlobals(node)
for s in sub:
code.append(" " + s)
sub = self._handleFlow(node)
for s in sub:
code.append(" " + s)
code.append(' return input')
self.code = "\n".join(code)
exec(self.code)
setattr(self,
'process',
MethodType(locals()['handler'], self, self.__class__))
def _handleLxmlGlobals(self, node):
return self._handleGlobals(node)
def _handleGlobals(self, node):
code = []
code.append('if session.database:')
code.append(' db = self.server.get_object(session, '
'session.database)')
code.append(' self.database = db')
code.append('else:')
code.append(' raise WorkflowException("No database")')
return code
def _handleFlow(self, node):
code = []
for c in node.childNodes:
if c.nodeType == elementType:
n = c.localName
if n == "try":
code.append("try:")
sub = self._handleFlow(c)
for s in sub:
code.append(" " + s)
elif n == "except":
code.append("except Exception as err:")
sub = self._handleFlow(c)
for s in sub:
code.append(" " + s)
elif n == "else":
code.append("else:")
sub = self._handleFlow(c)
for s in sub:
code.append(" " + s)
elif n == "break":
code.append("break")
elif n == "continue":
code.append("continue")
elif n == "return":
code.append("return input")
elif n == "raise":
code.append("raise")
elif n == "assign":
fro = c.getAttributeNS(None, 'from')
to = c.getAttributeNS(None, 'to')
code.append("%s = %s" % (to, fro))
elif n == "for-each":
fcode = self._handleForEach(c)
code.extend(fcode)
sub = self._handleFlow(c)
if sub:
for s in sub:
code.append(" " + s)
else:
code.append(" pass")
elif n == "object":
code.extend(self._handleObject(c))
elif n == "log":
code.extend(self._handleLog(c))
elif n == "fork":
code.extend(self._handleFork(c))
else:
try:
name = n.title()
fn = getattr(self, "_handle%s" % name)
code.extend(fn(c))
except:
raise ConfigFileException("Unknown workflow element: "
"%s" % n)
return code
def _handleLxmlFlow(self, node):
code = []
for c in node.iterchildren(tag=etree.Element):
n = c.tag[c.tag.find('}') + 1:]
if n == "object":
code.extend(self._handleLxmlObject(c))
elif n == "assign":
try:
fro = c.attrib['from']
to = c.attrib['to']
except:
raise ConfigFileException("Workflow element assign "
"requires 'to' and 'from' "
"attributes in %s" % self.id)
code.append("%s = %s" % (to, fro))
elif n == "for-each":
fcode = self._handleForEach(c)
code.extend(fcode)
sub = self._handleLxmlFlow(c)
if sub:
for s in sub:
code.append(" " + s)
else:
code.append(" pass")
elif n == "log":
code.extend(self._handleLxmlLog(c))
elif n == "try":
code.append("try:")
sub = self._handleLxmlFlow(c)
for s in sub:
code.append(" " + s)
elif n == "except":
code.append("except Exception as err:")
sub = self._handleLxmlFlow(c)
for s in sub:
code.append(" " + s)
elif n == "else":
code.append("else:")
sub = self._handleLxmlFlow(c)
for s in sub:
code.append(" " + s)
elif n == "break":
code.append("break")
elif n == "continue":
code.append("continue")
elif n == "return":
code.append("return input")
elif n == "raise":
code.append("raise")
elif n == "fork":
code.extend(self._handleLxmlFork(c))
else:
try:
name = n.title()
fn = getattr(self, "_handleLxml%s" % name)
code.extend(fn(c))
except:
raise ConfigFileException("Unknown workflow element: "
"%s" % n)
return code
def _handleLog(self, node):
code = []
ref = node.getAttributeNS(None, 'ref')
if (ref):
code.append("object = db.get_object(session, '%s')" % ref)
else:
code.append("object = db.get_path(session, 'defaultLogger')")
text = flattenTexts(node)
if not text.startswith('"'):
text = repr(text)
lvl = node.getAttributeNS(None, 'level')
if (lvl):
if lvl.isdigit():
code.append("object.log_lvl(session, %s, "
"str(%s).strip())" % (int(lvl), text))
else:
code.append("object.log_%s(session, "
"str(%s).strip())" % (lvl, text))
else:
code.append("object.log(session, str(%s).strip())" % text)
return code
def _handleLxmlLog(self, node):
code = []
ref = node.attrib.get('ref', '')
if ref:
code.append("object = db.get_object(session, '%s')" % ref)
else:
code.append("object = db.get_path(session, 'defaultLogger')")
text = flattenTexts(node)
if not text.startswith('"'):
text = repr(text)
lvl = node.attrib.get('level', '')
if (lvl):
if lvl.isdigit():
code.append("object.log_lvl(session, %s, "
"str(%s).strip())" % (int(lvl), text))
else:
code.append("object.log_%s(session, "
"str(%s).strip())" % (lvl, text))
else:
code.append("object.log(session, str(%s).strip())" % text)
return code
def _handleForEach(self, node):
return ['looped = input', 'for input in looped:']
def _handleAnonObject(self, ref, typ, function):
code = []
if (ref):
code.append("object = db.get_object(session, '%s')" % ref)
elif typ == 'database':
code.append("object = db")
elif typ == 'input':
code.append("object = input")
elif typ:
code.append("object = db.get_path(session, '%s')" % typ)
else:
raise ConfigFileException("Could not determine object")
if not function:
# Assume most common for object type
function = self.fnHash[typ]
if (function in self.singleFunctions):
code.append('object.%s(session)' % function)
elif function in self.singleInputFunctions:
code.append('input = object.%s(session)' % function)
elif (typ == 'index' and function == 'store_terms'):
code.append('object.store_terms(session, input, inRecord)')
elif typ == 'documentFactory' and function == 'load' and input is None:
code.append('input = object.load(session)')
elif typ == 'documentStore':
# Check for normalizer output (deprecated, use documentFactory)
code.append('if type(input) == {}.__class__:')
code.append(' for k in input.keys():')
code.append(' object.%s(session, k)' % function)
code.append('else:')
code.append(' object.%s(session, input)' % function)
elif typ == 'xpathProcessor':
code.append('global inRecord')
code.append('inRecord = input')
code.append('input = object.process_record(session, input)')
else:
code.append('result = object.%s(session, input)' % function)
code.append('if result is not None:')
code.append(' input = result')
#code.append('else:')
#code.append(' raise WorkflowException("No function: "
#"%s on %%s" %% object)' % function)
return code
def _handleLxmlObject(self, node):
ref = node.attrib.get('ref', '')
try:
typ = node.attrib['type']
except KeyError:
raise ConfigFileException("Workflow element 'object' requires "
"'type' attribute in %s" % self.id)
function = node.attrib.get('function', '')
return self._handleAnonObject(ref, typ, function)
def _handleObject(self, node):
ref = node.getAttributeNS(None, 'ref')
typ = node.getAttributeNS(None, 'type')
function = node.getAttributeNS(None, 'function')
return self._handleAnonObject(ref, typ, function)
def _handleLxmlSplit(self, node):
fn = node.attrib.get('id', '')
if fn:
fname = "split_%s" % fn
else:
fname = "split%s" % self.splitN
self.splitN += 1
code = ['def %s(self, session, input):' % fname]
sub = self._handleLxmlGlobals(node)
for s in sub:
code.append(' ' + s)
sub = self._handleLxmlFlow(node)
for s in sub:
code.append(" " + s)
code.append(' return input')
codestr = "\n".join(code)
self.splitCode[fname] = codestr
exec(codestr)
setattr(self, fname, MethodType(locals()[fname], self,
self.__class__))
return fname
def _handleSplit(self, node):
# <workflow>
fn = node.getAttributeNS(None, 'id')
if fn:
fname = "split_%s" % fn
else:
fname = "split%s" % self.splitN
self.splitN += 1
code = ['def %s(self, session, input):' % fname]
sub = self._handleGlobals(node)
for s in sub:
code.append(' ' + s)
sub = self._handleFlow(node)
for s in sub:
code.append(" " + s)
code.append(' return input')
codestr = "\n".join(code)
self.splitCode[fname] = codestr
exec(codestr)
setattr(self, fname, MethodType(locals()[fname], self,
self.__class__))
return fname
def _handleFork(self, node):
code = []
for c in node.childNodes:
if c.nodeType == elementType:
if c.localName == "split":
fname = self._handleSplit(c)
code.append("self.%s(session, input)" % fname)
return code
def _handleLxmlFork(self, node):
code = []
for c in node.iterchildren(tag=etree.Element):
if c.tag in ["split", '{%s}split' % CONFIG_NS]:
fname = self._handleLxmlSplit(c)
code.append("self.%s(session, input)" % fname)
return code
[docs]class CachingWorkflow(SimpleWorkflow):
"""Slightly faster Workflow implementation that caches the objects.
Object must not be used in one database and then another database without
first calling workflow.load_cache(session, newDatabaseObject).
"""
code = None
splitN = 0
splitCode = {}
objcache = {}
objrefs = None
database = None
defaultLogger = None
def __init__(self, session, config, parent):
self.objcache = {}
self.objrefs = set()
self.database = None
self.defaultLogger = None
SimpleWorkflow.__init__(self, session, config, parent)
def load_cache(self, session, db):
if not db:
raise ValueError("ERROR: db parameter empty when loading cache "
"for workflow %s" % self.id)
self.objcache = {}
self.database = db
self.defaultLogger = db.get_path(session, 'defaultLogger')
for o in self.objrefs:
obj = db.get_object(session, o)
if not obj:
raise ObjectDoesNotExistException(o)
self.objcache[o] = obj
def _handleGlobals(self, node):
code = SimpleWorkflow._handleGlobals(self, node)
code.extend(
["if not self.objcache:",
" self.load_cache(session, db)"])
return code
def _handleLog(self, node):
text = flattenTexts(node)
if not text.startswith('"'):
text = repr(text)
ref = node.getAttributeNS(None, 'ref')
lvl = node.getAttributeNS(None, 'level')
if (ref):
self.objrefs.add(ref)
obj = "self.objcache[%s]" % ref
else:
obj = "self.defaultLogger"
if lvl:
if lvl.isdigit():
return ["%s.log_lvl(session, %s, "
"str(%s).strip())" % (obj, lvl, text)]
else:
return ["%s.log_%s(session, "
"str(%s).strip())" % (obj, lvl, text)]
else:
return ["%s.log(session, str(%s).strip())" % (obj, text)]
def _handleLxmlLog(self, node):
text = flattenTexts(node)
if not text.startswith('"'):
text = repr(text)
ref = node.attrib.get('ref', '')
lvl = node.attrib.get('level', '')
if (ref):
self.objrefs.add(ref)
obj = "self.objcache[%s]" % ref
else:
obj = "self.defaultLogger"
if lvl:
if lvl.isdigit():
return ["%s.log_lvl(session, %s, "
"str(%s).strip())" % (obj, lvl, text)]
else:
return ["%s.log_%s(session, "
"str(%s).strip())" % (obj, lvl, text)]
else:
return ["%s.log(session, str(%s).strip())" % (obj, text)]
def _handleObject(self, node):
ref = node.getAttributeNS(None, 'ref')
typ = node.getAttributeNS(None, 'type')
function = node.getAttributeNS(None, 'function')
return self._handleAnonObject(ref, typ, function)
def _handleLxmlObject(self, node):
ref = node.attrib.get('ref', '')
try:
typ = node.attrib['type']
except KeyError:
raise ConfigFileException("Workflow element 'object' requires "
"attribute 'type' in %s" % self.id)
function = node.get('function', '')
return self._handleAnonObject(ref, typ, function)
def _handleAnonObject(self, ref, typ, function):
code = []
if (ref):
self.objrefs.add(ref)
o = "self.objcache['%s']" % ref
elif typ == 'database':
o = "self.database"
elif typ == 'input':
o = "input"
elif typ:
code.append("obj = self.database.get_path(session, '%s')" % typ)
o = "obj"
else:
raise ConfigFileException("Could not determine object")
if not function:
# Assume most common for object type
try:
function = self.fnHash[typ]
except KeyError:
raise ConfigFileException("No default function for "
"objectType: %s" % typ)
if (function in self.singleFunctions):
code.append('%s.%s(session)' % (o, function))
elif (function in self.singleInputFunctions):
code.append('input = %s.%s(session)' % (o, function))
elif (typ == 'index' and function == 'store_terms'):
code.append('%s.store_terms(session, input, inRecord)' % o)
elif typ == 'documentFactory' and function == 'load' and input is None:
code.append('input = %s.load(session)' % o)
elif typ == 'documentStore':
# Check for normalizer output
code.append('if type(input) == {}.__class__:')
code.append(' for k in input.keys():')
code.append(' %s.%s(session, k)' % (o, function))
code.append('else:')
code.append(' %s.%s(session, input)' % (o, function))
elif typ == 'xpathProcessor':
code.append('global inRecord')
code.append('inRecord = input')
code.append('input = %s.process_record(session, input)' % o)
else:
code.append('result = %s.%s(session, input)' % (o, function))
code.append('if result is not None:')
code.append(' input = result')
return code
def _handleSplit(self, node):
# <workflow>
fn = node.getAttributeNS(None, 'id')
if fn:
fname = "split_%s" % fn
else:
fname = "split%s" % self.splitN
self.splitN += 1
code = ['def %s(self, session, input):' % fname]
sub = self._handleFlow(node)
for s in sub:
code.append(" " + s)
code.append(' return input')
codestr = "\n".join(code)
self.splitCode[fname] = codestr
exec(codestr)
setattr(self, fname, MethodType(locals()[fname], self,
self.__class__))
return fname
def _handleLxmlSplit(self, node):
# <workflow>
fn = node.attrib.get('id', '')
if fn:
fname = "split_%s" % fn
else:
fname = "split%s" % self.splitN
self.splitN += 1
code = ['def %s(self, session, input):' % fname]
sub = self._handleLxmlFlow(node)
for s in sub:
code.append(" " + s)
code.append(' return input')
codestr = "\n".join(code)
self.splitCode[fname] = codestr
exec(codestr)
setattr(self, fname, MethodType(locals()[fname], self,
self.__class__))
return fname