From 8228a9616175b94ff0df5a9832184e5459c07c1a Mon Sep 17 00:00:00 2001 From: Dennis Kao <ulysseskao@gmail.com> Date: Fri, 07 Mar 2014 02:10:56 +0800 Subject: [PATCH] update for increment and theme jobs --- xdgnjobs/ximple-jobcarrier/quartz_jobs_inc.xml | 169 +++ xdgnjobs/ximple-jobcarrier/quartz.properties | 5 xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties | 4 xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_inc.xml | 168 +++ xdgnjobs/ximple-spatialjob/src/main/resources/conf/DefaultConvertShpFilter.xml | 20 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleClearExchangeJob.java | 370 ++++++++ xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java | 131 ++ xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java | 290 +++++ xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java | 36 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java | 1360 +++++++++++++++++++++++++++++ xdgnjobs/ximple-jobcarrier/quartz_jobs_colowner.xml | 164 +++ 11 files changed, 2,666 insertions(+), 51 deletions(-) diff --git a/xdgnjobs/ximple-jobcarrier/quartz.properties b/xdgnjobs/ximple-jobcarrier/quartz.properties index e491e0d..19eb15a 100644 --- a/xdgnjobs/ximple-jobcarrier/quartz.properties +++ b/xdgnjobs/ximple-jobcarrier/quartz.properties @@ -23,8 +23,11 @@ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.jobInitializer.class: org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin -org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml + +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml +org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_inc.xml #org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml + org.quartz.plugin.jobInitializer.failOnFileNotFound = true org.quartz.plugin.jobInitializer.scanInterval = 10 org.quartz.plugin.jobInitializer.wrapInUserTransaction = false diff --git a/xdgnjobs/ximple-jobcarrier/quartz_jobs_colowner.xml b/xdgnjobs/ximple-jobcarrier/quartz_jobs_colowner.xml new file mode 100644 index 0000000..c086024 --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/quartz_jobs_colowner.xml @@ -0,0 +1,164 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertDMMS2PostGisWithGeoserver</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <job-class>com.ximple.eofms.jobs.OracleConvertThemes2PostGISJob</job-class> + <!--volatility>false</volatility--> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <value>/Users/ulysseskao/Projects/XGeoDMMS/xjobrun/nstpcjobs/jobdata</value> + <!--value>/mnt/hdisk/home.data/private/projects/xdcad/xjobrun/nntpcjobs/jobdata</value--> + </entry> + <entry> + <key>PGHOST</key> + <value>10.16.17.14</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>public</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORAHOST</key> + <value>10.16.17.14</value> + </entry> + <entry> + <key>ORAINST</key> + <value>nntpc</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>manager</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>true</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>true</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>false</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>false</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://192.168.11.99:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>true</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertDMMS2PostGisWithGeoserver</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-jobcarrier/quartz_jobs_inc.xml b/xdgnjobs/ximple-jobcarrier/quartz_jobs_inc.xml new file mode 100644 index 0000000..1a278e6 --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/quartz_jobs_inc.xml @@ -0,0 +1,169 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> + <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> + <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertIncrementDMMS2PostGis</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <job-class>com.ximple.eofms.jobs.OracleIncrementDgn2PostGISJob</job-class> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <!--value>/home/ulysseskao/projects/xgeodmms/xjobrun/nstpcjobs/jobdata</value--> + <value>/Users/ulysseskao/Projects/XGeoDMMS/xjobrun/nstpcjobs/jobdata</value> + </entry> + <entry> + <key>PGHOST</key> + <value>10.16.17.14</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>public</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>tpc000</value> + </entry> + <entry> + <key>ORAHOST</key> + <value>10.16.17.14</value> + </entry> + <entry> + <key>ORAINST</key> + <value>nntpc</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>manager</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>true</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>true</value> + </entry> + <entry> + <key>CONVERTPWTHEMES</key> + <value>true</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>true</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://10.10.1.7:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>false</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertIncrementDMMS2PostGis</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties index 15de783..1619aaa 100644 --- a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties +++ b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties @@ -23,7 +23,9 @@ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.jobInitializer.class = org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin -org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml + +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml +org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_inc.xml #org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml org.quartz.plugin.jobInitializer.failOnFileNotFound = true diff --git a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_inc.xml b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_inc.xml new file mode 100644 index 0000000..94ae77b --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_inc.xml @@ -0,0 +1,168 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> + <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> + <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertIncrementDMMS2PostGis</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <job-class>com.ximple.eofms.jobs.OracleIncrementDgn2PostGISJob</job-class> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <value>/home/ulysseskao/projects/xgeodmms/xjobrun/nstpcjobs/jobdata</value> + </entry> + <entry> + <key>PGHOST</key> + <value>10.10.1.9</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>public</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORAHOST</key> + <value>10.10.1.9</value> + </entry> + <entry> + <key>ORAINST</key> + <value>orcl</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>SYSTEM000</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>true</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>true</value> + </entry> + <entry> + <key>CONVERTPWTHEMES</key> + <value>true</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>true</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://10.10.1.7:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>false</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertDMMS2PostGisWithGeoserver</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleClearExchangeJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleClearExchangeJob.java new file mode 100644 index 0000000..9dc6d62 --- /dev/null +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleClearExchangeJob.java @@ -0,0 +1,370 @@ +package com.ximple.eofms.jobs; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Date; +import java.util.Map; +import java.util.TreeMap; + +import com.ximple.eofms.jobs.context.AbstractOracleJobContext; +import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; +import org.geotools.jdbc.JDBCDataStore; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class OracleClearExchangeJob extends AbstractOracleDatabaseJob { + final static Log logger = LogFactory.getLog(OracleClearExchangeJob.class); + + public static String FETCH_TPDATA = "SELECT TPID, TPNAME FROM BASEDB.TPDATA"; + + private static final String PGHOST = "PGHOST"; + private static final String PGDATBASE = "PGDATBASE"; + private static final String PGPORT = "PGPORT"; + private static final String PGSCHEMA = "PGSCHEMA"; + private static final String PGUSER = "PGUSER"; + private static final String PGPASS = "PGPASS"; + private static final String USEWKB = "USEWKB"; + + private static final boolean useTpclidText = false; + + private static final int FETCHSIZE = 100; + private static final int COMMITSIZE = 100; + + protected static class Pair { + Object first; + Object second; + + public Pair(Object first, Object second) { + this.first = first; + this.second = second; + } + } + + protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); + + protected String _pgHost; + protected String _pgDatabase; + protected String _pgPort; + protected String _pgSchema; + protected String _pgUsername; + protected String _pgPassword; + protected String _pgUseWKB; + + protected Map<String, String> pgProperties; + protected JDBCDataStore targetDataStore; + + private long queryTime = 0; + private long queryTimeStart = 0; + + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { + super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); + _pgHost = dataMap.getString(PGHOST); + _pgDatabase = dataMap.getString(PGDATBASE); + _pgPort = dataMap.getString(PGPORT); + _pgSchema = dataMap.getString(PGSCHEMA); + _pgUsername = dataMap.getString(PGUSER); + _pgPassword = dataMap.getString(PGPASS); + _pgUseWKB = dataMap.getString(USEWKB); + + Log logger = getLogger(); + /* + logger.info("PGHOST=" + _myHost); + logger.info("PGDATBASE=" + _myDatabase); + logger.info("PGPORT=" + _myPort); + logger.info("PGSCHEMA=" + _mySchema); + logger.info("PGUSER=" + _myUsername); + logger.info("PGPASS=" + _myPassword); + logger.info("USEWKB=" + _myUseWKB); + */ + + if (_pgHost == null) { + logger.warn("PGHOST is null"); + throw new JobExecutionException("Unknown PostGIS host."); + } + if (_pgDatabase == null) { + logger.warn("PGDATABASE is null"); + throw new JobExecutionException("Unknown PostGIS database."); + } + if (_pgPort == null) { + logger.warn("PGPORT is null"); + throw new JobExecutionException("Unknown PostGIS port."); + } + if (_pgSchema == null) { + logger.warn("PGSCHEMA is null"); + throw new JobExecutionException("Unknown PostGIS schema."); + } + if (_pgUsername == null) { + logger.warn("PGUSERNAME is null"); + throw new JobExecutionException("Unknown PostGIS username."); + } + if (_pgPassword == null) { + logger.warn("PGPASSWORD is null"); + throw new JobExecutionException("Unknown PostGIS password."); + } + + Map<String, String> remote = new TreeMap<String, String>(); + remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); + // remote.put("charset", "UTF-8"); + remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); + remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); + remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); + remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); + remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); + // remote.put( "namespace", null); + pgProperties = remote; + } + + @Override + public Log getLogger() { + return logger; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); + + createSourceDataStore(); + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName; + try { + logger.info("-- step:clearOutputDatabase --"); + clearOutputDatabase(); + + logger.info("-- step:transformOracleDMMSDB --"); + targetSchemaName = ""; + + OracleConvertPostGISJobContext jobContext = + (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + jobContext.setSourceDataStore(getSourceDataStore()); + jobContext.setExecutionContext(context); + + long tStep = System.currentTimeMillis(); + + fetchTPData(jobContext); + logger.info("TPC DIST:" + jobContext.getDistId() + ":" + + ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); + + clearExchangeData(jobContext); + + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-Merge Connectivity Owner", tStep, tStepEnd); + } + + tStep = System.currentTimeMillis(); + + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-Merge ColorTable", tStep, tStepEnd); + } + + jobContext.closeOracleConnection(); + + long t2 = System.currentTimeMillis(); + // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; + // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); + logTimeDiff("Total ", t1, t2); + + } catch (SQLException e) { + disconnect(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException("Database error. " + e.getMessage(), e); + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + } + + private void clearExchangeData(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException { + Connection connection = jobContext.getOracleConnection(); + + ResultSet rsMeta = connection.getMetaData().getTables(null, "CMMS_POSTDB", "GEO_EXCHANGE", + new String[]{"TABLE"}); + + boolean found = false; + try { + while (rsMeta.next()) { + found = true; + break; + } + // } catch (SQLException e) + } finally { + if (rsMeta != null) { + rsMeta.close(); + rsMeta = null; + } + } + + if (!found) { + logger.info("Cannot Found GEO_EXCHANGE in CMMS_POSTDB."); + return; + } + + Statement stmt = null; + try { + stmt = connection.createStatement(); + int count = stmt.executeUpdate("DELETE FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE=1"); + logger.info("DELETE GEO_EXCHANGE UPDATE SIZE=" + count); + } finally { + JDBCUtils.close(stmt); + } + } + + @Override + protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { + return new OracleConvertPostGISJobContext(getDataPath(), + getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + + public DataStore getTargetDataStore() { + return targetDataStore; + } + + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); + } + + if (!dataStoreFactory.canProcess(pgProperties)) { + getLogger().warn("cannot process properties-"); + throw new JobExecutionException("cannot process properties-"); + } + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { + getLogger().warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + } + + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private String determineTargetSchemaName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetSchema = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) { + throw new IOException("cannot found " + DataReposVersionManager.XGVERSIONTABLE_NAME); + } + rs.close(); + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vsschema, vsstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vsid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vsschema"); + values[1] = rs.getShort("vsstatus"); + tmpSchemas.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current != -1) { + Object[] values = tmpSchemas.get(current); + targetSchema = (String) values[0]; + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetSchema; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + return "\"" + schemaName + "\".\"" + tableName + "\""; + } + + public final void accumulateQueryTime() { + queryTime += System.currentTimeMillis() - queryTimeStart; + } + + public long getQueryTime() { + return queryTime; + } + + public final void markQueryTime() { + queryTimeStart = System.currentTimeMillis(); + } + + public final void resetQueryTime() { + queryTime = 0; + } + + private void clearOutputDatabase() { + } +} diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java index 9bbfe1c..974237e 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java @@ -317,6 +317,7 @@ } } + clearExchangeData(jobContext); jobContext.closeOracleConnection(); } @@ -388,6 +389,41 @@ (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); } + private void clearExchangeData(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException { + Connection connection = jobContext.getOracleConnection(); + + ResultSet rsMeta = connection.getMetaData().getTables(null, "CMMS_POSTDB", "GEO_EXCHANGE", + new String[]{"TABLE"}); + + boolean found = false; + try { + while (rsMeta.next()) { + found = true; + break; + } + // } catch (SQLException e) + } finally { + if (rsMeta != null) { + rsMeta.close(); + rsMeta = null; + } + } + + if (!found) { + logger.info("Cannot Found GEO_EXCHANGE in CMMS_POSTDB."); + return; + } + + Statement stmt = null; + try { + stmt = connection.createStatement(); + int count = stmt.executeUpdate("UPDATE \"CMMS_POSTDB\".\"GEO_EXCHANGE\" SET ISEXCHANGE=1 WHERE ISEXCHANGE=0"); + logger.info("GEO_EXCHANGE UPDATE SIZE=" + count); + } finally { + JDBCUtils.close(stmt); + } + } + private void exetcuteConvert(OracleConvertPostGISJobContext jobContext, String querySchema, String targetSchemaName) throws SQLException { int order = 0; diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java new file mode 100644 index 0000000..6fd990d --- /dev/null +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertThemes2PostGISJob.java @@ -0,0 +1,1360 @@ +package com.ximple.eofms.jobs; + +import java.io.IOException; +import java.io.PushbackReader; +import java.io.StringReader; +import java.net.URL; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import com.ximple.eofms.geoserver.config.XGeosDataConfig; +import com.ximple.eofms.geoserver.config.XGeosDataConfigMapping; +import com.ximple.eofms.jobs.context.AbstractOracleJobContext; +import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; +import com.ximple.eofms.util.ConnectivityDirectionEnum; +import com.ximple.eofms.util.DefaultColorTable; +import com.ximple.eofms.util.PrintfFormat; +import com.ximple.eofms.util.XGeosConfigDigesterUtils; +import org.apache.commons.collections.MultiMap; +import org.apache.commons.dbcp.DelegatingConnection; +import org.apache.commons.digester3.Digester; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; +import org.geotools.jdbc.JDBCDataStore; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.xml.sax.SAXException; + +public class OracleConvertThemes2PostGISJob extends AbstractOracleDatabaseJob { + final static Log logger = LogFactory.getLog(GeoserverIntegrateConfigJob.class); + + private static final String SKIPCONFIGJOB = "SKIPCONFIGJOB"; + private static final String MASTERMODE = "MASTERMODE"; + private static final String EPSG = "EPSG:"; + private static final String XGEOSDATACONFIG_PATH = "xgeosdataconfig.xml"; + + // private static final int MAGIC_BLOCKSIZE = (64 * 1024 * 1024) - (32 * 1024); + + private static final String QUERY_VIEWDEFSQL = "SELECT table_name, view_definition FROM information_schema.views " + + "WHERE table_schema = ? AND table_name LIKE "; + + private static final String CREATE_VIEWSQL = "CREATE OR REPLACE VIEW \"%s\" AS SELECT * FROM \"%s\".\"%s\""; + private static final String EXTRAWHERE_VIEWSQL = " WHERE \"%s\".level = %s AND \"%s\".symweight = %s"; + + private static final String ALTER_VIEWSQL = "ALTER TABLE \"%s\" OWNER TO "; + // private static final String GRANT_VIEWSQL = "GRANT SELECT ON TABLE \"%s\" TO public"; + private static final int SRSID_TWD97_ZONE119 = 3825; + private static final int SRSID_TWD97_ZONE121 = 3826; + public static final String DEFAULT_STORENAME = "pgDMMS"; + public static final String DEFAULT_GEODMMS_NAMESPACE = "http://tpc.ximple.com.tw/geodmms"; + + private static final String PGHOST = "PGHOST"; + private static final String PGDATBASE = "PGDATBASE"; + private static final String PGPORT = "PGPORT"; + private static final String PGSCHEMA = "PGSCHEMA"; + private static final String PGUSER = "PGUSER"; + private static final String PGPASS = "PGPASS"; + private static final String USEWKB = "USEWKB"; + + private static final boolean useTpclidText = false; + + private static final int FETCHSIZE = 30; + private static final int COMMITSIZE = 100; + + public static final String FORWARDFLOW_MARK = "shape://ccarrow"; + public static final String BACKFLOW_MARK = "shape://rccarrow"; + public static final String UNFLOW_MARK = "shape://backslash"; + public static final String NONFLOW_MARK = "shape://slash"; + + private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC"; + private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR WHERE TAG_BCOMPID = 0 ORDER BY TAG_SFSC"; + + private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)"; + private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)"; + + public static final String FDYNCOLOR_SUFFIX = "_fdyncolor"; + public static final String FOWNER_SUFFIX = "_fowner"; + + private static XGeosDataConfigMapping xgeosDataConfigMapping = null; + protected JDBCDataStore targetDataStore; + protected Map<String, String> pgProperties; + + protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); + + protected String _pgHost; + protected String _pgDatabase; + protected String _pgPort; + protected String _pgSchema; + protected String _pgUsername; + protected String _pgPassword; + protected String _pgUseWKB; + + private long queryTime = 0; + private long queryTimeStart = 0; + + public Log getLogger() { + return logger; + } + + protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, + boolean profileMode, + boolean useTransform) { + return new OracleConvertPostGISJobContext(getDataPath(), + getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); + } + + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { + super.extractJobConfiguration(jobDetail); + } + + protected XGeosDataConfigMapping getConfigMapping() { + if (xgeosDataConfigMapping == null) { + Digester digester = XGeosConfigDigesterUtils.getXGeosConfigDigester(); + final URL configDataURL = XGeosDataConfigMapping.class.getResource(XGEOSDATACONFIG_PATH); + try { + xgeosDataConfigMapping = (XGeosDataConfigMapping) digester.parse(configDataURL); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } catch (SAXException e) { + logger.warn(e.getMessage(), e); + } + + } + return xgeosDataConfigMapping; + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); + + createSourceDataStore(); + createTargetDataStore(); + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + + if (getTargetDataStore() == null) { + logger.warn("Cannot connect source postgreSQL database."); + throw new JobExecutionException("Cannot connect source postgreSQL database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName, targetThemeTable; + try { + logger.info("-- step:clearOutputDatabase --"); + + targetSchemaName = determineTargetSchemaName(); + targetThemeTable = determineTargetThemeTableName(); + + OracleConvertPostGISJobContext jobContext = null; + + if (checkConvertPWThemes()) { + jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + jobContext.setSourceDataStore(getSourceDataStore()); + jobContext.setElementLogging(checkElementLogging()); + jobContext.setExecutionContext(context); + + long tStep = System.currentTimeMillis(); + if (!convertPowerOwnerThemeWithCopyAPI(jobContext, targetThemeTable)) { + convertPowerOwnerTheme(jobContext, targetThemeTable); + } + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); + } + tStep = System.currentTimeMillis(); + if (!convertDynamicColorThemeWithCopyAPI(jobContext, targetThemeTable)) + convertDynamicColorTheme(jobContext, targetThemeTable); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); + } + jobContext.closeOracleConnection(); + } + + updatePWThemeStatusToReady(targetThemeTable); + + long t2 = System.currentTimeMillis(); + // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; + // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); + logTimeDiff("Total ", t1, t2); + + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + + + try { + logger.info("-- step:resetPostgisViewMapping --"); + long tStep = System.currentTimeMillis(); + resetThemesViewMapping(context); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-resetPostgisViewMapping", tStep, tStepEnd); + } + logger.info("-- step:resetGeoServerConfig --"); + tStep = System.currentTimeMillis(); + // resetGeoServerConfig(jobExecutionContext); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-resetGeoServerConfig", tStep, tStepEnd); + } + } finally { + disconnect(); + } + } + + /** + * 重新建立所有重新建立所有PostGIS中的資料庫視景 + * + * @param executionContext 批次執行的關係 + */ + private void resetThemesViewMapping(JobExecutionContext executionContext) { + assert executionContext != null; + Connection connection = null; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + String ownerName = _pgUsername; + + String currentTargetThemesName = retrieveCurrentThemeName(connection, + DataReposVersionManager.VSSTATUS_READY); + if (currentTargetThemesName == null) { + logger.info("Cannot found themes that status is VSSTATUS_READY[" + + DataReposVersionManager.VSSTATUS_READY + "]"); + return; + } + + ArrayList<String> realTableNames = new ArrayList<String>(); + retrieveAllRealTableName(connection, currentTargetThemesName, realTableNames); + + resetThemesBaseView(connection, ownerName, currentTargetThemesName); + + XGeosDataConfigMapping configMapping = getConfigMapping(); + String[] allView = retrieveTargetStoreAllViewNames(connection); + TreeSet<String> allViewNames = new TreeSet<String>(); + if (allView != null) { + allViewNames.addAll(Arrays.asList(allView)); + } + List values = (List) configMapping.getMapping().get("pgOMS"); + for (Object value : values) { + XGeosDataConfig xgeosConfig = (XGeosDataConfig) value; + short tid = xgeosConfig.getFSC(); + short cid = xgeosConfig.getCOMP(); + StringBuilder sbTable = new StringBuilder("fsc-"); + sbTable.append(tid).append("-c-"); + sbTable.append(cid); + + int index = realTableNames.indexOf(sbTable.toString()); + if (index == -1) { + logger.debug("pgOMS LayerView Cannot found-" + xgeosConfig.toString()); + continue; + } + + StringBuilder sbView = new StringBuilder("fsc-"); + sbView.append(tid).append("-c"); + sbView.append(cid).append("-l"); + sbView.append(xgeosConfig.getLEV()).append("-w"); + sbView.append(xgeosConfig.getWEIGHT()); + String viewName = sbView.toString(); + if (allViewNames.contains(viewName)) { + resetThemesPostgisDataView(connection, ownerName, null, viewName); + if (tid == 106) { + resetFlowThemesPostgisDataView(connection, ownerName, null, viewName); + } + } + } + + updateCurrentThemeStatus(connection, currentTargetThemesName, + DataReposVersionManager.VSSTATUS_LINKVIEW); + + // String[] featureNames = dataStore.getTypeNames(); + // logger.info("featureNames[] size = " + featureNames.length); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + if (connection != null) + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + // if (dataStore != null) dataStore.dispose(); + } + } + + private void retrieveAllRealTableName(Connection connection, String targetSchema, + ArrayList<String> realTableNames) throws SQLException { + ResultSet rsMeta = null; + try { + rsMeta = connection.getMetaData().getTables("", targetSchema, "fsc%", new String[]{"TABLE"}); + while (rsMeta.next()) { + String tableName = rsMeta.getString(3); + realTableNames.add(tableName); + } + rsMeta.close(); + rsMeta = null; + + rsMeta = connection.getMetaData().getTables("", targetSchema, "index%", new String[]{"TABLE"}); + while (rsMeta.next()) { + String tableName = rsMeta.getString(3); + realTableNames.add(tableName); + } + rsMeta.close(); + rsMeta = null; + + rsMeta = connection.getMetaData().getTables("", targetSchema, "lndtpc%", new String[]{"TABLE"}); + while (rsMeta.next()) { + String tableName = rsMeta.getString(3); + realTableNames.add(tableName); + } + } finally { + if (rsMeta != null) rsMeta.close(); + } + } + + private void resetPostgisDataView(Connection connection, HashMap<String, String> viewDefs, + String ownerName, String schemaName, String tableName) throws SQLException { + String[] splits = tableName.split("-"); + if (splits.length > 3) { + // feature table + + StringBuilder viewBuilder = new StringBuilder(); + viewBuilder.append(splits[0]); + viewBuilder.append('-'); + viewBuilder.append(splits[1]); + viewBuilder.append('-'); + viewBuilder.append(splits[2]); + viewBuilder.append(splits[3]); + String viewName = viewBuilder.toString(); + if (viewDefs.containsKey(viewName)) { + String viewDef = viewDefs.get(viewName); + int pos = viewDef.indexOf("FROM"); + String subView = viewDef.substring(pos + 4); + // String[] viewSources = subView.split("\\."); + String[] viewSources = subView.split("(\\.\"|\")"); + if (!viewSources[0].equalsIgnoreCase(schemaName)) { + createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); + } + } else { + createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); + } + + } else { + + splits = tableName.split("_"); + if (splits.length > 0) { + StringBuilder viewBuilder = new StringBuilder(); + viewBuilder.append(splits[0]); + if (splits.length > 1) viewBuilder.append(splits[1]); + if (splits.length > 2) viewBuilder.append(splits[2]); + String viewName = viewBuilder.toString(); + if (viewDefs.containsKey(viewName)) { + String viewDef = viewDefs.get(viewName); + int pos = viewDef.indexOf("FROM"); + String subView = viewDef.substring(pos + 4); + String[] viewSources = subView.split("(\\.\"|\")"); + if (!viewSources[0].equalsIgnoreCase(schemaName)) { + createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); + } + } else { + createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); + } + } + } + } + + private void resetThemesBaseView(Connection connection, String ownerName, String currentThemesName) + throws SQLException { + String viewName = "xpwtheme" + FDYNCOLOR_SUFFIX; + String tableName = currentThemesName + FDYNCOLOR_SUFFIX; + PrintfFormat pf = new PrintfFormat("CREATE OR REPLACE VIEW \"%s\" AS SELECT * FROM \"%s\""); + String sql = pf.sprintf(new Object[]{viewName, tableName}); + Statement stmt = connection.createStatement(); + try { + stmt.execute(sql); + pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); + sql = pf.sprintf(viewName); + stmt.execute(sql); + + viewName = "xpwtheme" + FOWNER_SUFFIX; + tableName = currentThemesName + FOWNER_SUFFIX; + pf = new PrintfFormat("CREATE OR REPLACE VIEW \"%s\" AS SELECT * FROM \"%s\""); + sql = pf.sprintf(new Object[]{viewName, tableName}); + + stmt.execute(sql); + pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); + sql = pf.sprintf(viewName); + stmt.execute(sql); + } catch (SQLException e) { + // logger.warn(e.getMessage(), e); + logger.info(sql == null ? "SQL=NULL" : "SQL=" + sql); + throw e; + } finally { + stmt.close(); + } + } + + + private void resetThemesPostgisDataView(Connection connection, String ownerName, + String currentSchema, String viewName) throws SQLException { + String themeViewName = viewName + "-oms"; + // PrintfFormat pf = new PrintfFormat(CREATE_VIEWSQL); + // String sql = pf.sprintf(new Object[]{viewName, schemaName, tableName}); + ResultSet rs = null; + Statement stmt = connection.createStatement(); + + try { + StringBuilder sbSQL = new StringBuilder("CREATE OR REPLACE VIEW \""); + sbSQL.append(themeViewName).append("\" AS SELECT "); + + rs = connection.getMetaData().getColumns(null, currentSchema, viewName, "%"); + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + sbSQL.append("t." + fieldName).append(", "); + } + sbSQL.append("fc.dyncolor, fo.fowner FROM "); + if (currentSchema != null) + sbSQL.append("\"").append(currentSchema).append("\".\"").append(viewName).append("\" AS t,"); + else + sbSQL.append("\"").append(viewName).append("\" AS t,"); + sbSQL.append("xpwtheme").append(FDYNCOLOR_SUFFIX).append(" AS fc,"); + sbSQL.append("xpwtheme").append(FOWNER_SUFFIX).append(" AS fo WHERE "); + sbSQL.append("t.tid = fc.tid AND t.oid = fc.oid AND "); + sbSQL.append("t.tid = fo.tid AND t.oid = fo.oid"); + + // sbSQL.delete(sbSQL.length() - 2, sbSQL.length()); + String sql = sbSQL.toString(); + stmt.execute(sql); + sbSQL.delete(0, sbSQL.length()); + + PrintfFormat pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); + sql = pf.sprintf(themeViewName); + stmt.execute(sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void resetFlowThemesPostgisDataView(Connection connection, String ownerName, + String currentSchema, String viewName) throws SQLException { + String themeViewName = viewName + "-flow-oms"; + ResultSet rs = null; + Statement stmt = connection.createStatement(); + + try { + StringBuilder sbSQL = new StringBuilder("CREATE OR REPLACE VIEW \""); + sbSQL.append(themeViewName).append("\" AS SELECT "); + + rs = connection.getMetaData().getColumns(null, currentSchema, viewName, "%"); + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + sbSQL.append("t." + fieldName).append(", "); + } + sbSQL.append("fc.dyncolor, fo.fowner, fo.flow FROM "); + if (currentSchema != null) + sbSQL.append("\"").append(currentSchema).append("\".\"").append(viewName).append("\" AS t,"); + else + sbSQL.append("\"").append(viewName).append("\" AS t,"); + sbSQL.append("xpwtheme").append(FDYNCOLOR_SUFFIX).append(" AS fc,"); + sbSQL.append("xpwtheme").append(FOWNER_SUFFIX).append(" AS fo WHERE "); + sbSQL.append("t.tid = fc.tid AND t.oid = fc.oid AND "); + sbSQL.append("t.tid = fo.tid AND t.oid = fo.oid"); + + // sbSQL.delete(sbSQL.length() - 2, sbSQL.length()); + String sql = sbSQL.toString(); + stmt.execute(sql); + sbSQL.delete(0, sbSQL.length()); + + PrintfFormat pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); + sql = pf.sprintf(themeViewName); + stmt.execute(sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private HashMap<String, String> retrieveViewDef(Connection connection, String schemaName, String tablePattern) throws SQLException { + PreparedStatement stmt = connection.prepareStatement(QUERY_VIEWDEFSQL + "'" + tablePattern + "'"); + stmt.setString(1, schemaName); + // stmt.setString(2, tablePattern); + HashMap<String, String> result = new HashMap<String, String>(); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + String tableName = rs.getString(1); + String viewDef = rs.getString(2); + result.put(tableName, viewDef); + } + rs.close(); + stmt.close(); + return result; + } + + private void createOrReplaceView(Connection connection, String schemaName, String tableName, String viewName, + String ownerName) throws SQLException { + PrintfFormat pf = new PrintfFormat(CREATE_VIEWSQL); + String sql = pf.sprintf(new Object[]{viewName, schemaName, tableName}); + Statement stmt = connection.createStatement(); + try { + stmt.execute(sql); + pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); + sql = pf.sprintf(viewName); + stmt.execute(sql); + } catch (SQLException e) { + // logger.warn(e.getMessage(), e); + logger.info(sql == null ? "SQL=NULL" : "SQL=" + sql); + throw e; + } finally { + stmt.close(); + } + // connection.commit(); + } + + private void createOrReplaceExtraView(Connection connection, String schemaName, String tableName, String viewName, + String ownerName, XGeosDataConfig xgeosConfig) throws SQLException { + PrintfFormat pf = new PrintfFormat(CREATE_VIEWSQL); + String sql = pf.sprintf(new Object[]{viewName, schemaName, tableName}); + + PrintfFormat pfWhere = new PrintfFormat(EXTRAWHERE_VIEWSQL); + sql += pfWhere.sprintf(new String[]{tableName, Short.toString(xgeosConfig.getLEV()), + tableName, Short.toString(xgeosConfig.getWEIGHT())}); + + Statement stmt = connection.createStatement(); + stmt.execute(sql); + + pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); + sql = pf.sprintf(viewName); + stmt.execute(sql); + stmt.close(); + // connection.commit(); + } + + private Timestamp retrieveCurrentSchemaTimestamp(Connection connection, short status) throws SQLException { + StringBuilder sbSQL = new StringBuilder("SELECT vstimestamp, vsschema, vsstatus FROM "); + sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME); + sbSQL.append(" WHERE vsstatus = "); + sbSQL.append(status); + sbSQL.append(" ORDER BY vsid"); + + Timestamp result = null; + Statement stmt = null; + ResultSet rs = null; + + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + // get first result + if (rs.next()) { + result = rs.getTimestamp(1); + } + return result; + } finally { + if (rs != null) rs.close(); + if (stmt != null) stmt.close(); + } + } + + private void updateCurrentRepositoryStatus(Connection connection, String schemaName, short newStatus) + throws SQLException { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME).append(' '); + sbSQL.append(" SET vsstatus = "); + sbSQL.append(newStatus); + sbSQL.append(", vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '"); + sbSQL.append(schemaName).append("'"); + + Statement stmt = null; + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sbSQL.toString()); + } finally { + if (stmt != null) stmt.close(); + } + } + + private boolean checkCurrentRepositoryStatus(Connection connection, short status) { + try { + return (retrieveCurrentSchemaName(connection, status) != null); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + return false; + } + } + + private String retrieveCurrentSchemaName(Connection connection, short status) throws SQLException { + StringBuilder sbSQL = new StringBuilder("SELECT vsschema, vstimestamp, vsstatus FROM "); + sbSQL.append(DataReposVersionManager.XGVERSIONTABLE_NAME); + sbSQL.append(" WHERE vsstatus = "); + sbSQL.append(status); + sbSQL.append(" ORDER BY vsid"); + + String result = null; + Statement stmt = null; + ResultSet rs = null; + + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + // get first result + if (rs.next()) { + result = rs.getString(1); + } + return result; + } finally { + if (rs != null) rs.close(); + if (stmt != null) stmt.close(); + } + } + + private void updateCurrentThemeStatus(Connection connection, String schemaName, short newStatus) + throws SQLException { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(newStatus); + sbSQL.append(", vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); + sbSQL.append(schemaName).append("'"); + + Statement stmt = null; + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sbSQL.toString()); + } finally { + if (stmt != null) stmt.close(); + } + } + + + private boolean checkCurrentThemeStatus(Connection connection, short status) { + try { + return (retrieveCurrentThemeName(connection, status) != null); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + return false; + } + } + + + private String retrieveCurrentThemeName(Connection connection, short status) throws SQLException { + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vptname, vptstatus, vpttimestamp FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vptid"); + + String result = null; + Statement stmt = null; + ResultSet rs = null; + + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + // get first result + if (rs.next()) { + result = rs.getString(1); + } + return result; + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + + protected String[] retrieveTargetStoreAllViewNames(Connection connection) { + try { + final int TABLE_NAME_COL = 3; + List list = new ArrayList(); + + DatabaseMetaData meta = connection.getMetaData(); + // String[] tableType = { "TABLE", "VIEW" }; + String[] tableType = { "VIEW" }; + ResultSet tables = meta.getTables(null, _pgSchema, "%", tableType); + + while (tables.next()) { + String tableName = tables.getString(TABLE_NAME_COL); + list.add(tableName); + /* + if (allowTable(tableName)) { + list.add(tableName); + } + */ + } + tables.close(); + return (String[]) list.toArray(new String[list.size()]); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } + return null; + } + + public DataStore getTargetDataStore() { + return targetDataStore; + } + + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + + /* + if (!isDriverFound()) + { + throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); + } + */ + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); + } + + /* + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { + pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); + } + */ + + if (!dataStoreFactory.canProcess(pgProperties)) { + getLogger().warn("cannot process properties-"); + throw new JobExecutionException("cannot process properties-"); + } + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { + getLogger().warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + } + + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private String determineTargetSchemaName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetSchema = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XGVERSIONTABLE_NAME + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) { + rs.close(); + return null; + } + rs.close(); + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vsschema, vsstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vsid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vsschema"); + values[1] = rs.getShort("vsstatus"); + tmpSchemas.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpSchemas.get(0); + targetSchema = (String) values[0]; + } else if (current < (tmpSchemas.size() - 1)) { + Object[] values = tmpSchemas.get(current + 1); + targetSchema = (String) values[0]; + } else { + Object[] values = tmpSchemas.get(0); + targetSchema = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vsstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vsschema = '"); + sbSQL.append(targetSchema).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetSchema; + } + + private String determineTargetThemeTableName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetTable = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XPTVERSIONTABLE_NAME + needCreate = false; + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXPWThemeVersionTable(connection, _pgSchema); + rs.close(); + + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vptname, vptstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vptid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vptname"); + values[1] = rs.getShort("vptstatus"); + tmpTablenames.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } else if (current < (tmpTablenames.size() - 1)) { + Object[] values = tmpTablenames.get(current + 1); + targetTable = (String) values[0]; + } else { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vptname = '"); + sbSQL.append(targetTable).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetTable + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetTable; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + if (schemaName == null) + return "\"" + tableName + "\""; + return "\"" + schemaName + "\".\"" + tableName + "\""; + } + + private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + if (cid > Short.MAX_VALUE) { + logger.info("Wrong Color Table:" + cid + "-" + oid); + continue; + } + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(colorText).append("\n"); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + String flowMark = null; + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + flowMark = FORWARDFLOW_MARK; + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + flowMark = BACKFLOW_MARK; + } else if (ConnectivityDirectionEnum.Nondeterminate == dir) { + flowMark = NONFLOW_MARK; + } else { + flowMark = UNFLOW_MARK; + } + + if (cid > Short.MAX_VALUE) { + logger.info("Wrong Connectivity Table:" + cid + "-" + oid); + continue; + } + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(ownerId).append(','); + sb.append(flowMark).append('\n'); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setShort(3, (short) ownerId); + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + pstmt.setString(4, "shape://ccarrow"); + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + pstmt.setString(4, "shape://rccarrow"); + } else { + pstmt.setString(4, "shape://backslash"); + } + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndex(Connection connection, String tableName) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable); + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + stmt.execute("DROP TABLE " + tempTable); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, dyncolor) VALUES (?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setString(3, colorText); + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void updatePWThemeStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" ( vptid serial PRIMARY KEY, "); + sql.append(" vptname character varying(64) NOT NULL, "); + sql.append(" vptstatus smallint NOT NULL, "); + sql.append(" vpttimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" (vptname, vptstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + } + + } finally { + if (stmt != null) stmt.close(); + } + } +} diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java index 8570254..2a904f2 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleIncrementDgn2PostGISJob.java @@ -5,6 +5,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -51,7 +52,7 @@ private static final String USEWKB = "USEWKB"; private static final int FETCHSIZE = 30; - private static final int COMMITSIZE = 100; + private static final int COMMITSIZE = 10; protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); @@ -97,6 +98,7 @@ @Override protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); _pgDatabase = dataMap.getString(PGDATBASE); @@ -213,6 +215,7 @@ // Log the time the job started logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); createSourceDataStore(); createTargetDataStore(); @@ -397,11 +400,14 @@ return; } - // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE <> 0 + // SELECT COUNT(*) FROM CMMS_POSTDB.GEO_EXCHANGE WHERE ISEXCHANGE = 0 int exchangeCount = fetchExchangeCount(connection); + logger.info("exchangeCount=" + exchangeCount); + try { - processIncrementElement(jobContext); + processIncrementElement(jobContext, exchangeCount); // jobContext.setCurrentSchema(querySchema); + } finally { } @@ -412,7 +418,7 @@ Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = null; StringBuilder sbSQL = new StringBuilder(); - sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE <> 0"); + sbSQL.append("SELECT COUNT(*) FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE = 0"); int size = -1; try { @@ -433,14 +439,20 @@ Element element; }; - private void processIncrementElement(OracleIncrementPostGISJobContext jobContext) throws SQLException { + private void processIncrementElement(OracleIncrementPostGISJobContext jobContext, int exchangeCount) throws SQLException { Connection connection = jobContext.getOracleConnection(); - // SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, IGDSELM + if (exchangeCount == 0) { + logger.info("GEO_EXCHANGE ELEMENT COUNT IS ZERO."); + return; + } + + // SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM // FROM CMMS_POSTDB.GEO_EXCHANGE ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0 - String fetchSrcStmtFmt = "SELECT TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID IGDSELM " + - "FROM \"%s\".\"%s\" ORDER BY UPDATETIME WHERE ISEXCHANGE <> 0"; - //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; + String fetchSrcStmtFmt = "SELECT ID, TAG_LUFID, TAG_SFSC, TAG_BCOMPID, TAG_SOCCID, STATUS, TASKID, IGDSELM " + + "FROM \"%s\".\"%s\" WHERE ISEXCHANGE = 0 ORDER BY UPDATETIME"; + // String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" + // WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); String fetchSrcStmt = spf.sprintf(new Object[]{"CMMS_POSTDB", "GEO_EXCHANGE"}); Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); @@ -448,23 +460,31 @@ stmtSrc.setFetchSize(FETCHSIZE); ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); int igdsMetaType = rsSrc.getMetaData().getColumnType(1); + ArrayList<Integer> transIds = new ArrayList<Integer>(); + + int step = exchangeCount / 100; + int order = 0; + int current = 0; + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 0); + while (rsSrc.next()) { if (isProfileMode()) { markQueryTime(); } ElementTransactionContext xContext = new ElementTransactionContext(); - xContext.oid = rsSrc.getInt(1); - xContext.cid = (short) rsSrc.getInt(2); - xContext.compid = (short) rsSrc.getInt(3); - xContext.occid = (short) rsSrc.getInt(4); - xContext.transcationType = rsSrc.getInt(5); - xContext.taskid = rsSrc.getInt(6); + xContext.transcationId = rsSrc.getInt(1); + xContext.oid = rsSrc.getInt(2); + xContext.cid = (short) rsSrc.getInt(3); + xContext.compid = (short) rsSrc.getInt(4); + xContext.occid = (short) rsSrc.getInt(5); + xContext.transcationType = rsSrc.getInt(6); + xContext.taskid = rsSrc.getInt(7); try { - if (xContext.transcationType > 2) { + if (xContext.transcationType <= 2) { byte[] raw = null; if (igdsMetaType == Types.BLOB) { - BLOB blob = (BLOB) rsSrc.getBlob(7); + BLOB blob = (BLOB) rsSrc.getBlob(8); try { raw = getBytesFromBLOB(blob); @@ -474,7 +494,7 @@ // blob.close(); } } else { - raw = rsSrc.getBytes(7); + raw = rsSrc.getBytes(8); } if (raw != null) { Element element = fetchBinaryElement(raw); @@ -487,15 +507,88 @@ accumulateQueryTime(); } } + } else { + xContext.element = null; } - jobContext.putFeatureCollection(xContext); + + if (xContext.transcationType > 1) { + // remove first + } + + jobContext.processFeatureContext(xContext); + transIds.add(xContext.transcationId); + } catch (Dgn7fileException e) { logger.warn("Dgn7Exception", e); } + + if ((order % COMMITSIZE) == 0) { + // OracleConnection connection = jobContext.getOracleConnection(); + // connection.commitTransaction(); + jobContext.commitTransaction(); + //jobContext.startTransaction(); + System.gc(); + System.runFinalization(); + } + + if (step != 0) { + int now = order % step; + if (now != current) { + current = now; + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current); + + } + } else { + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", current); + current++; + } } + + jobContext.getExecutionContext().put("IncrementDgn2PostGISJobProgress", 100); + + jobContext.commitTransaction(); + jobContext.resetFeatureContext(); JDBCUtils.close(rsSrc); JDBCUtils.close(stmtSrc); + + if (!transIds.isEmpty()) { + completeTransactionAction(connection, transIds); + } + } + + private void completeTransactionAction(Connection connection, ArrayList<Integer> transIds) { + if (transIds.isEmpty()) return; + + boolean autoCommit = true; + PreparedStatement statement = null; + try { + autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + String sql = "UPDATE \"CMMS_POSTDB\".\"GEO_EXCHANGE\" SET ISEXCHANGE=? WHERE ID=?"; + + statement = connection.prepareStatement(sql); + for (int id : transIds) { + statement.setInt((int) 1, 1); + statement.setInt((int) 2, id); + statement.executeUpdate(); + } + connection.commit(); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + try { + connection.rollback(); + } catch (SQLException e1) { + logger.warn(e.getMessage(), e1); + } + } finally { + JDBCUtils.close(statement); + try { + connection.setAutoCommit(autoCommit); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } + } } // Binary to Element diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java index 0a1f1c3..577bcb8 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/context/postgis/OracleIncrementPostGISJobContext.java @@ -9,11 +9,13 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import com.vividsolutions.jts.geom.Geometry; import com.vividsolutions.jts.util.Assert; @@ -73,6 +75,7 @@ private int accumulate = 0; public static class ElementTransactionContext { + public int transcationId; public int transcationType; public short cid; public int oid; @@ -83,8 +86,9 @@ public int result; }; + public OracleIncrementPostGISJobContext(String dataPath, DataStore pgDS, String targetSchema, String filterConfig, - boolean profileMode, boolean useTransform) { + boolean profileMode, boolean useTransform) { super(dataPath, pgDS, targetSchema, profileMode, useTransform); _filterConfig = filterConfig; elementDispatcher = createElementDispatcher(); @@ -126,26 +130,122 @@ /** * STATUS 欄位 :0:新增 2:編輯 3:刪除設備 4:刪除元件 + * * @param context */ - public void putFeatureCollection(ElementTransactionContext context) { + public void processFeatureContext(ElementTransactionContext context) { assert elementDispatcher != null; if (context == null) { logger.warn("putFeatureCollection context is null"); return; } + SimpleFeature simpleFeature = null; if (context.transcationType == 0) { // insert Element - putFeatureCollection(context.element); + if (context.element != null) { + simpleFeature = generateFeature(context.element); + if (simpleFeature == null) return; + } + + if (simpleFeature != null) { + SimpleFeatureType featureType = simpleFeature.getFeatureType(); + String bindingStmt = makePrepareInsertSql(featureType); + logger.trace("Execute SQL(0):" + bindingStmt); + executePrepareSQL(bindingStmt, simpleFeature); + } } else if (context.transcationType == 2) { // Update Element + if (context.element != null) { + simpleFeature = generateFeature(context.element); + if (simpleFeature == null) return; + } + + if (simpleFeature != null) { + SimpleFeatureType featureType = simpleFeature.getFeatureType(); + // String deleteStmt = makePrepareDeleteSql(featureType); + String deleteStmt = makePrepareDeleteSql(context); + logger.trace("Execute SQL(2):" + deleteStmt); + executeSQL(deleteStmt); + String bindingStmt = makePrepareInsertSql(featureType); + logger.trace("Execute SQL(2):" + bindingStmt); + executePrepareSQL(bindingStmt, simpleFeature); + } } else if (context.transcationType == 3) { // Remove Whole Feature + String deleteStmt = makePrepareDeleteSql(context); + logger.trace("Execute SQL(3):" + deleteStmt); + executeSQL(deleteStmt); } else if (context.transcationType == 4) { // Remove Feature Part + try { + List<String> tableList = fetchExistTableSchema(getTargetSchema(), context); + for (String targetTable : tableList) { + String deleteStmt = makePrepareDeleteSql(context, targetTable); + logger.trace("Execute SQL(4):" + deleteStmt); + executeSQL(deleteStmt); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } } } + private SimpleFeature generateFeature(Element element) { + assert elementDispatcher != null; + // 判斷是否符和條件 + SimpleFeature feature = elementDispatcher.execute(element, getDistId(), isTransformed()); + if (feature == null) { + boolean isEmptySize = false; + FrammeAttributeData linkage = + AbstractFLinkageDispatchableFilter.getFeatureLinkage(element); + logger.warn("Unknown Element:" + element.getElementType().toString() + + ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" + + (linkage == null ? "NULL" : "FSC=" + (linkage.getFsc() + "|UFID=" + linkage.getUfid() + + "|COMPID=" + linkage.getComponentID()))); + + if (element instanceof ComplexElement) { + ComplexElement complex = (ComplexElement) element; + logger.warn("----Complex Element size=" + complex.size() + ":" + + (linkage == null ? "NULL" : (linkage.getUfid()))); + if (complex.size() == 0) + isEmptySize = true; + } + + /* + if (getElementLogging() && (!isEmptySize)) { + getElementLogger().logElement(element, getCurrentSchema()); + } + */ + return null; + } + + if (((Geometry) feature.getDefaultGeometry()).isEmpty()) { + boolean isEmptySize = false; + FrammeAttributeData linkage = + AbstractFLinkageDispatchableFilter.getFeatureLinkage(element); + logger.warn("Empty Geom Element:" + element.getElementType().toString() + + ":type=" + element.getType() + ":lv=" + element.getLevelIndex() + ":id=" + + (linkage == null ? "NULL" : (linkage.getFsc() + "|" + linkage.getComponentID()))); + + if (element instanceof ComplexElement) { + ComplexElement complex = (ComplexElement) element; + logger.warn("----Complex Element size=" + complex.size() + ":" + + (linkage == null ? "NULL" : (linkage.getUfid()))); + if (complex.size() == 0) + isEmptySize = true; + } + + /* + if (getElementLogging() && (!isEmptySize)) { + getElementLogger().logElement(element, getCurrentSchema()); + } + */ + return null; + } + return feature; + } + + /* protected void putFeatureCollection(Element element) { assert elementDispatcher != null; // 判斷是否符和條件 @@ -205,6 +305,7 @@ commitTransaction(); } } + */ public void startTransaction() { } @@ -234,6 +335,8 @@ private void updateDataStore() { if (isProfileMode()) markUpdateTime(); + if (txFeaturesContext.keySet().isEmpty()) return; + Iterator<SimpleFeatureType> it = txFeaturesContext.keySet().iterator(); Connection conn = null; try { @@ -367,31 +470,7 @@ protected void createOrClearFeatureDataTable(SimpleFeatureType featureType) throws SchemaException { String featureName = featureType.getTypeName(); Connection conn = null; - if (isExistFeature(featureType)) { - try { - conn = getConnection(); - if (dropTableMode) { - dropGeometryColumn(conn, getTargetSchema(), featureName, - (featureType).getGeometryDescriptor().getName().getLocalPart()); - dropTable(conn, getTargetSchema(), featureName); - - ArrayList<String> schemaTexts = createNewSchemaTexts(conn, featureType); - for (String stmtText : schemaTexts) { - Statement stmt = conn.createStatement(); - stmt.execute(stmtText); - JDBCUtils.close(stmt); - } - } else { - deleteTable(conn, getTargetSchema(), featureName); - } - } catch (IOException e) { - logger.warn(e.getMessage(), e); - } catch (SQLException e) { - logger.warn(e.getMessage(), e); - } finally { - JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); - } - } else { + if (!isExistFeature(featureType)) { String tempStmt = null; try { conn = getConnection(); @@ -400,7 +479,7 @@ Statement stmt = conn.createStatement(); tempStmt = stmtText; stmt.execute(stmtText); - stmt.close(); + JDBCUtils.close(stmt); } JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); } catch (IOException e) { @@ -418,4 +497,157 @@ public boolean isSchemaChanged() { return schemaChanged; } + + protected String getFeatureTableName(ElementTransactionContext elmContext, boolean forLookup) { + StringBuilder sb = new StringBuilder(); + sb.append("fsc-"); + sb.append(elmContext.cid); + if (!forLookup) { + sb.append("-c-"); + sb.append(elmContext.compid); + } + return sb.toString(); + } + + List<String> fetchExistTableSchema(String schemaName, ElementTransactionContext elmContext) throws SQLException { + Connection connection = getConnection(); + String tablePattern = getFeatureTableName(elmContext, true); + ResultSet rsMeta = connection.getMetaData().getTables(null, schemaName, + tablePattern + "%", new String[]{"TABLE"}); + + ArrayList<String> tables = new ArrayList<String>(); + try { + while (rsMeta.next()) { + String tablename = rsMeta.getString(3); + tables.add(tablename); + } + // } catch (SQLException e) + } finally { + if (rsMeta != null) { + rsMeta.close(); + rsMeta = null; + } + } + return tables; + } + + @Override + protected String makeInsertSql(SimpleFeature feature, int srid) { + return super.makeInsertSql(feature, srid); + } + + @Override + protected String makePrepareInsertSql(SimpleFeatureType featureType) { + return super.makePrepareInsertSql(featureType); + } + + protected String makePrepareDeleteSql(ElementTransactionContext context) { + String tableName = getFeatureTableName(context, false); + return makePrepareDeleteSql(context, tableName); + } + + private String makePrepareDeleteSql(ElementTransactionContext context, String targetTable) { + StringBuilder stmtBuilder = new StringBuilder(); + stmtBuilder.append("DELETE FROM "); + String tableName = getFeatureTableName(context, false); + stmtBuilder.append(encodeSchemaTableName(targetTable)); + stmtBuilder.append(" WHERE tid="); + stmtBuilder.append(context.cid); + stmtBuilder.append(" AND oid="); + stmtBuilder.append(context.oid); + if (context.compid != -1) { + stmtBuilder.append(" AND cid="); + stmtBuilder.append(context.compid); + } + + return stmtBuilder.toString(); + } + + private void executeSQL(String sqlStmt) { + + Connection conn = null; + Statement stmt = null; + try { + conn = getConnection(); + stmt = conn.createStatement(); + stmt.execute(sqlStmt); + JDBCUtils.close(stmt); + JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); + } catch (SQLException e) { + JDBCUtils.close(stmt); + JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e); + logger.warn("RUN--" + sqlStmt); + logger.warn(e.getMessage(), e); + } + } + + private void executePrepareSQL(String sqlStmt, SimpleFeature feature) { + + Connection conn = null; + PreparedStatement pstmt = null; + try { + conn = getConnection(); + boolean autoCommit = conn.getAutoCommit(); + conn.setAutoCommit(false); + + pstmt = conn.prepareStatement(sqlStmt); + + try { + // stmt.execute(feature); + bindFeatureParameters(pstmt, feature); + // pstmt.executeUpdate(); + pstmt.addBatch(); + } catch (PSQLException e) { + if (sqlStmt != null) { + logger.error("Execute:" + sqlStmt); + } + logger.error(e.getServerErrorMessage()); + logger.error(e.getMessage(), e); + } catch (NullPointerException e) { + if (sqlStmt != null) { + logger.error("Execute:" + sqlStmt); + } + logger.error(feature.toString()); + logger.error(e.getMessage(), e); + } catch (ClassCastException e) { + if (sqlStmt != null) { + logger.error("Execute:" + sqlStmt); + } + for (int i = 0; i < feature.getAttributeCount(); i++) { + logger.info("attr[" + i + "]-" + ((feature.getAttribute(i) == null) ? " NULL" : + feature.getAttribute(i).toString())); + } + logger.error(e.getMessage(), e); + } + + int[] numUpdates = pstmt.executeBatch(); + for (int i = 0; i < numUpdates.length; i++) { + if (numUpdates[i] == -2) + logger.warn("Execution " + i + ": unknown number of rows updated"); + } + conn.commit(); + + JDBCUtils.close(pstmt); + logger.debug("End Save into PostGIS:" + feature.getFeatureType().getTypeName()); + + conn.setAutoCommit(autoCommit); + JDBCUtils.close(conn, Transaction.AUTO_COMMIT, null); + accumulate = 0; + } catch (BatchUpdateException e) { + JDBCUtils.close(pstmt); + JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e); + logger.error(e.getMessage(), e); + SQLException ex; + while ((ex = e.getNextException()) != null) { + // logger.warn(ex.getMessage(), ex); + logger.warn(ex.getMessage()); + } + } catch (SQLException e) { + JDBCUtils.close(pstmt); + JDBCUtils.close(conn, Transaction.AUTO_COMMIT, e); + logger.error(e.getMessage(), e); + } finally { + if (isProfileMode()) accumulateUpdateTime(); + } + } } diff --git a/xdgnjobs/ximple-spatialjob/src/main/resources/conf/DefaultConvertShpFilter.xml b/xdgnjobs/ximple-spatialjob/src/main/resources/conf/DefaultConvertShpFilter.xml index c7bb842..76c9085 100755 --- a/xdgnjobs/ximple-spatialjob/src/main/resources/conf/DefaultConvertShpFilter.xml +++ b/xdgnjobs/ximple-spatialjob/src/main/resources/conf/DefaultConvertShpFilter.xml @@ -427,6 +427,15 @@ </elementCriterion> <TextCreateStrategy/> </TypeCompFilter> + <TypeCompFilter name="FSC-407.C-13"> + <tid>407</tid> + <cid>13</cid> + <description>電桿桿高</description> + <elementCriterion> + <elementType>17</elementType> + </elementCriterion> + <TextCreateStrategy/> + </TypeCompFilter> <TypeCompFilter name="FSC-114.C-0"> <tid>114</tid> <cid>0</cid> @@ -1748,7 +1757,16 @@ </elementCriterion> <SymbolCreateStrategy/> </TypeCompFilter> - + <TypeCompFilter name="FSC-111.C-0"> + <tid>111</tid> + <cid>0</cid> + <description>電壓調整器</description> + <elementCriterion> + <elementType>17</elementType> + </elementCriterion> + <SymbolCreateStrategy/> + </TypeCompFilter> + <!-- Dummy <TypeCompLevelFilter name="DemoFeature3"> <tid>999</tid> -- Gitblit v0.0.0-SNAPSHOT