From bc926822dddd05d55678696999e3b8b0fd415570 Mon Sep 17 00:00:00 2001
From: Dennis Kao <ulysseskao@gmail.com>
Date: Mon, 10 Mar 2014 12:28:24 +0800
Subject: [PATCH] update for increment and theme jobs

---
 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java |  351 ++++++++++++++++++++++++++--------------------------------
 1 files changed, 158 insertions(+), 193 deletions(-)

diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java
index 6fd990d..8cdc8ab 100644
--- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java
+++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import com.ximple.eofms.geoserver.config.XGeosDataConfig;
@@ -39,6 +40,7 @@
 import org.geotools.jdbc.JDBCDataStore;
 import org.postgresql.PGConnection;
 import org.postgresql.copy.CopyManager;
+import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -110,6 +112,8 @@
 
     private long queryTime = 0;
     private long queryTimeStart = 0;
+    private String currentThemeTable = null;
+    private Short currentThemeStatus = -1;
 
     public Log getLogger() {
         return logger;
@@ -124,6 +128,52 @@
 
     protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
         super.extractJobConfiguration(jobDetail);
+
+        JobDataMap dataMap = jobDetail.getJobDataMap();
+        _pgHost = dataMap.getString(PGHOST);
+        _pgDatabase = dataMap.getString(PGDATBASE);
+        _pgPort = dataMap.getString(PGPORT);
+        _pgSchema = dataMap.getString(PGSCHEMA);
+        _pgUsername = dataMap.getString(PGUSER);
+        _pgPassword = dataMap.getString(PGPASS);
+        _pgUseWKB = dataMap.getString(USEWKB);
+
+        Log logger = getLogger();
+        if (_pgHost == null) {
+            logger.warn("PGHOST is null");
+            throw new JobExecutionException("Unknown PostGIS host.");
+        }
+        if (_pgDatabase == null) {
+            logger.warn("PGDATABASE is null");
+            throw new JobExecutionException("Unknown PostGIS database.");
+        }
+        if (_pgPort == null) {
+            logger.warn("PGPORT is null");
+            throw new JobExecutionException("Unknown PostGIS port.");
+        }
+        if (_pgSchema == null) {
+            logger.warn("PGSCHEMA is null");
+            throw new JobExecutionException("Unknown PostGIS schema.");
+        }
+        if (_pgUsername == null) {
+            logger.warn("PGUSERNAME is null");
+            throw new JobExecutionException("Unknown PostGIS username.");
+        }
+        if (_pgPassword == null) {
+            logger.warn("PGPASSWORD is null");
+            throw new JobExecutionException("Unknown PostGIS password.");
+        }
+
+        Map<String, String> remote = new TreeMap<String, String>();
+        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
+        // remote.put("charset", "UTF-8");
+        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
+        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
+        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
+        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
+        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
+        // remote.put( "namespace", null);
+        pgProperties = remote;
     }
 
     protected XGeosDataConfigMapping getConfigMapping() {
@@ -177,17 +227,15 @@
         }
 
         long t1 = System.currentTimeMillis();
-        String targetSchemaName, targetThemeTable;
+        String targetThemeTable;
         try {
             logger.info("-- step:clearOutputDatabase --");
-
-            targetSchemaName = determineTargetSchemaName();
-            targetThemeTable = determineTargetThemeTableName();
 
             OracleConvertPostGISJobContext jobContext = null;
 
             if (checkConvertPWThemes()) {
-                jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
+                targetThemeTable = determineTargetThemeTableName();
+                jobContext = (OracleConvertPostGISJobContext) prepareJobContext("public", _filterPath,
                         isProfileMode(), isTransformed());
                 jobContext.setSourceDataStore(getSourceDataStore());
                 jobContext.setElementLogging(checkElementLogging());
@@ -209,9 +257,9 @@
                     logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd);
                 }
                 jobContext.closeOracleConnection();
