From 90e7660cac64a949ac7d4e71302ac3509e8ce5a5 Mon Sep 17 00:00:00 2001 From: Dennis Kao <ulysseskao@gmail.com> Date: Tue, 08 Apr 2014 15:32:49 +0800 Subject: [PATCH] Merge branch 'origin/2.1.x' --- xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java | 131 +++++++++++++++++++++++++++++++++++++------ 1 files changed, 112 insertions(+), 19 deletions(-) diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java index 8570254..2a904f2 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java @@ -5,6 +5,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -51,7 +52,7 @@ private static final String USEWKB = "USEWKB"; private static final int FETCHSIZE = 30; - private static final int COMMITSIZE = 100; + private static final int COMMITSIZE = 10; protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); @@ -97,6 +98,7 @@ @Override protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); _pgDatabase = dataMap.getString(PGDATBASE); @@ -213,6 +215,7 @@ // Log the time the job started logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); createSourceDataStore(); createTargetDataStore(); @@ -397,11 +400,14 @@ return; } - // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 + // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE = 0 int exchangeCount = fetchExchangeCount(connection); + logger.info("exchangeCount=" + exchangeCount); + try { - processIncrementElement(jobContext); + processIncrementElement(jobContext, exchangeCount); // jobContext.setCurrentSchema(querySchema); + } finally { } @@ -412,7 +418,7 @@ Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = null; StringBuilder sbSQL = new StringBuilder(); - sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0"); + sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE = 0"); int size = -1; try { @@ -433,14 +439,20 @@ Element element; }; - private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException { + private void processIncrementElement(OracleIncrementPostGISJobContext jobContext, int exchangeCount) throws SQLException { Connection connection = jobContext.getOracleConnection(); - // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM + if (exchangeCount == 0) { + logger.info("GEO_EXCHANGE ELEMENT COUNT IS ZERO."); + return; + } + + // SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM // FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0 - String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " + - "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0"; - //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; + String fetchSrcStmtFmt = "SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM " + + "FROM \"%s\".\"%s\" WHERE ISEXCHANGE = 0 ORDER BY UPDATETIME"; + // String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" + // WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"}); Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); @@ -448,23 +460,31 @@ stmtSrc.setFetchSize(FETCHSIZE); ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); int igdsMetaType = rsSrc.getMetaData().getColumnType(1); + ArrayList<Integer> transIds = new ArrayList<Integer>(); + + int step = exchangeCount / 100; + int order = 0; + int current = 0; + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 0); + while (rsSrc.next()) { if (isProfileMode()) { markQueryTime(); } ElementTransactionContext xContext = new ElementTransactionContext(); - xContext.oid = rsSrc.getInt(1); - xContext.cid = (short) rsSrc.getInt(2); - xContext.compid = (short) rsSrc.getInt(3); - xContext.occid = (short) rsSrc.getInt(4); - xContext.transcationType = rsSrc.getInt(5); - xContext.taskid = rsSrc.getInt(6); + xContext.transcationId = rsSrc.getInt(1); + xContext.oid = rsSrc.getInt(2); + xContext.cid = (short) rsSrc.getInt(3); + xContext.compid = (short) rsSrc.getInt(4); + xContext.occid = (short) rsSrc.getInt(5); + xContext.transcationType = rsSrc.getInt(6); + xContext.taskid = rsSrc.getInt(7); try { - if (xContext.transcationType > 2) { + if (xContext.transcationType <= 2) { byte[] raw = null; if (igdsMetaType == Types.BLOB) { - BLOB blob = (BLOB) rsSrc.getBlob(7); + BLOB blob = (BLOB) rsSrc.getBlob(8); try { raw = getBytesFromBLOB(blob); @@ -474,7 +494,7 @@ // blob.close(); } } else { - raw = rsSrc.getBytes(7); + raw = rsSrc.getBytes(8); } if (raw != null) { Element element = fetchBinaryElement(raw); @@ -487,15 +507,88 @@ accumulateQueryTime(); } } + } else { + xContext.element = null; } - jobContext.putFeatureCollection(xContext); + + if (xContext.transcationType > 1) { + // remove first + } + + jobContext.processFeatureContext(xContext); + transIds.add(xContext.transcationId); + } catch (Dgn7fileException e) { logger.warn("Dgn7Exception", e); } + + if ((order % COMMITSIZE) == 0) { + // OracleConnection connection = jobContext.getOracleConnection(); + // connection.commitTransaction(); + jobContext.commitTransaction(); + //jobContext.startTransaction(); + System.gc(); + System.runFinalization(); + } + + if (step != 0) { + int now = order % step; + if (now != current) { + current = now; + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current); + + } + } else { + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current); + current++; + } } + + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 100); + + jobContext.commitTransaction(); + jobContext.resetFeatureContext(); JDBCUtils.close(rsSrc); JDBCUtils.close(stmtSrc); + + if (!transIds.isEmpty()) { + completeTransactionAction(connection, transIds); + } + } + + private void completeTransactionAction(Connection connection, ArrayList<Integer> transIds) { + if (transIds.isEmpty()) return; + + boolean autoCommit = true; + PreparedStatement statement = null; + try { + autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + String sql = "UPDATE \"CMMS_POSTDB\".\"GEO_EXCHANGE\" SET ISEXCHANGE=? WHERE ID=?"; + + statement = connection.prepareStatement(sql); + for (int id : transIds) { + statement.setInt((int) 1, 1); + statement.setInt((int) 2, id); + statement.executeUpdate(); + } + connection.commit(); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + try { + connection.rollback(); + } catch (SQLException e1) { + logger.warn(e.getMessage(), e1); + } + } finally { + JDBCUtils.close(statement); + try { + connection.setAutoCommit(autoCommit); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } + } } // Binary to Element -- Gitblit v0.0.0-SNAPSHOT