| | |
| | | package com.ximple.eofms.jobs.context.postgis; |
| | | |
| | | /** |
| | | * Created by ulysseskao on 2013/12/23. |
| | | */ |
| | | public class OracleIncrementPostGISJobContext { |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.net.MalformedURLException; |
| | | import java.net.URL; |
| | | import java.sql.BatchUpdateException; |
| | | import java.sql.Connection; |
| | | import java.sql.DriverManager; |
| | | import java.sql.PreparedStatement; |
| | | import java.sql.SQLException; |
| | | import java.sql.Statement; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | |
| | | import com.vividsolutions.jts.geom.Geometry; |
| | | import com.vividsolutions.jts.util.Assert; |
| | | import com.ximple.eofms.filter.AbstractFLinkageDispatchableFilter; |
| | | import com.ximple.eofms.filter.CreateFeatureTypeEventListener; |
| | | import com.ximple.eofms.filter.ElementDispatcher; |
| | | import com.ximple.eofms.filter.FeatureTypeEvent; |
| | | import com.ximple.eofms.jobs.OracleElementLogger; |
| | | import com.ximple.eofms.util.ElementDigesterUtils; |
| | | import com.ximple.io.dgn7.ComplexElement; |
| | | import com.ximple.io.dgn7.Element; |
| | | import com.ximple.io.dgn7.FrammeAttributeData; |
| | | import org.apache.commons.digester3.Digester; |
| | | import org.apache.commons.logging.Log; |
| | | import org.apache.commons.logging.LogFactory; |
| | | import org.apache.commons.transaction.util.CommonsLoggingLogger; |
| | | import org.apache.commons.transaction.util.LoggerFacade; |
| | | import org.geotools.data.DataStore; |
| | | import org.geotools.data.Transaction; |
| | | import org.geotools.data.jdbc.JDBCUtils; |
| | | import org.geotools.feature.SchemaException; |
| | | import org.opengis.feature.simple.SimpleFeature; |
| | | import org.opengis.feature.simple.SimpleFeatureType; |
| | | import org.postgresql.util.PSQLException; |
| | | import org.quartz.JobExecutionContext; |
| | | import org.xml.sax.SAXException; |
| | | |
| | | public class OracleIncrementPostGISJobContext extends AbstractOracleToPostGISJobContext |
| | | implements CreateFeatureTypeEventListener { |
| | | |
| | | static Log logger = LogFactory.getLog(OracleConvertPostGISJobContext.class); |
| | | static final LoggerFacade sLogger = new CommonsLoggingLogger(logger); |
| | | |
| | | // static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); |
| | | |
| | | private OracleElementLogger elmLogger = null; |
| | | |
| | | static { |
| | | try { |
| | | DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); |
| | | } catch (SQLException e) { |
| | | Assert.shouldNeverReachHere(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | private String _filterConfig; |
| | | |
| | | private ElementDispatcher elementDispatcher; |
| | | |
| | | private HashMap<SimpleFeatureType, ArrayList<SimpleFeature>> txFeaturesContext = new HashMap<SimpleFeatureType, ArrayList<SimpleFeature>>(); |
| | | |
| | | private JobExecutionContext executionContext; |
| | | |
| | | private String currentSchema = null; |
| | | private boolean schemaChanged = false; |
| | | private boolean dropTableMode = true; |
| | | private int accumulate = 0; |
| | | |
| | | public OracleIncrementPostGISJobContext(String dataPath, DataStore pgDS, String targetSchema, String filterConfig, |
| | | boolean profileMode, boolean useTransform) { |
| | | super(dataPath, pgDS, targetSchema, profileMode, useTransform); |
| | | _filterConfig = filterConfig; |
| | | elementDispatcher = createElementDispatcher(); |
| | | elementDispatcher.addCreateFeatureTypeEventListener(this); |
| | | // txFeaturesContext = new PessimisticMapWrapper(featuresContext, sLogger); |
| | | } |
| | | |
| | | private ElementDispatcher createElementDispatcher() { |
| | | try { |
| | | URL filterURL = null; |
| | | if (_filterConfig != null) { |
| | | File config = new File(_filterConfig); |
| | | if (config.exists()) { |
| | | filterURL = config.toURI().toURL(); |
| | | } |
| | | } |
| | | if (filterURL == null) { |
| | | // config = new File("conf/DefaultConvertShpFilter.xml"); |
| | | filterURL = this.getClass().getResource("/conf/DefaultConvertShpFilter.xml"); |
| | | // filterURL = this.getClass().getResource("/conf/ConvertShpFilterForLevel.xml"); |
| | | } |
| | | assert filterURL != null; |
| | | Digester digester = ElementDigesterUtils.getElementDigester(); |
| | | return (ElementDispatcher) digester.parse(filterURL); |
| | | } catch (UnsupportedEncodingException e) { |
| | | logger.info(e.getMessage(), e); |
| | | throw new RuntimeException(e.getMessage(), e); |
| | | } catch (MalformedURLException e) { |
| | | logger.info(e.getMessage(), e); |
| | | throw new RuntimeException(e.getMessage(), e); |
| | | } catch (IOException e) { |
| | | logger.info(e.getMessage(), e); |
| | | throw new RuntimeException(e.getMessage(), e); |
| | | } catch (SAXException e) { |
| | | logger.info(e.getMessage(), e); |
| | | throw new RuntimeException(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | public void putFeatureCollection(Element element) { |
| | | assert elementDispatcher != null; |
| | | // 判斷是否符和條件 |
| | | SimpleFeature feature = elementDispatcher.execute(element, getDistId(), isTransformed()); |
| | | if (feature == null) { |
| | | boolean isEmptySize = false; |
| | | FrammeAttributeData linkage = |
| | | AbstractFLinkageDispatchableFilter.getFeatureLinkage(element); |
| | | logger.warn("Unknown Element:" + element.getElementType().toString() + |
| | | ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" + |
| | | (linkage == null ? "NULL" : "FSC=" + (linkage.getFsc() + "|COMPID=" + linkage.getComponentID()))); |
| | | |
| | | if (element instanceof ComplexElement) { |
| | | ComplexElement complex = (ComplexElement) element; |
| | | logger.warn("----Complex Element size=" + complex.size() + ":" + |
| | | (linkage == null ? "NULL" : (linkage.getUfid()))); |
| | | if (complex.size() == 0) |
| | | isEmptySize = true; |
| | | } |
| | | |
| | | if (getElementLogging() && (!isEmptySize)) { |
| | | getElementLogger().logElement(element, getCurrentSchema()); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | if (((Geometry)feature.getDefaultGeometry()).isEmpty()) { |
| | | boolean isEmptySize = false; |
| | | FrammeAttributeData linkage = |
| | | AbstractFLinkageDispatchableFilter.getFeatureLinkage(element); |
| | | logger.warn("Empty Geom Element:" + element.getElementType().toString() + |
| | | ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" + |
| | | (linkage == null ? "NULL" : (linkage.getFsc() + "|" + linkage.getComponentID()))); |
| | | |
| | | if (element instanceof ComplexElement) { |
| | | ComplexElement complex = (ComplexElement) element; |
| | | logger.warn("----Complex Element size=" + complex.size() + ":" + |
| | | (linkage == null ? "NULL" : (linkage.getUfid()))); |
| | | if (complex.size() == 0) |
| | | isEmptySize = true; |
| | | } |
| | | |
| | | if (getElementLogging() && (!isEmptySize)) { |
| | | getElementLogger().logElement(element, getCurrentSchema()); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | if (!txFeaturesContext.containsKey(feature.getFeatureType())) { |
| | | txFeaturesContext.put(feature.getFeatureType(), new ArrayList<SimpleFeature>()); |
| | | } |
| | | ArrayList<SimpleFeature> arrayList = txFeaturesContext.get(feature.getFeatureType()); |
| | | arrayList.add(feature); |
| | | accumulate++; |
| | | if (accumulate > BATCHSIZE) { |
| | | commitTransaction(); |
| | | } |
| | | } |
| | | |
| | | public void startTransaction() { |
| | | } |
| | | |
| | | public void commitTransaction() { |
| | | if (!txFeaturesContext.isEmpty()) { |
| | | logger.debug("Transaction size = " + txFeaturesContext.size()); |
| | | //txFeaturesContext.commitTransaction(); |
| | | } else { |
| | | logger.debug("Transaction is empty."); |
| | | } |
| | | |
| | | if (!txFeaturesContext.isEmpty()) { |
| | | updateDataStore(); |
| | | } |
| | | |
| | | if (this.getElementLogger() != null) |
| | | this.getElementLogger().flashLogging(); |
| | | } |
| | | |
| | | public void rollbackTransaction() { |
| | | } |
| | | |
| | | public void resetFeatureContext() { |
| | | txFeaturesContext.clear(); |
| | | } |
| | | |
| | | private void updateDataStore() { |
| | | if (isProfileMode()) markUpdateTime(); |
| | | Iterator<SimpleFeatureType> it = txFeaturesContext.keySet().iterator(); |
| | | Connection conn = null; |
| | | try { |
| | | conn = getConnection(); |
| | | boolean autoCommit = conn.getAutoCommit(); |
| | | conn.setAutoCommit(false); |
| | | |
| | | while (it.hasNext()) { |
| | | SimpleFeatureType featureType = it.next(); |
| | | logger.debug("Begin Save into PostGIS:" + featureType.getTypeName()); |
| | | |
| | | int batchCount = 0; |
| | | String bindingStmt = makePrepareInsertSql(featureType); |
| | | ArrayList<SimpleFeature> features = txFeaturesContext.get(featureType); |
| | | PreparedStatement pstmt = conn.prepareStatement(bindingStmt); |
| | | |
| | | for (SimpleFeature feature : features) { |
| | | try { |
| | | // stmt.execute(feature); |
| | | bindFeatureParameters(pstmt, feature); |
| | | // pstmt.executeUpdate(); |
| | | pstmt.addBatch(); |
| | | } catch (PSQLException e) { |
| | | if (bindingStmt != null) { |
| | | logger.error("Execute:" + bindingStmt); |
| | | } |
| | | logger.error(e.getServerErrorMessage()); |
| | | logger.error(e.getMessage(), e); |
| | | } catch (NullPointerException e) { |
| | | if (bindingStmt != null) { |
| | | logger.error("Execute:" + bindingStmt); |
| | | } |
| | | logger.error(feature.toString()); |
| | | logger.error(e.getMessage(), e); |
| | | } catch (ClassCastException e) { |
| | | if (bindingStmt != null) { |
| | | logger.error("Execute:" + bindingStmt); |
| | | } |
| | | for (int i = 0; i < feature.getAttributeCount(); i++) { |
| | | logger.info("attr[" + i + "]-" + ((feature.getAttribute(i) == null) ? " NULL" : |
| | | feature.getAttribute(i).toString())); |
| | | } |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | batchCount++; |
| | | } |
| | | |
| | | int[] numUpdates = pstmt.executeBatch(); |
| | | for (int i = 0; i < numUpdates.length; i++) { |
| | | if (numUpdates[i] == -2) |
| | | logger.warn("Execution " + i + ": unknown number of rows updated"); |
| | | } |
| | | conn.commit(); |
| | | |
| | | pstmt.close(); |
| | | features.clear(); |
| | | logger.debug("End Save into PostGIS:" + featureType.getTypeName()); |
| | | } |
| | | conn.setAutoCommit(autoCommit); |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); |
| | | accumulate = 0; |
| | | } catch (BatchUpdateException e) { |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e); |
| | | logger.error(e.getMessage(), e); |
| | | SQLException ex; |
| | | while ((ex = e.getNextException()) != null) { |
| | | // logger.warn(ex.getMessage(), ex); |
| | | logger.warn(ex.getMessage()); |
| | | } |
| | | } catch (SQLException e) { |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e); |
| | | logger.error(e.getMessage(), e); |
| | | } finally { |
| | | if (isProfileMode()) accumulateUpdateTime(); |
| | | } |
| | | } |
| | | |
| | | public JobExecutionContext getExecutionContext() { |
| | | return executionContext; |
| | | } |
| | | |
| | | public void setExecutionContext(JobExecutionContext context) { |
| | | executionContext = context; |
| | | } |
| | | |
| | | /** |
| | | * �����]�Ƽg�J�� |
| | | * |
| | | * @throws IOException IO�o�Ϳ�~ |
| | | */ |
| | | public void closeFeatureWriter() throws IOException { |
| | | } |
| | | |
| | | protected OracleElementLogger getElementLogger() { |
| | | if (elmLogger == null) { |
| | | elmLogger = new OracleElementLogger(getOracleConnection()); |
| | | elmLogger.setDataPath(this.getDataPath()); |
| | | } |
| | | return elmLogger; |
| | | } |
| | | |
| | | public String getCurrentSchema() { |
| | | return currentSchema; |
| | | } |
| | | |
| | | public void setCurrentSchema(String querySchema) { |
| | | this.currentSchema = querySchema; |
| | | this.schemaChanged = true; |
| | | } |
| | | |
| | | protected Log getLogger() { |
| | | return logger; |
| | | } |
| | | |
| | | public boolean isDropTableMode() { |
| | | return dropTableMode; |
| | | } |
| | | |
| | | public void setDropTableMode(boolean dropTableMode) { |
| | | this.dropTableMode = dropTableMode; |
| | | } |
| | | |
| | | public void createFeatureTypeOccurred(FeatureTypeEvent evt) { |
| | | try { |
| | | createOrClearFeatureDataTable(evt.getFeatureType()); |
| | | } catch (SchemaException e) { |
| | | logger.warn(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | protected void createOrClearFeatureDataTable(SimpleFeatureType featureType) throws SchemaException { |
| | | String featureName = featureType.getTypeName(); |
| | | Connection conn = null; |
| | | if (isExistFeature(featureType)) { |
| | | try { |
| | | conn = getConnection(); |
| | | if (dropTableMode) { |
| | | dropGeometryColumn(conn, getTargetSchema(), featureName, |
| | | (featureType).getGeometryDescriptor().getName().getLocalPart()); |
| | | dropTable(conn, getTargetSchema(), featureName); |
| | | |
| | | ArrayList<String> schemaTexts = createNewSchemaTexts(conn, featureType); |
| | | for (String stmtText : schemaTexts) { |
| | | Statement stmt = conn.createStatement(); |
| | | stmt.execute(stmtText); |
| | | JDBCUtils.close(stmt); |
| | | } |
| | | } else { |
| | | deleteTable(conn, getTargetSchema(), featureName); |
| | | } |
| | | } catch (IOException e) { |
| | | logger.warn(e.getMessage(), e); |
| | | } catch (SQLException e) { |
| | | logger.warn(e.getMessage(), e); |
| | | } finally { |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); |
| | | } |
| | | } else { |
| | | String tempStmt = null; |
| | | try { |
| | | conn = getConnection(); |
| | | ArrayList<String> schemaTexts = createNewSchemaTexts(conn, featureType); |
| | | for (String stmtText : schemaTexts) { |
| | | Statement stmt = conn.createStatement(); |
| | | tempStmt = stmtText; |
| | | stmt.execute(stmtText); |
| | | stmt.close(); |
| | | } |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); |
| | | } catch (IOException e) { |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); |
| | | logger.warn("RUN--" + tempStmt); |
| | | logger.warn(e.getMessage(), e); |
| | | } catch (SQLException e) { |
| | | JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e); |
| | | logger.warn("RUN--" + tempStmt); |
| | | logger.warn(e.getMessage(), e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | public boolean isSchemaChanged() { |
| | | return schemaChanged; |
| | | } |
| | | } |