package com.ximple.eofms.jobs.context.postgis; import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URL; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Connection; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import org.apache.commons.digester.Digester; import org.apache.commons.digester.xmlrules.DigesterLoader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.transaction.memory.PessimisticMapWrapper; import org.apache.commons.transaction.util.CommonsLoggingLogger; import org.apache.commons.transaction.util.LoggerFacade; import org.geotools.data.DataStore; import org.geotools.data.FeatureWriter; import org.geotools.data.Transaction; import org.geotools.data.postgis.PostgisDataStoreFactory; import org.geotools.feature.Feature; import org.geotools.feature.FeatureType; import org.geotools.feature.IllegalAttributeException; import org.geotools.feature.SimpleFeature; import org.geotools.feature.FeatureTypeBuilder; import org.geotools.feature.SchemaException; import org.quartz.JobExecutionContext; import org.xml.sax.SAXException; import org.postgresql.util.PSQLException; import com.vividsolutions.jts.util.Assert; import com.ximple.eofms.filter.AbstractFLinkageDispatchableFilter; import com.ximple.eofms.filter.ElementDispatcher; import com.ximple.eofms.filter.CreateFeatureTypeEventListener; import com.ximple.eofms.filter.FeatureTypeEvent; import com.ximple.eofms.jobs.OracleElementLogger; import com.ximple.io.dgn7.ComplexElement; import com.ximple.io.dgn7.Element; import com.ximple.io.dgn7.FrammeAttributeData; public class OracleConvertPostGISJobContext extends AbstractOracleToPostGISJobContext implements CreateFeatureTypeEventListener { static Log logger = LogFactory.getLog(OracleConvertPostGISJobContext.class); static final LoggerFacade sLogger = new CommonsLoggingLogger(logger); static PostgisDataStoreFactory dataStoreFactory = new PostgisDataStoreFactory(); private OracleElementLogger elmLogger = null; static { try { DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); } catch (SQLException e) { Assert.shouldNeverReachHere(e.getMessage()); } } private String _filterConfig; private ElementDispatcher elementDispatcher; // private HashMap featuresContext = new HashMap(); // private HashMap featuresWriterContext = new HashMap(); // private PessimisticMapWrapper txFeaturesContext; private HashMap> txFeaturesContext = new HashMap>(); private JobExecutionContext executionContext; private String currentSchema = null; private String pgCurrentSchema = null; private boolean schemaChanged = false; private boolean dropTableMode = true; public OracleConvertPostGISJobContext(String dataPath, DataStore pgDS, String filterConfig) { super(dataPath, pgDS); _filterConfig = filterConfig; elementDispatcher = createElementDispatcher(); elementDispatcher.addCreateFeatureTypeEventListener(this); // txFeaturesContext = new PessimisticMapWrapper(featuresContext, sLogger); } private ElementDispatcher createElementDispatcher() { try { URL rulesURL = ElementDispatcher.class.getResource("ElementDispatcherRules.xml"); assert rulesURL != null; Digester digester = DigesterLoader.createDigester(rulesURL); URL filterURL = null; if (_filterConfig != null) { File config = new File(_filterConfig); if (config.exists()) { filterURL = config.toURI().toURL(); } } if (filterURL == null) { // config = new File("conf/DefaultConvertShpFilter.xml"); filterURL = this.getClass().getResource("/conf/DefaultConvertShpFilter.xml"); // filterURL = this.getClass().getResource("/conf/ConvertShpFilterForLevel.xml"); } assert filterURL != null; return (ElementDispatcher) digester.parse(filterURL); } catch (UnsupportedEncodingException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } catch (MalformedURLException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } catch (IOException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } catch (SAXException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } } public void putFeatureCollection(Element element) { assert elementDispatcher != null; // 判斷是否符和條件 Feature feature = elementDispatcher.execute(element); if (feature == null) { boolean isEmptySize = false; FrammeAttributeData linkage = AbstractFLinkageDispatchableFilter.getFeatureLinkage(element); logger.warn("Unknown Element:" + element.getElementType().toString() + ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" + (linkage == null ? "NULL" : (linkage.getFsc() + "|" + linkage.getComponentID()))); if (element instanceof ComplexElement) { ComplexElement complex = (ComplexElement) element; logger.warn("----Complex Element size=" + complex.size() + ":" + (linkage == null ? "NULL" : (linkage.getUfid()))); isEmptySize = true; } if (getElementLogging() && (!isEmptySize)) { getElementLogger().logElement(element, getCurrentSchema()); } return; } if (!txFeaturesContext.containsKey(feature.getFeatureType())) { txFeaturesContext.put(feature.getFeatureType(), new ArrayList()); } ArrayList arrayList = txFeaturesContext.get(feature.getFeatureType()); arrayList.add(makeInsertSql(feature, -1)); } public void startTransaction() { } public void commitTransaction() { if (!txFeaturesContext.isEmpty()) { logger.debug("Transaction size = " + txFeaturesContext.size()); //txFeaturesContext.commitTransaction(); } else { logger.debug("Transaction is empty."); } if (!txFeaturesContext.isEmpty()) { updateDataStore(); } if (this.getElementLogger() != null) this.getElementLogger().flashLogging(); } public void rollbackTransaction() { if (!txFeaturesContext.isEmpty()) { updateDataStore(); } } public void resetFeatureContext() { txFeaturesContext.clear(); } private void updateDataStore() { Iterator it = txFeaturesContext.keySet().iterator(); String currentStmt = null; try { while (it.hasNext()) { FeatureType featureType = it.next(); logger.debug("Begin Save into PostGIS:" + featureType.getTypeName()); ArrayList stmtTexts = txFeaturesContext.get(featureType); Connection conn = getConnection(); boolean autoCommit = conn.getAutoCommit(); conn.setAutoCommit(true); for (String stmtText : stmtTexts) { currentStmt = stmtText; Statement stmt = conn.createStatement(); try { stmt.execute(stmtText); } catch (PSQLException e) { if (currentStmt != null) { logger.error("Execute:" + currentStmt); } logger.error(e.getServerErrorMessage()); logger.error(e.getMessage(), e); } finally { stmt.close(); } } stmtTexts.clear(); conn.setAutoCommit(autoCommit); logger.debug("End Save into PostGIS:" + featureType.getTypeName()); } } catch (SQLException e) { logger.error(e.getMessage(), e); } } public JobExecutionContext getExecutionContext() { return executionContext; } public void setExecutionContext(JobExecutionContext context) { executionContext = context; } /** * 關閉設備寫入器 * * @throws IOException IO發生錯誤 */ public void closeFeatureWriter() throws IOException { } protected OracleElementLogger getElementLogger() { if (elmLogger == null) { elmLogger = new OracleElementLogger(getOracleConnection()); elmLogger.setDataPath(this.getDataPath()); } return elmLogger; } public String getCurrentSchema() { return currentSchema; } public void setCurrentSchema(String querySchema) { this.currentSchema = querySchema; this.schemaChanged = true; } protected Log getLogger() { return logger; } public boolean isDropTableMode() { return dropTableMode; } public void setDropTableMode(boolean dropTableMode) { this.dropTableMode = dropTableMode; } public void createFeatureTypeOccurred(FeatureTypeEvent evt) { try { clearFeatureData(evt.getFeatureType()); } catch (SchemaException e) { logger.warn(e.getMessage(), e); } } protected void clearFeatureData(FeatureType featureType) throws SchemaException { String featureName = featureType.getTypeName(); if (isExistFeature(featureType)) { try { Connection conn = targetDataStore.getConnection(Transaction.AUTO_COMMIT); if (dropTableMode) { dropGeometryColumn(conn, featureName, featureType.getDefaultGeometry().getLocalName()); dropTable(conn, featureName); ArrayList schemaTexts = createSchemaTexts(featureType); for (String stmtText : schemaTexts) { Statement stmt = conn.createStatement(); stmt.execute(stmtText); stmt.close(); } } else { deleteTable(conn, featureName); } conn.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } catch (SQLException e) { logger.warn(e.getMessage(), e); } } else { try { Connection conn = targetDataStore.getConnection(Transaction.AUTO_COMMIT); ArrayList schemaTexts = createSchemaTexts(featureType); for (String stmtText : schemaTexts) { Statement stmt = conn.createStatement(); stmt.execute(stmtText); stmt.close(); } conn.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } catch (SQLException e) { logger.warn(e.getMessage(), e); } } } }