forked from geodmms/xdgnjobs

Dennis Kao
2014-03-07 8228a9616175b94ff0df5a9832184e5459c07c1a
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
@@ -5,6 +5,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -51,7 +52,7 @@
    private static final String USEWKB = "USEWKB";
    private static final int FETCHSIZE = 30;
    private static final int COMMITSIZE = 100;
    private static final int COMMITSIZE = 10;
    protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
@@ -97,6 +98,7 @@
    @Override
    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
        super.extractJobConfiguration(jobDetail);
        JobDataMap dataMap = jobDetail.getJobDataMap();
        _pgHost = dataMap.getString(PGHOST);
        _pgDatabase = dataMap.getString(PGDATBASE);
@@ -213,6 +215,7 @@
        // Log the time the job started
        logger.info(jobName + " fired at " + new Date());
        extractJobConfiguration(jobDetail);
        createSourceDataStore();
        createTargetDataStore();
@@ -397,11 +400,14 @@
            return;
        }
        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0
        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE = 0
        int exchangeCount = fetchExchangeCount(connection);
        logger.info("exchangeCount=" + exchangeCount);
        try {
            processIncrementElement(jobContext);
            processIncrementElement(jobContext, exchangeCount);
            // jobContext.setCurrentSchema(querySchema);
        } finally {
        }
@@ -412,7 +418,7 @@
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = null;
        StringBuilder sbSQL = new StringBuilder();
        sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0");
        sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE = 0");
        int size = -1;
        try {
@@ -433,14 +439,20 @@
        Element element;
    };
    private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException {
    private void processIncrementElement(OracleIncrementPostGISJobContext jobContext, int exchangeCount) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM
        if (exchangeCount == 0) {
            logger.info("GEO_EXCHANGE ELEMENT COUNT IS ZERO.");
            return;
        }
        // SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM
        //  FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE = 0 ORDER BY UPDATETIME
        String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " +
            "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0";
        //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
        String fetchSrcStmtFmt = "SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM " +
            "FROM \"%s\".\"%s\" WHERE ISEXCHANGE = 0 ORDER BY UPDATETIME";
        // String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\"
        //      WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
        PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt);
        String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"});
        Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
@@ -448,23 +460,31 @@
        stmtSrc.setFetchSize(FETCHSIZE);
        ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt);
        int igdsMetaType = rsSrc.getMetaData().getColumnType(1);
        ArrayList<Integer> transIds = new ArrayList<Integer>();
        int step = exchangeCount / 100;
        int order = 0;
        int current = 0;
        jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 0);
        while (rsSrc.next()) {
            if (isProfileMode()) {
                markQueryTime();
            }
            ElementTransactionContext xContext = new ElementTransactionContext();
            xContext.oid = rsSrc.getInt(1);
            xContext.cid = (short) rsSrc.getInt(2);
            xContext.compid = (short) rsSrc.getInt(3);
            xContext.occid = (short) rsSrc.getInt(4);
            xContext.transcationType = rsSrc.getInt(5);
            xContext.taskid = rsSrc.getInt(6);
            xContext.transcationId = rsSrc.getInt(1);
            xContext.oid = rsSrc.getInt(2);
            xContext.cid = (short) rsSrc.getInt(3);
            xContext.compid = (short) rsSrc.getInt(4);
            xContext.occid = (short) rsSrc.getInt(5);
            xContext.transcationType = rsSrc.getInt(6);
            xContext.taskid = rsSrc.getInt(7);
            try {
                if (xContext.transcationType > 2) {
                if (xContext.transcationType <= 2) {
                    byte[] raw = null;
                    if (igdsMetaType == Types.BLOB) {
                        BLOB blob = (BLOB) rsSrc.getBlob(7);
                        BLOB blob = (BLOB) rsSrc.getBlob(8);
                        try {
                            raw = getBytesFromBLOB(blob);
@@ -474,7 +494,7 @@
                            // blob.close();
                        }
                    } else {
                        raw = rsSrc.getBytes(7);
                        raw = rsSrc.getBytes(8);
                    }
                    if (raw != null) {
                        Element element = fetchBinaryElement(raw);
@@ -487,15 +507,88 @@
                            accumulateQueryTime();
                        }
                    }
                } else {
                    xContext.element = null;
                }
                jobContext.putFeatureCollection(xContext);
                if (xContext.transcationType > 1) {
                    // remove first
                }
                jobContext.processFeatureContext(xContext);
                transIds.add(xContext.transcationId);
            } catch (Dgn7fileException e) {
                logger.warn("Dgn7Exception", e);
            }
            if ((order % COMMITSIZE) == 0) {
                // OracleConnection connection = jobContext.getOracleConnection();
                // connection.commitTransaction();
                jobContext.commitTransaction();
                //jobContext.startTransaction();
                System.gc();
                System.runFinalization();
            }
            if (step != 0) {
                int now = order % step;
                if (now != current) {
                    current = now;
                    jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current);
                }
            } else {
                jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current);
                current++;
            }
        }
        jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 100);
        jobContext.commitTransaction();
        jobContext.resetFeatureContext();
        JDBCUtils.close(rsSrc);
        JDBCUtils.close(stmtSrc);
        if (!transIds.isEmpty()) {
            completeTransactionAction(connection, transIds);
        }
    }
    private void completeTransactionAction(Connection connection, ArrayList<Integer> transIds) {
        if (transIds.isEmpty()) return;
        boolean autoCommit = true;
        PreparedStatement statement = null;
        try {
            autoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            String sql = "UPDATE \"CMMS_POSTDB\".\"GEO_EXCHANGE\" SET ISEXCHANGE=? WHERE ID=?";
            statement = connection.prepareStatement(sql);
            for (int id : transIds) {
                statement.setInt((int) 1, 1);
                statement.setInt((int) 2, id);
                statement.executeUpdate();
            }
            connection.commit();
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            try {
                connection.rollback();
            } catch (SQLException e1) {
                logger.warn(e.getMessage(), e1);
            }
        } finally {
            JDBCUtils.close(statement);
            try {
                connection.setAutoCommit(autoCommit);
            } catch (SQLException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
    // Binary to Element