-            }
 
-            updatePWThemeStatusToReady(targetThemeTable);
+                updatePWThemeStatusToReady(targetThemeTable);
+            }
 
             long t2 = System.currentTimeMillis();
             // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
@@ -228,13 +276,19 @@
         logger.warn(jobName + " end at " + new Date());
 
 
+        createTargetDataStore();
+        if (getTargetDataStore() == null) {
+            logger.warn("Cannot connect source postgreSQL database.");
+            throw new JobExecutionException("Cannot connect source postgreSQL database.");
+        }
+
         try {
-            logger.info("-- step:resetPostgisViewMapping --");
+            logger.info("-- step:resetThemesViewMapping --");
             long tStep = System.currentTimeMillis();
             resetThemesViewMapping(context);
             if (isProfileMode()) {
                 long tStepEnd = System.currentTimeMillis();
-                logTimeDiff("Profile-resetPostgisViewMapping", tStep, tStepEnd);
+                logTimeDiff("Profile-resetThemesViewMapping", tStep, tStepEnd);
             }
             logger.info("-- step:resetGeoServerConfig --");
             tStep = System.currentTimeMillis();
@@ -253,7 +307,7 @@
      *
      * @param executionContext 批次執行的關係
      */
-    private void resetThemesViewMapping(JobExecutionContext executionContext) {
+    private void resetThemesViewMapping(JobExecutionContext executionContext) throws JobExecutionException {
         assert executionContext != null;
         Connection connection = null;
         try {
@@ -273,43 +327,18 @@
 
             resetThemesBaseView(connection, ownerName, currentTargetThemesName);
 
-            XGeosDataConfigMapping configMapping = getConfigMapping();
-            String[] allView = retrieveTargetStoreAllViewNames(connection);
-            TreeSet<String> allViewNames = new TreeSet<String>();
-            if (allView != null) {
-                allViewNames.addAll(Arrays.asList(allView));
-            }
-            List values = (List) configMapping.getMapping().get("pgOMS");
-            for (Object value : values) {
-                XGeosDataConfig xgeosConfig = (XGeosDataConfig) value;
-                short tid = xgeosConfig.getFSC();
-                short cid = xgeosConfig.getCOMP();
-                StringBuilder sbTable = new StringBuilder("fsc-");
-                sbTable.append(tid).append("-c-");
-                sbTable.append(cid);
-
-                int index = realTableNames.indexOf(sbTable.toString());
-                if (index == -1) {
-                    logger.debug("pgOMS LayerView Cannot found-" + xgeosConfig.toString());
-                    continue;
-                }
-
-                StringBuilder sbView = new StringBuilder("fsc-");
-                sbView.append(tid).append("-c");
-                sbView.append(cid).append("-l");
-                sbView.append(xgeosConfig.getLEV()).append("-w");
-                sbView.append(xgeosConfig.getWEIGHT());
-                String viewName = sbView.toString();
-                if (allViewNames.contains(viewName)) {
-                    resetThemesPostgisDataView(connection, ownerName, null, viewName);
-                    if (tid == 106) {
-                        resetFlowThemesPostgisDataView(connection, ownerName, null, viewName);
-                    }
-                }
+            if (currentThemeTable == null) {
+                transferThemesVersionStatus(DataReposVersionManager.VSSTATUS_READY,
+                    DataReposVersionManager.VSSTATUS_LINKVIEW, false);
+            } else {
+                transferThemesVersionStatus(DataReposVersionManager.VSSTATUS_READY,
+                    currentThemeStatus, true);
             }
 
+            /*
             updateCurrentThemeStatus(connection, currentTargetThemesName,
                 DataReposVersionManager.VSSTATUS_LINKVIEW);
+            */
 
             // String[] featureNames = dataStore.getTypeNames();
             // logger.info("featureNames[] size = " + featureNames.length);
@@ -572,12 +601,12 @@
         // connection.commit();
     }
 
-    private Timestamp retrieveCurrentSchemaTimestamp(Connection connection, short status) throws SQLException {
-        StringBuilder sbSQL = new StringBuilder("SELECT vstimestamp, vsschema, vsstatus FROM ");
-        sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME);
-        sbSQL.append(" WHERE vsstatus = ");
+    private Timestamp retrieveCurrentThemeTimestamp(Connection connection, short status) throws SQLException {
+        StringBuilder sbSQL = new StringBuilder("SELECT vpttimestamp, vptname, vptstatus FROM ");
+        sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME);
+        sbSQL.append(" WHERE vptstatus = ");
         sbSQL.append(status);
-        sbSQL.append(" ORDER BY vsid");
+        sbSQL.append(" ORDER BY vptid");
 
         Timestamp result = null;
         Statement stmt = null;
@@ -597,66 +626,14 @@
         }
     }
 
-    private void updateCurrentRepositoryStatus(Connection connection, String schemaName, short newStatus)
-            throws SQLException {
-        StringBuilder sbSQL = new StringBuilder("UPDATE ");
-        sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME).append(' ');
-        sbSQL.append(" SET vsstatus = ");
-        sbSQL.append(newStatus);
-        sbSQL.append(", vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '");
-        sbSQL.append(schemaName).append("'");
-
-        Statement stmt = null;
-        try {
-            stmt = connection.createStatement();
-            stmt.executeUpdate(sbSQL.toString());
-        } finally {
-            if (stmt != null) stmt.close();
-        }
-    }
-
-    private boolean checkCurrentRepositoryStatus(Connection connection, short status) {
-        try {
-            return (retrieveCurrentSchemaName(connection, status) != null);
-        } catch (SQLException e) {
-            logger.warn(e.getMessage(), e);
-            return false;
-        }
-    }
-
-    private String retrieveCurrentSchemaName(Connection connection, short status) throws SQLException {
-        StringBuilder sbSQL = new StringBuilder("SELECT vsschema, vstimestamp, vsstatus FROM ");
-        sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME);
-        sbSQL.append(" WHERE vsstatus = ");
-        sbSQL.append(status);
-        sbSQL.append(" ORDER BY vsid");
-
-        String result = null;
-        Statement stmt = null;
-        ResultSet rs = null;
-
-        try {
-            stmt = connection.createStatement();
-            rs = stmt.executeQuery(sbSQL.toString());
-            // get first result
-            if (rs.next()) {
-                result = rs.getString(1);
-            }
-            return result;
-        } finally {
-            if (rs != null) rs.close();
-            if (stmt != null) stmt.close();
-        }
-    }
-
-    private void updateCurrentThemeStatus(Connection connection, String schemaName, short newStatus)
+    private void updateCurrentThemeStatus(Connection connection, String themeTableName, short newStatus)
             throws SQLException {
         StringBuilder sbSQL = new StringBuilder("UPDATE ");
         sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME).append(' ');
         sbSQL.append(" SET vptstatus = ");
         sbSQL.append(newStatus);
         sbSQL.append(", vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '");
