forked from geodmms/xdgnjobs

Dennis Kao
2014-01-07 92b1e4ec43f3e41f46b4030014b4f9be011664c4
update for flow mark
2 files modified
500 ■■■■■ changed files
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java 13 ●●●●● patch | view | raw | blame | history
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java 487 ●●●●● patch | view | raw | blame | history
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java
@@ -84,6 +84,10 @@
    private static final int COMMITSIZE = 100;
    private static final String INDEXPATHNAME = "index";
    private static final String OTHERPATHNAME = "other";
    public static final String FORWARDFLOW_MARK = "shape://ccarrow";
    public static final String BACKFLOW_MARK = "shape://rccarrow";
    public static final String UNFLOW_MARK = "shape://backslash";
    public static final String NONFLOW_MARK = "shape://slash";
    private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC";
    private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";
@@ -413,7 +417,6 @@
            logger.info("begin convert:[" + order + "]-" + tableSrc);
            queryIgsetElement(jobContext, querySchema, tableSrc);
            order++;
@@ -1816,13 +1819,15 @@
                ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
                if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
                        (ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
                    flowMark = "shape://ccarrow";
                    flowMark = FORWARDFLOW_MARK;
                } else if ((ConnectivityDirectionEnum.BackflowON == dir) ||
                        (ConnectivityDirectionEnum.BackFixflowON == dir)) {
                    flowMark = "shape://rccarrow";
                    flowMark = BACKFLOW_MARK;
                } else if (ConnectivityDirectionEnum.Nondeterminate == dir) {
                    flowMark = NONFLOW_MARK;
                } else {
                    flowMark = "shape://backslash";
                    flowMark = UNFLOW_MARK;
                }
                sb.append(cid).append(',');
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
@@ -1,16 +1,43 @@
package com.ximple.eofms.jobs;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;
import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
import com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext;
import com.ximple.io.dgn7.ComplexElement;
import com.ximple.io.dgn7.Dgn7fileException;
import com.ximple.io.dgn7.Element;
import com.ximple.io.dgn7.ElementType;
import com.ximple.io.dgn7.FrammeAttributeData;
import com.ximple.io.dgn7.IElementHandler;
import com.ximple.util.PrintfFormat;
import oracle.sql.BLOB;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.geotools.data.DataStore;
import org.geotools.data.Transaction;
import org.geotools.data.jdbc.JDBCUtils;
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
import org.geotools.jdbc.JDBCDataStore;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import static com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext.*;
public class OracleIncrementDgn2PostGISJob extends AbstractOracleDatabaseJob {
    final static Log logger = LogFactory.getLog(OracleIncrementDgn2PostGISJob.class);
@@ -42,6 +69,22 @@
    private long queryTime = 0;
    private long queryTimeStart = 0;
    public final void accumulateQueryTime() {
        queryTime += System.currentTimeMillis() - queryTimeStart;
    }
    public long getQueryTime() {
        return queryTime;
    }
    public final void markQueryTime() {
        queryTimeStart = System.currentTimeMillis();
    }
    public final void resetQueryTime() {
        queryTime = 0;
    }
    @Override
    public Log getLogger() {
        return logger;
@@ -52,14 +95,458 @@
    }
    @Override
    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
        super.extractJobConfiguration(jobDetail);
        JobDataMap dataMap = jobDetail.getJobDataMap();
        _pgHost = dataMap.getString(PGHOST);
        _pgDatabase = dataMap.getString(PGDATBASE);
        _pgPort = dataMap.getString(PGPORT);
        _pgSchema = dataMap.getString(PGSCHEMA);
        _pgUsername = dataMap.getString(PGUSER);
        _pgPassword = dataMap.getString(PGPASS);
        _pgUseWKB = dataMap.getString(USEWKB);
        Log logger = getLogger();
        if (_pgHost == null) {
            logger.warn("PGHOST is null");
            throw new JobExecutionException("Unknown PostGIS host.");
        }
        if (_pgDatabase == null) {
            logger.warn("PGDATABASE is null");
            throw new JobExecutionException("Unknown PostGIS database.");
        }
        if (_pgPort == null) {
            logger.warn("PGPORT is null");
            throw new JobExecutionException("Unknown PostGIS port.");
        }
        if (_pgSchema == null) {
            logger.warn("PGSCHEMA is null");
            throw new JobExecutionException("Unknown PostGIS schema.");
        }
        if (_pgUsername == null) {
            logger.warn("PGUSERNAME is null");
            throw new JobExecutionException("Unknown PostGIS username.");
        }
        if (_pgPassword == null) {
            logger.warn("PGPASSWORD is null");
            throw new JobExecutionException("Unknown PostGIS password.");
        }
        Map<String, String> remote = new TreeMap<String, String>();
        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
        // remote.put("charset", "UTF-8");
        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
        // remote.put( "namespace", null);
        pgProperties = remote;
    }
    @Override
    protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) {
        return new OracleIncrementPostGISJobContext(getDataPath(),
            getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
    }
    protected void createTargetDataStore() throws JobExecutionException {
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
        /*
        if (!isDriverFound())
        {
            throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER);
        }
        */
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
        }
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
        }
        /*
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
        }
        */
        if (!dataStoreFactory.canProcess(pgProperties)) {
            getLogger().warn("cannot process properties-");
            throw new JobExecutionException("cannot process properties-");
        }
        try {
            targetDataStore = dataStoreFactory.createDataStore(pgProperties);
        } catch (IOException e) {
            getLogger().warn(e.getMessage(), e);
            throw new JobExecutionException(e.getMessage(), e);
        }
    }
    @Override
    protected void disconnect() {
        super.disconnect();
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
    }
    private void logTimeDiff(String message, long tBefore, long tCurrent) {
        logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
            (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
    }
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Every job has its own job detail
        JobDetail jobDetail = context.getJobDetail();
        // The name is defined in the job definition
        String jobName = jobDetail.getKey().getName();
        // Log the time the job started
        logger.info(jobName + " fired at " + new Date());
        createSourceDataStore();
        createTargetDataStore();
        if (getSourceDataStore() == null) {
            logger.warn("Cannot connect source oracle database.");
            throw new JobExecutionException("Cannot connect source oracle database.");
        }
        if (getTargetDataStore() == null) {
            logger.warn("Cannot connect source postgreSQL database.");
            throw new JobExecutionException("Cannot connect source postgreSQL database.");
        }
        if (isProfileMode()) {
            queryTime = 0;
        }
        long t1 = System.currentTimeMillis();
        String targetSchemaName, targetThemeTable;
        try {
            logger.info("-- step:incrementConvertOracleDB --");
            targetSchemaName = determineCurrentTargetSchemaName();
            if (targetSchemaName == null) return;
            OracleIncrementPostGISJobContext jobContext = null;
            jobContext = (OracleIncrementPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
                isProfileMode(), isTransformed());
            jobContext.setSourceDataStore(getSourceDataStore());
            jobContext.setElementLogging(checkElementLogging());
            jobContext.setExecutionContext(context);
            long tStep = System.currentTimeMillis();
            fetchTPData(jobContext);
            logger.info("TPC DIST:" + jobContext.getDistId() + ":" +
                ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName()));
            if (isProfileMode()) {
                long tStepEnd = System.currentTimeMillis();
                logTimeDiff("Profile-Copy Connectivity", tStep, tStepEnd);
            }
            if (isProfileMode()) {
                jobContext.resetProcessTime();
                jobContext.resetUpdateTime();
            }
            tStep = System.currentTimeMillis();
            exetcuteIncrementConvert(jobContext, _dataPath);
            //close all open filewriter instance
            jobContext.closeFeatureWriter();
            if (isProfileMode()) {
                logger.warn("Profile-Current Query Oracle Cost-" +
                    ((int) ((getQueryTime()) / 60000.0)) + " min - " +
                    (((int) ((getQueryTime()) % 60000.0)) / 1000) + " sec");
                long tStepEnd = System.currentTimeMillis();
                logger.warn("Profile-Current Process Cost-" +
                    ((int) ((getProcessTime()) / 60000.0)) + " min - " +
                    (((int) ((getProcessTime()) % 60000.0)) / 1000) + " sec");
                logger.warn("Profile-Current Update Cost-" +
                    ((int) ((getUpdateTime()) / 60000.0)) + " min - " +
                    (((int) ((getUpdateTime()) % 60000.0)) / 1000) + " sec");
                logger.warn("Profile-Current JobContext Process Cost-" +
                    ((int) ((jobContext.getProcessTime()) / 60000.0)) + " min - " +
                    (((int) ((jobContext.getProcessTime()) % 60000.0)) / 1000) + " sec");
                logger.warn("Profile-Current JobContext Update Cost-" +
                    ((int) ((jobContext.getUpdateTime()) / 60000.0)) + " min - " +
                    (((int) ((jobContext.getUpdateTime()) % 60000.0)) / 1000) + " sec");
                logTimeDiff("Profile-Convert[ Increment ]", tStep, tStepEnd);
                resetQueryTime();
                resetProcessTime();
                resetUpdateTime();
            }
            jobContext.closeOracleConnection();
            long t2 = System.currentTimeMillis();
            // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
            // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW);
            logTimeDiff("Total ", t1, t2);
        } catch (SQLException e) {
            disconnect();
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException("Database error. " + e.getMessage(), e);
        } catch (IOException ex) {
            disconnect();
            logger.warn(ex.getMessage(), ex);
            throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
        } finally {
            disconnect();
        }
        logger.warn(jobName + " end at " + new Date());
    }
    private String determineCurrentTargetSchemaName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetSchema = null;
        boolean needCreate = false;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XGVERSIONTABLE_NAME
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            rs.close();
            if (needCreate) return null;
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vsschema, vsstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vsid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vsschema");
                values[1] = rs.getShort("vsstatus");
                tmpSchemas.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }
            if (current != -1) {
                Object[] values = tmpSchemas.get(current);
                targetSchema = (String) values[0];
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetSchema;
    }
    public String encodeSchemaTableName(String schemaName, String tableName) {
        if (schemaName == null)
            return "\"" + tableName + "\"";
        return "\"" + schemaName + "\".\"" + tableName + "\"";
    }
    /**
     * CREATE TABLE CMMS_POSTDB.GEO_EXCHANGE
     * (
     *   ID           NUMBER                           NOT NULL,
     *   TAG_LUFID    NUMBER(10)                       NOT NULL,
     *   TAG_SFSC     NUMBER(5)                        NOT NULL,
     *   TAG_BCOMPID  NUMBER(3)                        NOT NULL,
     *   TAG_SOCCID   NUMBER(5)                        NOT NULL,
     *   STATUS       NUMBER(3)                        NOT NULL,
     *   IGDSELM      BLOB,
     *   UPDATETIME   DATE                             DEFAULT sysdate  NOT NULL,
     *   TASKID       NUMBER(10)                       NOT NULL,
     *   ISEXCHANGE   NUMBER                           DEFAULT 0  NOT NULL
     * )
     *
     * STATUS 欄位 :0:新增  2:編輯  3:刪除設備   4:刪除元件
     * ISEXCHANGE   欄位:0 未同步 1已同步  或者已同步就刪除
     *
     *
     * @param jobContext
     * @param targetSchemaName
     * @throws SQLException
     */
    private void exetcuteIncrementConvert(OracleIncrementPostGISJobContext jobContext, String targetSchemaName) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        if (connection == null) {
            logger.warn("Cannot Get Oracle Connection for DMMS.");
            return;
        }
        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0
        int exchangeCount = fetchExchangeCount(connection);
        try {
            processIncrementElement(jobContext);
            // jobContext.setCurrentSchema(querySchema);
        } finally {
        }
    }
    private int fetchExchangeCount(Connection connection) throws SQLException {
        // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = null;
        StringBuilder sbSQL = new StringBuilder();
        sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0");
        int size = -1;
        try {
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            if (rs.next()) {
                size = (int) rs.getLong(1);
            }
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
        return size;
    }
    static class IncrementRecord {
        Element element;
    };
    private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM
        //  FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0
        String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " +
            "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0";
        //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
        PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt);
        String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"});
        Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmtSrc.setFetchSize(FETCHSIZE);
        ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt);
        int igdsMetaType = rsSrc.getMetaData().getColumnType(1);
        while (rsSrc.next()) {
            if (isProfileMode()) {
                markQueryTime();
            }
            ElementTransactionContext xContext = new ElementTransactionContext();
            xContext.oid = rsSrc.getInt(1);
            xContext.cid = (short) rsSrc.getInt(2);
            xContext.compid = (short) rsSrc.getInt(3);
            xContext.occid = (short) rsSrc.getInt(4);
            xContext.transcationType = rsSrc.getInt(5);
            xContext.taskid = rsSrc.getInt(6);
            try {
                if (xContext.transcationType > 2) {
                    byte[] raw = null;
                    if (igdsMetaType == Types.BLOB) {
                        BLOB blob = (BLOB) rsSrc.getBlob(7);
                        try {
                            raw = getBytesFromBLOB(blob);
                        } catch (BufferOverflowException e) {
                            logger.warn("Wrong Element Structure-", e);
                        } finally {
                            // blob.close();
                        }
                    } else {
                        raw = rsSrc.getBytes(7);
                    }
                    if (raw != null) {
                        Element element = fetchBinaryElement(raw);
                        if (isProfileMode()) {
                            accumulateQueryTime();
                        }
                        xContext.element = element;
                    } else {
                        if (isProfileMode()) {
                            accumulateQueryTime();
                        }
                    }
                }
                jobContext.putFeatureCollection(xContext);
            } catch (Dgn7fileException e) {
                logger.warn("Dgn7Exception", e);
            }
        }
        JDBCUtils.close(rsSrc);
        JDBCUtils.close(stmtSrc);
    }
    // Binary to Element
    private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException {
        ByteBuffer buffer = ByteBuffer.wrap(raws);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        short signature = buffer.getShort();
        // byte type = (byte) (buffer.get() & 0x7f);
        byte type = (byte) ((signature >>> 8) & 0x007f);
        // silly Bentley say contentLength is in 2-byte words
        // and ByteByffer uses raws.
        // track the record location
        int elementLength = (buffer.getShort() * 2) + 4;
        ElementType recordType = ElementType.forID(type);
        IElementHandler handler;
        handler = recordType.getElementHandler();
        Element dgnElement = (Element) handler.read(buffer, signature, elementLength);
        if (recordType.isComplexElement() && (elementLength < raws.length)) {
            int offset = elementLength;
            while (offset < (raws.length - 4)) {
                buffer.position(offset);
                signature = buffer.getShort();
                type = (byte) ((signature >>> 8) & 0x007f);
                elementLength = (buffer.getShort() * 2) + 4;
                if (raws.length < (offset + elementLength)) {
                    logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit());
                    break;
                }
                recordType = ElementType.forID(type);
                handler = recordType.getElementHandler();
                if (handler != null) {
                    Element subElement = (Element) handler.read(buffer, signature, elementLength);
                    ((ComplexElement) dgnElement).add(subElement);
                    offset += elementLength;
                } else {
                    byte[] remain = new byte[buffer.remaining()];
                    System.arraycopy(raws, offset, remain, 0, buffer.remaining());
                    for (int i = 0; i < remain.length; i++) {
                        if (remain[i] != 0) {
                            logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]);
                        }
                    }
                    break;
                }
            }
        }
        return dgnElement;
    }
}