# Source code for cheshire3.recordStore

import os
import string
import time

from cheshire3.configParser import C3Object
from cheshire3.baseObjects import RecordStore, Record, ResultSetItem, Session
from cheshire3.baseStore import SimpleStore, BdbStore, DeletedObject
from cheshire3.baseStore import DirectoryStore, directoryStoreIter
from cheshire3.record import SaxRecord
from cheshire3.exceptions import *
from cheshire3.document import StringDocument
from cheshire3.utils import nonTextToken


from cheshire3.bootstrap import BSLxmlParser
from cheshire3.baseStore import BdbIter

try:
    # Deal with Python 3.0 deprecation
    import cPickle as pickle
except:
    import pickle

try:
    # Name when installed by hand
    import bsddb3 as bdb
except:
    # Name that comes in python 2.3
    import bsddb as bdb

# Fastest to pickle elementHash, append to list, then join with nonTextToken


class SimpleRecordStore(RecordStore):
    """RecordStore that turns records into stored data and back again.

    Incoming records are serialized with a transformer or workflow (or, by
    default, ``rec.get_xml``) and handed to ``store_data``.  Outgoing data
    is wrapped in a ``StringDocument`` and parsed back into a record with a
    parser or workflow (or, by default, an LXML parser).

    The actual persistence (``store_data``, ``fetch_data``, ``delete_data``,
    ``generate_id`` ...) is not defined here; it is expected to come from a
    storage class mixed in alongside this one.
    """

    inTransformer = None
    outParser = None
    # Declared for consistency with the two attributes above; __init__
    # assigns all four from the configured paths.
    inWorkflow = None
    outWorkflow = None

    _possiblePaths = {
        'inTransformer': {
            'docs': ("Identifier for transformer to use to transform incoming "
                     "record into document for storage")
        },
        'outParser': {
            'docs': ("Identifier for parser to use to transform outgoing data "
                     "into a record")
        },
        'inWorkflow': {
            'docs': ("Identifier for a transforming workflow to use to "
                     "transform incoming record in to document for storage")
        },
        'outWorkflow': {
            'docs': ("Identifier for a parsing workflow to use to transform "
                     "outgoing data into a record")
        }
    }

    def __init__(self, session, config, parent):
        # Only run the base initializer when ``self.paths`` is still empty.
        # NOTE(review): presumably this avoids re-initializing when a storage
        # mixin has already run the shared base constructor - confirm.
        if (not self.paths):
            RecordStore.__init__(self, session, config, parent)
        self.inTransformer = self.get_path(session, 'inTransformer', None)
        self.outParser = self.get_path(session, 'outParser', None)
        self.inWorkflow = self.get_path(session, 'inWorkflow', None)
        self.outWorkflow = self.get_path(session, 'outWorkflow', None)

    def _check_permission(self, session, operation, action):
        """Raise PermissionException if the session may not do operation.

        ``operation`` is the key looked up in ``self.permissionHandlers``;
        when no handler is registered for it the check is skipped.
        ``action`` is the human readable fragment used in the error
        messages, e.g. "create an object in".
        """
        handler = self.permissionHandlers.get(operation, None)
        if not handler:
            return
        if not session.user:
            raise PermissionException("Authenticated user required to "
                                      "%s %s" % (action, self.id))
        if not handler.hasPermission(session, session.user):
            raise PermissionException("Permission required to "
                                      "%s %s" % (action, self.id))

    def create_record(self, session, rec=None):
        """Assign a freshly generated id to rec, store it and return it.

        If ``rec`` is None an empty placeholder ``SaxRecord`` is created.
        Raises PermissionException when a create handler denies access, and
        re-raises ObjectAlreadyExistsException from ``store_record``.
        """
        self._check_permission(session,
                               'info:srw/operation/1/create',
                               "create an object in")
        newId = self.generate_id(session)
        if rec is None:
            # Create a placeholder
            rec = SaxRecord([], "", newId)
        else:
            rec.id = newId
        rec.recordStore = self.id

        try:
            self.store_record(session, rec)
        except ObjectAlreadyExistsException:
            # Back out id change
            if isinstance(newId, long):
                self.currentId -= 1
            raise
        return rec

    def replace_record(self, session, rec):
        """Store rec over any existing copy, after the permissions check."""
        self._check_permission(session,
                               'info:srw/operation/1/replace',
                               "replace an object in")
        self.store_record(session, rec)

    def store_record(self, session, rec, transformer=None):
        """Serialize rec, store the data with its metadata and return rec.

        The serializer is chosen in this order: the ``transformer``
        argument, ``self.inTransformer``, ``self.inWorkflow``, and finally
        ``rec.get_xml``.  Might raise ObjectAlreadyExistsException from
        ``store_data``.
        """
        rec.recordStore = self.id
        # Maybe add metadata, etc.
        if transformer is not None:
            # Allow custom transformer
            data = transformer.process_record(session, rec).get_raw(session)
        elif self.inTransformer is not None:
            doc = self.inTransformer.process_record(session, rec)
            data = doc.get_raw(session)
        elif self.inWorkflow is not None:
            data = self.inWorkflow.process(session, rec).get_raw(session)
        else:
            data = rec.get_xml(session)

        md = {'byteCount': rec.byteCount,
              'wordCount': rec.wordCount,
              'digest': self.generate_checkSum(session, data)}
        # check for expires
        expires = self.generate_expires(session, rec)
        if expires:
            md['expires'] = expires
        # Object metadata will overwrite generated (intentionally)
        md.update(rec.metadata)
        # Might raise ObjectAlreadyExistsException
        self.store_data(session, rec.id, data, metadata=md)
        # Now accumulate metadata
        self.accumulate_metadata(session, rec)
        return rec

    def fetch_record(self, session, id, parser=None):
        """Fetch the data stored under id and return it parsed as a record.

        Raises PermissionException when a retrieve handler denies access,
        ObjectDeletedException when ``fetch_data`` returns a DeletedObject,
        and ObjectDoesNotExistException when it returns nothing.
        """
        self._check_permission(session,
                               'info:srw/operation/2/retrieve',
                               "retrieve an object from")
        data = self.fetch_data(session, id)
        # Test for the deletion marker first: an ordinary instance is
        # truthy, so testing the truth of ``data`` first would send a
        # DeletedObject on to the parser instead of reporting the deletion.
        if isinstance(data, DeletedObject):
            raise ObjectDeletedException(data)
        if not data:
            raise ObjectDoesNotExistException(id)

        rec = self._process_data(session, id, data, parser)
        # fetch metadata; best effort, a missing value is simply not set
        for attr in ('byteCount', 'wordCount', 'digest'):
            try:
                setattr(rec,
                        attr,
                        self.fetch_recordMetadata(session, id, attr))
            except Exception:
                continue
        return rec

    def delete_record(self, session, id):
        """Delete the data stored under id.

        ``id`` may also be a Record or ResultSetItem, in which case its
        ``id`` attribute is used.
        """
        self._check_permission(session,
                               'info:srw/operation/1/delete',
                               "delete an object from")

        # FIXME: API: This if sucks but not sure how to avoid for workflow
        if isinstance(id, (Record, ResultSetItem)):
            id = id.id
        self.delete_data(session, id)

    def fetch_recordMetadata(self, session, id, mType):
        """Return metadata of type mType for id (an id or a Record)."""
        if isinstance(id, Record):
            id = id.id
        return self.fetch_metadata(session, id, mType)

    def _process_data(self, session, id, data, parser=None):
        """Parse raw stored data into a record with id and store set.

        Split from fetch_record so that iterators can reuse it.  The parser
        is chosen in this order: the ``parser`` argument,
        ``self.outParser``, ``self.outWorkflow``, and finally an LXML parser.
        """
        doc = StringDocument(data)
        if parser is not None:
            rec = parser.process_document(session, doc)
        elif self.outParser is not None:
            rec = self.outParser.process_document(session, doc)
        elif self.outWorkflow is not None:
            rec = self.outWorkflow.process(session, doc)
        else:
            # Assume raw XML into LXML
            # try and set self.parser to an LxmlParser
            try:
                p = session.server.get_object(session, 'LxmlParser')
                self.parser = p
                rec = p.process_document(session, doc)
            except Exception:
                # Fall back to the bootstrap parser
                rec = BSLxmlParser.process_document(session, doc)

        # Ensure basic required info
        rec.id = id
        rec.recordStore = self.id
        return rec


