From bf0b1352cd0f0701f3c870a3e25cf11c1e471b53 Mon Sep 17 00:00:00 2001
From: yuanhung <yuanhung@ximple.com.tw>
Date: Mon, 22 Aug 2016 15:07:43 +0800
Subject: [PATCH] For NDD RT use

---
 xdgnjobs/ximple-jobcarrier/quartz_jobs_nddrtjob.xml                                    | 194 +++++++++
 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java   | 152 -------
 xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddrtUpdateJob.java | 860 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1059 insertions(+), 147 deletions(-)

diff --git a/xdgnjobs/ximple-jobcarrier/quartz_jobs_nddrtjob.xml b/xdgnjobs/ximple-jobcarrier/quartz_jobs_nddrtjob.xml
new file mode 100644
index 0000000..a1e892f
--- /dev/null
+++ b/xdgnjobs/ximple-jobcarrier/quartz_jobs_nddrtjob.xml
@@ -0,0 +1,194 @@
+<?xml version='1.0' encoding='utf-8'?>
+
+<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData"
+                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                     xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_1_8.xsd"
+                     version="1.8">
+
+    <pre-processing-commands>
+        <delete-jobs-in-group>*</delete-jobs-in-group>
+        <!-- clear all jobs in scheduler -->
+        <delete-triggers-in-group>*</delete-triggers-in-group>
+        <!-- clear all triggers in scheduler -->
+    </pre-processing-commands>

+    <processing-directives>
+        <!-- if there are any jobs/triggers in the scheduler with the same names as in this file, overwrite them -->
+        <overwrite-existing-data>true</overwrite-existing-data>
+        <!-- if there are any jobs/triggers in the scheduler with the same names as in this file, and overwrite is false, ignore them rather than generating an error -->
+        <ignore-duplicates>false</ignore-duplicates>
+    </processing-directives>
+
+    <schedule>
+        <job>
+            <name>ConvertDMMS2PostGisWithGeoserver</name>
+            <group>DEFAULT</group>
+            <description>A job that converts dgn to postgis</description>
+            <!--job-class>com.ximple.eofms.jobs.OracleConvertDgn2PostGISJob</job-class-->
+            <!--<job-class>com.ximple.eofms.jobs.GeoserverIntegrateConfigJob</job-class>-->
+            <job-class>com.ximple.eofms.jobs.DMMSNddrtUpdateJob</job-class>
+            <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwnerJob</job-class-->
+            <!--job-class>com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob</job-class-->
+            <!--volatility>false</volatility-->
+            <durability>false</durability>
+            <recover>false</recover>
+            <!--job-data-map allows-transient-data="true"-->
+            <job-data-map>
+                <entry>
+                    <key>JOBDATA_DIR</key>
+                    <value>C:/tmp/</value>
+                </entry>
+                <entry>
+                    <key>PGHOST</key>
+                    <value>10.10.1.3</value>
+                </entry>
+                <entry>
+                    <key>PGDATBASE</key>
+                    <value>pgDMMS</value>
+                </entry>
+                <entry>
+                    <key>PGPORT</key>
+                    <value>5432</value>
+                </entry>
+                <entry>
+                    <key>PGSCHEMA</key>
+                    <value>ndd</value>
+                </entry>
+                <entry>
+                    <key>PGUSER</key>
+                    <value>tpcdb</value>
+                </entry>
+                <entry>
+                    <key>PGPASS</key>
+                    <value>simple000</value>
+                </entry>
+
+                <entry>
+                    <key>ftpurl</key>
+                    <value>ftp://10.10.1.3:21/</value>
+                </entry>
+                <entry>
+                    <key>ftpdir</key>
+                    <value>/tcdaas/ndddash/</value>
+                </entry>
+
+                <entry>
+                    <key>ftpuid</key>
+                    <value>Administrator</value>
+                </entry>
+
+                <entry>
+                    <key>ftppwd</key>
+                    <value>simple@000</value>
+                </entry>
+
+
+                <entry>
+                    <key>ORAHOST</key>
+                    <value>10.10.1.3</value>
+                </entry>
+                <entry>
+                    <key>ORAINST</key>
+                    <value>orcl</value>
+                </entry>
+                <entry>
+                    <key>ORAPORT</key>
+                    <value>1521</value>
+                </entry>
+                <entry>
+                    <key>ORAUSER</key>
+                    <value>system</value>
+                </entry>
+                <entry>
+                    <key>ORAPASS</key>
+                    <value>manager</value>
+                </entry>
+                <entry>
+                    <key>ORGSCHEMA</key>
+                    <!--value>SPATIALDB</value-->
+                    <value>SPATIALDB, CMMS_SPATIALDB</value>
+                </entry>
+                <entry>
+                    <key>CONVERTDB</key>
+                    <value>false</value>
+                </entry>
+                <entry>
+                    <key>CONVERTFILE</key>
+                    <value>false</value>
+                </entry>
+                <entry>
+                    <key>CONVERTELEMIN</key>
+                    <value>false</value>
+                </entry>
+                <entry>
+                    <key>CONVERTPWTHEMES</key>
+                    <value>true</value>
+                </entry>
+                <entry>
+                    <key>CREATEDUMMY</key>
+                    <value>false</value>
+                </entry>
+                <entry>
+                    <key>ELEMLOG</key>
+                    <value>true</value>
+                </entry>
+                <entry>
+                    <key>USEWKB</key>
+                    <value>true</value>
+                </entry>
+                <entry>
+                    <key>TESTMODE</key>
+                    <value>false</value>
+                </entry>
+                <entry>
+                    <key>TESTCOUNT</key>
+                    <value>2</value>
+                </entry>
+                <entry>
+                    <key>COPYCONNECTIVITYMODE</key>
+                    <value>true</value>
+                </entry>
+                <entry>
+                    <key>PROFILEMODE</key>
+                    <value>true</value>
+                </entry>
+                <entry>
+                    <key>USEZONE121</key>
+                    <value>true</value>
+                </entry>
+                <entry>
+                    <key>GEOSERVER_URL</key>
+                    <value>http://10.10.1.19:8080/geoserver</value>
+                </entry>
+                <entry>
+                    <key>GEOSERVER_USER</key>
+                    <value>admin</value>
+                </entry>
+                <entry>
+                    <key>GEOSERVER_PASS</key>
+                    <value>geoserver</value>
+                </entry>
+                <entry>
+                    <key>IGNORE_DBETL</key>
+                    <value>false</value>
+                </entry>
+            </job-data-map>
+        </job>
+
+        <trigger>
+            <simple>
+                <name>convertTrigger</name>
+                <group>DEFAULT</group>
+                <job-name>ConvertDMMS2PostGisWithGeoserver</job-name>
+                <job-group>DEFAULT</job-group>
+                <start-time>2013-03-01T18:00:00</start-time>
+                <!-- fire once at start-time: repeat-count is 0, so the 500 ms repeat-interval is unused -->
+                <repeat-count>0</repeat-count>
+                <repeat-interval>500</repeat-interval>
+                <!-- <repeat-interval>72000000</repeat-interval> -->
+            </simple>
+        </trigger>
+
+    </schedule>
+
+</job-scheduling-data>
diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java
index b1d979d..097e221 100644
--- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java
+++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddUpdateJob.java
@@ -318,7 +318,7 @@
             return;
         }
 
-        String strStep0SQLNDD_rt="truncate table ndd.typhoon_rt";
+        // String strStep0SQLNDD_rt="truncate table ndd.typhoon_rt";
 
         logger.info("begin nddxml to postsql");
         logger.info("getftpfile...");
@@ -396,10 +396,10 @@
         t1= t1.substring(t1.length()-2);
         String insertDBSQL=" insert into ndd.nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever"+
                 ",yy,mm,dd,tt,ts_ser) values ";
-        String strStep1SQLNDD_rt="insert into ndd.typhoon_rt(dts,county_id,district_id,neighbor_id,c0,c1)";
+        // String strStep1SQLNDD_rt="insert into ndd.typhoon_rt(dts,county_id,district_id,neighbor_id,c0,c1)";
 
-        sqlExec(postsql,strStep0SQLNDD_rt,new String[]{});
+//        sqlExec(postsql,strStep0SQLNDD_rt,new String[]{});
 
         for(int j=0;j<arraySQLVals.size();j++)
         {
@@ -410,11 +410,11 @@
                             yy+mm+dd+"."+t0+t1
                     ) ,
                     new String[]{});
-            sqlExec(postsql,strStep1SQLNDD_rt+
+            /* sqlExec(postsql,strStep1SQLNDD_rt+
                     String.format(" values('%s',%s )",
                             yy+mm+dd+"."+t0+t1,arraySQLValsForTyphoon.get(j)
                     ),new String[]{}
-            );
+            );*/
 
         }
@@ -635,148 +635,6 @@
     }
 
-    private void doJob3(Connection postsql, String[] info) throws SQLException
-    {
-        // double switch (if db = enable -->work)
-        //Here is check
-        Date dtnow = new Date();
-        //get all file
-        //dept, count,dist,nei ,y,m,d,t,custom
-        // HashMap<String>
typhoonName=""; - String typhoonID=""; - String department=""; - String county=""; - String district=""; - String neighbor=""; - String affectCustomers=""; - String affectCustomersEver=""; - String[] tmpArray; - String sTemp; - List<String> arraySQLVals= new ArrayList<String>(); - List<String> arraySQLValsForTyphoon= new ArrayList<String>(); - boolean bActiveCheckDBSchedule=true; - - - if(!jobOnLine(postsql, "nddcanton")&& bActiveCheckDBSchedule) - { - - return; - } - String strStep0SQLNDD_rt="truncate table ndd.typhoon_rt"; - - logger.info("begin nddroad to postsql"); - logger.info("getftpfile..."); - String[] xmls= getNDDStrings(info, "*NDSExt.xml") ; - - - logger.info(String.format("total %d file(s)",xmls.length)); - for(int iRow=0;iRow<xmls.length;iRow++) - { - arraySQLVals.clear(); - arraySQLValsForTyphoon.clear(); - tmpArray= xmls[iRow].split("\n"); - for(int iLine=0;iLine<tmpArray.length;iLine++) - { - sTemp= findValue(tmpArray[iLine],"typhoonName"); - if(sTemp.length()>0) - { - typhoonName= sTemp; - typhoonID= getTyphoonIDByName(postsql,typhoonName); - // - sTemp= findValue(tmpArray[iLine],"Department id"); - department=sTemp; - } - - sTemp= findValue(tmpArray[iLine],"county ufid"); - if(sTemp.length()>0) - { - county=sTemp; - } - sTemp= findValue(tmpArray[iLine],"district ufid"); - if(sTemp.length()>0) - { - district=sTemp; - } - sTemp= findValue(tmpArray[iLine],"neighbor ufid"); - if(sTemp.length()>0) - { - neighbor=sTemp; - sTemp= findValue(tmpArray[iLine],"affectCustomers"); - if(sTemp.length()>0) - { - affectCustomers=sTemp; - } - else - { - affectCustomers="0"; - } - - sTemp= findValue(tmpArray[iLine],"affectCustomersEver"); - if(sTemp.length()>0) - { - affectCustomersEver=sTemp; - } - else - { - affectCustomersEver="0"; - } - arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,%s,%s",typhoonID,department,county,district,neighbor,affectCustomers,affectCustomersEver)); - arraySQLValsForTyphoon.add(String.format("%s,%s,%s,%s,%s",county,district,neighbor,affectCustomers,affectCustomersEver)); - // insert into nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever - //yy,mm,dd,tt - } - - } - //!! 
- String yy="0000"+String.valueOf( dtnow.getYear()+1900); - String mm="00"+String.valueOf( dtnow.getMonth()+1); - String dd="00"+String.valueOf( dtnow.getDate()); - String t0="00"+ String.valueOf( dtnow.getHours()); - String t1="00"+ String.valueOf( dtnow.getMinutes()); - yy= yy.substring(yy.length()-4); - mm= mm.substring(mm.length()-2); - dd= dd.substring(dd.length()-2); - t0= t0.substring(t0.length()-2); - t1= t1.substring(t1.length()-2); - String insertDBSQL=" insert into ndd.nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever"+ - ",yy,mm,dd,tt,ts_ser) values "; - String strStep1SQLNDD_rt="insert into ndd.typhoon_rt(dts,county_id,district_id,neighbor_id,c0,c1)"; - - - sqlExec(postsql,strStep0SQLNDD_rt,new String[]{}); - - for(int j=0;j<arraySQLVals.size();j++) - { - - sqlExec(postsql,insertDBSQL + arraySQLVals.get(j)+ - String.format(",%s,%s,%s,'%s%s',%s)", - yy,mm,dd,t0,t1, - yy+mm+dd+"."+t0+t1 - ) , - new String[]{}); - sqlExec(postsql,strStep1SQLNDD_rt+ - String.format(" values('%s',%s )", - yy+mm+dd+"."+t0+t1,arraySQLValsForTyphoon.get(j) - ),new String[]{} - ); - } - - - String strSQLUpdateCurr="update ndd.currdata set yy='%s',mm='%s',dd='%s',tt='%s%s' where sr=1"; - sqlExec(postsql, - String.format(strSQLUpdateCurr, - yy,mm,dd,t0,t1 - ) , - new String[]{}); - logger.info(String.format("next xml")); - } - logger.info(String.format("done")); - - - - - } /* private void doJob(Connection postsql,Connection orcl) throws SQLException diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddrtUpdateJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddrtUpdateJob.java new file mode 100644 index 0000000..54ccb15 --- /dev/null +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/DMMSNddrtUpdateJob.java @@ -0,0 +1,860 @@ +package com.ximple.eofms.jobs; + +import com.ximple.eofms.jobs.context.AbstractOracleJobContext; +import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext; +import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; +import com.ximple.eofms.util.*; +import com.ximple.io.dgn7.*; +import com.ximple.util.PrintfFormat; +import oracle.jdbc.OracleConnection; +import oracle.jdbc.OracleResultSet; +import oracle.sql.ARRAY; +import oracle.sql.BLOB; +import org.apache.commons.collections.OrderedMap; +import org.apache.commons.collections.OrderedMapIterator; +import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.dbcp.DelegatingConnection; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPReply; +import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; +import org.geotools.feature.SchemaException; +import org.geotools.jdbc.JDBCDataStore; +import org.opengis.feature.IllegalAttributeException; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import 
+
+import java.io.*;
+import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.sql.*;
+import java.util.*;
+import java.util.Date;
+
+/**
+ * Created by Alchemist on 2016/08/22.
+ */
+/*
+CREATE TABLE ndd.typhoon_version
+(
+  name character varying(255) DEFAULT 'undefined'::character varying,
+  dts character varying(13) DEFAULT '00000000.0000'::character varying,
+  status integer NOT NULL DEFAULT 0,
+  id serial NOT NULL,
+  CONSTRAINT "PK_TYPHOON_VERSION_ID" PRIMARY KEY (id)
+)
+WITH (
+  OIDS=FALSE
+);
+insert into ndd.typhoon_version (name,dts,status) values('typhoon_rt','20160822.0000',1)
+*/
+public class DMMSNddrtUpdateJob extends AbstractOracleDatabaseJob {
+    final static Log logger = LogFactory.getLog(DMMSNddrtUpdateJob.class);
+
+    private static final String PGHOST = "PGHOST";
+    private static final String PGDATBASE = "PGDATBASE";
+    private static final String PGPORT = "PGPORT";
+    private static final String PGSCHEMA = "PGSCHEMA";
+    private static final String PGUSER = "PGUSER";
+    private static final String PGPASS = "PGPASS";
+    private static final String USEWKB = "USEWKB";
+
+    private static final boolean useTpclidText = false;
+
+    private static final int FETCHSIZE = 30;
+    private static final int COMMITSIZE = 100;
+    private static final String INDEXPATHNAME = "index";
+    private static final String OTHERPATHNAME = "other";
+    public static final String FORWARDFLOW_MARK = "shape://ccarrow";
+    public static final String BACKFLOW_MARK = "shape://rccarrow";
+    public static final String UNFLOW_MARK = "shape://backslash";
+    public static final String NONFLOW_MARK = "shape://slash";
+
+    private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC";
+    private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";
+
+    private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)";
+    private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)";
+
+    public static final String FDYNCOLOR_SUFFIX = "_fdyncolor";
+    public static final String FOWNER_SUFFIX = "_fowner";
+
+    protected static class Pair {
+        Object first;
+        Object second;
+
+        public Pair(Object first, Object second) {
+            this.first = first;
+            this.second = second;
+        }
+    }
+
+    protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
+
+    protected String _pgHost;
+    protected String _pgDatabase;
+    protected String _pgPort;
+    protected String _pgSchema;
+    protected String _pgUsername;
+    protected String _pgPassword;
+    protected String _pgUseWKB;
+
+    protected Map<String, String> pgProperties;
+    protected JDBCDataStore targetDataStore;
+    // protected OracleConvertEdbGeoJobContext oracleJobContext;
+
+    private long queryTime = 0;
+    private long queryTimeStart = 0;
+
+    public Log getLogger() {
+        return logger;
+    }
+
+    protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath,
+                                                         boolean profileMode,
+                                                         boolean useTransform) {
+        return new OracleConvertPostGISJobContext(getDataPath(),
+                getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
+    }
+
+    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
+        super.extractJobConfiguration(jobDetail);
+        JobDataMap dataMap = jobDetail.getJobDataMap();
+        _pgHost = dataMap.getString(PGHOST);
+        _pgDatabase = dataMap.getString(PGDATBASE);
+        _pgPort = dataMap.getString(PGPORT);
+        _pgSchema = dataMap.getString(PGSCHEMA);
+        _pgUsername = dataMap.getString(PGUSER);
+        _pgPassword = dataMap.getString(PGPASS);
+        _pgUseWKB = dataMap.getString(USEWKB);
+
+        Log logger = getLogger();
+        /*
+        logger.info("PGHOST=" + _myHost);
+        logger.info("PGDATBASE=" + _myDatabase);
+        logger.info("PGPORT=" + _myPort);
+        logger.info("PGSCHEMA=" + _mySchema);
+        logger.info("PGUSER=" + _myUsername);
+        logger.info("PGPASS=" + _myPassword);
+        logger.info("USEWKB=" + _myUseWKB);
+        */
+
+        if (_pgHost == null) {
+            logger.warn("PGHOST is null");
+            throw new JobExecutionException("Unknown PostGIS host.");
+        }
+        if (_pgDatabase == null) {
+            logger.warn("PGDATBASE is null");
+            throw new JobExecutionException("Unknown PostGIS database.");
+        }
+        if (_pgPort == null) {
+            logger.warn("PGPORT is null");
+            throw new JobExecutionException("Unknown PostGIS port.");
+        }
+        if (_pgSchema == null) {
+            logger.warn("PGSCHEMA is null");
+            throw new JobExecutionException("Unknown PostGIS schema.");
+        }
+        if (_pgUsername == null) {
+            logger.warn("PGUSER is null");
+            throw new JobExecutionException("Unknown PostGIS username.");
+        }
+        if (_pgPassword == null) {
+            logger.warn("PGPASS is null");
+            throw new JobExecutionException("Unknown PostGIS password.");
+        }
+
+        Map<String, String> remote = new TreeMap<String, String>();
+        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
+        // remote.put("charset", "UTF-8");
+        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
+        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
+        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
+        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
+        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
+        // remote.put( "namespace", null);
+
+        String temp="";
+        temp= dataMap.getString("ftpurl");
+        if(temp==null)
+        {
+            logger.warn("ftpurl not configured; defaulting to ftp://127.0.0.1:21/");
+            temp="ftp://127.0.0.1:21/";
+        }
+        remote.put("ftpurl", temp);
+        temp= dataMap.getString("ftpuid");
+        if(temp==null)
+        {
+            temp="anonymous";
+        }
+        remote.put("ftpuid", temp);
+
+        temp= dataMap.getString("ftppwd");
+        if(temp==null)
+        {
+            temp="";
+        }
+        remote.put("ftppwd", temp);
+
+        temp= dataMap.getString("ftpdir");
+        if(temp==null)
+        {
+            temp="tcdaas/featureImg";
+        }
+        remote.put("ftpdir", temp);
+        pgProperties = remote;
+    }
+
+
+    private List<String[]> sqlExecQuery(Connection connection,String strSQLIn,String[] params) throws SQLException {
+
+        // substitute the %s1, %s2, ... placeholders with the given parameters
+        String strSQL=strSQLIn;
+        for(int i=0;i<params.length;i++)
+        {
+            if(params[i]==null)params[i]="";
+            strSQL=strSQL.replace("%s"+String.valueOf(i+1),params[i]);
+        }
+        List<String[]> result=new ArrayList<String[]>();
+        List<String> temp = new ArrayList<String>();
+        String strTemp="";
+        Statement stmt = null;
+        ResultSet rs = null;
+
+        try {
+            stmt = connection.createStatement();
+            rs = stmt.executeQuery(strSQL);
+
+            ResultSetMetaData rsmd = rs.getMetaData();
+            int NumOfCol = rsmd.getColumnCount();
+
+            while (rs.next()) {
+                for (int idx = 0; idx < NumOfCol; idx++) {
+                    strTemp = rs.getString(idx + 1);
+                    temp.add(strTemp);
+                }
+                result.add(temp.toArray(new String[0]));
+                temp.clear();
+            }
+            return result;
+        } finally {
+            JDBCUtils.close(rs);
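+            // GeoTools' JDBCUtils.close is null-safe and logs, rather than throws, on close failure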
+            JDBCUtils.close(stmt);
+        }
+    }
+
+    private void sqlExec(Connection connection,String strSQLIn,String[] params) throws SQLException {
+
+        // same %s1, %s2, ... substitution as sqlExecQuery, for statements that return no result set
+        String strSQL=strSQLIn;
+        for(int i=0;i<params.length;i++)
+        {
+            if(params[i]==null)params[i]="";
+            strSQL=strSQL.replace("%s"+String.valueOf(i+1),params[i]);
+        }
+        Statement stmt = null;
+
+        try {
+            stmt = connection.createStatement();
+            stmt.execute(strSQL);
+        } finally {
+            JDBCUtils.close(stmt);
+        }
+    }
+
+    // returns the quoted value following findTag in strSource, or "" when the tag is absent
+    private String findValue(String strSource,String findTag)
+    {
+        int idx=-1; int iStart=-1; int iEnd=-1;
+        idx=strSource.indexOf(findTag);
+        if(idx<0) return "";
+        iStart= strSource.indexOf("\"",idx);
+        iEnd= strSource.indexOf("\"",iStart+1);
+        return strSource.substring(iStart+1,iEnd);
+    }
+
+    public DataStore getTargetDataStore() {
+        return targetDataStore;
+    }
+
+    protected void createTargetDataStore() throws JobExecutionException {
+        if (targetDataStore != null) {
+            targetDataStore.dispose();
+            targetDataStore = null;
+        }
+
+        /*
+        if (!isDriverFound())
+        {
+            throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER);
+        }
+        */
+
+        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
+            pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
+        }
+
+        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
+            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
+        }
+
+        /*
+        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
+            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
+        }
+        */
+
+        if (!dataStoreFactory.canProcess(pgProperties)) {
+            getLogger().warn("cannot process PostGIS connection properties");
+            throw new JobExecutionException("cannot process PostGIS connection properties");
+        }
+        try {
+            targetDataStore = dataStoreFactory.createDataStore(pgProperties);
+        } catch (IOException e) {
+            getLogger().warn(e.getMessage(), e);
+            throw new JobExecutionException(e.getMessage(), e);
+        }
+    }
+
+
+    private void doJob(Connection postsql, String[] info) throws SQLException
+    {
+        // double-buffer switch: load into the offline typhoon_rt table, then repoint the view;
+        // runs only when the ndd.schedule row for "nddcanton" is enabled
+        Date dtnow = new Date();
+        // values parsed from each dashboard XML: department, county, district, neighbor, customer counts
+        String typhoonName="";
+        String typhoonID="";
+        String department="";
+        String county="";
+        String district="";
+        String neighbor="";
+        String affectCustomers="";
+        String affectCustomersEver="";
+        String[] tmpArray;
+        String sTemp;
+        List<String> arraySQLVals= new ArrayList<String>();
+        List<String> arraySQLValsForTyphoon= new ArrayList<String>();
+        boolean bActiveCheckDBSchedule=true;
+        String strDts="";
+
+        if(!jobOnLine(postsql, "nddcanton")&& bActiveCheckDBSchedule)
+        {
+            return;
+        }
+
+        // the data is double-buffered across ndd.typhoon_rt1 and ndd.typhoon_rt2:
+        // find the table readers currently use (tableCurr) and the standby one to load (tableNext)
+        String[] strTableList=new String []{ "1","2" };
+        String strOnlineTableNum=jobOnlineStatus(postsql, "typhoon_rt");
+        String tableCurr=strTableList[strTableList.length-1];
+        String tableNext="";
+
+        for (int i=0;i<strTableList.length;i++){
+            tableNext=tableCurr;
+            tableCurr=strTableList[i];
+            if(strTableList[i].equals( strOnlineTableNum))
+            {
+                break;
+            }
+        }
+
+        String strStep0SQLNDD_rt="truncate table ndd.typhoon_rt"+tableNext;
+
+        logger.info("begin nddxml to postsql(rt)");
+        logger.info("getftpfile...");
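+        // scan every dashboard directory in the FTP drop for neighbor_affect_customers.xml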
+        String[] xmls= getNDDStrings(info, "neighbor_affect_customers.xml");
+
+        logger.info(String.format("total %d file(s)",xmls.length));
+        for(int iRow=0;iRow<xmls.length;iRow++)
+        {
+            arraySQLVals.clear();
+            arraySQLValsForTyphoon.clear();
+            tmpArray= xmls[iRow].split("\n");
+            for(int iLine=0;iLine<tmpArray.length;iLine++)
+            {
+                sTemp= findValue(tmpArray[iLine],"typhoonName");
+                if(sTemp.length()>0)
+                {
+                    typhoonName= sTemp;
+                    typhoonID= getTyphoonIDByName(postsql,typhoonName);
+                    sTemp= findValue(tmpArray[iLine],"Department id");
+                    department=sTemp;
+                }
+
+                sTemp= findValue(tmpArray[iLine],"county ufid");
+                if(sTemp.length()>0)
+                {
+                    county=sTemp;
+                }
+                sTemp= findValue(tmpArray[iLine],"district ufid");
+                if(sTemp.length()>0)
+                {
+                    district=sTemp;
+                }
+                sTemp= findValue(tmpArray[iLine],"neighbor ufid");
+                if(sTemp.length()>0)
+                {
+                    neighbor=sTemp;
+                    sTemp= findValue(tmpArray[iLine],"affectCustomers");
+                    if(sTemp.length()>0)
+                    {
+                        affectCustomers=sTemp;
+                    }
+                    else
+                    {
+                        affectCustomers="0";
+                    }
+
+                    sTemp= findValue(tmpArray[iLine],"affectCustomersEver");
+                    if(sTemp.length()>0)
+                    {
+                        affectCustomersEver=sTemp;
+                    }
+                    else
+                    {
+                        affectCustomersEver="0";
+                    }
+                    arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,%s,%s",typhoonID,department,county,district,neighbor,affectCustomers,affectCustomersEver));
+                    arraySQLValsForTyphoon.add(String.format("%s,%s,%s,%s,%s",county,district,neighbor,affectCustomers,affectCustomersEver));
+                }
+            }
+
+            // build the yyyymmdd.hhmm timestamp used as the data version (dts)
+            String yy="0000"+String.valueOf( dtnow.getYear()+1900);
+            String mm="00"+String.valueOf( dtnow.getMonth()+1);
+            String dd="00"+String.valueOf( dtnow.getDate());
+            String t0="00"+ String.valueOf( dtnow.getHours());
+            String t1="00"+ String.valueOf( dtnow.getMinutes());
+            yy= yy.substring(yy.length()-4);
+            mm= mm.substring(mm.length()-2);
+            dd= dd.substring(dd.length()-2);
+            t0= t0.substring(t0.length()-2);
+            t1= t1.substring(t1.length()-2);
+            strDts=yy+mm+dd+"."+t0+t1;
+            // String insertDBSQL=" insert into ndd.nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever"+
+            //         ",yy,mm,dd,tt,ts_ser) values ";
+            String strStep1SQLNDD_rt="insert into ndd.typhoon_rt"+tableNext+ "(dts,county_id,district_id,neighbor_id,c0,c1)";
+
+            // clear the standby table, then insert this file's rows into it
+            sqlExec(postsql,strStep0SQLNDD_rt,new String[]{});
+
+            for(int j=0;j<arraySQLVals.size();j++)
+            {
+                sqlExec(postsql,strStep1SQLNDD_rt+
+                        String.format(" values('%s',%s )",
+                                strDts,arraySQLValsForTyphoon.get(j)
+                        ),new String[]{}
+                );
+            }
+
+            String strStep2SQLNDD_rt="CREATE OR REPLACE VIEW ndd.v_typhoon_rt AS " +
+                    " SELECT dts, county_id, district_id, neighbor_id, c0, c1, id "+
+                    " FROM ndd.typhoon_rt" + tableNext;
+
+            // switch readers to the freshly loaded table by repointing the view
+            if(!tableNext.equals(tableCurr))
+                sqlExec(postsql,
+                        strStep2SQLNDD_rt,
+                        new String[]{});
+
+            String strStep3SQLNDD_rt="update ndd.typhoon_version set " +
+                    " dts= '"+strDts+"',"+ "status=" +tableNext + " where name='typhoon_rt'";
+
+            sqlExec(postsql,
+                    strStep3SQLNDD_rt,
+                    new String[]{});
+            logger.info("next xml");
+        }
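+        // all dashboard files processed; ndd.typhoon_version now names the live table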
logger.info(String.format("done")); + + + + + } + + + public void execute(JobExecutionContext context) throws JobExecutionException { + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); + + if (isIgnoreDBETL()) { + return; + } + + //createSourceDataStore(); + createTargetDataStore(); + /* + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + */ + if (getTargetDataStore() == null) { + logger.warn("Cannot connect source postgreSQL database."); + throw new JobExecutionException("Cannot connect source postgreSQL database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName, targetThemeTable; + try { + //logger.info("-- step:clearOutputDatabase --"); + doJob(targetDataStore.getConnection(Transaction.AUTO_COMMIT),new String[]{ + pgProperties.get("ftpurl"), + pgProperties.get("ftpuid"), + pgProperties.get("ftppwd"), + pgProperties.get("ftpdir") + }); + + // doJob( targetDataStore.getConnection(Transaction.AUTO_COMMIT),sourceDataStore.getConnection(Transaction.AUTO_COMMIT) ); + + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } catch (SQLException e) { + disconnect(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException("Database error. " + e.getMessage(), e); + + }finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + String [] siteInfo=new String[]{"ftp://10.10.1.9:21","DMMS","DMMS000"} ; + + private String[] getNDDList(String[] info){ + String url=info[3]; + String ftp=info[0]; + String uid=info[1]; + String pwd=info[2]; + // List<MapItemValue> tmp= dmmsSite.getFtpList(site); + //for(int i=0;i<tmp.size();i++) + //{ + // if(tmp.get(i).getGroupName().equals("featureimg")) + // { + // url="/tcdaas/ndddash/"; + String [] fileNow=getFileList(ftp,uid,pwd,url,""); + return fileNow ; + // } + //} + //return new String[]{}; + } + + private byte[] getNDDDash(String[] info, String dirname, String filename) { + String url="";//info[3]; + String ftp=info[0]; + String uid=info[1]; + String pwd=info[2]; + + dirname= dirname.replace("[.]","_"); //防hack + filename= filename.replace("[/]","_"); //防hack + // List<MapItemValue> tmp= dmmsSite.getFtpList(site); + String[] temp=dirname.split("/"); + dirname= temp[temp.length-1]; + + // for(int i=0;i<tmp.size();i++) + // { + // if(tmp.get(i).getGroupName().equals("featureimg")) + // { + url=info[3]+dirname+"/"; + + + byte[] bytes= getFile(ftp,uid,pwd,url,filename); + return bytes; + // return new FileTransfer(filename, "application/octet-stream",bytes); + // } + // } + // return null; + } + + private String[] getFileList(String urlString,String ftpUser,String ftpPwd,String filePath, String filter){ + FTPClient ftpClient=null; + try{ + ftpClient= new FTPClient(); + }catch(Throwable ex) + { + ex.getMessage(); + } + + URL url; + + // 
+        // /tcdaas/dsbncard -- for feature D
+        // /tcdaas/mhole -- for feature D
+        // /tcdaas/featureimg -- for feature U/D attached and LIST
+        try{
+            url= new URL(urlString);
+            ftpClient.connect(
+                    url.getHost(),url.getPort()
+            );
+
+            if(!ftpClient.login(ftpUser,ftpPwd))
+            {
+                return null;
+            }
+            int reply = ftpClient.getReplyCode();
+            // FTPReply stores a set of constants for FTP reply codes.
+            if (!FTPReply.isPositiveCompletion(reply))
+            {
+                ftpClient.disconnect();
+                return null;
+            }
+            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
+            // enter passive mode
+            ftpClient.enterLocalPassiveMode();
+
+            String[] filelist=ftpClient.listNames(filePath + (filter == null ? "" : filter));
+            ftpClient.disconnect();
+            return filelist;
+        }catch(MalformedURLException urlex)
+        {
+            // fall through: a bad URL yields an empty list
+        } catch (Exception ex)
+        {
+            // fall through: FTP errors yield an empty list
+        }
+        return new String[]{};
+    }
+
+    private byte[] getFile(String urlString,String ftpUser,String ftpPwd,String filePath,String fileName){
+        FTPClient ftpClient= new FTPClient();
+
+        URL url;
+        byte[] result;
+        try{
+            url= new URL(urlString);
+            ftpClient.connect(
+                    url.getHost(),url.getPort()
+            );
+
+            if(!ftpClient.login(ftpUser,ftpPwd))
+            {
+                return null;
+            }
+            int reply = ftpClient.getReplyCode();
+            if (!FTPReply.isPositiveCompletion(reply))
+            {
+                ftpClient.disconnect();
+                return null;
+            }
+            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
+            ftpClient.enterLocalPassiveMode();
+
+            String[] filelist=ftpClient.listNames(filePath+ fileName);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            if(filelist.length>0)
+            {
+                if(ftpClient.retrieveFile(filePath+fileName,bos))
+                {
+                    result= bos.toByteArray();
+                    bos.close();
+                }
+                else
+                {
+                    result=null;
+                    try{
+                        bos.close();
+                    } catch (Exception ex)
+                    {
+                        // ignore close failure
+                    }
+                }
+            }
+            else
+            {
+                result=null;
+            }
+
+            ftpClient.disconnect();
+
+        }catch(MalformedURLException urlex)
+        {
+            result=null;
+        } catch (Exception ex)
+        {
+            result=null;
+        }
+        return result;
+    }
+
+    public String[] getNDDStrings(String[] info, String filename)
+    {
+        byte[] temp;
+        String[] list =getNDDList(info) ;
+
+        List<String> lstXML= new ArrayList<String>();
+        for(int i=0;i<list.length;i++)
+        {
+            temp=getNDDDash(info, list[i], filename) ;
+            try{
+                if(temp!=null) lstXML.add(new String(temp,"UTF-8"));
+            } catch (UnsupportedEncodingException ex) {
+                // this should never happen because "UTF-8" is hard-coded.
+                throw new IllegalStateException(ex);
+            }
+        }
+        if(lstXML.size()>0)
+            return lstXML.toArray(new String[0]);
+
+        return new String[]{};
+    }
+
+
+    private static Map<String, String> ditTyphoon = new HashMap<String, String>();
+
+    public String getTyphoonIDByName(Connection postsql,String typhoonName) throws SQLException
+    {
+        if(ditTyphoon.containsKey(typhoonName))
+        {
+            return ditTyphoon.get(typhoonName);
+        }else
+        {
+            return readOrCreateTyphoonByName(postsql,typhoonName);
+        }
+    }
+
+    public String readOrCreateTyphoonByName(Connection postsql,String typhoonName) throws SQLException
+    {
+        List<String[]> listDict;
+        String strSQLSelectProject=String.format( "select typhoon_id,typhoon_name from ndd.typhoonproject where typhoon_name='%s'",typhoonName);
+        String strSQLInsertProject=String.format( "insert into ndd.typhoonproject (typhoon_name,row_created) values ('%s',now())",typhoonName);
+
+        listDict= sqlExecQuery(postsql,strSQLSelectProject,new String[]{});
+        if(listDict!=null)
+        {
+            if(listDict.size()>0)
+            {
+                for(int i=0;i<listDict.size();i++)
+                {
+                    return addDict(listDict.get(i)[0],listDict.get(i)[1]);
+                }
+            }
+        }
+        // no such project yet: insert it, then re-read to pick up the generated id
+        logger.info(String.format("new project:%s",typhoonName));
+        sqlExec(postsql,strSQLInsertProject,new String[]{});
+        return readOrCreateTyphoonByName(postsql, typhoonName) ;
+    }
+
+    private synchronized static String addDict(String id,String typhoon)
+    {
+        if(ditTyphoon.containsKey(typhoon))
+            return ditTyphoon.get(typhoon);
+        else
+            ditTyphoon.put(typhoon,id);
+        return id;
+    }
+
+    public boolean jobOnLine(Connection postsql,String jobname) throws SQLException
+    {
+        // the job runs only when its ndd.schedule row has enabled=1
+        List<String[]> listDict;
+        String strSQLSelectSchedule=String.format( "select enabled from ndd.schedule where name='%s'",jobname);
+
+        listDict= sqlExecQuery(postsql,strSQLSelectSchedule,new String[]{});
+        if(listDict.size()==0)return false; // no row for this job -> treat as disabled
+        return listDict.get(0)[0].equals("1");
+    }
+
+    public String jobOnlineStatus(Connection postsql,String jobname) throws SQLException
+    {
+        // which of the double-buffered tables is currently live ("1" or "2")
+        List<String[]> listDict;
+        String strSQLSelectSchedule=String.format( "select status from ndd.typhoon_version where name='%s'",jobname);
+
+        listDict= sqlExecQuery(postsql,strSQLSelectSchedule,new String[]{});
+        if(listDict.size()==0)return ""; // no version row -> status unknown
+        return listDict.get(0)[0];
+    }
+}
-- 
Gitblit v0.0.0-SNAPSHOT
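Note on the publish pattern in doJob() above: the job double-buffers the real-time table. Each refresh truncates and loads the standby table (ndd.typhoon_rt1 or ndd.typhoon_rt2), repoints the ndd.v_typhoon_rt view at it, and records the live table number and dts in ndd.typhoon_version, so readers of the view never observe a half-loaded refresh. A minimal standalone sketch of the same swap, assuming the two tables and the typhoon_rt version row from the class-header comment already exist; the class and method names below are illustrative, not part of the patch:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.List;

    public class TyphoonRtSwapSketch {

        /** Loads tuples into the standby typhoon_rt table, then publishes it. */
        public static void refresh(Connection pg, List<String> tuples, String dts) throws Exception {
            String live = "1";                                 // table readers use now
            try (Statement st = pg.createStatement();
                 ResultSet rs = st.executeQuery(
                         "select status from ndd.typhoon_version where name='typhoon_rt'")) {
                if (rs.next()) live = rs.getString(1);
            }
            String standby = "1".equals(live) ? "2" : "1";

            try (Statement st = pg.createStatement()) {
                st.execute("truncate table ndd.typhoon_rt" + standby);      // clear the standby copy
                for (String t : tuples) {                                   // t: county,district,neighbor,c0,c1
                    st.execute("insert into ndd.typhoon_rt" + standby
                            + "(dts,county_id,district_id,neighbor_id,c0,c1) values('" + dts + "'," + t + ")");
                }
                // repoint readers, then record which table is live and when
                st.execute("CREATE OR REPLACE VIEW ndd.v_typhoon_rt AS "
                        + "SELECT dts, county_id, district_id, neighbor_id, c0, c1, id FROM ndd.typhoon_rt" + standby);
                st.execute("update ndd.typhoon_version set dts='" + dts + "', status=" + standby
                        + " where name='typhoon_rt'");
            }
        }
    }

As in the patch itself, values are concatenated into the SQL for brevity; a hardened version would bind them through PreparedStatement placeholders instead.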