package com.ximple.eofms.jobs; import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.Date; import java.util.Map; import java.util.TreeMap; import java.util.logging.Logger; import com.ximple.eofms.jobs.context.AbstractOracleJobContext; import com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext; import com.ximple.io.dgn7.ComplexElement; import com.ximple.io.dgn7.Dgn7fileException; import com.ximple.io.dgn7.Element; import com.ximple.io.dgn7.ElementType; import com.ximple.io.dgn7.FrammeAttributeData; import com.ximple.io.dgn7.IElementHandler; import com.ximple.util.PrintfFormat; import oracle.sql.BLOB; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.DataStore; import org.geotools.data.Transaction; import org.geotools.data.jdbc.JDBCUtils; import org.geotools.data.postgis.PostgisNGDataStoreFactory; import org.geotools.jdbc.JDBCDataStore; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import static com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext.*; public class OracleIncrementDgn2PostGISJob extends AbstractOracleDatabaseJob { final static Log logger = LogFactory.getLog(OracleIncrementDgn2PostGISJob.class); private static final String PGHOST = "PGHOST"; private static final String PGDATBASE = "PGDATBASE"; private static final String PGPORT = "PGPORT"; private static final String PGSCHEMA = "PGSCHEMA"; private static final String PGUSER = "PGUSER"; private static final String PGPASS = "PGPASS"; private static final String USEWKB = "USEWKB"; private static final int FETCHSIZE = 30; private static final int COMMITSIZE = 10; protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); protected String _pgHost; protected String _pgDatabase; protected String _pgPort; protected String _pgSchema; protected String _pgUsername; protected String _pgPassword; protected String _pgUseWKB; protected Map pgProperties; protected JDBCDataStore targetDataStore; private long queryTime = 0; private long queryTimeStart = 0; public final void accumulateQueryTime() { queryTime += System.currentTimeMillis() - queryTimeStart; } public long getQueryTime() { return queryTime; } public final void markQueryTime() { queryTimeStart = System.currentTimeMillis(); } public final void resetQueryTime() { queryTime = 0; } @Override public Log getLogger() { return logger; } public DataStore getTargetDataStore() { return targetDataStore; } @Override protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); _pgDatabase = dataMap.getString(PGDATBASE); _pgPort = dataMap.getString(PGPORT); _pgSchema = dataMap.getString(PGSCHEMA); _pgUsername = dataMap.getString(PGUSER); _pgPassword = dataMap.getString(PGPASS); _pgUseWKB = dataMap.getString(USEWKB); Log logger = getLogger(); if (_pgHost == null) { logger.warn("PGHOST is null"); throw new JobExecutionException("Unknown PostGIS host."); } if (_pgDatabase == null) { logger.warn("PGDATABASE is null"); throw new JobExecutionException("Unknown PostGIS database."); } if (_pgPort == null) { logger.warn("PGPORT is null"); throw new JobExecutionException("Unknown PostGIS port."); } if (_pgSchema == null) { logger.warn("PGSCHEMA is null"); throw new JobExecutionException("Unknown PostGIS schema."); } if (_pgUsername == null) { logger.warn("PGUSERNAME is null"); throw new JobExecutionException("Unknown PostGIS username."); } if (_pgPassword == null) { logger.warn("PGPASSWORD is null"); throw new JobExecutionException("Unknown PostGIS password."); } Map remote = new TreeMap(); remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); // remote.put("charset", "UTF-8"); remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); // remote.put( "namespace", null); pgProperties = remote; } @Override protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { return new OracleIncrementPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); } protected void createTargetDataStore() throws JobExecutionException { if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } /* if (!isDriverFound()) { throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); } */ if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); } if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); } /* if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); } */ if (!dataStoreFactory.canProcess(pgProperties)) { getLogger().warn("cannot process properties-"); throw new JobExecutionException("cannot process properties-"); } try { targetDataStore = dataStoreFactory.createDataStore(pgProperties); } catch (IOException e) { getLogger().warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } @Override protected void disconnect() { super.disconnect(); if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } } private void logTimeDiff(String message, long tBefore, long tCurrent) { logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); } @Override public void execute(JobExecutionContext context) throws JobExecutionException { // Every job has its own job detail JobDetail jobDetail = context.getJobDetail(); // The name is defined in the job definition String jobName = jobDetail.getKey().getName(); // Log the time the job started logger.info(jobName + " fired at " + new Date()); extractJobConfiguration(jobDetail); createSourceDataStore(); createTargetDataStore(); if (getSourceDataStore() == null) { logger.warn("Cannot connect source oracle database."); throw new JobExecutionException("Cannot connect source oracle database."); } if (getTargetDataStore() == null) { logger.warn("Cannot connect source postgreSQL database."); throw new JobExecutionException("Cannot connect source postgreSQL database."); } if (isProfileMode()) { queryTime = 0; } long t1 = System.currentTimeMillis(); String targetSchemaName, targetThemeTable; try { logger.info("-- step:incrementConvertOracleDB --"); targetSchemaName = determineCurrentTargetSchemaName(); if (targetSchemaName == null) return; OracleIncrementPostGISJobContext jobContext = null; jobContext = (OracleIncrementPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, isProfileMode(), isTransformed()); jobContext.setSourceDataStore(getSourceDataStore()); jobContext.setElementLogging(checkElementLogging()); jobContext.setExecutionContext(context); long tStep = System.currentTimeMillis(); fetchTPData(jobContext); logger.info("TPC DIST:" + jobContext.getDistId() + ":" + ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-Copy Connectivity", tStep, tStepEnd); } if (isProfileMode()) { jobContext.resetProcessTime(); jobContext.resetUpdateTime(); } tStep = System.currentTimeMillis(); exetcuteIncrementConvert(jobContext, _dataPath); //close all open filewriter instance jobContext.closeFeatureWriter(); if (isProfileMode()) { logger.warn("Profile-Current Query Oracle Cost-" + ((int) ((getQueryTime()) / 60000.0)) + " min - " + (((int) ((getQueryTime()) % 60000.0)) / 1000) + " sec"); long tStepEnd = System.currentTimeMillis(); logger.warn("Profile-Current Process Cost-" + ((int) ((getProcessTime()) / 60000.0)) + " min - " + (((int) ((getProcessTime()) % 60000.0)) / 1000) + " sec"); logger.warn("Profile-Current Update Cost-" + ((int) ((getUpdateTime()) / 60000.0)) + " min - " + (((int) ((getUpdateTime()) % 60000.0)) / 1000) + " sec"); logger.warn("Profile-Current JobContext Process Cost-" + ((int) ((jobContext.getProcessTime()) / 60000.0)) + " min - " + (((int) ((jobContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); logger.warn("Profile-Current JobContext Update Cost-" + ((int) ((jobContext.getUpdateTime()) / 60000.0)) + " min - " + (((int) ((jobContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); logTimeDiff("Profile-Convert[ Increment ]", tStep, tStepEnd); resetQueryTime(); resetProcessTime(); resetUpdateTime(); } jobContext.closeOracleConnection(); long t2 = System.currentTimeMillis(); // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); logTimeDiff("Total ", t1, t2); } catch (SQLException e) { disconnect(); logger.warn(e.getMessage(), e); throw new JobExecutionException("Database error. " + e.getMessage(), e); } catch (IOException ex) { disconnect(); logger.warn(ex.getMessage(), ex); throw new JobExecutionException("IO error. " + ex.getMessage(), ex); } finally { disconnect(); } logger.warn(jobName + " end at " + new Date()); } private String determineCurrentTargetSchemaName() throws IOException { if (targetDataStore == null) return null; Connection connection = null; Statement stmt = null; ResultSet rs = null; String targetSchema = null; boolean needCreate = false; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); // Create XGVERSIONTABLE_NAME rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); if (!rs.next()) needCreate = true; rs.close(); if (needCreate) return null; StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("vsschema, vsstatus FROM "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); sbSQL.append("ORDER BY vsid"); stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); ArrayList tmpSchemas = new ArrayList(); int i = 0; int current = -1; while (rs.next()) { Object[] values = new Object[2]; values[0] = rs.getString("vsschema"); values[1] = rs.getShort("vsstatus"); tmpSchemas.add(values); if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { current = i; } i++; } if (current != -1) { Object[] values = tmpSchemas.get(current); targetSchema = (String) values[0]; } } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } return targetSchema; } public String encodeSchemaTableName(String schemaName, String tableName) { if (schemaName == null) return "\"" + tableName + "\""; return "\"" + schemaName + "\".\"" + tableName + "\""; } /** * CREATE TABLE CMMS_POSTDB.GEO_EXCHANGE * ( * ID NUMBER NOT NULL, * TAG_LUFID NUMBER(10) NOT NULL, * TAG_SFSC NUMBER(5) NOT NULL, * TAG_BCOMPID NUMBER(3) NOT NULL, * TAG_SOCCID NUMBER(5) NOT NULL, * STATUS NUMBER(3) NOT NULL, * IGDSELM BLOB, * UPDATETIME DATE DEFAULT sysdate NOT NULL, * TASKID NUMBER(10) NOT NULL, * ISEXCHANGE NUMBER DEFAULT 0 NOT NULL * ) * * STATUS 欄位 :0:新增 2:編輯 3:刪除設備 4:刪除元件 * ISEXCHANGE 欄位:0 未同步 1已同步 或者已同步就刪除 * * * @param jobContext * @param targetSchemaName * @throws SQLException */ private void exetcuteIncrementConvert(OracleIncrementPostGISJobContext jobContext, String targetSchemaName) throws SQLException { Connection connection = jobContext.getOracleConnection(); if (connection == null) { logger.warn("Cannot Get Oracle Connection for DMMS."); return; } // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE = 0 int exchangeCount = fetchExchangeCount(connection); logger.info("exchangeCount=" + exchangeCount); try { processIncrementElement(jobContext, exchangeCount); // jobContext.setCurrentSchema(querySchema); } finally { } } private int fetchExchangeCount(Connection connection) throws SQLException { // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = null; StringBuilder sbSQL = new StringBuilder(); sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE = 0"); int size = -1; try { stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); if (rs.next()) { size = (int) rs.getLong(1); } } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } return size; } static class IncrementRecord { Element element; }; private void processIncrementElement(OracleIncrementPostGISJobContext jobContext, int exchangeCount) throws SQLException { Connection connection = jobContext.getOracleConnection(); if (exchangeCount == 0) { logger.info("GEO_EXCHANGE ELEMENT COUNT IS ZERO."); return; } // SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM // FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0 String fetchSrcStmtFmt = "SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM " + "FROM \"%s\".\"%s\" WHERE ISEXCHANGE = 0 ORDER BY UPDATETIME"; // String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" // WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"}); Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); stmtSrc.setFetchSize(FETCHSIZE); ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); int igdsMetaType = rsSrc.getMetaData().getColumnType(1); ArrayList transIds = new ArrayList(); int step = exchangeCount / 100; int order = 0; int current = 0; jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 0); while (rsSrc.next()) { if (isProfileMode()) { markQueryTime(); } ElementTransactionContext xContext = new ElementTransactionContext(); xContext.transcationId = rsSrc.getInt(1); xContext.oid = rsSrc.getInt(2); xContext.cid = (short) rsSrc.getInt(3); xContext.compid = (short) rsSrc.getInt(4); xContext.occid = (short) rsSrc.getInt(5); xContext.transcationType = rsSrc.getInt(6); xContext.taskid = rsSrc.getInt(7); try { if (xContext.transcationType <= 2) { byte[] raw = null; if (igdsMetaType == Types.BLOB) { BLOB blob = (BLOB) rsSrc.getBlob(8); try { raw = getBytesFromBLOB(blob); } catch (BufferOverflowException e) { logger.warn("Wrong Element Structure-", e); } finally { // blob.close(); } } else { raw = rsSrc.getBytes(8); } if (raw != null) { Element element = fetchBinaryElement(raw); if (isProfileMode()) { accumulateQueryTime(); } xContext.element = element; } else { if (isProfileMode()) { accumulateQueryTime(); } } } else { xContext.element = null; } if (xContext.transcationType > 1) { // remove first } jobContext.processFeatureContext(xContext); transIds.add(xContext.transcationId); } catch (Dgn7fileException e) { logger.warn("Dgn7Exception", e); } if ((order % COMMITSIZE) == 0) { // OracleConnection connection = jobContext.getOracleConnection(); // connection.commitTransaction(); jobContext.commitTransaction(); //jobContext.startTransaction(); System.gc(); System.runFinalization(); } if (step != 0) { int now = order % step; if (now != current) { current = now; jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current); } } else { jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current); current++; } } jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 100); jobContext.commitTransaction(); jobContext.resetFeatureContext(); JDBCUtils.close(rsSrc); JDBCUtils.close(stmtSrc); if (!transIds.isEmpty()) { completeTransactionAction(connection, transIds); } } private void completeTransactionAction(Connection connection, ArrayList transIds) { if (transIds.isEmpty()) return; boolean autoCommit = true; PreparedStatement statement = null; try { autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); String sql = "UPDATE \"CMMS_POSTDB\".\"GEO_EXCHANGE\" SET ISEXCHANGE=? WHERE ID=?"; statement = connection.prepareStatement(sql); for (int id : transIds) { statement.setInt((int) 1, 1); statement.setInt((int) 2, id); statement.executeUpdate(); } connection.commit(); } catch (SQLException e) { logger.warn(e.getMessage(), e); try { connection.rollback(); } catch (SQLException e1) { logger.warn(e.getMessage(), e1); } } finally { JDBCUtils.close(statement); try { connection.setAutoCommit(autoCommit); } catch (SQLException e) { logger.warn(e.getMessage(), e); } } } // Binary to Element private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { ByteBuffer buffer = ByteBuffer.wrap(raws); buffer.order(ByteOrder.LITTLE_ENDIAN); short signature = buffer.getShort(); // byte type = (byte) (buffer.get() & 0x7f); byte type = (byte) ((signature >>> 8) & 0x007f); // silly Bentley say contentLength is in 2-byte words // and ByteByffer uses raws. // track the record location int elementLength = (buffer.getShort() * 2) + 4; ElementType recordType = ElementType.forID(type); IElementHandler handler; handler = recordType.getElementHandler(); Element dgnElement = (Element) handler.read(buffer, signature, elementLength); if (recordType.isComplexElement() && (elementLength < raws.length)) { int offset = elementLength; while (offset < (raws.length - 4)) { buffer.position(offset); signature = buffer.getShort(); type = (byte) ((signature >>> 8) & 0x007f); elementLength = (buffer.getShort() * 2) + 4; if (raws.length < (offset + elementLength)) { logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); break; } recordType = ElementType.forID(type); handler = recordType.getElementHandler(); if (handler != null) { Element subElement = (Element) handler.read(buffer, signature, elementLength); ((ComplexElement) dgnElement).add(subElement); offset += elementLength; } else { byte[] remain = new byte[buffer.remaining()]; System.arraycopy(raws, offset, remain, 0, buffer.remaining()); for (int i = 0; i < remain.length; i++) { if (remain[i] != 0) { logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); } } break; } } } return dgnElement; } }