class BdbRecordIter(BdbIter):
    """BdbIter variant that yields parsed records rather than raw items."""

    def next(self):
        """Return the next item from BdbIter, parsed by the owning store."""
        item = BdbIter.next(self)
        identifier = item[0]
        rawData = item[1]
        return self.store._process_data(self.session, identifier, rawData)


class BdbRecordStore(BdbStore, SimpleRecordStore):
    """SimpleRecordStore whose storage methods come from BdbStore."""

    def __init__(self, session, config, parent):
        # Initialize the storage layer first, then the record handling layer.
        for base in (BdbStore, SimpleRecordStore):
            base.__init__(self, session, config, parent)

    def get_storageTypes(self, session):
        """Return the storage type names used by this store.

        'digest' and 'expires' are only included when the setting of the
        same name is enabled.
        """
        optional = [name for name in ('digest', 'expires')
                    if self.get_setting(session, name)]
        return ['database', 'wordCount', 'byteCount'] + optional

    def __iter__(self):
        # return an iter object
        return BdbRecordIter(self.session, self)

# PostgresStore is optional: only define the Postgres backed record store
# when it can be imported.  Catch ImportError specifically so that unrelated
# failures are not silently hidden.
try:
    from cheshire3.baseStore import PostgresStore
except ImportError:
    pass
