forked from geodmms/xdgnjobs

Dennis Kao
2013-12-23 5b261e3bb6b78f60993243a653caf0e35fc5fb34
update geotools version
3 files modified
452 lines changed
xdgnjobs/pom.xml 2 ●●● patch | view | raw | blame | history
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java 66 ●●●●● patch | view | raw | blame | history
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java 384 ●●●●● patch | view | raw | blame | history
xdgnjobs/pom.xml
@@ -19,7 +19,7 @@
    <src.output>${basedir}/target</src.output>
    <java5>1.6</java5>
    <xdgnio.version>2.1.1</xdgnio.version>
-    <gt.version>10.2.x</gt.version>
+    <gt.version>10.3.x</gt.version>
    <failIfNoTests>false</failIfNoTests>
    <stress.skip.pattern></stress.skip.pattern>
    <online.skip.pattern></online.skip.pattern>
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java
@@ -1,7 +1,65 @@
package com.ximple.eofms.jobs;
/**
 * Created by ulysseskao on 2013/12/23.
 */
-public class OracleIncrementDgn2PostGISJob {
import java.util.Map;
import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
import com.ximple.eofms.jobs.context.postgis.OracleIncrementPostGISJobContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.geotools.data.DataStore;
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
import org.geotools.jdbc.JDBCDataStore;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
 * Quartz job intended to incrementally convert DGN element data from Oracle
 * into a PostGIS datastore. Target-connection settings are looked up by the
 * PG* job-parameter keys declared below.
 * <p>
 * NOTE(review): {@link #execute} is still an empty stub — the incremental
 * conversion logic has not been implemented yet in this revision.
 */
public class OracleIncrementDgn2PostGISJob extends AbstractOracleDatabaseJob {
    final static Log logger = LogFactory.getLog(OracleIncrementDgn2PostGISJob.class);

    // Job-parameter keys for the target PostGIS connection.
    private static final String PGHOST = "PGHOST";
    // NOTE(review): "PGDATBASE" looks like a typo for "PGDATABASE", but the key
    // string may deliberately match sibling jobs in this project — confirm
    // against the other *Dgn2PostGISJob classes before renaming.
    private static final String PGDATBASE = "PGDATBASE";
    private static final String PGPORT = "PGPORT";
    private static final String PGSCHEMA = "PGSCHEMA";
    private static final String PGUSER = "PGUSER";
    private static final String PGPASS = "PGPASS";
    private static final String USEWKB = "USEWKB";

    // Tuning constants for JDBC fetch and commit batching.
    private static final int FETCHSIZE = 30;
    private static final int COMMITSIZE = 100;

    protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();

    // Target PostGIS connection settings, populated from job parameters
    // (presumably by the superclass / an extractJobConfiguration step — not
    // visible in this revision).
    protected String _pgHost;
    protected String _pgDatabase;
    protected String _pgPort;
    protected String _pgSchema;
    protected String _pgUsername;
    protected String _pgPassword;
    protected String _pgUseWKB;
    protected Map<String, String> pgProperties;
    protected JDBCDataStore targetDataStore;

    // Profiling accumulators (not yet read anywhere in this revision).
    private long queryTime = 0;
    private long queryTimeStart = 0;

    @Override
    public Log getLogger() {
        return logger;
    }

    /** @return the target PostGIS datastore (null until initialized). */
    public DataStore getTargetDataStore() {
        return targetDataStore;
    }

    /** Builds the per-run job context wired to the target datastore. */
    @Override
    protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) {
        return new OracleIncrementPostGISJobContext(getDataPath(),
            getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
    }

    // NOTE(review): empty stub — incremental job body not implemented yet.
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
    }
}
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java
@@ -1,7 +1,387 @@
package com.ximple.eofms.jobs.context.postgis;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.util.Assert;
import com.ximple.eofms.filter.AbstractFLinkageDispatchableFilter;
import com.ximple.eofms.filter.CreateFeatureTypeEventListener;
import com.ximple.eofms.filter.ElementDispatcher;
import com.ximple.eofms.filter.FeatureTypeEvent;
import com.ximple.eofms.jobs.OracleElementLogger;
import com.ximple.eofms.util.ElementDigesterUtils;
import com.ximple.io.dgn7.ComplexElement;
import com.ximple.io.dgn7.Element;
import com.ximple.io.dgn7.FrammeAttributeData;
import org.apache.commons.digester3.Digester;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.transaction.util.CommonsLoggingLogger;
import org.apache.commons.transaction.util.LoggerFacade;
import org.geotools.data.DataStore;
import org.geotools.data.Transaction;
import org.geotools.data.jdbc.JDBCUtils;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.postgresql.util.PSQLException;
import org.quartz.JobExecutionContext;
import org.xml.sax.SAXException;
public class OracleIncrementPostGISJobContext extends AbstractOracleToPostGISJobContext
    implements CreateFeatureTypeEventListener {
    static Log logger = LogFactory.getLog(OracleConvertPostGISJobContext.class);
    static final LoggerFacade sLogger = new CommonsLoggingLogger(logger);
    // static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
    private OracleElementLogger elmLogger = null;
    static {
        try {
            DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
        } catch (SQLException e) {
            Assert.shouldNeverReachHere(e.getMessage());
        }
    }
    private String _filterConfig;
    private ElementDispatcher elementDispatcher;
    private HashMap<SimpleFeatureType, ArrayList<SimpleFeature>> txFeaturesContext = new HashMap<SimpleFeatureType, ArrayList<SimpleFeature>>();
    private JobExecutionContext executionContext;
    private String currentSchema = null;
    private boolean schemaChanged = false;
    private boolean dropTableMode = true;
    private int accumulate = 0;
    public OracleIncrementPostGISJobContext(String dataPath, DataStore pgDS, String targetSchema, String filterConfig,
                                          boolean profileMode, boolean useTransform) {
        super(dataPath, pgDS, targetSchema, profileMode, useTransform);
        _filterConfig = filterConfig;
        elementDispatcher = createElementDispatcher();
        elementDispatcher.addCreateFeatureTypeEventListener(this);
        // txFeaturesContext = new PessimisticMapWrapper(featuresContext, sLogger);
    }
    private ElementDispatcher createElementDispatcher() {
        try {
            URL filterURL = null;
            if (_filterConfig != null) {
                File config = new File(_filterConfig);
                if (config.exists()) {
                    filterURL = config.toURI().toURL();
                }
            }
            if (filterURL == null) {
                // config = new File("conf/DefaultConvertShpFilter.xml");
                filterURL = this.getClass().getResource("/conf/DefaultConvertShpFilter.xml");
                // filterURL = this.getClass().getResource("/conf/ConvertShpFilterForLevel.xml");
            }
            assert filterURL != null;
            Digester digester = ElementDigesterUtils.getElementDigester();
            return (ElementDispatcher) digester.parse(filterURL);
        } catch (UnsupportedEncodingException e) {
            logger.info(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        } catch (MalformedURLException e) {
            logger.info(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        } catch (IOException e) {
            logger.info(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        } catch (SAXException e) {
            logger.info(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }
    public void putFeatureCollection(Element element) {
        assert elementDispatcher != null;
        // 判斷是否符和條件
        SimpleFeature feature = elementDispatcher.execute(element, getDistId(), isTransformed());
        if (feature == null) {
            boolean isEmptySize = false;
            FrammeAttributeData linkage =
                AbstractFLinkageDispatchableFilter.getFeatureLinkage(element);
            logger.warn("Unknown Element:" + element.getElementType().toString() +
                ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" +
                (linkage == null ? "NULL" : "FSC=" + (linkage.getFsc() + "|COMPID=" + linkage.getComponentID())));
            if (element instanceof ComplexElement) {
                ComplexElement complex = (ComplexElement) element;
                logger.warn("----Complex Element size=" + complex.size() + ":" +
                    (linkage == null ? "NULL" : (linkage.getUfid())));
                if (complex.size() == 0)
                    isEmptySize = true;
            }
            if (getElementLogging() && (!isEmptySize)) {
                getElementLogger().logElement(element, getCurrentSchema());
            }
            return;
        }
        if (((Geometry)feature.getDefaultGeometry()).isEmpty()) {
            boolean isEmptySize = false;
            FrammeAttributeData linkage =
                AbstractFLinkageDispatchableFilter.getFeatureLinkage(element);
            logger.warn("Empty Geom Element:" + element.getElementType().toString() +
                ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" +
                (linkage == null ? "NULL" : (linkage.getFsc() + "|" + linkage.getComponentID())));
            if (element instanceof ComplexElement) {
                ComplexElement complex = (ComplexElement) element;
                logger.warn("----Complex Element size=" + complex.size() + ":" +
                    (linkage == null ? "NULL" : (linkage.getUfid())));
                if (complex.size() == 0)
                    isEmptySize = true;
            }
            if (getElementLogging() && (!isEmptySize)) {
                getElementLogger().logElement(element, getCurrentSchema());
            }
            return;
        }
        if (!txFeaturesContext.containsKey(feature.getFeatureType())) {
            txFeaturesContext.put(feature.getFeatureType(), new ArrayList<SimpleFeature>());
        }
        ArrayList<SimpleFeature> arrayList = txFeaturesContext.get(feature.getFeatureType());
        arrayList.add(feature);
        accumulate++;
        if (accumulate > BATCHSIZE) {
            commitTransaction();
        }
    }
    /** No-op: writes are buffered per feature type and flushed by
     *  commitTransaction(); no explicit transaction begin is needed here. */
    public void startTransaction() {
    }
    public void commitTransaction() {
        if (!txFeaturesContext.isEmpty()) {
            logger.debug("Transaction size = " + txFeaturesContext.size());
            //txFeaturesContext.commitTransaction();
        } else {
            logger.debug("Transaction is empty.");
        }
        if (!txFeaturesContext.isEmpty()) {
            updateDataStore();
        }
        if (this.getElementLogger() != null)
            this.getElementLogger().flashLogging();
    }
    /** No-op: buffered features are not written until commitTransaction(), so
     *  there is nothing to undo here. Use resetFeatureContext() to discard the
     *  buffer instead. */
    public void rollbackTransaction() {
    }
    /** Discards all buffered, uncommitted features.
     *  NOTE(review): the accumulate counter is NOT reset here (only
     *  updateDataStore resets it on success) — confirm whether it should be. */
    public void resetFeatureContext() {
        txFeaturesContext.clear();
    }
    private void updateDataStore() {
        if (isProfileMode()) markUpdateTime();
        Iterator<SimpleFeatureType> it = txFeaturesContext.keySet().iterator();
        Connection conn = null;
        try {
            conn = getConnection();
            boolean autoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);
            while (it.hasNext()) {
                SimpleFeatureType featureType = it.next();
                logger.debug("Begin Save into PostGIS:" + featureType.getTypeName());
                int batchCount = 0;
                String bindingStmt = makePrepareInsertSql(featureType);
                ArrayList<SimpleFeature> features = txFeaturesContext.get(featureType);
                PreparedStatement pstmt = conn.prepareStatement(bindingStmt);
                for (SimpleFeature feature : features) {
                    try {
                        // stmt.execute(feature);
                        bindFeatureParameters(pstmt, feature);
                        // pstmt.executeUpdate();
                        pstmt.addBatch();
                    } catch (PSQLException e) {
                        if (bindingStmt != null) {
                            logger.error("Execute:" + bindingStmt);
                        }
                        logger.error(e.getServerErrorMessage());
                        logger.error(e.getMessage(), e);
                    } catch (NullPointerException e) {
                        if (bindingStmt != null) {
                            logger.error("Execute:" + bindingStmt);
                        }
                        logger.error(feature.toString());
                        logger.error(e.getMessage(), e);
                    } catch (ClassCastException e) {
                        if (bindingStmt != null) {
                            logger.error("Execute:" + bindingStmt);
                        }
                        for (int i = 0; i < feature.getAttributeCount(); i++) {
                            logger.info("attr[" + i + "]-" + ((feature.getAttribute(i) == null) ? " NULL" :
                                feature.getAttribute(i).toString()));
                        }
                        logger.error(e.getMessage(), e);
                    }
                    batchCount++;
                }
                int[] numUpdates = pstmt.executeBatch();
                for (int i = 0; i < numUpdates.length; i++) {
                    if (numUpdates[i] == -2)
                        logger.warn("Execution " + i + ": unknown number of rows updated");
                }
                conn.commit();
                pstmt.close();
                features.clear();
                logger.debug("End Save into PostGIS:" + featureType.getTypeName());
            }
            conn.setAutoCommit(autoCommit);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
            accumulate = 0;
        } catch (BatchUpdateException e) {
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logger.error(e.getMessage(), e);
            SQLException ex;
            while ((ex = e.getNextException()) != null) {
                // logger.warn(ex.getMessage(), ex);
                logger.warn(ex.getMessage());
            }
        } catch (SQLException e) {
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logger.error(e.getMessage(), e);
        } finally {
            if (isProfileMode()) accumulateUpdateTime();
        }
    }
    /** @return the Quartz execution context for this run, or null if unset */
    public JobExecutionContext getExecutionContext() {
        return executionContext;
    }

    /** Associates the Quartz execution context with this job context. */
    public void setExecutionContext(JobExecutionContext context) {
        executionContext = context;
    }
/**
 * Created by ulysseskao on 2013/12/23.
     * �����]�Ƽg�J��
     *
     * @throws IOException IO�o�Ϳ�~
 */
public class OracleIncrementPostGISJobContext {
    public void closeFeatureWriter() throws IOException {
    }
    /**
     * Lazily creates the OracleElementLogger bound to the Oracle connection
     * and this context's data path.
     * NOTE(review): lazy init is not thread-safe — two threads could race the
     * null check; confirm single-threaded use by the job runner.
     */
    protected OracleElementLogger getElementLogger() {
        if (elmLogger == null) {
            elmLogger = new OracleElementLogger(getOracleConnection());
            elmLogger.setDataPath(this.getDataPath());
        }
        return elmLogger;
    }
    /** @return the schema currently being processed (null until set) */
    public String getCurrentSchema() {
        return currentSchema;
    }

    /** Sets the working schema and flags that the schema has changed. */
    public void setCurrentSchema(String querySchema) {
        this.currentSchema = querySchema;
        this.schemaChanged = true;
    }
    /** @return this context's class logger */
    protected Log getLogger() {
        return logger;
    }

    /** @return true when existing tables are dropped and re-created rather
     *          than merely emptied (see createOrClearFeatureDataTable) */
    public boolean isDropTableMode() {
        return dropTableMode;
    }

    /** Chooses between drop-and-recreate (true) and delete-rows (false)
     *  handling of existing feature tables. */
    public void setDropTableMode(boolean dropTableMode) {
        this.dropTableMode = dropTableMode;
    }
    /** Listener hook: when the dispatcher announces a new feature type,
     *  (re)create or clear its backing table. Schema failures are logged
     *  and swallowed. */
    public void createFeatureTypeOccurred(FeatureTypeEvent evt) {
        try {
            createOrClearFeatureDataTable(evt.getFeatureType());
        } catch (SchemaException e) {
            logger.warn(e.getMessage(), e);
        }
    }
    protected void createOrClearFeatureDataTable(SimpleFeatureType featureType) throws SchemaException {
        String featureName = featureType.getTypeName();
        Connection conn = null;
        if (isExistFeature(featureType)) {
            try {
                conn = getConnection();
                if (dropTableMode) {
                    dropGeometryColumn(conn, getTargetSchema(), featureName,
                                       (featureType).getGeometryDescriptor().getName().getLocalPart());
                    dropTable(conn, getTargetSchema(), featureName);
                    ArrayList<String> schemaTexts = createNewSchemaTexts(conn, featureType);
                    for (String stmtText : schemaTexts) {
                        Statement stmt = conn.createStatement();
                        stmt.execute(stmtText);
                        JDBCUtils.close(stmt);
                    }
                } else {
                    deleteTable(conn, getTargetSchema(), featureName);
                }
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            } catch (SQLException e) {
                logger.warn(e.getMessage(), e);
            } finally {
                JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
            }
        } else {
            String tempStmt = null;
            try {
                conn = getConnection();
                ArrayList<String> schemaTexts = createNewSchemaTexts(conn, featureType);
                for (String stmtText : schemaTexts) {
                    Statement stmt = conn.createStatement();
                    tempStmt = stmtText;
                    stmt.execute(stmtText);
                    stmt.close();
                }
                JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
            } catch (IOException e) {
                JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
                logger.warn("RUN--" + tempStmt);
                logger.warn(e.getMessage(), e);
            } catch (SQLException e) {
                JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
                logger.warn("RUN--" + tempStmt);
                logger.warn(e.getMessage(), e);
            }
        }
    }
    /** @return true once setCurrentSchema() has been called at least once */
    public boolean isSchemaChanged() {
        return schemaChanged;
    }
}