Dennis Kao
2014-03-10 bc926822dddd05d55678696999e3b8b0fd415570
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import com.ximple.eofms.geoserver.config.XGeosDataConfig;
@@ -39,6 +40,7 @@
import org.geotools.jdbc.JDBCDataStore;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -110,6 +112,8 @@
    private long queryTime = 0;
    private long queryTimeStart = 0;
    private String currentThemeTable = null;
    private Short currentThemeStatus = -1;
    public Log getLogger() {
        return logger;
@@ -124,6 +128,52 @@
    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
        super.extractJobConfiguration(jobDetail);
        JobDataMap dataMap = jobDetail.getJobDataMap();
        _pgHost = dataMap.getString(PGHOST);
        _pgDatabase = dataMap.getString(PGDATBASE);
        _pgPort = dataMap.getString(PGPORT);
        _pgSchema = dataMap.getString(PGSCHEMA);
        _pgUsername = dataMap.getString(PGUSER);
        _pgPassword = dataMap.getString(PGPASS);
        _pgUseWKB = dataMap.getString(USEWKB);
        Log logger = getLogger();
        if (_pgHost == null) {
            logger.warn("PGHOST is null");
            throw new JobExecutionException("Unknown PostGIS host.");
        }
        if (_pgDatabase == null) {
            logger.warn("PGDATABASE is null");
            throw new JobExecutionException("Unknown PostGIS database.");
        }
        if (_pgPort == null) {
            logger.warn("PGPORT is null");
            throw new JobExecutionException("Unknown PostGIS port.");
        }
        if (_pgSchema == null) {
            logger.warn("PGSCHEMA is null");
            throw new JobExecutionException("Unknown PostGIS schema.");
        }
        if (_pgUsername == null) {
            logger.warn("PGUSERNAME is null");
            throw new JobExecutionException("Unknown PostGIS username.");
        }
        if (_pgPassword == null) {
            logger.warn("PGPASSWORD is null");
            throw new JobExecutionException("Unknown PostGIS password.");
        }
        Map<String, String> remote = new TreeMap<String, String>();
        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
        // remote.put("charset", "UTF-8");
        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
        // remote.put( "namespace", null);
        pgProperties = remote;
    }
    protected XGeosDataConfigMapping getConfigMapping() {
@@ -177,17 +227,15 @@
        }
        long t1 = System.currentTimeMillis();
        String targetSchemaName, targetThemeTable;
        String targetThemeTable;
        try {
            logger.info("-- step:clearOutputDatabase --");
            targetSchemaName = determineTargetSchemaName();
            targetThemeTable = determineTargetThemeTableName();
            OracleConvertPostGISJobContext jobContext = null;
            if (checkConvertPWThemes()) {
                jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
                targetThemeTable = determineTargetThemeTableName();
                jobContext = (OracleConvertPostGISJobContext) prepareJobContext("public", _filterPath,
                        isProfileMode(), isTransformed());
                jobContext.setSourceDataStore(getSourceDataStore());
                jobContext.setElementLogging(checkElementLogging());
@@ -209,9 +257,9 @@
                    logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd);
                }
                jobContext.closeOracleConnection();
            }
            updatePWThemeStatusToReady(targetThemeTable);
                updatePWThemeStatusToReady(targetThemeTable);
            }
            long t2 = System.currentTimeMillis();
            // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