-        sbSQL.append(schemaName).append("'");
+        sbSQL.append(themeTableName).append("'");
 
         Statement stmt = null;
         try {
@@ -677,11 +654,12 @@
         }
     }
 
-
     private String retrieveCurrentThemeName(Connection connection, short status) throws SQLException {
         StringBuilder sbSQL = new StringBuilder("SELECT ");
-        sbSQL.append("vptname, vptstatus, vpttimestamp FROM ");
-        sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
+        sbSQL.append("vptname, vpttimestamp, vptstatus FROM ");
+        sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
+        sbSQL.append(" WHERE vptstatus = ");
+        sbSQL.append(status);
         sbSQL.append("ORDER BY vptid");
 
         String result = null;
@@ -702,11 +680,10 @@
         }
     }
 
-
     protected String[] retrieveTargetStoreAllViewNames(Connection connection) {
         try {
             final int TABLE_NAME_COL = 3;
-            List list = new ArrayList();
+            List<String> list = new ArrayList<String>();
 
             DatabaseMetaData meta = connection.getMetaData();
             // String[] tableType = { "TABLE", "VIEW" };
@@ -755,12 +732,6 @@
             pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
         }
 
-        /*
-        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
-            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
-        }
-        */
-
         if (!dataStoreFactory.canProcess(pgProperties)) {
             getLogger().warn("cannot process properties-");
             throw new JobExecutionException("cannot process properties-");
@@ -781,76 +752,6 @@
         }
     }
 
