package com.ximple.eofms; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.StringTokenizer; import java.util.TreeMap; import org.apache.commons.collections.OrderedMap; import org.apache.commons.collections.OrderedMapIterator; import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.collections.map.MultiValueMap; import org.apache.commons.digester.Digester; import org.apache.commons.digester.xmlrules.DigesterLoader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.Transaction; import org.geotools.data.oracle.OracleDataStore; import org.geotools.data.oracle.OracleDataStoreFactory; import org.xml.sax.SAXException; import com.vividsolutions.jts.geom.GeometryFactory; import com.vividsolutions.jts.util.Assert; import oracle.sql.BLOB; import com.ximple.eofms.filter.ElementDispatcher; import com.ximple.eofms.jobs.OracleElementLogger; import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext; import com.ximple.io.dgn7.ArcElement; import com.ximple.io.dgn7.ComplexChainElement; import com.ximple.io.dgn7.ComplexElement; import com.ximple.io.dgn7.Dgn7fileException; import com.ximple.io.dgn7.Element; import com.ximple.io.dgn7.ElementType; import com.ximple.io.dgn7.IElementHandler; import com.ximple.util.PrintfFormat; public class XElementFetcher implements Runnable { /** * The Oracle driver class name */ private static final String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver"; private static final String ORAHOST = "ORAHOST"; private static final String ORAINST = "ORAINST"; private static final String ORAPORT = "ORAPORT"; private static final String ORAUSER = "ORAUSER"; private static final String ORAPASS = "ORAPASS"; private static final String ORGSCHEMA = "ORGSCHEMA"; private static final int FETCHSIZE = 30; private static final int COMMITSIZE = 100; private static final int MAXELM_LOGCOUNT = 6000; private static final String FETCHLOGGER_PREFIX = "XFE_"; static Log logger = LogFactory.getLog(XElementFetcher.class); static final GeometryFactory geometryFactory = new GeometryFactory(); static final boolean isCompactMode = true; static OracleDataStoreFactory dataStoreFactory = new OracleDataStoreFactory(); protected static class Pair { Object first; Object second; public Pair(Object first, Object second) { this.first = first; this.second = second; } } private HashMap dataConfig; private ElementDispatcher elementDispatcher; private MultiValueMap featuresContext = new MultiValueMap(); private boolean driverFound = true; public static void main(String[] args) { XElementFetcher fetcher = new XElementFetcher(); fetcher.run(); } public XElementFetcher() { initializeDataConfig(); try { Class.forName(JDBC_DRIVER); } catch (Throwable t) { // must be running off dummy jar! driverFound = false; } } private void initializeDataConfig() { dataConfig = new HashMap(); dataConfig.put(XElementParser.ROOTDATAPATH, XElementParser.DEFAULT_DATAPATH); dataConfig.put(ORAHOST, "192.168.11.200"); dataConfig.put(ORAINST, "tctpc"); dataConfig.put(ORAPORT, "1521"); dataConfig.put(ORAUSER, "spatialdb"); dataConfig.put(ORAPASS, "spatialdb000"); // dataConfig.put(ORGSCHEMA, "SPATIALDB, CMMS_SPATIALDB"); dataConfig.put(ORGSCHEMA, "CMMS_SPATIALDB"); elementDispatcher = createElementDispatcher(); } private ElementDispatcher createElementDispatcher() { try { URL rulesURL = ElementDispatcher.class.getResource("ElementDispatcherRules.xml"); assert rulesURL != null; Digester digester = DigesterLoader.createDigester(rulesURL); URL filterURL; filterURL = FeatureDgnConvertPostGISJobContext.class.getResource("/conf/DefaultConvertShpFilter.xml"); assert filterURL != null; return (ElementDispatcher) digester.parse(filterURL); } catch (UnsupportedEncodingException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } catch (MalformedURLException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } catch (IOException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } catch (SAXException e) { logger.info(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } } public void run() { try { OracleDataStore dataStore = createSourceDataStore(); ArrayList schemas = getSchemaNames(); for (String schema : schemas) { executeFetchElement(dataStore, schema, getOutputPath()); } } catch (SQLException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } } private String getOutputPath() { return dataConfig.get(XElementParser.ROOTDATAPATH) + File.separator + XElementParser.DEFAULT_DGNOUTPATH; } protected OracleDataStore createSourceDataStore() { if (!driverFound) { throw new RuntimeException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); } Map map = new TreeMap(); map.put("host", dataConfig.get(ORAHOST)); map.put("port", dataConfig.get(ORAPORT)); map.put("instance", dataConfig.get(ORAINST)); map.put("user", dataConfig.get(ORAUSER)); map.put("passwd", dataConfig.get(ORAPASS)); map.put("dbtype", "oracle"); map.put("alias", dataConfig.get(ORAINST)); map.put("namespace", null); if (!map.containsKey(OracleDataStoreFactory.MAXCONN.key)) { map.put(OracleDataStoreFactory.MAXCONN.key, "10"); } if (!map.containsKey(OracleDataStoreFactory.MINCONN.key)) { map.put(OracleDataStoreFactory.MINCONN.key, "1"); } if (!dataStoreFactory.canProcess(map)) { logger.warn("cannot process properties-"); throw new RuntimeException("cannot process properties-"); } try { return (OracleDataStore) dataStoreFactory.createDataStore(map); } catch (IOException e) { logger.warn(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } } private ArrayList getSchemaNames() { ArrayList result = new ArrayList(); String strSchema = dataConfig.get(ORGSCHEMA); StringTokenizer st = new StringTokenizer(strSchema, ","); while (st.hasMoreTokens()) { String aSchema = st.nextToken().trim(); result.add(aSchema); } return result; } protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc, OrderedMap orderedMap) throws SQLException { if (orderedMap == null) orderedMap = new LinkedMap(99); String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\""; PrintfFormat spf = new PrintfFormat(fetchStmtFmt); String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = null; stmt.setFetchSize(FETCHSIZE); try { rs = stmt.executeQuery(fetchStmt); int size = rs.getMetaData().getColumnCount(); while (rs.next()) { Object[] values = new Object[size]; for (int i = 0; i < size; i++) { values[i] = rs.getObject(i + 1); } Integer key = ((BigDecimal) values[0]).intValue(); String name = (String) values[1]; Pair pair = (Pair) orderedMap.get(key); if (pair == null) orderedMap.put(key, new Pair(name, null)); else pair.first = name; } } catch (SQLException e) { logger.error(e.toString(), e); logger.error("stmt=" + fetchStmt); throw e; } finally { if (rs != null) rs.close(); if (stmt != null) stmt.close(); } return orderedMap; } public Connection getOracleConnection(OracleDataStore dataStore) { try { return dataStore.getConnection(Transaction.AUTO_COMMIT); } catch (IOException e) { logger.warn(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); } } private void executeFetchElement(OracleDataStore dataStore, String schema, String dataPath) throws SQLException { Connection connection = getOracleConnection(dataStore); int order = 0; OrderedMap map = getBlobStorageList(connection, schema, "SD$SPACENODES", null); logger.info("begin fecther job:[" + map.size() + "]"); int total = map.size(); //spacenodes count int step = total / 100; int current = 0; OracleElementLogger elmLogger = createElementLogger(connection, dataPath); elmLogger.setLogPrefix(FETCHLOGGER_PREFIX); for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext();) { it.next(); Pair pair = (Pair) it.getValue(); String tableSrc = (String) pair.first; logger.info("begin convert:[" + order + "]-" + tableSrc); queryNFetchIgsetElement(connection, schema, tableSrc, elmLogger); order++; if ((order % COMMITSIZE) == 0) { elmLogger.flashLogging(); System.gc(); System.runFinalization(); } int now = order % step; if (now != current) { current = now; } } elmLogger.flashLogging(); } protected byte[] getBytesFromBLOB(BLOB blob) throws SQLException { byte[] raw = null; // BLOB blob = (BLOB) rs.getBlob(1); int optimalSize = blob.getChunkSize(); byte[] chunk = new byte[optimalSize]; InputStream is = blob.getBinaryStream(0); ByteBuffer buffer = null; // ByteBuffer.allocate(optimalSize); int len; try { while ((len = (is.read(chunk))) != -1) { if (buffer != null) { buffer.limit(buffer.limit() + len); } else { buffer = ByteBuffer.allocate(len); } buffer.put(chunk); } assert buffer != null; buffer.position(0); raw = buffer.array(); } catch (IOException e) { e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates. Assert.shouldNeverReachHere(); } finally { try { is.close(); } catch (IOException e) { logger.warn("InputStream cannot close", e); }; } return raw; } private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { ByteBuffer buffer = ByteBuffer.wrap(raws); buffer.order(ByteOrder.LITTLE_ENDIAN); short signature = buffer.getShort(); // byte type = (byte) (buffer.get() & 0x7f); byte type = (byte) ((signature >>> 8) & 0x007f); // silly Bentley say contentLength is in 2-byte words // and ByteByffer uses raws. // track the record location int elementLength = (buffer.getShort() * 2) + 4; ElementType recordType = ElementType.forID(type); IElementHandler handler; handler = recordType.getElementHandler(); Element dgnElement = (Element) handler.read(buffer, signature, elementLength); if (recordType.isComplexElement() && (elementLength < raws.length)) { int offset = elementLength; while (offset < (raws.length - 4)) { buffer.position(offset); signature = buffer.getShort(); type = (byte) ((signature >>> 8) & 0x007f); elementLength = (buffer.getShort() * 2) + 4; if (raws.length < (offset + elementLength)) { System.out.println("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); break; } recordType = ElementType.forID(type); handler = recordType.getElementHandler(); if (handler != null) { Element subElement = (Element) handler.read(buffer, signature, elementLength); ((ComplexElement) dgnElement).add(subElement); offset += elementLength; } else { byte[] remain = new byte[buffer.remaining()]; System.arraycopy(raws, offset, remain, 0, buffer.remaining()); for (int i = 0; i < remain.length; i++) { if (remain[i] != 0) { logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); System.out.println("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); } } break; } } } return dgnElement; } private void queryNFetchIgsetElement(Connection connection, String srcschema, String srctable, OracleElementLogger elmLogger) throws SQLException { String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable}); Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); stmtSrc.setFetchSize(FETCHSIZE); ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); try { int igdsMetaType = rsSrc.getMetaData().getColumnType(1); while (rsSrc.next()) { byte[] raw; if (igdsMetaType == Types.BLOB) { BLOB blob = (BLOB) rsSrc.getBlob(1); raw = getBytesFromBLOB(blob); // blob.close(); } else { raw = rsSrc.getBytes(1); } try { Element element = fetchBinaryElement(raw); processFeatureElement(elmLogger, element, srcschema); } catch (Dgn7fileException e) { logger.warn("Dgn7Exception", e); } } } finally { rsSrc.close(); stmtSrc.close(); } } protected OracleElementLogger createElementLogger(Connection connection, String dataPath) { return new OracleElementLogger(connection, dataPath, MAXELM_LOGCOUNT); } private void processFeatureElement(OracleElementLogger elmLogger, Element element, String currentSchema) { boolean match = false; if (element instanceof ArcElement) { match = true; } else if (element instanceof ComplexChainElement) { ComplexChainElement complex = (ComplexChainElement) element; for (Element subElm : complex) { if (subElm instanceof ArcElement) { match = true; break; } } } if (match) elmLogger.logElement(element, currentSchema); } }