From 0193b6e730bc9e9aa096a955f2561fe3c257a914 Mon Sep 17 00:00:00 2001 From: unknown <yuanhung@ximple.com.tw> Date: Tue, 14 Jan 2014 19:18:15 +0800 Subject: [PATCH] 道路使用費 --- xdgnjobs/ximple-jobcarrier/quartz.properties | 2 xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties | 2 xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_dmmsroadfee.xml | 173 ++++ xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml | 173 ++++ xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSRoadfeeCalculateJob.java | 1895 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 2,243 insertions(+), 2 deletions(-) diff --git a/xdgnjobs/ximple-jobcarrier/quartz.properties b/xdgnjobs/ximple-jobcarrier/quartz.properties index e491e0d..f364aea 100644 --- a/xdgnjobs/ximple-jobcarrier/quartz.properties +++ b/xdgnjobs/ximple-jobcarrier/quartz.properties @@ -23,7 +23,7 @@ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.jobInitializer.class: org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin -org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml +org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml #org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml org.quartz.plugin.jobInitializer.failOnFileNotFound = true org.quartz.plugin.jobInitializer.scanInterval = 10 diff --git a/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml b/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml new file mode 100644 index 0000000..065854c --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml @@ -0,0 +1,173 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> + <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> + <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertDMMS2PostGisWithGeoserver</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class--> + <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>--> + <job-class>com.ximple.eofms.jobs.DMMSRoadfeeCalculateJob</job-class> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class--> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class--> + <!--volatility>false</volatility--> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <value>/Users/Shared/Public/Projects/XGeoDMMS/xjobrun/tctpcjobs/jobdata</value> + </entry> + <entry> + <key>PGHOST</key> + <value>10.10.1.7</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS2</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>public</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORAHOST</key> + <value>10.10.1.7</value> + </entry> + <entry> + <key>ORAINST</key> + <value>orcl</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTPWTHEMES</key> + <value>true</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>true</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://10.10.1.7:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>false</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertDMMS2PostGisWithGeoserver</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties index 15de783..2d032c0 100644 --- a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties +++ b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties @@ -23,7 +23,7 @@ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.jobInitializer.class = org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin -org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml +org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml #org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml org.quartz.plugin.jobInitializer.failOnFileNotFound = true diff --git a/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_dmmsroadfee.xml b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_dmmsroadfee.xml new file mode 100644 index 0000000..065854c --- /dev/null +++ b/xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_dmmsroadfee.xml @@ -0,0 +1,173 @@ +<?xml version='1.0' encoding='utf-8'?> + +<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd" + version="1.8"> + + <pre-processing-commands> + <delete-jobs-in-group>*</delete-jobs-in-group> + <!-- clear all jobs in scheduler --> + <delete-triggers-in-group>*</delete-triggers-in-group> + <!-- clear all triggers in scheduler --> + </pre-processing-commands> + + <processing-directives> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), overwrite them --> + <overwrite-existing-data>true</overwrite-existing-data> + <!-- if there are any jobs/trigger in scheduler of same name (as in this file), and over-write is false, ignore them rather then generating an error --> + <ignore-duplicates>false</ignore-duplicates> + </processing-directives> + + <schedule> + <job> + <name>ConvertDMMS2PostGisWithGeoserver</name> + <group>DEFAULT</group> + <description>A job that convert dgn to postgis</description> + <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class--> + <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>--> + <job-class>com.ximple.eofms.jobs.DMMSRoadfeeCalculateJob</job-class> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class--> + <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class--> + <!--volatility>false</volatility--> + <durability>false</durability> + <recover>false</recover> + <!--job-data-map allows-transient-data="true"--> + <job-data-map> + <entry> + <key>JOBDATA_DIR</key> + <value>/Users/Shared/Public/Projects/XGeoDMMS/xjobrun/tctpcjobs/jobdata</value> + </entry> + <entry> + <key>PGHOST</key> + <value>10.10.1.7</value> + </entry> + <entry> + <key>PGDATBASE</key> + <value>pgDMMS2</value> + </entry> + <entry> + <key>PGPORT</key> + <value>5432</value> + </entry> + <entry> + <key>PGSCHEMA</key> + <value>public</value> + </entry> + <entry> + <key>PGUSER</key> + <value>tpcdb</value> + </entry> + <entry> + <key>PGPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORAHOST</key> + <value>10.10.1.7</value> + </entry> + <entry> + <key>ORAINST</key> + <value>orcl</value> + </entry> + <entry> + <key>ORAPORT</key> + <value>1521</value> + </entry> + <entry> + <key>ORAUSER</key> + <value>system</value> + </entry> + <entry> + <key>ORAPASS</key> + <value>simple000</value> + </entry> + <entry> + <key>ORGSCHEMA</key> + <!--value>SPATIALDB</value--> + <value>SPATIALDB, CMMS_SPATIALDB</value> + </entry> + <entry> + <key>CONVERTDB</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTFILE</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTELEMIN</key> + <value>false</value> + </entry> + <entry> + <key>CONVERTPWTHEMES</key> + <value>true</value> + </entry> + <entry> + <key>CREATEDUMMY</key> + <value>false</value> + </entry> + <entry> + <key>ELEMLOG</key> + <value>true</value> + </entry> + <entry> + <key>USEWKB</key> + <value>true</value> + </entry> + <entry> + <key>TESTMODE</key> + <value>false</value> + </entry> + <entry> + <key>TESTCOUNT</key> + <value>2</value> + </entry> + <entry> + <key>COPYCONNECTIVITYMODE</key> + <value>true</value> + </entry> + <entry> + <key>PROFILEMODE</key> + <value>true</value> + </entry> + <entry> + <key>USEZONE121</key> + <value>true</value> + </entry> + <entry> + <key>GEOSERVER_URL</key> + <value>http://10.10.1.7:8080/geoserver</value> + </entry> + <entry> + <key>GEOSERVER_USER</key> + <value>admin</value> + </entry> + <entry> + <key>GEOSERVER_PASS</key> + <value>geoserver</value> + </entry> + <entry> + <key>IGNORE_DBETL</key> + <value>false</value> + </entry> + </job-data-map> + </job> + + <trigger> + <simple> + <name>convertTrigger</name> + <group>DEFAULT</group> + <job-name>ConvertDMMS2PostGisWithGeoserver</job-name> + <job-group>DEFAULT</job-group> + <start-time>2013-03-01T18:00:00</start-time> + <!-- repeat indefinitely every 10 seconds --> + <repeat-count>0</repeat-count> + <repeat-interval>500</repeat-interval> + <!-- <repeat-interval>72000000</repeat-interval> --> + </simple> + </trigger> + + </schedule> + +</job-scheduling-data> diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSRoadfeeCalculateJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSRoadfeeCalculateJob.java new file mode 100644 index 0000000..a613352 --- /dev/null +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSRoadfeeCalculateJob.java @@ -0,0 +1,1895 @@ +package com.ximple.eofms.jobs; + +import com.ximple.eofms.jobs.context.AbstractOracleJobContext; +import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; +import com.ximple.eofms.util.*; +import com.ximple.io.dgn7.*; +import com.ximple.util.PrintfFormat; +import oracle.jdbc.OracleConnection; +import oracle.jdbc.OracleResultSet; +import oracle.sql.ARRAY; +import oracle.sql.BLOB; +import org.apache.commons.collections.OrderedMap; +import org.apache.commons.collections.OrderedMapIterator; +import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.dbcp.DelegatingConnection; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; +import org.geotools.feature.SchemaException; +import org.geotools.jdbc.JDBCDataStore; +import org.opengis.feature.IllegalAttributeException; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +import java.io.*; +import java.math.BigDecimal; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.sql.*; +import java.util.*; +import java.util.Date; + +public class DMMSRoadfeeCalculateJob extends AbstractOracleDatabaseJob { + final static Log logger = LogFactory.getLog(DMMSRoadfeeCalculateJob.class); + + private static final String PGHOST = "PGHOST"; + private static final String PGDATBASE = "PGDATBASE"; + private static final String PGPORT = "PGPORT"; + private static final String PGSCHEMA = "PGSCHEMA"; + private static final String PGUSER = "PGUSER"; + private static final String PGPASS = "PGPASS"; + private static final String USEWKB = "USEWKB"; + + private static final boolean useTpclidText = false; + + private static final int FETCHSIZE = 30; + private static final int COMMITSIZE = 100; + private static final String INDEXPATHNAME = "index"; + private static final String OTHERPATHNAME = "other"; + public static final String FORWARDFLOW_MARK = "shape://ccarrow"; + public static final String BACKFLOW_MARK = "shape://rccarrow"; + public static final String UNFLOW_MARK = "shape://backslash"; + public static final String NONFLOW_MARK = "shape://slash"; + + private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC"; + private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC"; + + private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)"; + private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)"; + + public static final String FDYNCOLOR_SUFFIX = "_fdyncolor"; + public static final String FOWNER_SUFFIX = "_fowner"; + + protected static class Pair { + Object first; + Object second; + + public Pair(Object first, Object second) { + this.first = first; + this.second = second; + } + } + + protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); + + protected String _pgHost; + protected String _pgDatabase; + protected String _pgPort; + protected String _pgSchema; + protected String _pgUsername; + protected String _pgPassword; + protected String _pgUseWKB; + + protected Map<String, String> pgProperties; + protected JDBCDataStore targetDataStore; + // protected OracleConvertEdbGeoJobContext oracleJobContext; + + private long queryTime = 0; + private long queryTimeStart = 0; + + public Log getLogger() { + return logger; + } + + protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, + boolean profileMode, + boolean useTransform) { + return new OracleConvertPostGISJobContext(getDataPath(), + getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); + } + + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { + super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); + _pgHost = dataMap.getString(PGHOST); + _pgDatabase = dataMap.getString(PGDATBASE); + _pgPort = dataMap.getString(PGPORT); + _pgSchema = dataMap.getString(PGSCHEMA); + _pgUsername = dataMap.getString(PGUSER); + _pgPassword = dataMap.getString(PGPASS); + _pgUseWKB = dataMap.getString(USEWKB); + + Log logger = getLogger(); + /* + logger.info("PGHOST=" + _myHost); + logger.info("PGDATBASE=" + _myDatabase); + logger.info("PGPORT=" + _myPort); + logger.info("PGSCHEMA=" + _mySchema); + logger.info("PGUSER=" + _myUsername); + logger.info("PGPASS=" + _myPassword); + logger.info("USEWKB=" + _myUseWKB); + */ + + if (_pgHost == null) { + logger.warn("PGHOST is null"); + throw new JobExecutionException("Unknown PostGIS host."); + } + if (_pgDatabase == null) { + logger.warn("PGDATABASE is null"); + throw new JobExecutionException("Unknown PostGIS database."); + } + if (_pgPort == null) { + logger.warn("PGPORT is null"); + throw new JobExecutionException("Unknown PostGIS port."); + } + if (_pgSchema == null) { + logger.warn("PGSCHEMA is null"); + throw new JobExecutionException("Unknown PostGIS schema."); + } + if (_pgUsername == null) { + logger.warn("PGUSERNAME is null"); + throw new JobExecutionException("Unknown PostGIS username."); + } + if (_pgPassword == null) { + logger.warn("PGPASSWORD is null"); + throw new JobExecutionException("Unknown PostGIS password."); + } + + Map<String, String> remote = new TreeMap<String, String>(); + remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); + // remote.put("charset", "UTF-8"); + remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); + remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); + remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); + remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); + remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); + // remote.put( "namespace", null); + pgProperties = remote; + } + + + + private List<String[]> sqlExecQuery(Connection connection,String strSQLIn,String[] params) throws SQLException { + + String strSQL=strSQLIn; + for(int i=0;i<params.length;i++) + { + if(params[i]==null)params[i]=""; + strSQL=strSQL.replace("%s"+String.valueOf(i+1),params[i]); + } + List<String[]> result=new ArrayList<String[]>(); + List<String> temp = new ArrayList<String>(); + String strTemp=""; + // String result = null; + Statement stmt = null; + ResultSet rs = null; + + + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery(strSQL.toString()); + // get first result + // temp.clear(); + + ResultSetMetaData rsmd = rs.getMetaData(); + int NumOfCol = rsmd.getColumnCount(); + + while (rs.next()) { + for (int idx = 0; idx < NumOfCol; idx++) { + strTemp = rs.getString(idx + 1); + temp.add(strTemp); + } + result.add(temp.toArray(new String[0])); + temp.clear(); + } + return result; + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + private void sqlExec(Connection connection,String strSQLIn,String[] params) throws SQLException { + + String strSQL=strSQLIn; + for(int i=0;i<params.length;i++) + { + if(params[i]==null)params[i]=""; + strSQL=strSQL.replace("%s"+String.valueOf(i+1),params[i]); + } + List<String[]> result=new ArrayList<String[]>(); + List<String> temp = new ArrayList<String>(); + String strTemp=""; + // String result = null; + Statement stmt = null; + ResultSet rs = null; + + + try { + stmt = connection.createStatement(); + stmt.execute( strSQL.toString()); + // get first result + // temp.clear(); + + + } finally { + // JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + + private void doJob(Connection postsql,Connection orcl) throws SQLException + { + String strSQLGetTask="select proc_id,procname,datastore,name,step,src,dest,txtsql from roadfee_proc where rowstatus=1 and procname like 'STEP%' order by procname,step" ; + List<String[]> joblist=null; + Connection inConnection; + int idOfJob=0; + + List<String[]> nodata= new ArrayList<String[]>(); + List<String[]> lista= new ArrayList<String[]>(); + List<String[]> list1= new ArrayList<String[]>(); + List<String[]> listIn= new ArrayList<String[]>(); + List<String[]> temp;//= new ArrayList<String[]>(); + nodata.add(new String[]{""}); + // proc_id[0],procname[1],datastore[2\,name[3],step[4], src[5],des[6]t,txtsql[7] + try{ + logger.info("getJoblist"); + joblist=sqlExecQuery(postsql, strSQLGetTask, new String[]{}); + + for ( idOfJob=0;idOfJob<joblist.size();idOfJob++) + { + logger.info("begin "+joblist.get(idOfJob)[1]+"-"+joblist.get(idOfJob)[3]+"("+joblist.get(idOfJob)[0]+")"); + if(joblist.get(idOfJob)[5].equals("nodata")) + { + listIn=nodata; + } + else if(joblist.get(idOfJob)[5].equals("list1")) + { + listIn=list1; + } + else if(joblist.get(idOfJob)[5].equals("lista")) + { + listIn=lista; + } + + if(joblist.get(idOfJob)[2].equals("psql")) + { + inConnection= postsql; + } + else if(joblist.get(idOfJob)[2].equals("orcl")) + { + inConnection= orcl; + } + else + return ; //connection failed + + if( joblist.get(idOfJob)[6].equals("list1")) list1.clear(); + if( joblist.get(idOfJob)[6].equals("lista")) lista.clear(); + //runsql + logger.info("process data count: "+String.valueOf(listIn.size())); + + for( int idxOfListIn=0;idxOfListIn< listIn.size();idxOfListIn++) + { + + if( joblist.get(idOfJob)[6].equals("nodata")) + { + sqlExec(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn)); + //logger.info("finish "+joblist.get(idOfJob)[1]+"-"+joblist.get(idOfJob)[3]+"("+joblist.get(idOfJob)[0]+")") + + continue; + }else + { + temp=sqlExecQuery(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn)); + + } + + + for(int j=0;j<temp.size();j++) + { + if( joblist.get(idOfJob)[6].equals("list1")) + { + list1.add(temp.get(j)); + } + else if( joblist.get(idOfJob)[6].equals("lista")) + { + lista.add(temp.get(j)); + } + } + } + + + } + + }catch(SQLException sqlex) + { + logger.warn("ERROR@ID:"+String.valueOf( joblist.get(idOfJob)[0])); + throw sqlex; + } + + + } + + public void execute(JobExecutionContext context) throws JobExecutionException { + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); + + if (isIgnoreDBETL()) { + return; + } + + createSourceDataStore(); + createTargetDataStore(); + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + + if (getTargetDataStore() == null) { + logger.warn("Cannot connect source postgreSQL database."); + throw new JobExecutionException("Cannot connect source postgreSQL database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName, targetThemeTable; + try { + //logger.info("-- step:clearOutputDatabase --"); + + doJob( targetDataStore.getConnection(Transaction.AUTO_COMMIT),sourceDataStore.getConnection(Transaction.AUTO_COMMIT) ); + + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } catch (SQLException e) { + disconnect(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException("Database error. " + e.getMessage(), e); + } finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + + private void exetcuteConvert(OracleConvertPostGISJobContext jobContext, + String querySchema, String targetSchemaName) throws SQLException { + int order = 0; + OrderedMap map = getBlobStorageList(jobContext.getOracleConnection(), + querySchema, "SD$SPACENODES", null); + + logger.info("begin convert job:[" + map.size() + "]:testmode=" + _testMode); + + int total = map.size(); //spacenodes count + int step = total / 100; + int current = 0; + + if (total == 0) { + logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is zero."); + return; + } + logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is " + map.size()); + + //jobContext.startTransaction(); + jobContext.setCurrentSchema(querySchema); + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 0); + for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext(); ) { + it.next(); + + Pair pair = (Pair) it.getValue(); + String tableSrc = (String) pair.first; + + logger.info("begin convert:[" + order + "]-" + tableSrc); + queryIgsetElement(jobContext, querySchema, tableSrc); + + order++; + + if (_testMode) { + if ((_testCount < 0) || (order >= _testCount)) + break; + } + + if ((order % COMMITSIZE) == 0) { + // OracleConnection connection = jobContext.getOracleConnection(); + // connection.commitTransaction(); + jobContext.commitTransaction(); + //jobContext.startTransaction(); + System.gc(); + System.runFinalization(); + } + + if (step != 0) { + int now = order % step; + if (now != current) { + current = now; + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + + } + } else { + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current); + current++; + } + } + jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 100); + + jobContext.commitTransaction(); + jobContext.resetFeatureContext(); + + if (isProfileMode()) { + + } + + logger.info("end convert job:[" + order + "]"); + System.gc(); + System.runFinalization(); + } + + protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc, + OrderedMap orderedMap) throws SQLException { + if (orderedMap == null) + orderedMap = new LinkedMap(99); + String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\""; + PrintfFormat spf = new PrintfFormat(fetchStmtFmt); + String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); + Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ResultSet rs = null; + + stmt.setFetchSize(FETCHSIZE); + try { + rs = stmt.executeQuery(fetchStmt); + int size = rs.getMetaData().getColumnCount(); + + while (rs.next()) { + Object[] values = new Object[size]; + + for (int i = 0; i < size; i++) { + values[i] = rs.getObject(i + 1); + } + + Integer key = ((BigDecimal) values[0]).intValue(); + String name = (String) values[1]; + + Pair pair = (Pair) orderedMap.get(key); + if (pair == null) + orderedMap.put(key, new Pair(name, null)); + else + pair.first = name; + } + } catch (SQLException e) { + logger.error(e.toString(), e); + logger.error("stmt=" + fetchStmt); + throw e; + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + + return orderedMap; + } + + protected OrderedMap getRawFormatStorageList(OracleConnection connection, String schemaSrc, String tableSrc, + OrderedMap orderedMap) throws SQLException { + if (orderedMap == null) + orderedMap = new LinkedMap(99); + String fetchStmtFmt = "SELECT RNID, SPACETABLE FROM \"%s\".\"%s\""; + PrintfFormat spf = new PrintfFormat(fetchStmtFmt); + String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc}); + Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmt.setFetchSize(FETCHSIZE); + ResultSet rs = stmt.executeQuery(fetchStmt); + try { + int size = rs.getMetaData().getColumnCount(); + while (rs.next()) { + Object[] values = new Object[size]; + + for (int i = 0; i < size; i++) { + values[i] = rs.getObject(i + 1); + } + + Integer key = ((BigDecimal) values[0]).intValue(); + String name = (String) values[1]; + + Pair pair = (Pair) orderedMap.get(key); + if (pair == null) + orderedMap.put(key, new Pair(null, name)); + else + pair.second = name; + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + return orderedMap; + } + + protected void queryIgsetElement(OracleConvertPostGISJobContext jobContext, + String srcschema, String srctable) throws SQLException { + Connection connection = jobContext.getOracleConnection(); + String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID"; + //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID"; + PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt); + String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable}); + Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmtSrc.setFetchSize(FETCHSIZE); + ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt); + int igdsMetaType = rsSrc.getMetaData().getColumnType(1); + while (rsSrc.next()) { + if (isProfileMode()) { + markQueryTime(); + } + + byte[] raw = null; + if (igdsMetaType == Types.BLOB) { + BLOB blob = (BLOB) rsSrc.getBlob(1); + + try { + raw = getBytesFromBLOB(blob); + } catch (BufferOverflowException e) { + logger.warn("Wrong Element Structure-", e); + } finally { + // blob.close(); + } + } else { + raw = rsSrc.getBytes(1); + } + + try { + if (raw != null) { + Element element = fetchBinaryElement(raw); + if (isProfileMode()) { + accumulateQueryTime(); + } + jobContext.putFeatureCollection(element); + } else { + if (isProfileMode()) { + accumulateQueryTime(); + } + } + } catch (Dgn7fileException e) { + logger.warn("Dgn7Exception", e); + } + } + + JDBCUtils.close(rsSrc); + JDBCUtils.close(stmtSrc); + } + + protected void queryRawElement(OracleConvertPostGISJobContext jobContext, + String srcschema, String srctable) throws SQLException { + Connection connection = jobContext.getOracleConnection(); + String fetchDestStmtFmt = "SELECT ELEMENT FROM \"%s\".\"%s\" ORDER BY ROWID"; + PrintfFormat spf = new PrintfFormat(fetchDestStmtFmt); + String fetchDestStmt = spf.sprintf(new Object[]{srcschema, srctable}); + Statement stmtDest = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmtDest.setFetchSize(FETCHSIZE); + ResultSet rsDest = stmtDest.executeQuery(fetchDestStmt); + + try { + while (rsDest.next()) { + ARRAY rawsValue = ((OracleResultSet) rsDest).getARRAY(1); + long[] rawData = rawsValue.getLongArray(); + byte[] comparessedValue; + + /* + if (dataMode == TransferTask.DataMode.Normal) + { + comparessedValue = BinConverter.unmarshalByteArray(rawData, true); + } else + { + comparessedValue = BinConverter.unmarshalCompactByteArray(rawData); + } + */ + comparessedValue = BinConverter.unmarshalByteArray(rawData, true); + + byte[] rawDest = ByteArrayCompressor.decompressByteArray(comparessedValue); + + try { + Element element = fetchBinaryElement(rawDest); + jobContext.putFeatureCollection(element); + } catch (Dgn7fileException e) { + logger.warn("Dgn7Exception:" + e.getMessage(), e); + } + } + } finally { + JDBCUtils.close(rsDest); + JDBCUtils.close(stmtDest); + } + } + + // Binary to Element + private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException { + ByteBuffer buffer = ByteBuffer.wrap(raws); + buffer.order(ByteOrder.LITTLE_ENDIAN); + short signature = buffer.getShort(); + + // byte type = (byte) (buffer.get() & 0x7f); + byte type = (byte) ((signature >>> 8) & 0x007f); + + // silly Bentley say contentLength is in 2-byte words + // and ByteByffer uses raws. + // track the record location + int elementLength = (buffer.getShort() * 2) + 4; + ElementType recordType = ElementType.forID(type); + IElementHandler handler; + + handler = recordType.getElementHandler(); + + Element dgnElement = (Element) handler.read(buffer, signature, elementLength); + if (recordType.isComplexElement() && (elementLength < raws.length)) { + int offset = elementLength; + while (offset < (raws.length - 4)) { + buffer.position(offset); + signature = buffer.getShort(); + type = (byte) ((signature >>> 8) & 0x007f); + elementLength = (buffer.getShort() * 2) + 4; + if (raws.length < (offset + elementLength)) { + logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit()); + break; + } + recordType = ElementType.forID(type); + handler = recordType.getElementHandler(); + if (handler != null) { + Element subElement = (Element) handler.read(buffer, signature, elementLength); + ((ComplexElement) dgnElement).add(subElement); + offset += elementLength; + } else { + byte[] remain = new byte[buffer.remaining()]; + System.arraycopy(raws, offset, remain, 0, buffer.remaining()); + for (int i = 0; i < remain.length; i++) { + if (remain[i] != 0) { + logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]); + } + } + break; + } + } + } + + return dgnElement; + } + + /** + * �����ഫ�����ɪ��u�@ + * + * @param context �u�@�������� + * @throws org.quartz.JobExecutionException + * exception + */ + private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File indexDir = new File(getDataPath(), INDEXPATHNAME); + if (!indexDir.exists()) { + logger.info("index dir=" + indexDir + " not exist."); + return; + } + + if (!indexDir.isDirectory()) { + logger.info("index dir=" + indexDir + " is not a directory."); + } + + List<File> dgnFiles = FileUtils.recurseDir(indexDir, new FileFilter() { + public boolean accept(File pathname) { + return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn"); + } + }); + + for (File dgnFile : dgnFiles) { + if (dgnFile.isDirectory()) continue; + IndexDgnConvertPostGISJobContext convertContext = + new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, + isProfileMode(), isTransformed()); + logger.info("--- start index dgnfile-" + dgnFile.toString() + " ---"); + FileInputStream fs = null; + FileChannel fc = null; + Dgn7fileReader reader = null; + try { + convertContext.clearOutputDatabase(); + convertContext.setExecutionContext(context); + String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); + convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); + convertContext.startTransaction(); + + fs = new FileInputStream(dgnFile); + fc = fs.getChannel(); + reader = new Dgn7fileReader(fc, new Lock()); + convertContext.setReader(reader); + + scanIndexDgnElement(convertContext); + + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + + System.gc(); + System.runFinalization(); + } catch (FileNotFoundException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (Dgn7fileException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IOException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IllegalAttributeException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (SchemaException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (isProfileMode()) { + logger.warn("Profile-Current convertContext Process Cost-" + + ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current convertContext Update Cost-" + + ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + } + } + } + } + + protected void scanIndexDgnElement(IndexDgnConvertPostGISJobContext convertContext) + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { + Dgn7fileReader reader = convertContext.getReader(); + int count = 0; + Element lastComplex = null; + + while (reader.hasNext()) { + if (isProfileMode()) markProcessTime(); + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { + Element element = (Element) record.element(); + ElementType type = element.getElementType(); + + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { + processIndexElement(lastComplex, convertContext); + lastComplex = null; + } + + processIndexElement(element, convertContext); + } else if (element.isComponentElement()) { + if (lastComplex != null) { + ((ComplexElement) lastComplex).add(element); + } + } else if (type.isComplexElement()) { + if (lastComplex != null) { + processIndexElement(lastComplex, convertContext); + } + lastComplex = element; + } + } + count++; + } + + if (lastComplex != null) { + processIndexElement(lastComplex, convertContext); + } + logger.debug("ElementRecord Count=" + count); + } + + private void processIndexElement(Element element, IndexDgnConvertPostGISJobContext convertContext) + throws IllegalAttributeException, SchemaException { + //if (useTpclidText) { + // if (element instanceof TextElement) { + // convertContext.putFeatureCollection(element); + // } + //} else { + // if (element instanceof ShapeElement) { + convertContext.putFeatureCollection(element); + // } + //} + } + + + /** + * �����ഫ��L�]�p���ɪ��u�@ + * + * @param context jobContext + * @throws org.quartz.JobExecutionException + * exception + */ + private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File otherDir = new File(getDataPath(), OTHERPATHNAME); + if (!otherDir.exists()) { + logger.info("other dir=" + otherDir + " not exist."); + return; + } + + if (!otherDir.isDirectory()) { + logger.info("other dir=" + otherDir + " is not a directory."); + } + + List<File> dgnFiles = FileUtils.recurseDir(otherDir, new FileFilter() { + public boolean accept(File pathname) { + return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn"); + } + }); + + for (File dgnFile : dgnFiles) { + if (dgnFile.isDirectory()) continue; + + GeneralDgnConvertPostGISJobContext convertContext = + new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, + isProfileMode(), isTransformed()); + logger.info("--- start other dgnfile-" + dgnFile.toString() + " ---"); + FileInputStream fs = null; + FileChannel fc; + Dgn7fileReader reader = null; + try { + convertContext.setExecutionContext(context); + String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); + convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); + convertContext.startTransaction(); + + fs = new FileInputStream(dgnFile); + fc = fs.getChannel(); + reader = new Dgn7fileReader(fc, new Lock()); + convertContext.setReader(reader); + + scanOtherDgnElement(convertContext); + + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + + System.gc(); + System.runFinalization(); + } catch (FileNotFoundException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (Dgn7fileException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IOException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IllegalAttributeException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (SchemaException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (fs != null) { + try { + fs.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + + if (isProfileMode()) { + logger.warn("Profile-Current convertContext Process Cost-" + + ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec"); + logger.warn("Profile-Current convertContext Update Cost-" + + ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " + + (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec"); + } + } + } + } + + public void scanOtherDgnElement(GeneralDgnConvertPostGISJobContext convertContext) + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { + Dgn7fileReader reader = convertContext.getReader(); + int count = 0; + Element lastComplex = null; + while (reader.hasNext()) { + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { + Element element = (Element) record.element(); + ElementType type = element.getElementType(); + + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { + processOtherElement(lastComplex, convertContext); + lastComplex = null; + } + + processOtherElement(element, convertContext); + } else if (element.isComponentElement()) { + if (lastComplex != null) { + ((ComplexElement) lastComplex).add(element); + } + } else if (type.isComplexElement()) { + if (lastComplex != null) { + processOtherElement(lastComplex, convertContext); + } + lastComplex = element; + } + } + count++; + } + + if (lastComplex != null) { + processOtherElement(lastComplex, convertContext); + } + logger.debug("ElementRecord Count=" + count); + } + + private void processOtherElement(Element element, GeneralDgnConvertPostGISJobContext convertContext) + throws IllegalAttributeException, SchemaException { + convertContext.putFeatureCollection(element); + } + + private void clearOutputDatabase() { + /* + File outDataPath = new File(getDataPath(), OracleConvertEdbGeoJobContext.SHPOUTPATH); + if (outDataPath.exists() && outDataPath.isDirectory()) + { + deleteFilesInPath(outDataPath); + } + outDataPath = new File(getDataPath(), IndexDgnConvertShpJobContext.SHPOUTPATH); + if (outDataPath.exists() && outDataPath.isDirectory()) + { + deleteFilesInPath(outDataPath); + } + outDataPath = new File(getDataPath(), GeneralDgnConvertShpJobContext.SHPOUTPATH); + if (outDataPath.exists() && outDataPath.isDirectory()) + { + deleteFilesInPath(outDataPath); + } + */ + } + + private void deleteFilesInPath(File outDataPath) { + deleteFilesInPath(outDataPath, true); + } + + private void deleteFilesInPath(File outDataPath, boolean removeSubDir) { + if (!outDataPath.isDirectory()) { + return; + } + File[] files = outDataPath.listFiles(); + for (File file : files) { + if (file.isFile()) { + if (!file.delete()) { + logger.info("Cannot delete file-" + file.toString()); + } + } else if (file.isDirectory()) { + deleteFilesInPath(file, removeSubDir); + if (removeSubDir) { + if (file.delete()) { + logger.info("Cannot delete dir-" + file.toString()); + } + } + } + } + } + + private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException { + File elminDir = new File(getDataPath(), "elmin"); + if (!elminDir.exists()) { + logger.info("elmin dir=" + elminDir + " not exist."); + return; + } + + if (!elminDir.isDirectory()) { + logger.info("elmin dir=" + elminDir + " is not a directory."); + } + + File[] dgnFiles = elminDir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".dgn"); + } + }); + + for (File dgnFile : dgnFiles) { + FeatureDgnConvertPostGISJobContext convertContext = + new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + logger.info("--- start dgnfile-" + dgnFile.toString() + " ---"); + try { + convertContext.setExecutionContext(context); + String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator); + convertContext.setFilename(dgnPaths[dgnPaths.length - 1]); + convertContext.startTransaction(); + + FileInputStream fs = new FileInputStream(dgnFile); + FileChannel fc = fs.getChannel(); + Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock()); + convertContext.setReader(reader); + + scanFeatureDgnElement(convertContext); + + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + System.gc(); + System.runFinalization(); + } catch (FileNotFoundException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (Dgn7fileException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IOException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (IllegalAttributeException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } catch (SchemaException e) { + convertContext.rollbackTransaction(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } finally { + convertContext.closeFeatureWriter(); + } + } + } + + public void scanFeatureDgnElement(FeatureDgnConvertPostGISJobContext convertContext) + throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException { + Dgn7fileReader reader = convertContext.getReader(); + int count = 0; + Element lastComplex = null; + while (reader.hasNext()) { + Element.FileRecord record = reader.nextElement(); + if (record.element() != null) { + Element element = (Element) record.element(); + ElementType type = element.getElementType(); + + if ((!type.isComplexElement()) && (!element.isComponentElement())) { + if (lastComplex != null) { + processFeatureElement(lastComplex, convertContext); + lastComplex = null; + } + + processFeatureElement(element, convertContext); + } else if (element.isComponentElement()) { + if (lastComplex != null) { + ((ComplexElement) lastComplex).add(element); + } + } else if (type.isComplexElement()) { + if (lastComplex != null) { + processFeatureElement(lastComplex, convertContext); + } + lastComplex = element; + } + } + count++; + } + + if (lastComplex != null) { + processFeatureElement(lastComplex, convertContext); + } + logger.debug("ElementRecord Count=" + count); + } + + private void processFeatureElement(Element element, FeatureDgnConvertPostGISJobContext convertContext) + throws IllegalAttributeException, SchemaException { + convertContext.putFeatureCollection(element); + } + + private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException { + /* + DummyFeatureConvertShpJobContext convertContext = new DummyFeatureConvertShpJobContext(getDataPath(), _filterPath); + try { + convertContext.startTransaction(); + convertContext.commitTransaction(); + convertContext.closeFeatureWriter(); + } catch (IOException e) + { + logger.warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + */ + } + + public DataStore getTargetDataStore() { + return targetDataStore; + } + + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + + /* + if (!isDriverFound()) + { + throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); + } + */ + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); + } + + /* + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) { + pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true"); + } + */ + + if (!dataStoreFactory.canProcess(pgProperties)) { + getLogger().warn("cannot process properties-"); + throw new JobExecutionException("cannot process properties-"); + } + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { + getLogger().warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + } + + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private String determineTargetSchemaName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetSchema = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XGVERSIONTABLE_NAME + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXGeosVersionTable(connection, _pgSchema); + rs.close(); + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vsschema, vsstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vsid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vsschema"); + values[1] = rs.getShort("vsstatus"); + tmpSchemas.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpSchemas.get(0); + targetSchema = (String) values[0]; + } else if (current < (tmpSchemas.size() - 1)) { + Object[] values = tmpSchemas.get(current + 1); + targetSchema = (String) values[0]; + } else { + Object[] values = tmpSchemas.get(0); + targetSchema = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vsstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vsschema = '"); + sbSQL.append(targetSchema).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetSchema; + } + + private String determineTargetThemeTableName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetTable = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XPTVERSIONTABLE_NAME + needCreate = false; + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXPWThemeVersionTable(connection, _pgSchema); + rs.close(); + + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vptname, vptstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vptid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vptname"); + values[1] = rs.getShort("vptstatus"); + tmpTablenames.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } else if (current < (tmpTablenames.size() - 1)) { + Object[] values = tmpTablenames.get(current + 1); + targetTable = (String) values[0]; + } else { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vptname = '"); + sbSQL.append(targetTable).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetTable + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetTable; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + if (schemaName == null) + return "\"" + tableName + "\""; + return "\"" + schemaName + "\".\"" + tableName + "\""; + } + + private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" ( vsid serial PRIMARY KEY, "); + sql.append(" vsschema character varying(64) NOT NULL, "); + sql.append(" vsstatus smallint NOT NULL, "); + sql.append(" vstimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXGVERSIONSCHEMA_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)); + sql.append(" (vsschema, vsstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + + createIfNotExistNewSchema(connection, schemaName); + } + + } finally { + if (stmt != null) stmt.close(); + } + } + + private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" ( vptid serial PRIMARY KEY, "); + sql.append(" vptname character varying(64) NOT NULL, "); + sql.append(" vptstatus smallint NOT NULL, "); + sql.append(" vpttimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" (vptname, vptstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + } + + } finally { + if (stmt != null) stmt.close(); + } + } + + private void updateRepoStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vsstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void updatePWThemeStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void createIfNotExistNewSchema(Connection connection, String s) throws SQLException { + Statement stmt = null; + ResultSet rs = null; + try { + /* + rs = connection.getMetaData().getSchemas(null, s); + if (rs.next()) return; + rs.close(); + rs = null; + */ + + StringBuilder sbSQL = new StringBuilder("CREATE SCHEMA "); + sbSQL.append(s).append(' '); + sbSQL.append("AUTHORIZATION ").append(_pgUsername); + stmt = connection.createStatement(); + stmt.executeUpdate(sbSQL.toString()); + + sbSQL = new StringBuilder("GRANT ALL ON SCHEMA "); + sbSQL.append(s).append(' '); + sbSQL.append("TO public"); + stmt.executeUpdate(sbSQL.toString()); + } catch (SQLException e) { + logger.info("create schema:" + s + " has exception."); + logger.info(e.getMessage(), e); + } finally { + if (rs != null) rs.close(); + if (stmt != null) stmt.close(); + } + } + + public final void accumulateQueryTime() { + queryTime += System.currentTimeMillis() - queryTimeStart; + } + + public long getQueryTime() { + return queryTime; + } + + public final void markQueryTime() { + queryTimeStart = System.currentTimeMillis(); + } + + public final void resetQueryTime() { + queryTime = 0; + } + + private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, dyncolor) VALUES (?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setString(3, colorText); + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setShort(3, (short) ownerId); + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + pstmt.setString(4, "shape://ccarrow"); + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + pstmt.setString(4, "shape://rccarrow"); + } else { + pstmt.setString(4, "shape://backslash"); + } + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndex(Connection connection, String tableName) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(colorText).append("\n"); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + while (connectionPG instanceof DelegatingConnection) { + connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); + } + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + final int MAX_BATCHSIZE = 250; + ResultSet rs = null; + Statement stmt = null; + try { + // connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(MAX_BATCHSIZE); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); + + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 10240); + + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + short dirId = (short) rs.getInt(4); + String flowMark = null; + ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); + if ((ConnectivityDirectionEnum.ForwardflowON == dir) || + (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { + flowMark = FORWARDFLOW_MARK; + + } else if ((ConnectivityDirectionEnum.BackflowON == dir) || + (ConnectivityDirectionEnum.BackFixflowON == dir)) { + flowMark = BACKFLOW_MARK; + } else if (ConnectivityDirectionEnum.Nondeterminate == dir) { + flowMark = NONFLOW_MARK; + } else { + flowMark = UNFLOW_MARK; + } + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(ownerId).append(','); + sb.append(flowMark).append('\n'); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable); + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + stmt.execute("DROP TABLE " + tempTable); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } +} -- Gitblit v0.0.0-SNAPSHOT