package com.ximple.eofms.jobs.context.postgis;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.util.Assert;
import com.ximple.eofms.filter.AbstractFLinkageDispatchableFilter;
import com.ximple.eofms.filter.CreateFeatureTypeEventListener;
import com.ximple.eofms.filter.ElementDispatcher;
import com.ximple.eofms.filter.FeatureTypeEvent;
import com.ximple.eofms.jobs.OracleElementLogger;
import com.ximple.eofms.util.ElementDigesterUtils;
import com.ximple.io.dgn7.ComplexElement;
import com.ximple.io.dgn7.Element;
import com.ximple.io.dgn7.FrammeAttributeData;
import org.apache.commons.digester3.Digester;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.transaction.util.CommonsLoggingLogger;
import org.apache.commons.transaction.util.LoggerFacade;
import org.geotools.data.DataStore;
import org.geotools.data.Transaction;
import org.geotools.data.jdbc.JDBCUtils;
import org.geotools.feature.SchemaException;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.postgresql.util.PSQLException;
import org.quartz.JobExecutionContext;
import org.xml.sax.SAXException;

/**
 * Job context that applies incremental element transactions (insert, update,
 * delete) to PostGIS feature tables.
 *
 * <p>Each {@link ElementTransactionContext} is turned into a feature by the
 * configured {@link ElementDispatcher} and written to, or removed from, the
 * table named by {@link #getFeatureTableName(ElementTransactionContext, boolean)}.
 * Tables for feature types that do not exist yet are created when the
 * dispatcher reports a new feature type.
 */
