From 92b1e4ec43f3e41f46b4030014b4f9be011664c4 Mon Sep 17 00:00:00 2001 From: Dennis Kao <ulysseskao@gmail.com> Date: Tue, 07 Jan 2014 18:44:52 +0800 Subject: [PATCH] update for flow mark --- xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java | 487 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 487 insertions(+), 0 deletions(-) diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java index e11ac63..8570254 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java @@ -1,16 +1,43 @@ package com.ximple.eofms.jobs; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Logger; import com.ximple.eofms.jobs.context.AbstractOracleJobContext; import com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext; +import com.ximple.io.dgn7.ComplexElement; +import com.ximple.io.dgn7.Dgn7fileException; +import com.ximple.io.dgn7.Element; +import com.ximple.io.dgn7.ElementType; +import com.ximple.io.dgn7.FrammeAttributeData; +import com.ximple.io.dgn7.IElementHandler; +import com.ximple.util.PrintfFormat; +import oracle.sql.BLOB; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; import org.geotools.data.postgis.PostgisNGDataStoreFactory; import org.geotools.jdbc.JDBCDataStore; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; + +import static com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext.*; public class OracleIncrementDgn2PostGISJob extends AbstractOracleDatabaseJob { final static Log logger = LogFactory.getLog(OracleIncrementDgn2PostGISJob.class); @@ -42,6 +69,22 @@ private long queryTime = 0; private long queryTimeStart = 0; + public final void accumulateQueryTime() { + queryTime += System.currentTimeMillis() - queryTimeStart; + } + + public long getQueryTime() { + return queryTime; + } + + public final void markQueryTime() { + queryTimeStart = System.currentTimeMillis(); + } + + public final void resetQueryTime() { + queryTime = 0; + } + @Override public Log getLogger() { return logger; @@ -52,14 +95,458 @@ } @Override + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { + super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); + _pgHost = dataMap.getString(PGHOST); + _pgDatabase = dataMap.getString(PGDATBASE); + _pgPort = dataMap.getString(PGPORT); + _pgSchema = dataMap.getString(PGSCHEMA); + _pgUsername = dataMap.getString(PGUSER); + _pgPassword = dataMap.getString(PGPASS); + _pgUseWKB = dataMap.getString(USEWKB); + + Log logger = getLogger(); + if (_pgHost == null) { + logger.warn("PGHOST is null"); + throw new JobExecutionException("Unknown PostGIS host."); + } + if (_pgDatabase == null) { + logger.warn("PGDATABASE is null"); + throw new JobExecutionException("Unknown PostGIS database."); + } + if (_pgPort == null) { + logger.warn("PGPORT is null"); + throw new JobExecutionException("Unknown PostGIS port."); + } + if (_pgSchema == null) { + logger.warn("PGSCHEMA is null"); + throw new JobExecutionException("Unknown PostGIS schema."); + } + if (_pgUsername == null) { + logger.warn("PGUSERNAME is null"); + throw new JobExecutionException("Unknown PostGIS username."); + } + if (_pgPassword == null) { + logger.warn("PGPASSWORD is null"); + throw new JobExecutionException("Unknown PostGIS password."); + } + + Map<String, String> remote = new TreeMap<String, String>(); + remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); + // remote.put("charset", "UTF-8"); + remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); + remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); + remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); + remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); + remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); + // remote.put( "namespace", null); + pgProperties = remote; + } + + @Override protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { return new OracleIncrementPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); } + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + + /* + if (!isDriverFound()) + { + throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); + } + */ + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); + } + + /* + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { + pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); + } + */ + + if (!dataStoreFactory.canProcess(pgProperties)) { + getLogger().warn("cannot process properties-"); + throw new JobExecutionException("cannot process properties-"); + } + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { + getLogger().warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + } + + @Override + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + @Override public void execute(JobExecutionContext context) throws JobExecutionException { + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + + createSourceDataStore(); + createTargetDataStore(); + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + + if (getTargetDataStore() == null) { + logger.warn("Cannot connect source postgreSQL database."); + throw new JobExecutionException("Cannot connect source postgreSQL database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName, targetThemeTable; + + try { + logger.info("-- step:incrementConvertOracleDB --"); + targetSchemaName = determineCurrentTargetSchemaName(); + if (targetSchemaName == null) return; + + OracleIncrementPostGISJobContext jobContext = null; + + jobContext = (OracleIncrementPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + jobContext.setSourceDataStore(getSourceDataStore()); + jobContext.setElementLogging(checkElementLogging()); + jobContext.setExecutionContext(context); + + long tStep = System.currentTimeMillis(); + fetchTPData(jobContext); + logger.info("TPC DIST:" + jobContext.getDistId() + ":" + + ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); + + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-Copy Connectivity", tStep, tStepEnd); + } + + if (isProfileMode()) { + jobContext.resetProcessTime(); + jobContext.resetUpdateTime(); + } + tStep = System.currentTimeMillis(); + exetcuteIncrementConvert(jobContext, _dataPath); + + //close all open filewriter instance + jobContext.closeFeatureWriter(); + + if (isProfileMode()) { + logger.warn("Profile-Current Query Oracle Cost-" + + ((int) ((getQueryTime()) / 60000.0)) + " min - " + + (((int) ((getQueryTime()) % 60000.0)) / 1000) + " sec"); + long tStepEnd = System.currentTimeMillis(); + logger.warn("Profile-Current Process Cost-" + + ((int) ((getProcessTime()) / 60000.0)) + " min - " + + (((int) ((getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current Update Cost-" + + ((int) ((getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((getUpdateTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current JobContext Process Cost-" + + ((int) ((jobContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((jobContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current JobContext Update Cost-" + + ((int) ((jobContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((jobContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + logTimeDiff("Profile-Convert[ Increment ]", tStep, tStepEnd); + + resetQueryTime(); + resetProcessTime(); + resetUpdateTime(); + } + + jobContext.closeOracleConnection(); + + long t2 = System.currentTimeMillis(); + // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; + // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); + logTimeDiff("Total ", t1, t2); + + } catch (SQLException e) { + disconnect(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException("Database error. " + e.getMessage(), e); + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + } + + private String determineCurrentTargetSchemaName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetSchema = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XGVERSIONTABLE_NAME + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + rs.close(); + if (needCreate) return null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vsschema, vsstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vsid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vsschema"); + values[1] = rs.getShort("vsstatus"); + tmpSchemas.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current != -1) { + Object[] values = tmpSchemas.get(current); + targetSchema = (String) values[0]; + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetSchema; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + if (schemaName == null) + return "\"" + tableName + "\""; + return "\"" + schemaName + "\".\"" + tableName + "\""; + } + + /** + * CREATE TABLE CMMS_POSTDB.GEO_EXCHANGE + * ( + * ID NUMBER NOT NULL, + * TAG_LUFID NUMBER(10) NOT NULL, + * TAG_SFSC NUMBER(5) NOT NULL, + * TAG_BCOMPID NUMBER(3) NOT NULL, + * TAG_SOCCID NUMBER(5) NOT NULL, + * STATUS NUMBER(3) NOT NULL, + * IGDSELM BLOB, + * UPDATETIME DATE DEFAULT sysdate NOT NULL, + * TASKID NUMBER(10) NOT NULL, + * ISEXCHANGE NUMBER DEFAULT 0 NOT NULL + * ) + * + * STATUS 欄位 :0:新增 2:編輯 3:刪除設備 4:刪除元件 + * ISEXCHANGE 欄位:0 未同步 1已同步 或者已同步就刪除 + * + * + * @param jobContext + * @param targetSchemaName + * @throws SQLException + */ + private void exetcuteIncrementConvert(OracleIncrementPostGISJobContext jobContext, String targetSchemaName) throws SQLException { + + Connection connection = jobContext.getOracleConnection(); + if (connection == null) { + logger.warn("Cannot Get Oracle Connection for DMMS."); + return; + } + + // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 + int exchangeCount = fetchExchangeCount(connection); + try { + processIncrementElement(jobContext); + // jobContext.setCurrentSchema(querySchema); + } finally { + } } + private int fetchExchangeCount(Connection connection) throws SQLException { + // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 + Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ResultSet rs = null; + StringBuilder sbSQL = new StringBuilder(); + sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0"); + + int size = -1; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + if (rs.next()) { + size = (int) rs.getLong(1); + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + + return size; + } + + static class IncrementRecord { + Element element; + }; + + private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException { + Connection connection = jobContext.getOracleConnection(); + + // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM + // FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0 + String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " + + "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0"; + //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; + PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); + String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"}); + Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmtSrc.setFetchSize(FETCHSIZE); + ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); + int igdsMetaType = rsSrc.getMetaData().getColumnType(1); + while (rsSrc.next()) { + if (isProfileMode()) { + markQueryTime(); + } + ElementTransactionContext xContext = new ElementTransactionContext(); + xContext.oid = rsSrc.getInt(1); + xContext.cid = (short) rsSrc.getInt(2); + xContext.compid = (short) rsSrc.getInt(3); + xContext.occid = (short) rsSrc.getInt(4); + xContext.transcationType = rsSrc.getInt(5); + xContext.taskid = rsSrc.getInt(6); + + try { + if (xContext.transcationType > 2) { + byte[] raw = null; + if (igdsMetaType == Types.BLOB) { + BLOB blob = (BLOB) rsSrc.getBlob(7); + + try { + raw = getBytesFromBLOB(blob); + } catch (BufferOverflowException e) { + logger.warn("Wrong Element Structure-", e); + } finally { + // blob.close(); + } + } else { + raw = rsSrc.getBytes(7); + } + if (raw != null) { + Element element = fetchBinaryElement(raw); + if (isProfileMode()) { + accumulateQueryTime(); + } + xContext.element = element; + } else { + if (isProfileMode()) { + accumulateQueryTime(); + } + } + } + jobContext.putFeatureCollection(xContext); + } catch (Dgn7fileException e) { + logger.warn("Dgn7Exception", e); + } + } + + JDBCUtils.close(rsSrc); + JDBCUtils.close(stmtSrc); + } + + // Binary to Element + private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { + ByteBuffer buffer = ByteBuffer.wrap(raws); + buffer.order(ByteOrder.LITTLE_ENDIAN); + short signature = buffer.getShort(); + + // byte type = (byte) (buffer.get() & 0x7f); + byte type = (byte) ((signature >>> 8) & 0x007f); + + // silly Bentley say contentLength is in 2-byte words + // and ByteByffer uses raws. + // track the record location + int elementLength = (buffer.getShort() * 2) + 4; + ElementType recordType = ElementType.forID(type); + IElementHandler handler; + + handler = recordType.getElementHandler(); + + Element dgnElement = (Element) handler.read(buffer, signature, elementLength); + if (recordType.isComplexElement() && (elementLength < raws.length)) { + int offset = elementLength; + while (offset < (raws.length - 4)) { + buffer.position(offset); + signature = buffer.getShort(); + type = (byte) ((signature >>> 8) & 0x007f); + elementLength = (buffer.getShort() * 2) + 4; + if (raws.length < (offset + elementLength)) { + logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); + break; + } + recordType = ElementType.forID(type); + handler = recordType.getElementHandler(); + if (handler != null) { + Element subElement = (Element) handler.read(buffer, signature, elementLength); + ((ComplexElement) dgnElement).add(subElement); + offset += elementLength; + } else { + byte[] remain = new byte[buffer.remaining()]; + System.arraycopy(raws, offset, remain, 0, buffer.remaining()); + for (int i = 0; i < remain.length; i++) { + if (remain[i] != 0) { + logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); + } + } + break; + } + } + } + + return dgnElement; + } } -- Gitblit v0.0.0-SNAPSHOT