[metacat-cvs] metacat/src/edu/ucsb/nceas/metacat IndexingQueue.java

Saurabh Garg sgarg at ecoinformatics.org
Fri Nov 11 10:00:54 PST 2005


sgarg       05/11/11 10:00:54

  Modified:    src/edu/ucsb/nceas/metacat IndexingQueue.java
  Log:
  A version with fewer bugs.
  
  Revision  Changes    Path
  1.2       +99 -46    metacat/src/edu/ucsb/nceas/metacat/IndexingQueue.java
  
  Index: IndexingQueue.java
  ===================================================================
  RCS file: /cvs/metacat/src/edu/ucsb/nceas/metacat/IndexingQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- IndexingQueue.java	10 Nov 2005 22:13:04 -0000	1.1
  +++ IndexingQueue.java	11 Nov 2005 18:00:54 -0000	1.2
  @@ -7,8 +7,8 @@
    *    Release: @release@
    *
    *   '$Author: sgarg $'
  - *     '$Date: 2005/11/10 22:13:04 $'
  - * '$Revision: 1.1 $'
  + *     '$Date: 2005/11/11 18:00:54 $'
  + * '$Revision: 1.2 $'
    *
    * This program is free software; you can redistribute it and/or modify
    * it under the terms of the GNU General Public License as published by
  @@ -31,14 +31,18 @@
   import java.sql.ResultSet;
   import java.sql.SQLException;
   import java.util.Vector;
  +import java.util.HashMap;
  +import java.lang.Comparable;
   import edu.ucsb.nceas.metacat.MetaCatUtil;
   import org.apache.log4j.Logger;
   
   public class IndexingQueue {
   
   	private Logger logMetacat = Logger.getLogger(IndexingQueue.class);
  -	private Vector indexingQueue = new Vector();
  +	//	 Map used to keep track of docids to be indexed
  +	private HashMap indexingMap = new HashMap();     
   	private Vector currentThreads = new Vector();
  +	public Vector currentDocidsBeingIndexed = new Vector();
   	
   	private static IndexingQueue instance = null;
   
  @@ -60,33 +64,40 @@
   		return instance;
   	}//getInstance
   
  -    public void add(String docid) {
  -	    synchronized (indexingQueue) {
  -	    	indexingQueue.add(new IndexingQueueObject(0, docid));
  -	    	indexingQueue.notify();
  -	    }
  -	  }
  +    public void add(String docid, String rev) {
  +    	add(new IndexingQueueObject(docid, rev, 0));
  +    }
       
  -    public void add(IndexingQueueObject queueObject) {
  -	    synchronized (indexingQueue) {
  -	    	indexingQueue.add(queueObject);
  -	    	indexingQueue.notify();
  +    protected void add(IndexingQueueObject queueObject) {
  +	    synchronized (indexingMap) {
  +	    	if(!indexingMap.containsKey(queueObject.getDocid())){
  +	    		indexingMap.put(queueObject.getDocid(), queueObject);
  +	    		indexingMap.notify();
  +	    	} else {
  +	    		IndexingQueueObject oldQueueObject = 
  +	    			(IndexingQueueObject) indexingMap.get(queueObject.getDocid());
  +	    		if(oldQueueObject.compareTo(queueObject) < 0){
  +	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
  +		    		indexingMap.notify();  			
  +	    		}
  +	    	}
   	    }
   	  }
       
   	
       protected IndexingQueueObject getNext() {
       	IndexingQueueObject returnVal = null;
  -        synchronized (indexingQueue) {
  -          while (indexingQueue.isEmpty()) {
  +        synchronized (indexingMap) {
  +          while (indexingMap.isEmpty()) {
               try {
  -            	indexingQueue.wait();
  +            	indexingMap.wait();
               } catch (InterruptedException ex) {
                 System.err.println("Interrupted");
               }
             }
  -          returnVal = (IndexingQueueObject) indexingQueue.get(0);
  -          indexingQueue.remove(0);
  +          String docid = (String) indexingMap.keySet().iterator().next();
  +          returnVal = (IndexingQueueObject)indexingMap.get(docid);
  +          indexingMap.remove(docid);
           }
           return returnVal;
         }
  @@ -95,38 +106,51 @@
   
   class IndexingTask extends Thread {
     	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
  -      protected final long INDEXDELAY = 5000;
  +      protected final long MAXIMUMINDEXDELAY = Integer.
  +      	parseInt(MetaCatUtil.getOption("maximumIndexDelay"));;
   
   	  public void run() {
   	    while (true) {
   	      // blocks until a job is available
   	      IndexingQueueObject returnVal = 
   	    	  IndexingQueue.getInstance().getNext();
  -	      String docid = returnVal.getDocid();
  -	      
  -	      try {
  -	        checkDocumentTable(docid);
  -	    	DocumentImpl doc = new DocumentImpl(docid, false);
  -	    	logMetacat.warn("Calling buildIndex for " + docid);
  -	    	doc.buildIndex();
  -	      } catch (Exception e) {
  -	        logMetacat.warn("Exception: " + e);
  -	        e.printStackTrace();
  +
  +    	  if(!IndexingQueue.getInstance().
  +	    			currentDocidsBeingIndexed.contains(returnVal.getDocid())){
  +    		  try {
  +    			  IndexingQueue.getInstance().
  +    			  		currentDocidsBeingIndexed.add(returnVal.getDocid());
  +    		      String docid = returnVal.getDocid() + "." + returnVal.getRev();
  +    			  checkDocumentTable(docid, "xml_documents");
  +    			  DocumentImpl doc = new DocumentImpl(docid, false);
  +    			  logMetacat.warn("Calling buildIndex for " + docid);
  +    			  doc.buildIndex();
  +    		  } catch (Exception e) {
  +    			  logMetacat.warn("Exception: " + e);
  +    			  e.printStackTrace();
   	        
  -	        if(returnVal.getCount() < 25){
  -	        	returnVal.setCount(returnVal.getCount()+1);
  -	        	// add the docid back to the list
  -	        	IndexingQueue.getInstance().add(docid);
  -	        } else {
  -	        	logMetacat.fatal("Docid " + returnVal.getDocid() 
  +    			  if(returnVal.getCount() < 25){
  +    				  returnVal.setCount(returnVal.getCount()+1);
  +    				  // add the docid back to the list
  +    				  IndexingQueue.getInstance().add(returnVal);
  +    			  } else {
  +    				  logMetacat.fatal("Docid " + returnVal.getDocid() 
   	        			+ " has been inserted to IndexingQueue "
  -	        			+ "more than 25 times.");
  -	        }
  -	      }
  +	        			+ "more than 25 times. Not adding the docid to"
  +	        			+ " the queue again.");
  +    			  }
  +    		  } finally {
  +    			  IndexingQueue.getInstance().currentDocidsBeingIndexed
  +	    	  			.remove(returnVal.getDocid());	    	  
  +    		  }
  +    	  } else {
  +				returnVal.setCount(returnVal.getCount()+1);
  +				IndexingQueue.getInstance().add(returnVal);    		  
  +    	  }
   	    }
   	  }
   	  
  -      private void checkDocumentTable(String docid) throws Exception{
  +      private void checkDocumentTable(String docid, String tablename) throws Exception{
   	        DBConnection dbConn = null;
   	        int serialNumber = -1;
   
  @@ -150,7 +174,7 @@
   	            boolean inxmldoc = false;
   	            long startTime = System.currentTimeMillis();
   	            while (!inxmldoc) {
  -	                String xmlDocumentsCheck = "select distinct docid from xml_documents"
  +	                String xmlDocumentsCheck = "select distinct docid from " + tablename
   	                        + " where docid ='"
   	                        + docid
   	                        + "' and "
  @@ -171,8 +195,9 @@
   	                xmlDocCheck.close();
   	                // make sure the while loop ends within a reasonable time
   	                long stopTime = System.currentTimeMillis();
  -	                if ((stopTime - startTime) > INDEXDELAY) { 
  -	                	logMetacat.warn("Couldn't find the docid:" + docid + " for indexing in "
  +	                if ((stopTime - startTime) > MAXIMUMINDEXDELAY) { 
  +	                	logMetacat.warn("Couldn't find the docid:" + docid 
  +	                			+ " for indexing in "
                                   + "reseaonable time!");
   	                	throw new Exception(
   	                        "Couldn't find the docid for index build in "
  @@ -188,15 +213,18 @@
   	    }
   	}
   
  -class IndexingQueueObject{
  +class IndexingQueueObject implements Comparable{
   	// the docid of the document to be indexed. 
   	private String docid;
  +	// the revision of the document to be indexed. 
  +	private String rev;
   	// the count of number of times the document has been in the queue
   	private int count;
   	
  -	IndexingQueueObject(int count, String docid){
  -		this.count = count;
  +	IndexingQueueObject(String docid, String rev, int count){
   		this.docid = docid;
  +		this.rev = rev;
  +		this.count = count;
   	}
   	
   	public int getCount(){
  @@ -207,11 +235,36 @@
   		return docid;
   	}
   
  +	public String getRev(){
  +		return rev;
  +	}
  +
   	public void setCount(int count){
   		this.count = count;
   	}
   	
   	public void setDocid(String docid){
   		this.docid = docid;
  +	}
  +
  +	public void setRev(String rev){
  +		this.rev = rev;
  +	}
  +	
  +	public int compareTo(Object o){
  +		if(o instanceof IndexingQueueObject){
  +			int revision = Integer.parseInt(rev);
  +			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
  +			
  +			if(revision == oRevision) {
  +				return 0;
  +			} else if (revision > oRevision) {
  +				return 1;
  +			} else {
  +				return -1;
  +			}
  +		} else {
  +			throw new java.lang.ClassCastException();
  +		}
   	}
   }
  
  
  


More information about the Metacat-cvs mailing list