From d15a82a03c79df7d6763aee61b74cd07286b081b Mon Sep 17 00:00:00 2001 From: unknown <yuanhung@ximple.com.tw> Date: Tue, 13 May 2014 11:42:08 +0800 Subject: [PATCH] Merge branch 'origin/nddjob' ndd and roadfee --- xdgnjobs/ximple-jobcarrier/pom.xml | 7 xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_nddjob.xml | 194 +++ xdgnjobs/pom.xml | 10 xdgnjobs/ximple-jobcarrier/quartz.properties | 76 xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties | 1 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java | 2522 +++++++++++++++++++++++++++++++++++++++++++++++++ xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml | 12 xdgnjobs/ximple-jobcarrier/quartz_jobs_nddjob.xml | 194 +++ xdgnjobs/ximple-spatialjob/pom.xml | 8 9 files changed, 2,979 insertions(+), 45 deletions(-) diff --git a/xdgnjobs/pom.xml b/xdgnjobs/pom.xml index 5898993..159737c 100644 --- a/xdgnjobs/pom.xml +++ b/xdgnjobs/pom.xml @@ -188,7 +188,15 @@ <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> - <dependency> + + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + <version>2.1</version> + </dependency> + + + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> diff --git a/xdgnjobs/ximple-jobcarrier/pom.xml b/xdgnjobs/ximple-jobcarrier/pom.xml index 820aab7..5c8cbe8 100644 --- a/xdgnjobs/ximple-jobcarrier/pom.xml +++ b/xdgnjobs/ximple-jobcarrier/pom.xml @@ -243,6 +243,13 @@ <groupId>com.ximple.eofms</groupId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + + </dependency> + </dependencies> <build> diff --git a/xdgnjobs/ximple-jobcarrier/quartz.properties b/xdgnjobs/ximple-jobcarrier/quartz.properties index 59577df..42bec0e 100644 --- a/xdgnjobs/ximple-jobcarrier/quartz.properties +++ b/xdgnjobs/ximple-jobcarrier/quartz.properties @@ -1,38 +1,40 @@ -#=============================================================== -#Configure Main Scheduler Properties -#=============================================================== -org.quartz.scheduler.instanceName = QuartzScheduler -org.quartz.scheduler.instanceId = AUTO - -#=============================================================== -#Configure ThreadPool -#=============================================================== -org.quartz.threadPool.threadCount = 5 -org.quartz.threadPool.threadPriority = 5 -org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool - -#=============================================================== -#Configure JobStore -#=============================================================== -org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore -org.quartz.jobStore.misfireThreshold = 60000 - -#=============================================================== -#Configure Plugins -#=============================================================== -org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin - -org.quartz.plugin.jobInitializer.class: org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin -#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml -#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml -#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_inc.xml -#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml -#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_colowner.xml -org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_resetview.xml - -org.quartz.plugin.jobInitializer.failOnFileNotFound = true -org.quartz.plugin.jobInitializer.scanInterval = 10 -org.quartz.plugin.jobInitializer.wrapInUserTransaction = false - -org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin +#=============================================================== +#Configure Main Scheduler Properties +#=============================================================== +org.quartz.scheduler.instanceName = QuartzScheduler +org.quartz.scheduler.instanceId = AUTO + +#=============================================================== +#Configure ThreadPool +#=============================================================== +org.quartz.threadPool.threadCount = 5 +org.quartz.threadPool.threadPriority = 5 +org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool + +#=============================================================== +#Configure JobStore +#=============================================================== +org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore +org.quartz.jobStore.misfireThreshold = 60000 + +#=============================================================== +#Configure Plugins +#=============================================================== +org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin + +org.quartz.plugin.jobInitializer.class: org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_inc.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_colowner.xml +org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_resetview.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_nddjpb.xml + +org.quartz.plugin.jobInitializer.failOnFileNotFound = true +org.quartz.plugin.jobInitializer.scanInterval = 10 +org.quartz.plugin.jobInitializer.wrapInUserTransaction = false + +org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin org.quartz.plugin.shutdownhook.cleanShutdown = true \ No newline at end of file diff --git a/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml b/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml index 065854c..ad4ad97 100644 --- a/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml +++ b/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml @@ -36,15 +36,15 @@ <job-data-map> <entry> <key>JOBDATA_DIR</key> - <value>/Users/Shared/Public/Projects/XGeoDMMS/xjobrun/tctpcjobs/jobdata</value> + <value>c:/tmp/</value> </entry> <entry> <key>PGHOST</key> - <value>10.10.1.7</value> + <value>10.10.1.19</value> </entry> <entry> <key>PGDATBASE</key> - <value>pgDMMS2</value> + <value>pgDMMS</value> </entry> <entry> <key>PGPORT</key> @@ -64,7 +64,7 @@ </entry> <entry> <key>ORAHOST</key> - <value>10.10.1.7</value> + <value>10.10.1.19</value> </entry> <entry> <key>ORAINST</key> @@ -80,7 +80,7 @@ </entry> <entry> <key>ORAPASS</key> - <value>simple000</value> + <value>SYSTEM000</value> </entry> <entry> <key>ORGSCHEMA</key> @@ -137,7 +137,7 @@ </entry> <entry> <key>GEOSERVER_URL</key> - <value>http://10.10.1.7:8080/geoserver</value> + <value>http://10.10.1.19:8080/geoserver</value> </entry> <entry> <key>GEOSERVER_USER</key> diff --git a/xdgnjobs/ximple-jobcarrier/quartz_jobs_nddjob.xml b/xdgnjobs/ximple-jobcarrier/quartz_jobs_nddjob.xml new file mode 100644 index 0000000..7876e95 --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/quartz_jobs_nddjob.xml @@ -0,0 +1,194 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> + <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> + <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertDMMS2PostGisWithGeoserver</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class--> + <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>--> + <job-class>com.ximple.eofms.jobs.DMMSNddUpdateJob</job-class> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class--> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class--> + <!--volatility>false</volatility--> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <value>C:/tmp/</value> + </entry> + <entry> + <key>PGHOST</key> + <value>10.10.1.19</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>ndd</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>simple000</value> + </entry> + + <entry> + <key>ftpurl</key> + <value>ftp://10.10.1.19:21/</value> + </entry> + <entry> + <key>ftpdir</key> + <value>/tcdaas/ndddash/</value> + </entry> + + <entry> + <key>ftpuid</key> + <value>Administrator</value> + </entry> + + <entry> + <key>ftppwd</key> + <value>simple@000</value> + </entry> + + + <entry> + <key>ORAHOST</key> + <value>10.10.1.19</value> + </entry> + <entry> + <key>ORAINST</key> + <value>orcl</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTPWTHEMES</key> + <value>true</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>true</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://10.10.1.19:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>false</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertDMMS2PostGisWithGeoserver</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties index 9017116..a6c66cb 100644 --- a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties +++ b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties @@ -29,6 +29,7 @@ #org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_colowner.xml #org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml +#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_nddjpb.xml org.quartz.plugin.jobInitializer.failOnFileNotFound = true org.quartz.plugin.jobInitializer.scanInterval = 10 diff --git a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_nddjob.xml b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_nddjob.xml new file mode 100644 index 0000000..d314d2a --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_nddjob.xml @@ -0,0 +1,194 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> + <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> + <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertDMMS2PostGisWithGeoserver</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class--> + <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>--> + <job-class>com.ximple.eofms.jobs.DMMSNddUpdateJob</job-class> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class--> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class--> + <!--volatility>false</volatility--> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <value>C:/tmp/</value> + </entry> + <entry> + <key>PGHOST</key> + <value>10.10.1.9</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>ndd</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>simple000</value> + </entry> + + <entry> + <key>ftpurl</key> + <value>ftp://20.20.1.3:21/</value> + </entry> + <entry> + <key>ftpdir</key> + <value>/tcdaas/ndddash/</value> + </entry> + + <entry> + <key>ftpuid</key> + <value>DMMS</value> + </entry> + + <entry> + <key>ftppwd</key> + <value>DMMS000</value> + </entry> + + + <entry> + <key>ORAHOST</key> + <value>10.10.1.7</value> + </entry> + <entry> + <key>ORAINST</key> + <value>orcl</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTPWTHEMES</key> + <value>true</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>true</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://10.10.1.7:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>false</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertDMMS2PostGisWithGeoserver</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-spatialjob/pom.xml b/xdgnjobs/ximple-spatialjob/pom.xml index 835fe67..97765f3 100644 --- a/xdgnjobs/ximple-spatialjob/pom.xml +++ b/xdgnjobs/ximple-spatialjob/pom.xml @@ -167,7 +167,13 @@ <version>2.3</version> </dependency> - <!-- Ximple Library --> + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + + </dependency> + + <!-- Ximple Library --> <dependency> <groupId>com.ximple.eofms</groupId> <artifactId>ximple-dgnio</artifactId> diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java new file mode 100644 index 0000000..ae62dc1 --- /dev/null +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java @@ -0,0 +1,2522 @@ +package com.ximple.eofms.jobs; + +import com.ximple.eofms.jobs.context.AbstractOracleJobContext; +import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; +import com.ximple.eofms.util.*; +import com.ximple.io.dgn7.*; +import com.ximple.util.PrintfFormat; +import oracle.jdbc.OracleConnection; +import oracle.jdbc.OracleResultSet; +import oracle.sql.ARRAY; +import oracle.sql.BLOB; +import org.apache.commons.collections.OrderedMap; +import org.apache.commons.collections.OrderedMapIterator; +import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.dbcp.DelegatingConnection; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPReply; +import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; +import org.geotools.feature.SchemaException; +import org.geotools.jdbc.JDBCDataStore; +import org.opengis.feature.IllegalAttributeException; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +import java.io.*; +import java.math.BigDecimal; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.sql.*; +import java.util.*; +import java.util.Date; +/** + * Created by Alchemist on 2014/4/7. + */ +public class DMMSNddUpdateJob extends AbstractOracleDatabaseJob { + final static Log logger = LogFactory.getLog(DMMSNddUpdateJob.class); + + private static final String PGHOST = "PGHOST"; + private static final String PGDATBASE = "PGDATBASE"; + private static final String PGPORT = "PGPORT"; + private static final String PGSCHEMA = "PGSCHEMA"; + private static final String PGUSER = "PGUSER"; + private static final String PGPASS = "PGPASS"; + private static final String USEWKB = "USEWKB"; + + private static final boolean useTpclidText = false; + + private static final int FETCHSIZE = 30; + private static final int COMMITSIZE = 100; + private static final String INDEXPATHNAME = "index"; + private static final String OTHERPATHNAME = "other"; + public static final String FORWARDFLOW_MARK = "shape://ccarrow"; + public static final String BACKFLOW_MARK = "shape://rccarrow"; + public static final String UNFLOW_MARK = "shape://backslash"; + public static final String NONFLOW_MARK = "shape://slash"; + + private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC"; + private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC"; + + private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)"; + private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)"; + + public static final String FDYNCOLOR_SUFFIX = "_fdyncolor"; + public static final String FOWNER_SUFFIX = "_fowner"; + + protected static class Pair { + Object first; + Object second; + + public Pair(Object first, Object second) { + this.first = first; + this.second = second; + } + } + + protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); + + protected String _pgHost; + protected String _pgDatabase; + protected String _pgPort; + protected String _pgSchema; + protected String _pgUsername; + protected String _pgPassword; + protected String _pgUseWKB; + + protected Map<String, String> pgProperties; + protected JDBCDataStore targetDataStore; + // protected OracleConvertEdbGeoJobContext oracleJobContext; + + private long queryTime = 0; + private long queryTimeStart = 0; + + public Log getLogger() { + return logger; + } + + protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, + boolean profileMode, + boolean useTransform) { + return new OracleConvertPostGISJobContext(getDataPath(), + getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); + } + + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { + super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); + _pgHost = dataMap.getString(PGHOST); + _pgDatabase = dataMap.getString(PGDATBASE); + _pgPort = dataMap.getString(PGPORT); + _pgSchema = dataMap.getString(PGSCHEMA); + _pgUsername = dataMap.getString(PGUSER); + _pgPassword = dataMap.getString(PGPASS); + _pgUseWKB = dataMap.getString(USEWKB); + + Log logger = getLogger(); + /* + logger.info("PGHOST=" + _myHost); + logger.info("PGDATBASE=" + _myDatabase); + logger.info("PGPORT=" + _myPort); + logger.info("PGSCHEMA=" + _mySchema); + logger.info("PGUSER=" + _myUsername); + logger.info("PGPASS=" + _myPassword); + logger.info("USEWKB=" + _myUseWKB); + */ + + if (_pgHost == null) { + logger.warn("PGHOST is null"); + throw new JobExecutionException("Unknown PostGIS host."); + } + if (_pgDatabase == null) { + logger.warn("PGDATABASE is null"); + throw new JobExecutionException("Unknown PostGIS database."); + } + if (_pgPort == null) { + logger.warn("PGPORT is null"); + throw new JobExecutionException("Unknown PostGIS port."); + } + if (_pgSchema == null) { + logger.warn("PGSCHEMA is null"); + throw new JobExecutionException("Unknown PostGIS schema."); + } + if (_pgUsername == null) { + logger.warn("PGUSERNAME is null"); + throw new JobExecutionException("Unknown PostGIS username."); + } + if (_pgPassword == null) { + logger.warn("PGPASSWORD is null"); + throw new JobExecutionException("Unknown PostGIS password."); + } + + Map<String, String> remote = new TreeMap<String, String>(); + remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); + // remote.put("charset", "UTF-8"); + remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); + remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); + remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); + remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); + remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); + // remote.put( "namespace", null); + + String temp=""; + temp= dataMap.getString("ftpurl"); + if(temp==null) + { + logger.warn("not config ftpurl ->ftp://127.0.0.1:21/"); + temp="ftp://127.0.0.1:21/"; + } + remote.put("ftpurl", temp); + temp= dataMap.getString("ftpuid"); + if(temp==null) + { + temp="anonymous"; + } + remote.put("ftpuid", temp); + + temp= dataMap.getString("ftppwd"); + if(temp==null) + { + temp=""; + } + remote.put("ftppwd", temp); + + temp= dataMap.getString("ftpdir"); + if(temp==null) + { + temp="tcdaas/featureImg"; + } + remote.put("ftpdir", temp); + pgProperties = remote; + } + + + + private List<String[]> sqlExecQuery(Connection connection,String strSQLIn,String[] params) throws SQLException { + + String strSQL=strSQLIn; + for(int i=0;i<params.length;i++) + { + if(params[i]==null)params[i]=""; + strSQL=strSQL.replace("%s"+String.valueOf(i+1),params[i]); + } + List<String[]> result=new ArrayList<String[]>(); + List<String> temp = new ArrayList<String>(); + String strTemp=""; + // String result = null; + Statement stmt = null; + ResultSet rs = null; + + + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery(strSQL.toString()); + // get first result + // temp.clear(); + + ResultSetMetaData rsmd = rs.getMetaData(); + int NumOfCol = rsmd.getColumnCount(); + + while (rs.next()) { + for (int idx = 0; idx < NumOfCol; idx++) { + strTemp = rs.getString(idx + 1); + temp.add(strTemp); + } + result.add(temp.toArray(new String[0])); + temp.clear(); + } + return result; + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + private void sqlExec(Connection connection,String strSQLIn,String[] params) throws SQLException { + + String strSQL=strSQLIn; + for(int i=0;i<params.length;i++) + { + if(params[i]==null)params[i]=""; + strSQL=strSQL.replace("%s"+String.valueOf(i+1),params[i]); + } + List<String[]> result=new ArrayList<String[]>(); + List<String> temp = new ArrayList<String>(); + String strTemp=""; + // String result = null; + Statement stmt = null; + ResultSet rs = null; + + + try { + stmt = connection.createStatement(); + stmt.execute( strSQL.toString()); + // get first result + // temp.clear(); + + + } finally { + // JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private String findValue(String strSource,String findTag) + { + int idx=-1; int iStart=-1; int iEnd=-1; + idx=strSource.indexOf(findTag); + if(idx<0) return ""; + iStart= strSource.indexOf("\"",idx); + iEnd= strSource.indexOf("\"",iStart+1); + return strSource.substring(iStart+1,iEnd); + } + + + + + private void doJob(Connection postsql, String[] info) throws SQLException + { + // double switch (if db = enable -->work) + //Here is check + Date dtnow = new Date(); + //get all file + //dept, count,dist,nei ,y,m,d,t,custom + // HashMap<String> + String typhoonName=""; + String typhoonID=""; + String department=""; + String county=""; + String district=""; + String neighbor=""; + String affectCustomers=""; + String affectCustomersEver=""; + String[] tmpArray; + String sTemp; + List<String> arraySQLVals= new ArrayList<String>(); + + if(!jobOnLine(postsql, "nddcanton")) + { + + return; + } + logger.info("begin nddxml to postsql"); + logger.info("getftpfile..."); + String[] xmls= getNDDStrings(info, "neighbor_affect_customers.xml") ; + logger.info(String.format("total %d file(s)",xmls.length)); + for(int iRow=0;iRow<xmls.length;iRow++) + { + arraySQLVals.clear(); + tmpArray= xmls[iRow].split("\n"); + for(int iLine=0;iLine<tmpArray.length;iLine++) + { + sTemp= findValue(tmpArray[iLine],"typhoonName"); + if(sTemp.length()>0) + { + typhoonName= sTemp; + typhoonID= getTyphoonIDByName(postsql,typhoonName); + // + sTemp= findValue(tmpArray[iLine],"Department id"); + department=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"county ufid"); + if(sTemp.length()>0) + { + county=sTemp; + } + sTemp= findValue(tmpArray[iLine],"district ufid"); + if(sTemp.length()>0) + { + district=sTemp; + } + sTemp= findValue(tmpArray[iLine],"neighbor ufid"); + if(sTemp.length()>0) + { + neighbor=sTemp; + sTemp= findValue(tmpArray[iLine],"affectCustomers"); + if(sTemp.length()>0) + { + affectCustomers=sTemp; + } + else + { + affectCustomers="0"; + } + + sTemp= findValue(tmpArray[iLine],"affectCustomersEver"); + if(sTemp.length()>0) + { + affectCustomersEver=sTemp; + } + else + { + affectCustomersEver="0"; + } + arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,%s,%s",typhoonID,department,county,district,neighbor,affectCustomers,affectCustomersEver)); + // insert into nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever + //yy,mm,dd,tt + } + + } + //!! + String yy="0000"+String.valueOf( dtnow.getYear()+1900); + String mm="00"+String.valueOf( dtnow.getMonth()+1); + String dd="00"+String.valueOf( dtnow.getDate()); + String t0="00"+ String.valueOf( dtnow.getHours()); + String t1="00"+ String.valueOf( dtnow.getMinutes()); + yy= yy.substring(yy.length()-4); + mm= mm.substring(mm.length()-2); + dd= dd.substring(dd.length()-2); + t0= t0.substring(t0.length()-2); + t1= t1.substring(t1.length()-2); + String insertDBSQL=" insert into ndd.nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever"+ + ",yy,mm,dd,tt) values "; + for(int j=0;j<arraySQLVals.size();j++) + { + sqlExec(postsql,insertDBSQL + arraySQLVals.get(j)+ + String.format(",%s,%s,%s,'%s%s')", + yy,mm,dd,t0,t1 + + ) , + new String[]{}); + + } + + String strSQLUpdateCurr="update ndd.currdata set yy='%s',mm='%s',dd='%s',tt='%s%s' where sr=1"; + sqlExec(postsql, + String.format(strSQLUpdateCurr, + yy,mm,dd,t0,t1 + ) , + new String[]{}); + logger.info(String.format("next xml")); + } + logger.info(String.format("done")); + + + + + } + private void doJob2(Connection postsql, String[] info) throws SQLException + { + // double switch (if db = enable -->work) + //Here is check + Date dtnow = new Date(); + //get all file + //dept, count,dist,nei ,y,m,d,t,custom + // HashMap<String> + String typhoonName=""; + String typhoonID=""; + String department=""; + String department_id=""; + String substation=""; + String substation_ufid=""; + String substation_affectCustomers=""; + String substation_nopower=""; + + String mxfmr_name=""; + String mxfmr_ufid=""; + String mxfmr_affectCustomers=""; + String mxfmr_nopower=""; + + String feeder_name=""; + String feeder_id=""; + String feeder_affectCustomers=""; + String feeder_nopower=""; + + String[] tmpArray; + String sTemp; + List<String> arraySQLVals= new ArrayList<String>(); + + if(!jobOnLine(postsql, "nddfeeder")) + { + + return; + } + + String yy="0000"+String.valueOf( dtnow.getYear()+1900); + String mm="00"+String.valueOf( dtnow.getMonth()+1); + String dd="00"+String.valueOf( dtnow.getDate()); + String t0="00"+ String.valueOf( dtnow.getHours()); + String t1="00"+ String.valueOf( dtnow.getMinutes()); + yy= yy.substring(yy.length()-4); + mm= mm.substring(mm.length()-2); + dd= dd.substring(dd.length()-2); + t0= t0.substring(t0.length()-2); + t1= t1.substring(t1.length()-2); + + logger.info("begin nddxml(feeder) to postsql"); + logger.info("getftpfile..."); + String[] xmls= getNDDStrings(info, "feeder_affect_customers.xml") ; + logger.info(String.format("total %d file(s)",xmls.length)); + for(int iRow=0;iRow<xmls.length;iRow++) + { + arraySQLVals.clear(); + tmpArray= xmls[iRow].split("\n"); + for(int iLine=0;iLine<tmpArray.length;iLine++) + { + sTemp= findValue(tmpArray[iLine],"typhoonName"); + if(sTemp.length()>0) + { + typhoonName= sTemp; + typhoonID= getTyphoonIDByName(postsql,typhoonName); + // + sTemp= findValue(tmpArray[iLine],"Department id"); + department_id=sTemp; + + sTemp= findValue(tmpArray[iLine],"name"); + department=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"Substation name"); + if(sTemp.length()>0) + { + substation=sTemp; + sTemp= findValue(tmpArray[iLine],"ufid"); + if(sTemp.length()>0) + { + substation_ufid=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"affectCustomers"); + if(sTemp.length()>0) + { + substation_affectCustomers=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"noPowerAll"); + if(sTemp.length()>0) + { + substation_nopower=sTemp; + } + // + arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,'%s','%s','%s','%s',%s,%s,%s,%s,%s,'%s%s')", + typhoonID, + department_id,substation_ufid,"-1","-1", + department,substation," "," ", + substation_affectCustomers,substation_nopower, + yy,mm,dd,t0,t1)); + } + + + sTemp= findValue(tmpArray[iLine],"Mxfmr name"); + if(sTemp.length()>0) + { + mxfmr_name=sTemp; + sTemp= findValue(tmpArray[iLine],"ufid"); + if(sTemp.length()>0) + { + mxfmr_ufid=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"affectCustomers"); + if(sTemp.length()>0) + { + mxfmr_affectCustomers=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"noPowerAll"); + if(sTemp.length()>0) + { + mxfmr_nopower=sTemp; + } + arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,'%s','%s','%s','%s',%s,%s,%s,%s,%s,'%s%s')", + typhoonID, + department_id,substation_ufid,mxfmr_ufid,"-1", + department,substation,mxfmr_name," ", + mxfmr_affectCustomers,mxfmr_nopower, + yy,mm,dd,t0,t1)); + } + + sTemp= findValue(tmpArray[iLine],"Feeder name"); + if(sTemp.length()>0) + { + feeder_name=sTemp; + sTemp= findValue(tmpArray[iLine],"id"); + if(sTemp.length()>0) + { + feeder_id=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"affectCustomers"); + if(sTemp.length()>0) + { + feeder_affectCustomers=sTemp; + } + + sTemp= findValue(tmpArray[iLine],"noPowerAll"); + if(sTemp.length()>0) + { + feeder_nopower=sTemp; + } + arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,'%s','%s','%s','%s',%s,%s,%s,%s,%s,'%s%s')", + typhoonID, + department_id,substation_ufid,mxfmr_ufid,feeder_id, + department,substation,mxfmr_name,feeder_name, + feeder_affectCustomers,feeder_nopower, + yy,mm,dd,t0,t1)); + } + // insert into nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever + //yy,mm,dd,tt + + + } + //!! + + String insertDBSQL= + " insert into ndd.nddfeeder_history (project_id,department_id,substation_id,mxfmr_id,feeder_id,"+ + "department_name,substation_name,mxfmr_name,feeder_name,"+ + "affectCustomers,nopower"+ + ",yy,mm,dd,tt) values "; + for(int j=0;j<arraySQLVals.size();j++) + { + sqlExec(postsql,insertDBSQL + arraySQLVals.get(j) + , + new String[]{}); + + } + + String strSQLUpdateCurr="update ndd.currdata set yy='%s',mm='%s',dd='%s',tt='%s%s' where sr=2"; + sqlExec(postsql, + String.format(strSQLUpdateCurr, + yy,mm,dd,t0,t1 + ) , + new String[]{}); + logger.info(String.format("next xml")); + } + logger.info(String.format("done")); + + + + + } + + /* + private void doJob(Connection postsql,Connection orcl) throws SQLException + { + String strSQLGetTask="select proc_id,procname,datastore,name,step,src,dest,txtsql from roadfee_proc where rowstatus=1 and procname like 'STEP%' order by procname,step" ; + List<String[]> joblist=null; + Connection inConnection; + int idOfJob=0; + + List<String[]> nodata= new ArrayList<String[]>(); + List<String[]> lista= new ArrayList<String[]>(); + List<String[]> list1= new ArrayList<String[]>(); + List<String[]> listIn= new ArrayList<String[]>(); + List<String[]> temp;//= new ArrayList<String[]>(); + nodata.add(new String[]{""}); + // proc_id[0],procname[1],datastore[2\,name[3],step[4], src[5],des[6]t,txtsql[7] + try{ + logger.info("getJoblist"); + joblist=sqlExecQuery(postsql, strSQLGetTask, new String[]{}); + + for ( idOfJob=0;idOfJob<joblist.size();idOfJob++) + { + logger.info("begin "+joblist.get(idOfJob)[1]+"-"+joblist.get(idOfJob)[3]+"("+joblist.get(idOfJob)[0]+")"); + if(joblist.get(idOfJob)[5].equals("nodata")) + { + listIn=nodata; + } + else if(joblist.get(idOfJob)[5].equals("list1")) + { + listIn=list1; + } + else if(joblist.get(idOfJob)[5].equals("lista")) + { + listIn=lista; + } + + if(joblist.get(idOfJob)[2].equals("psql")) + { + inConnection= postsql; + } + else if(joblist.get(idOfJob)[2].equals("orcl")) + { + inConnection= orcl; + } + else + return ; //connection failed + + if( joblist.get(idOfJob)[6].equals("list1")) list1.clear(); + if( joblist.get(idOfJob)[6].equals("lista")) lista.clear(); + //runsql + logger.info("process data count: "+String.valueOf(listIn.size())); + + for( int idxOfListIn=0;idxOfListIn< listIn.size();idxOfListIn++) + { + + if( joblist.get(idOfJob)[6].equals("nodata")) + { + sqlExec(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn)); + //logger.info("finish "+joblist.get(idOfJob)[1]+"-"+joblist.get(idOfJob)[3]+"("+joblist.get(idOfJob)[0]+")") + + continue; + }else + { + temp=sqlExecQuery(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn)); + + } + + + for(int j=0;j<temp.size();j++) + { + if( joblist.get(idOfJob)[6].equals("list1")) + { + list1.add(temp.get(j)); + } + else if( joblist.get(idOfJob)[6].equals("lista")) + { + lista.add(temp.get(j)); + } + } + } + + + } + + }catch(SQLException sqlex) + { + logger.warn("ERROR@ID:"+String.valueOf( joblist.get(idOfJob)[0])); + throw sqlex; + } + + + } + */ + public void execute(JobExecutionContext context) throws JobExecutionException { + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); + + if (isIgnoreDBETL()) { + return; + } + + //createSourceDataStore(); + createTargetDataStore(); + /* + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + */ + if (getTargetDataStore() == null) { + logger.warn("Cannot connect source postgreSQL database."); + throw new JobExecutionException("Cannot connect source postgreSQL database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName, targetThemeTable; + try { + //logger.info("-- step:clearOutputDatabase --"); + doJob(targetDataStore.getConnection(Transaction.AUTO_COMMIT),new String[]{ + pgProperties.get("ftpurl"), + pgProperties.get("ftpuid"), + pgProperties.get("ftppwd"), + pgProperties.get("ftpdir") + }); + doJob2 (targetDataStore.getConnection(Transaction.AUTO_COMMIT),new String[]{ + pgProperties.get("ftpurl"), + pgProperties.get("ftpuid"), + pgProperties.get("ftppwd"), + pgProperties.get("ftpdir") + }); + // doJob( targetDataStore.getConnection(Transaction.AUTO_COMMIT),sourceDataStore.getConnection(Transaction.AUTO_COMMIT) ); + + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } catch (SQLException e) { + disconnect(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException("Database error. " + e.getMessage(), e); + + }finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + + private void exetcuteConvert(OracleConvertPostGISJobContext jobContext, + String querySchema, String targetSchemaName) throws SQLException { + int order = 0; + OrderedMap map = getBlobStorageList(jobContext.getOracleConnection(), + querySchema, "SD$SPACENODES", null); + + logger.info("begin convert job:[" + map.size() + "]:testmode=" + _testMode); + + int total = map.size(); //spacenodes count + int step = total / 100; + int current = 0; + + if (total == 0) { + logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is zero."); + return; + } + logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is " + map.size()); + + //jobContext.startTransaction(); + jobContext.setCurrentSchema(querySchema); + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 0); + for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext(); ) { + it.next(); + + Pair pair = (Pair) it.getValue(); + String tableSrc = (String) pair.first; + + logger.info("begin convert:[" + order + "]-" + tableSrc); + queryIgsetElement(jobContext, querySchema, tableSrc); + + order++; + + if (_testMode) { + if ((_testCount < 0) || (order >= _testCount)) + break; + } + + if ((order % COMMITSIZE) == 0) { + // OracleConnection connection = jobContext.getOracleConnection(); + // connection.commitTransaction(); + jobContext.commitTransaction(); + //jobContext.startTransaction(); + System.gc(); + System.runFinalization(); + } + + if (step != 0) { + int now = order % step; + if (now != current) { + current = now; + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + + } + } else { + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + current++; + } + } + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 100); + + jobContext.commitTransaction(); + jobContext.resetFeatureContext(); + + if (isProfileMode()) { + + } + + logger.info("end convert job:[" + order + "]"); + System.gc(); + System.runFinalization(); + } + + protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc, + OrderedMap orderedMap) throws SQLException { + if (orderedMap == null) + orderedMap = new LinkedMap(99); + String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\""; + PrintfFormat spf = new PrintfFormat(fetchStmtFmt); + String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); + Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ResultSet rs = null; + + stmt.setFetchSize(FETCHSIZE); + try { + rs = stmt.executeQuery(fetchStmt); + int size = rs.getMetaData().getColumnCount(); + + while (rs.next()) { + Object[] values = new Object[size]; + + for (int i = 0; i < size; i++) { + values[i] = rs.getObject(i + 1); + } + + Integer key = ((BigDecimal) values[0]).intValue(); + String name = (String) values[1]; + + Pair pair = (Pair) orderedMap.get(key); + if (pair == null) + orderedMap.put(key, new Pair(name, null)); + else + pair.first = name; + } + } catch (SQLException e) { + logger.error(e.toString(), e); + logger.error("stmt=" + fetchStmt); + throw e; + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + + return orderedMap; + } + + protected OrderedMap getRawFormatStorageList(OracleConnection connection, String schemaSrc, String tableSrc, + OrderedMap orderedMap) throws SQLException { + if (orderedMap == null) + orderedMap = new LinkedMap(99); + String fetchStmtFmt = "SELECT RNID, SPACETABLE FROM \"%s\".\"%s\""; + PrintfFormat spf = new PrintfFormat(fetchStmtFmt); + String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); + Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmt.setFetchSize(FETCHSIZE); + ResultSet rs = stmt.executeQuery(fetchStmt); + try { + int size = rs.getMetaData().getColumnCount(); + while (rs.next()) { + Object[] values = new Object[size]; + + for (int i = 0; i < size; i++) { + values[i] = rs.getObject(i + 1); + } + + Integer key = ((BigDecimal) values[0]).intValue(); + String name = (String) values[1]; + + Pair pair = (Pair) orderedMap.get(key); + if (pair == null) + orderedMap.put(key, new Pair(null, name)); + else + pair.second = name; + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + return orderedMap; + } + + protected void queryIgsetElement(OracleConvertPostGISJobContext jobContext, + String srcschema, String srctable) throws SQLException { + Connection connection = jobContext.getOracleConnection(); + String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID"; + //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; + PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); + String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable}); + Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmtSrc.setFetchSize(FETCHSIZE); + ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); + int igdsMetaType = rsSrc.getMetaData().getColumnType(1); + while (rsSrc.next()) { + if (isProfileMode()) { + markQueryTime(); + } + + byte[] raw = null; + if (igdsMetaType == Types.BLOB) { + BLOB blob = (BLOB) rsSrc.getBlob(1); + + try { + raw = getBytesFromBLOB(blob); + } catch (BufferOverflowException e) { + logger.warn("Wrong Element Structure-", e); + } finally { + // blob.close(); + } + } else { + raw = rsSrc.getBytes(1); + } + + try { + if (raw != null) { + Element element = fetchBinaryElement(raw); + if (isProfileMode()) { + accumulateQueryTime(); + } + jobContext.putFeatureCollection(element); + } else { + if (isProfileMode()) { + accumulateQueryTime(); + } + } + } catch (Dgn7fileException e) { + logger.warn("Dgn7Exception", e); + } + } + + JDBCUtils.close(rsSrc); + JDBCUtils.close(stmtSrc); + } + + protected void queryRawElement(OracleConvertPostGISJobContext jobContext, + String srcschema, String srctable) throws SQLException { + Connection connection = jobContext.getOracleConnection(); + String fetchDestStmtFmt = "SELECT ELEMENT FROM \"%s\".\"%s\" ORDER BY ROWID"; + PrintfFormat spf = new PrintfFormat(fetchDestStmtFmt); + String fetchDestStmt = spf.sprintf(new Object[]{srcschema, srctable}); + Statement stmtDest = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmtDest.setFetchSize(FETCHSIZE); + ResultSet rsDest = stmtDest.executeQuery(fetchDestStmt); + + try { + while (rsDest.next()) { + ARRAY rawsValue = ((OracleResultSet) rsDest).getARRAY(1); + long[] rawData = rawsValue.getLongArray(); + byte[] comparessedValue; + + /* + if (dataMode == TransferTask.DataMode.Normal) + { + comparessedValue = BinConverter.unmarshalByteArray(rawData, true); + } else + { + comparessedValue = BinConverter.unmarshalCompactByteArray(rawData); + } + */ + comparessedValue = BinConverter.unmarshalByteArray(rawData, true); + + byte[] rawDest = ByteArrayCompressor.decompressByteArray(comparessedValue); + + try { + Element element = fetchBinaryElement(rawDest); + jobContext.putFeatureCollection(element); + } catch (Dgn7fileException e) { + logger.warn("Dgn7Exception:" + e.getMessage(), e); + } + } + } finally { + JDBCUtils.close(rsDest); + JDBCUtils.close(stmtDest); + } + } + + // Binary to Element + private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { + ByteBuffer buffer = ByteBuffer.wrap(raws); + buffer.order(ByteOrder.LITTLE_ENDIAN); + short signature = buffer.getShort(); + + // byte type = (byte) (buffer.get() & 0x7f); + byte type = (byte) ((signature >>> 8) & 0x007f); + + // silly Bentley say contentLength is in 2-byte words + // and ByteByffer uses raws. + // track the record location + int elementLength = (buffer.getShort() * 2) + 4; + ElementType recordType = ElementType.forID(type); + IElementHandler handler; + + handler = recordType.getElementHandler(); + + Element dgnElement = (Element) handler.read(buffer, signature, elementLength); + if (recordType.isComplexElement() && (elementLength < raws.length)) { + int offset = elementLength; + while (offset < (raws.length - 4)) { + buffer.position(offset); + signature = buffer.getShort(); + type = (byte) ((signature >>> 8) & 0x007f); + elementLength = (buffer.getShort() * 2) + 4; + if (raws.length < (offset + elementLength)) { + logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); + break; + } + recordType = ElementType.forID(type); + handler = recordType.getElementHandler(); + if (handler != null) { + Element subElement = (Element) handler.read(buffer, signature, elementLength); + ((ComplexElement) dgnElement).add(subElement); + offset += elementLength; + } else { + byte[] remain = new byte[buffer.remaining()]; + System.arraycopy(raws, offset, remain, 0, buffer.remaining()); + for (int i = 0; i < remain.length; i++) { + if (remain[i] != 0) { + logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); + } + } + break; + } + } + } + + return dgnElement; + } + + /** + * 嚙踝蕭嚙踝蕭嚙賞換嚙踝蕭嚙豬對蕭嚙褕迎蕭嚙線嚙瑾 + * + * @param context 嚙線嚙瑾嚙踝蕭嚙踝蕭嚙踝蕭嚙踝蕭 + * @throws org.quartz.JobExecutionException + * exception + */ + private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File indexDir = new File(getDataPath(), INDEXPATHNAME); + if (!indexDir.exists()) { + logger.info("index dir=" + indexDir + " not exist."); + return; + } + + if (!indexDir.isDirectory()) { + logger.info("index dir=" + indexDir + " is not a directory."); + } + + List<File> dgnFiles = FileUtils.recurseDir(indexDir, new FileFilter() { + public boolean accept(File pathname) { + return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn"); + } + }); + + for (File dgnFile : dgnFiles) { + if (dgnFile.isDirectory()) continue; + IndexDgnConvertPostGISJobContext convertContext = + new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, + isProfileMode(), isTransformed()); + logger.info("--- start index dgnfile-" + dgnFile.toString() + " ---"); + FileInputStream fs = null; + FileChannel fc = null; + Dgn7fileReader reader = null; + try { + convertContext.clearOutputDatabase(); + convertContext.setExecutionContext(context); + String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); + convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); + convertContext.startTransaction(); + + fs = new FileInputStream(dgnFile); + fc = fs.getChannel(); + reader = new Dgn7fileReader(fc, new Lock()); + convertContext.setReader(reader); + + scanIndexDgnElement(convertContext); + + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + + System.gc(); + System.runFinalization(); + } catch (FileNotFoundException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (Dgn7fileException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IOException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IllegalAttributeException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (SchemaException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (isProfileMode()) { + logger.warn("Profile-Current convertContext Process Cost-" + + ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current convertContext Update Cost-" + + ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + } + } + } + } + + protected void scanIndexDgnElement(IndexDgnConvertPostGISJobContext convertContext) + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { + Dgn7fileReader reader = convertContext.getReader(); + int count = 0; + Element lastComplex = null; + + while (reader.hasNext()) { + if (isProfileMode()) markProcessTime(); + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { + Element element = (Element) record.element(); + ElementType type = element.getElementType(); + + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { + processIndexElement(lastComplex, convertContext); + lastComplex = null; + } + + processIndexElement(element, convertContext); + } else if (element.isComponentElement()) { + if (lastComplex != null) { + ((ComplexElement) lastComplex).add(element); + } + } else if (type.isComplexElement()) { + if (lastComplex != null) { + processIndexElement(lastComplex, convertContext); + } + lastComplex = element; + } + } + count++; + } + + if (lastComplex != null) { + processIndexElement(lastComplex, convertContext); + } + logger.debug("ElementRecord Count=" + count); + } + + private void processIndexElement(Element element, IndexDgnConvertPostGISJobContext convertContext) + throws IllegalAttributeException, SchemaException { + //if (useTpclidText) { + // if (element instanceof TextElement) { + // convertContext.putFeatureCollection(element); + // } + //} else { + // if (element instanceof ShapeElement) { + convertContext.putFeatureCollection(element); + // } + //} + } + + + /** + * 嚙踝蕭嚙踝蕭嚙賞換嚙踝蕭L嚙稽嚙緘嚙踝蕭嚙褕迎蕭嚙線嚙瑾 + * + * @param context jobContext + * @throws org.quartz.JobExecutionException + * exception + */ + private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File otherDir = new File(getDataPath(), OTHERPATHNAME); + if (!otherDir.exists()) { + logger.info("other dir=" + otherDir + " not exist."); + return; + } + + if (!otherDir.isDirectory()) { + logger.info("other dir=" + otherDir + " is not a directory."); + } + + List<File> dgnFiles = FileUtils.recurseDir(otherDir, new FileFilter() { + public boolean accept(File pathname) { + return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn"); + } + }); + + for (File dgnFile : dgnFiles) { + if (dgnFile.isDirectory()) continue; + + GeneralDgnConvertPostGISJobContext convertContext = + new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, + isProfileMode(), isTransformed()); + logger.info("--- start other dgnfile-" + dgnFile.toString() + " ---"); + FileInputStream fs = null; + FileChannel fc; + Dgn7fileReader reader = null; + try { + convertContext.setExecutionContext(context); + String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); + convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); + convertContext.startTransaction(); + + fs = new FileInputStream(dgnFile); + fc = fs.getChannel(); + reader = new Dgn7fileReader(fc, new Lock()); + convertContext.setReader(reader); + + scanOtherDgnElement(convertContext); + + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + + System.gc(); + System.runFinalization(); + } catch (FileNotFoundException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (Dgn7fileException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IOException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IllegalAttributeException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (SchemaException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (isProfileMode()) { + logger.warn("Profile-Current convertContext Process Cost-" + + ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current convertContext Update Cost-" + + ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + } + } + } + } + + public void scanOtherDgnElement(GeneralDgnConvertPostGISJobContext convertContext) + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { + Dgn7fileReader reader = convertContext.getReader(); + int count = 0; + Element lastComplex = null; + while (reader.hasNext()) { + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { + Element element = (Element) record.element(); + ElementType type = element.getElementType(); + + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { + processOtherElement(lastComplex, convertContext); + lastComplex = null; + } + + processOtherElement(element, convertContext); + } else if (element.isComponentElement()) { + if (lastComplex != null) { + ((ComplexElement) lastComplex).add(element); + } + } else if (type.isComplexElement()) { + if (lastComplex != null) { + processOtherElement(lastComplex, convertContext); + } + lastComplex = element; + } + } + count++; + } + + if (lastComplex != null) { + processOtherElement(lastComplex, convertContext); + } + logger.debug("ElementRecord Count=" + count); + } + + private void processOtherElement(Element element, GeneralDgnConvertPostGISJobContext convertContext) + throws IllegalAttributeException, SchemaException { + convertContext.putFeatureCollection(element); + } + + private void clearOutputDatabase() { + /* + File outDataPath = new File(getDataPath(), OracleConvertEdbGeoJobContext.SHPOUTPATH); + if (outDataPath.exists() && outDataPath.isDirectory()) + { + deleteFilesInPath(outDataPath); + } + outDataPath = new File(getDataPath(), IndexDgnConvertShpJobContext.SHPOUTPATH); + if (outDataPath.exists() && outDataPath.isDirectory()) + { + deleteFilesInPath(outDataPath); + } + outDataPath = new File(getDataPath(), GeneralDgnConvertShpJobContext.SHPOUTPATH); + if (outDataPath.exists() && outDataPath.isDirectory()) + { + deleteFilesInPath(outDataPath); + } + */ + } + + private void deleteFilesInPath(File outDataPath) { + deleteFilesInPath(outDataPath, true); + } + + private void deleteFilesInPath(File outDataPath, boolean removeSubDir) { + if (!outDataPath.isDirectory()) { + return; + } + File[] files = outDataPath.listFiles(); + for (File file : files) { + if (file.isFile()) { + if (!file.delete()) { + logger.info("Cannot delete file-" + file.toString()); + } + } else if (file.isDirectory()) { + deleteFilesInPath(file, removeSubDir); + if (removeSubDir) { + if (file.delete()) { + logger.info("Cannot delete dir-" + file.toString()); + } + } + } + } + } + + private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File elminDir = new File(getDataPath(), "elmin"); + if (!elminDir.exists()) { + logger.info("elmin dir=" + elminDir + " not exist."); + return; + } + + if (!elminDir.isDirectory()) { + logger.info("elmin dir=" + elminDir + " is not a directory."); + } + + File[] dgnFiles = elminDir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".dgn"); + } + }); + + for (File dgnFile : dgnFiles) { + FeatureDgnConvertPostGISJobContext convertContext = + new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + logger.info("--- start dgnfile-" + dgnFile.toString() + " ---"); + try { + convertContext.setExecutionContext(context); + String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); + convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); + convertContext.startTransaction(); + + FileInputStream fs = new FileInputStream(dgnFile); + FileChannel fc = fs.getChannel(); + Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); + convertContext.setReader(reader); + + scanFeatureDgnElement(convertContext); + + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + System.gc(); + System.runFinalization(); + } catch (FileNotFoundException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (Dgn7fileException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IOException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IllegalAttributeException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (SchemaException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + } + } + } + + public void scanFeatureDgnElement(FeatureDgnConvertPostGISJobContext convertContext) + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { + Dgn7fileReader reader = convertContext.getReader(); + int count = 0; + Element lastComplex = null; + while (reader.hasNext()) { + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { + Element element = (Element) record.element(); + ElementType type = element.getElementType(); + + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { + processFeatureElement(lastComplex, convertContext); + lastComplex = null; + } + + processFeatureElement(element, convertContext); + } else if (element.isComponentElement()) { + if (lastComplex != null) { + ((ComplexElement) lastComplex).add(element); + } + } else if (type.isComplexElement()) { + if (lastComplex != null) { + processFeatureElement(lastComplex, convertContext); + } + lastComplex = element; + } + } + count++; + } + + if (lastComplex != null) { + processFeatureElement(lastComplex, convertContext); + } + logger.debug("ElementRecord Count=" + count); + } + + private void processFeatureElement(Element element, FeatureDgnConvertPostGISJobContext convertContext) + throws IllegalAttributeException, SchemaException { + convertContext.putFeatureCollection(element); + } + + private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException { + /* + DummyFeatureConvertShpJobContext convertContext = new DummyFeatureConvertShpJobContext(getDataPath(), _filterPath); + try { + convertContext.startTransaction(); + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + } catch (IOException e) + { + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + */ + } + + public DataStore getTargetDataStore() { + return targetDataStore; + } + + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + + /* + if (!isDriverFound()) + { + throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); + } + */ + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); + } + + /* + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { + pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); + } + */ + + if (!dataStoreFactory.canProcess(pgProperties)) { + getLogger().warn("cannot process properties-"); + throw new JobExecutionException("cannot process properties-"); + } + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { + getLogger().warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + } + + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private String determineTargetSchemaName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetSchema = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XGVERSIONTABLE_NAME + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXGeosVersionTable(connection, _pgSchema); + rs.close(); + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vsschema, vsstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vsid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vsschema"); + values[1] = rs.getShort("vsstatus"); + tmpSchemas.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpSchemas.get(0); + targetSchema = (String) values[0]; + } else if (current < (tmpSchemas.size() - 1)) { + Object[] values = tmpSchemas.get(current + 1); + targetSchema = (String) values[0]; + } else { + Object[] values = tmpSchemas.get(0); + targetSchema = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vsstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vsschema = '"); + sbSQL.append(targetSchema).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetSchema; + } + + private String determineTargetThemeTableName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetTable = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XPTVERSIONTABLE_NAME + needCreate = false; + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXPWThemeVersionTable(connection, _pgSchema); + rs.close(); + + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vptname, vptstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vptid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vptname"); + values[1] = rs.getShort("vptstatus"); + tmpTablenames.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } else if (current < (tmpTablenames.size() - 1)) { + Object[] values = tmpTablenames.get(current + 1); + targetTable = (String) values[0]; + } else { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vptname = '"); + sbSQL.append(targetTable).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetTable + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetTable; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + if (schemaName == null) + return "\"" + tableName + "\""; + return "\"" + schemaName + "\".\"" + tableName + "\""; + } + + private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" ( vsid serial PRIMARY KEY, "); + sql.append(" vsschema character varying(64) NOT NULL, "); + sql.append(" vsstatus smallint NOT NULL, "); + sql.append(" vstimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXGVERSIONSCHEMA_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" (vsschema, vsstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + + createIfNotExistNewSchema(connection, schemaName); + } + + } finally { + if (stmt != null) stmt.close(); + } + } + + private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" ( vptid serial PRIMARY KEY, "); + sql.append(" vptname character varying(64) NOT NULL, "); + sql.append(" vptstatus smallint NOT NULL, "); + sql.append(" vpttimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" (vptname, vptstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + } + + } finally { + if (stmt != null) stmt.close(); + } + } + + private void updateRepoStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vsstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void updatePWThemeStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void createIfNotExistNewSchema(Connection connection, String s) throws SQLException { + Statement stmt = null; + ResultSet rs = null; + try { + /* + rs = connection.getMetaData().getSchemas(null, s); + if (rs.next()) return; + rs.close(); + rs = null; + */ + + StringBuilder sbSQL = new StringBuilder("CREATE SCHEMA "); + sbSQL.append(s).append(' '); + sbSQL.append("AUTHORIZATION ").append(_pgUsername); + stmt = connection.createStatement(); + stmt.executeUpdate(sbSQL.toString()); + + sbSQL = new StringBuilder("GRANT ALL ON SCHEMA "); + sbSQL.append(s).append(' '); + sbSQL.append("TO public"); + stmt.executeUpdate(sbSQL.toString()); + } catch (SQLException e) { + logger.info("create schema:" + s + " has exception."); + logger.info(e.getMessage(), e); + } finally { + if (rs != null) rs.close(); + if (stmt != null) stmt.close(); + } + } + + public final void accumulateQueryTime() { + queryTime += System.currentTimeMillis() - queryTimeStart; + } + + public long getQueryTime() { + return queryTime; + } + + public final void markQueryTime() { + queryTimeStart = System.currentTimeMillis(); + } + + public final void resetQueryTime() { + queryTime = 0; + } + + private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, dyncolor) VALUES (?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setString(3, colorText); + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setShort(3, (short) ownerId); + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + pstmt.setString(4, "shape://ccarrow"); + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + pstmt.setString(4, "shape://rccarrow"); + } else { + pstmt.setString(4, "shape://backslash"); + } + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndex(Connection connection, String tableName) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(colorText).append("\n"); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + String flowMark = null; + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + flowMark = FORWARDFLOW_MARK; + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + flowMark = BACKFLOW_MARK; + } else if (ConnectivityDirectionEnum.Nondeterminate == dir) { + flowMark = NONFLOW_MARK; + } else { + flowMark = UNFLOW_MARK; + } + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(ownerId).append(','); + sb.append(flowMark).append('\n'); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable); + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + stmt.execute("DROP TABLE " + tempTable); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + String [] siteInfo=new String[]{"ftp://10.10.1.9:21","DMMS","DMMS000"} ; + + private String[] getNDDList(String[] info){ + String url=info[3]; + String ftp=info[0]; + String uid=info[1]; + String pwd=info[2]; + // List<MapItemValue> tmp= dmmsSite.getFtpList(site); + //for(int i=0;i<tmp.size();i++) + //{ + // if(tmp.get(i).getGroupName().equals("featureimg")) + // { + // url="/tcdaas/ndddash/"; + String [] fileNow=getFileList(ftp,uid,pwd,url,""); + return fileNow ; + // } + //} + //return new String[]{}; + } + + private byte[] getNDDDash(String[] info, String dirname, String filename) { + String url="";//info[3]; + String ftp=info[0]; + String uid=info[1]; + String pwd=info[2]; + + dirname= dirname.replace("[.]","_"); //防hack + filename= filename.replace("[/]","_"); //防hack + // List<MapItemValue> tmp= dmmsSite.getFtpList(site); + String[] temp=dirname.split("/"); + dirname= temp[temp.length-1]; + + // for(int i=0;i<tmp.size();i++) + // { + // if(tmp.get(i).getGroupName().equals("featureimg")) + // { + url=info[3]+dirname+"/"; + + + byte[] bytes= getFile(ftp,uid,pwd,url,filename); + return bytes; + // return new FileTransfer(filename, "application/octet-stream",bytes); + // } + // } + // return null; + } + + private String[] getFileList(String urlString,String ftpUser,String ftpPwd,String filePath, String filter){ + FTPClient ftpClient=null; + try{ + ftpClient= new FTPClient(); + }catch(Throwable ex) + { + ex.getMessage(); + } + + URL url; + + // /tcdaas/dsbncard -- for feature D + // /tcdaas/mhole -- for feature D + // /tcdaas/featureimg -- for feature U/D attached and LIST + try{ + url= new URL(urlString);//"ftp://20.20.1.3:21/"); + ftpClient.connect( + url.getHost(),url.getPort() + ); + + if(!ftpClient.login(ftpUser,ftpPwd))// "DMMS","DMMS000")) + { + return null; + } + int reply = ftpClient.getReplyCode(); + //FTPReply stores a set of constants for FTP reply codes. + + if (!FTPReply.isPositiveCompletion(reply)) + { + ftpClient.disconnect(); + return null; + } + ftpClient.setFileType(FTP.BINARY_FILE_TYPE); + //enter passive mode + ftpClient.enterLocalPassiveMode(); + + String[] filelist=ftpClient.listNames(filePath + (filter == null ? "" : filter)) ; + ftpClient.disconnect(); + return filelist; + }catch(MalformedURLException urlex) + { + + } catch (Exception ex) + { + + } + return new String[]{}; + } + + private byte[] getFile(String urlString,String ftpUser,String ftpPwd,String filePath,String fileName){ + FTPClient ftpClient= new FTPClient(); + + URL url; + byte[] result; + // /tcdaas/dsbncard -- for feature D + // /tcdaas/mhole -- for feature D + // /tcdaas/featureimg -- for feature U/D attached and LIST + try{ + url= new URL(urlString);//"ftp://20.20.1.3:21/"); + ftpClient.connect( + url.getHost(),url.getPort() + ); + + if(!ftpClient.login(ftpUser,ftpPwd))// "DMMS","DMMS000")) + { + return null; + } + int reply = ftpClient.getReplyCode(); + //FTPReply stores a set of constants for FTP reply codes. + + if (!FTPReply.isPositiveCompletion(reply)) + { + ftpClient.disconnect(); + return null; + } + ftpClient.setFileType(FTP.BINARY_FILE_TYPE); + //enter passive mode + ftpClient.enterLocalPassiveMode(); + + String[] filelist=ftpClient.listNames(filePath+ fileName) ; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + if(filelist.length>0) + { + if(ftpClient.retrieveFile(filePath+fileName,bos)) + { + result= bos.toByteArray() ; //negative numbers can use (b)&0xff + bos.close(); + } + else + { + result=null; + try{ + bos.close(); + } catch (Exception ex) + { + + } + } + } + else + { + result=null; + } + + ftpClient.disconnect(); + + }catch(MalformedURLException urlex) + { + result=null; + } catch (Exception ex) + { + result=null; + } + return result; + } + public String[] getNDDStrings(String[] info, String filename) + { + byte[] temp; + // String stie=getUserDept(); + + // String[] list=dmmsFtpClient.getNDDList(stie) ; + String[] list =getNDDList(info) ; + + List<String> lstXML= new ArrayList<String>(); + for(int i=0;i<list.length;i++) + { + temp=getNDDDash(info, list[i], filename) ; + try{ + if(temp!=null) lstXML.add(new String(temp,"UTF-8")); + } catch (UnsupportedEncodingException ex) { + // this should never happen because "UTF-8" is hard-coded. + throw new IllegalStateException(ex); + } + } + if(lstXML.size()>0) + return lstXML. toArray(new String[0]); + + return null; + } + + + private static Map<String, String> ditTyphoon = new HashMap<String, String>(); + + public String getTyphoonIDByName(Connection postsql,String typhoonName) throws SQLException + { + if(ditTyphoon.containsKey(typhoonName)) + { + return ditTyphoon.get(typhoonName); + }else + { + return readOrCreateTyphoonByName(postsql,typhoonName); + //readOrCreateTyphoon; + } + } + public String readOrCreateTyphoonByName(Connection postsql,String typhoonName) throws SQLException + { + //targetDataStore + //time of create should be modify + List<String[]> listDict; + String strSQLSelectProject=String.format( "select typhoon_id,typhoon_name from ndd.typhoonproject where typhoon_name='%s'",typhoonName); + String strSQLInsertProject=String.format( "insert into ndd.typhoonproject (typhoon_name,row_created) values ('%s',now())",typhoonName); + + listDict= sqlExecQuery(postsql,strSQLSelectProject,new String[]{}); + //boolean bCreate=false; + if(listDict!=null) + { + if(listDict.size()>0) + { + for(int i=0;i<listDict.size();i++) + { + return addDict(listDict.get(i)[0],listDict.get(i)[1]); + } + } + } + //bCreate=true; + //insert + logger.info(String.format("new project:%s",typhoonName)); + sqlExec(postsql,strSQLInsertProject,new String[]{}); + return readOrCreateTyphoonByName(postsql, typhoonName) ; + } + private synchronized static String addDict(String id,String typhoon) + { + if(ditTyphoon.containsKey(typhoon)) + return ditTyphoon.get(typhoon); + else + ditTyphoon.put(typhoon,id); + return id; + } + + public boolean jobOnLine(Connection postsql,String jobname) throws SQLException + { + //working when jobname=1 + //targetDataStore + //time of create should be modify + List<String[]> listDict; + String strSQLSelectSchedule=String.format( "select enabled from ndd.schedule where name='%s'",jobname); + + listDict= sqlExecQuery(postsql,strSQLSelectSchedule,new String[]{}); + if(listDict.size()==0)return false; // not exist ->dont work + return listDict.get(0)[0].equals("1"); + } +} -- Gitblit v0.0.0-SNAPSHOT