-    private String determineTargetSchemaName() throws IOException {
-        if (targetDataStore == null) return null;
-        Connection connection = null;
-        Statement stmt = null;
-        ResultSet rs = null;
-        String targetSchema = null;
-        boolean needCreate = false;
-        try {
-            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
-            // Create XGVERSIONTABLE_NAME
-            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
-            if (!rs.next()) needCreate = true;
-            if (needCreate) {
-                rs.close();
-                return null;
-            }
-            rs.close();
-
-            StringBuilder sbSQL = new StringBuilder("SELECT ");
-            sbSQL.append("vsschema, vsstatus FROM ");
-            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
-            sbSQL.append("ORDER BY vsid");
-            stmt = connection.createStatement();
-            rs = stmt.executeQuery(sbSQL.toString());
-            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
-            int i = 0;
-            int current = -1;
-            while (rs.next()) {
-                Object[] values = new Object[2];
-                values[0] = rs.getString("vsschema");
-                values[1] = rs.getShort("vsstatus");
-                tmpSchemas.add(values);
-                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
-                    current = i;
-                }
-                i++;
-            }
-
-            if (current == -1) {
-                Object[] values = tmpSchemas.get(0);
-                targetSchema = (String) values[0];
-            } else if (current < (tmpSchemas.size() - 1)) {
-                Object[] values = tmpSchemas.get(current + 1);
-                targetSchema = (String) values[0];
-            } else {
-                Object[] values = tmpSchemas.get(0);
-                targetSchema = (String) values[0];
-            }
-
-            sbSQL = new StringBuilder("UPDATE ");
-            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
-            sbSQL.append(" SET vsstatus = ");
-            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
-            sbSQL.append(" WHERE vsschema = '");
-            sbSQL.append(targetSchema).append("'");
-            int count = stmt.executeUpdate(sbSQL.toString());
-            if (count != 1) {
-                logger.info("update status for " + targetSchema + " update result count="
-                    + count);
-            }
-        } catch (SQLException e) {
-            logger.warn(e.getMessage(), e);
-        } finally {
-            JDBCUtils.close(rs);
-            JDBCUtils.close(stmt);
-            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
-        }
-        return targetSchema;
-    }
-
     private String determineTargetThemeTableName() throws IOException {
         if (targetDataStore == null) return null;
         Connection connection = null;
@@ -864,10 +765,10 @@
             needCreate = false;
             rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"});
             if (!rs.next()) needCreate = true;
-            if (needCreate)
+            if (needCreate) {
                 createXPWThemeVersionTable(connection, _pgSchema);
+            }
             rs.close();
-
             rs = null;
 
             StringBuilder sbSQL = new StringBuilder("SELECT ");
@@ -884,8 +785,10 @@
                 values[0] = rs.getString("vptname");
                 values[1] = rs.getShort("vptstatus");
                 tmpTablenames.add(values);
-                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
+                if (((Short) values[1]) >= DataReposVersionManager.VSSTATUS_LINKVIEW) {
                     current = i;
+                    currentThemeTable = (String) values[0];
+                    currentThemeStatus = (Short) values[1];
                 }
                 i++;
             }
@@ -988,7 +891,7 @@
 
             reader.unread(sb.toString().toCharArray());
             cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
