package com.ximple.eofms.jobs; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FilenameFilter; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.Date; import java.util.Map; import java.util.TreeMap; import java.util.ArrayList; import org.apache.commons.collections.OrderedMap; import org.apache.commons.collections.OrderedMapIterator; import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.DataStore; import org.geotools.data.Transaction; import org.geotools.data.postgis.PostgisDataStore; import org.geotools.data.postgis.PostgisDataStoreFactory; import org.geotools.feature.IllegalAttributeException; import org.geotools.feature.SchemaException; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import com.vividsolutions.jts.geom.GeometryFactory; import oracle.jdbc.OracleConnection; import oracle.jdbc.OracleResultSet; import oracle.sql.ARRAY; import oracle.sql.BLOB; import com.ximple.eofms.jobs.context.AbstractOracleJobContext; import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext; import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext; import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext; import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; import com.ximple.eofms.util.BinConverter; import com.ximple.eofms.util.ByteArrayCompressor; import com.ximple.eofms.util.StringUtils; import com.ximple.io.dgn7.ComplexElement; import com.ximple.io.dgn7.Dgn7fileException; import com.ximple.io.dgn7.Dgn7fileReader; import com.ximple.io.dgn7.Element; import com.ximple.io.dgn7.ElementType; import com.ximple.io.dgn7.IElementHandler; import com.ximple.io.dgn7.Lock; import com.ximple.io.dgn7.TextElement; import com.ximple.util.PrintfFormat; public class OracleConvertDgn2PostGISJob extends AbstractOracleDatabaseJob { final static Log logger = LogFactory.getLog(OracleConvertDgn2PostGISJob.class); private static final String XGVERSION_NAME = "xgversion"; private static final String PGHOST = "PGHOST"; private static final String PGDDATBASE = "PGDDATBASE"; private static final String PGPORT = "PGPORT"; private static final String PGSCHEMA = "PGSCHEMA"; private static final String PGUSER = "PGUSER"; private static final String PGPASS = "PGPASS"; private static final String USEWKB = "USEWKB"; private static final int FETCHSIZE = 30; private static final int COMMITSIZE = 100; class Pair { Object first; Object second; public Pair(Object first, Object second) { this.first = first; this.second = second; } } protected static PostgisDataStoreFactory dataStoreFactory = new PostgisDataStoreFactory(); GeometryFactory _geomFactory = new GeometryFactory(); protected String _pgHost; protected String _pgDatabase; protected String _pgPort; protected String _pgSchema; protected String _pgUsername; protected String _pgPassword; protected String _pgUseWKB; protected Map pgProperties; protected PostgisDataStore targetDataStore; // protected OracleConvertPostGISJobContext oracleJobContext; public Log getLogger() { return logger; } protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath) { return new OracleConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, filterPath); } protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); _pgDatabase = dataMap.getString(PGDDATBASE); _pgPort = dataMap.getString(PGPORT); _pgSchema = dataMap.getString(PGSCHEMA); _pgUsername = dataMap.getString(PGUSER); _pgPassword = dataMap.getString(PGPASS); _pgUseWKB = dataMap.getString(USEWKB); Log logger = getLogger(); /* logger.info("PGHOST=" + _myHost); logger.info("PGDDATBASE=" + _myDatabase); logger.info("PGPORT=" + _myPort); logger.info("PGSCHEMA=" + _mySchema); logger.info("PGUSER=" + _myUsername); logger.info("PGPASS=" + _myPassword); logger.info("USEWKB=" + _myUseWKB); */ if (_pgHost == null) { logger.warn("PGHOST is null"); throw new JobExecutionException("Unknown PostGIS host."); } if (_pgDatabase == null) { logger.warn("PGDATABASE is null"); throw new JobExecutionException("Unknown PostGIS database."); } if (_pgPort == null) { logger.warn("PGPORT is null"); throw new JobExecutionException("Unknown PostGIS port."); } if (_pgSchema == null) { logger.warn("PGSCHEMA is null"); throw new JobExecutionException("Unknown PostGIS schema."); } if (_pgUsername == null) { logger.warn("PGUSERNAME is null"); throw new JobExecutionException("Unknown PostGIS username."); } if (_pgPassword == null) { logger.warn("PGPASSWORD is null"); throw new JobExecutionException("Unknown PostGIS password."); } Map remote = new TreeMap(); remote.put("dbtype", "postgis"); remote.put("charset", "UTF-8"); remote.put("host", _pgHost); remote.put("port", _pgPort); remote.put("database", _pgDatabase); remote.put("user", _pgUsername); remote.put("passwd", _pgPassword); remote.put("namespace", null); pgProperties = remote; } public void execute(JobExecutionContext context) throws JobExecutionException { // Every job has its own job detail JobDetail jobDetail = context.getJobDetail(); // The name is defined in the job definition String jobName = jobDetail.getName(); // Log the time the job started logger.info(jobName + " fired at " + new Date()); extractJobConfiguration(jobDetail); createSourceDataStore(); createTargetDataStore(); if (getSourceDataStore() == null) { logger.warn("Cannot connect source oracle database."); throw new JobExecutionException("Cannot connect source oracle database."); } if (getTargetDataStore() == null) { logger.warn("Cannot connect source postgreSQL database."); throw new JobExecutionException("Cannot connect source postgreSQL database."); } long t1 = System.currentTimeMillis(); String targetSchemaName = null; try { logger.info("-- step:clearOutputDatabase --"); clearOutputDatabase(); targetSchemaName = determineTargetSchemaName(); if (checkConvertDB()) { logger.info("-- step:convertOracleDB --"); OracleConvertPostGISJobContext jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath); jobContext.setSourceDataStore(getSourceDataStore()); // jobContext.setConvertElementIn(_convertElementIn); jobContext.setElementLogging(checkElementLogging()); jobContext.setExecutionContext(context); if (isCopyConnectivityMode()) copyConnectivity(jobContext); for (String orgSchema : _orgSchema) { logger.info("----- start schema:" + orgSchema + " -----"); exetcuteConvert(jobContext, orgSchema, _dataPath); //close all open filewriter instance jobContext.closeFeatureWriter(); } } if (checkConvertFile()) { logger.info("-- step:convertIndexDesignFile --"); convertIndexDesignFile(context, targetSchemaName); logger.info("-- step:convertOtherDesignFile --"); convertOtherDesignFile(context, targetSchemaName); } if (checkConvertElementIn()) { logger.info("-- step:convertFeatureDesignFile --"); convertFeatureDesignFile(context, targetSchemaName); } if (checkCreateDummy()) { logger.info("-- step:createDummyFeatureFile --"); createDummyFeatureFile(context); } disconnect(); long t2 = System.currentTimeMillis(); // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); logger.warn("use time = " + ((int) ((t2 - t1) / 60000.0)) + " min - " + (((int) ((t2 - t1) % 60000.0)) / 1000) + " sec"); } catch (SQLException e) { logger.warn(e.getMessage(), e); throw new JobExecutionException("Database error. " + e.getMessage(), e); } catch (IOException ex) { logger.warn(ex.getMessage(), ex); throw new JobExecutionException("IO error. " + ex.getMessage(), ex); } updateRepoStatusToReady(targetSchemaName); logger.warn(jobName + " end at " + new Date()); } /** * Connectivity複製一個版本,在查詢電流方向時用來比對OMS資料庫的電器連接性(Connectivity) * * @param jobContext job context * @throws SQLException sql exception */ private void copyConnectivity(OracleConvertPostGISJobContext jobContext) throws SQLException { Connection connection = jobContext.getOracleConnection(); ResultSet rsMeta = connection.getMetaData().getTables(null, "BASEDB", AbstractOracleJobContext.CONNECTIVITY_WEBCHECK_NAME + "%", new String[]{"TABLE"}); boolean found = false; try { while (rsMeta.next()) { String tablename = rsMeta.getString(3); if (AbstractOracleJobContext.CONNECTIVITY_WEBCHECK_NAME.equalsIgnoreCase(tablename)) { found = true; break; } } // } catch (SQLException e) } finally { if (rsMeta != null) { rsMeta.close(); rsMeta = null; } } Statement stmt = connection.createStatement(); if (found) { stmt.execute(AbstractOracleJobContext.TRUNCATE_CONNECTIVITY_WEBCHECK); } else { logger.info("Create CONNECTIVITY_WEBCHECK table."); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_1); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_2); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_3); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_4); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_5); stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_6); stmt.execute(AbstractOracleJobContext.ALTER_CONNECTIVITY_WEBCHECK_1); stmt.execute(AbstractOracleJobContext.ALTER_CONNECTIVITY_WEBCHECK_2); } stmt.execute(AbstractOracleJobContext.COPY_CONNECTIVITY_TO_WEBCHECK); stmt.close(); } private void exetcuteConvert(OracleConvertPostGISJobContext jobContext, String querySchema, String targetSchemaName) throws SQLException { int order = 0; OrderedMap map = getBlobStorageList(jobContext.getOracleConnection(), querySchema, "SD$SPACENODES", null); logger.info("begin convert job:[" + map.size() + "]:testmode=" + _testMode); int total = map.size(); //spacenodes count int step = total / 100; int current = 0; //jobContext.startTransaction(); jobContext.setCurrentSchema(querySchema); jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 0); for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext();) { it.next(); Pair pair = (Pair) it.getValue(); String tableSrc = (String) pair.first; logger.info("begin convert:[" + order + "]-" + tableSrc); queryIgsetElement(jobContext, querySchema, tableSrc); order++; if (_testMode) { if ((_testCount < 0) || (order >= _testCount)) break; } if ((order % COMMITSIZE) == 0) { // OracleConnection connection = jobContext.getOracleConnection(); // connection.commitTransaction(); jobContext.commitTransaction(); //jobContext.startTransaction(); System.gc(); System.runFinalization(); } int now = order % step; if (now != current) { current = now; jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); } } jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 100); jobContext.commitTransaction(); jobContext.resetFeatureContext(); logger.info("end convert job:[" + order + "]"); System.gc(); System.runFinalization(); } protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc, OrderedMap orderedMap) throws SQLException { if (orderedMap == null) orderedMap = new LinkedMap(99); String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\""; PrintfFormat spf = new PrintfFormat(fetchStmtFmt); String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = null; stmt.setFetchSize(FETCHSIZE); try { rs = stmt.executeQuery(fetchStmt); int size = rs.getMetaData().getColumnCount(); while (rs.next()) { Object[] values = new Object[size]; for (int i = 0; i < size; i++) { values[i] = rs.getObject(i + 1); } Integer key = ((BigDecimal) values[0]).intValue(); String name = (String) values[1]; Pair pair = (Pair) orderedMap.get(key); if (pair == null) orderedMap.put(key, new Pair(name, null)); else pair.first = name; } } catch (SQLException e) { logger.error(e.toString(), e); logger.error("stmt=" + fetchStmt); throw e; } finally { if (rs != null) rs.close(); stmt.close(); } return orderedMap; } protected OrderedMap getRawFormatStorageList(OracleConnection connection, String schemaSrc, String tableSrc, OrderedMap orderedMap) throws SQLException { if (orderedMap == null) orderedMap = new LinkedMap(99); String fetchStmtFmt = "SELECT RNID, SPACETABLE FROM \"%s\".\"%s\""; PrintfFormat spf = new PrintfFormat(fetchStmtFmt); String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); stmt.setFetchSize(FETCHSIZE); ResultSet rs = stmt.executeQuery(fetchStmt); try { int size = rs.getMetaData().getColumnCount(); while (rs.next()) { Object[] values = new Object[size]; for (int i = 0; i < size; i++) { values[i] = rs.getObject(i + 1); } Integer key = ((BigDecimal) values[0]).intValue(); String name = (String) values[1]; Pair pair = (Pair) orderedMap.get(key); if (pair == null) orderedMap.put(key, new Pair(null, name)); else pair.second = name; } } finally { rs.close(); stmt.close(); } return orderedMap; } protected void queryIgsetElement(OracleConvertPostGISJobContext jobContext, String srcschema, String srctable) throws SQLException { Connection connection = jobContext.getOracleConnection(); String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable}); Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); stmtSrc.setFetchSize(FETCHSIZE); ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); int igdsMetaType = rsSrc.getMetaData().getColumnType(1); while (rsSrc.next()) { byte[] raw; if (igdsMetaType == Types.BLOB) { BLOB blob = (BLOB) rsSrc.getBlob(1); raw = getBytesFromBLOB(blob); blob.close(); } else { raw = rsSrc.getBytes(1); } try { Element element = fetchBinaryElement(raw); jobContext.putFeatureCollection(element); } catch (Dgn7fileException e) { logger.warn("Dgn7Exception", e); } } rsSrc.close(); stmtSrc.close(); } protected void queryRawElement(OracleConvertPostGISJobContext jobContext, String srcschema, String srctable) throws SQLException { Connection connection = jobContext.getOracleConnection(); String fetchDestStmtFmt = "SELECT ELEMENT FROM \"%s\".\"%s\" ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchDestStmtFmt); String fetchDestStmt = spf.sprintf(new Object[]{srcschema, srctable}); Statement stmtDest = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); stmtDest.setFetchSize(FETCHSIZE); ResultSet rsDest = stmtDest.executeQuery(fetchDestStmt); try { while (rsDest.next()) { ARRAY rawsValue = ((OracleResultSet) rsDest).getARRAY(1); long[] rawData = rawsValue.getLongArray(); byte[] comparessedValue; /* if (dataMode == TransferTask.DataMode.Normal) { comparessedValue = BinConverter.unmarshalByteArray(rawData, true); } else { comparessedValue = BinConverter.unmarshalCompactByteArray(rawData); } */ comparessedValue = BinConverter.unmarshalByteArray(rawData, true); byte[] rawDest = ByteArrayCompressor.decompressByteArray(comparessedValue); try { Element element = fetchBinaryElement(rawDest); jobContext.putFeatureCollection(element); } catch (Dgn7fileException e) { logger.warn("Dgn7Exception:" + e.getMessage(), e); } } } finally { rsDest.close(); stmtDest.close(); } } // Binary to Element private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { ByteBuffer buffer = ByteBuffer.wrap(raws); buffer.order(ByteOrder.LITTLE_ENDIAN); short signature = buffer.getShort(); // byte type = (byte) (buffer.get() & 0x7f); byte type = (byte) ((signature >>> 8) & 0x007f); // silly Bentley say contentLength is in 2-byte words // and ByteByffer uses raws. // track the record location int elementLength = (buffer.getShort() * 2) + 4; ElementType recordType = ElementType.forID(type); IElementHandler handler; handler = recordType.getElementHandler(); Element dgnElement = (Element) handler.read(buffer, signature, elementLength); if (recordType.isComplexElement() && (elementLength < raws.length)) { int offset = elementLength; while (offset < (raws.length - 4)) { buffer.position(offset); signature = buffer.getShort(); type = (byte) ((signature >>> 8) & 0x007f); elementLength = (buffer.getShort() * 2) + 4; if (raws.length < (offset + elementLength)) { System.out.println("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); break; } recordType = ElementType.forID(type); handler = recordType.getElementHandler(); if (handler != null) { Element subElement = (Element) handler.read(buffer, signature, elementLength); ((ComplexElement) dgnElement).add(subElement); offset += elementLength; } else { byte[] remain = new byte[buffer.remaining()]; System.arraycopy(raws, offset, remain, 0, buffer.remaining()); for (int i = 0; i < remain.length; i++) { if (remain[i] != 0) { logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); System.out.println("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); } } break; } } } return dgnElement; } /** * 執行轉換索引圖檔的工作 * * @param context 工作執行環境 * @throws org.quartz.JobExecutionException * exception */ private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { File indexDir = new File(getDataPath(), "index"); if (!indexDir.exists()) { logger.info("index dir=" + indexDir + " not exist."); return; } if (!indexDir.isDirectory()) { logger.info("index dir=" + indexDir + " is not a directory."); } File[] dgnFiles = indexDir.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { return name.toLowerCase().endsWith(".dgn"); } }); for (File dgnFile : dgnFiles) { IndexDgnConvertPostGISJobContext convertContext = new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName); logger.debug("--- start dgnfile-" + dgnFile.toString() + " ---"); try { convertContext.clearOutputDatabase(); convertContext.setExecutionContext(context); String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); convertContext.startTransaction(); FileInputStream fs = new FileInputStream(dgnFile); FileChannel fc = fs.getChannel(); Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); convertContext.setReader(reader); scanIndexDgnElement(convertContext); convertContext.commitTransaction(); convertContext.closeFeatureWriter(); System.gc(); System.runFinalization(); } catch (FileNotFoundException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (Dgn7fileException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (IOException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (IllegalAttributeException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (SchemaException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } } protected void scanIndexDgnElement(IndexDgnConvertPostGISJobContext convertContext) throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { Dgn7fileReader reader = convertContext.getReader(); int count = 0; Element lastComplex = null; while (reader.hasNext()) { Dgn7fileReader.Record record = reader.nextElement(); if (record.element() != null) { Element element = (Element) record.element(); ElementType type = element.getElementType(); if ((!type.isComplexElement()) && (!element.isComponentElement())) { if (lastComplex != null) { processIndexElement(lastComplex, convertContext); lastComplex = null; } processIndexElement(element, convertContext); } else if (element.isComponentElement()) { if (lastComplex != null) { ((ComplexElement) lastComplex).add(element); } } else if (type.isComplexElement()) { if (lastComplex != null) { processIndexElement(lastComplex, convertContext); } lastComplex = element; } } count++; } if (lastComplex != null) { processIndexElement(lastComplex, convertContext); } logger.debug("ElementRecord Count=" + count); } private void processIndexElement(Element element, IndexDgnConvertPostGISJobContext convertContext) throws IllegalAttributeException, SchemaException { if (element instanceof TextElement) { convertContext.putFeatureCollection(element); } } /** * 執行轉換其他設計圖檔的工作 * * @param context jobContext * @throws org.quartz.JobExecutionException * exception */ private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { File otherDir = new File(getDataPath(), "other"); if (!otherDir.exists()) { logger.info("other dir=" + otherDir + " not exist."); return; } if (!otherDir.isDirectory()) { logger.info("other dir=" + otherDir + " is not a directory."); } File[] dgnFiles = otherDir.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { return name.toLowerCase().endsWith(".dgn"); } }); for (File dgnFile : dgnFiles) { GeneralDgnConvertPostGISJobContext convertContext = new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName); logger.info("--- start dgnfile-" + dgnFile.toString() + " ---"); try { convertContext.setExecutionContext(context); String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); convertContext.startTransaction(); FileInputStream fs = new FileInputStream(dgnFile); FileChannel fc = fs.getChannel(); Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); convertContext.setReader(reader); scanOtherDgnElement(convertContext); convertContext.commitTransaction(); convertContext.closeFeatureWriter(); System.gc(); System.runFinalization(); } catch (FileNotFoundException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (Dgn7fileException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (IOException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (IllegalAttributeException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (SchemaException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } } public void scanOtherDgnElement(GeneralDgnConvertPostGISJobContext convertContext) throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { Dgn7fileReader reader = convertContext.getReader(); int count = 0; Element lastComplex = null; while (reader.hasNext()) { Dgn7fileReader.Record record = reader.nextElement(); if (record.element() != null) { Element element = (Element) record.element(); ElementType type = element.getElementType(); if ((!type.isComplexElement()) && (!element.isComponentElement())) { if (lastComplex != null) { processOtherElement(lastComplex, convertContext); lastComplex = null; } processOtherElement(element, convertContext); } else if (element.isComponentElement()) { if (lastComplex != null) { ((ComplexElement) lastComplex).add(element); } } else if (type.isComplexElement()) { if (lastComplex != null) { processOtherElement(lastComplex, convertContext); } lastComplex = element; } } count++; } if (lastComplex != null) { processOtherElement(lastComplex, convertContext); } logger.debug("ElementRecord Count=" + count); } private void processOtherElement(Element element, GeneralDgnConvertPostGISJobContext convertContext) throws IllegalAttributeException, SchemaException { convertContext.putFeatureCollection(element); } private void clearOutputDatabase() { /* File outDataPath = new File(getDataPath(), OracleConvertPostGISJobContext.SHPOUTPATH); if (outDataPath.exists() && outDataPath.isDirectory()) { deleteFilesInPath(outDataPath); } outDataPath = new File(getDataPath(), IndexDgnConvertShpJobContext.SHPOUTPATH); if (outDataPath.exists() && outDataPath.isDirectory()) { deleteFilesInPath(outDataPath); } outDataPath = new File(getDataPath(), GeneralDgnConvertShpJobContext.SHPOUTPATH); if (outDataPath.exists() && outDataPath.isDirectory()) { deleteFilesInPath(outDataPath); } */ } private void deleteFilesInPath(File outDataPath) { deleteFilesInPath(outDataPath, true); } private void deleteFilesInPath(File outDataPath, boolean removeSubDir) { if (!outDataPath.isDirectory()) { return; } File[] files = outDataPath.listFiles(); for (File file : files) { if (file.isFile()) { if (!file.delete()) { logger.info("Cannot delete file-" + file.toString()); } } else if (file.isDirectory()) { deleteFilesInPath(file, removeSubDir); if (removeSubDir) { if (file.delete()) { logger.info("Cannot delete dir-" + file.toString()); } } } } } private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { File elminDir = new File(getDataPath(), "elmin"); if (!elminDir.exists()) { logger.info("elmin dir=" + elminDir + " not exist."); return; } if (!elminDir.isDirectory()) { logger.info("elmin dir=" + elminDir + " is not a directory."); } File[] dgnFiles = elminDir.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { return name.toLowerCase().endsWith(".dgn"); } }); for (File dgnFile : dgnFiles) { FeatureDgnConvertPostGISJobContext convertContext = new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath); logger.info("--- start dgnfile-" + dgnFile.toString() + " ---"); try { convertContext.setExecutionContext(context); String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); convertContext.startTransaction(); FileInputStream fs = new FileInputStream(dgnFile); FileChannel fc = fs.getChannel(); Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); convertContext.setReader(reader); scanFeatureDgnElement(convertContext); convertContext.commitTransaction(); convertContext.closeFeatureWriter(); System.gc(); System.runFinalization(); } catch (FileNotFoundException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (Dgn7fileException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (IOException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (IllegalAttributeException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } catch (SchemaException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } } public void scanFeatureDgnElement(FeatureDgnConvertPostGISJobContext convertContext) throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { Dgn7fileReader reader = convertContext.getReader(); int count = 0; Element lastComplex = null; while (reader.hasNext()) { Dgn7fileReader.Record record = reader.nextElement(); if (record.element() != null) { Element element = (Element) record.element(); ElementType type = element.getElementType(); if ((!type.isComplexElement()) && (!element.isComponentElement())) { if (lastComplex != null) { processFeatureElement(lastComplex, convertContext); lastComplex = null; } processFeatureElement(element, convertContext); } else if (element.isComponentElement()) { if (lastComplex != null) { ((ComplexElement) lastComplex).add(element); } } else if (type.isComplexElement()) { if (lastComplex != null) { processFeatureElement(lastComplex, convertContext); } lastComplex = element; } } count++; } if (lastComplex != null) { processFeatureElement(lastComplex, convertContext); } logger.debug("ElementRecord Count=" + count); } private void processFeatureElement(Element element, FeatureDgnConvertPostGISJobContext convertContext) throws IllegalAttributeException, SchemaException { convertContext.putFeatureCollection(element); } private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException { /* DummyFeatureConvertShpJobContext convertContext = new DummyFeatureConvertShpJobContext(getDataPath(), _filterPath); try { convertContext.startTransaction(); convertContext.commitTransaction(); convertContext.closeFeatureWriter(); } catch (IOException e) { logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } */ } public DataStore getTargetDataStore() { return targetDataStore; } protected void createTargetDataStore() throws JobExecutionException { if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } /* if (!isDriverFound()) { throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); } */ if (!pgProperties.containsKey(PostgisDataStoreFactory.MAXCONN.key)) { pgProperties.put(PostgisDataStoreFactory.MAXCONN.key, "10"); } if (!pgProperties.containsKey(PostgisDataStoreFactory.MINCONN.key)) { pgProperties.put(PostgisDataStoreFactory.MINCONN.key, "1"); } if (!pgProperties.containsKey(PostgisDataStoreFactory.WKBENABLED.key)) { pgProperties.put(PostgisDataStoreFactory.WKBENABLED.key, "true"); } if (!dataStoreFactory.canProcess(pgProperties)) { getLogger().warn("cannot process properties-"); throw new JobExecutionException("cannot process properties-"); } try { targetDataStore = (PostgisDataStore) dataStoreFactory.createDataStore(pgProperties); } catch (IOException e) { getLogger().warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } private String determineTargetSchemaName() throws IOException { if (targetDataStore == null) return null; Connection connection = null; Statement stmt = null; ResultSet rs = null; String targetSchema = null; boolean needCreate = false; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); rs = connection.getMetaData().getTables(null, _pgSchema, XGVERSION_NAME, new String[] {"TABLE"}); if (!rs.next()) needCreate = true; if (needCreate) createXGeosVersionTable(connection, _pgSchema); rs.close(); rs = null; StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("'vsschema', 'vsstatus' FROM "); sbSQL.append(encodeSchemaTableName(_pgSchema, XGVERSION_NAME)).append(' '); sbSQL.append("ORDER BY vsid"); stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); ArrayList tmpSchemas = new ArrayList(); int i = 0; int current = 0; while (rs.next()) { Object[] values = new Object[2]; values[0] = rs.getObject(1); values[1] = rs.getObject(2); tmpSchemas.add(values); if ((((Short)values[1]) & DataRepositoryStatus.VSSTATUS_USING) != 0) { current = i; } i++; } if (current < (tmpSchemas.size() - 1)) { Object[] values = tmpSchemas.get(current + 1); targetSchema = (String) values[0]; } else { Object[] values = tmpSchemas.get(0); targetSchema = (String) values[0]; } sbSQL = new StringBuilder("UPDATE "); sbSQL.append(encodeSchemaTableName(_pgSchema, XGVERSION_NAME)).append(' '); sbSQL.append(" SET 'vsstatus' = "); sbSQL.append(DataRepositoryStatus.VSSTATUS_COVERT); sbSQL.append(" WHERE 'vsschema' = "); sbSQL.append(targetSchema); int count = stmt.executeUpdate(sbSQL.toString()); if (count != 1) { logger.info("update status for " + targetSchema + " update result count=" + count); } } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { if (rs != null) try { rs.close(); } catch (SQLException e) {}; if (stmt != null) try { stmt.close(); } catch (SQLException e) {}; if (connection != null) try { connection.close(); } catch (SQLException e) {}; } return targetSchema; } public String encodeSchemaTableName(String schemaName, String tableName) { return "\"" + schemaName + "\".\"" + tableName + "\""; } private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException { Statement stmt = null; StringBuilder sql = new StringBuilder("CREATE TABLE "); sql.append(encodeSchemaTableName(pgSchema, XGVERSION_NAME)); sql.append(" ( vsid serial PRIMARY KEY, "); sql.append(" vsschema character varying(64) NOT NULL, "); sql.append(" vsstatus smallint NOT NULL, "); sql.append(" vstimestamp timestamp with time zone ) "); try { stmt = connection.createStatement(); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("ALTER TABLE "); sql.append(encodeSchemaTableName(pgSchema, XGVERSION_NAME)); sql.append(" OWNER TO spatialdb"); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("GRANT ALL ON TABLE "); sql.append(encodeSchemaTableName(pgSchema, XGVERSION_NAME)); sql.append(" TO public"); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("INSERT INTO "); sql.append(encodeSchemaTableName(pgSchema, XGVERSION_NAME)); sql.append(" ('vsschema', 'vsstatus' ) VALUES ("); sql.append("'gisrepo1', "); sql.append(DataRepositoryStatus.VSSTATUS_UNKNOWN).append(" )"); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("INSERT INTO "); sql.append(encodeSchemaTableName(pgSchema, XGVERSION_NAME)); sql.append(" ('vsschema', 'vsstatus' ) VALUES ("); sql.append("'gisrepo2', "); sql.append(DataRepositoryStatus.VSSTATUS_UNKNOWN).append(" )"); stmt.executeUpdate(sql.toString()); } finally { if (stmt != null) stmt.close(); } } private void updateRepoStatusToReady(String targetSchema) { if (targetDataStore == null) return; Connection connection = null; Statement stmt = null; ResultSet rs = null; boolean needCreate = false; try { StringBuilder sbSQL = new StringBuilder("UPDATE "); sbSQL.append(encodeSchemaTableName(_pgSchema, XGVERSION_NAME)).append(' '); sbSQL.append(" SET 'vsstatus' = "); sbSQL.append(DataRepositoryStatus.VSSTATUS_READY); sbSQL.append(" WHERE 'vsschema' = "); sbSQL.append(targetSchema); connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); stmt = connection.createStatement(); int count = stmt.executeUpdate(sbSQL.toString()); if (count != 1) { logger.info("update status for " + targetSchema + " update result count=" + count); } } catch (SQLException e) { logger.warn(e.getMessage(), e); } catch (IOException e) { logger.warn(e.getMessage(), e); } finally { if (rs != null) try { rs.close(); } catch (SQLException e) {}; if (stmt != null) try { stmt.close(); } catch (SQLException e) {}; if (connection != null) try { connection.close(); } catch (SQLException e) {}; } } }