Commit 7613f228 authored by gmantele's avatar gmantele
Browse files

[TAP] Fix bug in VOTable reading. The STIL consumer must be in a different...

[TAP] Fix bug in VOTable reading. The STIL consumer must run in a different thread. OnceRowPipe has therefore been replaced by a new internal static class that does the same work with some adaptations, in particular to properly stop the stream reading before reaching the end of the VOTable.
parent 9b8fe1c9
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -55,7 +55,7 @@ import com.oreilly.servlet.multipart.ExceededSizeException;
 * </p>
 * 
 * @author Gr&eacute;gory Mantelet (ARI)
 * @version 2.0 (08/2014)
 * @version 2.0 (12/2014)
 * @since 2.0
 */
public class LimitedTableIterator implements TableIterator {
@@ -171,7 +171,7 @@ public class LimitedTableIterator implements TableIterator {
	}

	@Override
	public TAPColumn[] getMetadata(){
	public TAPColumn[] getMetadata() throws DataReadException{
		return innerIt.getMetadata();
	}

+4 −4
Original line number Diff line number Diff line
@@ -21,8 +21,8 @@ package tap.data;

import java.util.NoSuchElementException;

import adql.db.DBType;
import tap.metadata.TAPColumn;
import adql.db.DBType;

/**
 * <p>Let's iterate on each row and then on each column over a table dataset.</p>
@@ -50,8 +50,8 @@ import tap.metadata.TAPColumn;
 * 	}
 * </pre>
 * 
 * @author Gr&eacute;gory Mantelet (ARI) - gmantele@ari.uni-heidelberg.de
 * @version 2.0 (08/2014)
 * @author Gr&eacute;gory Mantelet (ARI)
 * @version 2.0 (12/2014)
 * @since 2.0
 */
public interface TableIterator {
@@ -70,7 +70,7 @@ public interface TableIterator {
	 * 
	 * @see #getColType()
	 */
	public TAPColumn[] getMetadata();
	public TAPColumn[] getMetadata() throws DataReadException;

	/**
	 * <p>Go to the next row if there is one.</p>
+364 −184
Original line number Diff line number Diff line
package tap.data;

/*
 * This file is part of TAPLibrary.
 * 
 * TAPLibrary is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * TAPLibrary is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public License
 * along with TAPLibrary.  If not, see <http://www.gnu.org/licenses/>.
 * 
 * Copyright 2014 - Astronomisches Rechen Institut (ARI)
 */

import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;

import adql.db.DBType;
import tap.TAPException;
import tap.metadata.TAPColumn;
import tap.metadata.VotType;
import tap.metadata.VotType.VotDatatype;
import uk.ac.starlink.table.ColumnInfo;
import uk.ac.starlink.table.DescribedValue;
import uk.ac.starlink.table.OnceRowPipe;
import uk.ac.starlink.table.RowSequence;
import uk.ac.starlink.table.StarTable;
import uk.ac.starlink.table.StarTableFactory;
import uk.ac.starlink.table.TableBuilder;
import uk.ac.starlink.table.TableFormatException;
import uk.ac.starlink.table.TableSink;
import adql.db.DBType;

/**
 * <p>{@link TableIterator} which lets iterate over a VOTable input stream using STIL.</p>
@@ -42,63 +23,195 @@ import uk.ac.starlink.table.TableBuilder;
 * <p>{@link #getColType()} will return TAP type based on the type declared in the VOTable metadata part.</p>
 * 
 * @author Gr&eacute;gory Mantelet (ARI)
 * @version 2.0 (07/2014)
 * @version 2.0 (12/2014)
 * @since 2.0
 */
public class VOTableIterator implements TableIterator {

	/** Metadata of all columns identified before the iteration. */
	private final TAPColumn[] colMeta;
	/** Number of columns to read. */
	private final int nbColumns;
	/** Sequence of rows over which we must iterate. */
	private final RowSequence rowSeq;
	/**
	 * <p>This class lets consume the metadata and rows of a VOTable document.</p>
	 * 
	 * <p>
	 * 	On the contrary to a usual TableSink, this one will stop after each row until this row has been fetched by {@link VOTableIterator}.
	 * </p>
	 * 
	 * <p>
	 * 	Besides, the metadata returned by StarTable are immediately converted into TAP metadata. If this conversion fails, the error is kept
	 * 	in metaError, so that the VOTable reading can continue if the fact that metadata are missing is not a problem for the class using the
	 * 	{@link VOTableIterator}.
	 * </p> 
	 * 
	 * @author Gr&eacute;gory Mantelet (ARI)
	 * @version 2.0 (12/2014)
	 * @since 2.0
	 */
	protected static class StreamVOTableSink implements TableSink {

	/** Indicate whether the row iteration has already started. */
	private boolean iterationStarted = false;
	/** Indicate whether the last row has already been reached. */
		/** <p>The accepted VOTable metadata, after conversion from StarTable metadata.</p>
		 * <p><i>Note: this may be NULL after the metadata has been read if an error occurred while performing the conversion.
		 * In this case, metaError contains this error.</i></p> */
		private TAPColumn[] meta = null;
		
		/** The error which happened while converting the StarTable metadata into TAP metadata. */
		private DataReadException metaError = null;
		
		/** The last accepted row. */
		private Object[] pendingRow = null;
		
		/** Flag meaning that the end of the stream has been reached
		 * OR if the VOTable reading should be stopped before reading more rows. */
		private boolean endReached = false;
	/** Index of the last read column (=0 just after {@link #nextRow()} and before {@link #nextCol()}, ={@link #nbColumns} after the last column has been read). */
	private int colIndex;
		
		/**
	 * Build a TableIterator able to read rows and columns inside the given VOTable input stream.
	 * 
	 * @param input	Input stream over a VOTable document.
		 * <p>Stop nicely reading the VOTable.</p>
		 * 
	 * @throws NullPointerException	If NULL is given in parameter.
	 * @throws DataReadException	If the given VOTable can not be parsed.
		 * <p>
		 * 	An exception will be thrown to the STILTS class using this TableSink,
		 * 	but no exception should be thrown to VOTableIterator.
		 * </p>
		 */
	public VOTableIterator(final InputStream input) throws DataReadException{
		// An input stream MUST BE provided:
		if (input == null)
			throw new NullPointerException("Missing VOTable document input stream over which to iterate!");
		public void stop(){
			// Hold this sink's monitor while changing its state and waking up the waiters:
			synchronized(this){
				// Raise the END flag (acceptRow(...) throws an IOException once it sees this flag):
				endReached = true;
				// Wake up every thread currently waiting on this sink:
				notifyAll();
			}
		}
		
		@Override
		public synchronized void acceptMetadata(final StarTable metaTable) throws TableFormatException {
			try{
				// Convert the StarTable metadata into TAP metadata:
				meta = extractColMeta(metaTable);
				
			// Set the VOTable builder/interpreter:
			TableBuilder tb = (new StarTableFactory()).getTableBuilder("votable");
			}catch(DataReadException dre){
				// Save the error; it will be thrown when getMetadata() is called:
				metaError = dre;
				
			// Set the TableSink to use in order to stream the data: 
			OnceRowPipe rowPipe = new OnceRowPipe();

			// Initiate the stream process:
			tb.streamStarTable(input, rowPipe, null);
			}finally{
				// Free all waiting threads:
				notifyAll();
			}
		}

			// Start by reading just the metadata:
			StarTable table = rowPipe.waitForStarTable();
		/**
		 * <p>Keep the given row as the pending row, so that it can be fetched with {@link #getRow()}.</p>
		 * 
		 * <p>
		 * 	This function blocks as long as the previously accepted row is still pending
		 * 	and the END flag has not been set.
		 * </p>
		 * 
		 * <p><i>Note:
		 * 	If the waiting thread is interrupted, the given row is not kept:
		 * 	the pending row is reset to NULL and the END flag is set.
		 * </i></p>
		 * 
		 * @param row	The row to keep. <i>If NULL, the END flag is set.</i>
		 * 
		 * @throws IOException	If the END flag is already set when the row could be kept
		 *                    	(i.e. the iteration has been aborted before the end of the stream).
		 */
		@Override
		public synchronized void acceptRow(final Object[] row) throws IOException {
			try{
				// Wait until the last accepted row has been consumed: 
				while(!endReached && pendingRow != null)
					wait();
				
				/* If the end has been reached, this is not normal
				 * (because endRows() is always called after acceptRow()...so, it means the iteration has been aborted before the end)
				 * and so the stream reading should be interrupted: */
				if (endReached)
					throw new IOException("Streaming aborted!");
				
				// Otherwise, keep the given row:
				pendingRow = row;

				/* Security for the cases where a row to accept is NULL.
				 * In such case, pendingRow will be set to NULL and the function getRow() will wait forever.
				 * This case is not supposed to happen because the caller of acceptRow(...) should not give a NULL row...
				 * ...which should then mean that the end of the stream has been reached. */
				if (pendingRow == null)
					endReached = true;
				
			}catch(InterruptedException ie){
				/* If the thread has been interrupted, set this TableSink in a state similar to
				 * when the end of the stream has been reached: */
				pendingRow = null;
				endReached = true;
				
			}finally{
				// In all cases, all waiting threads must be freed (this also runs when the IOException above is thrown):
				notifyAll();
			}
		}

			// Convert columns' information into TAPColumn object:
			colMeta = extractColMeta(table);
			nbColumns = colMeta.length;
		/**
		 * <p>Signal that no more rows will be accepted.</p>
		 * 
		 * <p>
		 * 	Before setting the END flag, this function waits until the last accepted row has been fetched
		 * 	with {@link #getRow()}. Without this wait, the END flag could be set while the last row is still
		 * 	pending ; since {@link #getRow()} returns NULL as soon as the END flag is set, this last row
		 * 	would be silently lost.
		 * </p>
		 * 
		 * <p><i>Note:
		 * 	The wait is given up if the END flag is set meanwhile (e.g. by {@link #stop()})
		 * 	or if the thread is interrupted. In all cases the END flag is set when this function returns.
		 * </i></p>
		 * 
		 * @throws IOException	Never thrown by this implementation (declared by the implemented interface).
		 */
		@Override
		public synchronized void endRows() throws IOException {
			try{
				// Wait until the last accepted row has been consumed (or until the reading has been stopped):
				while(!endReached && pendingRow != null)
					wait();

			}catch(InterruptedException ie){
				// Keep the interruption visible for the caller ; the end is flagged anyway below:
				Thread.currentThread().interrupt();

			}finally{
				// No more rows are available:
				pendingRow = null;
				// Set the END flag:
				endReached = true;
				// Notify all waiting threads that the end has been reached:
				notifyAll();
			}
		}
		
			// Set the sequence of rows on which this iterator will iterate:
			rowSeq = table.getRowSequence();
		/**
		 * <p>Get the metadata found in the VOTable.</p>
		 * 
		 * <p><i>Note:
		 * 	This method blocks until the metadata have been accepted by this TableSink
		 * 	or until an error has occurred while converting them into TAP metadata.
		 * 	A thread interruption also makes this function return.
		 * </i></p>
		 * 
		 * @return The metadata found in the VOTable header,
		 *         or NULL if the waiting thread has been interrupted before any metadata has been accepted.
		 * 
		 * @throws DataReadException	If the metadata can not be interpreted correctly.
		 */
		public synchronized TAPColumn[] getMeta() throws DataReadException{
			try{
				// Wait until metadata are available, or until an error has occurred while accepting them:
				/* NOTE(review): the END flag is not part of this condition, so setting it (e.g. with stop())
				 * does not appear to release a thread waiting here - confirm whether this is intended. */
				while(metaError == null && meta == null)
					wait();
				
				// If there was an error while interpreting the accepted metadata, throw it:
				if (metaError != null)
					throw metaError;
				
				// Otherwise, just return the metadata:
				return meta;
				
			}catch(InterruptedException ie){
				/* If the thread has been interrupted, set this TableSink in a state similar to
				 * when the end of the stream has been reached: */
				endReached = true;
				/* Return the metadata ;
				 * NULL will be returned if the interruption has occurred before the real reading of the VOTable metadata: */
				return meta;
				
			}finally{
				// In all cases (normal return, thrown error or interruption), the waiting threads must be freed:
				notifyAll();
			}
		}
		
		}catch(TAPException te){
			throw new DataReadException("Unexpected field datatype: " + te.getMessage(), te);
		}catch(Exception ex){
			throw new DataReadException("Unable to parse/read the given VOTable input stream!", ex);
		/**
		 * <p>Fetch the row that has been accepted last and not yet consumed.</p>
		 * 
		 * <p><i>Note:
		 * 	This function blocks until a row is pending or the END flag is set.
		 * 	A thread interruption also makes this function return (the END flag is then set).
		 * </i></p>
		 * 
		 * @return	The pending row,
		 *        	or NULL if the END flag is set or if the waiting thread has been interrupted.
		 */
		public synchronized Object[] getRow() {
			// Row to give back ; stays NULL when there is nothing (more) to read:
			Object[] fetched = null;

			try{
				// Block as long as there is neither a pending row nor an end signal:
				while(pendingRow == null && !endReached)
					wait();

				// Only when the end is not flagged, take the pending row and free the slot for the next one:
				if (!endReached){
					fetched = pendingRow;
					pendingRow = null;
				}

			}catch(InterruptedException ie){
				// An interruption is handled like the end of the stream:
				endReached = true;
				fetched = null;

			}finally{
				// Whatever happened, wake up the threads waiting on this sink:
				notifyAll();
			}

			return fetched;
		}
		
@@ -110,9 +223,9 @@ public class VOTableIterator implements TableIterator {
		 * 
		 * @return			The corresponding list of {@link TAPColumn} objects.
		 * 
	 * @throws TAPException	If there is a problem while resolving the field datatype (for instance: unknown datatype, a multi-dimensional array is provided, a bad number format for the arraysize).
		 * @throws DataReadException	If there is a problem while resolving the field datatype (for instance: unknown datatype, a multi-dimensional array is provided, a bad number format for the arraysize).
		 */
	private static final TAPColumn[] extractColMeta(final StarTable table) throws TAPException{
		protected TAPColumn[] extractColMeta(final StarTable table) throws DataReadException{
			// Count the number columns and initialize the array:
			TAPColumn[] columns = new TAPColumn[table.getColumnCount()];

@@ -131,7 +244,15 @@ public class VOTableIterator implements TableIterator {
				String xtype = getAuxDatumValue(colInfo, "xtype");

				// Resolve the field type:
			DBType type = resolveVotType(datatype, arraysize, xtype).toTAPType();
				DBType type;
				try{
					type = resolveVotType(datatype, arraysize, xtype).toTAPType();
				}catch(TAPException te){
					if (te instanceof DataReadException)
						throw (DataReadException)te;
					else
						throw new DataReadException(te.getMessage(), te);
				}

				// build the TAPColumn object:
				TAPColumn col = new TAPColumn(colInfo.getName(), type, colInfo.getDescription(), colInfo.getUnitString(), colInfo.getUCD(), colInfo.getUtype());
@@ -154,7 +275,7 @@ public class VOTableIterator implements TableIterator {
		 * 
		 * @return	The extracted value as String.
		 */
	private static final String getAuxDatumValue(final ColumnInfo colInfo, final String auxDatumName){
		protected String getAuxDatumValue(final ColumnInfo colInfo, final String auxDatumName){
			DescribedValue value = colInfo.getAuxDatumByName(auxDatumName);
			return (value != null) ? value.getValue().toString() : null;
		}
@@ -168,9 +289,9 @@ public class VOTableIterator implements TableIterator {
		 * 
		 * @return	The resolved VOTable field type, or a CHAR(*) type if the specified type can not be resolved.
		 * 
	 * @throws TAPException	If a field datatype is unknown.
		 * @throws DataReadException	If a field datatype is unknown.
		 */
	private static VotType resolveVotType(final String datatype, final String arraysize, final String xtype) throws TAPException{
		protected VotType resolveVotType(final String datatype, final String arraysize, final String xtype) throws DataReadException{
			// If no datatype is specified, return immediately a CHAR(*) type:
			if (datatype == null || datatype.trim().length() == 0)
				return new VotType(VotDatatype.CHAR, "*");
@@ -180,55 +301,96 @@ public class VOTableIterator implements TableIterator {
			try{
				votdatatype = VotDatatype.valueOf(datatype.toUpperCase());
			}catch(IllegalArgumentException iae){
			throw new TAPException("unknown field datatype: \"" + datatype + "\"");
				throw new DataReadException("unknown field datatype: \"" + datatype + "\"");
			}

			// Build the VOTable type:
			return new VotType(votdatatype, arraysize, xtype);
		}
		
	}
	
	/** Stream containing the VOTable on which this {@link TableIterator} is iterating. */
	protected final InputStream input;
	/** The StarTable consumer which is used to iterate on each row. */
	protected final StreamVOTableSink sink;

	/** Indicate whether the row iteration has already started. */
	protected boolean iterationStarted = false;
	/** Indicate whether the last row has already been reached. */
	protected boolean endReached = false;
	
	/** The last read row. Column iteration is done on this array. */
	protected Object[] row;
	/** Index of the last read column (=0 just after {@link #nextRow()} and before {@link #nextCol()}, ={@link #nbCol} after the last column has been read). */
	protected int indCol = -1;
	/** Number of columns available according to the metadata. */
	protected int nbCol = 0;
	
	/**
	 * <p>Check the row iteration state. That's to say whether:</p>
	 * <ul>
	 * 	<li>the row iteration has started = the first row has been read = a first call of {@link #nextRow()} has been done</li>
	 * 	<li>AND the row iteration is not finished = the last row has been read.</li>
	 * </ul>
	 * @throws IllegalStateException
	 * Build a TableIterator able to read rows and columns inside the given VOTable input stream.
	 * 
	 * @param input	Input stream over a VOTable document.
	 * 
	 * @throws NullPointerException	If NULL is given in parameter.
	 * @throws DataReadException	If the given VOTable can not be parsed.
	 */
	private void checkReadState() throws IllegalStateException{
		if (!iterationStarted)
			throw new IllegalStateException("No row has yet been read!");
		else if (endReached)
			throw new IllegalStateException("End of VOTable file already reached!");
	}
	public VOTableIterator(final InputStream input) throws DataReadException{
		// An input stream MUST BE provided:
		if (input == null)
			throw new NullPointerException("Missing VOTable document input stream over which to iterate!");
		this.input = input;
		
	@Override
	public void close() throws DataReadException{
		try{
			rowSeq.close();
		}catch(IOException ioe){
			throw new DataReadException("Can not close the iterated VOTable!", ioe);

			// Set the VOTable builder/interpreter:
			final TableBuilder tb = (new StarTableFactory()).getTableBuilder("votable");

			// Build the TableSink to use:
			sink = new StreamVOTableSink();
			
			// Initiate the stream process:
			Thread streamThread = new Thread() {
                /**
                 * Stream the VOTable into the sink.
                 * 
                 * Whatever the way the streaming ends (normally, with an I/O error or with an unexpected
                 * runtime error), the sink is stopped afterwards so that a thread blocked in sink.getRow()
                 * is released instead of waiting forever for a row or an end signal that will never come.
                 */
                @Override
                public void run() {
                    try{
                        tb.streamStarTable(input, sink, null);
                    }catch(IOException e) {
                        /* A deliberate abort is not worth a stack trace. Both messages are filtered:
                         * the one historically tested here and the one thrown by the sink's acceptRow(...)
                         * when the reading is stopped before the end of the stream. */
                        final String msg = e.getMessage();
                        if (msg != null && !msg.equals("Reading interrupted!") && !msg.equals("Streaming aborted!"))
                            e.printStackTrace();
                    }finally{
                        // Set the END flag and wake up the waiting threads (no effect if the end was already flagged):
                        sink.stop();
                    }
                }
            };
            streamThread.start();

		}catch(Exception ex){
			throw new DataReadException("Unable to parse/read the given VOTable input stream!", ex);
		}
	}

	@Override
	public TAPColumn[] getMetadata(){
		return colMeta;
	public TAPColumn[] getMetadata() throws DataReadException {
		return sink.getMeta();
	}

	@Override
	public boolean nextRow() throws DataReadException {
		try{
			// go to the next row:
			boolean rowFetched = rowSeq.next();
			endReached = !rowFetched;
			// prepare the iteration over its columns:
			colIndex = 0;
		// If no more rows, return false directly:
		if (endReached)
			return false;
		
		// Fetch the row:
		row = sink.getRow();
		
		// Reset the column iteration:
		if (!iterationStarted){
			iterationStarted = true;
			return rowFetched;
		}catch(IOException e){
			throw new DataReadException("Unable to read the next VOTable row!", e);
			nbCol = sink.getMeta().length;
		}
		indCol = 0;
		
		// Tells whether there is more rows or not:
		endReached = (row == null);
		return !endReached;
	}

	@Override
@@ -237,7 +399,7 @@ public class VOTableIterator implements TableIterator {
		checkReadState();

		// Determine whether the last column has been reached or not:
		return (colIndex < nbColumns);
		return (indCol < nbCol);
	}

	@Override
@@ -247,11 +409,7 @@ public class VOTableIterator implements TableIterator {
			throw new NoSuchElementException("No more field to read!");

		// Get the column value:
		try{
			return rowSeq.getCell(colIndex++);
		}catch(IOException se){
			throw new DataReadException("Can not read the value of the " + colIndex + "-th field!", se);
		}
		return row[indCol++];
	}

	@Override
@@ -260,13 +418,35 @@ public class VOTableIterator implements TableIterator {
		checkReadState();

		// Check deeper the read state (for columns iteration):
		if (colIndex <= 0)
		if (indCol <= 0)
			throw new IllegalStateException("No field has yet been read!");
		else if (colIndex > nbColumns)
		else if (indCol > nbCol)
			throw new IllegalStateException("All fields have already been read!");

		// Return the column type:
		return colMeta[colIndex - 1].getDatatype();
		return sink.getMeta()[indCol - 1].getDatatype();
	}

	/**
	 * Stop the iteration: flag the end on this iterator, then ask the sink to stop the VOTable reading.
	 * 
	 * <p><i>Note:
	 * 	The input stream is not closed here. Closing it would be the fallback
	 * 	in case stopping the sink is not enough to stop the VOTable reading.
	 * </i></p>
	 */
	@Override
	public void close() throws DataReadException {
		// From now on, nextRow() returns false and checkReadState() rejects any column access:
		this.endReached = true;
		// Set the sink's END flag and wake up the threads waiting on it:
		this.sink.stop();
	}

	/**
	 * <p>Ensure that a row is currently available for reading. That's to say:</p>
	 * <ul>
	 * 	<li>the row iteration has started (i.e. {@link #nextRow()} has been called at least once),</li>
	 * 	<li>AND the row iteration is not finished (i.e. the end of the VOTable has not been reached yet).</li>
	 * </ul>
	 * 
	 * @throws IllegalStateException	If no row has been read yet, or if the end has already been reached.
	 */
	protected void checkReadState() throws IllegalStateException{
		// Guard 1: nothing can be read before the first row has been fetched:
		if (!iterationStarted){
			throw new IllegalStateException("No row has yet been read!");
		}

		// Guard 2: nothing can be read any more once the end has been reached:
		if (endReached){
			throw new IllegalStateException("End of VOTable file already reached!");
		}
	}

}