public class OracleIncrementPostGISJobContext extends AbstractOracleToPostGISJobContext
    implements CreateFeatureTypeEventListener {
    static Log logger = LogFactory.getLog(OracleIncrementPostGISJobContext.class);
    static final LoggerFacade sLogger = new CommonsLoggingLogger(logger);

    // static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();

    private OracleElementLogger elmLogger = null;

    static {
        try {
            DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
        } catch (SQLException e) {
            Assert.shouldNeverReachHere(e.getMessage());
        }
    }

    private String _filterConfig;
    private ElementDispatcher elementDispatcher;

    /**
     * Features waiting to be written in one batch, grouped by feature type.
     * NOTE(review): no code visible in this file adds entries to this map (the
     * batching method that did so is disabled), so {@link #updateDataStore()}
     * currently has nothing to write.
     */
    private HashMap<SimpleFeatureType, ArrayList<SimpleFeature>> txFeaturesContext =
        new HashMap<SimpleFeatureType, ArrayList<SimpleFeature>>();

    private JobExecutionContext executionContext;

    private String currentSchema = null;
    private boolean schemaChanged = false;
    private boolean dropTableMode = true;
    private int accumulate = 0;

    /**
     * One incremental change to apply to the PostGIS tables.
     *
     * <p>Only {@code transcationType}, {@code cid}, {@code oid}, {@code compid}
     * and {@code element} are read by the enclosing class; the remaining fields
     * are carried along untouched.
     */
    public static class ElementTransactionContext {
        public int transcationId;
        /** 0 = insert, 2 = edit, 3 = delete whole feature, 4 = delete feature part. */
        public int transcationType;
        /** Used in the table name ({@code fsc-<cid>}) and matched against the {@code tid} column. */
        public short cid;
        /** Matched against the {@code oid} column when deleting. */
        public int oid;
        /** Used in the table name and matched against the {@code cid} column; -1 disables that filter. */
        public short compid;
        public short occid;
        public int taskid;
        /** Source element for insert and update transactions; may be null. */
        public Element element;
        public int result;
    }

    /**
     * Creates the context and builds the element dispatcher from the filter
     * configuration.
     *
     * @param dataPath     passed through to the superclass
     * @param pgDS         passed through to the superclass
     * @param targetSchema passed through to the superclass
     * @param filterConfig path of the filter definition file; when null or not
     *                     an existing file, the bundled
     *                     {@code /conf/DefaultConvertShpFilter.xml} is used
     * @param profileMode  passed through to the superclass
     * @param useTransform passed through to the superclass
     */
    public OracleIncrementPostGISJobContext(String dataPath, DataStore pgDS, String targetSchema,
                                            String filterConfig, boolean profileMode,
                                            boolean useTransform) {
        super(dataPath, pgDS, targetSchema, profileMode, useTransform);
        _filterConfig = filterConfig;
        elementDispatcher = createElementDispatcher();
        elementDispatcher.addCreateFeatureTypeEventListener(this);
        // txFeaturesContext = new PessimisticMapWrapper(featuresContext, sLogger);
    }

    /**
     * Parses the filter configuration into an {@link ElementDispatcher}.
     *
     * @return the dispatcher described by the configuration
     * @throws RuntimeException wrapping any I/O or XML parsing failure
     */
    private ElementDispatcher createElementDispatcher() {
        try {
            URL filterURL = null;
            if (_filterConfig != null) {
                File config = new File(_filterConfig);
                if (config.exists()) {
                    filterURL = config.toURI().toURL();
                }
            }
            if (filterURL == null) {
                // config = new File("conf/DefaultConvertShpFilter.xml");
                filterURL = this.getClass().getResource("/conf/DefaultConvertShpFilter.xml");
                // filterURL = this.getClass().getResource("/conf/ConvertShpFilterForLevel.xml");
            }
            assert filterURL != null;
            Digester digester = ElementDigesterUtils.getElementDigester();
            return (ElementDispatcher) digester.parse(filterURL);
        } catch (IOException e) {
            // Also covers UnsupportedEncodingException and MalformedURLException,
            // which are IOException subclasses and were handled identically.
            logger.info(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        } catch (SAXException e) {
            logger.info(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Applies one element transaction to the PostGIS tables.
     *
     * <p>STATUS field values: 0 = insert, 2 = edit, 3 = delete whole feature,
     * 4 = delete feature part. Any other value is ignored.
     *
     * @param context the transaction to apply; a null context is logged and skipped
     */
    public void processFeatureContext(ElementTransactionContext context) {
        assert elementDispatcher != null;

        if (context == null) {
            logger.warn("putFeatureCollection context is null");
            return;
        }

        switch (context.transcationType) {
        case 0:
            insertElement(context);
            break;
        case 2:
            updateElement(context);
            break;
        case 3:
            removeFeature(context);
            break;
        case 4:
            removeFeaturePart(context);
            break;
        default:
            break;
        }
    }

    /** Builds the feature for a transaction, or returns null when there is no usable element. */
    private SimpleFeature featureOf(ElementTransactionContext context) {
        return (context.element == null) ? null : generateFeature(context.element);
    }

    /** Handles transaction type 0: inserts the element's feature. */
    private void insertElement(ElementTransactionContext context) {
        SimpleFeature simpleFeature = featureOf(context);
        if (simpleFeature == null) {
            return;
        }
        String bindingStmt = makePrepareInsertSql(simpleFeature.getFeatureType());
        logger.trace("Execute SQL(0):" + bindingStmt);
        executePrepareSQL(bindingStmt, simpleFeature);
    }

    /** Handles transaction type 2: deletes the existing rows, then inserts the new feature. */
    private void updateElement(ElementTransactionContext context) {
        SimpleFeature simpleFeature = featureOf(context);
        if (simpleFeature == null) {
            return;
        }
        SimpleFeatureType featureType = simpleFeature.getFeatureType();

        String deleteStmt = makePrepareDeleteSql(context);
        logger.trace("Execute SQL(2):" + deleteStmt);
        executeSQL(deleteStmt);

        String bindingStmt = makePrepareInsertSql(featureType);
        logger.trace("Execute SQL(2):" + bindingStmt);
        executePrepareSQL(bindingStmt, simpleFeature);
    }

    /** Handles transaction type 3: deletes the rows of the whole feature. */
    private void removeFeature(ElementTransactionContext context) {
        String deleteStmt = makePrepareDeleteSql(context);
        logger.trace("Execute SQL(3):" + deleteStmt);
        executeSQL(deleteStmt);
    }

    /** Handles transaction type 4: deletes matching rows from every table found for the feature class. */
    private void removeFeaturePart(ElementTransactionContext context) {
        try {
            List<String> tableList = fetchExistTableSchema(getTargetSchema(), context);
            for (String targetTable : tableList) {
                String deleteStmt = makePrepareDeleteSql(context, targetTable);
                logger.trace("Execute SQL(4):" + deleteStmt);
                executeSQL(deleteStmt);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Converts an element into a feature through the dispatcher.
     *
     * @param element the element to convert
     * @return the feature, or null when no filter produced one or when its
     *         default geometry is missing or empty (both cases are logged)
     */
    private SimpleFeature generateFeature(Element element) {
        assert elementDispatcher != null;
        // Check whether the element satisfies a filter condition.
        SimpleFeature feature = elementDispatcher.execute(element, getDistId(), isTransformed());

        if (feature == null) {
            FrammeAttributeData linkage = AbstractFLinkageDispatchableFilter.getFeatureLinkage(element);
            String id = (linkage == null)
                ? "NULL"
                : "FSC=" + linkage.getFsc() + "|UFID=" + linkage.getUfid()
                    + "|COMPID=" + linkage.getComponentID();
            logRejectedElement("Unknown Element:", element, linkage, id);
            return null;
        }

        // A missing geometry is treated like an empty one; dereferencing it
        // unchecked used to raise a NullPointerException.
        Geometry geometry = (Geometry) feature.getDefaultGeometry();
        if (geometry == null || geometry.isEmpty()) {
            FrammeAttributeData linkage = AbstractFLinkageDispatchableFilter.getFeatureLinkage(element);
            String id = (linkage == null)
                ? "NULL"
                : linkage.getFsc() + "|" + linkage.getComponentID();
            logRejectedElement("Empty Geom Element:", element, linkage, id);
            return null;
        }

        return feature;
    }

    /**
     * Logs an element that could not be turned into a usable feature.
     * Disabled upstream: getElementLogger().logElement(element, getCurrentSchema())
     * for non-empty elements when element logging is enabled.
     */
    private void logRejectedElement(String prefix, Element element,
                                    FrammeAttributeData linkage, String id) {
        logger.warn(prefix + element.getElementType().toString()
            + ":type=" + element.getType()
            + ":lv=" + element.getLevelIndex()
            + ":id=" + id);

        if (element instanceof ComplexElement) {
            ComplexElement complex = (ComplexElement) element;
            logger.warn("----Complex Element size=" + complex.size() + ":"
                + (linkage == null ? "NULL" : (linkage.getUfid())));
        }
    }

    /*
     * A batching variant, putFeatureCollection(Element), used to live here in
     * commented-out form. It generated the feature the same way as
     * generateFeature(), appended it to txFeaturesContext and called
     * commitTransaction() once more than BATCHSIZE features had accumulated.
     */

    /** Does nothing in this implementation. */
    public void startTransaction() {
    }

    /**
     * Writes any pending batched features and flushes the element logger.
     */
    public void commitTransaction() {
        if (txFeaturesContext.isEmpty()) {
            logger.debug("Transaction is empty.");
        } else {
            logger.debug("Transaction size = " + txFeaturesContext.size());
            // txFeaturesContext.commitTransaction();
            updateDataStore();
        }

        OracleElementLogger elementLogger = this.getElementLogger();
        if (elementLogger != null) {
            elementLogger.flashLogging();
        }
    }

    /** Does nothing in this implementation. */
    public void rollbackTransaction() {
    }

    /** Discards all pending batched features. */
    public void resetFeatureContext() {
        txFeaturesContext.clear();
    }

    /**
     * Writes every pending feature to PostGIS, one batch and one commit per
     * feature type. SQL failures are logged and end the run; they are not
     * propagated.
     */
    private void updateDataStore() {
        if (isProfileMode()) {
            markUpdateTime();
        }

        if (txFeaturesContext.keySet().isEmpty()) {
            return;
        }

        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
            conn = getConnection();
            boolean autoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);

            for (SimpleFeatureType featureType : txFeaturesContext.keySet()) {
                logger.debug("Begin Save into PostGIS:" + featureType.getTypeName());

                String bindingStmt = makePrepareInsertSql(featureType);
                ArrayList<SimpleFeature> features = txFeaturesContext.get(featureType);
                pstmt = conn.prepareStatement(bindingStmt);

                for (SimpleFeature feature : features) {
                    addFeatureToBatch(pstmt, feature, bindingStmt);
                }

                warnUnknownUpdateCounts(pstmt.executeBatch());
                conn.commit();

                pstmt.close();
                pstmt = null;
                features.clear();
                logger.debug("End Save into PostGIS:" + featureType.getTypeName());
            }

            conn.setAutoCommit(autoCommit);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
            accumulate = 0;
        } catch (BatchUpdateException e) {
            // Release the statement too; it used to leak on this path.
            JDBCUtils.close(pstmt);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logBatchFailure(e);
        } catch (SQLException e) {
            JDBCUtils.close(pstmt);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logger.error(e.getMessage(), e);
        } finally {
            if (isProfileMode()) {
                accumulateUpdateTime();
            }
        }
    }

    /**
     * Binds one feature to the statement and queues it in the batch.
     * Binding problems are logged and the feature is skipped; any other
     * {@link SQLException} is left for the caller.
     */
    private void addFeatureToBatch(PreparedStatement pstmt, SimpleFeature feature, String sqlStmt)
        throws SQLException {
        try {
            // stmt.execute(feature);
            bindFeatureParameters(pstmt, feature);
            // pstmt.executeUpdate();
            pstmt.addBatch();
        } catch (PSQLException e) {
            if (sqlStmt != null) {
                logger.error("Execute:" + sqlStmt);
            }
            logger.error(e.getServerErrorMessage());
            logger.error(e.getMessage(), e);
        } catch (NullPointerException e) {
            if (sqlStmt != null) {
                logger.error("Execute:" + sqlStmt);
            }
            logger.error(String.valueOf(feature));
            logger.error(e.getMessage(), e);
        } catch (ClassCastException e) {
            if (sqlStmt != null) {
                logger.error("Execute:" + sqlStmt);
            }
            for (int i = 0; i < feature.getAttributeCount(); i++) {
                logger.info("attr[" + i + "]-" + ((feature.getAttribute(i) == null) ? " NULL" :
                    feature.getAttribute(i).toString()));
            }
            logger.error(e.getMessage(), e);
        }
    }

    /** Warns about every batch entry whose update count the driver could not report. */
    private static void warnUnknownUpdateCounts(int[] numUpdates) {
        for (int i = 0; i < numUpdates.length; i++) {
            if (numUpdates[i] == Statement.SUCCESS_NO_INFO) {
                logger.warn("Execution " + i + ": unknown number of rows updated");
            }
        }
    }

    /**
     * Logs a batch failure and each exception chained to it.
     * The chain is walked from each successor; asking the head exception
     * repeatedly returns the same object and never terminates.
     */
    private static void logBatchFailure(BatchUpdateException e) {
        logger.error(e.getMessage(), e);
        for (SQLException next = e.getNextException(); next != null; next = next.getNextException()) {
            // logger.warn(next.getMessage(), next);
            logger.warn(next.getMessage());
        }
    }

    /** @return the Quartz execution context set on this job context, or null if none was set */
    public JobExecutionContext getExecutionContext() {
        return executionContext;
    }

    /** @param context the Quartz execution context to remember */
    public void setExecutionContext(JobExecutionContext context) {
        executionContext = context;
    }

    /**
     * Closes the feature writer. This implementation does nothing.
     *
     * @throws IOException declared for callers; never thrown by this empty implementation
     */
    public void closeFeatureWriter() throws IOException {
    }

    /**
     * Returns the element logger, creating it on first use from the Oracle
     * connection and the data path.
     */
    protected OracleElementLogger getElementLogger() {
        if (elmLogger == null) {
            elmLogger = new OracleElementLogger(getOracleConnection());
            elmLogger.setDataPath(this.getDataPath());
        }
        return elmLogger;
    }

    /** @return the schema most recently passed to {@link #setCurrentSchema(String)}, or null */
    public String getCurrentSchema() {
        return currentSchema;
    }

    /**
     * Records the schema being processed and marks the schema as changed.
     *
     * @param querySchema the new current schema
     */
    public void setCurrentSchema(String querySchema) {
        this.currentSchema = querySchema;
        this.schemaChanged = true;
    }

    /** @return the logger shared by this class */
    protected Log getLogger() {
        return logger;
    }

    /** @return the drop-table flag (true by default) */
    public boolean isDropTableMode() {
        return dropTableMode;
    }

    /** @param dropTableMode the new drop-table flag */
    public void setDropTableMode(boolean dropTableMode) {
        this.dropTableMode = dropTableMode;
    }

    /**
     * Creates the backing table when the dispatcher reports a new feature type.
     * Schema problems are logged, not propagated.
     *
     * @param evt event carrying the new feature type
     */
    public void createFeatureTypeOccurred(FeatureTypeEvent evt) {
        try {
            createOrClearFeatureDataTable(evt.getFeatureType());
        } catch (SchemaException e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Creates the table for a feature type if it does not exist yet. An
     * existing table is left untouched. I/O and SQL failures are logged
     * together with the last statement attempted.
     *
     * @param featureType the feature type whose table is required
     * @throws SchemaException declared for the superclass helpers used here
     */
    protected void createOrClearFeatureDataTable(SimpleFeatureType featureType) throws SchemaException {
        if (isExistFeature(featureType)) {
            return;
        }

        Connection conn = null;
        String tempStmt = null;
        try {
            conn = getConnection();
            List<String> schemaTexts = createNewSchemaTexts(conn, featureType);
            for (String stmtText : schemaTexts) {
                Statement stmt = conn.createStatement();
                try {
                    tempStmt = stmtText;
                    stmt.execute(stmtText);
                } finally {
                    // Close even when execute() fails; the statement used to leak.
                    JDBCUtils.close(stmt);
                }
            }
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
        } catch (IOException e) {
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
            logger.warn("RUN--" + tempStmt);
            logger.warn(e.getMessage(), e);
        } catch (SQLException e) {
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logger.warn("RUN--" + tempStmt);
            logger.warn(e.getMessage(), e);
        }
    }

    /** @return true once {@link #setCurrentSchema(String)} has been called */
    public boolean isSchemaChanged() {
        return schemaChanged;
    }

    /**
     * Builds the feature table name for a transaction.
     *
     * @param elmContext the transaction supplying {@code cid} and {@code compid}
     * @param forLookup  when true, only the {@code fsc-<cid>} prefix is returned;
     *                   otherwise {@code fsc-<cid>-c-<compid>}
     * @return the table name or prefix
     */
    protected String getFeatureTableName(ElementTransactionContext elmContext, boolean forLookup) {
        StringBuilder sb = new StringBuilder();
        sb.append("fsc-");
        sb.append(elmContext.cid);
        if (!forLookup) {
            sb.append("-c-");
            sb.append(elmContext.compid);
        }
        return sb.toString();
    }

    /**
     * Lists the tables in a schema whose names start with the transaction's
     * {@code fsc-<cid>} prefix.
     *
     * <p>NOTE(review): the pattern {@code fsc-<cid>%} also matches longer class
     * ids (cid 10 matches {@code fsc-101-...}); confirm whether the pattern
     * should be tightened.
     *
     * @param schemaName the schema to search
     * @param elmContext the transaction supplying the prefix
     * @return the matching table names, possibly empty
     * @throws SQLException if the metadata query fails
     */
    List<String> fetchExistTableSchema(String schemaName, ElementTransactionContext elmContext)
        throws SQLException {
        String tablePattern = getFeatureTableName(elmContext, true);
        ArrayList<String> tables = new ArrayList<String>();

        Connection connection = null;
        ResultSet rsMeta = null;
        try {
            connection = getConnection();
            rsMeta = connection.getMetaData().getTables(null, schemaName, tablePattern + "%",
                new String[]{"TABLE"});
            while (rsMeta.next()) {
                // Column 3 of DatabaseMetaData.getTables is TABLE_NAME.
                tables.add(rsMeta.getString(3));
            }
        } finally {
            try {
                if (rsMeta != null) {
                    rsMeta.close();
                }
            } finally {
                // Release the connection as the other methods do; it used to leak here.
                JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
            }
        }
        return tables;
    }

    @Override
    protected String makeInsertSql(SimpleFeature feature, int srid) {
        return super.makeInsertSql(feature, srid);
    }

    @Override
    protected String makePrepareInsertSql(SimpleFeatureType featureType) {
        return super.makePrepareInsertSql(featureType);
    }

    /**
     * Builds the DELETE statement for a transaction against its own table
     * ({@code fsc-<cid>-c-<compid>}).
     *
     * @param context the transaction to delete rows for
     * @return the SQL text
     */
    protected String makePrepareDeleteSql(ElementTransactionContext context) {
        String tableName = getFeatureTableName(context, false);
        return makePrepareDeleteSql(context, tableName);
    }

    /**
     * Builds the DELETE statement for a transaction against a given table.
     * Rows are matched on {@code tid} and {@code oid}, and additionally on
     * {@code cid} unless {@code compid} is -1.
     */
    private String makePrepareDeleteSql(ElementTransactionContext context, String targetTable) {
        StringBuilder stmtBuilder = new StringBuilder();
        stmtBuilder.append("DELETE FROM ");
        stmtBuilder.append(encodeSchemaTableName(targetTable));
        stmtBuilder.append(" WHERE tid=");
        stmtBuilder.append(context.cid);
        stmtBuilder.append(" AND oid=");
        stmtBuilder.append(context.oid);
        if (context.compid != -1) {
            stmtBuilder.append(" AND cid=");
            stmtBuilder.append(context.compid);
        }
        return stmtBuilder.toString();
    }

    /** Runs one SQL statement on a fresh connection; failures are logged with the statement text. */
    private void executeSQL(String sqlStmt) {
        Connection conn = null;
        Statement stmt = null;
        try {
            conn = getConnection();
            stmt = conn.createStatement();
            stmt.execute(sqlStmt);
            JDBCUtils.close(stmt);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
        } catch (SQLException e) {
            JDBCUtils.close(stmt);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logger.warn("RUN--" + sqlStmt);
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Inserts a single feature with a prepared statement and commits it.
     * SQL failures are logged, not propagated.
     *
     * <p>NOTE(review): in profile mode this calls accumulateUpdateTime() without
     * a preceding markUpdateTime(); confirm that is intended.
     */
    private void executePrepareSQL(String sqlStmt, SimpleFeature feature) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
            conn = getConnection();
            boolean autoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);
            pstmt = conn.prepareStatement(sqlStmt);

            addFeatureToBatch(pstmt, feature, sqlStmt);

            warnUnknownUpdateCounts(pstmt.executeBatch());
            conn.commit();

            JDBCUtils.close(pstmt);
            logger.debug("End Save into PostGIS:" + feature.getFeatureType().getTypeName());

            conn.setAutoCommit(autoCommit);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null);
            accumulate = 0;
        } catch (BatchUpdateException e) {
            JDBCUtils.close(pstmt);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logBatchFailure(e);
        } catch (SQLException e) {
            JDBCUtils.close(pstmt);
            JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e);
            logger.error(e.getMessage(), e);
        } finally {
            if (isProfileMode()) {
                accumulateUpdateTime();
            }
        }
    }
}