| | | |
| | | package com.ximple.eofms.jobs; |
| | | |
| | | import java.io.IOException; |
| | | import java.nio.BufferOverflowException; |
| | | import java.nio.ByteBuffer; |
| | | import java.nio.ByteOrder; |
| | | import java.sql.Connection; |
| | | import java.sql.ResultSet; |
| | | import java.sql.SQLException; |
| | | import java.sql.Statement; |
| | | import java.sql.Types; |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.Map; |
| | | import java.util.TreeMap; |
| | | import java.util.logging.Logger; |
| | | |
| | | import com.ximple.eofms.jobs.context.AbstractOracleJobContext; |
| | | import com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext; |
| | | import com.ximple.io.dgn7.ComplexElement; |
| | | import com.ximple.io.dgn7.Dgn7fileException; |
| | | import com.ximple.io.dgn7.Element; |
| | | import com.ximple.io.dgn7.ElementType; |
| | | import com.ximple.io.dgn7.FrammeAttributeData; |
| | | import com.ximple.io.dgn7.IElementHandler; |
| | | import com.ximple.util.PrintfFormat; |
| | | import oracle.sql.BLOB; |
| | | import org.apache.commons.logging.Log; |
| | | import org.apache.commons.logging.LogFactory; |
| | | import org.geotools.data.DataStore; |
| | | import org.geotools.data.Transaction; |
| | | import org.geotools.data.jdbc.JDBCUtils; |
| | | import org.geotools.data.postgis.PostgisNGDataStoreFactory; |
| | | import org.geotools.jdbc.JDBCDataStore; |
| | | import org.quartz.JobDataMap; |
| | | import org.quartz.JobDetail; |
| | | import org.quartz.JobExecutionContext; |
| | | import org.quartz.JobExecutionException; |
| | | |
| | | import static com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext.*; |
| | | |
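| | | /** |
| | | * Quartz job that performs an incremental DGN-to-PostGIS conversion: it reads |
| | | * change records flagged in the Oracle exchange table (CMMS_POSTDB.GEO_EXCHANGE), |
| | | * decodes the embedded DGN 7 elements, and hands them to the job context for the |
| | | * currently active target schema. |
| | | */ |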
| | | public class OracleIncrementDgn2PostGISJob extends AbstractOracleDatabaseJob { |
| | | final static Log logger = LogFactory.getLog(OracleIncrementDgn2PostGISJob.class); |
| | | |
| | | private long queryTime = 0; |
| | | private long queryTimeStart = 0; |
| | | |
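| | | // Stopwatch helpers that accumulate the time spent in Oracle queries when profile mode is on. |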
| | | public final void accumulateQueryTime() { |
| | | queryTime += System.currentTimeMillis() - queryTimeStart; |
| | | } |
| | | |
| | | public long getQueryTime() { |
| | | return queryTime; |
| | | } |
| | | |
| | | public final void markQueryTime() { |
| | | queryTimeStart = System.currentTimeMillis(); |
| | | } |
| | | |
| | | public final void resetQueryTime() { |
| | | queryTime = 0; |
| | | } |
| | | |
| | | @Override |
| | | public Log getLogger() { |
| | | return logger; |
| | | |
| | | } |
| | | |
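| | | // Reads the PostGIS connection settings from the job data map and fails fast when any required value is missing. |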
| | | @Override |
| | | protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { |
| | | super.extractJobConfiguration(jobDetail); |
| | | JobDataMap dataMap = jobDetail.getJobDataMap(); |
| | | _pgHost = dataMap.getString(PGHOST); |
| | | _pgDatabase = dataMap.getString(PGDATBASE); |
| | | _pgPort = dataMap.getString(PGPORT); |
| | | _pgSchema = dataMap.getString(PGSCHEMA); |
| | | _pgUsername = dataMap.getString(PGUSER); |
| | | _pgPassword = dataMap.getString(PGPASS); |
| | | _pgUseWKB = dataMap.getString(USEWKB); |
| | | |
| | | Log logger = getLogger(); |
| | | if (_pgHost == null) { |
| | | logger.warn("PGHOST is null"); |
| | | throw new JobExecutionException("Unknown PostGIS host."); |
| | | } |
| | | if (_pgDatabase == null) { |
| | | logger.warn("PGDATABASE is null"); |
| | | throw new JobExecutionException("Unknown PostGIS database."); |
| | | } |
| | | if (_pgPort == null) { |
| | | logger.warn("PGPORT is null"); |
| | | throw new JobExecutionException("Unknown PostGIS port."); |
| | | } |
| | | if (_pgSchema == null) { |
| | | logger.warn("PGSCHEMA is null"); |
| | | throw new JobExecutionException("Unknown PostGIS schema."); |
| | | } |
| | | if (_pgUsername == null) { |
| | | logger.warn("PGUSERNAME is null"); |
| | | throw new JobExecutionException("Unknown PostGIS username."); |
| | | } |
| | | if (_pgPassword == null) { |
| | | logger.warn("PGPASSWORD is null"); |
| | | throw new JobExecutionException("Unknown PostGIS password."); |
| | | } |
| | | |
| | | Map<String, String> remote = new TreeMap<String, String>(); |
| | | remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); |
| | | // remote.put("charset", "UTF-8"); |
| | | remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); |
| | | remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); |
| | | remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); |
| | | remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); |
| | | remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); |
| | | // remote.put( "namespace", null); |
| | | pgProperties = remote; |
| | | } |
| | | |
| | | @Override |
| | | protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { |
| | | return new OracleIncrementPostGISJobContext(getDataPath(), |
| | | getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); |
| | | } |
| | | |
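| | | // Creates the target PostGIS DataStore from the collected properties, applying default connection-pool sizes when none are configured. |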
| | | protected void createTargetDataStore() throws JobExecutionException { |
| | | if (targetDataStore != null) { |
| | | targetDataStore.dispose(); |
| | | targetDataStore = null; |
| | | } |
| | | |
| | | /* |
| | | if (!isDriverFound()) |
| | | { |
| | | throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); |
| | | } |
| | | */ |
| | | |
| | | if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { |
| | | pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); |
| | | } |
| | | |
| | | if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { |
| | | pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); |
| | | } |
| | | |
| | | /* |
| | | if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { |
| | | pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); |
| | | } |
| | | */ |
| | | |
| | | if (!dataStoreFactory.canProcess(pgProperties)) { |
| | | getLogger().warn("cannot process properties-"); |
| | | throw new JobExecutionException("cannot process properties-"); |
| | | } |
| | | try { |
| | | targetDataStore = dataStoreFactory.createDataStore(pgProperties); |
| | | } catch (IOException e) { |
| | | getLogger().warn(e.getMessage(), e); |
| | | throw new JobExecutionException(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | protected void disconnect() { |
| | | super.disconnect(); |
| | | if (targetDataStore != null) { |
| | | targetDataStore.dispose(); |
| | | targetDataStore = null; |
| | | } |
| | | } |
| | | |
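| | | // Logs an elapsed interval between two timestamps in minutes and seconds. |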
| | | private void logTimeDiff(String message, long tBefore, long tCurrent) { |
| | | logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + |
| | | (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); |
| | | } |
| | | |
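| | | // Job entry point: connects to the source Oracle and target PostGIS databases and runs the incremental conversion against the active schema. |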
| | | @Override |
| | | public void execute(JobExecutionContext context) throws JobExecutionException { |
| | | // Every job has its own job detail |
| | | JobDetail jobDetail = context.getJobDetail(); |
| | | |
| | | // The name is defined in the job definition |
| | | String jobName = jobDetail.getKey().getName(); |
| | | |
| | | // Log the time the job started |
| | | logger.info(jobName + " fired at " + new Date()); |
| | | |
| | | createSourceDataStore(); |
| | | createTargetDataStore(); |
| | | if (getSourceDataStore() == null) { |
| | | logger.warn("Cannot connect source oracle database."); |
| | | throw new JobExecutionException("Cannot connect source oracle database."); |
| | | } |
| | | |
| | | if (getTargetDataStore() == null) { |
| | | logger.warn("Cannot connect source postgreSQL database."); |
| | | throw new JobExecutionException("Cannot connect source postgreSQL database."); |
| | | } |
| | | |
| | | if (isProfileMode()) { |
| | | queryTime = 0; |
| | | } |
| | | |
| | | long t1 = System.currentTimeMillis(); |
| | | String targetSchemaName; |
| | | |
| | | try { |
| | | logger.info("-- step:incrementConvertOracleDB --"); |
| | | targetSchemaName = determineCurrentTargetSchemaName(); |
| | | if (targetSchemaName == null) return; |
| | | |
| | | OracleIncrementPostGISJobContext jobContext = null; |
| | | |
| | | jobContext = (OracleIncrementPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, |
| | | isProfileMode(), isTransformed()); |
| | | jobContext.setSourceDataStore(getSourceDataStore()); |
| | | jobContext.setElementLogging(checkElementLogging()); |
| | | jobContext.setExecutionContext(context); |
| | | |
| | | long tStep = System.currentTimeMillis(); |
| | | fetchTPData(jobContext); |
| | | logger.info("TPC DIST:" + jobContext.getDistId() + ":" + |
| | | ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); |
| | | |
| | | if (isProfileMode()) { |
| | | long tStepEnd = System.currentTimeMillis(); |
| | | logTimeDiff("Profile-Copy Connectivity", tStep, tStepEnd); |
| | | } |
| | | |
| | | if (isProfileMode()) { |
| | | jobContext.resetProcessTime(); |
| | | jobContext.resetUpdateTime(); |
| | | } |
| | | tStep = System.currentTimeMillis(); |
| | | executeIncrementConvert(jobContext, _dataPath); |
| | | |
| | | //close all open filewriter instance |
| | | jobContext.closeFeatureWriter(); |
| | | |
| | | if (isProfileMode()) { |
| | | logger.warn("Profile-Current Query Oracle Cost-" + |
| | | ((int) ((getQueryTime()) / 60000.0)) + " min - " + |
| | | (((int) ((getQueryTime()) % 60000.0)) / 1000) + " sec"); |
| | | long tStepEnd = System.currentTimeMillis(); |
| | | logger.warn("Profile-Current Process Cost-" + |
| | | ((int) ((getProcessTime()) / 60000.0)) + " min - " + |
| | | (((int) ((getProcessTime()) % 60000.0)) / 1000) + " sec"); |
| | | logger.warn("Profile-Current Update Cost-" + |
| | | ((int) ((getUpdateTime()) / 60000.0)) + " min - " + |
| | | (((int) ((getUpdateTime()) % 60000.0)) / 1000) + " sec"); |
| | | logger.warn("Profile-Current JobContext Process Cost-" + |
| | | ((int) ((jobContext.getProcessTime()) / 60000.0)) + " min - " + |
| | | (((int) ((jobContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); |
| | | logger.warn("Profile-Current JobContext Update Cost-" + |
| | | ((int) ((jobContext.getUpdateTime()) / 60000.0)) + " min - " + |
| | | (((int) ((jobContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); |
| | | logTimeDiff("Profile-Convert[ Increment ]", tStep, tStepEnd); |
| | | |
| | | resetQueryTime(); |
| | | resetProcessTime(); |
| | | resetUpdateTime(); |
| | | } |
| | | |
| | | jobContext.closeOracleConnection(); |
| | | |
| | | long t2 = System.currentTimeMillis(); |
| | | // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; |
| | | // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); |
| | | logTimeDiff("Total ", t1, t2); |
| | | |
| | | } catch (SQLException e) { |
| | | disconnect(); |
| | | logger.warn(e.getMessage(), e); |
| | | throw new JobExecutionException("Database error. " + e.getMessage(), e); |
| | | } catch (IOException ex) { |
| | | disconnect(); |
| | | logger.warn(ex.getMessage(), ex); |
| | | throw new JobExecutionException("IO error. " + ex.getMessage(), ex); |
| | | } finally { |
| | | disconnect(); |
| | | } |
| | | logger.warn(jobName + " end at " + new Date()); |
| | | } |
| | | |
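| | | /** |
| | | * Queries the version table (XGVERSIONTABLE_NAME) in the target database and |
| | | * returns the schema currently flagged with VSSTATUS_USING, or null when the |
| | | * table does not exist or no schema is marked as in use. |
| | | */ |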
| | | private String determineCurrentTargetSchemaName() throws IOException { |
| | | if (targetDataStore == null) return null; |
| | | Connection connection = null; |
| | | Statement stmt = null; |
| | | ResultSet rs = null; |
| | | String targetSchema = null; |
| | | boolean needCreate = false; |
| | | try { |
| | | connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); |
| | | // Create XGVERSIONTABLE_NAME |
| | | // Check whether XGVERSIONTABLE_NAME already exists in the target schema |
| | | rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); |
| | | if (!rs.next()) needCreate = true; |
| | | rs.close(); |
| | | if (needCreate) return null; |
| | | |
| | | StringBuilder sbSQL = new StringBuilder("SELECT "); |
| | | sbSQL.append("vsschema, vsstatus FROM "); |
| | | sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); |
| | | sbSQL.append("ORDER BY vsid"); |
| | | stmt = connection.createStatement(); |
| | | rs = stmt.executeQuery(sbSQL.toString()); |
| | | ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); |
| | | int i = 0; |
| | | int current = -1; |
| | | while (rs.next()) { |
| | | Object[] values = new Object[2]; |
| | | values[0] = rs.getString("vsschema"); |
| | | values[1] = rs.getShort("vsstatus"); |
| | | tmpSchemas.add(values); |
| | | if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { |
| | | current = i; |
| | | } |
| | | i++; |
| | | } |
| | | |
| | | if (current != -1) { |
| | | Object[] values = tmpSchemas.get(current); |
| | | targetSchema = (String) values[0]; |
| | | } |
| | | } catch (SQLException e) { |
| | | logger.warn(e.getMessage(), e); |
| | | } finally { |
| | | JDBCUtils.close(rs); |
| | | JDBCUtils.close(stmt); |
| | | JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); |
| | | } |
| | | return targetSchema; |
| | | } |
| | | |
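| | | // Wraps an optional schema name and a table name in double quotes for PostgreSQL. |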
| | | public String encodeSchemaTableName(String schemaName, String tableName) { |
| | | if (schemaName == null) |
| | | return "\"" + tableName + "\""; |
| | | return "\"" + schemaName + "\".\"" + tableName + "\""; |
| | | } |
| | | |
| | | /** |
| | | * CREATE TABLE CMMS_POSTDB.GEO_EXCHANGE |
| | | * ( |
| | | * ID NUMBER NOT NULL, |
| | | * TAG_LUFID NUMBER(10) NOT NULL, |
| | | * TAG_SFSC NUMBER(5) NOT NULL, |
| | | * TAG_BCOMPID NUMBER(3) NOT NULL, |
| | | * TAG_SOCCID NUMBER(5) NOT NULL, |
| | | * STATUS NUMBER(3) NOT NULL, |
| | | * IGDSELM BLOB, |
| | | * UPDATETIME DATE DEFAULT sysdate NOT NULL, |
| | | * TASKID NUMBER(10) NOT NULL, |
| | | * ISEXCHANGE NUMBER DEFAULT 0 NOT NULL |
| | | * ) |
| | | * |
| | | * STATUS column: 0 = insert, 2 = edit, 3 = delete equipment, 4 = delete component |
| | | * ISEXCHANGE column: 0 = not yet synchronized, 1 = synchronized (or the row is removed once synchronized) |
| | | * |
| | | * |
| | | * @param jobContext |
| | | * @param targetSchemaName |
| | | * @throws SQLException |
| | | */ |
| | | private void executeIncrementConvert(OracleIncrementPostGISJobContext jobContext, String targetSchemaName) throws SQLException { |
| | | |
| | | Connection connection = jobContext.getOracleConnection(); |
| | | if (connection == null) { |
| | | logger.warn("Cannot Get Oracle Connection for DMMS."); |
| | | return; |
| | | } |
| | | |
| | | // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 |
| | | int exchangeCount = fetchExchangeCount(connection); |
| | | logger.info("GEO_EXCHANGE rows flagged for exchange:" + exchangeCount); |
| | | processIncrementElement(jobContext); |
| | | // jobContext.setCurrentSchema(querySchema); |
| | | |
| | | } |
| | | |
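| | | // Returns the number of rows in CMMS_POSTDB.GEO_EXCHANGE whose ISEXCHANGE flag is non-zero. |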
| | | private int fetchExchangeCount(Connection connection) throws SQLException { |
| | | // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 |
| | | Statement stmt = null; |
| | | ResultSet rs = null; |
| | | StringBuilder sbSQL = new StringBuilder(); |
| | | sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0"); |
| | | |
| | | int size = -1; |
| | | try { |
| | | stmt = connection.createStatement(); |
| | | rs = stmt.executeQuery(sbSQL.toString()); |
| | | if (rs.next()) { |
| | | size = (int) rs.getLong(1); |
| | | } |
| | | } finally { |
| | | JDBCUtils.close(rs); |
| | | JDBCUtils.close(stmt); |
| | | } |
| | | |
| | | return size; |
| | | } |
| | | |
| | | static class IncrementRecord { |
| | | Element element; |
| | | } |
| | | |
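| | | // Streams the exchange records, decodes the IGDSELM BLOB into a DGN element for records with STATUS greater than 2, and hands each record to the job context. |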
| | | private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException { |
| | | Connection connection = jobContext.getOracleConnection(); |
| | | |
| | | // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM |
| | | // FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 ORDER BY UPDATETIME |
| | | String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM " + |
| | | "FROM \"%s\".\"%s\" WHERE ISEXCHANGE <> 0 ORDER BY UPDATETIME"; |
| | | //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; |
| | | PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); |
| | | String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"}); |
| | | Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); |
| | | |
| | | stmtSrc.setFetchSize(FETCHSIZE); |
| | | ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); |
| | | int igdsMetaType = rsSrc.getMetaData().getColumnType(7); // column type of IGDSELM |
| | | while (rsSrc.next()) { |
| | | if (isProfileMode()) { |
| | | markQueryTime(); |
| | | } |
| | | ElementTransactionContext xContext = new ElementTransactionContext(); |
| | | xContext.oid = rsSrc.getInt(1); |
| | | xContext.cid = (short) rsSrc.getInt(2); |
| | | xContext.compid = (short) rsSrc.getInt(3); |
| | | xContext.occid = (short) rsSrc.getInt(4); |
| | | xContext.transcationType = rsSrc.getInt(5); |
| | | xContext.taskid = rsSrc.getInt(6); |
| | | |
| | | try { |
| | | if (xContext.transcationType > 2) { |
| | | byte[] raw = null; |
| | | if (igdsMetaType == Types.BLOB) { |
| | | BLOB blob = (BLOB) rsSrc.getBlob(7); |
| | | |
| | | try { |
| | | raw = getBytesFromBLOB(blob); |
| | | } catch (BufferOverflowException e) { |
| | | logger.warn("Wrong Element Structure-", e); |
| | | } finally { |
| | | // blob.close(); |
| | | } |
| | | } else { |
| | | raw = rsSrc.getBytes(7); |
| | | } |
| | | if (raw != null) { |
| | | Element element = fetchBinaryElement(raw); |
| | | if (isProfileMode()) { |
| | | accumulateQueryTime(); |
| | | } |
| | | xContext.element = element; |
| | | } else { |
| | | if (isProfileMode()) { |
| | | accumulateQueryTime(); |
| | | } |
| | | } |
| | | } |
| | | jobContext.putFeatureCollection(xContext); |
| | | } catch (Dgn7fileException e) { |
| | | logger.warn("Dgn7Exception", e); |
| | | } |
| | | } |
| | | |
| | | JDBCUtils.close(rsSrc); |
| | | JDBCUtils.close(stmtSrc); |
| | | } |
| | | |
| | | // Binary to Element |
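| | | // Reads the element signature and length from the raw bytes, dispatches to the matching IElementHandler, and, for complex elements, keeps appending component elements until the buffer is exhausted. |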
| | | private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { |
| | | ByteBuffer buffer = ByteBuffer.wrap(raws); |
| | | buffer.order(ByteOrder.LITTLE_ENDIAN); |
| | | short signature = buffer.getShort(); |
| | | |
| | | // byte type = (byte) (buffer.get() & 0x7f); |
| | | byte type = (byte) ((signature >>> 8) & 0x007f); |
| | | |
| | | // Bentley stores contentLength in 2-byte words, while ByteBuffer works in bytes, |
| | | // so convert to a byte length and track the record location. |
| | | int elementLength = (buffer.getShort() * 2) + 4; |
| | | ElementType recordType = ElementType.forID(type); |
| | | IElementHandler handler; |
| | | |
| | | handler = recordType.getElementHandler(); |
| | | |
| | | Element dgnElement = (Element) handler.read(buffer, signature, elementLength); |
| | | if (recordType.isComplexElement() && (elementLength < raws.length)) { |
| | | int offset = elementLength; |
| | | while (offset < (raws.length - 4)) { |
| | | buffer.position(offset); |
| | | signature = buffer.getShort(); |
| | | type = (byte) ((signature >>> 8) & 0x007f); |
| | | elementLength = (buffer.getShort() * 2) + 4; |
| | | if (raws.length < (offset + elementLength)) { |
| | | logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); |
| | | break; |
| | | } |
| | | recordType = ElementType.forID(type); |
| | | handler = recordType.getElementHandler(); |
| | | if (handler != null) { |
| | | Element subElement = (Element) handler.read(buffer, signature, elementLength); |
| | | ((ComplexElement) dgnElement).add(subElement); |
| | | offset += elementLength; |
| | | } else { |
| | | byte[] remain = new byte[buffer.remaining()]; |
| | | System.arraycopy(raws, offset, remain, 0, buffer.remaining()); |
| | | for (int i = 0; i < remain.length; i++) { |
| | | if (remain[i] != 0) { |
| | | logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); |
| | | } |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | return dgnElement; |
| | | } |
| | | } |