forked from geodmms/xdgnjobs

Dennis Kao
2013-12-04 98fc790f9ec053d5367d7ce5851ef66d5a7ced5b
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java
@@ -6,12 +6,15 @@
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PushbackReader;
import java.io.StringReader;
import java.math.BigDecimal;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -22,6 +25,7 @@
import java.util.Map;
import java.util.TreeMap;
import com.ximple.eofms.util.DefaultColorTable;
import org.apache.commons.collections.OrderedMap;
import org.apache.commons.collections.OrderedMapIterator;
import org.apache.commons.collections.map.LinkedMap;
@@ -34,6 +38,8 @@
import org.geotools.feature.SchemaException;
import org.geotools.jdbc.JDBCDataStore;
import org.opengis.feature.IllegalAttributeException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
@@ -79,6 +85,15 @@
    private static final int COMMITSIZE = 100;
    private static final String INDEXPATHNAME = "index";
    private static final String OTHERPATHNAME = "other";
    private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1 FROM BASEDB.CONNECTIVITY ORDER BY FSC";
    private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";
    private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)";
    private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)";
    public static final String FDYNCOLOR_SUFFIX = "_fdyncolor";
    public static final String FOWNER_SUFFIX = "_fowner";
    protected static class Pair {
        Object first;
@@ -209,11 +224,12 @@
        }
        long t1 = System.currentTimeMillis();
        String targetSchemaName;
        String targetSchemaName, targetThemeTable;
        try {
            logger.info("-- step:clearOutputDatabase --");
            clearOutputDatabase();
            targetSchemaName = determineTargetSchemaName();
            targetThemeTable = determineTargetThemeTableName();
            if (checkConvertFile()) {
                logger.info("-- step:convertIndexDesignFile --");
@@ -234,11 +250,11 @@
            }
            OracleConvertPostGISJobContext jobContext = null;
            if (checkConvertDB()) {
                logger.info("-- step:convertOracleDB --");
                OracleConvertPostGISJobContext jobContext =
                    (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
                jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
                        isProfileMode(), isTransformed());
                jobContext.setSourceDataStore(getSourceDataStore());
                // jobContext.setConvertElementIn(_convertElementIn);
@@ -317,12 +333,32 @@
                createDummyFeatureFile(context);
            }
            updateRepoStatusToReady(targetSchemaName);
            if (checkConvertPWThemes()) {
                long tStep = System.currentTimeMillis();
                if (!convertPowerOwnerThemeWithCopyAPI(jobContext, targetThemeTable)) {
                    convertPowerOwnerTheme(jobContext, targetThemeTable);
                }
                if (isProfileMode()) {
                    long tStepEnd = System.currentTimeMillis();
                    logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd);
                }
                tStep = System.currentTimeMillis();
                if (!convertDynamicColorThemeWithCopyAPI(jobContext, targetThemeTable))
                    convertDynamicColorTheme(jobContext, targetThemeTable);
                if (isProfileMode()) {
                    long tStepEnd = System.currentTimeMillis();
                    logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd);
                }
            }
            updatePWThemeStatusToReady(targetThemeTable);
            long t2 = System.currentTimeMillis();
            // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
            // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW);
            logTimeDiff("Total ", t1, t2);
            updateRepoStatusToReady(targetSchemaName);
        } catch (SQLException e) {
            disconnect();
@@ -1174,12 +1210,12 @@
        boolean needCreate = false;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XGVERSIONTABLE_NAME
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            if (needCreate)
                createXGeosVersionTable(connection, _pgSchema);
            rs.close();
            rs = null;
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vsschema, vsstatus FROM ");
@@ -1233,6 +1269,77 @@
        return targetSchema;
    }
    private String determineTargetThemeTableName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetTable = null;
        boolean needCreate = false;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XPTVERSIONTABLE_NAME
            needCreate = false;
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            if (needCreate)
                createXPWThemeVersionTable(connection, _pgSchema);
            rs.close();
            rs = null;
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vptname, vptstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vptid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vptname");
                values[1] = rs.getShort("vptstatus");
                tmpTablenames.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }
            if (current == -1) {
                Object[] values = tmpTablenames.get(0);
                targetTable = (String) values[0];
            } else if (current < (tmpTablenames.size() - 1)) {
                Object[] values = tmpTablenames.get(current + 1);
                targetTable = (String) values[0];
            } else {
                Object[] values = tmpTablenames.get(0);
                targetTable = (String) values[0];
            }
            sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vptstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
            sbSQL.append(" WHERE vptname = '");
            sbSQL.append(targetTable).append("'");
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetTable + " update result count="
                    + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetTable;
    }
    public String encodeSchemaTableName(String schemaName, String tableName) {
        return "\"" + schemaName + "\".\"" + tableName + "\"";
    }