@@ -228,13 +276,19 @@
        logger.warn(jobName + " end at " + new Date());
        createTargetDataStore();
        if (getTargetDataStore() == null) {
            logger.warn("Cannot connect source postgreSQL database.");
            throw new JobExecutionException("Cannot connect source postgreSQL database.");
        }
        try {
            logger.info("-- step:resetPostgisViewMapping --");
            logger.info("-- step:resetThemesViewMapping --");
            long tStep = System.currentTimeMillis();
            resetThemesViewMapping(context);
            if (isProfileMode()) {
                long tStepEnd = System.currentTimeMillis();
                logTimeDiff("Profile-resetPostgisViewMapping", tStep, tStepEnd);
                logTimeDiff("Profile-resetThemesViewMapping", tStep, tStepEnd);
            }
            logger.info("-- step:resetGeoServerConfig --");
            tStep = System.currentTimeMillis();
@@ -253,7 +307,7 @@
     *
     * @param executionContext 批次執行的關係
     */
    private void resetThemesViewMapping(JobExecutionContext executionContext) {
    private void resetThemesViewMapping(JobExecutionContext executionContext) throws JobExecutionException {
        assert executionContext != null;
        Connection connection = null;
        try {
@@ -273,43 +327,18 @@
            resetThemesBaseView(connection, ownerName, currentTargetThemesName);
            XGeosDataConfigMapping configMapping = getConfigMapping();
            String[] allView = retrieveTargetStoreAllViewNames(connection);
            TreeSet<String> allViewNames = new TreeSet<String>();
            if (allView != null) {
                allViewNames.addAll(Arrays.asList(allView));
            }
            List values = (List) configMapping.getMapping().get("pgOMS");
            for (Object value : values) {
                XGeosDataConfig xgeosConfig = (XGeosDataConfig) value;
                short tid = xgeosConfig.getFSC();
                short cid = xgeosConfig.getCOMP();
                StringBuilder sbTable = new StringBuilder("fsc-");
                sbTable.append(tid).append("-c-");
                sbTable.append(cid);
                int index = realTableNames.indexOf(sbTable.toString());
                if (index == -1) {
                    logger.debug("pgOMS LayerView Cannot found-" + xgeosConfig.toString());
                    continue;
                }
                StringBuilder sbView = new StringBuilder("fsc-");
                sbView.append(tid).append("-c");
                sbView.append(cid).append("-l");
                sbView.append(xgeosConfig.getLEV()).append("-w");
                sbView.append(xgeosConfig.getWEIGHT());
                String viewName = sbView.toString();
                if (allViewNames.contains(viewName)) {
                    resetThemesPostgisDataView(connection, ownerName, null, viewName);
                    if (tid == 106) {
                        resetFlowThemesPostgisDataView(connection, ownerName, null, viewName);
                    }
                }
            if (currentThemeTable == null) {
                transferThemesVersionStatus(DataReposVersionManager.VSSTATUS_READY,
                    DataReposVersionManager.VSSTATUS_LINKVIEW, false);
            } else {
                transferThemesVersionStatus(DataReposVersionManager.VSSTATUS_READY,
                    currentThemeStatus, true);
            }
            /*
            updateCurrentThemeStatus(connection, currentTargetThemesName,
                DataReposVersionManager.VSSTATUS_LINKVIEW);
            */
            // String[] featureNames = dataStore.getTypeNames();
            // logger.info("featureNames[] size = " + featureNames.length);
@@ -572,12 +601,12 @@
        // connection.commit();
    }
    private Timestamp retrieveCurrentSchemaTimestamp(Connection connection, short status) throws SQLException {
        StringBuilder sbSQL = new StringBuilder("SELECT vstimestamp, vsschema, vsstatus FROM ");
        sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME);
        sbSQL.append(" WHERE vsstatus = ");
    private Timestamp retrieveCurrentThemeTimestamp(Connection connection, short status) throws SQLException {
        StringBuilder sbSQL = new StringBuilder("SELECT vpttimestamp, vptname, vptstatus FROM ");
        sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME);
        sbSQL.append(" WHERE vptstatus = ");
        sbSQL.append(status);
        sbSQL.append(" ORDER BY vsid");
        sbSQL.append(" ORDER BY vptid");
        Timestamp result = null;
        Statement stmt = null;
@@ -597,66 +626,14 @@
        }
    }
    private void updateCurrentRepositoryStatus(Connection connection, String schemaName, short newStatus)
            throws SQLException {
        StringBuilder sbSQL = new StringBuilder("UPDATE ");
        sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME).append(' ');
        sbSQL.append(" SET vsstatus = ");
        sbSQL.append(newStatus);
        sbSQL.append(", vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '");
        sbSQL.append(schemaName).append("'");
        Statement stmt = null;
        try {
            stmt = connection.createStatement();
            stmt.executeUpdate(sbSQL.toString());
        } finally {
            if (stmt != null) stmt.close();
        }
    }
    private boolean checkCurrentRepositoryStatus(Connection connection, short status) {
        try {
            return (retrieveCurrentSchemaName(connection, status) != null);
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            return false;
        }
    }
    private String retrieveCurrentSchemaName(Connection connection, short status) throws SQLException {
        StringBuilder sbSQL = new StringBuilder("SELECT vsschema, vstimestamp, vsstatus FROM ");
        sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME);
        sbSQL.append(" WHERE vsstatus = ");
        sbSQL.append(status);
        sbSQL.append(" ORDER BY vsid");
        String result = null;
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            // get first result
            if (rs.next()) {
                result = rs.getString(1);
            }
            return result;
        } finally {
            if (rs != null) rs.close();
            if (stmt != null) stmt.close();
        }
    }
    private void updateCurrentThemeStatus(Connection connection, String schemaName, short newStatus)
    private void updateCurrentThemeStatus(Connection connection, String themeTableName, short newStatus)
            throws SQLException {
        StringBuilder sbSQL = new StringBuilder("UPDATE ");
        sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME).append(' ');
        sbSQL.append(" SET vptstatus = ");
        sbSQL.append(newStatus);
        sbSQL.append(", vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '");
        sbSQL.append(schemaName).append("'");
        sbSQL.append(themeTableName).append("'");
        Statement stmt = null;
        try {
@@ -677,11 +654,12 @@
        }
    }
    private String retrieveCurrentThemeName(Connection connection, short status) throws SQLException {
        StringBuilder sbSQL = new StringBuilder("SELECT ");
        sbSQL.append("vptname, vptstatus, vpttimestamp FROM ");
        sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
        sbSQL.append("vptname, vpttimestamp, vptstatus FROM ");
        sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
        sbSQL.append(" WHERE vptstatus = ");
        sbSQL.append(status);
        sbSQL.append("ORDER BY vptid");
        String result = null;
@@ -702,11 +680,10 @@
        }
    }
    protected String[] retrieveTargetStoreAllViewNames(Connection connection) {
        try {
            final int TABLE_NAME_COL = 3;
            List list = new ArrayList();
            List<String> list = new ArrayList<String>();
            DatabaseMetaData meta = connection.getMetaData();
            // String[] tableType = { "TABLE", "VIEW" };
@@ -755,12 +732,6 @@
            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
        }
        /*
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
        }
        */
        if (!dataStoreFactory.canProcess(pgProperties)) {
            getLogger().warn("cannot process properties-");
            throw new JobExecutionException("cannot process properties-");
@@ -781,76 +752,6 @@
        }
    }
    private String determineTargetSchemaName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetSchema = null;
        boolean needCreate = false;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XGVERSIONTABLE_NAME
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            if (needCreate) {
                rs.close();
                return null;
            }
            rs.close();
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vsschema, vsstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vsid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vsschema");
                values[1] = rs.getShort("vsstatus");
                tmpSchemas.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }
            if (current == -1) {
                Object[] values = tmpSchemas.get(0);
                targetSchema = (String) values[0];
            } else if (current < (tmpSchemas.size() - 1)) {
                Object[] values = tmpSchemas.get(current + 1);
                targetSchema = (String) values[0];
            } else {
                Object[] values = tmpSchemas.get(0);
                targetSchema = (String) values[0];
            }
            sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vsstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
            sbSQL.append(" WHERE vsschema = '");
            sbSQL.append(targetSchema).append("'");
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count="
                    + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetSchema;
    }
    private String determineTargetThemeTableName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