-            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
+            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName, "tid, oid, dyncolor");
 
             logger.info("Execute Copy Count=" + count);
         } catch (SQLException e) {
@@ -1044,7 +947,7 @@
                 long oid = rs.getLong(2);
                 int ownerId = rs.getInt(3);
                 short dirId = (short) rs.getInt(4);
-                String flowMark = null;
+                String flowMark;
                 ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
                 if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
                         (ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
@@ -1079,7 +982,7 @@
 
             reader.unread(sb.toString().toCharArray());
             cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
-            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
+            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName, "tid, oid, fowner, flow");
 
             logger.info("Execute Copy Count=" + count);
         } catch (SQLException e) {
@@ -1195,15 +1098,35 @@
         }
     }
 
-    private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException {
+    private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable,
+                                                   String fields) throws SQLException {
         Statement stmt = connection.createStatement();
         ResultSet rs = null;
         try {
-            stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
+            boolean found = false;
             rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
             if (rs.next()) {
-                stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
-                    " ADD PRIMARY KEY (tid, oid)");
+                found = true;
+            }
+            JDBCUtils.close(rs);
+
+            if (!found) {
+                stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
+                rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
+                if (rs.next()) {
+                    stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
+                        " ADD PRIMARY KEY (tid, oid)");
+                }
+            } else {
+                stmt.execute("TRUNCATE "+ tableName + " CASCADE");
+                stmt.execute("INSERT INTO " + tableName + "(" + fields + ") SELECT " +
+                    fields + " FROM " + tempTable);
+                /*
+                --insert into xpwtheme1_fdyncolor (tid, oid, dyncolor) select tid, oid, dyncolor from xpwtheme2_fdyncolor;
+                --reindex table xpwtheme1_fdyncolor;
+                --alter table xpwtheme1_fdyncolor drop constraint xpwtheme1_fdyncolor_pkey;
+                --alter table xpwtheme1_fdyncolor ADD PRIMARY KEY (tid, oid);
+                 */
             }
             stmt.execute("DROP TABLE " + tempTable);
         } finally {
@@ -1357,4 +1280,46 @@
             if (stmt != null) stmt.close();
         }
     }
+
+    protected void transferThemesVersionStatus(Connection connection,
+                                       short vsstatusBefore, short vsstatusAfter, boolean exclusive) throws JobExecutionException {
+
+        try {
+            String currentTargetTheme = retrieveCurrentThemeName(connection, vsstatusBefore);
+            if (currentTargetTheme == null) {
+                logger.info("Cannot found target schema in dataStore. status=" + vsstatusBefore);
+                return;
+            }
+            String existTargetSchema = null;
+            if (exclusive)
+                existTargetSchema = retrieveCurrentThemeName(connection, vsstatusAfter);
+
+
+            updateCurrentThemeStatus(connection, currentTargetTheme, vsstatusAfter);
+            if ((exclusive) && (existTargetSchema != null)) {
+                updateCurrentThemeStatus(connection, existTargetSchema,
+                    DataReposVersionManager.VSSTATUS_AVAILABLE);
+            }
+        } catch (SQLException e) {
+            logger.warn(e.getMessage(), e);
+            throw new JobExecutionException("Update " + DataReposVersionManager.XPTVERSIONTABLE_NAME +
+                " has error-", e);
+        }
+    }
+
+    protected void transferThemesVersionStatus(short vsstatusBefore, short vsstatusAfter, boolean exclusive) throws JobExecutionException {
+
+        if (targetDataStore == null) return;
+        Connection connection = null;
+
+        try {
+            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
+
+            transferThemesVersionStatus(connection, vsstatusBefore, vsstatusAfter, exclusive);
+        } catch (IOException e) {
+            logger.warn(e.getMessage(), e);
+        } finally {
+            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
+        }
+    }
 }

--
Gitblit v0.0.0-SNAPSHOT