Commit 304a9173 authored by Robert Butora's avatar Robert Butora
Browse files

separates DatasetsImpl -> DatasetsAmqp DatasetsCli

parent 72e09f2e
Loading
Loading
Loading
Loading
+16 −118
Original line number Diff line number Diff line
@@ -27,36 +27,32 @@ import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStrea

import vo.parameter.*;

class DatasetsImpl implements Datasets
class DatasetsAmqp implements Datasets
{
   static final Logger LOGGER = Logger.getLogger("DatasetsImpl");
   static final Logger LOGGER = Logger.getLogger(DatasetsAmqp.class.getName());

   private Settings    settings   = null;
   private Subsurvey[] subsurveys = null;
   private Cutout      cutout     = null;

   public DatasetsImpl()
   public DatasetsAmqp()
   {
      LOGGER.info("trace DatasetsImpl()");
      LOGGER.info("trace DatasetsAmqp()");
      this.settings = Settings.getInstance();
      cutout = new CutoutImpl(settings, subsurveys);
   }


   public DatasetsImpl(Settings settings)
   public DatasetsAmqp(Settings settings)
   {
      LOGGER.info("trace DatasetsImpl(settings)");
      LOGGER.info("trace DatasetsAmqp(settings)");
      this.settings = settings;
      this.cutout = new CutoutImpl(settings, subsurveys);
   }


   public DatasetsImpl(Settings settings, Subsurvey[] subsurveys)
   public DatasetsAmqp(Settings settings, Subsurvey[] subsurveys)
   {
      LOGGER.info("trace DatasetsImpl(settings, subsurveys)");
      LOGGER.info("trace DatasetsAmqp(settings, subsurveys)");
      this.settings = settings;
      this.subsurveys = subsurveys;
      this.cutout = new CutoutImpl(settings, subsurveys);
   }


@@ -79,114 +75,16 @@ class DatasetsImpl implements Datasets

         MCutResult mCutResult;

         if(settings.amqpConn.isHostnameEmpty())
         {
            LOGGER.info("doMCutout with CLI");
            CutArgs[] cutArgsArr = CutArgs.parseCutArgsArr(jdlJson);
            MCutResult.Cut[] cutResultArr = doCutouts( cutArgsArr );
            mCutResult = doCompressCutFiles( cutResultArr );
         }
         else
         {
         LOGGER.info("doMCutout over AMQP");
         String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys);
         LOGGER.info("doMCutout over AMQP : " + updatedJsonString);
         String outJson = doRpc( JdlMCutout.mcutoutToJson(updatedJsonString) );
         mCutResult = JdlMCutout.responseFromMCutoutJson(outJson);
         }

         return mCutResult;
      }



   /* ================= ALL ================================== */


   private MCutResult.Cut[] doCutouts(CutArgs[] cutArgsArr)
   {
      LOGGER.info("trace, count of cuts : " + String.valueOf( cutArgsArr.length ) );

      List<MCutResult.Cut> cutResList = new ArrayList<MCutResult.Cut>();

      int ix = 0;
      for(CutArgs cutArgs: cutArgsArr)
      {
         MCutResult.Cut cut = doFileByIdWithErr(cutArgs.id,
               cutArgs.pos, cutArgs.band, cutArgs.time,  cutArgs.pol,  cutArgs.pixels,
               cutArgs.countNullValues,  null);//cutArgs.extraCards);

         cut.index = ix++;

         LOGGER.info("cut" + String.valueOf(cut.index) + " : " + cut.content);

         cutResList.add(cut);
      }

      return cutResList.toArray(new MCutResult.Cut[0]);
   }


   // FIXME implement similar for Merge: MCutResult = call-Montage-demosaic-sequence(cutResultArr)
   private MCutResult doCompressCutFiles(MCutResult.Cut[] cutArr)
   {
      // FIXME do compression here
      for(MCutResult.Cut cut : cutArr)
      {
         LOGGER.info("TBD compress cut-id"+ String.valueOf(cut.index) + " -> " + cut.content);
      }

      MCutResult mCutResult = new MCutResult();
      mCutResult.cutResArr = cutArr;
      mCutResult.fileName = "filename.tar.gz"; // FIXME do-zip-all-cuts(cutResultArr)
      mCutResult.fileSize = 0;

         return mCutResult;
      }


   private MCutResult.Cut doFileByIdWithErr(String id, Pos pos, Band band, Time time, Pol pol, String pixels,
      boolean countNullValues, Subsurvey[] subsurveys)
   {
      LOGGER.info("trace");

      MCutResult mCutResult = new MCutResult();
      MCutResult.Cut cut = mCutResult.new Cut(/* FIXME eventually add here new Inputs(id, pos,..) */);

      try
      {
         CutResult cutResult = cutout.doFileById(id,
               pos,  band, time,  pol, pixels,
               countNullValues,  subsurveys);

         cut.content     = cutResult.fileName;
         cut.contentType = MCutResult.Cut.ContentType.FILENAME;
      }
      catch(MultiValuedParamNotSupported ex) 
      {
         cut.content = "MultiValuedParamNotSupported: " + ex.getMessage();
         cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST;
         LOGGER.info(cut.content);
      }
      catch(IllegalArgumentException ex) 
      {
         cut.content = "IllegalArgumentException: " + ex.getMessage();
         cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST;
         LOGGER.info(cut.content);
      }
      catch(Exception ex) 
      {
         cut.content     = "Exception: " + ex.getMessage();
         cut.contentType = MCutResult.Cut.ContentType.SERVICE_ERROR;
         LOGGER.info(cut.content);
         ex.printStackTrace();
      }

      return cut;
   }



   private String doRpc(String InStr)
   {
      LOGGER.info("trace");
@@ -231,6 +129,7 @@ class DatasetsImpl implements Datasets
   }


   /* ================= MERGE =============================== */

   private  String generateSubimgPathname(String relPathname, int hdunum)
   {
@@ -254,7 +153,6 @@ class DatasetsImpl implements Datasets
   }


   /* ================= MERGE =============================== */

   private CutResult cutout(
         String publisherDid, Coord coord,
@@ -397,7 +295,7 @@ class DatasetsImpl implements Datasets
      LOGGER.info("mergefiles_parallel()");

      String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge);
      for(String sentence : responseCH) DatasetsImpl.LOGGER.info("responseCmnHdr: " + sentence);
      for(String sentence : responseCH) DatasetsAmqp.LOGGER.info("responseCmnHdr: " + sentence);
      // check if response errored -> abort with 500: Internal Server Error & log details

      int threadsCount = filestomerge.length;
