Source code for cheshire3.indexStore

"""Cheshire3 IndexStore Implementations."""

import sys
import os
import types
import struct
import time
import glob
import re
import random
import fileinput

try:
    # Python 2.3 vs 2.2
    import bsddb as bdb
except ImportError:
    import bsddb3 as bdb

# Intra-package import - absolute imports are good practice
from cheshire3.baseObjects import IndexStore, Database
from cheshire3.configParser import C3Object
from cheshire3.exceptions import ConfigFileException
from cheshire3.exceptions import FileDoesNotExistException,\
    FileAlreadyExistsException, PermissionException
from cheshire3.resultSet import SimpleResultSetItem
from cheshire3.index import *
from cheshire3.baseStore import SwitchingBdbConnection
from cheshire3.utils import getShellResult


nonTextToken = "\x00\t"
NumTypes = [types.IntType, types.LongType]

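# Illustrative sketch, not part of the original module: the batch-indexing
# temp files written by store_terms() hold one posting per line, with the
# term and its numeric fields joined by nonTextToken so that terms
# containing ordinary whitespace survive the external `sort`.  The helper
# and the field values below are hypothetical; they only mirror that layout.
def _example_posting_line(term, docid, storeid, occurences, positions=()):
    """Return one temp-file posting line as laid out by store_terms()."""
    fields = ["%012d" % docid, str(storeid), str(occurences)]
    fields.extend([str(p) for p in positions])
    return term + nonTextToken + nonTextToken.join(fields) + "\n"

# e.g. _example_posting_line("apple", 42, 0, 3)
#      -> 'apple\x00\t000000000042\x00\t0\x00\t3\n'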

class IndexStoreIter(object):
    """Iterator that steps through the Indexes held in an IndexStore."""

    def __init__(self, session, indexStore):
        self.session = session
        self.indexStore = indexStore
        dfp = indexStore.get_path(session, "defaultPath")
        files = os.listdir(dfp)
        # self.files = filter(self._fileFilter, files)
        self.files = [f for f in files if self._fileFilter(f)]
        self.position = 0

    def _fileFilter(self, x):
        # Keep only index files belonging to this IndexStore, i.e. files
        # named "<indexStoreId>--<indexId>.index"
        if not x.endswith(".index"):
            return False
        elif not x.startswith(self.indexStore.id):
            return False
        else:
            return True

    def next(self):
        try:
            # Filename is "<indexStoreId>--<indexId>.index"; skip the store
            # id plus the "--" separator, and drop the ".index" suffix
            start = len(self.indexStore.id) + 2
            idxid = self.files[self.position][start:-6]
            self.position += 1
            # now fetch the index
            return self.indexStore.get_object(self.session, idxid)
        except IndexError:
            raise StopIteration()

    def jump(self, position):
        self.position = position
        return self.next()

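# Illustrative sketch, not part of the original module: BdbIndexStore
# (below) returns an IndexStoreIter from its __iter__ method, so a store can
# be walked directly to visit every Index it holds.  The `session` and
# `indexStore` arguments are hypothetical objects from a configured
# Cheshire3 database.
def _example_list_indexes(session, indexStore):
    """Return the identifier of every Index held in indexStore."""
    # Each step calls indexStore.get_object() and yields a full Index
    # object, not just its identifier string.
    return [index.id for index in indexStore]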

class BdbIndexStore(IndexStore):

    indexing = 0
    outFiles = {}
    storeHash = {}
    storeHashReverse = {}
    sortStoreCxn = {}
    identifierMapCxn = {}
    indexCxn = {}
    vectorCxn = {}
    proxVectorCxn = {}
    termIdCxn = {}
    reservedLongs = 3

    _possiblePaths = {
        'recordStoreHash': {
            'docs': ("Space separated list of recordStore identifiers kept "
                     "in this indexStore (e.g. map from position in list to "
                     "integer)")
        },
        'tempPath': {
            'docs': ("Path in which temporary files are kept during batch"
                     "mode indexing")
        },
        'sortPath': {
            'docs': ("Path to the 'sort' utility used for sorting temporary "
                     "files")
        }
    }

    _possibleSettings = {
        'maxVectorCacheSize': {
            'docs': "Number of terms to cache when building vectors",
            'type': int
        },
        'bucketType': {
            'docs': ("Type of 'bucket' to use when splitting an index over "
                     "multiple files."),
            'options': 'term1|term2|hash'
        },
        'maxBuckets': {
            'docs': "Maximum number of 'buckets' to split an index into",
            'type': int
        },
        'maxItemsPerBucket': {
            'docs': "Maximum number of items to put into each 'bucket'",
            'type': int
        },
        'vectorBucketType': {
            'docs': ("Type of 'bucket' to use when splitting an index's "
                     "vectors over multiple files."),
            'options': 'hash|int'
        },
        'vectorMaxBuckets': {
            'docs': ("Maximum number of 'buckets' to split an index's "
                     "vectors into"),
            'type': int
        },
        'vectorMaxItemsPerBucket': {
            'docs': ("Maximum number of items to put into each vector "
                     "'bucket'"),
            'type': int
        },
        'dirPerIndex': {
            'docs': "Create a sub-directory for each index, yes or no?",
            'type': int,
            'options': '0|1'
        }
    }

    def __init__(self, session, config, parent):
        IndexStore.__init__(self, session, config, parent)
        self.session = session
        self.reservedLongs = 3
        # Temporary, small, necessarily single, or don't care dbs/files
        self.outFiles = {}            # batch loading file
        self.metadataCxn = None       # indexStore level metadata
        self.identifierMapCxn = {}    # str recid <--> long recid
        # Splittable files (Bdb connection objects)
        # Controlled by switching
        self.switching = (
            self.get_setting(session, 'bucketType', '') or
            self.get_setting(session, 'maxBuckets', 0) or
            self.get_setting(session, 'maxItemsPerBucket', 0)
        )
        self.switchingClass = SwitchingBdbConnection
        self.indexCxn = {}            # term -> rec list
        # Controlled by vectorSwitching
        self.vectorSwitching = (
            self.get_setting(session, 'vectorBucketType', '') or
            self.get_setting(session, 'vectorMaxBuckets', 0) or
            self.get_setting(session, 'vectorMaxItemsPerBucket', 0)
        )
        self.vectorSwitchingClass = SwitchingBdbConnection
        self.sortStoreCxn = {}        # recid -> sort value
        self.termIdCxn = {}           # termid -> term value
        self.vectorCxn = {}           # recid -> term vector
        self.proxVectorCxn = {}       # recid -> term prox vector
        self.termFreqCxn = {}         # rank -> term
        self.createArgs = {}
        self.storeHash = {}
        rsh = self.get_path(session, 'recordStoreHash')
        if rsh:
            wds = rsh.split()
            for (w, wd) in enumerate(wds):
                self.storeHash[w] = wd
                self.storeHashReverse[wd] = w
        self.dirPerIndex = self.get_setting(session, 'dirPerIndex', 0)
        # Record Hash (in bdb)
        fnbase = "recordIdentifiers_" + self.id + "_"
        fnlen = len(fnbase)
        dfp = self.get_path(session, "defaultPath")
        try:
            files = os.listdir(dfp)
        except OSError:
            try:
                os.mkdir(dfp, 0755)
                files = os.listdir(dfp)
            except:
                msg = "Cannot create default path for %s: %s" % (self.id, dfp)
                raise ConfigFileException(msg)
        for f in files:
            if (f[:fnlen] == fnbase):
                recstore = f[fnlen:-4]
                dbp = os.path.join(dfp, f)
                cxn = bdb.db.DB()
                if session.environment == "apache":
                    cxn.open(dbp, flags=bdb.db.DB_NOMMAP)
                else:
                    cxn.open(dbp)
                self.identifierMapCxn[recstore] = cxn

    def __iter__(self):
        return
IndexStoreIter(self.session, self) def _openMetadata(self, session): if self.metadataCxn is not None: return self.metadataCxn else: mp = self.get_path(session, 'metadataPath') if not mp: dfp = self.get_path(session, 'defaultPath') mp = os.path.join(dfp, 'metadata.bdb') if not os.path.exists(mp): oxn = bdb.db.DB() oxn.open(mp, dbtype=bdb.db.DB_BTREE, flags=bdb.db.DB_CREATE, mode=0660) oxn.close() cxn = bdb.db.DB() if session.environment == "apache": cxn.open(mp, flags=bdb.db.DB_NOMMAP) else: cxn.open(mp) self.metadataCxn = cxn return cxn def _closeMetadata(self, session): if self.metadataCxn is not None: self.metadataCxn.close() self.metadataCxn = None def _closeIndex(self, session, index): if index in self.indexCxn: self.indexCxn[index].close() del self.indexCxn[index] def _openIndex(self, session, index): if index in self.indexCxn: return self.indexCxn[index] else: dfp = self.get_path(session, 'defaultPath') basename = self._generateFilename(index) dbp = os.path.join(dfp, basename) if self.switching: # check for overrides vbt = index.get_setting(session, 'bucketType', '') vmb = index.get_setting(session, 'maxBuckets', 0) vmi = index.get_setting(session, 'maxItemsPerBucket', 0) if vbt or vmb or vmi: cxn = self.switchingClass( session, self, dbp, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi ) else: cxn = self.switchingClass(session, self, dbp) else: cxn = bdb.db.DB() if session.environment == "apache": cxn.open(dbp, flags=bdb.db.DB_NOMMAP) else: cxn.open(dbp) self.indexCxn[index] = cxn return cxn def _generateFilename(self, index): if self.dirPerIndex: return os.path.join(index.id, index.id + ".index") else: return "%s--%s.index" % (self.id, index.id) def _listExistingFiles(self, session, index): dfp = self.get_path(session, "defaultPath") name = self._generateFilename(index) return glob.glob(os.path.join(dfp, name + '*')) def _get_internalId(self, session, rec): if rec.recordStore in self.identifierMapCxn: cxn = self.identifierMapCxn[rec.recordStore] else: fn = "recordIdentifiers_{0}_{1}.bdb".format(self.id, rec.recordStore) dfp = self.get_path(session, "defaultPath") dbp = os.path.join(dfp, fn) if not os.path.exists(dbp): cxn = bdb.db.DB() cxn.open(dbp, dbtype=bdb.db.DB_BTREE, flags=bdb.db.DB_CREATE, mode=0660) cxn.close() cxn = bdb.db.DB() if session.environment == "apache": cxn.open(dbp, flags=bdb.db.DB_NOMMAP) else: cxn.open(dbp) self.identifierMapCxn[rec.recordStore] = cxn # Now we have cxn, check it rec exists recid = rec.id if type(recid) == unicode: try: recid = rec.id.encode('utf-8') except: recid = rec.id.encode('utf-16') try: data = cxn.get(recid) if data: return long(data) except: pass # Doesn't exist, write c = cxn.cursor() c.set_range("__i2s_999999999999999") data = c.prev() if (data and data[0][:6] == "__i2s_"): max = long(data[0][6:]) intid = "%015d" % (max + 1) else: intid = "000000000000000" cxn.put(recid, intid) cxn.put("__i2s_%s" % intid, recid) return long(intid) def _get_externalId(self, session, recordStore, identifier): if recordStore in self.identifierMapCxn: cxn = self.identifierMapCxn[recordStore] else: fn = "recordIdentifiers_" + self.id + "_" + recordStore + ".bdb" dfp = self.get_path(session, "defaultPath") dbp = os.path.join(dfp, fn) if not os.path.exists(dbp): raise FileDoesNotExistException(dbp) cxn = bdb.db.DB() if session.environment == "apache": cxn.open(dbp, flags=bdb.db.DB_NOMMAP) else: cxn.open(dbp) self.identifierMapCxn[recordStore] = cxn identifier = "%015d" % identifier data = cxn.get("__i2s_%s" % identifier) if data: return data elif data 
is not None: # We shouldn't raise DoesNotExistException, but we we probably # shouldn't return an empty string either. # However, returning anything other than the real identifier # (i.e. the empty string) results in errors further down the line return data else: msg = "%s/%s" % (recordStore, identifier) raise FileDoesNotExistException(msg) def fetch_summary(self, session, index): # Fetch summary data for all terms in index # eg for sorting, then iterating # USE WITH CAUTION # Everything done here for speed cxn = self._openIndex(session, index) cursor = cxn.cursor() dataLen = index.longStructSize * self.reservedLongs terms = [] (term, val) = cursor.first(doff=0, dlen=dataLen) while (val): (termid, recs, occs) = struct.unpack('<lll', val) terms.append({'t': term, 'i': termid, 'r': recs, 'o': occs}) try: (term, val) = cursor.next(doff=0, dlen=dataLen) except: val = 0 self._closeIndex(session, index) return terms def begin_indexing(self, session, index): """Begin indexing process. Open BerkeleyDB connections, create temp files etc. """ p = self.permissionHandlers.get('info:srw/operation/2/index', None) if p: if not session.user: msg = ("Authenticated user required to add to indexStore " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to add to indexStore %s" % self.id raise PermissionException(msg) temp = self.get_path(session, 'tempPath') if not os.path.isabs(temp): temp = os.path.join(self.get_path(session, 'defaultPath'), temp) if (not os.path.exists(temp)): try: os.mkdir(temp) except OSError: msg = 'TempPath does not exist and is not creatable.' raise ConfigFileException(msg) elif (not os.path.isdir(temp)): raise(ConfigFileException('TempPath is not a directory.')) if self.dirPerIndex: indexDirPath = os.path.join(temp, index.id) try: os.mkdir(indexDirPath) except OSError: # Already exists pass except: self.log_critical(session, "Could not create {0}" "".format(indexDirPath) ) raise basename = os.path.join(temp, self._generateFilename(index)) if (hasattr(session, 'task')): basename += str(session.task) # In case we're called twice if (not index in self.outFiles): self.outFiles[index] = codecs.open(basename + "_TEMP", 'a', 'utf-8', 'xmlcharrefreplace') if (index.get_setting(session, "sortStore")): # Store in db for faster sorting dfp = self.get_path(session, "defaultPath") name = self._generateFilename(index) + "_VALUES" fullname = os.path.join(dfp, name) if self.vectorSwitching: vbt = self.get_setting(session, 'vectorBucketType', '' ) vmb = self.get_setting(session, 'vectorMaxBuckets', 0 ) vmi = self.get_setting(session, 'vectorMaxItemsPerBucket', 0 ) cxn = self.vectorSwitchingClass( session, self, fullname, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi ) else: cxn = bdb.db.DB() if session.environment == "apache" or session.task: # Do not memory map in multiprocess environments cxn.open(fullname, flags=bdb.db.DB_NOMMAP) else: cxn.open(fullname) self.sortStoreCxn[index] = cxn def commit_indexing(self, session, index): # Need to do this per index so one store can be doing multiple # things at once # Should only be called if begin_indexing() has been p = self.permissionHandlers.get('info:srw/operation/2/index', None) if p: if not session.user: msg = ("Authenticated user required to add to indexStore " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to add to indexStore %s" % self.id raise PermissionException(msg) temp 
= self.get_path(session, 'tempPath') dfp = self.get_path(session, 'defaultPath') if not os.path.isabs(temp): temp = os.path.join(dfp, temp) try: self.sortStoreCxn[index].close() del self.sortStoreCxn[index] except KeyError: pass if (not index in self.outFiles): raise FileDoesNotExistException(index.id) sort = self.get_path(session, 'sortPath') if (not os.path.exists(sort)): msg = "Sort executable for %s does not exist" % self.id raise ConfigFileException(msg) fh = self.outFiles[index] fh.flush() fh.close() del self.outFiles[index] for db in self.identifierMapCxn.values(): db.sync() # Sort the _TEMP file written during indexing into a _SORT file basename = self._generateFilename(index) if (hasattr(session, 'task')): basename += str(session.task) basename = os.path.join(temp, basename) tempfile = basename + "_TEMP" sorted = basename + "_SORT" cmd = "%s %s -T %s -o %s" % (sort, tempfile, temp, sorted) getShellResult(cmd) # Sorting might fail. if (not os.path.exists(sorted)): msg = "Failed to sort {0}".format(tempfile) self.log_critical(session, msg) raise ValueError(msg) if not index.get_setting(session, 'vectors'): os.remove(tempfile) if ((hasattr(session, 'task') and session.task) or (hasattr(session, 'phase') and session.phase == 'commit_indexing1')): return sorted # Original terms from data return self.commit_centralIndexing(session, index, sorted) def commit_parallelIndexing(self, session, index): sort = self.get_path(session, 'sortPath') temp = self.get_path(session, 'tempPath') dfp = self.get_path(session, 'defaultPath') if not os.path.isabs(temp): temp = os.path.join(dfp, temp) # Merge multiple _SORT files into a single _SORT file for finalizing self.log_debug(session, "Merging parallel sort files for {0}" "".format(index.id) ) if (not os.path.exists(sort)): msg = "Sort executable for %s does not exist" % self.id raise ConfigFileException(msg) basename = self._generateFilename(index) baseGlob = os.path.join(temp, "%s*SORT" % basename) sortFileList = glob.glob(baseGlob) sortFiles = " ".join(sortFileList) sorted = os.path.join(temp, "%s_SORT" % basename) cmd = "%s -m -T %s -o %s %s" % (sort, temp, sorted, sortFiles) out = getShellResult(cmd) if not os.path.exists(sorted): msg = "Didn't sort %s" % index.id self.log_error(session, msg) raise ValueError(msg) # Merge multiple _TEMP files into a single _TEMP file baseGlob = os.path.join(temp, "%s*_TEMP" % basename) tempFileList = glob.glob(baseGlob) self.log_debug(session, "Concatenating {0} parallel _TEMP files for {1}" "".format(len(tempFileList), index.id) ) mergedFn = os.path.join(temp, "%s_TEMP" % basename) # Merge natively in Python. 
This takes longer than using `cat` but is # more reliable and should work cross-platform with open(mergedFn, 'wb') as outfh: for line in fileinput.input(tempFileList, mode='rb'): outfh.write(line) # Clean up for tsfn in sortFileList + tempFileList: os.remove(tsfn) return self.commit_centralIndexing(session, index, sorted) def commit_centralIndexing(self, session, index, filePath): p = self.permissionHandlers.get('info:srw/operation/2/index', None) if p: if not session.user: msg = ("Authenticated user required to add to indexStore " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to add to indexStore %s" % self.id raise PermissionException(msg) if not filePath: temp = self.get_path(session, 'tempPath') dfp = self.get_path(session, 'defaultPath') if not os.path.isabs(temp): temp = os.path.join(dfp, temp) basename = self._generateFilename(index) filePath = os.path.join(temp, basename + "_SORT") cxn = self._openIndex(session, index) cursor = cxn.cursor() try: nonEmpty = cursor.first() except: nonEmpty = False metadataCxn = self._openMetadata(session) tidIdx = index.get_path(session, 'termIdIndex', None) vectors = index.get_setting(session, 'vectors', 0) proxVectors = index.get_setting(session, 'proxVectors', 0) termIds = index.get_setting(session, 'termIds', 0) if not nonEmpty: termid = long(0) else: # find highest termid. First check termid->term map tidcxn = None if vectors: tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: self._openVectors(session, index) tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: # okay, no termid hash. hope for best with final set # of terms from regular index (term, value) = cursor.last(doff=0, dlen=(3 * index.longStructSize)) (last, x, y) = index.deserialize_term(session, value) else: tidcursor = tidcxn.cursor() (finaltid, term) = tidcursor.last() last = long(finaltid) tidcxn.close() self.termIdCxn[index] = None del tidcxn termid = last currTerm = None currData = [] l = 1 s2t = index.deserialize_term mt = index.merge_term t2s = index.serialize_term minTerms = index.get_setting(session, 'minimumSupport') if not minTerms: minTerms = 0 dfp = self.get_path(session, 'defaultPath') basename = self._generateFilename(index) dbname = os.path.join(dfp, basename) if vectors or termIds: tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: self._openVectors(session, index) tidcxn = self.termIdCxn.get(index, None) nTerms = 0 nRecs = 0 nOccs = 0 totalChars = 0 maxNRecs = 0 maxNOccs = 0 # Finalize sorted data in _SORT into Index file(s) f = file(filePath) while(l): l = f.readline()[:-1] data = l.split(nonTextToken) term = data[0] fullinfo = [] for x in data[1:]: try: fullinfo.append(long(x)) except ValueError: # Some unexpected data in the index :( self.log_debug(session, 'Unexpected value in raw index, ' 'attempting to recover...') # Attempt to recover # Find the first thing that could be a long but not # entirely composed of 0s try: mylong = long(re.search('\d*[1-9]\d*', x).group(0)) except AttributeError: # No match - nothing we can do self.log_error(session, 'Unexpected value in raw index data, ' 'skipping entry') continue else: fullinfo.append(mylong) self.log_debug(session, 'Recovered value: {0}'.format(mylong)) if term == currTerm: # Accumulate if fullinfo: totalRecs += 1 totalOccs += fullinfo[2] currData.extend(fullinfo) else: # Store if currData: if (nonEmpty): val = cxn.get(currTerm) if (val is not None): unpacked = s2t(session, val) tempTermId = unpacked[0] 
unpacked = mt(session, unpacked, currData, 'add', nRecs=totalRecs, nOccs=totalOccs) totalRecs = unpacked[1] totalOccs = unpacked[2] unpacked = unpacked[3:] termid -= 1 else: tempTermId = termid unpacked = currData packed = t2s(session, tempTermId, unpacked, nRecs=totalRecs, nOccs=totalOccs) else: tempTermId = termid try: packed = t2s(session, termid, currData, nRecs=totalRecs, nOccs=totalOccs) except: raise if totalRecs >= minTerms: nTerms += 1 nRecs += totalRecs nOccs += totalOccs totalChars += len(currTerm) maxNRecs = max(maxNRecs, totalRecs) maxNOccs = max(maxNOccs, totalOccs) cxn.put(currTerm, packed) del packed if (vectors or termIds) and tempTermId == termid: tidcxn.put("%012d" % termid, currTerm) else: # cheat and undo our term increment termid -= 1 try: totalOccs = fullinfo[2] termid += 1 currTerm = term currData = fullinfo totalRecs = 1 except: pass self._closeIndex(session, index) os.remove(filePath) if metadataCxn: # LLLLLL: nTerms, nRecs, nOccs, maxRecs, maxOccs, totalChars val = struct.pack("<LLLLLL", nTerms, nRecs, nOccs, maxNRecs, maxNOccs, totalChars) metadataCxn.put(index.id.encode('utf8'), val) self._closeMetadata(session) if vectors or termIds: tidcxn.close() self.termIdCxn[index] = None del tidcxn if vectors: self._buildVectors(session, index, filePath[:-4] + "TEMP") fl = index.get_setting(session, 'freqList', "") if fl: cxn = self._openIndex(session, index) cursor = cxn.cursor() dataLen = index.longStructSize * self.reservedLongs terms = [] try: (term, val) = cursor.first(doff=0, dlen=dataLen) while (val): (termid, recs, occs) = struct.unpack('<lll', val) terms.append({'i': termid, 'r': recs, 'o': occs}) try: (term, val) = cursor.next(doff=0, dlen=dataLen) except: val = 0 lt = len(terms) if fl.find('rec') > -1: # 1 = most frequent terms.sort(key=lambda x: x['r'], reverse=True) cxn = self._openTermFreq(session, index, 'rec') for (t, term) in enumerate(terms): termidstr = struct.pack('<ll', term['i'], term['r']) cxn.put("%012d" % t, termidstr) self._closeTermFreq(session, index, 'rec') if fl.find('occ') > -1: terms.sort(key=lambda x: x['o'], reverse=True) cxn = self._openTermFreq(session, index, 'occ') #cxn = bdb.db.DB() #cxn.open(dbname + "_FREQ_OCC") for (t, term) in enumerate(terms): termidstr = struct.pack('<ll', term['i'], term['o']) cxn.put("%012d" % t, termidstr) self._closeTermFreq(session, index, 'occ') except TypeError: # no data in index pass self._closeIndex(session, index) return None def _buildVectors(self, session, index, filePath): # build vectors here termCache = {} freqCache = {} maxCacheSize = index.get_setting(session, 'maxVectorCacheSize', -1) if maxCacheSize == -1: maxCacheSize = self.get_setting(session, 'maxVectorCacheSize', 50000) rand = random.Random() # Settings for what to go into vector store # -1 for just put everything in minGlobalFreq = int(index.get_setting(session, 'vectorMinGlobalFreq', '-1')) maxGlobalFreq = int(index.get_setting(session, 'vectorMaxGlobalFreq', '-1')) minGlobalOccs = int(index.get_setting(session, 'vectorMinGlobalOccs', '-1')) maxGlobalOccs = int(index.get_setting(session, 'vectorMaxGlobalOccs', '-1')) minLocalFreq = int(index.get_setting(session, 'vectorMinLocalFreq', '-1')) maxLocalFreq = int(index.get_setting(session, 'vectorMaxLocalFreq', '-1')) proxVectors = index.get_setting(session, 'proxVectors', 0) # Temp filepath base = filePath fh = codecs.open(base, 'r', 'utf-8', 'xmlcharrefreplace') # Read in each line, look up currDoc = "000000000000" currStore = "0" docArray = [] proxHash = {} cxn = 
self.vectorCxn.get(index, None) if cxn is None: self._openVectors(session, index) cxn = self.vectorCxn[index] if proxVectors: proxCxn = self.proxVectorCxn[index] totalTerms = 0 totalFreq = 0 while True: try: l = fh.readline()[:-1] bits = l.split(nonTextToken) except: break if len(bits) < 4: break (term, docid, storeid, freq) = bits[:4] if docArray and (docid != currDoc or currStore != storeid): # store previous docArray.sort() flat = [] [flat.extend(x) for x in docArray] fmt = '<' + "L" * len(flat) packed = struct.pack(fmt, *flat) cxn.put(str("%s|%s" % (currStore, currDoc.encode('utf8'))), packed) docArray = [] if proxVectors: pdocid = long(currDoc) for (elem, parr) in proxHash.iteritems(): proxKey = struct.pack('<LL', pdocid, elem) if elem < 0 or elem > 4294967295: raise ValueError(elem) proxKey = "%s|%s" % (currStore.encode('utf8'), proxKey) parr.sort() flat = [] [flat.extend(x) for x in parr] proxVal = struct.pack('<' + 'L' * len(flat), *flat) proxCxn.put(proxKey, proxVal) proxHash = {} currDoc = docid currStore = storeid tid = termCache.get(term, None) if tid is None: if not term: #??? continue tdata = self.fetch_term(session, index, term, summary=True) if tdata: try: (tid, tdocs, tfreq) = tdata[:3] except: msg = u"Broken: %r %r %r" % (term, index.id, tdata) self.log_critical(session, msg.encode('utf-8')) raise else: termCache[term] = (0, 0) freqCache[term] = (0, 0) continue termCache[term] = tid freqCache[term] = [tdocs, tfreq] # check caches aren't exploding ltc = len(termCache) if ltc >= maxCacheSize: # select random key to remove (k, v) = termCache.popitem() del freqCache[k] else: (tdocs, tfreq) = freqCache[term] if not tdocs or not tfreq: continue if ( (minGlobalFreq == -1 or tdocs >= minGlobalFreq) and (maxGlobalFreq == -1 or tdocs <= maxGlobalFreq) and (minGlobalOccs == -1 or tfreq >= minGlobalOccs) and (maxGlobalOccs == -1 or tfreq <= maxGlobalOccs) and (minLocalFreq == -1 or tfreq >= minLocalFreq) and (maxLocalFreq == -1 or tfreq <= maxLocalFreq) ): docArray.append([tid, long(freq)]) totalTerms += 1 totalFreq += long(freq) if proxVectors: nProxInts = index.get_setting(session, 'nProxInts', 2) proxInfo = [long(x) for x in bits[4:]] tups = [proxInfo[x:x + nProxInts] for x in range(0, len(proxInfo), nProxInts) ] for t in tups: val = [t[1], tid] val.extend(t[2:]) try: proxHash[t[0]].append(val) except KeyError: proxHash[t[0]] = [val] # Catch final document if docArray: docArray.sort() # Put in total terms, total occurences flat = [] [flat.extend(x) for x in docArray] fmt = '<' + "L" * len(flat) packed = struct.pack(fmt, *flat) cxn.put(str("%s|%s" % (storeid, docid.encode('utf8'))), packed) if proxVectors: pdocid = long(currDoc) for (elem, parr) in proxHash.iteritems(): proxKey = struct.pack('<LL', pdocid, elem) proxKey = "%s|%s" % (storeid.encode('utf8'), proxKey) parr.sort() flat = [] [flat.extend(x) for x in parr] proxVal = struct.pack('<' + 'L' * len(flat), *flat) proxCxn.put(proxKey, proxVal) proxCxn.close() self.proxVectorCxn[index] = None del proxCxn fh.close() cxn.close() os.remove(base) def _closeVectors(self, session, index): for cxnx in [self.termIdCxn, self.vectorCxn, self.proxVectorCxn]: try: cxnx[index].close() except KeyError: continue del cxnx[index] def _openVectors(self, session, index): dfp = self.get_path(session, 'defaultPath') basename = self._generateFilename(index) dbname = os.path.join(dfp, basename) dbp = dbname + "_TERMIDS" if self.vectorSwitching: vbt = self.get_setting(session, 'vectorBucketType', '') vmb = self.get_setting(session, 'vectorMaxBuckets', 
0) vmi = self.get_setting(session, 'vectorMaxItemsPerBucket', 0) cxn = self.vectorSwitchingClass( session, self, dbp, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi ) else: cxn = bdb.db.DB() cxn.open(dbp) self.termIdCxn[index] = cxn if index.get_setting(session, 'vectors'): dbp = dbname + "_VECTORS" if self.vectorSwitching: cxn = self.vectorSwitchingClass( session, self, dbp, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi ) else: cxn = bdb.db.DB() cxn.open(dbp) self.vectorCxn[index] = cxn if index.get_setting(session, 'proxVectors'): dbp = dbname + "_PROXVECTORS" if self.vectorSwitching: cxn = self.vectorSwitchingClass(session, self, dbp, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi) else: cxn = bdb.db.DB() cxn.open(dbp) self.proxVectorCxn[index] = cxn def fetch_vector(self, session, index, rec, summary=False): # rec can be resultSetItem or record tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: self._openVectors(session, index) tidcxn = self.termIdCxn.get(index, None) cxn = self.vectorCxn.get(index, None) else: cxn = self.vectorCxn[index] docid = rec.id if (not type(docid) in NumTypes): if (isinstance(docid, basestring) and docid.isdigit()): docid = long(docid) else: # Look up identifier in local bdb docid = self._get_internalId(session, rec) docid = "%012d" % docid # now add in store docid = "%s|%s" % (self.storeHashReverse[rec.recordStore], docid) if summary: data = cxn.get(docid, doff=0, dlen=(2 * index.longStructSize)) else: data = cxn.get(docid) if data: flat = struct.unpack( '<' + 'L' * (len(data) / index.longStructSize), data ) lf = len(flat) unflatten = [(flat[x], flat[x + 1]) for x in xrange(0, lf, 2)] # totalTerms, totalFreq, [(termId, freq),...] lu = lf / 2 return [lu, sum([x[1] for x in unflatten]), unflatten] else: return [0, 0, []] def fetch_proxVector(self, session, index, rec, elemId=-1): """Fetch and return the proximity vector. Fetch proximity vector for rec from index. rec can be resultSetItem or record """ cxn = self.proxVectorCxn.get(index, None) if cxn is None: self._openVectors(session, index) cxn = self.proxVectorCxn.get(index, None) docid = rec.id if (not type(docid) in NumTypes): if (isinstance(docid, basestring) and docid.isdigit()): docid = long(docid) else: # Look up identifier in local bdb docid = self._get_internalId(session, rec) nProxInts = index.get_setting(session, 'nProxInts', 2) longStructSize = index.longStructSize def unpack(data): flat = struct.unpack( '<' + 'L' * (len(data) / longStructSize), data ) lf = len(flat) unflat = [flat[x:x + nProxInts] for x in range(0, lf, nProxInts)] return unflat if elemId == -1: # fetch all for this record key = struct.pack('<LL', docid, 0) keyid = key[:4] key = str(self.storeHashReverse[rec.recordStore]) + "|" + key c = cxn.cursor() (k, v) = c.set_range(key) vals = {} # XXX won't work for > 9 recordStores... 
while k[2:6] == keyid: elemId = struct.unpack('<L', k[6:])[0] vals[elemId] = unpack(v) try: (k, v) = c.next() except TypeError: break return vals else: key = struct.pack('<LL', docid, elemId) # now add in store key = str(self.storeHashReverse[rec.recordStore]) + "|" + key data = cxn.get(key) if data: return unpack(data) else: return [] def fetch_termById(self, session, index, termId): tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: self._openVectors(session, index) tidcxn = self.termIdCxn.get(index, None) termid = "%012d" % termId data = tidcxn.get(termid) if type(data) == tuple and data[0] == termid: data = data[1] if not data: return "" else: return data def _closeTermFreq(self, session, index, which): try: cxns = self.termFreqCxn[index] except KeyError: return try: cxns[which].close() except KeyError: return del self.termFreqCxn[index][which] def _openTermFreq(self, session, index, which): fl = index.get_setting(session, "freqList", "") if fl.find(which) == -1: return None cxns = self.termFreqCxn.get(index, {}) tfcxn = cxns.get(which, None) if tfcxn is not None: return tfcxn dfp = self.get_path(session, 'defaultPath') basename = self._generateFilename(index) dbname = os.path.join(dfp, basename) dbp = dbname + "_FREQ_" + which.upper() if self.vectorSwitching: vbt = self.get_setting(session, 'vectorBucketType', '') vmb = self.get_setting(session, 'vectorMaxBuckets', 0) vmi = self.get_setting(session, 'vectorMaxItemsPerBucket', 0) tfcxn = self.vectorSwitchingClass( session, self, dbp, bucketType=vbt, maxBuckets=vmb, vectorMaxItemsPerBucket=vmi ) else: tfcxn = bdb.db.DB() if which == 'rec': tfcxn.open(dbp) if cxns == {}: self.termFreqCxn[index] = {'rec': tfcxn} else: self.termFreqCxn[index]['rec'] = tfcxn elif which == 'occ': tfcxn.open(dbp) if cxns == {}: self.termFreqCxn[index] = {'occ': tfcxn} else: self.termFreqCxn[index]['occ'] = tfcxn return tfcxn def fetch_termFrequencies(self, session, index, mType='occ', start=0, nTerms=100, direction=">"): cxn = self._openTermFreq(session, index, mType) if cxn is None: return [] else: c = cxn.cursor() freqs = [] if start < 0: # go to end and reverse (t, v) = c.last() if start != -1: slot = int(t) + start + 1 (t, v) = c.set_range("%012d" % slot) elif start == 0: (t, v) = c.first() else: (t, v) = c.set_range("%012d" % start) if direction[0] == "<": next = c.prev else: next = c.next while len(freqs) < nTerms: (tid, fr) = struct.unpack('<ll', v) freqs.append((int(t), tid, fr)) try: (t, v) = next() except TypeError: break return freqs def fetch_indexMetadata(self, session, index): cxn = self._openMetadata(session) data = cxn.get(index.id.encode('utf-8')) if data: # LLLLLL: nTerms, nRecs, nOccs, maxRecs, maxOccs, totalChars keys = ('nTerms', 'nRecs', 'nOccs', 'maxRecs', 'maxOccs', 'totalChars') vals = struct.unpack('<LLLLLL', data) return dict(zip(keys, vals)) return vals else: return [] def create_term(self, session, index, termId, resultSet): # Take resultset and munge to index format, serialise, store term = resultSet.queryTerm unpacked = [] # only way to be sure totalRecs = resultSet.totalRecs totalOccs = resultSet.totalOccs for item in resultSet: unpacked.extend([item.id, self.storeHashReverse[item.recordStore], item.occurences]) packed = index.serialize_term(session, termId, unpacked, nRecs=totalRecs, nOccs=totalOccs) cxn = self._openIndex(session, index) cxn.put(term, packed) # NB: need to remember to close index manually def contains_index(self, session, index): # Send Index object, check exists, return boolean dfp = 
self.get_path(session, "defaultPath") name = self._generateFilename(index) return os.path.exists(os.path.join(dfp, name)) def _create(self, session, dbname, flags=[], vectorType=1): # for use by self.create_index if os.path.exists(dbname): raise FileAlreadyExistsException(dbname) if vectorType == 0 and self.switching: cxn = self.switchingClass(session, self, dbname) elif vectorType and self.vectorSwitching: dbp = dbname + "_VECTORS" vbt = self.get_setting(session, 'vectorBucketType', '') vmb = self.get_setting(session, 'vectorMaxBuckets', 0) vmi = self.get_setting(session, 'vectorMaxItemsPerBucket', 0) cxn = self.vectorSwitchingClass( session, self, dbp, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi ) else: cxn = bdb.db.DB() for f in flags: cxn.set_flags(f) try: cxn.open(dbname, dbtype=bdb.db.DB_BTREE, flags=bdb.db.DB_CREATE, mode=0660 ) except: raise ConfigFileException(dbname) else: cxn.close() def create_index(self, session, index): # Send Index object to create, null return p = self.permissionHandlers.get('info:srw/operation/1/create', None) if p: if not session.user: msg = ("Authenticated user required to create index in " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to create index in %s" % self.id raise PermissionException(msg) dfp = self.get_path(session, "defaultPath") if self.dirPerIndex: idp = os.path.join(dfp, index.id) if not os.path.exists(idp): os.mkdir(idp) name = self._generateFilename(index) fullname = os.path.join(dfp, name) try: self._create(session, fullname, vectorType=0) except FileAlreadyExistsException: pass if (index.get_setting(session, "sortStore")): try: self._create(session, fullname + "_VALUES") except FileAlreadyExistsException: pass vecs = index.get_setting(session, "vectors") tids = index.get_setting(session, "termIds") if vecs or tids: try: self._create(session, fullname + "_TERMIDS", flags=[bdb.db.DB_RECNUM]) except FileAlreadyExistsException: pass if vecs: try: try: self._create(session, fullname + "_VECTORS", flags=[bdb.db.DB_RECNUM]) except FileAlreadyExistsException: pass if index.get_setting(session, 'proxVectors'): try: self._create(session, fullname + "_PROXVECTORS", flags=[bdb.db.DB_RECNUM]) except FileAlreadyExistsException: pass except: raise(ValueError) fl = index.get_setting(session, "freqList", "") if fl: if fl.find('rec') > -1: try: self._create(session, fullname + "_FREQ_REC", flags=[bdb.db.DB_RECNUM]) except FileAlreadyExistsException: pass if fl.find('occ') > -1: try: self._create(session, fullname + "_FREQ_OCC", flags=[bdb.db.DB_RECNUM]) except FileAlreadyExistsException: pass return 1 def clear_index(self, session, index): self._closeIndex(session, index) self._closeVectors(session, index) self._closeTermFreq(session, index, 'rec') self._closeTermFreq(session, index, 'occ') for dbname in self._listExistingFiles(session, index): cxn = bdb.db.DB() cxn.remove(dbname) self.create_index(session, index) return None def delete_index(self, session, index): # Send Index object to delete, null return p = self.permissionHandlers.get('info:srw/operation/1/delete', None) if p: if not session.user: msg = ("Authenticated user required to delete index from " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to delete index from %s" % self.id raise PermissionException(msg) for dbname in self._listExistingFiles(session, index): os.remove(dbname) return 1 def 
_openSortStore(self, session, index): if (not index.get_setting(session, 'sortStore')): raise FileDoesNotExistException() cxn = self.sortStoreCxn.get(index, None) if cxn: return cxn dfp = self.get_path(session, "defaultPath") name = self._generateFilename(index) + "_VALUES" fullname = os.path.join(dfp, name) if self.vectorSwitching: pass else: cxn = bdb.db.DB() if session.environment == "apache": cxn.open(fullname, flags=bdb.db.DB_NOMMAP) else: cxn.open(fullname) self.sortStoreCxn[index] = cxn return cxn def fetch_sortValue(self, session, index, rec, lowest=True): try: cxn = self.sortStoreCxn[index] except: cxn = self._openSortStore(session, index) val = cxn.get("%s/%s" % (str(rec.recordStore), rec.id)) if val is None: val = cxn.get("%s/%s" % (str(rec.recordStore), rec.numericId)) if val is None: # Missing value return val # Split value at non-text token values = val.split('\0') if lowest: return values[0] else: return values[-1] def store_terms(self, session, index, terms, rec): # Store terms from hash # Need to store: # term, totalOccs, totalRecs, # (record id, recordStore id, number of occs in record) # hash is now {tokenA:tokenA, ...} p = self.permissionHandlers.get('info:srw/operation/2/index', None) if p: if not session.user: msg = ("Authenticated user required to add to indexStore " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to add to indexStore %s" % self.id raise PermissionException(msg) if (not terms): # No terms to index return storeid = rec.recordStore if (not type(storeid) in NumTypes): # Map if (storeid in self.storeHashReverse): storeid = self.storeHashReverse[storeid] else: # YYY: Error or metadata store? numeric_storeid = len(self.storeHash.keys()) self.storeHashReverse[storeid] = numeric_storeid self.storeHash[numeric_storeid] = storeid msg = ("indexStore %s does not recognise recordStore: " "%s" % (self.id, storeid)) raise ConfigFileException(msg) docid = rec.id if (not type(docid) in NumTypes): if (isinstance(docid, basestring) and docid.isdigit()): docid = long(docid) else: # Look up identifier in local bdb docid = self._get_internalId(session, rec) elif (docid == -1): # Unstored record raise ValueError(str(rec)) if index in self.outFiles: # Batch loading if (index in self.sortStoreCxn): # Concatenate lowest and highest values, separated by a # non-text tokento enable ascending and descending sort # Fetch existing values existingVal = self.sortStoreCxn[index].get( "%s/%s" % (str(rec.recordStore), docid) ) if existingVal: sortVals = existingVal.split('\0') else: sortVals = [] for valueHash in terms.itervalues(): if 'sortValue' in valueHash: sortVal = valueHash['sortValue'] else: sortVal = valueHash['text'] if type(sortVal) == unicode: sortVal = sortVal.encode('utf-8') sortVals.append(sortVal) # Sort the combined list sortVals.sort() # Concatenate lowest and highest values self.sortStoreCxn[index].put( "%s/%s" % (str(rec.recordStore), docid), sortVals[0] + '\0' + sortVals[-1] ) valueHash = terms.values()[0] value = valueHash['text'] prox = 'positions' in terms[value] for k in terms.values(): kw = k['text'] if type(kw) != unicode: try: try: kw = kw.decode('utf-8') except AttributeError: # kw is not a unicode object # Maybe it's an integer, e.g. 
wordCount, byteCount kw = unicode(kw).decode('utf-8') except: msg = u"%s failed to decode %s" % (self.id, repr(kw)) self.log_critical(session, msg.encode('utf-8')) raise self.outFiles[index].write(kw) # ensure that docids are sorted to numeric order lineList = ["", "%012d" % docid, str(storeid), str(k['occurences'])] if prox: # lineList.extend(map(str, k['positions'])) lineList.extend([str(x) for x in k['positions']]) self.outFiles[index].write(nonTextToken.join(lineList) + "\n") else: # Directly insert into index cxn = self._openIndex(session, index) # This is going to be ... slow ... with lots of i/o # Use commit method unless only doing very small amounts of work. for k in terms.values(): key = k['text'] stuff = [docid, storeid, k['occurences']] try: stuff.extend(k['positions']) except: pass val = cxn.get(key.encode('utf-8')) if (val is not None): current = index.deserialize_term(session, val) unpacked = index.merge_term(session, current, stuff, op="replace", nRecs=1, nOccs=k['occurences']) (termid, totalRecs, totalOccs) = unpacked[:3] unpacked = unpacked[3:] else: vecs = index.get_setting(session, "vectors") tids = index.get_setting(session, "termIds") tidcxn = None if vecs or tids: tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: self._openVectors(session, index) tidcxn = self.termIdCxn.get(index, None) if tidcxn is None: # Okay, no termid hash. hope for best with final set # of terms from regular index cursor = cxn.cursor() (term, value) = cursor.last( doff=0, dlen=(3 * index.longStructSize) ) (last, x, y) = index.deserialize_term(session, value) else: tidcursor = tidcxn.cursor() (finaltid, term) = tidcursor.last() last = long(finaltid) termid = last + 1 unpacked = stuff totalRecs = 1 totalOccs = k['occurences'] packed = index.serialize_term(session, termid, unpacked, nRecs=totalRecs, nOccs=totalOccs) cxn.put(key.encode('utf-8'), packed) self._closeIndex(session, index) def delete_terms(self, session, index, terms, rec): p = self.permissionHandlers.get('info:srw/operation/2/unindex', None) if p: if not session.user: msg = ("Authenticated user required to delete from indexStore" " %s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = ("Permission required to delete from indexStore " "%s" % self.id) raise PermissionException(msg) if not terms: return docid = rec.id # Hash if (isinstance(docid, basestring) and docid.isdigit()): docid = long(docid) elif (type(docid) in NumTypes): pass else: # Look up identifier in local bdb docid = self._get_internalId(session, rec) storeid = rec.recordStore if (not type(storeid) in NumTypes): # Map if (storeid in self.storeHashReverse): storeid = self.storeHashReverse[storeid] else: # YYY: Error or metadata store? 
self.storeHashReverse[storeid] = len(self.storeHash.keys()) self.storeHash[self.storeHashReverse[storeid]] = storeid storeid = self.storeHashReverse[storeid] msg = ("indexStore %s does not recognise recordStore: " "%s" % (self.id, storeid)) raise ConfigFileException(msg) # Directly insert into index cxn = self._openIndex(session, index) for k in terms.keys(): val = cxn.get(k.encode('utf-8')) if (val is not None): current = index.deserialize_term(session, val) gone = [docid, storeid, terms[k]['occurences']] unpacked = index.merge_term(session, current, gone, 'delete') if not unpacked[1]: # all terms deleted cxn.delete(k.encode('utf-8')) else: packed = index.serialize_term(session, current[0], unpacked[3:]) cxn.put(k.encode('utf-8'), packed) self._closeIndex(session, index) # NB: c.set_range('a', dlen=12, doff=0) # --> (key, 12bytestring) # --> unpack for termid, docs, occs def fetch_termList(self, session, index, term, nTerms=0, relation="", end="", summary=0, reverse=0): p = self.permissionHandlers.get('info:srw/operation/2/scan', None) if p: if not session.user: msg = ("Authenticated user required to scan indexStore " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to scan indexStore %s" % self.id raise PermissionException(msg) if (not (nTerms or relation or end)): nTerms = 20 if (not relation and not end): relation = ">=" if type(end) == unicode: end = end.encode('utf-8') if (not relation): if (term > end): relation = "<=" else: relation = ">" if reverse: dfp = self.get_path(session, "defaultPath") name = self._generateFilename(index) fullname = os.path.join(dfp, name) fullname += "_REVERSE" term = term[::-1] end = end[::-1] if self.switching: vbt = index.get_setting(session, 'bucketType', '') vmb = index.get_setting(session, 'maxBuckets', 0) vmi = index.get_setting(session, 'maxItemsPerBucket', 0) if vbt or vmb or vmi: cxn = self.switchingClass( session, self, fullname, bucketType=vbt, maxBuckets=vmb, maxItemsPerBucket=vmi ) else: cxn = self.switchingClass(session, self, fullname) else: cxn = bdb.db.DB() if session.environment == "apache": cxn.open(fullname, flags=bdb.db.DB_NOMMAP) else: cxn.open(fullname) else: cxn = self._openIndex(session, index) dataLen = index.longStructSize * self.reservedLongs c = cxn.cursor() if not term: # Special escape for first term (i.e. 
empty string) term = c.first()[0] else: term = term.encode('utf-8') try: if summary: (key, data) = c.set_range(term, dlen=dataLen, doff=0) else: (key, data) = c.set_range(term) except Exception as e: try: if summary: (key, data) = c.last(dlen=dataLen, doff=0) else: (key, data) = c.last() except TypeError: # Index is empty return [] if (relation in [">", ">="] and term > key): # Asked for > than maximum key return [] if (end and relation in ['>', '>='] and key > end): return [] elif (end and relation in ['<', '<='] and key < end): return [] tlist = [] fetching = 1 if (key == term and relation in ['>', '<']): pass elif (key > term and relation in ['<', '<=']): pass elif (key > term and relation in ['<', '<=']): pass else: unpacked = index.deserialize_term(session, data) if reverse: key = key[::-1] tlist.append([key.decode('utf-8'), unpacked]) if nTerms == 1: fetching = 0 while fetching: dir = relation[0] if (dir == ">"): if summary: tup = c.next(dlen=dataLen, doff=0) else: tup = c.next() else: if summary: tup = c.prev(dlen=dataLen, doff=0) else: tup = c.prev() if tup: (key, rec) = tup if (end and dir == '>' and key >= end): fetching = 0 elif (end and dir == "<" and key <= end): fetching = 0 else: unpacked = index.deserialize_term(session, rec) if reverse: key = key[::-1] tlist.append([key.decode('utf-8'), unpacked]) if (nTerms and len(tlist) == nTerms): fetching = 0 if dir == '<': fltup = c.prev(dlen=dataLen, doff=0) if not fltup: tlist[-1].append('first') else: fltup = c.next(dlen=dataLen, doff=0) if not fltup: tlist[-1].append('last') else: if tlist: if (dir == ">"): tlist[-1].append("last") else: tlist[-1].append("first") key = None fetching = 0 return tlist def construct_resultSetItem(self, session, recId, recStoreId, nOccs, rsiType="SimpleResultSetItem"): recStore = self.storeHash[recStoreId] if self.identifierMapCxn and recStore in self.identifierMapCxn: numericId = recId recId = self._get_externalId(session, recStore, numericId) else: numericId = None if rsiType == "SimpleResultSetItem": return SimpleResultSetItem(session, recId, recStore, nOccs, session.database, numeric=numericId) elif rsiType == "Hash": return ("%s/%s" % (recStore, recId), {"recordStore": recStore, "recordId": recId, "occurences": nOccs, "database": session.database}) else: raise NotImplementedError(rsiType) def fetch_term(self, session, index, term, summary=False, prox=True): p = self.permissionHandlers.get('info:srw/operation/2/search', None) if p: if not session.user: msg = ("Authenticated user required to search indexStore " "%s" % self.id) raise PermissionException(msg) okay = p.hasPermission(session, session.user) if not okay: msg = "Permission required to search indexStore %s" % self.id raise PermissionException(msg) unpacked = [] val = self._fetch_packed(session, index, term, summary) if (val is not None): try: unpacked = index.deserialize_term(session, val, prox=prox) except: msg = (u"{0} failed to deserialise {1} {2}" u"".format(self.id, index.id, term, val) ) self.log_critical(session, msg.encode('utf-8')) raise return unpacked def _fetch_packed(self, session, index, term, summary=False, numReq=0, start=0): try: term = term.encode('utf-8') except: pass cxn = self._openIndex(session, index) if summary: dataLen = index.longStructSize * self.reservedLongs val = cxn.get(term, doff=0, dlen=dataLen) elif (start != 0 or numReq != 0) and index.canExtractSection: # try to extract only section... 
            fulllen = cxn.get_size(term)
            offsets = index.calc_sectionOffsets(session, start, numReq,
                                                fulllen)
            if not numReq:
                numReq = 100  # XXX this should be on index somewhere!!
            # first get summary
            vals = []
            dataLen = index.longStructSize * self.reservedLongs
            val = cxn.get(term, doff=0, dlen=dataLen)
            vals.append(val)
            # now step through offsets, prolly only 1
            for o in offsets:
                val = cxn.get(term, doff=o[0], dlen=o[1])
                if len(o) > 2:
                    val = o[2] + val
                if len(o) > 3:
                    val = val + o[3]
                vals.append(val)
            return ''.join(vals)
        else:
            val = cxn.get(term)
            return val
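
# Illustrative sketch, not part of the original module: the typical batch
# indexing lifecycle of this store, pieced together from the method names
# above.  The `session`, `indexStore`, `index` and `records` arguments are
# hypothetical objects from a configured Cheshire3 database; error handling
# and record fetching are omitted.
def _example_batch_reindex(session, indexStore, index, records):
    # Open the temp file (and sort-value store, if configured) for index
    indexStore.begin_indexing(session, index)
    for rec in records:
        # A typical Cheshire3 Index extracts terms from the record here and
        # hands them to indexStore.store_terms(session, index, terms, rec)
        index.index_record(session, rec)
    # Sort the temp file and merge it into the BerkeleyDB index file(s)
    indexStore.commit_indexing(session, index)
    # Terms can then be read back, e.g. the first 20 terms from 'a' upwards
    return indexStore.fetch_termList(session, index, 'a', nTerms=20)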