forked from geodmms/xdgnjobs

unknown
2014-04-08 c49c240521d24d5ed1ff2d07ac26ccdea0648600
Merge branch 'origin/2.1.x'

Conflicts:
xdgnjobs/ximple-jobcarrier/quartz.properties
2 files modified
3 files added
2252 lines changed
xdgnjobs/ximple-jobcarrier/quartz.properties (6 lines)
xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml (173 lines)
xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties (5 lines)
xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_dmmsroadfee.xml (173 lines)
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSRoadfeeCalculateJob.java (1895 lines)
xdgnjobs/ximple-jobcarrier/quartz.properties
@@ -23,11 +23,11 @@
org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.jobInitializer.class: org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml
org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_inc.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml
org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_nddjob.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_colowner.xml
org.quartz.plugin.jobInitializer.failOnFileNotFound = true
org.quartz.plugin.jobInitializer.scanInterval = 10
xdgnjobs/ximple-jobcarrier/quartz_jobs_dmmsroadfee.xml
New file
@@ -0,0 +1,173 @@
<?xml version='1.0' encoding='utf-8'?>
<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd"
                     version="1.8">
  <pre-processing-commands>
    <delete-jobs-in-group>*</delete-jobs-in-group>
    <!-- clear all jobs in scheduler -->
    <delete-triggers-in-group>*</delete-triggers-in-group>
    <!-- clear all triggers in scheduler -->
  </pre-processing-commands>
  <processing-directives>
    <!-- if there are any jobs/triggers in the scheduler with the same names as in this file, overwrite them -->
    <overwrite-existing-data>true</overwrite-existing-data>
    <!-- if there are any jobs/triggers with the same names and overwrite is false, ignore them rather than raising an error -->
    <ignore-duplicates>false</ignore-duplicates>
  </processing-directives>
  <schedule>
    <job>
      <name>ConvertDMMS2PostGisWithGeoserver</name>
      <group>DEFAULT</group>
      <description>A job that converts dgn to postgis</description>
      <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class-->
      <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>-->
      <job-class>com.ximple.eofms.jobs.DMMSRoadfeeCalculateJob</job-class>
      <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class-->
      <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class-->
      <!--volatility>false</volatility-->
      <durability>false</durability>
      <recover>false</recover>
      <!--job-data-map allows-transient-data="true"-->
      <job-data-map>
        <entry>
          <key>JOBDATA_DIR</key>
          <value>/Users/Shared/Public/Projects/XGeoDMMS/xjobrun/tctpcjobs/jobdata</value>
        </entry>
        <entry>
          <key>PGHOST</key>
          <value>10.10.1.7</value>
        </entry>
        <entry>
          <key>PGDATBASE</key>
          <value>pgDMMS2</value>
        </entry>
        <entry>
          <key>PGPORT</key>
          <value>5432</value>
        </entry>
        <entry>
          <key>PGSCHEMA</key>
          <value>public</value>
        </entry>
        <entry>
          <key>PGUSER</key>
          <value>tpcdb</value>
        </entry>
        <entry>
          <key>PGPASS</key>
          <value>simple000</value>
        </entry>
        <entry>
          <key>ORAHOST</key>
          <value>10.10.1.7</value>
        </entry>
        <entry>
          <key>ORAINST</key>
          <value>orcl</value>
        </entry>
        <entry>
          <key>ORAPORT</key>
          <value>1521</value>
        </entry>
        <entry>
          <key>ORAUSER</key>
          <value>system</value>
        </entry>
        <entry>
          <key>ORAPASS</key>
          <value>simple000</value>
        </entry>
        <entry>
          <key>ORGSCHEMA</key>
          <!--value>SPATIALDB</value-->
          <value>SPATIALDB, CMMS_SPATIALDB</value>
        </entry>
        <entry>
          <key>CONVERTDB</key>
          <value>false</value>
        </entry>
        <entry>
          <key>CONVERTFILE</key>
          <value>false</value>
        </entry>
        <entry>
          <key>CONVERTELEMIN</key>
          <value>false</value>
        </entry>
        <entry>
          <key>CONVERTPWTHEMES</key>
          <value>true</value>
        </entry>
        <entry>
          <key>CREATEDUMMY</key>
          <value>false</value>
        </entry>
        <entry>
          <key>ELEMLOG</key>
          <value>true</value>
        </entry>
        <entry>
          <key>USEWKB</key>
          <value>true</value>
        </entry>
        <entry>
          <key>TESTMODE</key>
          <value>false</value>
        </entry>
        <entry>
          <key>TESTCOUNT</key>
          <value>2</value>
        </entry>
        <entry>
          <key>COPYCONNECTIVITYMODE</key>
          <value>true</value>
        </entry>
        <entry>
          <key>PROFILEMODE</key>
          <value>true</value>
        </entry>
        <entry>
          <key>USEZONE121</key>
          <value>true</value>
        </entry>
        <entry>
          <key>GEOSERVER_URL</key>
          <value>http://10.10.1.7:8080/geoserver</value>
        </entry>
        <entry>
          <key>GEOSERVER_USER</key>
          <value>admin</value>
        </entry>
        <entry>
          <key>GEOSERVER_PASS</key>
          <value>geoserver</value>
        </entry>
        <entry>
          <key>IGNORE_DBETL</key>
          <value>false</value>
        </entry>
      </job-data-map>
    </job>
    <trigger>
      <simple>
        <name>convertTrigger</name>
        <group>DEFAULT</group>
        <job-name>ConvertDMMS2PostGisWithGeoserver</job-name>
        <job-group>DEFAULT</job-group>
        <start-time>2013-03-01T18:00:00</start-time>
        <!-- repeat-count 0 fires the trigger once; repeat-interval is in milliseconds -->
        <repeat-count>0</repeat-count>
        <repeat-interval>500</repeat-interval>
        <!-- <repeat-interval>72000000</repeat-interval> -->
      </simple>
    </trigger>
  </schedule>
</job-scheduling-data>
xdgnjobs/ximple-jobcarrier/src/main/resources/quartz.properties
@@ -24,10 +24,11 @@
org.quartz.plugin.jobInitializer.class = org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml
org.quartz.plugin.jobInitializer.fileNames = quartz_jobs.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_inc.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_edb.xml
org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_colowner.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_colowner.xml
#org.quartz.plugin.jobInitializer.fileNames = quartz_jobs_dmmsroadfee.xml
org.quartz.plugin.jobInitializer.failOnFileNotFound = true
org.quartz.plugin.jobInitializer.scanInterval = 10
xdgnjobs/ximple-jobcarrier/src/main/resources/quartz_jobs_dmmsroadfee.xml
New file
@@ -0,0 +1,173 @@
<?xml version='1.0' encoding='utf-8'?>
<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd"
                     version="1.8">
  <pre-processing-commands>
    <delete-jobs-in-group>*</delete-jobs-in-group>
    <!-- clear all jobs in scheduler -->
    <delete-triggers-in-group>*</delete-triggers-in-group>
    <!-- clear all triggers in scheduler -->
  </pre-processing-commands>
  <processing-directives>
    <!-- if there are any jobs/triggers in the scheduler with the same names as in this file, overwrite them -->
    <overwrite-existing-data>true</overwrite-existing-data>
    <!-- if there are any jobs/triggers with the same names and overwrite is false, ignore them rather than raising an error -->
    <ignore-duplicates>false</ignore-duplicates>
  </processing-directives>
  <schedule>
    <job>
      <name>ConvertDMMS2PostGisWithGeoserver</name>
      <group>DEFAULT</group>
      <description>A job that converts dgn to postgis</description>
      <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class-->
      <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>-->
      <job-class>com.ximple.eofms.jobs.DMMSRoadfeeCalculateJob</job-class>
      <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class-->
      <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class-->
      <!--volatility>false</volatility-->
      <durability>false</durability>
      <recover>false</recover>
      <!--job-data-map allows-transient-data="true"-->
      <job-data-map>
        <entry>
          <key>JOBDATA_DIR</key>
          <value>/Users/Shared/Public/Projects/XGeoDMMS/xjobrun/tctpcjobs/jobdata</value>
        </entry>
        <entry>
          <key>PGHOST</key>
          <value>10.10.1.7</value>
        </entry>
        <entry>
          <key>PGDATBASE</key>
          <value>pgDMMS2</value>
        </entry>
        <entry>
          <key>PGPORT</key>
          <value>5432</value>
        </entry>
        <entry>
          <key>PGSCHEMA</key>
          <value>public</value>
        </entry>
        <entry>
          <key>PGUSER</key>
          <value>tpcdb</value>
        </entry>
        <entry>
          <key>PGPASS</key>
          <value>simple000</value>
        </entry>
        <entry>
          <key>ORAHOST</key>
          <value>10.10.1.7</value>
        </entry>
        <entry>
          <key>ORAINST</key>
          <value>orcl</value>
        </entry>
        <entry>
          <key>ORAPORT</key>
          <value>1521</value>
        </entry>
        <entry>
          <key>ORAUSER</key>
          <value>system</value>
        </entry>
        <entry>
          <key>ORAPASS</key>
          <value>simple000</value>
        </entry>
        <entry>
          <key>ORGSCHEMA</key>
          <!--value>SPATIALDB</value-->
          <value>SPATIALDB, CMMS_SPATIALDB</value>
        </entry>
        <entry>
          <key>CONVERTDB</key>
          <value>false</value>
        </entry>
        <entry>
          <key>CONVERTFILE</key>
          <value>false</value>
        </entry>
        <entry>
          <key>CONVERTELEMIN</key>
          <value>false</value>
        </entry>
        <entry>
          <key>CONVERTPWTHEMES</key>
          <value>true</value>
        </entry>
        <entry>
          <key>CREATEDUMMY</key>
          <value>false</value>
        </entry>
        <entry>
          <key>ELEMLOG</key>
          <value>true</value>
        </entry>
        <entry>
          <key>USEWKB</key>
          <value>true</value>
        </entry>
        <entry>
          <key>TESTMODE</key>
          <value>false</value>
        </entry>
        <entry>
          <key>TESTCOUNT</key>
          <value>2</value>
        </entry>
        <entry>
          <key>COPYCONNECTIVITYMODE</key>
          <value>true</value>
        </entry>
        <entry>
          <key>PROFILEMODE</key>
          <value>true</value>
        </entry>
        <entry>
          <key>USEZONE121</key>
          <value>true</value>
        </entry>
        <entry>
          <key>GEOSERVER_URL</key>
          <value>http://10.10.1.7:8080/geoserver</value>
        </entry>
        <entry>
          <key>GEOSERVER_USER</key>
          <value>admin</value>
        </entry>
        <entry>
          <key>GEOSERVER_PASS</key>
          <value>geoserver</value>
        </entry>
        <entry>
          <key>IGNORE_DBETL</key>
          <value>false</value>
        </entry>
      </job-data-map>
    </job>
    <trigger>
      <simple>
        <name>convertTrigger</name>
        <group>DEFAULT</group>
        <job-name>ConvertDMMS2PostGisWithGeoserver</job-name>
        <job-group>DEFAULT</job-group>
        <start-time>2013-03-01T18:00:00</start-time>
        <!-- repeat-count 0 fires the trigger once; repeat-interval is in milliseconds -->
        <repeat-count>0</repeat-count>
        <repeat-interval>500</repeat-interval>
        <!-- <repeat-interval>72000000</repeat-interval> -->
      </simple>
    </trigger>
  </schedule>
</job-scheduling-data>
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSRoadfeeCalculateJob.java
New file
@@ -0,0 +1,1895 @@
package com.ximple.eofms.jobs;
import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext;
import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext;
import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext;
import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext;
import com.ximple.eofms.util.*;
import com.ximple.io.dgn7.*;
import com.ximple.util.PrintfFormat;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleResultSet;
import oracle.sql.ARRAY;
import oracle.sql.BLOB;
import org.apache.commons.collections.OrderedMap;
import org.apache.commons.collections.OrderedMapIterator;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.dbcp.DelegatingConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.geotools.data.DataStore;
import org.geotools.data.Transaction;
import org.geotools.data.jdbc.JDBCUtils;
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
import org.geotools.feature.SchemaException;
import org.geotools.jdbc.JDBCDataStore;
import org.opengis.feature.IllegalAttributeException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.io.*;
import java.math.BigDecimal;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.sql.*;
import java.util.*;
import java.util.Date;
public class DMMSRoadfeeCalculateJob extends AbstractOracleDatabaseJob {
    final static Log logger = LogFactory.getLog(DMMSRoadfeeCalculateJob.class);
    private static final String PGHOST = "PGHOST";
    private static final String PGDATBASE = "PGDATBASE";
    private static final String PGPORT = "PGPORT";
    private static final String PGSCHEMA = "PGSCHEMA";
    private static final String PGUSER = "PGUSER";
    private static final String PGPASS = "PGPASS";
    private static final String USEWKB = "USEWKB";
    private static final boolean useTpclidText = false;
    private static final int FETCHSIZE = 30;
    private static final int COMMITSIZE = 100;
    private static final String INDEXPATHNAME = "index";
    private static final String OTHERPATHNAME = "other";
    public static final String FORWARDFLOW_MARK = "shape://ccarrow";
    public static final String BACKFLOW_MARK = "shape://rccarrow";
    public static final String UNFLOW_MARK = "shape://backslash";
    public static final String NONFLOW_MARK = "shape://slash";
    private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC";
    private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";
    private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)";
    private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)";
    public static final String FDYNCOLOR_SUFFIX = "_fdyncolor";
    public static final String FOWNER_SUFFIX = "_fowner";
    protected static class Pair {
        Object first;
        Object second;
        public Pair(Object first, Object second) {
            this.first = first;
            this.second = second;
        }
    }
    protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
    protected String _pgHost;
    protected String _pgDatabase;
    protected String _pgPort;
    protected String _pgSchema;
    protected String _pgUsername;
    protected String _pgPassword;
    protected String _pgUseWKB;
    protected Map<String, String> pgProperties;
    protected JDBCDataStore targetDataStore;
    // protected OracleConvertEdbGeoJobContext oracleJobContext;
    private long queryTime = 0;
    private long queryTimeStart = 0;
    public Log getLogger() {
        return logger;
    }
    protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath,
                                                         boolean profileMode,
                                                         boolean useTransform) {
        return new OracleConvertPostGISJobContext(getDataPath(),
                getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
    }
    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
        super.extractJobConfiguration(jobDetail);
        JobDataMap dataMap = jobDetail.getJobDataMap();
        _pgHost = dataMap.getString(PGHOST);
        _pgDatabase = dataMap.getString(PGDATBASE);
        _pgPort = dataMap.getString(PGPORT);
        _pgSchema = dataMap.getString(PGSCHEMA);
        _pgUsername = dataMap.getString(PGUSER);
        _pgPassword = dataMap.getString(PGPASS);
        _pgUseWKB = dataMap.getString(USEWKB);
        Log logger = getLogger();
        /*
        logger.info("PGHOST=" + _myHost);
        logger.info("PGDATBASE=" + _myDatabase);
        logger.info("PGPORT=" + _myPort);
        logger.info("PGSCHEMA=" + _mySchema);
        logger.info("PGUSER=" + _myUsername);
        logger.info("PGPASS=" + _myPassword);
        logger.info("USEWKB=" + _myUseWKB);
        */
        if (_pgHost == null) {
            logger.warn("PGHOST is null");
            throw new JobExecutionException("Unknown PostGIS host.");
        }
        if (_pgDatabase == null) {
            logger.warn("PGDATABASE is null");
            throw new JobExecutionException("Unknown PostGIS database.");
        }
        if (_pgPort == null) {
            logger.warn("PGPORT is null");
            throw new JobExecutionException("Unknown PostGIS port.");
        }
        if (_pgSchema == null) {
            logger.warn("PGSCHEMA is null");
            throw new JobExecutionException("Unknown PostGIS schema.");
        }
        if (_pgUsername == null) {
            logger.warn("PGUSERNAME is null");
            throw new JobExecutionException("Unknown PostGIS username.");
        }
        if (_pgPassword == null) {
            logger.warn("PGPASSWORD is null");
            throw new JobExecutionException("Unknown PostGIS password.");
        }
        Map<String, String> remote = new TreeMap<String, String>();
        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
        // remote.put("charset", "UTF-8");
        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
        // remote.put( "namespace", null);
        pgProperties = remote;
    }
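    // Runs a query after substituting %s1..%sN placeholders in the SQL template with the
    // corresponding params entries, returning every row as a String[] of column values.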
    private List<String[]> sqlExecQuery(Connection connection, String strSQLIn, String[] params) throws SQLException {
        String strSQL = strSQLIn;
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) params[i] = "";
            strSQL = strSQL.replace("%s" + String.valueOf(i + 1), params[i]);
        }
        List<String[]> result = new ArrayList<String[]>();
        List<String> temp = new ArrayList<String>();
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = connection.createStatement();
            rs = stmt.executeQuery(strSQL);
            ResultSetMetaData rsmd = rs.getMetaData();
            int numOfCol = rsmd.getColumnCount();
            while (rs.next()) {
                for (int idx = 0; idx < numOfCol; idx++) {
                    temp.add(rs.getString(idx + 1));
                }
                result.add(temp.toArray(new String[0]));
                temp.clear();
            }
            return result;
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
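    // Statement-only variant of sqlExecQuery for SQL that produces no result set.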
    private void sqlExec(Connection connection, String strSQLIn, String[] params) throws SQLException {
        String strSQL = strSQLIn;
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) params[i] = "";
            strSQL = strSQL.replace("%s" + String.valueOf(i + 1), params[i]);
        }
        Statement stmt = null;
        try {
            stmt = connection.createStatement();
            stmt.execute(strSQL);
        } finally {
            JDBCUtils.close(stmt);
        }
    }
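    // Drives the road-fee calculation: reads the enabled STEP rows from roadfee_proc (ordered by
    // procname and step), runs each step's SQL on the PostgreSQL or Oracle connection named in its
    // datastore column, feeding input rows from src (nodata/list1/lista) and collecting output
    // rows into dest (list1/lista) for later steps.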
    private void doJob(Connection postsql, Connection orcl) throws SQLException {
        String strSQLGetTask = "select proc_id,procname,datastore,name,step,src,dest,txtsql from roadfee_proc where rowstatus=1 and procname like 'STEP%' order by procname,step";
        List<String[]> joblist = null;
        Connection inConnection;
        int idOfJob = 0;
        List<String[]> nodata = new ArrayList<String[]>();
        List<String[]> lista = new ArrayList<String[]>();
        List<String[]> list1 = new ArrayList<String[]>();
        List<String[]> listIn = new ArrayList<String[]>();
        List<String[]> temp;
        nodata.add(new String[]{""});
        // roadfee_proc columns: proc_id[0], procname[1], datastore[2], name[3], step[4], src[5], dest[6], txtsql[7]
        try {
            logger.info("getJoblist");
            joblist = sqlExecQuery(postsql, strSQLGetTask, new String[]{});
            for (idOfJob = 0; idOfJob < joblist.size(); idOfJob++) {
                logger.info("begin " + joblist.get(idOfJob)[1] + "-" + joblist.get(idOfJob)[3] + "(" + joblist.get(idOfJob)[0] + ")");
                // choose the input row list named by the step's src column
                if (joblist.get(idOfJob)[5].equals("nodata")) {
                    listIn = nodata;
                } else if (joblist.get(idOfJob)[5].equals("list1")) {
                    listIn = list1;
                } else if (joblist.get(idOfJob)[5].equals("lista")) {
                    listIn = lista;
                }
                // choose the connection named by the step's datastore column
                if (joblist.get(idOfJob)[2].equals("psql")) {
                    inConnection = postsql;
                } else if (joblist.get(idOfJob)[2].equals("orcl")) {
                    inConnection = orcl;
                } else {
                    return; // unknown datastore
                }
                if (joblist.get(idOfJob)[6].equals("list1")) list1.clear();
                if (joblist.get(idOfJob)[6].equals("lista")) lista.clear();
                // run the step's SQL once per input row
                logger.info("process data count: " + String.valueOf(listIn.size()));
                for (int idxOfListIn = 0; idxOfListIn < listIn.size(); idxOfListIn++) {
                    if (joblist.get(idOfJob)[6].equals("nodata")) {
                        sqlExec(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn));
                        continue;
                    } else {
                        temp = sqlExecQuery(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn));
                    }
                    // route the query result into the step's dest list
                    for (int j = 0; j < temp.size(); j++) {
                        if (joblist.get(idOfJob)[6].equals("list1")) {
                            list1.add(temp.get(j));
                        } else if (joblist.get(idOfJob)[6].equals("lista")) {
                            lista.add(temp.get(j));
                        }
                    }
                }
            }
        } catch (SQLException sqlex) {
            logger.warn("ERROR@ID:" + String.valueOf(joblist.get(idOfJob)[0]));
            throw sqlex;
        }
    }
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Every job has its own job detail
        JobDetail jobDetail = context.getJobDetail();
        // The name is defined in the job definition
        String jobName = jobDetail.getKey().getName();
        // Log the time the job started
        logger.info(jobName + " fired at " + new Date());
        extractJobConfiguration(jobDetail);
        if (isIgnoreDBETL()) {
            return;
        }
        createSourceDataStore();
        createTargetDataStore();
        if (getSourceDataStore() == null) {
            logger.warn("Cannot connect to the source Oracle database.");
            throw new JobExecutionException("Cannot connect to the source Oracle database.");
        }
        if (getTargetDataStore() == null) {
            logger.warn("Cannot connect to the target PostgreSQL database.");
            throw new JobExecutionException("Cannot connect to the target PostgreSQL database.");
        }
        if (isProfileMode()) {
            queryTime = 0;
        }
        long t1 = System.currentTimeMillis();
        String targetSchemaName, targetThemeTable;
        try {
            doJob(targetDataStore.getConnection(Transaction.AUTO_COMMIT), sourceDataStore.getConnection(Transaction.AUTO_COMMIT));
        } catch (IOException ex) {
            disconnect();
            logger.warn(ex.getMessage(), ex);
            throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
        } catch (SQLException e) {
            disconnect();
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException("Database error. " + e.getMessage(), e);
         } finally {
            disconnect();
        }
        logger.warn(jobName + " end at " + new Date());
    }
    private void logTimeDiff(String message, long tBefore, long tCurrent) {
        logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
                (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
    }
    private void exetcuteConvert(OracleConvertPostGISJobContext jobContext,
                                 String querySchema, String targetSchemaName) throws SQLException {
        int order = 0;
        OrderedMap map = getBlobStorageList(jobContext.getOracleConnection(),
                querySchema, "SD$SPACENODES", null);
        logger.info("begin convert job:[" + map.size() + "]:testmode=" + _testMode);
        int total = map.size(); //spacenodes count
        int step = total / 100;
        int current = 0;
        if (total == 0) {
            logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is zero.");
            return;
        }
        logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is " + map.size());
        //jobContext.startTransaction();
        jobContext.setCurrentSchema(querySchema);
        jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 0);
        for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext(); ) {
            it.next();
            Pair pair = (Pair) it.getValue();
            String tableSrc = (String) pair.first;
            logger.info("begin convert:[" + order + "]-" + tableSrc);
            queryIgsetElement(jobContext, querySchema, tableSrc);
            order++;
            if (_testMode) {
                if ((_testCount < 0) || (order >= _testCount))
                    break;
            }
            if ((order % COMMITSIZE) == 0) {
                // OracleConnection connection = jobContext.getOracleConnection();
                // connection.commitTransaction();
                jobContext.commitTransaction();
                //jobContext.startTransaction();
                System.gc();
                System.runFinalization();
            }
            if (step != 0) {
                int now = order % step;
                if (now != current) {
                    current = now;
                    jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current);
                }
            } else {
                jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current);
                current++;
            }
        }
        jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 100);
        jobContext.commitTransaction();
        jobContext.resetFeatureContext();
        if (isProfileMode()) {
        }
        logger.info("end convert job:[" + order + "]");
        System.gc();
        System.runFinalization();
    }
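    // Builds an ordered map of SNID -> Pair from the given schema's spacenode table,
    // storing the SPACETABLE name in the first slot of each Pair.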
    protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc,
                                            OrderedMap orderedMap) throws SQLException {
        if (orderedMap == null)
            orderedMap = new LinkedMap(99);
        String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\"";
        PrintfFormat spf = new PrintfFormat(fetchStmtFmt);
        String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc});
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = null;
        stmt.setFetchSize(FETCHSIZE);
        try {
            rs = stmt.executeQuery(fetchStmt);
            int size = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                Object[] values = new Object[size];
                for (int i = 0; i < size; i++) {
                    values[i] = rs.getObject(i + 1);
                }
                Integer key = ((BigDecimal) values[0]).intValue();
                String name = (String) values[1];
                Pair pair = (Pair) orderedMap.get(key);
                if (pair == null)
                    orderedMap.put(key, new Pair(name, null));
                else
                    pair.first = name;
            }
        } catch (SQLException e) {
            logger.error(e.toString(), e);
            logger.error("stmt=" + fetchStmt);
            throw e;
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
        return orderedMap;
    }
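    // Same as getBlobStorageList but keyed by RNID, storing the SPACETABLE name
    // in the second slot of each Pair.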
    protected OrderedMap getRawFormatStorageList(OracleConnection connection, String schemaSrc, String tableSrc,
                                                 OrderedMap orderedMap) throws SQLException {
        if (orderedMap == null)
            orderedMap = new LinkedMap(99);
        String fetchStmtFmt = "SELECT RNID, SPACETABLE FROM \"%s\".\"%s\"";
        PrintfFormat spf = new PrintfFormat(fetchStmtFmt);
        String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc});
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(FETCHSIZE);
        ResultSet rs = stmt.executeQuery(fetchStmt);
        try {
            int size = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                Object[] values = new Object[size];
                for (int i = 0; i < size; i++) {
                    values[i] = rs.getObject(i + 1);
                }
                Integer key = ((BigDecimal) values[0]).intValue();
                String name = (String) values[1];
                Pair pair = (Pair) orderedMap.get(key);
                if (pair == null)
                    orderedMap.put(key, new Pair(null, name));
                else
                    pair.second = name;
            }
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
        return orderedMap;
    }
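    // Streams IGDSELM rows from the source table in ROWID order, decodes each BLOB
    // (or raw byte column) into a DGN element and hands it to the job context.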
    protected void queryIgsetElement(OracleConvertPostGISJobContext jobContext,
                                     String srcschema, String srctable) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID";
        //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
        PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt);
        String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable});
        Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmtSrc.setFetchSize(FETCHSIZE);
        ResultSet rsSrc = stmtSrc.executeQuery(fetchSrcStmt);
        int igdsMetaType = rsSrc.getMetaData().getColumnType(1);
        while (rsSrc.next()) {
            if (isProfileMode()) {
                markQueryTime();
            }
            byte[] raw = null;
            if (igdsMetaType == Types.BLOB) {
                BLOB blob = (BLOB) rsSrc.getBlob(1);
                try {
                    raw = getBytesFromBLOB(blob);
                } catch (BufferOverflowException e) {
                    logger.warn("Wrong Element Structure-", e);
                } finally {
                    // blob.close();
                }
            } else {
                raw = rsSrc.getBytes(1);
            }
            try {
                if (raw != null) {
                    Element element = fetchBinaryElement(raw);
                    if (isProfileMode()) {
                        accumulateQueryTime();
                    }
                    jobContext.putFeatureCollection(element);
                } else {
                    if (isProfileMode()) {
                        accumulateQueryTime();
                    }
                }
            } catch (Dgn7fileException e) {
                logger.warn("Dgn7Exception", e);
            }
        }
        JDBCUtils.close(rsSrc);
        JDBCUtils.close(stmtSrc);
    }
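    // Streams compressed ELEMENT arrays from the source table, unmarshals and
    // decompresses them, then decodes the raw bytes into DGN elements.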
    protected void queryRawElement(OracleConvertPostGISJobContext jobContext,
                                   String srcschema, String srctable) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        String fetchDestStmtFmt = "SELECT ELEMENT FROM \"%s\".\"%s\" ORDER BY ROWID";
        PrintfFormat spf = new PrintfFormat(fetchDestStmtFmt);
        String fetchDestStmt = spf.sprintf(new Object[]{srcschema, srctable});
        Statement stmtDest = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmtDest.setFetchSize(FETCHSIZE);
        ResultSet rsDest = stmtDest.executeQuery(fetchDestStmt);
        try {
            while (rsDest.next()) {
                ARRAY rawsValue = ((OracleResultSet) rsDest).getARRAY(1);
                long[] rawData = rawsValue.getLongArray();
                byte[] compressedValue;
                /*
                if (dataMode == TransferTask.DataMode.Normal)
                {
                    compressedValue = BinConverter.unmarshalByteArray(rawData, true);
                } else
                {
                    compressedValue = BinConverter.unmarshalCompactByteArray(rawData);
                }
                */
                compressedValue = BinConverter.unmarshalByteArray(rawData, true);
                byte[] rawDest = ByteArrayCompressor.decompressByteArray(compressedValue);
                try {
                    Element element = fetchBinaryElement(rawDest);
                    jobContext.putFeatureCollection(element);
                } catch (Dgn7fileException e) {
                    logger.warn("Dgn7Exception:" + e.getMessage(), e);
                }
            }
        } finally {
            JDBCUtils.close(rsDest);
            JDBCUtils.close(stmtDest);
        }
    }
    // Binary to Element
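    // Record layout: a 2-byte signature whose high byte carries the element type (7 bits),
    // followed by a 2-byte length in words; for complex elements the component records that
    // follow are appended to the parent until the buffer is exhausted.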
    private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException {
        ByteBuffer buffer = ByteBuffer.wrap(raws);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        short signature = buffer.getShort();
        // byte type = (byte) (buffer.get() & 0x7f);
        byte type = (byte) ((signature >>> 8) & 0x007f);
        // Bentley stores contentLength in 2-byte words, while ByteBuffer works in bytes,
        // so the record length is (words * 2) plus the 4-byte header.
        // track the record location
        int elementLength = (buffer.getShort() * 2) + 4;
        ElementType recordType = ElementType.forID(type);
        IElementHandler handler;
        handler = recordType.getElementHandler();
        Element dgnElement = (Element) handler.read(buffer, signature, elementLength);
        if (recordType.isComplexElement() && (elementLength < raws.length)) {
            int offset = elementLength;
            while (offset < (raws.length - 4)) {
                buffer.position(offset);
                signature = buffer.getShort();
                type = (byte) ((signature >>> 8) & 0x007f);
                elementLength = (buffer.getShort() * 2) + 4;
                if (raws.length < (offset + elementLength)) {
                    logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit());
                    break;
                }
                recordType = ElementType.forID(type);
                handler = recordType.getElementHandler();
                if (handler != null) {
                    Element subElement = (Element) handler.read(buffer, signature, elementLength);
                    ((ComplexElement) dgnElement).add(subElement);
                    offset += elementLength;
                } else {
                    byte[] remain = new byte[buffer.remaining()];
                    System.arraycopy(raws, offset, remain, 0, buffer.remaining());
                    for (int i = 0; i < remain.length; i++) {
                        if (remain[i] != 0) {
                            logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]);
                        }
                    }
                    break;
                }
            }
        }
        return dgnElement;
    }
    /**
     * Performs the conversion of the index design files.
     *
     * @param context the job execution context
     * @throws org.quartz.JobExecutionException exception
     */
    private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException {
        File indexDir = new File(getDataPath(), INDEXPATHNAME);
        if (!indexDir.exists()) {
            logger.info("index dir=" + indexDir + " not exist.");
            return;
        }
        if (!indexDir.isDirectory()) {
            logger.info("index dir=" + indexDir + " is not a directory.");
        }
        List<File> dgnFiles = FileUtils.recurseDir(indexDir, new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn");
            }
        });
        for (File dgnFile : dgnFiles) {
            if (dgnFile.isDirectory()) continue;
            IndexDgnConvertPostGISJobContext convertContext =
                    new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName,
                            isProfileMode(), isTransformed());
            logger.info("--- start index dgnfile-" + dgnFile.toString() + " ---");
            FileInputStream fs = null;
            FileChannel fc = null;
            Dgn7fileReader reader = null;
            try {
                convertContext.clearOutputDatabase();
                convertContext.setExecutionContext(context);
                String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator);
                convertContext.setFilename(dgnPaths[dgnPaths.length - 1]);
                convertContext.startTransaction();
                fs = new FileInputStream(dgnFile);
                fc = fs.getChannel();
                reader = new Dgn7fileReader(fc, new Lock());
                convertContext.setReader(reader);
                scanIndexDgnElement(convertContext);
                convertContext.commitTransaction();
                convertContext.closeFeatureWriter();
                System.gc();
                System.runFinalization();
            } catch (FileNotFoundException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (Dgn7fileException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IOException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IllegalAttributeException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (SchemaException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                convertContext.closeFeatureWriter();
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                if (isProfileMode()) {
                    logger.warn("Profile-Current convertContext Process Cost-" +
                            ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec");
                    logger.warn("Profile-Current convertContext Update Cost-" +
                            ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec");
                }
            }
        }
    }
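    // Walks the DGN file record by record, buffering component elements under the current
    // complex element and flushing each completed element to processIndexElement.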
    protected void scanIndexDgnElement(IndexDgnConvertPostGISJobContext convertContext)
            throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException {
        Dgn7fileReader reader = convertContext.getReader();
        int count = 0;
        Element lastComplex = null;
        while (reader.hasNext()) {
            if (isProfileMode()) markProcessTime();
            Element.FileRecord record = reader.nextElement();
            if (record.element() != null) {
                Element element = (Element) record.element();
                ElementType type = element.getElementType();
                if ((!type.isComplexElement()) && (!element.isComponentElement())) {
                    if (lastComplex != null) {
                        processIndexElement(lastComplex, convertContext);
                        lastComplex = null;
                    }
                    processIndexElement(element, convertContext);
                } else if (element.isComponentElement()) {
                    if (lastComplex != null) {
                        ((ComplexElement) lastComplex).add(element);
                    }
                } else if (type.isComplexElement()) {
                    if (lastComplex != null) {
                        processIndexElement(lastComplex, convertContext);
                    }
                    lastComplex = element;
                }
            }
            count++;
        }
        if (lastComplex != null) {
            processIndexElement(lastComplex, convertContext);
        }
        logger.debug("ElementRecord Count=" + count);
    }
    private void processIndexElement(Element element, IndexDgnConvertPostGISJobContext convertContext)
            throws IllegalAttributeException, SchemaException {
        //if (useTpclidText) {
        //    if (element instanceof TextElement) {
        //        convertContext.putFeatureCollection(element);
        //    }
        //} else {
        //    if (element instanceof ShapeElement) {
        convertContext.putFeatureCollection(element);
        //    }
        //}
    }
    /**
     * Performs the conversion of the other design files.
     *
     * @param context jobContext
     * @throws org.quartz.JobExecutionException exception
     */
    private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException {
        File otherDir = new File(getDataPath(), OTHERPATHNAME);
        if (!otherDir.exists()) {
            logger.info("other dir=" + otherDir + " not exist.");
            return;
        }
        if (!otherDir.isDirectory()) {
            logger.info("other dir=" + otherDir + " is not a directory.");
        }
        List<File> dgnFiles = FileUtils.recurseDir(otherDir, new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn");
            }
        });
        for (File dgnFile : dgnFiles) {
            if (dgnFile.isDirectory()) continue;
            GeneralDgnConvertPostGISJobContext convertContext =
                    new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName,
                            isProfileMode(), isTransformed());
            logger.info("--- start other dgnfile-" + dgnFile.toString() + " ---");
            FileInputStream fs = null;
            FileChannel fc;
            Dgn7fileReader reader = null;
            try {
                convertContext.setExecutionContext(context);
                String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator);
                convertContext.setFilename(dgnPaths[dgnPaths.length - 1]);
                convertContext.startTransaction();
                fs = new FileInputStream(dgnFile);
                fc = fs.getChannel();
                reader = new Dgn7fileReader(fc, new Lock());
                convertContext.setReader(reader);
                scanOtherDgnElement(convertContext);
                convertContext.commitTransaction();
                convertContext.closeFeatureWriter();
                System.gc();
                System.runFinalization();
            } catch (FileNotFoundException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (Dgn7fileException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IOException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IllegalAttributeException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (SchemaException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                convertContext.closeFeatureWriter();
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                if (isProfileMode()) {
                    logger.warn("Profile-Current convertContext Process Cost-" +
                            ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec");
                    logger.warn("Profile-Current convertContext Update Cost-" +
                            ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec");
                }
            }
        }
    }
    public void scanOtherDgnElement(GeneralDgnConvertPostGISJobContext convertContext)
            throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException {
        Dgn7fileReader reader = convertContext.getReader();
        int count = 0;
        Element lastComplex = null;
        while (reader.hasNext()) {
            Element.FileRecord record = reader.nextElement();
            if (record.element() != null) {
                Element element = (Element) record.element();
                ElementType type = element.getElementType();
                if ((!type.isComplexElement()) && (!element.isComponentElement())) {
                    if (lastComplex != null) {
                        processOtherElement(lastComplex, convertContext);
                        lastComplex = null;
                    }
                    processOtherElement(element, convertContext);
                } else if (element.isComponentElement()) {
                    if (lastComplex != null) {
                        ((ComplexElement) lastComplex).add(element);
                    }
                } else if (type.isComplexElement()) {
                    if (lastComplex != null) {
                        processOtherElement(lastComplex, convertContext);
                    }
                    lastComplex = element;
                }
            }
            count++;
        }
        if (lastComplex != null) {
            processOtherElement(lastComplex, convertContext);
        }
        logger.debug("ElementRecord Count=" + count);
    }
    private void processOtherElement(Element element, GeneralDgnConvertPostGISJobContext convertContext)
            throws IllegalAttributeException, SchemaException {
        convertContext.putFeatureCollection(element);
    }
    private void clearOutputDatabase() {
        /*
        File outDataPath = new File(getDataPath(), OracleConvertEdbGeoJobContext.SHPOUTPATH);
        if (outDataPath.exists() && outDataPath.isDirectory())
        {
            deleteFilesInPath(outDataPath);
        }
        outDataPath = new File(getDataPath(), IndexDgnConvertShpJobContext.SHPOUTPATH);
        if (outDataPath.exists() && outDataPath.isDirectory())
        {
            deleteFilesInPath(outDataPath);
        }
        outDataPath = new File(getDataPath(), GeneralDgnConvertShpJobContext.SHPOUTPATH);
        if (outDataPath.exists() && outDataPath.isDirectory())
        {
            deleteFilesInPath(outDataPath);
        }
        */
    }
    private void deleteFilesInPath(File outDataPath) {
        deleteFilesInPath(outDataPath, true);
    }
    private void deleteFilesInPath(File outDataPath, boolean removeSubDir) {
        if (!outDataPath.isDirectory()) {
            return;
        }
        File[] files = outDataPath.listFiles();
        for (File file : files) {
            if (file.isFile()) {
                if (!file.delete()) {
                    logger.info("Cannot delete file-" + file.toString());
                }
            } else if (file.isDirectory()) {
                deleteFilesInPath(file, removeSubDir);
                if (removeSubDir) {
                    if (!file.delete()) {
                        logger.info("Cannot delete dir-" + file.toString());
                    }
                }
            }
        }
    }
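    // Converts every *.dgn file found directly under the "elmin" directory of the data path.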
    private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException {
        File elminDir = new File(getDataPath(), "elmin");
        if (!elminDir.exists()) {
            logger.info("elmin dir=" + elminDir + " not exist.");
            return;
        }
        if (!elminDir.isDirectory()) {
            logger.info("elmin dir=" + elminDir + " is not a directory.");
        }
        File[] dgnFiles = elminDir.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String name) {
                return name.toLowerCase().endsWith(".dgn");
            }
        });
        for (File dgnFile : dgnFiles) {
            FeatureDgnConvertPostGISJobContext convertContext =
                    new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath,
                            isProfileMode(), isTransformed());
            logger.info("--- start dgnfile-" + dgnFile.toString() + " ---");
            try {
                convertContext.setExecutionContext(context);
                String dgnPaths[] = StringUtils.splitToArray(dgnFile.toString(), File.separator);
                convertContext.setFilename(dgnPaths[dgnPaths.length - 1]);
                convertContext.startTransaction();
                FileInputStream fs = new FileInputStream(dgnFile);
                FileChannel fc = fs.getChannel();
                Dgn7fileReader reader = new Dgn7fileReader(fc, new Lock());
                convertContext.setReader(reader);
                scanFeatureDgnElement(convertContext);
                convertContext.commitTransaction();
                convertContext.closeFeatureWriter();
                System.gc();
                System.runFinalization();
            } catch (FileNotFoundException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (Dgn7fileException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IOException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IllegalAttributeException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (SchemaException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                convertContext.closeFeatureWriter();
            }
        }
    }
    public void scanFeatureDgnElement(FeatureDgnConvertPostGISJobContext convertContext)
            throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException {
        Dgn7fileReader reader = convertContext.getReader();
        int count = 0;
        Element lastComplex = null;
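        // DGN complex elements arrive as a header record followed by component
        // records: buffer the header in lastComplex, attach components to it, and
        // flush it when the next non-component element (or end of file) is reached.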
        while (reader.hasNext()) {
            Element.FileRecord record = reader.nextElement();
            if (record.element() != null) {
                Element element = (Element) record.element();
                ElementType type = element.getElementType();
                if ((!type.isComplexElement()) && (!element.isComponentElement())) {
                    if (lastComplex != null) {
                        processFeatureElement(lastComplex, convertContext);
                        lastComplex = null;
                    }
                    processFeatureElement(element, convertContext);
                } else if (element.isComponentElement()) {
                    if (lastComplex != null) {
                        ((ComplexElement) lastComplex).add(element);
                    }
                } else if (type.isComplexElement()) {
                    if (lastComplex != null) {
                        processFeatureElement(lastComplex, convertContext);
                    }
                    lastComplex = element;
                }
            }
            count++;
        }
        if (lastComplex != null) {
            processFeatureElement(lastComplex, convertContext);
        }
        logger.debug("ElementRecord Count=" + count);
    }
    private void processFeatureElement(Element element, FeatureDgnConvertPostGISJobContext convertContext)
            throws IllegalAttributeException, SchemaException {
        convertContext.putFeatureCollection(element);
    }
    private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException {
        /*
        DummyFeatureConvertShpJobContext convertContext = new DummyFeatureConvertShpJobContext(getDataPath(), _filterPath);
        try {
            convertContext.startTransaction();
            convertContext.commitTransaction();
            convertContext.closeFeatureWriter();
        } catch (IOException e)
        {
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException(e.getMessage(), e);
        }
        */
    }
    public DataStore getTargetDataStore() {
        return targetDataStore;
    }
    protected void createTargetDataStore() throws JobExecutionException {
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
        /*
        if (!isDriverFound())
        {
            throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER);
        }
        */
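        // Fall back to a small connection pool (max 5 / min 1) when not configured.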
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
        }
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
        }
        /*
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
        }
        */
        if (!dataStoreFactory.canProcess(pgProperties)) {
            getLogger().warn("cannot process properties-");
            throw new JobExecutionException("cannot process properties-");
        }
        try {
            targetDataStore = dataStoreFactory.createDataStore(pgProperties);
        } catch (IOException e) {
            getLogger().warn(e.getMessage(), e);
            throw new JobExecutionException(e.getMessage(), e);
        }
    }
    protected void disconnect() {
        super.disconnect();
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
    }
    private String determineTargetSchemaName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetSchema = null;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XGVERSIONTABLE_NAME
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) {
                createXGeosVersionTable(connection, _pgSchema);
            }
            rs.close();
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vsschema, vsstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vsid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vsschema");
                values[1] = rs.getShort("vsstatus");
                tmpSchemas.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }
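            // Pick the schema after the one currently flagged USING, wrapping
            // around to the first entry; if none is in use, start at the first.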
            if (current == -1) {
                Object[] values = tmpSchemas.get(0);
                targetSchema = (String) values[0];
            } else if (current < (tmpSchemas.size() - 1)) {
                Object[] values = tmpSchemas.get(current + 1);
                targetSchema = (String) values[0];
            } else {
                Object[] values = tmpSchemas.get(0);
                targetSchema = (String) values[0];
            }
            sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vsstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
            sbSQL.append(" WHERE vsschema = '");
            sbSQL.append(targetSchema).append("'");
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count="
                        + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetSchema;
    }
    private String determineTargetThemeTableName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetTable = null;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XPTVERSIONTABLE_NAME
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) {
                createXPWThemeVersionTable(connection, _pgSchema);
            }
            rs.close();
            rs = null;
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vptname, vptstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vptid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vptname");
                values[1] = rs.getShort("vptstatus");
                tmpTablenames.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }
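            // Same round-robin selection as determineTargetSchemaName, applied to theme tables.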
            if (current == -1) {
                Object[] values = tmpTablenames.get(0);
                targetTable = (String) values[0];
            } else if (current < (tmpTablenames.size() - 1)) {
                Object[] values = tmpTablenames.get(current + 1);
                targetTable = (String) values[0];
            } else {
                Object[] values = tmpTablenames.get(0);
                targetTable = (String) values[0];
            }
            sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vptstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
            sbSQL.append(" WHERE vptname = '");
            sbSQL.append(targetTable).append("'");
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetTable + " update result count="
                        + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetTable;
    }
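    // Double-quote identifiers so mixed-case schema/table names survive PostgreSQL's case folding.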
    public String encodeSchemaTableName(String schemaName, String tableName) {
        if (schemaName == null)
            return "\"" + tableName + "\"";
        return "\"" + schemaName + "\".\"" + tableName + "\"";
    }
    private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException {
        Statement stmt = null;
        StringBuilder sql = new StringBuilder("CREATE TABLE ");
        sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
        sql.append(" ( vsid serial PRIMARY KEY, ");
        sql.append(" vsschema character varying(64) NOT NULL, ");
        sql.append(" vsstatus smallint NOT NULL, ");
        sql.append(" vstimestamp timestamp with time zone ) ");
        try {
            stmt = connection.createStatement();
            stmt.executeUpdate(sql.toString());
            sql = new StringBuilder("ALTER TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
            sql.append(" OWNER TO ").append(_pgUsername);
            stmt.executeUpdate(sql.toString());
            sql = new StringBuilder("GRANT ALL ON TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
            sql.append(" TO public");
            stmt.executeUpdate(sql.toString());
            for (String schemaName : DataReposVersionManager.DEFAULTXGVERSIONSCHEMA_NAMES) {
                sql = new StringBuilder("INSERT INTO ");
                sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
                sql.append(" (vsschema, vsstatus) VALUES ('");
                sql.append(schemaName).append("', ");
                sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )");
                stmt.executeUpdate(sql.toString());
                createIfNotExistNewSchema(connection, schemaName);
            }
        } finally {
            if (stmt != null) stmt.close();
        }
    }
    private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException {
        Statement stmt = null;
        StringBuilder sql = new StringBuilder("CREATE TABLE ");
        sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
        sql.append(" ( vptid serial PRIMARY KEY, ");
        sql.append(" vptname character varying(64) NOT NULL, ");
        sql.append(" vptstatus smallint NOT NULL, ");
        sql.append(" vpttimestamp timestamp with time zone ) ");
        try {
            stmt = connection.createStatement();
            stmt.executeUpdate(sql.toString());
            sql = new StringBuilder("ALTER TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
            sql.append(" OWNER TO ").append(_pgUsername);
            stmt.executeUpdate(sql.toString());
            sql = new StringBuilder("GRANT ALL ON TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
            sql.append(" TO public");
            stmt.executeUpdate(sql.toString());
            for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) {
                sql = new StringBuilder("INSERT INTO ");
                sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
                sql.append(" (vptname, vptstatus) VALUES ('");
                sql.append(schemaName).append("', ");
                sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )");
                stmt.executeUpdate(sql.toString());
            }
        } finally {
            if (stmt != null) stmt.close();
        }
    }
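    // Mark the freshly converted schema READY and stamp the switchover time.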
    private void updateRepoStatusToReady(String targetSchema) {
        if (targetDataStore == null) return;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        try {
            StringBuilder sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vsstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_READY);
            sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '");
            sbSQL.append(targetSchema).append("'");
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            stmt = connection.createStatement();
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count="
                        + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
    }
    private void updatePWThemeStatusToReady(String targetSchema) {
        if (targetDataStore == null) return;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        try {
            StringBuilder sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vptstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_READY);
            sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '");
            sbSQL.append(targetSchema).append("'");
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            stmt = connection.createStatement();
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count="
                        + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
    }
    private void createIfNotExistNewSchema(Connection connection, String s) throws SQLException {
        Statement stmt = null;
        ResultSet rs = null;
        try {
            /*
            rs = connection.getMetaData().getSchemas(null, s);
            if (rs.next()) return;
            rs.close();
            rs = null;
            */
            StringBuilder sbSQL = new StringBuilder("CREATE SCHEMA ");
            sbSQL.append(s).append(' ');
            sbSQL.append("AUTHORIZATION ").append(_pgUsername);
            stmt = connection.createStatement();
            stmt.executeUpdate(sbSQL.toString());
            sbSQL = new StringBuilder("GRANT ALL ON SCHEMA ");
            sbSQL.append(s).append(' ');
            sbSQL.append("TO public");
            stmt.executeUpdate(sbSQL.toString());
        } catch (SQLException e) {
            logger.info("create schema:" + s + " has exception.");
            logger.info(e.getMessage(), e);
        } finally {
            if (rs != null) rs.close();
            if (stmt != null) stmt.close();
        }
    }
    public final void accumulateQueryTime() {
        queryTime += System.currentTimeMillis() - queryTimeStart;
    }
    public long getQueryTime() {
        return queryTime;
    }
    public final void markQueryTime() {
        queryTimeStart = System.currentTimeMillis();
    }
    public final void resetQueryTime() {
        queryTime = 0;
    }
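    // Copy Oracle dynamic-color rows (tid, oid, color code) into the
    // targetTableBaseName + FDYNCOLOR_SUFFIX table using batched JDBC inserts.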
    private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertDynamicColorTheme");
            return;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        ResultSet rs = null;
        Statement stmt = null;
        PreparedStatement pstmt = null;
        try {
            DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
            String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_COLORTAB);
            rs.setFetchSize(50);
            createOrClearTargetTable(connectionPG, targetTableName,
                    "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)");
            pstmt = connectionPG.prepareStatement("INSERT INTO " +
                    encodeSchemaTableName(_pgSchema, targetTableName) +
                    " (tid, oid, dyncolor) VALUES (?, ?, ?)" );
            final int MAX_BATCHSIZE = 50;
            int count = 0;
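            // Stream rows from Oracle, flushing the insert batch every MAX_BATCHSIZE rows.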
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int colorId = rs.getInt(3);
                String colorText = colorTable.getColorCode(colorId);
                pstmt.setShort(1, (short) cid);
                pstmt.setInt(2, (int) oid);
                pstmt.setString(3, colorText);
                pstmt.addBatch();
                ++count;
                if (count % MAX_BATCHSIZE == 0) {
                    pstmt.executeBatch();
                }
            }
            pstmt.executeBatch();
            createTargetTableIndex(connectionPG, targetTableName);
            logger.info("Execute Update Count=" + count);
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(pstmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
    }
    private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertPowerOwnerTheme");
            return;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        ResultSet rs = null;
        Statement stmt = null;
        PreparedStatement pstmt = null;
        try {
            // Autocommit stays on so each executeBatch() is committed immediately.
            String targetTableName = targetTableBaseName + FOWNER_SUFFIX;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_CONNFDR);
            rs.setFetchSize(50);
            createOrClearTargetTable(connectionPG, targetTableName,
                    "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)");
            pstmt = connectionPG.prepareStatement("INSERT INTO " +
                    encodeSchemaTableName(_pgSchema, targetTableName) +
                    " (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" );
            final int MAX_BATCHSIZE = 50;
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int ownerId = rs.getInt(3);
                short dirId = (short) rs.getInt(4);
                pstmt.setShort(1, (short) cid);
                pstmt.setInt(2, (int) oid);
                pstmt.setShort(3, (short) ownerId);
                ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
                if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
                        (ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
                    pstmt.setString(4, "shape://ccarrow");
                } else if ((ConnectivityDirectionEnum.BackflowON == dir) ||
                        (ConnectivityDirectionEnum.BackFixflowON == dir)) {
                    pstmt.setString(4, "shape://rccarrow");
                } else {
                    pstmt.setString(4, "shape://backslash");
                }
                pstmt.addBatch();
                ++count;
                if (count % MAX_BATCHSIZE == 0) {
                    pstmt.executeBatch();
                }
            }
            pstmt.executeBatch();
            createTargetTableIndex(connectionPG, targetTableName);
            logger.info("Execute Update Count=" + count);
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(pstmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
    }
    private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE");
            }
            stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
    private void createTargetTableIndex(Connection connection, String tableName) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
                        " ADD PRIMARY KEY (tid, oid)");
            }
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
    private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName)
            throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI");
            return false;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
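        // Unwrap pooled-connection wrappers (e.g. Apache DBCP's DelegatingConnection)
        // to reach the native PGConnection that exposes the COPY API.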
        while (connectionPG instanceof DelegatingConnection) {
            connectionPG = ((DelegatingConnection) connectionPG).getDelegate();
        }
        if (!(connectionPG instanceof PGConnection)) {
            return false;
        }
        final int MAX_BATCHSIZE = 250;
        ResultSet rs = null;
        Statement stmt = null;
        try {
            // connectionPG.setAutoCommit(false);
            DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
            String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX;
            String targetTempName = "tmp_" + targetTableName;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_COLORTAB);
            rs.setFetchSize(MAX_BATCHSIZE);
            createOrClearTempTargetTable(connectionPG, targetTempName,
                    "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)");
            StringBuilder sb = new StringBuilder();
            CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI();
            PushbackReader reader = new PushbackReader(new StringReader(""), 10240);
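            // The pushback buffer (10240 chars) must be able to hold an entire
            // CSV chunk of MAX_BATCHSIZE rows before each copyIn call.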
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int colorId = rs.getInt(3);
                String colorText = colorTable.getColorCode(colorId);
                sb.append(cid).append(',');
                sb.append(oid).append(',');
                sb.append(colorText).append("\n");
                ++count;
                if (count % MAX_BATCHSIZE == 0) {
                    reader.unread(sb.toString().toCharArray());
                    cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
                    sb.delete(0, sb.length());
                }
            }
            reader.unread(sb.toString().toCharArray());
            cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
            logger.info("Execute Copy Count=" + count);
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
        return true;
    }
    private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName)
            throws IOException {
        if (context == null) {
            getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI");
            return false;
        }
        Connection connection = context.getOracleConnection();
        Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
        while (connectionPG instanceof DelegatingConnection) {
            connectionPG = ((DelegatingConnection) connectionPG).getDelegate();
        }
        if (!(connectionPG instanceof PGConnection)) {
            return false;
        }
        final int MAX_BATCHSIZE = 250;
        ResultSet rs = null;
        Statement stmt = null;
        try {
            // connectionPG.setAutoCommit(false);
            String targetTableName = targetTableBaseName + FOWNER_SUFFIX;
            String targetTempName = "tmp_" + targetTableName;
            logger.info("target table:" + targetTableName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_CONNFDR);
            rs.setFetchSize(MAX_BATCHSIZE);
            createOrClearTempTargetTable(connectionPG, targetTempName,
                    "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)");
            StringBuilder sb = new StringBuilder();
            CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI();
            PushbackReader reader = new PushbackReader(new StringReader(""), 10240);
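            // Same COPY chunking as the color theme; the 10240-char pushback
            // buffer bounds the size of each MAX_BATCHSIZE-row CSV chunk.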
            int count = 0;
            while (rs.next()) {
                int cid = rs.getInt(1);
                long oid = rs.getLong(2);
                int ownerId = rs.getInt(3);
                short dirId = (short) rs.getInt(4);
                String flowMark = null;
                ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
                if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
                        (ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
                    flowMark = FORWARDFLOW_MARK;
                } else if ((ConnectivityDirectionEnum.BackflowON == dir) ||
                        (ConnectivityDirectionEnum.BackFixflowON == dir)) {
                    flowMark = BACKFLOW_MARK;
                } else if (ConnectivityDirectionEnum.Nondeterminate == dir) {
                    flowMark = NONFLOW_MARK;
                } else {
                    flowMark = UNFLOW_MARK;
                }
                sb.append(cid).append(',');
                sb.append(oid).append(',');
                sb.append(ownerId).append(',');
                sb.append(flowMark).append('\n');
                ++count;
                if (count % MAX_BATCHSIZE == 0) {
                    reader.unread(sb.toString().toCharArray());
                    cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
                    sb.delete(0, sb.length());
                }
            }
            reader.unread(sb.toString().toCharArray());
            cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
            createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
            logger.info("Execute Copy Count=" + count);
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            throw new IOException(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
        }
        return true;
    }
    private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE");
            }
            stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
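    // Promote the temp table to a permanent one, add the (tid, oid) primary key,
    // then drop the temp copy.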
    private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException {
        Statement stmt = connection.createStatement();
        ResultSet rs = null;
        try {
            stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
            rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
            if (rs.next()) {
                stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
                        " ADD PRIMARY KEY (tid, oid)");
            }
            stmt.execute("DROP TABLE " + tempTable);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
}