else:
    class PostgresRecordStore(PostgresStore, SimpleRecordStore):
        """SimpleRecordStore whose storage methods come from PostgresStore."""

        def __init__(self, session, config, parent):
            PostgresStore.__init__(self, session, config, parent)
            SimpleRecordStore.__init__(self, session, config, parent)

class RedirectRecordStore(SimpleRecordStore, SimpleStore):
    """Record store that keeps its data in a separate documentStore.

    Store in unparsed format. Parse on load
    cf buildassoc vs datastore in C2
    """

    _possiblePaths = {
        'documentStore': {
            'docs': "documentStore identifier where the data is held"
        }
    }

    documentStore = None

    def __init__(self, session, config, parent):
        SimpleRecordStore.__init__(self, session, config, parent)
        self.documentStore = self.get_path(session, 'documentStore')

    def __iter__(self):
        # Return an iterator object that calls self.workflow
        # NOTE(review): ParsingIter is not defined or explicitly imported in
        # the visible part of this module - confirm where it comes from.
        return ParsingIter(self)

    def create_record(self, session, rec):
        """Return rec, re-using the id of its parent document when present.

        When ``rec.parent[2]`` is set it becomes the record's id and nothing
        is stored; a value of -1 raises ValueError.  Otherwise creation is
        delegated to ``SimpleRecordStore.create_record``.
        """
        # maybe just copy some stuff around...
        rec.recordStore = self.id
        if rec.parent and rec.parent[2]:
            rec.id = rec.parent[2]
            if rec.id == -1:
                raise ValueError
            return rec
        else:
            return SimpleRecordStore.create_record(self, session, rec)

    def store_data(self, session, id, data, metadata=None):
        """Write data to the documentStore as a document."""
        # Use a fresh dict per call rather than a shared mutable default.
        if metadata is None:
            metadata = {}
        return self.documentStore.store_data(session, id, data, metadata)

    def fetch_data(self, session, id):
        """Read the data for id from the documentStore."""
        return self.documentStore.fetch_data(session, id)

    def begin_storing(self, session):
        return self.documentStore.begin_storing(session)

    def commit_storing(self, session):
        return self.documentStore.commit_storing(session)

    def fetch_recordMetadata(self, session, id, mType):
        """Return document metadata of type mType from the documentStore."""
        return self.documentStore.fetch_documentMetadata(session, id, mType)


# Pass back info for PVM/MPI/SOAP/etc
class RemoteWriteRecordStore(BdbRecordStore):
    """ Listen for records and write """

    def store_data(self, session, id, data, metadata=None):
        """Store data and return the id it was stored under.

        An id is generated when ``id`` is None.  Returns -1 instead of
        raising when the object already exists, so that the result can be
        passed back to the task that sent the data.
        """
        # Return Id to other task
        # Almost don't need this function/class
        # Use a fresh dict per call rather than a shared mutable default.
        if metadata is None:
            metadata = {}
        if id is None:
            id = self.generate_id(session)
        try:
            BdbRecordStore.store_data(self, session, id, data, metadata)
            return id
        except ObjectAlreadyExistsException:
            return -1

