From 92b1e4ec43f3e41f46b4030014b4f9be011664c4 Mon Sep 17 00:00:00 2001
From: Dennis Kao <ulysseskao@gmail.com>
Date: Tue, 07 Jan 2014 18:44:52 +0800
Subject: [PATCH] update for flow mark

---
 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java |  487 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 487 insertions(+), 0 deletions(-)

diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
index e11ac63..8570254 100644
--- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
+++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
@@ -1,16 +1,43 @@
 package com.ximple.eofms.jobs;
 
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Logger;
 
 import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
 import com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext;
+import com.ximple.io.dgn7.ComplexElement;
+import com.ximple.io.dgn7.Dgn7fileException;
+import com.ximple.io.dgn7.Element;
+import com.ximple.io.dgn7.ElementType;
+import com.ximple.io.dgn7.FrammeAttributeData;
+import com.ximple.io.dgn7.IElementHandler;
+import com.ximple.util.PrintfFormat;
+import oracle.sql.BLOB;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.geotools.data.DataStore;
+import org.geotools.data.Transaction;
+import org.geotools.data.jdbc.JDBCUtils;
 import org.geotools.data.postgis.PostgisNGDataStoreFactory;
 import org.geotools.jdbc.JDBCDataStore;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+
+import static com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext.*;
 
 public class OracleIncrementDgn2PostGISJob extends AbstractOracleDatabaseJob {
     final static Log logger = LogFactory.getLog(OracleIncrementDgn2PostGISJob.class);
@@ -42,6 +69,22 @@
     private long queryTime = 0;
     private long queryTimeStart = 0;
 
+    public final void accumulateQueryTime() {
+        queryTime += System.currentTimeMillis() - queryTimeStart;
+    }
+
+    public long getQueryTime() {
+        return queryTime;
+    }
+
+    public final void markQueryTime() {
+        queryTimeStart = System.currentTimeMillis();
+    }
+
+    public final void resetQueryTime() {
+        queryTime = 0;
+    }
+
     @Override
     public Log getLogger() {
         return logger;
@@ -52,14 +95,458 @@
     }
 
     @Override
+    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
+        super.extractJobConfiguration(jobDetail);
+        JobDataMap dataMap = jobDetail.getJobDataMap();
+        _pgHost = dataMap.getString(PGHOST);
+        _pgDatabase = dataMap.getString(PGDATBASE);
+        _pgPort = dataMap.getString(PGPORT);
+        _pgSchema = dataMap.getString(PGSCHEMA);
+        _pgUsername = dataMap.getString(PGUSER);
+        _pgPassword = dataMap.getString(PGPASS);
+        _pgUseWKB = dataMap.getString(USEWKB);
+
+        Log logger = getLogger();
+        if (_pgHost == null) {
+            logger.warn("PGHOST is null");
+            throw new JobExecutionException("Unknown PostGIS host.");
+        }
+        if (_pgDatabase == null) {
+            logger.warn("PGDATABASE is null");
+            throw new JobExecutionException("Unknown PostGIS database.");
+        }
+        if (_pgPort == null) {
+            logger.warn("PGPORT is null");
+            throw new JobExecutionException("Unknown PostGIS port.");
+        }
+        if (_pgSchema == null) {
+            logger.warn("PGSCHEMA is null");
+            throw new JobExecutionException("Unknown PostGIS schema.");
+        }
+        if (_pgUsername == null) {
+            logger.warn("PGUSERNAME is null");
+            throw new JobExecutionException("Unknown PostGIS username.");
+        }
+        if (_pgPassword == null) {
+            logger.warn("PGPASSWORD is null");
+            throw new JobExecutionException("Unknown PostGIS password.");
+        }
+
+        Map<String, String> remote = new TreeMap<String, String>();
+        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
+        // remote.put("charset", "UTF-8");
+        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
+        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
+        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
+        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
+        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
+        // remote.put( "namespace", null);
+        pgProperties = remote;
+    }
+
+    @Override
     protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) {
         return new OracleIncrementPostGISJobContext(getDataPath(),
             getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
     }
 
+    protected void createTargetDataStore() throws JobExecutionException {
+        if (targetDataStore != null) {
+            targetDataStore.dispose();
+            targetDataStore = null;
+        }
+
+        /*
+        if (!isDriverFound())
+        {
+            throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER);
+        }
+        */
+
+        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
+            pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
+        }
+
+        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
+            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
+        }
+
+        /*
+        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
+            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
+        }
+        */
+
+        if (!dataStoreFactory.canProcess(pgProperties)) {
+            getLogger().warn("cannot process properties-");
+            throw new JobExecutionException("cannot process properties-");
+        }
+        try {
+            targetDataStore = dataStoreFactory.createDataStore(pgProperties);
+        } catch (IOException e) {
+            getLogger().warn(e.getMessage(), e);
+            throw new JobExecutionException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void disconnect() {
+        super.disconnect();
+        if (targetDataStore != null) {
+            targetDataStore.dispose();
+            targetDataStore = null;
+        }
+    }
+
+    private void logTimeDiff(String message, long tBefore, long tCurrent) {
+        logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
+            (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
+    }
+
     @Override
     public void execute(JobExecutionContext context) throws JobExecutionException {
+        // Every job has its own job detail
+        JobDetail jobDetail = context.getJobDetail();
+
+        // The name is defined in the job definition
+        String jobName = jobDetail.getKey().getName();
+
+        // Log the time the job started
+        logger.info(jobName + " fired at " + new Date());
+
+        createSourceDataStore();
+        createTargetDataStore();
+        if (getSourceDataStore() == null) {
+            logger.warn("Cannot connect source oracle database.");
+            throw new JobExecutionException("Cannot connect source oracle database.");
+        }
+
+        if (getTargetDataStore() == null) {
+            logger.warn("Cannot connect source postgreSQL database.");
+            throw new JobExecutionException("Cannot connect source postgreSQL database.");
+        }
+
+        if (isProfileMode()) {
+            queryTime = 0;
+        }
+
+        long t1 = System.currentTimeMillis();
+        String targetSchemaName, targetThemeTable;
+
+        try {
+            logger.info("-- step:incrementConvertOracleDB --");
+            targetSchemaName = determineCurrentTargetSchemaName();
+            if (targetSchemaName == null) return;
+
+            OracleIncrementPostGISJobContext jobContext = null;
+
+            jobContext = (OracleIncrementPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
+                isProfileMode(), isTransformed());
+            jobContext.setSourceDataStore(getSourceDataStore());
+            jobContext.setElementLogging(checkElementLogging());
+            jobContext.setExecutionContext(context);
+
+            long tStep = System.currentTimeMillis();
+            fetchTPData(jobContext);
+            logger.info("TPC DIST:" + jobContext.getDistId() + ":" +
+                ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName()));
+
+            if (isProfileMode()) {
+                long tStepEnd = System.currentTimeMillis();
+                logTimeDiff("Profile-Copy Connectivity", tStep, tStepEnd);
+            }
+
+            if (isProfileMode()) {
+                jobContext.resetProcessTime();
+                jobContext.resetUpdateTime();
+            }
+            tStep = System.currentTimeMillis();
+            exetcuteIncrementConvert(jobContext, _dataPath);
+
+            //close all open filewriter instance
+            jobContext.closeFeatureWriter();
+
+            if (isProfileMode()) {
+                logger.warn("Profile-Current Query Oracle Cost-" +
+                    ((int) ((getQueryTime()) / 60000.0)) + " min - " +
+                    (((int) ((getQueryTime()) % 60000.0)) / 1000) + " sec");
+                long tStepEnd = System.currentTimeMillis();
+                logger.warn("Profile-Current Process Cost-" +
+                    ((int) ((getProcessTime()) / 60000.0)) + " min - " +
+                    (((int) ((getProcessTime()) % 60000.0)) / 1000) + " sec");
+                logger.warn("Profile-Current Update Cost-" +
+                    ((int) ((getUpdateTime()) / 60000.0)) + " min - " +
+                    (((int) ((getUpdateTime()) % 60000.0)) / 1000) + " sec");
+                logger.warn("Profile-Current JobContext Process Cost-" +
+                    ((int) ((jobContext.getProcessTime()) / 60000.0)) + " min - " +
+                    (((int) ((jobContext.getProcessTime()) % 60000.0)) / 1000) + " sec");
+                logger.warn("Profile-Current JobContext Update Cost-" +
+                    ((int) ((jobContext.getUpdateTime()) / 60000.0)) + " min - " +
+                    (((int) ((jobContext.getUpdateTime()) % 60000.0)) / 1000) + " sec");
+                logTimeDiff("Profile-Convert[ Increment ]", tStep, tStepEnd);
+
+                resetQueryTime();
+                resetProcessTime();
+                resetUpdateTime();
+            }
+
+            jobContext.closeOracleConnection();
+
+            long t2 = System.currentTimeMillis();
+            // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
+            // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW);
+            logTimeDiff("Total ", t1, t2);
+
+        } catch (SQLException e) {
+            disconnect();
+            logger.warn(e.getMessage(), e);
+            throw new JobExecutionException("Database error. " + e.getMessage(), e);
+        } catch (IOException ex) {
+            disconnect();
+            logger.warn(ex.getMessage(), ex);
+            throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
+        } finally {
+            disconnect();
+        }
+        logger.warn(jobName + " end at " + new Date());
+    }
+
+    private String determineCurrentTargetSchemaName() throws IOException {
+        if (targetDataStore == null) return null;
+        Connection connection = null;
+        Statement stmt = null;
+        ResultSet rs = null;
+        String targetSchema = null;
+        boolean needCreate = false;
+        try {
+            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
+            // Create XGVERSIONTABLE_NAME
+            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
+            if (!rs.next()) needCreate = true;
+            rs.close();
+            if (needCreate) return null;
+
+            StringBuilder sbSQL = new StringBuilder("SELECT ");
+            sbSQL.append("vsschema, vsstatus FROM ");
+            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
+            sbSQL.append("ORDER BY vsid");
+            stmt = connection.createStatement();
+            rs = stmt.executeQuery(sbSQL.toString());
+            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
+            int i = 0;
+            int current = -1;
+            while (rs.next()) {
+                Object[] values = new Object[2];
+                values[0] = rs.getString("vsschema");
+                values[1] = rs.getShort("vsstatus");
+                tmpSchemas.add(values);
+                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
+                    current = i;
+                }
+                i++;
+            }
+
+            if (current != -1) {
+                Object[] values = tmpSchemas.get(current);
+                targetSchema = (String) values[0];
+            }
+        } catch (SQLException e) {
+            logger.warn(e.getMessage(), e);
+        } finally {
+            JDBCUtils.close(rs);
+            JDBCUtils.close(stmt);
+            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
+        }
+        return targetSchema;
+    }
+
+    public String encodeSchemaTableName(String schemaName, String tableName) {
+        if (schemaName == null)
+            return "\"" + tableName + "\"";
+        return "\"" + schemaName + "\".\"" + tableName + "\"";
+    }
+
+    /**
+     * CREATE TABLE CMMS_POSTDB.GEO_EXCHANGE
+     * (
+     *   ID           NUMBER                           NOT NULL,
+     *   TAG_LUFID    NUMBER(10)                       NOT NULL,
+     *   TAG_SFSC     NUMBER(5)                        NOT NULL,
+     *   TAG_BCOMPID  NUMBER(3)                        NOT NULL,
+     *   TAG_SOCCID   NUMBER(5)                        NOT NULL,
+     *   STATUS       NUMBER(3)                        NOT NULL,
+     *   IGDSELM      BLOB,
+     *   UPDATETIME   DATE                             DEFAULT sysdate  NOT NULL,
+     *   TASKID       NUMBER(10)                       NOT NULL,
+     *   ISEXCHANGE   NUMBER                           DEFAULT 0  NOT NULL
+     * )
+     *
+     * STATUS 欄位 :0:新增  2:編輯  3:刪除設備   4:刪除元件
+     * ISEXCHANGE   欄位:0 未同步 1已同步  或者已同步就刪除
+     *
+     *
+     * @param jobContext
+     * @param targetSchemaName
+     * @throws SQLException
+     */
+    private void exetcuteIncrementConvert(OracleIncrementPostGISJobContext jobContext, String targetSchemaName) throws SQLException {
+
+        Connection connection = jobContext.getOracleConnection();
+        if (connection == null) {
+            logger.warn("Cannot Get Oracle Connection for DMMS.");
+            return;
+        }
+
+        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0
+        int exchangeCount = fetchExchangeCount(connection);
+        try {
+            processIncrementElement(jobContext);
+            // jobContext.setCurrentSchema(querySchema);
+        } finally {
+        }
 
     }
 