@@ -1275,6 +1382,42 @@
        }
    }
    private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException {
        Statement stmt = null;
        StringBuilder sql = new StringBuilder("CREATE TABLE ");
        sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
        sql.append(" ( vptid serial PRIMARY KEY, ");
        sql.append(" vptname character varying(64) NOT NULL, ");
        sql.append(" vptstatus smallint NOT NULL, ");
        sql.append(" vpttimestamp timestamp with time zone ) ");
        try {
            stmt = connection.createStatement();
            stmt.executeUpdate(sql.toString());
            sql = new StringBuilder("ALTER TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
            sql.append(" OWNER TO ").append(_pgUsername);
            stmt.executeUpdate(sql.toString());
            sql = new StringBuilder("GRANT ALL ON TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
            sql.append(" TO public");
            stmt.executeUpdate(sql.toString());
            for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) {
                sql = new StringBuilder("INSERT INTO ");
                sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
                sql.append(" (vptname, vptstatus) VALUES ('");
                sql.append(schemaName).append("', ");
                sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )");
                stmt.executeUpdate(sql.toString());
            }
        } finally {
            if (stmt != null) stmt.close();
        }
    }
    private void updateRepoStatusToReady(String targetSchema) {
        if (targetDataStore == null) return;
        Connection connection = null;
@@ -1287,6 +1430,38 @@
            sbSQL.append(" SET vsstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_READY);
            sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '");
            sbSQL.append(targetSchema).append("'");
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            stmt = connection.createStatement();
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count="
                    + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
    }
    private void updatePWThemeStatusToReady(String targetSchema) {
        if (targetDataStore == null) return;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        boolean needCreate = false;
        try {
            StringBuilder sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vptstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_READY);
            sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vptname = '");
            sbSQL.append(targetSchema).append("'");
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
@@ -1352,4 +1527,321 @@
    public final void resetQueryTime() {
        queryTime = 0;
    }
    private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertDynamicColorTheme");
            return;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        boolean found = false;
        ResultSet rs = null;
        Statement stmt = null;
        PreparedStatement pstmt = null;
        try {
            DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
            String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_COLORTAB);
            rs.setFetchSize(50);
            createOrClearTargetTable(connectionPG, targetTableName,
                "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)");
            pstmt = connectionPG.prepareStatement("INSERT INTO " +
                encodeSchemaTableName(_pgSchema, targetTableName) +
                " (tid, oid, dyncolor) VALUES (?, ?, ?)" );
            final int MAX_BATCHSIZE = 50;
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int colorId = rs.getInt(3);
                String colorText = colorTable.getColorCode(colorId);
                pstmt.setShort(1, (short) cid);
                pstmt.setInt(2, (int) oid);
                pstmt.setString(3, colorText);
                pstmt.addBatch();
                if (count % MAX_BATCHSIZE == 0) {
                    pstmt.executeBatch();
                }
                ++count;
            }
            pstmt.executeBatch();
            createTargetTableIndex(connectionPG, targetTableName);
            logger.info("Execute Update Count=" + count);
        } catch (SQLException e) {
            logger.info(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(pstmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
    }
    private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertPowerOwnerTheme");
            return;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        boolean found = false;
        ResultSet rs = null;
        Statement stmt = null;
        PreparedStatement pstmt = null;
        try {
            connectionPG.setAutoCommit(false);
            String targetTableName = targetTableBaseName + FOWNER_SUFFIX;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_CONNFDR);
            rs.setFetchSize(50);
            createOrClearTargetTable(connectionPG, targetTableName,
                "(tid smallint not null, oid int not null, fowner smallint not null)");
            pstmt = connectionPG.prepareStatement("INSERT INTO " +
                encodeSchemaTableName(_pgSchema, targetTableName) +
                " (tid, oid, fowner) VALUES (?, ?, ?)" );
            final int MAX_BATCHSIZE = 50;
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int ownerId = rs.getInt(3);
                pstmt.setShort(1, (short) cid);
                pstmt.setInt(2, (int) oid);
                pstmt.setShort(3, (short) ownerId);
                pstmt.addBatch();
                if (count % MAX_BATCHSIZE == 0) {
                    pstmt.executeBatch();
                }
                ++count;
            }
            pstmt.executeBatch();
            createTargetTableIndex(connectionPG, targetTableName);
            logger.info("Execute Update Count=" + count);
        } catch (SQLException e) {
            logger.info(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(pstmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
    }
    private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE");
            }
            stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
    private void createTargetTableIndex(Connection connection, String tableName) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
                    " ADD PRIMARY KEY (tid, oid)");
            }
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
    private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName)
        throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI");
            return false;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        if (!(connectionPG instanceof PGConnection)) {
            return false;
        }
        ResultSet rs = null;
        Statement stmt = null;
        try {
            DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
            String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX;
            String targetTempName = "tmp_" + targetTableName;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_COLORTAB);
            rs.setFetchSize(50);
            createOrClearTempTargetTable(connectionPG, targetTempName,
                "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)");
            StringBuilder sb = new StringBuilder();
            CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI();
            PushbackReader reader = new PushbackReader(new StringReader(""), 4096);
            final int MAX_BATCHSIZE = 50;
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int colorId = rs.getInt(3);
                String colorText = colorTable.getColorCode(colorId);
                sb.append(cid).append(',');
                sb.append(oid).append(",'");
                sb.append(colorText).append("'\n");
                if (count % MAX_BATCHSIZE == 0) {
                    reader.unread(sb.toString().toCharArray());
                    cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
                    sb.delete(0, sb.length());
                }
                ++count;
            }
            reader.unread(sb.toString().toCharArray());
            cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
            logger.info("Execute Copy Count=" + count);
        } catch (SQLException e) {
            logger.info(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
        return true;
    }
    private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName)
        throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI");
            return false;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        boolean found = false;
        ResultSet rs = null;
        Statement stmt = null;
        try {
            connectionPG.setAutoCommit(false);
            String targetTableName = targetTableBaseName + FOWNER_SUFFIX;
            String targetTempName = "tmp_" + targetTableName;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_CONNFDR);
            rs.setFetchSize(50);
            createOrClearTempTargetTable(connectionPG, targetTempName,
                "(tid smallint not null, oid int not null, fowner smallint not null)");
            StringBuilder sb = new StringBuilder();
            CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI();
            PushbackReader reader = new PushbackReader(new StringReader(""), 4096);
            final int MAX_BATCHSIZE = 50;
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int ownerId = rs.getInt(3);
                sb.append(cid).append(',');
                sb.append(oid).append(',');
                sb.append(ownerId).append('\n');
                if (count % MAX_BATCHSIZE == 0) {
                    reader.unread(sb.toString().toCharArray());
                    cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
                    sb.delete(0, sb.length());
                }
                ++count;
            }
            reader.unread(sb.toString().toCharArray());
            cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
            logger.info("Execute Copy Count=" + count);
        } catch (SQLException e) {
            logger.info(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
        return true;
    }
    private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE");
            }
            stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
    private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
                    " ADD PRIMARY KEY (tid, oid)");
            }
            stmt.execute("DROP TABLE " + tempTable);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
}