class RemoteSlaveRecordStore(BdbRecordStore):
    """Record store that sends serialized records to a remote store.

    Nothing is written locally: ``store_record`` serializes the record and
    asks the task named 'writeTask' to call ``store_data`` on the
    configured remote store.
    """

    recordStore = ""
    writeTask = None
    taskType = None
    protocol = ""

    _possiblePaths = {
        'remoteStore': {
            'docs': "Remote store to send data to."
        }
    }

    _possibleSettings = {
        'protocol': {
            'docs': "Protocol to use for sending data. Currently MPI or PVM"
        }
    }

    def __init__(self, session, config, parent):
        # Only the SimpleRecordStore initializer is run here; the BdbStore
        # initializer is not called.
        SimpleRecordStore.__init__(self, session, config, parent)
        self.writeTask = None
        remote = self.get_path(session, 'remoteStore')
        self.recordStore = remote
        if not remote:
            raise ConfigFileException('Missing recordStore identifier')

    def begin_storing(self, session):
        """Look up the task used for writing; must precede store_record."""
        # set tasks
        self.writeTask = session.processManager.namedTasks['writeTask']
        return None

    def create_record(self, session, rec=None):
        """Store rec (or an empty placeholder) with its id reset to None."""
        if rec is not None:
            rec.id = None
        else:
            rec = SaxRecord([], "", None)
        self.store_record(session, rec)
        return rec

    def store_record(self, session, rec, transformer=None):
        """Serialize rec and send it to the remote store via the write task.

        Raises ValueError when ``begin_storing`` has not set a write task.
        If ``rec.id`` is None it is taken from the task's reply.
        """
        rec.recordStore = self.recordStore.id
        # Maybe add metadata, etc.
        if transformer is not None:
            # Allow custom transformer
            data = transformer.process_record(session, rec).get_raw(session)
        elif self.inTransformer is not None:
            document = self.inTransformer.process_record(session, rec)
            data = document.get_raw(session)
        elif self.inWorkflow is not None:
            data = self.inWorkflow.process(session, rec).get_raw(session)
        else:
            # SAX events followed by the pickled elementHash, all joined
            # with nonTextToken
            events = [event.encode('utf8') for event in rec.get_sax(session)]
            events.append("9 " + pickle.dumps(rec.elementHash))
            data = nonTextToken.join(events)

        digest = self.generate_checkSum(session, data)
        metadata = {'byteCount': rec.byteCount,
                    'wordCount': rec.wordCount,
                    'digest': digest}

        if self.writeTask is None:
            raise ValueError('WriteTask is None... '
                             'did you call begin_storing?')
        self.writeTask.call(self.recordStore, 'store_data', session,
                            rec.id, data, metadata)
        reply = self.writeTask.recv()

        if rec.id is None:
            rec.id = reply.data
        return rec

    def fetch_record(self, session, id, parser=None):
        raise NotImplementedError

class DirectoryRecordStore(DirectoryStore, SimpleRecordStore):
    """SimpleRecordStore whose storage methods come from DirectoryStore."""

    def __init__(self, session, config, parent):
        # Initialize the storage layer first, then the record handling layer.
        for base in (DirectoryStore, SimpleRecordStore):
            base.__init__(self, session, config, parent)

    def get_storageTypes(self, session):
        """Return the storage type names used by this store.

        'digest' and 'expires' are only included when the setting of the
        same name is enabled.
        """
        optional = [name for name in ('digest', 'expires')
                    if self.get_setting(session, name)]
        return ['database', 'wordCount', 'byteCount'] + optional

    def __iter__(self):
        # return an iter object
        return directoryRecordStoreIter(self)


def directoryRecordStoreIter(store):
    """Yield a record for each (id, data) pair from directoryStoreIter.

    A new Session is created once and used to parse every item.
    """
    session = Session()
    for pair in directoryStoreIter(store):
        identifier, rawData = pair
        yield store._process_data(session, identifier, rawData)