From 94ae08701bbd7585a0b7e5a92d1975965a503c03 Mon Sep 17 00:00:00 2001 From: Dennis Kao <ulysseskao@gmail.com> Date: Wed, 15 Jan 2014 11:28:52 +0800 Subject: [PATCH] Merge branch 'origin/2.1.x' --- xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java | 1548 ++++++++++++++++++++++++++++++++++++++++------------------- 1 files changed, 1,046 insertions(+), 502 deletions(-) diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java index f19cf00..9bbfe1c 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java @@ -1,41 +1,52 @@ package com.ximple.eofms.jobs; import java.io.File; +import java.io.FileFilter; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FilenameFilter; import java.io.IOException; +import java.io.PushbackReader; +import java.io.StringReader; import java.math.BigDecimal; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.ArrayList; +import com.ximple.eofms.util.*; import org.apache.commons.collections.OrderedMap; import org.apache.commons.collections.OrderedMapIterator; import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.dbcp.DelegatingConnection; +import org.apache.commons.dbcp.PoolingConnection; +import org.apache.commons.dbcp.PoolingDataSource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.DataStore; import org.geotools.data.Transaction; -import org.geotools.data.postgis.PostgisDataStore; -import org.geotools.data.postgis.PostgisDataStoreFactory; -import org.geotools.feature.IllegalAttributeException; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; import org.geotools.feature.SchemaException; +import org.geotools.jdbc.JDBCDataStore; +import org.opengis.feature.IllegalAttributeException; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; - -import com.vividsolutions.jts.geom.GeometryFactory; import oracle.jdbc.OracleConnection; import oracle.jdbc.OracleResultSet; @@ -47,9 +58,6 @@ import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext; import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext; import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; -import com.ximple.eofms.util.BinConverter; -import com.ximple.eofms.util.ByteArrayCompressor; -import com.ximple.eofms.util.StringUtils; import com.ximple.io.dgn7.ComplexElement; import com.ximple.io.dgn7.Dgn7fileException; import com.ximple.io.dgn7.Dgn7fileReader; @@ -57,16 +65,13 @@ import com.ximple.io.dgn7.ElementType; import com.ximple.io.dgn7.IElementHandler; import com.ximple.io.dgn7.Lock; -import com.ximple.io.dgn7.TextElement; -import com.ximple.io.dgn7.ShapeElement; import com.ximple.util.PrintfFormat; -public class OracleConvertDgn2PostGISJob extends AbstractOracleDatabaseJob -{ +public class OracleConvertDgn2PostGISJob extends AbstractOracleDatabaseJob { final static Log logger = LogFactory.getLog(OracleConvertDgn2PostGISJob.class); private static final String PGHOST = "PGHOST"; - private static final String PGDDATBASE = "PGDDATBASE"; + private static final String PGDATBASE = "PGDATBASE"; private static final String PGPORT = "PGPORT"; private static final String PGSCHEMA = "PGSCHEMA"; private static final String PGUSER = "PGUSER"; @@ -77,22 +82,34 @@ private static final int FETCHSIZE = 30; private static final int COMMITSIZE = 100; + private static final String INDEXPATHNAME = "index"; + private static final String OTHERPATHNAME = "other"; + public static final String FORWARDFLOW_MARK = "shape://ccarrow"; + public static final String BACKFLOW_MARK = "shape://rccarrow"; + public static final String UNFLOW_MARK = "shape://backslash"; + public static final String NONFLOW_MARK = "shape://slash"; - protected static class Pair - { + private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC"; + private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR WHERE TAG_BCOMPID = 0 ORDER BY TAG_SFSC"; + + private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)"; + private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)"; + + public static final String FDYNCOLOR_SUFFIX = "_fdyncolor"; + public static final String FOWNER_SUFFIX = "_fowner"; + + protected static class Pair { Object first; Object second; - public Pair(Object first, Object second) - { + public Pair(Object first, Object second) { this.first = first; this.second = second; } } - protected static PostgisDataStoreFactory dataStoreFactory = new PostgisDataStoreFactory(); + protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); - GeometryFactory _geomFactory = new GeometryFactory(); protected String _pgHost; protected String _pgDatabase; protected String _pgPort; @@ -102,25 +119,28 @@ protected String _pgUseWKB; protected Map<String, String> pgProperties; - protected PostgisDataStore targetDataStore; - // protected OracleConvertPostGISJobContext oracleJobContext; + protected JDBCDataStore targetDataStore; + // protected OracleConvertEdbGeoJobContext oracleJobContext; - public Log getLogger() - { + private long queryTime = 0; + private long queryTimeStart = 0; + + public Log getLogger() { return logger; } - protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath) - { - return new OracleConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, filterPath); + protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, + boolean profileMode, + boolean useTransform) { + return new OracleConvertPostGISJobContext(getDataPath(), + getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); } - protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException - { + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); - _pgDatabase = dataMap.getString(PGDDATBASE); + _pgDatabase = dataMap.getString(PGDATBASE); _pgPort = dataMap.getString(PGPORT); _pgSchema = dataMap.getString(PGSCHEMA); _pgUsername = dataMap.getString(PGUSER); @@ -130,7 +150,7 @@ Log logger = getLogger(); /* logger.info("PGHOST=" + _myHost); - logger.info("PGDDATBASE=" + _myDatabase); + logger.info("PGDATBASE=" + _myDatabase); logger.info("PGPORT=" + _myPort); logger.info("PGSCHEMA=" + _mySchema); logger.info("PGUSER=" + _myUsername); @@ -138,206 +158,241 @@ logger.info("USEWKB=" + _myUseWKB); */ - if (_pgHost == null) - { + if (_pgHost == null) { logger.warn("PGHOST is null"); throw new JobExecutionException("Unknown PostGIS host."); } - if (_pgDatabase == null) - { + if (_pgDatabase == null) { logger.warn("PGDATABASE is null"); throw new JobExecutionException("Unknown PostGIS database."); } - if (_pgPort == null) - { + if (_pgPort == null) { logger.warn("PGPORT is null"); throw new JobExecutionException("Unknown PostGIS port."); } - if (_pgSchema == null) - { + if (_pgSchema == null) { logger.warn("PGSCHEMA is null"); throw new JobExecutionException("Unknown PostGIS schema."); } - if (_pgUsername == null) - { + if (_pgUsername == null) { logger.warn("PGUSERNAME is null"); throw new JobExecutionException("Unknown PostGIS username."); } - if (_pgPassword == null) - { + if (_pgPassword == null) { logger.warn("PGPASSWORD is null"); throw new JobExecutionException("Unknown PostGIS password."); } Map<String, String> remote = new TreeMap<String, String>(); - remote.put("dbtype", "postgis"); - remote.put("charset", "UTF-8"); - remote.put("host", _pgHost); - remote.put("port", _pgPort); - remote.put("database", _pgDatabase); - remote.put("user", _pgUsername); - remote.put("passwd", _pgPassword); - remote.put("namespace", null); + remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); + // remote.put("charset", "UTF-8"); + remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); + remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); + remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); + remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); + remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); + // remote.put( "namespace", null); pgProperties = remote; } - public void execute(JobExecutionContext context) throws JobExecutionException - { + public void execute(JobExecutionContext context) throws JobExecutionException { // Every job has its own job detail JobDetail jobDetail = context.getJobDetail(); // The name is defined in the job definition - String jobName = jobDetail.getName(); + String jobName = jobDetail.getKey().getName(); // Log the time the job started logger.info(jobName + " fired at " + new Date()); extractJobConfiguration(jobDetail); + + if (isIgnoreDBETL()) { + return; + } + createSourceDataStore(); createTargetDataStore(); - if (getSourceDataStore() == null) - { + if (getSourceDataStore() == null) { logger.warn("Cannot connect source oracle database."); throw new JobExecutionException("Cannot connect source oracle database."); } - if (getTargetDataStore() == null) - { + if (getTargetDataStore() == null) { logger.warn("Cannot connect source postgreSQL database."); throw new JobExecutionException("Cannot connect source postgreSQL database."); } + if (isProfileMode()) { + queryTime = 0; + } + long t1 = System.currentTimeMillis(); - String targetSchemaName = null; - try - { + String targetSchemaName, targetThemeTable; + try { logger.info("-- step:clearOutputDatabase --"); clearOutputDatabase(); targetSchemaName = determineTargetSchemaName(); + targetThemeTable = determineTargetThemeTableName(); - if (checkConvertDB()) - { + if (checkConvertFile()) { + logger.info("-- step:convertIndexDesignFile --"); + long tStep = System.currentTimeMillis(); + convertIndexDesignFile(context, targetSchemaName); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertIndexDesignFile", tStep, tStepEnd); + } + + logger.info("-- step:convertOtherDesignFile --"); + tStep = System.currentTimeMillis(); + convertOtherDesignFile(context, targetSchemaName); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertOtherDesignFile", tStep, tStepEnd); + } + } + + OracleConvertPostGISJobContext jobContext = null; + if (checkConvertDB()) { logger.info("-- step:convertOracleDB --"); - OracleConvertPostGISJobContext jobContext = - (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath); + jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); jobContext.setSourceDataStore(getSourceDataStore()); // jobContext.setConvertElementIn(_convertElementIn); jobContext.setElementLogging(checkElementLogging()); jobContext.setExecutionContext(context); - if (isCopyConnectivityMode()) + + createHibernateSequence(jobContext); + + long tStep = System.currentTimeMillis(); + + fetchTPData(jobContext); + logger.info("TPC DIST:" + jobContext.getDistId() + ":" + + ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); + + if (isCopyConnectivityMode()) { copyConnectivity(jobContext); + } + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-Copy Connectivity", tStep, tStepEnd); + } - for (String orgSchema : _orgSchema) - { + for (String orgSchema : _orgSchema) { logger.info("----- start schema:" + orgSchema + " -----"); + if (isProfileMode()) { + jobContext.resetProcessTime(); + jobContext.resetUpdateTime(); + } + tStep = System.currentTimeMillis(); exetcuteConvert(jobContext, orgSchema, _dataPath); //close all open filewriter instance jobContext.closeFeatureWriter(); + + if (isProfileMode()) { + logger.warn("Profile-Current Query Oracle Cost-" + + ((int) ((getQueryTime()) / 60000.0)) + " min - " + + (((int) ((getQueryTime()) % 60000.0)) / 1000) + " sec"); + long tStepEnd = System.currentTimeMillis(); + logger.warn("Profile-Current Process Cost-" + + ((int) ((getProcessTime()) / 60000.0)) + " min - " + + (((int) ((getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current Update Cost-" + + ((int) ((getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((getUpdateTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current JobContext Process Cost-" + + ((int) ((jobContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((jobContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current JobContext Update Cost-" + + ((int) ((jobContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((jobContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + logTimeDiff("Profile-Convert[" + orgSchema + "]", tStep, tStepEnd); + + resetQueryTime(); + resetProcessTime(); + resetUpdateTime(); + } + } + + jobContext.closeOracleConnection(); + } + + if (checkConvertElementIn()) { + logger.info("-- step:convertFeatureDesignFile --"); + long tStep = System.currentTimeMillis(); + convertFeatureDesignFile(context, targetSchemaName); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); } } - if (checkConvertFile()) - { - logger.info("-- step:convertIndexDesignFile --"); - convertIndexDesignFile(context, targetSchemaName); - logger.info("-- step:convertOtherDesignFile --"); - convertOtherDesignFile(context, targetSchemaName); - } - - if (checkConvertElementIn()) - { - logger.info("-- step:convertFeatureDesignFile --"); - convertFeatureDesignFile(context, targetSchemaName); - } - - if (checkCreateDummy()) - { + if (checkCreateDummy()) { logger.info("-- step:createDummyFeatureFile --"); createDummyFeatureFile(context); } - disconnect(); + updateRepoStatusToReady(targetSchemaName); + + if (checkConvertPWThemes()) { + jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + jobContext.setSourceDataStore(getSourceDataStore()); + jobContext.setElementLogging(checkElementLogging()); + jobContext.setExecutionContext(context); + + long tStep = System.currentTimeMillis(); + if (!convertPowerOwnerThemeWithCopyAPI(jobContext, targetThemeTable)) { + convertPowerOwnerTheme(jobContext, targetThemeTable); + } + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); + } + tStep = System.currentTimeMillis(); + if (!convertDynamicColorThemeWithCopyAPI(jobContext, targetThemeTable)) + convertDynamicColorTheme(jobContext, targetThemeTable); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); + } + jobContext.closeOracleConnection(); + } + + updatePWThemeStatusToReady(targetThemeTable); + long t2 = System.currentTimeMillis(); // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); - logger.warn("use time = " + ((int) ((t2 - t1) / 60000.0)) + " min - " + - (((int) ((t2 - t1) % 60000.0)) / 1000) + " sec"); - } catch (SQLException e) - { + logTimeDiff("Total ", t1, t2); + + } catch (SQLException e) { + disconnect(); logger.warn(e.getMessage(), e); throw new JobExecutionException("Database error. " + e.getMessage(), e); - } catch (IOException ex) - { + } catch (IOException ex) { + disconnect(); logger.warn(ex.getMessage(), ex); throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } finally { + disconnect(); } - updateRepoStatusToReady(targetSchemaName); logger.warn(jobName + " end at " + new Date()); } - /** - * Connectivity�ƻs�@�Ӫ����A�b�d�߹q�y��V�ɥΨӤ��OMS��Ʈw���q���s����(Connectivity) - * - * @param jobContext job context - * @throws SQLException sql exception - */ - private void copyConnectivity(OracleConvertPostGISJobContext jobContext) throws SQLException - { - Connection connection = jobContext.getOracleConnection(); - ResultSet rsMeta = connection.getMetaData().getTables(null, "BASEDB", - AbstractOracleJobContext.CONNECTIVITY_WEBCHECK_NAME + "%", - new String[]{"TABLE"}); - - boolean found = false; - try - { - while (rsMeta.next()) - { - String tablename = rsMeta.getString(3); - if (AbstractOracleJobContext.CONNECTIVITY_WEBCHECK_NAME.equalsIgnoreCase(tablename)) - { - found = true; - break; - } - } - // } catch (SQLException e) - } finally - { - if (rsMeta != null) - { - rsMeta.close(); - rsMeta = null; - } - } - Statement stmt = connection.createStatement(); - if (found) - { - stmt.execute(AbstractOracleJobContext.TRUNCATE_CONNECTIVITY_WEBCHECK); - } else { - logger.info("Create CONNECTIVITY_WEBCHECK table."); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_1); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_2); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_3); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_4); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_5); - stmt.execute(AbstractOracleJobContext.CREATE_CONNECTIVITY_WEBCHECK_INDEX_6); - stmt.execute(AbstractOracleJobContext.ALTER_CONNECTIVITY_WEBCHECK_1); - stmt.execute(AbstractOracleJobContext.ALTER_CONNECTIVITY_WEBCHECK_2); - } - - stmt.execute(AbstractOracleJobContext.COPY_CONNECTIVITY_TO_WEBCHECK); - stmt.close(); + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); } private void exetcuteConvert(OracleConvertPostGISJobContext jobContext, - String querySchema, String targetSchemaName) throws SQLException - { + String querySchema, String targetSchemaName) throws SQLException { int order = 0; OrderedMap map = getBlobStorageList(jobContext.getOracleConnection(), - querySchema, "SD$SPACENODES", null); + querySchema, "SD$SPACENODES", null); logger.info("begin convert job:[" + map.size() + "]:testmode=" + _testMode); @@ -345,11 +400,16 @@ int step = total / 100; int current = 0; + if (total == 0) { + logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is zero."); + return; + } + logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is " + map.size()); + //jobContext.startTransaction(); jobContext.setCurrentSchema(querySchema); jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 0); - for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext();) - { + for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext(); ) { it.next(); Pair pair = (Pair) it.getValue(); @@ -360,14 +420,12 @@ order++; - if (_testMode) - { + if (_testMode) { if ((_testCount < 0) || (order >= _testCount)) break; } - if ((order % COMMITSIZE) == 0) - { + if ((order % COMMITSIZE) == 0) { // OracleConnection connection = jobContext.getOracleConnection(); // connection.commitTransaction(); jobContext.commitTransaction(); @@ -376,26 +434,34 @@ System.runFinalization(); } - int now = order % step; - if (now != current) - { - current = now; - jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + if (step != 0) { + int now = order % step; + if (now != current) { + current = now; + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + } + } else { + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + current++; } } jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 100); jobContext.commitTransaction(); jobContext.resetFeatureContext(); + + if (isProfileMode()) { + + } + logger.info("end convert job:[" + order + "]"); System.gc(); System.runFinalization(); } protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc, - OrderedMap orderedMap) throws SQLException - { + OrderedMap orderedMap) throws SQLException { if (orderedMap == null) orderedMap = new LinkedMap(99); String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\""; @@ -405,17 +471,14 @@ ResultSet rs = null; stmt.setFetchSize(FETCHSIZE); - try - { + try { rs = stmt.executeQuery(fetchStmt); int size = rs.getMetaData().getColumnCount(); - while (rs.next()) - { + while (rs.next()) { Object[] values = new Object[size]; - for (int i = 0; i < size; i++) - { + for (int i = 0; i < size; i++) { values[i] = rs.getObject(i + 1); } @@ -428,23 +491,20 @@ else pair.first = name; } - } catch (SQLException e) - { + } catch (SQLException e) { logger.error(e.toString(), e); logger.error("stmt=" + fetchStmt); throw e; - } finally - { - if (rs != null) rs.close(); - stmt.close(); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); } return orderedMap; } protected OrderedMap getRawFormatStorageList(OracleConnection connection, String schemaSrc, String tableSrc, - OrderedMap orderedMap) throws SQLException - { + OrderedMap orderedMap) throws SQLException { if (orderedMap == null) orderedMap = new LinkedMap(99); String fetchStmtFmt = "SELECT RNID, SPACETABLE FROM \"%s\".\"%s\""; @@ -454,15 +514,12 @@ stmt.setFetchSize(FETCHSIZE); ResultSet rs = stmt.executeQuery(fetchStmt); - try - { + try { int size = rs.getMetaData().getColumnCount(); - while (rs.next()) - { + while (rs.next()) { Object[] values = new Object[size]; - for (int i = 0; i < size; i++) - { + for (int i = 0; i < size; i++) { values[i] = rs.getObject(i + 1); } @@ -475,19 +532,18 @@ else pair.second = name; } - } finally - { - rs.close(); - stmt.close(); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); } return orderedMap; } protected void queryIgsetElement(OracleConvertPostGISJobContext jobContext, - String srcschema, String srctable) throws SQLException - { + String srcschema, String srctable) throws SQLException { Connection connection = jobContext.getOracleConnection(); String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID"; + //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable}); Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); @@ -495,37 +551,49 @@ stmtSrc.setFetchSize(FETCHSIZE); ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); int igdsMetaType = rsSrc.getMetaData().getColumnType(1); - while (rsSrc.next()) - { - byte[] raw; - if (igdsMetaType == Types.BLOB) - { + while (rsSrc.next()) { + if (isProfileMode()) { + markQueryTime(); + } + + byte[] raw = null; + if (igdsMetaType == Types.BLOB) { BLOB blob = (BLOB) rsSrc.getBlob(1); - raw = getBytesFromBLOB(blob); - blob.close(); - } else - { + try { + raw = getBytesFromBLOB(blob); + } catch (BufferOverflowException e) { + logger.warn("Wrong Element Structure-", e); + } finally { + // blob.close(); + } + } else { raw = rsSrc.getBytes(1); } - try - { - Element element = fetchBinaryElement(raw); - jobContext.putFeatureCollection(element); - } catch (Dgn7fileException e) - { + try { + if (raw != null) { + Element element = fetchBinaryElement(raw); + if (isProfileMode()) { + accumulateQueryTime(); + } + jobContext.putFeatureCollection(element); + } else { + if (isProfileMode()) { + accumulateQueryTime(); + } + } + } catch (Dgn7fileException e) { logger.warn("Dgn7Exception", e); } } - rsSrc.close(); - stmtSrc.close(); + JDBCUtils.close(rsSrc); + JDBCUtils.close(stmtSrc); } protected void queryRawElement(OracleConvertPostGISJobContext jobContext, - String srcschema, String srctable) throws SQLException - { + String srcschema, String srctable) throws SQLException { Connection connection = jobContext.getOracleConnection(); String fetchDestStmtFmt = "SELECT ELEMENT FROM \"%s\".\"%s\" ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchDestStmtFmt); @@ -535,10 +603,8 @@ stmtDest.setFetchSize(FETCHSIZE); ResultSet rsDest = stmtDest.executeQuery(fetchDestStmt); - try - { - while (rsDest.next()) - { + try { + while (rsDest.next()) { ARRAY rawsValue = ((OracleResultSet) rsDest).getARRAY(1); long[] rawData = rawsValue.getLongArray(); byte[] comparessedValue; @@ -556,26 +622,21 @@ byte[] rawDest = ByteArrayCompressor.decompressByteArray(comparessedValue); - - try - { + try { Element element = fetchBinaryElement(rawDest); jobContext.putFeatureCollection(element); - } catch (Dgn7fileException e) - { + } catch (Dgn7fileException e) { logger.warn("Dgn7Exception:" + e.getMessage(), e); } } - } finally - { - rsDest.close(); - stmtDest.close(); + } finally { + JDBCUtils.close(rsDest); + JDBCUtils.close(stmtDest); } } // Binary to Element - private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException - { + private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { ByteBuffer buffer = ByteBuffer.wrap(raws); buffer.order(ByteOrder.LITTLE_ENDIAN); short signature = buffer.getShort(); @@ -593,37 +654,29 @@ handler = recordType.getElementHandler(); Element dgnElement = (Element) handler.read(buffer, signature, elementLength); - if (recordType.isComplexElement() && (elementLength < raws.length)) - { + if (recordType.isComplexElement() && (elementLength < raws.length)) { int offset = elementLength; - while (offset < (raws.length - 4)) - { + while (offset < (raws.length - 4)) { buffer.position(offset); signature = buffer.getShort(); type = (byte) ((signature >>> 8) & 0x007f); elementLength = (buffer.getShort() * 2) + 4; - if (raws.length < (offset + elementLength)) - { - System.out.println("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); + if (raws.length < (offset + elementLength)) { + logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); break; } recordType = ElementType.forID(type); handler = recordType.getElementHandler(); - if (handler != null) - { + if (handler != null) { Element subElement = (Element) handler.read(buffer, signature, elementLength); ((ComplexElement) dgnElement).add(subElement); offset += elementLength; - } else - { + } else { byte[] remain = new byte[buffer.remaining()]; System.arraycopy(raws, offset, remain, 0, buffer.remaining()); - for (int i = 0; i < remain.length; i++) - { - if (remain[i] != 0) - { + for (int i = 0; i < remain.length; i++) { + if (remain[i] != 0) { logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); - System.out.println("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); } } break; @@ -635,120 +688,134 @@ } /** - * �����ഫ�����ɪ��u�@ + * �����ഫ�����ɪ��u�@ * - * @param context �u�@�������� + * @param context �u�@�������� * @throws org.quartz.JobExecutionException * exception */ - private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException - { - File indexDir = new File(getDataPath(), "index"); - if (!indexDir.exists()) - { + private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File indexDir = new File(getDataPath(), INDEXPATHNAME); + if (!indexDir.exists()) { logger.info("index dir=" + indexDir + " not exist."); return; } - if (!indexDir.isDirectory()) - { + if (!indexDir.isDirectory()) { logger.info("index dir=" + indexDir + " is not a directory."); } - File[] dgnFiles = indexDir.listFiles(new FilenameFilter() - { - public boolean accept(File dir, String name) - { - return name.toLowerCase().endsWith(".dgn"); + List<File> dgnFiles = FileUtils.recurseDir(indexDir, new FileFilter() { + public boolean accept(File pathname) { + return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn"); } }); - for (File dgnFile : dgnFiles) - { + for (File dgnFile : dgnFiles) { + if (dgnFile.isDirectory()) continue; IndexDgnConvertPostGISJobContext convertContext = - new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName); - logger.debug("--- start dgnfile-" + dgnFile.toString() + " ---"); - try - { + new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, + isProfileMode(), isTransformed()); + logger.info("--- start index dgnfile-" + dgnFile.toString() + " ---"); + FileInputStream fs = null; + FileChannel fc = null; + Dgn7fileReader reader = null; + try { convertContext.clearOutputDatabase(); convertContext.setExecutionContext(context); String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); convertContext.startTransaction(); - FileInputStream fs = new FileInputStream(dgnFile); - FileChannel fc = fs.getChannel(); - Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); + fs = new FileInputStream(dgnFile); + fc = fs.getChannel(); + reader = new Dgn7fileReader(fc, new Lock()); convertContext.setReader(reader); scanIndexDgnElement(convertContext); convertContext.commitTransaction(); convertContext.closeFeatureWriter(); + System.gc(); System.runFinalization(); - } catch (FileNotFoundException e) - { + } catch (FileNotFoundException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (Dgn7fileException e) - { + } catch (Dgn7fileException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (IOException e) - { + } catch (IOException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (IllegalAttributeException e) - { + } catch (IllegalAttributeException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (SchemaException e) - { + } catch (SchemaException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (isProfileMode()) { + logger.warn("Profile-Current convertContext Process Cost-" + + ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current convertContext Update Cost-" + + ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + } } } } protected void scanIndexDgnElement(IndexDgnConvertPostGISJobContext convertContext) - throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException - { + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { Dgn7fileReader reader = convertContext.getReader(); int count = 0; Element lastComplex = null; - while (reader.hasNext()) - { - Dgn7fileReader.Record record = reader.nextElement(); - if (record.element() != null) - { + + while (reader.hasNext()) { + if (isProfileMode()) markProcessTime(); + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { Element element = (Element) record.element(); ElementType type = element.getElementType(); - if ((!type.isComplexElement()) && (!element.isComponentElement())) - { - if (lastComplex != null) - { + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { processIndexElement(lastComplex, convertContext); lastComplex = null; } processIndexElement(element, convertContext); - } else if (element.isComponentElement()) - { - if (lastComplex != null) - { + } else if (element.isComponentElement()) { + if (lastComplex != null) { ((ComplexElement) lastComplex).add(element); } - } else if (type.isComplexElement()) - { - if (lastComplex != null) - { + } else if (type.isComplexElement()) { + if (lastComplex != null) { processIndexElement(lastComplex, convertContext); } lastComplex = element; @@ -757,145 +824,153 @@ count++; } - if (lastComplex != null) - { + if (lastComplex != null) { processIndexElement(lastComplex, convertContext); } logger.debug("ElementRecord Count=" + count); } private void processIndexElement(Element element, IndexDgnConvertPostGISJobContext convertContext) - throws IllegalAttributeException, SchemaException - { - if (useTpclidText) - { - if (element instanceof TextElement) - { - convertContext.putFeatureCollection(element); - } - } else { - if (element instanceof ShapeElement) - { - convertContext.putFeatureCollection(element); - } - } + throws IllegalAttributeException, SchemaException { + //if (useTpclidText) { + // if (element instanceof TextElement) { + // convertContext.putFeatureCollection(element); + // } + //} else { + // if (element instanceof ShapeElement) { + convertContext.putFeatureCollection(element); + // } + //} } /** - * �����ഫ��L�]�p���ɪ��u�@ + * �����ഫ��L�]�p���ɪ��u�@ * * @param context jobContext * @throws org.quartz.JobExecutionException * exception */ - private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException - { - File otherDir = new File(getDataPath(), "other"); - if (!otherDir.exists()) - { + private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File otherDir = new File(getDataPath(), OTHERPATHNAME); + if (!otherDir.exists()) { logger.info("other dir=" + otherDir + " not exist."); return; } - if (!otherDir.isDirectory()) - { + if (!otherDir.isDirectory()) { logger.info("other dir=" + otherDir + " is not a directory."); } - File[] dgnFiles = otherDir.listFiles(new FilenameFilter() - { - public boolean accept(File dir, String name) - { - return name.toLowerCase().endsWith(".dgn"); + List<File> dgnFiles = FileUtils.recurseDir(otherDir, new FileFilter() { + public boolean accept(File pathname) { + return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn"); } }); - for (File dgnFile : dgnFiles) - { + for (File dgnFile : dgnFiles) { + if (dgnFile.isDirectory()) continue; + GeneralDgnConvertPostGISJobContext convertContext = - new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName); - logger.info("--- start dgnfile-" + dgnFile.toString() + " ---"); - try - { + new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, + isProfileMode(), isTransformed()); + logger.info("--- start other dgnfile-" + dgnFile.toString() + " ---"); + FileInputStream fs = null; + FileChannel fc; + Dgn7fileReader reader = null; + try { convertContext.setExecutionContext(context); String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); convertContext.startTransaction(); - FileInputStream fs = new FileInputStream(dgnFile); - FileChannel fc = fs.getChannel(); - Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); + fs = new FileInputStream(dgnFile); + fc = fs.getChannel(); + reader = new Dgn7fileReader(fc, new Lock()); convertContext.setReader(reader); scanOtherDgnElement(convertContext); convertContext.commitTransaction(); convertContext.closeFeatureWriter(); + System.gc(); System.runFinalization(); - } catch (FileNotFoundException e) - { + } catch (FileNotFoundException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (Dgn7fileException e) - { + } catch (Dgn7fileException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (IOException e) - { + } catch (IOException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (IllegalAttributeException e) - { + } catch (IllegalAttributeException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (SchemaException e) - { + } catch (SchemaException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (isProfileMode()) { + logger.warn("Profile-Current convertContext Process Cost-" + + ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current convertContext Update Cost-" + + ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + } } } } public void scanOtherDgnElement(GeneralDgnConvertPostGISJobContext convertContext) - throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException - { + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { Dgn7fileReader reader = convertContext.getReader(); int count = 0; Element lastComplex = null; - while (reader.hasNext()) - { - Dgn7fileReader.Record record = reader.nextElement(); - if (record.element() != null) - { + while (reader.hasNext()) { + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { Element element = (Element) record.element(); ElementType type = element.getElementType(); - if ((!type.isComplexElement()) && (!element.isComponentElement())) - { - if (lastComplex != null) - { + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { processOtherElement(lastComplex, convertContext); lastComplex = null; } processOtherElement(element, convertContext); - } else if (element.isComponentElement()) - { - if (lastComplex != null) - { + } else if (element.isComponentElement()) { + if (lastComplex != null) { ((ComplexElement) lastComplex).add(element); } - } else if (type.isComplexElement()) - { - if (lastComplex != null) - { + } else if (type.isComplexElement()) { + if (lastComplex != null) { processOtherElement(lastComplex, convertContext); } lastComplex = element; @@ -904,23 +979,20 @@ count++; } - if (lastComplex != null) - { + if (lastComplex != null) { processOtherElement(lastComplex, convertContext); } logger.debug("ElementRecord Count=" + count); } private void processOtherElement(Element element, GeneralDgnConvertPostGISJobContext convertContext) - throws IllegalAttributeException, SchemaException - { + throws IllegalAttributeException, SchemaException { convertContext.putFeatureCollection(element); } - private void clearOutputDatabase() - { + private void clearOutputDatabase() { /* - File outDataPath = new File(getDataPath(), OracleConvertPostGISJobContext.SHPOUTPATH); + File outDataPath = new File(getDataPath(), OracleConvertEdbGeoJobContext.SHPOUTPATH); if (outDataPath.exists() && outDataPath.isDirectory()) { deleteFilesInPath(outDataPath); @@ -938,33 +1010,24 @@ */ } - private void deleteFilesInPath(File outDataPath) - { + private void deleteFilesInPath(File outDataPath) { deleteFilesInPath(outDataPath, true); } - private void deleteFilesInPath(File outDataPath, boolean removeSubDir) - { - if (!outDataPath.isDirectory()) - { + private void deleteFilesInPath(File outDataPath, boolean removeSubDir) { + if (!outDataPath.isDirectory()) { return; } File[] files = outDataPath.listFiles(); - for (File file : files) - { - if (file.isFile()) - { - if (!file.delete()) - { + for (File file : files) { + if (file.isFile()) { + if (!file.delete()) { logger.info("Cannot delete file-" + file.toString()); } - } else if (file.isDirectory()) - { + } else if (file.isDirectory()) { deleteFilesInPath(file, removeSubDir); - if (removeSubDir) - { - if (file.delete()) - { + if (removeSubDir) { + if (file.delete()) { logger.info("Cannot delete dir-" + file.toString()); } } @@ -972,35 +1035,29 @@ } } - private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException - { + private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { File elminDir = new File(getDataPath(), "elmin"); - if (!elminDir.exists()) - { + if (!elminDir.exists()) { logger.info("elmin dir=" + elminDir + " not exist."); return; } - if (!elminDir.isDirectory()) - { + if (!elminDir.isDirectory()) { logger.info("elmin dir=" + elminDir + " is not a directory."); } - File[] dgnFiles = elminDir.listFiles(new FilenameFilter() - { - public boolean accept(File dir, String name) - { + File[] dgnFiles = elminDir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { return name.toLowerCase().endsWith(".dgn"); } }); - for (File dgnFile : dgnFiles) - { + for (File dgnFile : dgnFiles) { FeatureDgnConvertPostGISJobContext convertContext = - new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath); + new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); logger.info("--- start dgnfile-" + dgnFile.toString() + " ---"); - try - { + try { convertContext.setExecutionContext(context); String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); @@ -1017,68 +1074,56 @@ convertContext.closeFeatureWriter(); System.gc(); System.runFinalization(); - } catch (FileNotFoundException e) - { + } catch (FileNotFoundException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (Dgn7fileException e) - { + } catch (Dgn7fileException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (IOException e) - { + } catch (IOException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (IllegalAttributeException e) - { + } catch (IllegalAttributeException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); - } catch (SchemaException e) - { + } catch (SchemaException e) { convertContext.rollbackTransaction(); logger.warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); } } } public void scanFeatureDgnElement(FeatureDgnConvertPostGISJobContext convertContext) - throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException - { + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { Dgn7fileReader reader = convertContext.getReader(); int count = 0; Element lastComplex = null; - while (reader.hasNext()) - { - Dgn7fileReader.Record record = reader.nextElement(); - if (record.element() != null) - { + while (reader.hasNext()) { + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { Element element = (Element) record.element(); ElementType type = element.getElementType(); - if ((!type.isComplexElement()) && (!element.isComponentElement())) - { - if (lastComplex != null) - { + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { processFeatureElement(lastComplex, convertContext); lastComplex = null; } processFeatureElement(element, convertContext); - } else if (element.isComponentElement()) - { - if (lastComplex != null) - { + } else if (element.isComponentElement()) { + if (lastComplex != null) { ((ComplexElement) lastComplex).add(element); } - } else if (type.isComplexElement()) - { - if (lastComplex != null) - { + } else if (type.isComplexElement()) { + if (lastComplex != null) { processFeatureElement(lastComplex, convertContext); } lastComplex = element; @@ -1087,21 +1132,18 @@ count++; } - if (lastComplex != null) - { + if (lastComplex != null) { processFeatureElement(lastComplex, convertContext); } logger.debug("ElementRecord Count=" + count); } private void processFeatureElement(Element element, FeatureDgnConvertPostGISJobContext convertContext) - throws IllegalAttributeException, SchemaException - { + throws IllegalAttributeException, SchemaException { convertContext.putFeatureCollection(element); } - private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException - { + private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException { /* DummyFeatureConvertShpJobContext convertContext = new DummyFeatureConvertShpJobContext(getDataPath(), _filterPath); try { @@ -1116,15 +1158,12 @@ */ } - public DataStore getTargetDataStore() - { + public DataStore getTargetDataStore() { return targetDataStore; } - protected void createTargetDataStore() throws JobExecutionException - { - if (targetDataStore != null) - { + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } @@ -1136,53 +1175,55 @@ } */ - if (!pgProperties.containsKey(PostgisDataStoreFactory.MAXCONN.key)) - { - pgProperties.put(PostgisDataStoreFactory.MAXCONN.key, "10"); + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); } - if (!pgProperties.containsKey(PostgisDataStoreFactory.MINCONN.key)) - { - pgProperties.put(PostgisDataStoreFactory.MINCONN.key, "1"); + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); } - if (!pgProperties.containsKey(PostgisDataStoreFactory.WKBENABLED.key)) - { - pgProperties.put(PostgisDataStoreFactory.WKBENABLED.key, "true"); + /* + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { + pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); } + */ - if (!dataStoreFactory.canProcess(pgProperties)) - { + if (!dataStoreFactory.canProcess(pgProperties)) { getLogger().warn("cannot process properties-"); throw new JobExecutionException("cannot process properties-"); } - try - { - targetDataStore = (PostgisDataStore) dataStoreFactory.createDataStore(pgProperties); - } catch (IOException e) - { + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { getLogger().warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } - private String determineTargetSchemaName() throws IOException - { + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private String determineTargetSchemaName() throws IOException { if (targetDataStore == null) return null; Connection connection = null; Statement stmt = null; ResultSet rs = null; String targetSchema = null; boolean needCreate = false; - try - { + try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); - rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[] {"TABLE"}); + // Create XGVERSIONTABLE_NAME + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); if (!rs.next()) needCreate = true; if (needCreate) createXGeosVersionTable(connection, _pgSchema); rs.close(); - rs = null; StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("vsschema, vsstatus FROM "); @@ -1193,25 +1234,21 @@ ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); int i = 0; int current = -1; - while (rs.next()) - { + while (rs.next()) { Object[] values = new Object[2]; values[0] = rs.getString("vsschema"); values[1] = rs.getShort("vsstatus"); tmpSchemas.add(values); - if ((((Short)values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) - { + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { current = i; } i++; } - if (current == -1) - { + if (current == -1) { Object[] values = tmpSchemas.get(0); targetSchema = (String) values[0]; - } else if (current < (tmpSchemas.size() - 1)) - { + } else if (current < (tmpSchemas.size() - 1)) { Object[] values = tmpSchemas.get(current + 1); targetSchema = (String) values[0]; } else { @@ -1226,28 +1263,98 @@ sbSQL.append(" WHERE vsschema = '"); sbSQL.append(targetSchema).append("'"); int count = stmt.executeUpdate(sbSQL.toString()); - if (count != 1) - { + if (count != 1) { logger.info("update status for " + targetSchema + " update result count=" - + count); + + count); } } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { - if (rs != null) try { rs.close(); } catch (SQLException e) {}; - if (stmt != null) try { stmt.close(); } catch (SQLException e) {}; - if (connection != null) try { connection.close(); } catch (SQLException e) {}; + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } return targetSchema; } - public String encodeSchemaTableName(String schemaName, String tableName) - { + private String determineTargetThemeTableName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetTable = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XPTVERSIONTABLE_NAME + needCreate = false; + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXPWThemeVersionTable(connection, _pgSchema); + rs.close(); + + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vptname, vptstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vptid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vptname"); + values[1] = rs.getShort("vptstatus"); + tmpTablenames.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } else if (current < (tmpTablenames.size() - 1)) { + Object[] values = tmpTablenames.get(current + 1); + targetTable = (String) values[0]; + } else { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vptname = '"); + sbSQL.append(targetTable).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetTable + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetTable; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + if (schemaName == null) + return "\"" + tableName + "\""; return "\"" + schemaName + "\".\"" + tableName + "\""; } - private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException - { + private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException { Statement stmt = null; StringBuilder sql = new StringBuilder("CREATE TABLE "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); @@ -1255,14 +1362,13 @@ sql.append(" vsschema character varying(64) NOT NULL, "); sql.append(" vsstatus smallint NOT NULL, "); sql.append(" vstimestamp timestamp with time zone ) "); - try - { + try { stmt = connection.createStatement(); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("ALTER TABLE "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); - sql.append(" OWNER TO spatialdb"); + sql.append(" OWNER TO ").append(_pgUsername); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("GRANT ALL ON TABLE "); @@ -1270,8 +1376,7 @@ sql.append(" TO public"); stmt.executeUpdate(sql.toString()); - for (String schemaName : DataReposVersionManager.DEFAULTXGVERSIONSCHEMA_NAMES) - { + for (String schemaName : DataReposVersionManager.DEFAULTXGVERSIONSCHEMA_NAMES) { sql = new StringBuilder("INSERT INTO "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); sql.append(" (vsschema, vsstatus) VALUES ('"); @@ -1287,48 +1392,110 @@ } } - private void updateRepoStatusToReady(String targetSchema) - { + private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" ( vptid serial PRIMARY KEY, "); + sql.append(" vptname character varying(64) NOT NULL, "); + sql.append(" vptstatus smallint NOT NULL, "); + sql.append(" vpttimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" (vptname, vptstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + } + + } finally { + if (stmt != null) stmt.close(); + } + } + + private void updateRepoStatusToReady(String targetSchema) { if (targetDataStore == null) return; Connection connection = null; Statement stmt = null; ResultSet rs = null; boolean needCreate = false; - try - { + try { StringBuilder sbSQL = new StringBuilder("UPDATE "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); sbSQL.append(" SET vsstatus = "); sbSQL.append(DataReposVersionManager.VSSTATUS_READY); - sbSQL.append(" WHERE vsschema = '"); + sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '"); sbSQL.append(targetSchema).append("'"); connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); stmt = connection.createStatement(); int count = stmt.executeUpdate(sbSQL.toString()); - if (count != 1) - { + if (count != 1) { logger.info("update status for " + targetSchema + " update result count=" - + count); + + count); } } catch (SQLException e) { logger.warn(e.getMessage(), e); - } catch (IOException e) - { + } catch (IOException e) { logger.warn(e.getMessage(), e); } finally { - if (rs != null) try { rs.close(); } catch (SQLException e) {}; - if (stmt != null) try { stmt.close(); } catch (SQLException e) {}; - if (connection != null) try { connection.close(); } catch (SQLException e) {}; + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } } - private void createIfNotExistNewSchema(Connection connection, String s) throws SQLException - { + private void updatePWThemeStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; Statement stmt = null; ResultSet rs = null; - try - { + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void createIfNotExistNewSchema(Connection connection, String s) throws SQLException { + Statement stmt = null; + ResultSet rs = null; + try { /* rs = connection.getMetaData().getSchemas(null, s); if (rs.next()) return; @@ -1338,7 +1505,7 @@ StringBuilder sbSQL = new StringBuilder("CREATE SCHEMA "); sbSQL.append(s).append(' '); - sbSQL.append("AUTHORIZATION spatialdb"); + sbSQL.append("AUTHORIZATION ").append(_pgUsername); stmt = connection.createStatement(); stmt.executeUpdate(sbSQL.toString()); @@ -1346,8 +1513,7 @@ sbSQL.append(s).append(' '); sbSQL.append("TO public"); stmt.executeUpdate(sbSQL.toString()); - } catch (SQLException e) - { + } catch (SQLException e) { logger.info("create schema:" + s + " has exception."); logger.info(e.getMessage(), e); } finally { @@ -1355,4 +1521,382 @@ if (stmt != null) stmt.close(); } } + + public final void accumulateQueryTime() { + queryTime += System.currentTimeMillis() - queryTimeStart; + } + + public long getQueryTime() { + return queryTime; + } + + public final void markQueryTime() { + queryTimeStart = System.currentTimeMillis(); + } + + public final void resetQueryTime() { + queryTime = 0; + } + + private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, dyncolor) VALUES (?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setString(3, colorText); + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setShort(3, (short) ownerId); + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + pstmt.setString(4, "shape://ccarrow"); + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + pstmt.setString(4, "shape://rccarrow"); + } else { + pstmt.setString(4, "shape://backslash"); + } + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndex(Connection connection, String tableName) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + if (cid > Short.MAX_VALUE) { + logger.info("Wrong Color Table:" + cid + "-" + oid); + continue; + } + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(colorText).append("\n"); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + String flowMark = null; + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + flowMark = FORWARDFLOW_MARK; + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + flowMark = BACKFLOW_MARK; + } else if (ConnectivityDirectionEnum.Nondeterminate == dir) { + flowMark = NONFLOW_MARK; + } else { + flowMark = UNFLOW_MARK; + } + + if (cid > Short.MAX_VALUE) { + logger.info("Wrong Connectivity Table:" + cid + "-" + oid); + continue; + } + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(ownerId).append(','); + sb.append(flowMark).append('\n'); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable); + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + stmt.execute("DROP TABLE " + tempTable); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } } -- Gitblit v0.0.0-SNAPSHOT