@@ -429,7 +327,7 @@ class DatasetsImpl implements Datasets
         }


         for(String sentence : reprojectArr[i].response) DatasetsImpl.LOGGER.info("response[" + String.valueOf(i) + "]: " + sentence);
         for(String sentence : reprojectArr[i].response) DatasetsAmqp.LOGGER.info("response[" + String.valueOf(i) + "]: " + sentence);
         if(!isResponseOk(reprojectArr[i].response))
         {
            ;// FIXME response incorrect -> abort merge-job, free resources
+174 −0
Original line number Diff line number Diff line

import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;

import java.time.Instant;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.file.StandardOpenOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import java.time.*;// Timestamp in cut-filename
import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream

import vo.parameter.*;

class DatasetsCli implements Datasets
{
   static final Logger LOGGER = Logger.getLogger("DatasetsCli");

   private Settings    settings   = null;
   private Subsurvey[] subsurveys = null;
   private Cutout      cutout     = null;

   public DatasetsCli()
   {
      LOGGER.info("trace DatasetsCli()");
      this.settings = Settings.getInstance();
      cutout = new CutoutImpl(settings, subsurveys);
   }


   public DatasetsCli(Settings settings)
   {
      LOGGER.info("trace DatasetsCli(settings)");
      this.settings = settings;
      this.cutout = new CutoutImpl(settings, subsurveys);
   }


   public DatasetsCli(Settings settings, Subsurvey[] subsurveys)
   {
      LOGGER.info("trace DatasetsCli(settings, subsurveys)");
      this.settings = settings;
      this.subsurveys = subsurveys;
      this.cutout = new CutoutImpl(settings, subsurveys);
   }




   public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues)
         throws FileNotFoundException, IOException
      {
         LOGGER.info("trace doMerge by CLI is NOT IMPLEMENTED (only by AMQP)");

         return new CutResult();
      }



   public MCutResult doMCutout(String jdlJson)
         throws IOException
      {
         LOGGER.info("trace");

         MCutResult mCutResult;

         CutArgs[] cutArgsArr = CutArgs.parseCutArgsArr(jdlJson);
         MCutResult.Cut[] cutResultArr = doCutouts( cutArgsArr );
         mCutResult = doCompressCutFiles( cutResultArr );

         return mCutResult;
      }



   private MCutResult.Cut[] doCutouts(CutArgs[] cutArgsArr)
   {
      LOGGER.info("trace, count of cuts : " + String.valueOf( cutArgsArr.length ) );

      List<MCutResult.Cut> cutResList = new ArrayList<MCutResult.Cut>();

      int ix = 0;
      for(CutArgs cutArgs: cutArgsArr)
      {
         MCutResult.Cut cut = doFileByIdWithErr(cutArgs.id,
               cutArgs.pos, cutArgs.band, cutArgs.time,  cutArgs.pol,  cutArgs.pixels,
               cutArgs.countNullValues,  null);//cutArgs.extraCards);

         cut.index = ix++;

         LOGGER.info("cut" + String.valueOf(cut.index) + " : " + cut.content);

         cutResList.add(cut);
      }

      return cutResList.toArray(new MCutResult.Cut[0]);
   }


   // FIXME implement similar for Merge: MCutResult = call-Montage-demosaic-sequence(cutResultArr)
   private MCutResult doCompressCutFiles(MCutResult.Cut[] cutArr)
   {
      // FIXME do compression here
      for(MCutResult.Cut cut : cutArr)
      {
         LOGGER.info("TBD compress cut-id"+ String.valueOf(cut.index) + " -> " + cut.content);
      }

      MCutResult mCutResult = new MCutResult();
      mCutResult.cutResArr = cutArr;
      mCutResult.fileName = "filename.tar.gz"; // FIXME do-zip-all-cuts(cutResultArr)
      mCutResult.fileSize = 0;

      return mCutResult;
   }


   private MCutResult.Cut doFileByIdWithErr(String id, Pos pos, Band band, Time time, Pol pol, String pixels,
         boolean countNullValues, Subsurvey[] subsurveys)
   {
      LOGGER.info("trace");

      MCutResult mCutResult = new MCutResult();
      MCutResult.Cut cut = mCutResult.new Cut(/* FIXME eventually add here new Inputs(id, pos,..) */);

      try
      {
         CutResult cutResult = cutout.doFileById(id,
               pos,  band, time,  pol, pixels,
               countNullValues,  subsurveys);

         cut.content     = cutResult.fileName;
         cut.contentType = MCutResult.Cut.ContentType.FILENAME;
      }
      catch(MultiValuedParamNotSupported ex) 
      {
         cut.content = "MultiValuedParamNotSupported: " + ex.getMessage();
         cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST;
         LOGGER.info(cut.content);
      }
      catch(IllegalArgumentException ex) 
      {
         cut.content = "IllegalArgumentException: " + ex.getMessage();
         cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST;
         LOGGER.info(cut.content);
      }
      catch(Exception ex) 
      {
         cut.content     = "Exception: " + ex.getMessage();
         cut.contentType = MCutResult.Cut.ContentType.SERVICE_ERROR;
         LOGGER.info(cut.content);
         ex.printStackTrace();
      }

      return cut;
   }

}
+4 −4
Original line number Diff line number Diff line
@@ -8,9 +8,9 @@ class Reproject implements Runnable
   String prefix;
   String fileName;
   String[] response;
   DatasetsImpl datasets;
   DatasetsAmqp datasets;

   public Reproject(DatasetsImpl datasets, String id, String prefix, String fileName)
   public Reproject(DatasetsAmqp datasets, String id, String prefix, String fileName)
   {
      this.datasets  = datasets;
      this.id        = id;
@@ -23,9 +23,9 @@ class Reproject implements Runnable
   public void run()
   {
      String name = Thread.currentThread().getName();
      DatasetsImpl.LOGGER.info("Start of " + name);
      DatasetsAmqp.LOGGER.info("Start of " + name);
      response = datasets.mergefiles_reproject(id, prefix, fileName);
      DatasetsImpl.LOGGER.info("End   of " + name);
      DatasetsAmqp.LOGGER.info("End   of " + name);
   }

}
+1 −2
Original line number Diff line number Diff line
@@ -40,8 +40,7 @@ public class ServletMCutout extends javax.servlet.http.HttpServlet
   private static final Logger   LOGGER   = Logger.getLogger(ServletMCutout.class.getName());
   private static final Settings settings = Settings.getInstance();

   protected Datasets datasets = new DatasetsImpl(settings);

   protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) );


   public void init() throws ServletException
+1 −2
Original line number Diff line number Diff line
@@ -47,8 +47,7 @@ public class ServletMerge extends javax.servlet.http.HttpServlet
   final String DEFAULT_SPEC_SYSTEM    = settings.defaults.specSystem;


   Datasets datasets = new DatasetsImpl(settings);

   protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) );


   public void init() throws ServletException
Loading