+    private int fetchExchangeCount(Connection connection) throws SQLException {
+        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0
+        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        ResultSet rs = null;
+        StringBuilder sbSQL = new StringBuilder();
+        sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0");
+
+        int size = -1;
+        try {
+            stmt = connection.createStatement();
+            rs = stmt.executeQuery(sbSQL.toString());
+            if (rs.next()) {
+                size = (int) rs.getLong(1);
+            }
+        } finally {
+            JDBCUtils.close(rs);
+            JDBCUtils.close(stmt);
+        }
+
+        return size;
+    }
+
+    static class IncrementRecord {
+        Element element;
+    };
+
+    private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException {
+        Connection connection = jobContext.getOracleConnection();
+
+        // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM
+        //  FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0
+        String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " +
+            "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0";
+        //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
+        PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt);
+        String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"});
+        Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+        stmtSrc.setFetchSize(FETCHSIZE);
+        ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt);
+        int igdsMetaType = rsSrc.getMetaData().getColumnType(1);
+        while (rsSrc.next()) {
+            if (isProfileMode()) {
+                markQueryTime();
+            }
+            ElementTransactionContext xContext = new ElementTransactionContext();
+            xContext.oid = rsSrc.getInt(1);
+            xContext.cid = (short) rsSrc.getInt(2);
+            xContext.compid = (short) rsSrc.getInt(3);
+            xContext.occid = (short) rsSrc.getInt(4);
+            xContext.transcationType = rsSrc.getInt(5);
+            xContext.taskid = rsSrc.getInt(6);
+
+            try {
+                if (xContext.transcationType > 2) {
+                    byte[] raw = null;
+                    if (igdsMetaType == Types.BLOB) {
+                        BLOB blob = (BLOB) rsSrc.getBlob(7);
+
+                        try {
+                            raw = getBytesFromBLOB(blob);
+                        } catch (BufferOverflowException e) {
+                            logger.warn("Wrong Element Structure-", e);
+                        } finally {
+                            // blob.close();
+                        }
+                    } else {
+                        raw = rsSrc.getBytes(7);
+                    }
+                    if (raw != null) {
+                        Element element = fetchBinaryElement(raw);
+                        if (isProfileMode()) {
+                            accumulateQueryTime();
+                        }
+                        xContext.element = element;
+                    } else {
+                        if (isProfileMode()) {
+                            accumulateQueryTime();
+                        }
+                    }
+                }
+                jobContext.putFeatureCollection(xContext);
+            } catch (Dgn7fileException e) {
+                logger.warn("Dgn7Exception", e);
+            }
+        }
+
+        JDBCUtils.close(rsSrc);
+        JDBCUtils.close(stmtSrc);
+    }
+
+    // Binary to Element
+    private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException {
+        ByteBuffer buffer = ByteBuffer.wrap(raws);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+        short signature = buffer.getShort();
+
+        // byte type = (byte) (buffer.get() & 0x7f);
+        byte type = (byte) ((signature >>> 8) & 0x007f);
+
+        // silly Bentley say contentLength is in 2-byte words
+        // and ByteByffer uses raws.
+        // track the record location
+        int elementLength = (buffer.getShort() * 2) + 4;
+        ElementType recordType = ElementType.forID(type);
+        IElementHandler handler;
+
+        handler = recordType.getElementHandler();
+
+        Element dgnElement = (Element) handler.read(buffer, signature, elementLength);
+        if (recordType.isComplexElement() && (elementLength < raws.length)) {
+            int offset = elementLength;
+            while (offset < (raws.length - 4)) {
+                buffer.position(offset);
+                signature = buffer.getShort();
+                type = (byte) ((signature >>> 8) & 0x007f);
+                elementLength = (buffer.getShort() * 2) + 4;
+                if (raws.length < (offset + elementLength)) {
+                    logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit());
+                    break;
+                }
+                recordType = ElementType.forID(type);
+                handler = recordType.getElementHandler();
+                if (handler != null) {
+                    Element subElement = (Element) handler.read(buffer, signature, elementLength);
+                    ((ComplexElement) dgnElement).add(subElement);
+                    offset += elementLength;
+                } else {
+                    byte[] remain = new byte[buffer.remaining()];
+                    System.arraycopy(raws, offset, remain, 0, buffer.remaining());
+                    for (int i = 0; i < remain.length; i++) {
+                        if (remain[i] != 0) {
+                            logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]);
+                        }
+                    }
+                    break;
+                }
+            }
+        }
+
+        return dgnElement;
+    }
 }

--
Gitblit v0.0.0-SNAPSHOT