From 8228a9616175b94ff0df5a9832184e5459c07c1a Mon Sep 17 00:00:00 2001
From: Dennis Kao <ulysseskao@gmail.com>
Date: Fri, 07 Mar 2014 02:10:56 +0800
Subject: [PATCH] update for increment and theme jobs

---
 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java |  131 +++++++++++++++++++++++++++++++++++++------
 1 files changed, 112 insertions(+), 19 deletions(-)

diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
index 8570254..2a904f2 100644
--- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
+++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
@@ -5,6 +5,7 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -51,7 +52,7 @@
     private static final String USEWKB = "USEWKB";
 
     private static final int FETCHSIZE = 30;
-    private static final int COMMITSIZE = 100;
+    private static final int COMMITSIZE = 10;
 
     protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
 
@@ -97,6 +98,7 @@
     @Override
     protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
         super.extractJobConfiguration(jobDetail);
+
         JobDataMap dataMap = jobDetail.getJobDataMap();
         _pgHost = dataMap.getString(PGHOST);
         _pgDatabase = dataMap.getString(PGDATBASE);
@@ -213,6 +215,7 @@
 
         // Log the time the job started
         logger.info(jobName + " fired at " + new Date());
+        extractJobConfiguration(jobDetail);
 
         createSourceDataStore();
         createTargetDataStore();
@@ -397,11 +400,14 @@
             return;
         }
 
-        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0
+        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE = 0
         int exchangeCount = fetchExchangeCount(connection);
+        logger.info("exchangeCount=" + exchangeCount);
+
         try {
-            processIncrementElement(jobContext);
+            processIncrementElement(jobContext, exchangeCount);
             // jobContext.setCurrentSchema(querySchema);
+
         } finally {
         }
 
@@ -412,7 +418,7 @@
         Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
         ResultSet rs = null;
         StringBuilder sbSQL = new StringBuilder();
-        sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0");
+        sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE = 0");
 
         int size = -1;
         try {
@@ -433,14 +439,20 @@
         Element element;
     };
 
-    private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException {
+    private void processIncrementElement(OracleIncrementPostGISJobContext jobContext, int exchangeCount) throws SQLException {
         Connection connection = jobContext.getOracleConnection();
 
-        // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM
+        if (exchangeCount == 0) {
+            logger.info("GEO_EXCHANGE ELEMENT COUNT IS ZERO.");
+            return;
+        }
+
+        // SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM
         //  FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0
-        String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " +
-            "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0";
-        //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
+        String fetchSrcStmtFmt = "SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM " +
+            "FROM \"%s\".\"%s\" WHERE ISEXCHANGE = 0 ORDER BY UPDATETIME";
+        // String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\"
+        //      WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
         PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt);
         String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"});
         Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
@@ -448,23 +460,31 @@
         stmtSrc.setFetchSize(FETCHSIZE);
         ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt);
         int igdsMetaType = rsSrc.getMetaData().getColumnType(1);
+        ArrayList<Integer> transIds = new ArrayList<Integer>();
+
+        int step = exchangeCount / 100;
+        int order = 0;
+        int current = 0;
+        jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 0);
+
         while (rsSrc.next()) {
             if (isProfileMode()) {
                 markQueryTime();
             }
             ElementTransactionContext xContext = new ElementTransactionContext();
-            xContext.oid = rsSrc.getInt(1);
-            xContext.cid = (short) rsSrc.getInt(2);
-            xContext.compid = (short) rsSrc.getInt(3);
-            xContext.occid = (short) rsSrc.getInt(4);
-            xContext.transcationType = rsSrc.getInt(5);
-            xContext.taskid = rsSrc.getInt(6);
+            xContext.transcationId = rsSrc.getInt(1);
+            xContext.oid = rsSrc.getInt(2);
+            xContext.cid = (short) rsSrc.getInt(3);
+            xContext.compid = (short) rsSrc.getInt(4);
+            xContext.occid = (short) rsSrc.getInt(5);
+            xContext.transcationType = rsSrc.getInt(6);
+            xContext.taskid = rsSrc.getInt(7);
 
             try {
-                if (xContext.transcationType > 2) {
+                if (xContext.transcationType <= 2) {
                     byte[] raw = null;
                     if (igdsMetaType == Types.BLOB) {
-                        BLOB blob = (BLOB) rsSrc.getBlob(7);
+                        BLOB blob = (BLOB) rsSrc.getBlob(8);
 
                         try {
                             raw = getBytesFromBLOB(blob);
@@ -474,7 +494,7 @@
                             // blob.close();
                         }
                     } else {
-                        raw = rsSrc.getBytes(7);
+                        raw = rsSrc.getBytes(8);
                     }
                     if (raw != null) {
                         Element element = fetchBinaryElement(raw);
@@ -487,15 +507,88 @@
                             accumulateQueryTime();
                         }
                     }
+                } else {
+                    xContext.element = null;
                 }
-                jobContext.putFeatureCollection(xContext);
+
+                if (xContext.transcationType > 1) {
+                    // remove first
+                }
+
+                jobContext.processFeatureContext(xContext);
+                transIds.add(xContext.transcationId);
+
             } catch (Dgn7fileException e) {
                 logger.warn("Dgn7Exception", e);
             }
+
+            if ((order % COMMITSIZE) == 0) {
+                // OracleConnection connection = jobContext.getOracleConnection();
+                // connection.commitTransaction();
+                jobContext.commitTransaction();
+                //jobContext.startTransaction();
+                System.gc();
+                System.runFinalization();
+            }
+
+            if (step != 0) {
+                int now = order % step;
+                if (now != current) {
+                    current = now;
+                    jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current);
+
+                }
+            } else {
+                jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current);
+                current++;
+            }
         }
+
+        jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 100);
+
+        jobContext.commitTransaction();
+        jobContext.resetFeatureContext();
 
         JDBCUtils.close(rsSrc);
         JDBCUtils.close(stmtSrc);
+
+        if (!transIds.isEmpty()) {
+            completeTransactionAction(connection, transIds);
+        }
+    }
+
+    private void completeTransactionAction(Connection connection, ArrayList<Integer> transIds) {
+        if (transIds.isEmpty()) return;
+
+        boolean autoCommit = true;
+        PreparedStatement statement = null;
+        try {
+            autoCommit = connection.getAutoCommit();
+            connection.setAutoCommit(false);
+            String sql = "UPDATE \"CMMS_POSTDB\".\"GEO_EXCHANGE\" SET ISEXCHANGE=? WHERE ID=?";
+
+            statement = connection.prepareStatement(sql);
+            for (int id : transIds) {
+                statement.setInt((int) 1, 1);
+                statement.setInt((int) 2, id);
+                statement.executeUpdate();
+            }
+            connection.commit();
+        } catch (SQLException e) {
+            logger.warn(e.getMessage(), e);
+            try {
+                connection.rollback();
+            } catch (SQLException e1) {
+                logger.warn(e.getMessage(), e1);
+            }
+        } finally {
+            JDBCUtils.close(statement);
+            try {
+                connection.setAutoCommit(autoCommit);
+            } catch (SQLException e) {
+                logger.warn(e.getMessage(), e);
+            }
+        }
     }
 
     // Binary to Element

--
Gitblit v0.0.0-SNAPSHOT