@@ -864,10 +765,10 @@
            needCreate = false;
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            if (needCreate)
            if (needCreate) {
                createXPWThemeVersionTable(connection, _pgSchema);
            }
            rs.close();
            rs = null;
            StringBuilder sbSQL = new StringBuilder("SELECT ");
@@ -884,8 +785,10 @@
                values[0] = rs.getString("vptname");
                values[1] = rs.getShort("vptstatus");
                tmpTablenames.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                if (((Short) values[1]) >= DataReposVersionManager.VSSTATUS_LINKVIEW) {
                    current = i;
                    currentThemeTable = (String) values[0];
                    currentThemeStatus = (Short) values[1];
                }
                i++;
            }
@@ -988,7 +891,7 @@
            reader.unread(sb.toString().toCharArray());
            cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName, "tid, oid, dyncolor");
            logger.info("Execute Copy Count=" + count);
        } catch (SQLException e) {
@@ -1044,7 +947,7 @@
                long oid = rs.getLong(2);
                int ownerId = rs.getInt(3);
                short dirId = (short) rs.getInt(4);
                String flowMark = null;
                String flowMark;
                ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
                if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
                        (ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
@@ -1079,7 +982,7 @@
            reader.unread(sb.toString().toCharArray());
            cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName, "tid, oid, fowner, flow");
            logger.info("Execute Copy Count=" + count);
        } catch (SQLException e) {
@@ -1195,15 +1098,35 @@
        }
    }
    private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException {
    private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable,
                                                   String fields) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
            boolean found = false;
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
                    " ADD PRIMARY KEY (tid, oid)");
                found = true;
            }
            JDBCUtils.close(rs);
            if (!found) {
                stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
                rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
                if (rs.next()) {
                    stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
                        " ADD PRIMARY KEY (tid, oid)");
                }
            } else {
                stmt.execute("TRUNCATE "+ tableName + " CASCADE");
                stmt.execute("INSERT INTO " + tableName + "(" + fields + ") SELECT " +
                    fields + " FROM " + tempTable);
                /*
                --insert into xpwtheme1_fdyncolor (tid, oid, dyncolor) select tid, oid, dyncolor from xpwtheme2_fdyncolor;
                --reindex table xpwtheme1_fdyncolor;
                --alter table xpwtheme1_fdyncolor drop constraint xpwtheme1_fdyncolor_pkey;
                --alter table xpwtheme1_fdyncolor ADD PRIMARY KEY (tid, oid);
                 */
            }
            stmt.execute("DROP TABLE " + tempTable);
        } finally {
@@ -1357,4 +1280,46 @@
            if (stmt != null) stmt.close();
        }
    }
    protected void transferThemesVersionStatus(Connection connection,
                                       short vsstatusBefore, short vsstatusAfter, boolean exclusive) throws JobExecutionException {
        try {
            String currentTargetTheme = retrieveCurrentThemeName(connection, vsstatusBefore);
            if (currentTargetTheme == null) {
                logger.info("Cannot found target schema in dataStore. status=" + vsstatusBefore);
                return;
            }
            String existTargetSchema = null;
            if (exclusive)
                existTargetSchema = retrieveCurrentThemeName(connection, vsstatusAfter);
            updateCurrentThemeStatus(connection, currentTargetTheme, vsstatusAfter);
            if ((exclusive) && (existTargetSchema != null)) {
                updateCurrentThemeStatus(connection, existTargetSchema,
                    DataReposVersionManager.VSSTATUS_AVAILABLE);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException("Update " + DataReposVersionManager.XPTVERSIONTABLE_NAME +
                " has error-", e);
        }
    }
    protected void transferThemesVersionStatus(short vsstatusBefore, short vsstatusAfter, boolean exclusive) throws JobExecutionException {
        if (targetDataStore == null) return;
        Connection connection = null;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            transferThemesVersionStatus(connection, vsstatusBefore, vsstatusAfter, exclusive);
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
    }
}