package com.ximple.eofms.jobs;

import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
import com.ximple.eofms.jobs.context.postgis.FeatureDgnConvertPostGISJobContext;
import com.ximple.eofms.jobs.context.postgis.GeneralDgnConvertPostGISJobContext;
import com.ximple.eofms.jobs.context.postgis.IndexDgnConvertPostGISJobContext;
import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext;
import com.ximple.eofms.util.*;
import com.ximple.io.dgn7.*;
import com.ximple.util.PrintfFormat;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleResultSet;
import oracle.sql.ARRAY;
import oracle.sql.BLOB;
import org.apache.commons.collections.OrderedMap;
import org.apache.commons.collections.OrderedMapIterator;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.dbcp.DelegatingConnection;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.geotools.data.DataStore;
import org.geotools.data.Transaction;
import org.geotools.data.jdbc.JDBCUtils;
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
import org.geotools.feature.SchemaException;
import org.geotools.jdbc.JDBCDataStore;
import org.opengis.feature.IllegalAttributeException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import java.io.*;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.sql.*;
import java.util.*;
import java.util.Date;

/**
 * Quartz job that downloads NDD XML reports over FTP and loads the typhoon
 * outage statistics they carry into the target PostgreSQL/PostGIS database.
 * Created by Alchemist on 2014/4/7.
 */
public class DMMSNddUpdateJob extends AbstractOracleDatabaseJob {
    final static Log logger = LogFactory.getLog(DMMSNddUpdateJob.class);

    private static final String PGHOST = "PGHOST";
    // The key is spelled "PGDATBASE" (sic) in existing job configurations, so it is kept as-is.
    private static final String PGDATBASE = "PGDATBASE";
    private static final String PGPORT = "PGPORT";
    private static final String PGSCHEMA = "PGSCHEMA";
    private static final String PGUSER = "PGUSER";
    private static final String PGPASS = "PGPASS";
    private static final String USEWKB = "USEWKB";

    private static final boolean useTpclidText = false;

    private static final int FETCHSIZE = 30;
    private static final int COMMITSIZE = 100;
    private static final String INDEXPATHNAME = "index";
    private static final String OTHERPATHNAME = "other";

    public static final String FORWARDFLOW_MARK = "shape://ccarrow";
    public static final String BACKFLOW_MARK = "shape://rccarrow";
    public static final String UNFLOW_MARK = "shape://backslash";
    public static final String NONFLOW_MARK = "shape://slash";

    private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC";
    private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";

    private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)";
    private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)";

    public static final String FDYNCOLOR_SUFFIX = "_fdyncolor";
    public static final String FOWNER_SUFFIX = "_fowner";

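    /**
     * Simple two-slot holder; getBlobStorageList and getRawFormatStorageList
     * use it to pair the element-table and raw-table names of one space node.
     */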
    protected static class Pair {
        Object first;
        Object second;

        public Pair(Object first, Object second) {
            this.first = first;
            this.second = second;
        }
    }

    protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();

    protected String _pgHost;
    protected String _pgDatabase;
    protected String _pgPort;
    protected String _pgSchema;
    protected String _pgUsername;
    protected String _pgPassword;
    protected String _pgUseWKB;

    protected Map<String, String> pgProperties;
    protected JDBCDataStore targetDataStore;
    // protected OracleConvertEdbGeoJobContext oracleJobContext;

    private long queryTime = 0;
    private long queryTimeStart = 0;

    public Log getLogger() {
        return logger;
    }

    protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath,
                                                         boolean profileMode,
                                                         boolean useTransform) {
        return new OracleConvertPostGISJobContext(getDataPath(),
                getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
    }

    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
        super.extractJobConfiguration(jobDetail);
        JobDataMap dataMap = jobDetail.getJobDataMap();
        _pgHost = dataMap.getString(PGHOST);
        _pgDatabase = dataMap.getString(PGDATBASE);
        _pgPort = dataMap.getString(PGPORT);
        _pgSchema = dataMap.getString(PGSCHEMA);
        _pgUsername = dataMap.getString(PGUSER);
        _pgPassword = dataMap.getString(PGPASS);
        _pgUseWKB = dataMap.getString(USEWKB);

        Log logger = getLogger();
        /*
        logger.info("PGHOST=" + _pgHost);
        logger.info("PGDATBASE=" + _pgDatabase);
        logger.info("PGPORT=" + _pgPort);
        logger.info("PGSCHEMA=" + _pgSchema);
        logger.info("PGUSER=" + _pgUsername);
        logger.info("PGPASS=" + _pgPassword);
        logger.info("USEWKB=" + _pgUseWKB);
        */

        if (_pgHost == null) {
            logger.warn("PGHOST is null");
            throw new JobExecutionException("Unknown PostGIS host.");
        }
        if (_pgDatabase == null) {
            logger.warn("PGDATBASE is null");
            throw new JobExecutionException("Unknown PostGIS database.");
        }
        if (_pgPort == null) {
            logger.warn("PGPORT is null");
            throw new JobExecutionException("Unknown PostGIS port.");
        }
        if (_pgSchema == null) {
            logger.warn("PGSCHEMA is null");
            throw new JobExecutionException("Unknown PostGIS schema.");
        }
        if (_pgUsername == null) {
            logger.warn("PGUSER is null");
            throw new JobExecutionException("Unknown PostGIS username.");
        }
        if (_pgPassword == null) {
            logger.warn("PGPASS is null");
            throw new JobExecutionException("Unknown PostGIS password.");
        }

        Map<String, String> remote = new TreeMap<String, String>();
        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
        // remote.put("charset", "UTF-8");
        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
        // remote.put("namespace", null);

        // FTP settings used to fetch the NDD XML files; fall back to defaults when unset.
        String temp = dataMap.getString("ftpurl");
        if (temp == null) {
            logger.warn("ftpurl is not configured; using default ftp://127.0.0.1:21/");
            temp = "ftp://127.0.0.1:21/";
        }
        remote.put("ftpurl", temp);

        temp = dataMap.getString("ftpuid");
        if (temp == null) {
            temp = "anonymous";
        }
        remote.put("ftpuid", temp);

        temp = dataMap.getString("ftppwd");
        if (temp == null) {
            temp = "";
        }
        remote.put("ftppwd", temp);

        temp = dataMap.getString("ftpdir");
        if (temp == null) {
            temp = "tcdaas/featureImg";
        }
        remote.put("ftpdir", temp);

        pgProperties = remote;
    }

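    /**
     * Runs a query after substituting the positional markers %s1, %s2, ... in
     * strSQLIn with the given parameters, and returns all rows as string
     * arrays. Note that this is plain text substitution, not bind variables,
     * so the parameter values must come from trusted input.
     */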
    private List<String[]> sqlExecQuery(Connection connection, String strSQLIn, String[] params) throws SQLException {
        String strSQL = strSQLIn;
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) params[i] = "";
            strSQL = strSQL.replace("%s" + String.valueOf(i + 1), params[i]);
        }
        List<String[]> result = new ArrayList<String[]>();
        List<String> temp = new ArrayList<String>();
        Statement stmt = null;
        ResultSet rs = null;

        try {
            stmt = connection.createStatement();
            rs = stmt.executeQuery(strSQL);

            ResultSetMetaData rsmd = rs.getMetaData();
            int numOfCol = rsmd.getColumnCount();

            while (rs.next()) {
                for (int idx = 0; idx < numOfCol; idx++) {
                    temp.add(rs.getString(idx + 1));
                }
                result.add(temp.toArray(new String[0]));
                temp.clear();
            }
            return result;
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }

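    /**
     * Executes a non-query statement after the same %s1, %s2, ... text
     * substitution used by sqlExecQuery.
     */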
    private void sqlExec(Connection connection, String strSQLIn, String[] params) throws SQLException {
        String strSQL = strSQLIn;
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) params[i] = "";
            strSQL = strSQL.replace("%s" + String.valueOf(i + 1), params[i]);
        }
        Statement stmt = null;

        try {
            stmt = connection.createStatement();
            stmt.execute(strSQL);
        } finally {
            JDBCUtils.close(stmt);
        }
    }

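    /**
     * Returns the first double-quoted value that follows findTag in strSource,
     * e.g. for the line &lt;county ufid="123"&gt; and tag "county ufid" it
     * returns "123". Returns an empty string when the tag or quotes are missing.
     */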
    private String findValue(String strSource, String findTag) {
        int idx = strSource.indexOf(findTag);
        if (idx < 0) return "";
        int iStart = strSource.indexOf("\"", idx);
        if (iStart < 0) return "";
        int iEnd = strSource.indexOf("\"", iStart + 1);
        if (iEnd < 0) return "";
        return strSource.substring(iStart + 1, iEnd);
    }

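    /**
     * Loads the per-neighborhood affected-customer statistics from the NDD
     * "neighbor_affect_customers.xml" reports into ndd.nddcanton_history and
     * stamps the load time into ndd.currdata (sr=1). The info array carries
     * the FTP url/uid/pwd/dir used to fetch the reports. The whole load is
     * skipped when the "nddcanton" job switch in the database is off.
     */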
    private void doJob(Connection postsql, String[] info) throws SQLException {
        // Double switch: only run when the job is enabled in the database.
        Calendar dtnow = Calendar.getInstance();

        // Parsed columns: department, county, district, neighbor, customers, timestamp.
        String typhoonName = "";
        String typhoonID = "";
        String department = "";
        String county = "";
        String district = "";
        String neighbor = "";
        String affectCustomers = "";
        String affectCustomersEver = "";
        String[] tmpArray;
        String sTemp;
        List<String> arraySQLVals = new ArrayList<String>();

        if (!jobOnLine(postsql, "nddcanton")) {
            return;
        }
        logger.info("begin nddxml to postsql");
        logger.info("getftpfile...");
        String[] xmls = getNDDStrings(info, "neighbor_affect_customers.xml");
        logger.info(String.format("total %d file(s)", xmls.length));
        for (int iRow = 0; iRow < xmls.length; iRow++) {
            // Collect value tuples per file (mirrors doJob2).
            arraySQLVals.clear();
            tmpArray = xmls[iRow].split("\n");
            for (int iLine = 0; iLine < tmpArray.length; iLine++) {
                sTemp = findValue(tmpArray[iLine], "typhoonName");
                if (sTemp.length() > 0) {
                    typhoonName = sTemp;
                    typhoonID = getTyphoonIDByName(postsql, typhoonName);

                    sTemp = findValue(tmpArray[iLine], "Department id");
                    department = sTemp;
                }

                sTemp = findValue(tmpArray[iLine], "county ufid");
                if (sTemp.length() > 0) {
                    county = sTemp;
                }
                sTemp = findValue(tmpArray[iLine], "district ufid");
                if (sTemp.length() > 0) {
                    district = sTemp;
                }
                sTemp = findValue(tmpArray[iLine], "neighbor ufid");
                if (sTemp.length() > 0) {
                    neighbor = sTemp;
                    sTemp = findValue(tmpArray[iLine], "affectCustomers");
                    affectCustomers = (sTemp.length() > 0) ? sTemp : "0";

                    sTemp = findValue(tmpArray[iLine], "affectCustomersEver");
                    affectCustomersEver = (sTemp.length() > 0) ? sTemp : "0";

                    // The tuple is left open here; yy/mm/dd/tt are appended at insert time.
                    arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,%s,%s",
                            typhoonID, department, county, district, neighbor,
                            affectCustomers, affectCustomersEver));
                }
            }

            // Zero-padded load timestamp (yyyy, MM, dd, HHmm).
            String yy = String.format("%04d", dtnow.get(Calendar.YEAR));
            String mm = String.format("%02d", dtnow.get(Calendar.MONTH) + 1);
            String dd = String.format("%02d", dtnow.get(Calendar.DAY_OF_MONTH));
            String t0 = String.format("%02d", dtnow.get(Calendar.HOUR_OF_DAY));
            String t1 = String.format("%02d", dtnow.get(Calendar.MINUTE));

            String insertDBSQL = " insert into ndd.nddcanton_history (project_id,department_id,county_id,district_id,neighbor_id,affectcustomers,affectcustomersever" +
                    ",yy,mm,dd,tt) values ";
            for (int j = 0; j < arraySQLVals.size(); j++) {
                sqlExec(postsql, insertDBSQL + arraySQLVals.get(j) +
                                String.format(",%s,%s,%s,'%s%s')", yy, mm, dd, t0, t1),
                        new String[]{});
            }

            String strSQLUpdateCurr = "update ndd.currdata set yy='%s',mm='%s',dd='%s',tt='%s%s' where sr=1";
            sqlExec(postsql,
                    String.format(strSQLUpdateCurr, yy, mm, dd, t0, t1),
                    new String[]{});
            logger.info("next xml");
        }
        logger.info("done");
    }

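    /**
     * Same as doJob but for the feeder-level reports
     * ("feeder_affect_customers.xml"): loads the substation / Mxfmr / feeder
     * affected-customer and no-power counts into ndd.nddfeeder_history and
     * stamps ndd.currdata (sr=2). Skipped when the "nddfeeder" job switch is off.
     */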
    private void doJob2(Connection postsql, String[] info) throws SQLException {
        // Double switch: only run when the job is enabled in the database.
        String typhoonName = "";
        String typhoonID = "";
        String department = "";
        String department_id = "";
        String substation = "";
        String substation_ufid = "";
        String substation_affectCustomers = "";
        String substation_nopower = "";

        String mxfmr_name = "";
        String mxfmr_ufid = "";
        String mxfmr_affectCustomers = "";
        String mxfmr_nopower = "";

        String feeder_name = "";
        String feeder_id = "";
        String feeder_affectCustomers = "";
        String feeder_nopower = "";

        String[] tmpArray;
        String sTemp;
        List<String> arraySQLVals = new ArrayList<String>();

        if (!jobOnLine(postsql, "nddfeeder")) {
            return;
        }

        // Zero-padded load timestamp (yyyy, MM, dd, HHmm).
        Calendar dtnow = Calendar.getInstance();
        String yy = String.format("%04d", dtnow.get(Calendar.YEAR));
        String mm = String.format("%02d", dtnow.get(Calendar.MONTH) + 1);
        String dd = String.format("%02d", dtnow.get(Calendar.DAY_OF_MONTH));
        String t0 = String.format("%02d", dtnow.get(Calendar.HOUR_OF_DAY));
        String t1 = String.format("%02d", dtnow.get(Calendar.MINUTE));

        logger.info("begin nddxml(feeder) to postsql");
        logger.info("getftpfile...");
        String[] xmls = getNDDStrings(info, "feeder_affect_customers.xml");
        logger.info(String.format("total %d file(s)", xmls.length));
        for (int iRow = 0; iRow < xmls.length; iRow++) {
            arraySQLVals.clear();
            tmpArray = xmls[iRow].split("\n");
            for (int iLine = 0; iLine < tmpArray.length; iLine++) {
                sTemp = findValue(tmpArray[iLine], "typhoonName");
                if (sTemp.length() > 0) {
                    typhoonName = sTemp;
                    typhoonID = getTyphoonIDByName(postsql, typhoonName);

                    sTemp = findValue(tmpArray[iLine], "Department id");
                    department_id = sTemp;

                    sTemp = findValue(tmpArray[iLine], "name");
                    department = sTemp;
                }

                // Substation row: unknown mxfmr/feeder ids are filled with -1.
                sTemp = findValue(tmpArray[iLine], "Substation name");
                if (sTemp.length() > 0) {
                    substation = sTemp;
                    sTemp = findValue(tmpArray[iLine], "ufid");
                    if (sTemp.length() > 0) {
                        substation_ufid = sTemp;
                    }

                    sTemp = findValue(tmpArray[iLine], "affectCustomers");
                    if (sTemp.length() > 0) {
                        substation_affectCustomers = sTemp;
                    }

                    sTemp = findValue(tmpArray[iLine], "noPowerAll");
                    if (sTemp.length() > 0) {
                        substation_nopower = sTemp;
                    }

                    arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,'%s','%s','%s','%s',%s,%s,%s,%s,%s,'%s%s')",
                            typhoonID,
                            department_id, substation_ufid, "-1", "-1",
                            department, substation, " ", " ",
                            substation_affectCustomers, substation_nopower,
                            yy, mm, dd, t0, t1));
                }

                // Mxfmr row.
                sTemp = findValue(tmpArray[iLine], "Mxfmr name");
                if (sTemp.length() > 0) {
                    mxfmr_name = sTemp;
                    sTemp = findValue(tmpArray[iLine], "ufid");
                    if (sTemp.length() > 0) {
                        mxfmr_ufid = sTemp;
                    }

                    sTemp = findValue(tmpArray[iLine], "affectCustomers");
                    if (sTemp.length() > 0) {
                        mxfmr_affectCustomers = sTemp;
                    }

                    sTemp = findValue(tmpArray[iLine], "noPowerAll");
                    if (sTemp.length() > 0) {
                        mxfmr_nopower = sTemp;
                    }
                    arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,'%s','%s','%s','%s',%s,%s,%s,%s,%s,'%s%s')",
                            typhoonID,
                            department_id, substation_ufid, mxfmr_ufid, "-1",
                            department, substation, mxfmr_name, " ",
                            mxfmr_affectCustomers, mxfmr_nopower,
                            yy, mm, dd, t0, t1));
                }

                // Feeder row.
                sTemp = findValue(tmpArray[iLine], "Feeder name");
                if (sTemp.length() > 0) {
                    feeder_name = sTemp;
                    sTemp = findValue(tmpArray[iLine], "id");
                    if (sTemp.length() > 0) {
                        feeder_id = sTemp;
                    }

                    sTemp = findValue(tmpArray[iLine], "affectCustomers");
                    if (sTemp.length() > 0) {
                        feeder_affectCustomers = sTemp;
                    }

                    sTemp = findValue(tmpArray[iLine], "noPowerAll");
                    if (sTemp.length() > 0) {
                        feeder_nopower = sTemp;
                    }
                    arraySQLVals.add(String.format("(%s,%s,%s,%s,%s,'%s','%s','%s','%s',%s,%s,%s,%s,%s,'%s%s')",
                            typhoonID,
                            department_id, substation_ufid, mxfmr_ufid, feeder_id,
                            department, substation, mxfmr_name, feeder_name,
                            feeder_affectCustomers, feeder_nopower,
                            yy, mm, dd, t0, t1));
                }
            }

            String insertDBSQL =
                    " insert into ndd.nddfeeder_history (project_id,department_id,substation_id,mxfmr_id,feeder_id," +
                            "department_name,substation_name,mxfmr_name,feeder_name," +
                            "affectCustomers,nopower" +
                            ",yy,mm,dd,tt) values ";
            for (int j = 0; j < arraySQLVals.size(); j++) {
                sqlExec(postsql, insertDBSQL + arraySQLVals.get(j), new String[]{});
            }

            String strSQLUpdateCurr = "update ndd.currdata set yy='%s',mm='%s',dd='%s',tt='%s%s' where sr=2";
            sqlExec(postsql,
                    String.format(strSQLUpdateCurr, yy, mm, dd, t0, t1),
                    new String[]{});
            logger.info("next xml");
        }
        logger.info("done");
    }

    /*
    private void doJob(Connection postsql, Connection orcl) throws SQLException {
        String strSQLGetTask = "select proc_id,procname,datastore,name,step,src,dest,txtsql from roadfee_proc where rowstatus=1 and procname like 'STEP%' order by procname,step";
        List<String[]> joblist = null;
        Connection inConnection;
        int idOfJob = 0;

        List<String[]> nodata = new ArrayList<String[]>();
        List<String[]> lista = new ArrayList<String[]>();
        List<String[]> list1 = new ArrayList<String[]>();
        List<String[]> listIn = new ArrayList<String[]>();
        List<String[]> temp;
        nodata.add(new String[]{""});
        // proc_id[0],procname[1],datastore[2],name[3],step[4],src[5],dest[6],txtsql[7]
        try {
            logger.info("getJoblist");
            joblist = sqlExecQuery(postsql, strSQLGetTask, new String[]{});

            for (idOfJob = 0; idOfJob < joblist.size(); idOfJob++) {
                logger.info("begin " + joblist.get(idOfJob)[1] + "-" + joblist.get(idOfJob)[3] + "(" + joblist.get(idOfJob)[0] + ")");
                if (joblist.get(idOfJob)[5].equals("nodata")) {
                    listIn = nodata;
                } else if (joblist.get(idOfJob)[5].equals("list1")) {
                    listIn = list1;
                } else if (joblist.get(idOfJob)[5].equals("lista")) {
                    listIn = lista;
                }

                if (joblist.get(idOfJob)[2].equals("psql")) {
                    inConnection = postsql;
                } else if (joblist.get(idOfJob)[2].equals("orcl")) {
                    inConnection = orcl;
                } else
                    return; // connection failed

                if (joblist.get(idOfJob)[6].equals("list1")) list1.clear();
                if (joblist.get(idOfJob)[6].equals("lista")) lista.clear();
                // run sql
                logger.info("process data count: " + String.valueOf(listIn.size()));

                for (int idxOfListIn = 0; idxOfListIn < listIn.size(); idxOfListIn++) {
                    if (joblist.get(idOfJob)[6].equals("nodata")) {
                        sqlExec(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn));
                        continue;
                    } else {
                        temp = sqlExecQuery(inConnection, joblist.get(idOfJob)[7], listIn.get(idxOfListIn));
                    }

                    for (int j = 0; j < temp.size(); j++) {
                        if (joblist.get(idOfJob)[6].equals("list1")) {
                            list1.add(temp.get(j));
                        } else if (joblist.get(idOfJob)[6].equals("lista")) {
                            lista.add(temp.get(j));
                        }
                    }
                }
            }
        } catch (SQLException sqlex) {
            logger.warn("ERROR@ID:" + String.valueOf(joblist.get(idOfJob)[0]));
            throw sqlex;
        }
    }
    */

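    /**
     * Quartz entry point: reads the job configuration, connects to the target
     * PostGIS store, and runs the canton (doJob) and feeder (doJob2) loads.
     */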
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Every job has its own job detail.
        JobDetail jobDetail = context.getJobDetail();

        // The name is defined in the job definition.
        String jobName = jobDetail.getKey().getName();

        // Log the time the job started.
        logger.info(jobName + " fired at " + new Date());
        extractJobConfiguration(jobDetail);

        if (isIgnoreDBETL()) {
            return;
        }

        //createSourceDataStore();
        createTargetDataStore();
        /*
        if (getSourceDataStore() == null) {
            logger.warn("Cannot connect source oracle database.");
            throw new JobExecutionException("Cannot connect source oracle database.");
        }
        */
        if (getTargetDataStore() == null) {
            logger.warn("Cannot connect to the target PostgreSQL database.");
            throw new JobExecutionException("Cannot connect to the target PostgreSQL database.");
        }

        if (isProfileMode()) {
            queryTime = 0;
        }

        try {
            String[] ftpInfo = new String[]{
                    pgProperties.get("ftpurl"),
                    pgProperties.get("ftpuid"),
                    pgProperties.get("ftppwd"),
                    pgProperties.get("ftpdir")
            };
            doJob(targetDataStore.getConnection(Transaction.AUTO_COMMIT), ftpInfo);
            doJob2(targetDataStore.getConnection(Transaction.AUTO_COMMIT), ftpInfo);
            // doJob(targetDataStore.getConnection(Transaction.AUTO_COMMIT), sourceDataStore.getConnection(Transaction.AUTO_COMMIT));
        } catch (IOException ex) {
            logger.warn(ex.getMessage(), ex);
            throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException("Database error. " + e.getMessage(), e);
        } finally {
            // disconnect() runs exactly once here, on both success and failure.
            disconnect();
        }
        logger.warn(jobName + " end at " + new Date());
    }

    private void logTimeDiff(String message, long tBefore, long tCurrent) {
        logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
                (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
    }

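    /**
     * Walks the SD$SPACENODES blob-storage map of the source Oracle schema and
     * converts each referenced element table into the target PostGIS schema,
     * committing every COMMITSIZE tables. (The method name, including its
     * historical misspelling, is kept to avoid breaking callers elsewhere.)
     */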
    private void exetcuteConvert(OracleConvertPostGISJobContext jobContext,
                                 String querySchema, String targetSchemaName) throws SQLException {
        int order = 0;
        OrderedMap map = getBlobStorageList(jobContext.getOracleConnection(),
                querySchema, "SD$SPACENODES", null);

        logger.info("begin convert job:[" + map.size() + "]:testmode=" + _testMode);

        int total = map.size(); // space-node count
        int step = total / 100;
        int current = 0;

        if (total == 0) {
            logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is zero.");
            return;
        }
        logger.warn("SELECT COUNT FROM " + querySchema + ".SD$SPACENODES is " + map.size());

        //jobContext.startTransaction();
        jobContext.setCurrentSchema(querySchema);
        jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 0);
        for (OrderedMapIterator it = map.orderedMapIterator(); it.hasNext(); ) {
            it.next();

            Pair pair = (Pair) it.getValue();
            String tableSrc = (String) pair.first;

            logger.info("begin convert:[" + order + "]-" + tableSrc);
            queryIgsetElement(jobContext, querySchema, tableSrc);

            order++;

            if (_testMode) {
                if ((_testCount < 0) || (order >= _testCount))
                    break;
            }

            if ((order % COMMITSIZE) == 0) {
                // OracleConnection connection = jobContext.getOracleConnection();
                // connection.commitTransaction();
                jobContext.commitTransaction();
                //jobContext.startTransaction();
                System.gc();
                System.runFinalization();
            }

            if (step != 0) {
                int now = order % step;
                if (now != current) {
                    current = now;
                    jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current);
                }
            } else {
                jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", current);
                current++;
            }
        }
        jobContext.getExecutionContext().put("ConvertDgn2PostGISJobProgress", 100);

        jobContext.commitTransaction();
        jobContext.resetFeatureContext();

        logger.info("end convert job:[" + order + "]");
        System.gc();
        System.runFinalization();
    }

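    /**
     * Reads SNID/SPACETABLE rows into an ordered map of space-node id to
     * Pair(elementTableName, rawTableName); this method supplies the first
     * member, getRawFormatStorageList the second.
     */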
    protected OrderedMap getBlobStorageList(Connection connection, String schemaSrc, String tableSrc,
                                            OrderedMap orderedMap) throws SQLException {
        if (orderedMap == null)
            orderedMap = new LinkedMap(99);
        String fetchStmtFmt = "SELECT SNID, SPACETABLE FROM \"%s\".\"%s\"";
        PrintfFormat spf = new PrintfFormat(fetchStmtFmt);
        String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc});
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = null;

        stmt.setFetchSize(FETCHSIZE);
        try {
            rs = stmt.executeQuery(fetchStmt);
            int size = rs.getMetaData().getColumnCount();

            while (rs.next()) {
                Object[] values = new Object[size];

                for (int i = 0; i < size; i++) {
                    values[i] = rs.getObject(i + 1);
                }

                Integer key = ((BigDecimal) values[0]).intValue();
                String name = (String) values[1];

                Pair pair = (Pair) orderedMap.get(key);
                if (pair == null)
                    orderedMap.put(key, new Pair(name, null));
                else
                    pair.first = name;
            }
        } catch (SQLException e) {
            logger.error(e.toString(), e);
            logger.error("stmt=" + fetchStmt);
            throw e;
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }

        return orderedMap;
    }

    protected OrderedMap getRawFormatStorageList(OracleConnection connection, String schemaSrc, String tableSrc,
                                                 OrderedMap orderedMap) throws SQLException {
        if (orderedMap == null)
            orderedMap = new LinkedMap(99);
        String fetchStmtFmt = "SELECT RNID, SPACETABLE FROM \"%s\".\"%s\"";
        PrintfFormat spf = new PrintfFormat(fetchStmtFmt);
        String fetchStmt = spf.sprintf(new Object[]{schemaSrc, tableSrc});
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = null;

        stmt.setFetchSize(FETCHSIZE);
        try {
            rs = stmt.executeQuery(fetchStmt);
            int size = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                Object[] values = new Object[size];

                for (int i = 0; i < size; i++) {
                    values[i] = rs.getObject(i + 1);
                }

                Integer key = ((BigDecimal) values[0]).intValue();
                String name = (String) values[1];

                Pair pair = (Pair) orderedMap.get(key);
                if (pair == null)
                    orderedMap.put(key, new Pair(null, name));
                else
                    pair.second = name;
            }
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
        return orderedMap;
    }

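    /**
     * Streams the IGDSELM blobs of one element table, decodes each blob into a
     * DGN element, and hands it to the job context for feature conversion.
     */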
    protected void queryIgsetElement(OracleConvertPostGISJobContext jobContext,
                                     String srcschema, String srctable) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" ORDER BY ROWID";
        //String fetchSrcStmtFmt = "SELECT IGDSELM FROM \"%s\".\"%s\" WHERE TAG_SFSC = 423 AND TAG_LUFID = 21612065 ORDER BY ROWID";
        PrintfFormat spf = new PrintfFormat(fetchSrcStmtFmt);
        String fetchSrcStmt = spf.sprintf(new Object[]{srcschema, srctable});
        Statement stmtSrc = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rsSrc = null;

        stmtSrc.setFetchSize(FETCHSIZE);
        try {
            rsSrc = stmtSrc.executeQuery(fetchSrcStmt);
            int igdsMetaType = rsSrc.getMetaData().getColumnType(1);
            while (rsSrc.next()) {
                if (isProfileMode()) {
                    markQueryTime();
                }

                byte[] raw = null;
                if (igdsMetaType == Types.BLOB) {
                    BLOB blob = (BLOB) rsSrc.getBlob(1);

                    try {
                        raw = getBytesFromBLOB(blob);
                    } catch (BufferOverflowException e) {
                        logger.warn("Wrong Element Structure-", e);
                    }
                } else {
                    raw = rsSrc.getBytes(1);
                }

                try {
                    if (raw != null) {
                        Element element = fetchBinaryElement(raw);
                        if (isProfileMode()) {
                            accumulateQueryTime();
                        }
                        jobContext.putFeatureCollection(element);
                    } else {
                        if (isProfileMode()) {
                            accumulateQueryTime();
                        }
                    }
                } catch (Dgn7fileException e) {
                    logger.warn("Dgn7Exception", e);
                }
            }
        } finally {
            JDBCUtils.close(rsSrc);
            JDBCUtils.close(stmtSrc);
        }
    }

    protected void queryRawElement(OracleConvertPostGISJobContext jobContext,
                                   String srcschema, String srctable) throws SQLException {
        Connection connection = jobContext.getOracleConnection();
        String fetchDestStmtFmt = "SELECT ELEMENT FROM \"%s\".\"%s\" ORDER BY ROWID";
        PrintfFormat spf = new PrintfFormat(fetchDestStmtFmt);
        String fetchDestStmt = spf.sprintf(new Object[]{srcschema, srctable});
        Statement stmtDest = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

        stmtDest.setFetchSize(FETCHSIZE);
        ResultSet rsDest = stmtDest.executeQuery(fetchDestStmt);

        try {
            while (rsDest.next()) {
                ARRAY rawsValue = ((OracleResultSet) rsDest).getARRAY(1);
                long[] rawData = rawsValue.getLongArray();
                byte[] compressedValue;

                /*
                if (dataMode == TransferTask.DataMode.Normal) {
                    compressedValue = BinConverter.unmarshalByteArray(rawData, true);
                } else {
                    compressedValue = BinConverter.unmarshalCompactByteArray(rawData);
                }
                */
                compressedValue = BinConverter.unmarshalByteArray(rawData, true);

                byte[] rawDest = ByteArrayCompressor.decompressByteArray(compressedValue);

                try {
                    Element element = fetchBinaryElement(rawDest);
                    jobContext.putFeatureCollection(element);
                } catch (Dgn7fileException e) {
                    logger.warn("Dgn7Exception:" + e.getMessage(), e);
                }
            }
        } finally {
            JDBCUtils.close(rsDest);
            JDBCUtils.close(stmtDest);
        }
    }

    // Binary to Element
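    // Each DGN7 record starts with two 16-bit little-endian words: the element
    // type sits in bits 8-14 of the first word, and the second word gives the
    // content length in 2-byte words, so a record spans (length * 2) + 4 bytes.
    // Complex elements are followed by their component records, which the loop
    // below re-attaches to the parent.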
    private Element fetchBinaryElement(byte[] raws) throws Dgn7fileException {
        ByteBuffer buffer = ByteBuffer.wrap(raws);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        short signature = buffer.getShort();

        // byte type = (byte) (buffer.get() & 0x7f);
        byte type = (byte) ((signature >>> 8) & 0x007f);

        // Bentley stores the content length in 2-byte words, while ByteBuffer
        // works in bytes, so convert and add the 4 header bytes.
        int elementLength = (buffer.getShort() * 2) + 4;
        ElementType recordType = ElementType.forID(type);
        IElementHandler handler;

        handler = recordType.getElementHandler();

        Element dgnElement = (Element) handler.read(buffer, signature, elementLength);
        if (recordType.isComplexElement() && (elementLength < raws.length)) {
            int offset = elementLength;
            while (offset < (raws.length - 4)) {
                buffer.position(offset);
                signature = buffer.getShort();
                type = (byte) ((signature >>> 8) & 0x007f);
                elementLength = (buffer.getShort() * 2) + 4;
                if (raws.length < (offset + elementLength)) {
                    logger.debug("Length not match:" + offset + ":" + buffer.position() + ":" + buffer.limit());
                    break;
                }
                recordType = ElementType.forID(type);
                handler = recordType.getElementHandler();
                if (handler != null) {
                    Element subElement = (Element) handler.read(buffer, signature, elementLength);
                    ((ComplexElement) dgnElement).add(subElement);
                    offset += elementLength;
                } else {
                    byte[] remain = new byte[buffer.remaining()];
                    System.arraycopy(raws, offset, remain, 0, buffer.remaining());
                    for (int i = 0; i < remain.length; i++) {
                        if (remain[i] != 0) {
                            logger.info("fetch element has some error. index=" + (offset + i) + ":value=" + remain[i]);
                        }
                    }
                    break;
                }
            }
        }

        return dgnElement;
    }

    /**
     * Converts the index design files (DGN) under the "index" data directory
     * into the target PostGIS schema.
     *
     * @param context quartz job execution context
     * @throws org.quartz.JobExecutionException exception
     */
    private void convertIndexDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException {
        File indexDir = new File(getDataPath(), INDEXPATHNAME);
        if (!indexDir.exists()) {
            logger.info("index dir=" + indexDir + " does not exist.");
            return;
        }

        if (!indexDir.isDirectory()) {
            logger.info("index dir=" + indexDir + " is not a directory.");
        }

        List<File> dgnFiles = FileUtils.recurseDir(indexDir, new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn");
            }
        });

        for (File dgnFile : dgnFiles) {
            if (dgnFile.isDirectory()) continue;
            IndexDgnConvertPostGISJobContext convertContext =
                    new IndexDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName,
                            isProfileMode(), isTransformed());
            logger.info("--- start index dgnfile-" + dgnFile.toString() + " ---");
            FileInputStream fs = null;
            FileChannel fc = null;
            Dgn7fileReader reader = null;
            try {
                convertContext.clearOutputDatabase();
                convertContext.setExecutionContext(context);
                String[] dgnPaths = StringUtils.splitToArray(dgnFile.toString(), File.separator);
                convertContext.setFilename(dgnPaths[dgnPaths.length - 1]);
                convertContext.startTransaction();

                fs = new FileInputStream(dgnFile);
                fc = fs.getChannel();
                reader = new Dgn7fileReader(fc, new Lock());
                convertContext.setReader(reader);

                scanIndexDgnElement(convertContext);

                convertContext.commitTransaction();
                convertContext.closeFeatureWriter();

                System.gc();
                System.runFinalization();
            } catch (FileNotFoundException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (Dgn7fileException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IOException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IllegalAttributeException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (SchemaException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                convertContext.closeFeatureWriter();

                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }

                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }

                if (isProfileMode()) {
                    logger.warn("Profile-Current convertContext Process Cost-" +
                            ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec");
                    logger.warn("Profile-Current convertContext Update Cost-" +
                            ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec");
                }
            }
        }
    }

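    /**
     * Iterates the DGN element records, buffering the current complex element
     * so its component records can be attached before it is processed.
     */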
    protected void scanIndexDgnElement(IndexDgnConvertPostGISJobContext convertContext)
            throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException {
        Dgn7fileReader reader = convertContext.getReader();
        int count = 0;
        Element lastComplex = null;

        while (reader.hasNext()) {
            if (isProfileMode()) markProcessTime();
            Element.FileRecord record = reader.nextElement();
            if (record.element() != null) {
                Element element = (Element) record.element();
                ElementType type = element.getElementType();

                if ((!type.isComplexElement()) && (!element.isComponentElement())) {
                    if (lastComplex != null) {
                        processIndexElement(lastComplex, convertContext);
                        lastComplex = null;
                    }

                    processIndexElement(element, convertContext);
                } else if (element.isComponentElement()) {
                    if (lastComplex != null) {
                        ((ComplexElement) lastComplex).add(element);
                    }
                } else if (type.isComplexElement()) {
                    if (lastComplex != null) {
                        processIndexElement(lastComplex, convertContext);
                    }
                    lastComplex = element;
                }
            }
            count++;
        }

        if (lastComplex != null) {
            processIndexElement(lastComplex, convertContext);
        }
        logger.debug("ElementRecord Count=" + count);
    }

    private void processIndexElement(Element element, IndexDgnConvertPostGISJobContext convertContext)
            throws IllegalAttributeException, SchemaException {
        //if (useTpclidText) {
        //    if (element instanceof TextElement) {
        //        convertContext.putFeatureCollection(element);
        //    }
        //} else {
        //    if (element instanceof ShapeElement) {
        convertContext.putFeatureCollection(element);
        //    }
        //}
    }

    /**
     * Converts the other design files (DGN) under the "other" data directory
     * into the target PostGIS schema.
     *
     * @param context jobContext
     * @throws org.quartz.JobExecutionException exception
     */
    private void convertOtherDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException {
        File otherDir = new File(getDataPath(), OTHERPATHNAME);
        if (!otherDir.exists()) {
            logger.info("other dir=" + otherDir + " does not exist.");
            return;
        }

        if (!otherDir.isDirectory()) {
            logger.info("other dir=" + otherDir + " is not a directory.");
        }

        List<File> dgnFiles = FileUtils.recurseDir(otherDir, new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isDirectory() || pathname.getName().toLowerCase().endsWith("dgn");
            }
        });

        for (File dgnFile : dgnFiles) {
            if (dgnFile.isDirectory()) continue;

            GeneralDgnConvertPostGISJobContext convertContext =
                    new GeneralDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName,
                            isProfileMode(), isTransformed());
            logger.info("--- start other dgnfile-" + dgnFile.toString() + " ---");
            FileInputStream fs = null;
            Dgn7fileReader reader = null;
            try {
                convertContext.setExecutionContext(context);
                String[] dgnPaths = StringUtils.splitToArray(dgnFile.toString(), File.separator);
                convertContext.setFilename(dgnPaths[dgnPaths.length - 1]);
                convertContext.startTransaction();

                fs = new FileInputStream(dgnFile);
                FileChannel fc = fs.getChannel();
                reader = new Dgn7fileReader(fc, new Lock());
                convertContext.setReader(reader);

                scanOtherDgnElement(convertContext);

                convertContext.commitTransaction();
                convertContext.closeFeatureWriter();

                System.gc();
                System.runFinalization();
            } catch (FileNotFoundException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (Dgn7fileException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IOException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IllegalAttributeException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (SchemaException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                convertContext.closeFeatureWriter();

                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }

                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }

                if (isProfileMode()) {
                    logger.warn("Profile-Current convertContext Process Cost-" +
                            ((int) ((convertContext.getProcessTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getProcessTime()) % 60000.0)) / 1000) + " sec");
                    logger.warn("Profile-Current convertContext Update Cost-" +
                            ((int) ((convertContext.getUpdateTime()) / 60000.0)) + " min - " +
                            (((int) ((convertContext.getUpdateTime()) % 60000.0)) / 1000) + " sec");
                }
            }
        }
    }

    public void scanOtherDgnElement(GeneralDgnConvertPostGISJobContext convertContext)
            throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException {
        Dgn7fileReader reader = convertContext.getReader();
        int count = 0;
        Element lastComplex = null;
        while (reader.hasNext()) {
            Element.FileRecord record = reader.nextElement();
            if (record.element() != null) {
                Element element = (Element) record.element();
                ElementType type = element.getElementType();

                if ((!type.isComplexElement()) && (!element.isComponentElement())) {
                    if (lastComplex != null) {
                        processOtherElement(lastComplex, convertContext);
                        lastComplex = null;
                    }

                    processOtherElement(element, convertContext);
                } else if (element.isComponentElement()) {
                    if (lastComplex != null) {
                        ((ComplexElement) lastComplex).add(element);
                    }
                } else if (type.isComplexElement()) {
                    if (lastComplex != null) {
                        processOtherElement(lastComplex, convertContext);
                    }
                    lastComplex = element;
                }
            }
            count++;
        }

        if (lastComplex != null) {
            processOtherElement(lastComplex, convertContext);
        }
        logger.debug("ElementRecord Count=" + count);
    }

    private void processOtherElement(Element element, GeneralDgnConvertPostGISJobContext convertContext)
            throws IllegalAttributeException, SchemaException {
        convertContext.putFeatureCollection(element);
    }

    private void clearOutputDatabase() {
        /*
        File outDataPath = new File(getDataPath(), OracleConvertEdbGeoJobContext.SHPOUTPATH);
        if (outDataPath.exists() && outDataPath.isDirectory()) {
            deleteFilesInPath(outDataPath);
        }
        outDataPath = new File(getDataPath(), IndexDgnConvertShpJobContext.SHPOUTPATH);
        if (outDataPath.exists() && outDataPath.isDirectory()) {
            deleteFilesInPath(outDataPath);
        }
        outDataPath = new File(getDataPath(), GeneralDgnConvertShpJobContext.SHPOUTPATH);
        if (outDataPath.exists() && outDataPath.isDirectory()) {
            deleteFilesInPath(outDataPath);
        }
        */
    }

    private void deleteFilesInPath(File outDataPath) {
        deleteFilesInPath(outDataPath, true);
    }

    private void deleteFilesInPath(File outDataPath, boolean removeSubDir) {
        if (!outDataPath.isDirectory()) {
            return;
        }
        File[] files = outDataPath.listFiles();
        if (files == null) {
            return;
        }
        for (File file : files) {
            if (file.isFile()) {
                if (!file.delete()) {
                    logger.info("Cannot delete file-" + file.toString());
                }
            } else if (file.isDirectory()) {
                deleteFilesInPath(file, removeSubDir);
                if (removeSubDir) {
                    if (!file.delete()) {
                        logger.info("Cannot delete dir-" + file.toString());
                    }
                }
            }
        }
    }

    private void convertFeatureDesignFile(JobExecutionContext context, String targetSchemaName) throws JobExecutionException {
        File elminDir = new File(getDataPath(), "elmin");
        if (!elminDir.exists()) {
            logger.info("elmin dir=" + elminDir + " does not exist.");
            return;
        }

        if (!elminDir.isDirectory()) {
            logger.info("elmin dir=" + elminDir + " is not a directory.");
        }

        File[] dgnFiles = elminDir.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String name) {
                return name.toLowerCase().endsWith(".dgn");
            }
        });

        for (File dgnFile : dgnFiles) {
            FeatureDgnConvertPostGISJobContext convertContext =
                    new FeatureDgnConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, _filterPath,
                            isProfileMode(), isTransformed());
            logger.info("--- start dgnfile-" + dgnFile.toString() + " ---");
            FileInputStream fs = null;
            Dgn7fileReader reader = null;
            try {
                convertContext.setExecutionContext(context);
                String[] dgnPaths = StringUtils.splitToArray(dgnFile.toString(), File.separator);
                convertContext.setFilename(dgnPaths[dgnPaths.length - 1]);
                convertContext.startTransaction();

                fs = new FileInputStream(dgnFile);
                FileChannel fc = fs.getChannel();
                reader = new Dgn7fileReader(fc, new Lock());
                convertContext.setReader(reader);

                scanFeatureDgnElement(convertContext);

                convertContext.commitTransaction();
                convertContext.closeFeatureWriter();
                System.gc();
                System.runFinalization();
            } catch (FileNotFoundException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (Dgn7fileException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IOException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (IllegalAttributeException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } catch (SchemaException e) {
                convertContext.rollbackTransaction();
                logger.warn(e.getMessage(), e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                convertContext.closeFeatureWriter();

                // Release the DGN reader and stream on every path, matching
                // the other convert methods.
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }

    public void scanFeatureDgnElement(FeatureDgnConvertPostGISJobContext convertContext)
            throws Dgn7fileException, IOException, IllegalAttributeException, SchemaException {
        Dgn7fileReader reader = convertContext.getReader();
        int count = 0;
        Element lastComplex = null;
        while (reader.hasNext()) {
            Element.FileRecord record = reader.nextElement();
            if (record.element() != null) {
                Element element = (Element) record.element();
                ElementType type = element.getElementType();

                if ((!type.isComplexElement()) && (!element.isComponentElement())) {
                    if (lastComplex != null) {
                        processFeatureElement(lastComplex, convertContext);
                        lastComplex = null;
                    }

                    processFeatureElement(element, convertContext);
                } else if (element.isComponentElement()) {
                    if (lastComplex != null) {
                        ((ComplexElement) lastComplex).add(element);
                    }
                } else if (type.isComplexElement()) {
                    if (lastComplex != null) {
                        processFeatureElement(lastComplex, convertContext);
                    }
                    lastComplex = element;
                }
            }
            count++;
        }

        if (lastComplex != null) {
            processFeatureElement(lastComplex, convertContext);
        }
        logger.debug("ElementRecord Count=" + count);
    }

    private void processFeatureElement(Element element, FeatureDgnConvertPostGISJobContext convertContext)
            throws IllegalAttributeException, SchemaException {
        convertContext.putFeatureCollection(element);
    }

    private void createDummyFeatureFile(JobExecutionContext context) throws JobExecutionException {
        /*
        DummyFeatureConvertShpJobContext convertContext = new DummyFeatureConvertShpJobContext(getDataPath(), _filterPath);
        try {
            convertContext.startTransaction();
            convertContext.commitTransaction();
            convertContext.closeFeatureWriter();
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException(e.getMessage(), e);
        }
        */
    }

    public DataStore getTargetDataStore() {
        return targetDataStore;
    }

    protected void createTargetDataStore() throws JobExecutionException {
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }

        /*
        if (!isDriverFound()) {
            throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER);
        }
        */

        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
        }

        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
        }

        /*
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.WKBENABLED.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.WKBENABLED.key, "true");
        }
        */

        if (!dataStoreFactory.canProcess(pgProperties)) {
            getLogger().warn("Cannot process the PostGIS connection properties.");
            throw new JobExecutionException("Cannot process the PostGIS connection properties.");
        }
        try {
            targetDataStore = dataStoreFactory.createDataStore(pgProperties);
        } catch (IOException e) {
            getLogger().warn(e.getMessage(), e);
            throw new JobExecutionException(e.getMessage(), e);
        }
    }

    protected void disconnect() {
        super.disconnect();
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
    }

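    /**
     * Picks the next conversion schema from the version table (creating the
     * table on first use), marks it VSSTATUS_COVERT, and returns its name.
     * The rotation moves one slot past the schema currently in use, wrapping
     * back to the first slot.
     */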
    private String determineTargetSchemaName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetSchema = null;
        boolean needCreate = false;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XGVERSIONTABLE_NAME if it is missing.
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            if (needCreate)
                createXGeosVersionTable(connection, _pgSchema);
            rs.close();

            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vsschema, vsstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vsid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vsschema");
                values[1] = rs.getShort("vsstatus");
                tmpSchemas.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }

            if (current == -1) {
                Object[] values = tmpSchemas.get(0);
                targetSchema = (String) values[0];
            } else if (current < (tmpSchemas.size() - 1)) {
                Object[] values = tmpSchemas.get(current + 1);
                targetSchema = (String) values[0];
            } else {
                Object[] values = tmpSchemas.get(0);
                targetSchema = (String) values[0];
            }

            sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vsstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
            sbSQL.append(" WHERE vsschema = '");
            sbSQL.append(targetSchema).append("'");
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count=" + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetSchema;
    }

    private String determineTargetThemeTableName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetTable = null;
        boolean needCreate = false;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            // Create XPTVERSIONTABLE_NAME if it is missing.
            needCreate = false;
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) needCreate = true;
            if (needCreate)
                createXPWThemeVersionTable(connection, _pgSchema);
            rs.close();

            rs = null;

            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vptname, vptstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vptid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vptname");
                values[1] = rs.getShort("vptstatus");
                tmpTablenames.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }

            if (current == -1) {
                Object[] values = tmpTablenames.get(0);
                targetTable = (String) values[0];
            } else if (current < (tmpTablenames.size() - 1)) {
                Object[] values = tmpTablenames.get(current + 1);
                targetTable = (String) values[0];
            } else {
                Object[] values = tmpTablenames.get(0);
                targetTable = (String) values[0];
            }

            sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vptstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT);
            sbSQL.append(" WHERE vptname = '");
            sbSQL.append(targetTable).append("'");
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetTable + " update result count=" + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetTable;
    }

    public String encodeSchemaTableName(String schemaName, String tableName) {
        if (schemaName == null)
            return "\"" + tableName + "\"";
        return "\"" + schemaName + "\".\"" + tableName + "\"";
    }

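    /**
     * Creates the xgeos version bookkeeping table, grants access, seeds it
     * with the default schema rotation, and creates each rotation schema.
     */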
    private void createXGeosVersionTable(Connection connection, String pgSchema) throws SQLException {
        Statement stmt = null;
        StringBuilder sql = new StringBuilder("CREATE TABLE ");
        sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
        sql.append(" ( vsid serial PRIMARY KEY, ");
        sql.append(" vsschema character varying(64) NOT NULL, ");
        sql.append(" vsstatus smallint NOT NULL, ");
        sql.append(" vstimestamp timestamp with time zone ) ");
        try {
            stmt = connection.createStatement();
            stmt.executeUpdate(sql.toString());

            sql = new StringBuilder("ALTER TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
            sql.append(" OWNER TO ").append(_pgUsername);
            stmt.executeUpdate(sql.toString());

            sql = new StringBuilder("GRANT ALL ON TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
            sql.append(" TO public");
            stmt.executeUpdate(sql.toString());

            for (String schemaName : DataReposVersionManager.DEFAULTXGVERSIONSCHEMA_NAMES) {
                sql = new StringBuilder("INSERT INTO ");
                sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME));
                sql.append(" (vsschema, vsstatus) VALUES ('");
                sql.append(schemaName).append("', ");
                sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )");
                stmt.executeUpdate(sql.toString());

                createIfNotExistNewSchema(connection, schemaName);
            }
        } finally {
            if (stmt != null) stmt.close();
        }
    }

    private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException {
        Statement stmt = null;
        StringBuilder sql = new StringBuilder("CREATE TABLE ");
        sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
        sql.append(" ( vptid serial PRIMARY KEY, ");
        sql.append(" vptname character varying(64) NOT NULL, ");
        sql.append(" vptstatus smallint NOT NULL, ");
        sql.append(" vpttimestamp timestamp with time zone ) ");
        try {
            stmt = connection.createStatement();
            stmt.executeUpdate(sql.toString());

            sql = new StringBuilder("ALTER TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
            sql.append(" OWNER TO ").append(_pgUsername);
            stmt.executeUpdate(sql.toString());

            sql = new StringBuilder("GRANT ALL ON TABLE ");
            sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
            sql.append(" TO public");
            stmt.executeUpdate(sql.toString());

            for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) {
                sql = new StringBuilder("INSERT INTO ");
                sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME));
                sql.append(" (vptname, vptstatus) VALUES ('");
                sql.append(schemaName).append("', ");
                sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )");
                stmt.executeUpdate(sql.toString());
            }
        } finally {
            if (stmt != null) stmt.close();
        }
    }

    private void updateRepoStatusToReady(String targetSchema) {
        if (targetDataStore == null) return;
        Connection connection = null;
        Statement stmt = null;
        try {
            StringBuilder sbSQL = new StringBuilder("UPDATE ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append(" SET vsstatus = ");
            sbSQL.append(DataReposVersionManager.VSSTATUS_READY);
            sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '");
            sbSQL.append(targetSchema).append("'");

            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            stmt = connection.createStatement();
            int count = stmt.executeUpdate(sbSQL.toString());
            if (count != 1) {
                logger.info("update status for " + targetSchema + " update result count=" + count);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
    }

private void updatePWThemeStatusToReady(String targetSchema) {
|
if (targetDataStore == null) return;
|
Connection connection = null;
|
Statement stmt = null;
|
ResultSet rs = null;
|
boolean needCreate = false;
|
try {
|
StringBuilder sbSQL = new StringBuilder("UPDATE ");
|
sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' ');
|
sbSQL.append(" SET vptstatus = ");
|
sbSQL.append(DataReposVersionManager.VSSTATUS_READY);
|
sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '");
|
sbSQL.append(targetSchema).append("'");
|
|
connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
stmt = connection.createStatement();
|
int count = stmt.executeUpdate(sbSQL.toString());
|
if (count != 1) {
|
logger.info("update status for " + targetSchema + " update result count="
|
+ count);
|
}
|
} catch (SQLException e) {
|
logger.warn(e.getMessage(), e);
|
} catch (IOException e) {
|
logger.warn(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
|
}
|
}
|
|
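/**
|
* Creates the schema owned by the configured PostgreSQL user and grants it to
|
* public. The JDBC metadata existence check is commented out, so on re-runs
|
* CREATE SCHEMA raises a duplicate-schema SQLException that is caught and
|
* merely logged, which preserves the "if not exist" behaviour.
|
*/
|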
private void createIfNotExistNewSchema(Connection connection, String s) throws SQLException {
|
Statement stmt = null;
|
ResultSet rs = null;
|
try {
|
/*
|
rs = connection.getMetaData().getSchemas(null, s);
|
if (rs.next()) return;
|
rs.close();
|
rs = null;
|
*/
|
|
StringBuilder sbSQL = new StringBuilder("CREATE SCHEMA ");
|
sbSQL.append(s).append(' ');
|
sbSQL.append("AUTHORIZATION ").append(_pgUsername);
|
stmt = connection.createStatement();
|
stmt.executeUpdate(sbSQL.toString());
|
|
sbSQL = new StringBuilder("GRANT ALL ON SCHEMA ");
|
sbSQL.append(s).append(' ');
|
sbSQL.append("TO public");
|
stmt.executeUpdate(sbSQL.toString());
|
} catch (SQLException e) {
|
logger.info("create schema:" + s + " has exception.");
|
logger.info(e.getMessage(), e);
|
} finally {
|
if (rs != null) rs.close();
|
if (stmt != null) stmt.close();
|
}
|
}
|
|
public final void accumulateQueryTime() {
|
queryTime += System.currentTimeMillis() - queryTimeStart;
|
}
|
|
public long getQueryTime() {
|
return queryTime;
|
}
|
|
public final void markQueryTime() {
|
queryTimeStart = System.currentTimeMillis();
|
}
|
|
public final void resetQueryTime() {
|
queryTime = 0;
|
}
|
|
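/**
|
* Transfers the Oracle dynamic-color rows (FETCH_COLORTAB) into the PostGIS
|
* table targetTableBaseName + FDYNCOLOR_SUFFIX. Each Oracle color id is
|
* resolved to its code through DefaultColorTable and written with a JDBC
|
* PreparedStatement batch that is flushed every MAX_BATCHSIZE rows.
|
*/
|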
private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException {
|
if (context == null) {
|
getLogger().info("jobContext is null in convertDynamicColorTheme");
|
return;
|
}
|
Connection connection = context.getOracleConnection();
|
Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
|
ResultSet rs = null;
|
Statement stmt = null;
|
PreparedStatement pstmt = null;
|
try {
|
|
DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
|
String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX;
|
logger.info("target table:" + targetTableName);
|
stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
rs = stmt.executeQuery(FETCH_COLORTAB);
|
rs.setFetchSize(50);
|
|
createOrClearTargetTable(connectionPG, targetTableName,
|
"(tid smallint not null, oid int not null, dyncolor varchar(10) not null)");
|
|
pstmt = connectionPG.prepareStatement("INSERT INTO " +
|
encodeSchemaTableName(_pgSchema, targetTableName) +
|
" (tid, oid, dyncolor) VALUES (?, ?, ?)" );
|
|
final int MAX_BATCHSIZE = 50;
|
int count = 0;
|
while (rs.next()) {
|
int cid = rs.getInt(1);
|
long oid = rs.getLong(2);
|
int colorId = rs.getInt(3);
|
String colorText = colorTable.getColorCode(colorId);
|
|
pstmt.setShort(1, (short) cid);
|
pstmt.setInt(2, (int) oid);
|
pstmt.setString(3, colorText);
|
pstmt.addBatch();
|
|
++count;
|
if (count % MAX_BATCHSIZE == 0) {
|
pstmt.executeBatch();
|
}
|
}
|
|
pstmt.executeBatch();
|
createTargetTableIndex(connectionPG, targetTableName);
|
|
logger.info("Execute Update Count=" + count);
|
} catch (SQLException e) {
|
logger.info(e.getMessage(), e);
|
throw new IOException(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(pstmt);
|
JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
|
}
|
}
|
|
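/**
|
* Transfers the Oracle connectivity rows (FETCH_CONNFDR) into the PostGIS
|
* table targetTableBaseName + FOWNER_SUFFIX, mapping the flow direction of
|
* each feature to one of the shape:// mark constants declared on this class.
|
*/
|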
private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException {
|
if (context == null) {
|
getLogger().info("jobContext is null in convertPowerOwnerTheme");
|
return;
|
}
|
Connection connection = context.getOracleConnection();
|
Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
|
ResultSet rs = null;
|
Statement stmt = null;
|
PreparedStatement pstmt = null;
|
try {
|
connectionPG.setAutoCommit(false);
|
String targetTableName = targetTableBaseName + FOWNER_SUFFIX;
|
logger.info("target table:" + targetTableName);
|
stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
rs = stmt.executeQuery(FETCH_CONNFDR);
|
rs.setFetchSize(50);
|
|
createOrClearTargetTable(connectionPG, targetTableName,
|
"(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)");
|
|
pstmt = connectionPG.prepareStatement("INSERT INTO " +
|
encodeSchemaTableName(_pgSchema, targetTableName) +
|
" (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" );
|
|
final int MAX_BATCHSIZE = 50;
|
int count = 0;
|
while (rs.next()) {
|
int cid = rs.getInt(1);
|
long oid = rs.getLong(2);
|
int ownerId = rs.getInt(3);
|
short dirId = (short) rs.getInt(4);
|
pstmt.setShort(1, (short) cid);
|
pstmt.setInt(2, (int) oid);
|
pstmt.setShort(3, (short) ownerId);
|
ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
|
if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
|
(ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
|
pstmt.setString(4, "shape://ccarrow");
|
|
} else if ((ConnectivityDirectionEnum.BackflowON == dir) ||
|
(ConnectivityDirectionEnum.BackFixflowON == dir)) {
|
pstmt.setString(4, "shape://rccarrow");
|
} else if (ConnectivityDirectionEnum.Nondeterminate == dir) {
|
pstmt.setString(4, NONFLOW_MARK);
|
} else {
|
pstmt.setString(4, UNFLOW_MARK);
|
}
|
pstmt.addBatch();
|
|
++count;
|
if (count % MAX_BATCHSIZE == 0) {
|
pstmt.executeBatch();
|
}
|
}
|
|
pstmt.executeBatch();
|
createTargetTableIndex(connectionPG, targetTableName);
|
// autocommit was disabled at the top of this method, so commit explicitly
|
connectionPG.commit();
|
|
logger.info("Execute Update Count=" + count);
|
} catch (SQLException e) {
|
logger.info(e.getMessage(), e);
|
throw new IOException(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(pstmt);
|
JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
|
}
|
}
|
|
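/**
|
* Ensures a clean landing table: drops tableName (with CASCADE) if the JDBC
|
* metadata reports it in _pgSchema, then re-creates it from the supplied
|
* column-definition fragment.
|
*/
|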
private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException {
|
Statement stmt = connection.createStatement();
|
ResultSet rs = null;
|
try {
|
rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
|
if (rs.next()) {
|
stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE");
|
}
|
|
stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
private void createTargetTableIndex(Connection connection, String tableName) throws SQLException {
|
Statement stmt = connection.createStatement();
|
ResultSet rs = null;
|
try {
|
rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
|
if (rs.next()) {
|
stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
|
" ADD PRIMARY KEY (tid, oid)");
|
}
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
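/**
|
* COPY-based variant of convertDynamicColorTheme: rows are rendered as CSV
|
* text and streamed through the PostgreSQL COPY API, which is considerably
|
* faster than JDBC batches for bulk loads. The pooled connection is unwrapped
|
* (DelegatingConnection) until the native PGConnection surfaces; when that
|
* fails the method returns false so the caller can fall back to the batch
|
* implementation. A minimal sketch of the hand-off used below (pgConn and
|
* tmp_table are placeholders):
|
* <pre>
|
* CopyManager mgr = ((PGConnection) pgConn).getCopyAPI();
|
* PushbackReader r = new PushbackReader(new StringReader(""), 32768);
|
* r.unread("1,42,#FF0000\n".toCharArray()); // one CSV row: tid,oid,dyncolor
|
* mgr.copyIn("COPY tmp_table FROM STDIN WITH CSV", r);
|
* </pre>
|
*/
|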
private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName)
|
throws IOException {
|
if (context == null) {
|
getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI");
|
return false;
|
}
|
Connection connection = context.getOracleConnection();
|
Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
while (connectionPG instanceof DelegatingConnection) {
|
connectionPG = ((DelegatingConnection) connectionPG).getDelegate();
|
}
|
|
if (!(connectionPG instanceof PGConnection)) {
|
return false;
|
}
|
|
final int MAX_BATCHSIZE = 250;
|
ResultSet rs = null;
|
Statement stmt = null;
|
try {
|
// connectionPG.setAutoCommit(false);
|
DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
|
String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX;
|
String targetTempName = "tmp_" + targetTableName;
|
logger.info("target table:" + targetTableName);
|
stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
rs = stmt.executeQuery(FETCH_COLORTAB);
|
rs.setFetchSize(MAX_BATCHSIZE);
|
|
createOrClearTempTargetTable(connectionPG, targetTempName,
|
"(tid smallint not null, oid int not null, dyncolor varchar(10) not null)");
|
StringBuilder sb = new StringBuilder();
|
|
CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI();
|
PushbackReader reader = new PushbackReader(new StringReader(""), 32768); // headroom for a full MAX_BATCHSIZE batch of CSV rows
|
|
int count = 0;
|
while (rs.next()) {
|
int cid = rs.getInt(1);
|
long oid = rs.getLong(2);
|
int colorId = rs.getInt(3);
|
String colorText = colorTable.getColorCode(colorId);
|
|
sb.append(cid).append(',');
|
sb.append(oid).append(',');
|
sb.append(colorText).append("\n");
|
|
++count;
|
if (count % MAX_BATCHSIZE == 0) {
|
reader.unread(sb.toString().toCharArray());
|
cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
|
sb.delete(0, sb.length());
|
}
|
}
|
|
reader.unread(sb.toString().toCharArray());
|
cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
|
createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
|
|
logger.info("Execute Copy Count=" + count);
|
} catch (SQLException e) {
|
logger.info(e.getMessage(), e);
|
throw new IOException(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
|
}
|
return true;
|
}
|
|
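/**
|
* COPY-based variant of convertPowerOwnerTheme; see
|
* convertDynamicColorThemeWithCopyAPI for the connection unwrapping and CSV
|
* streaming mechanics. The only differences are the row layout
|
* (tid, oid, fowner, flow) and the direction-to-mark mapping.
|
*/
|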
private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName)
|
throws IOException {
|
if (context == null) {
|
getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI");
|
return false;
|
}
|
Connection connection = context.getOracleConnection();
|
Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
while (connectionPG instanceof DelegatingConnection) {
|
connectionPG = ((DelegatingConnection) connectionPG).getDelegate();
|
}
|
|
if (!(connectionPG instanceof PGConnection)) {
|
return false;
|
}
|
|
final int MAX_BATCHSIZE = 250;
|
ResultSet rs = null;
|
Statement stmt = null;
|
try {
|
// connectionPG.setAutoCommit(false);
|
String targetTableName = targetTableBaseName + FOWNER_SUFFIX;
|
String targetTempName = "tmp_" + targetTableName;
|
logger.info("target table:" + targetTableName);
|
stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
rs = stmt.executeQuery(FETCH_CONNFDR);
|
rs.setFetchSize(MAX_BATCHSIZE);
|
|
createOrClearTempTargetTable(connectionPG, targetTempName,
|
"(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)");
|
|
StringBuilder sb = new StringBuilder();
|
|
CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI();
|
PushbackReader reader = new PushbackReader(new StringReader(""), 32768); // headroom for a full MAX_BATCHSIZE batch of CSV rows
|
|
int count = 0;
|
while (rs.next()) {
|
int cid = rs.getInt(1);
|
long oid = rs.getLong(2);
|
int ownerId = rs.getInt(3);
|
short dirId = (short) rs.getInt(4);
|
String flowMark = null;
|
ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId);
|
if ((ConnectivityDirectionEnum.ForwardflowON == dir) ||
|
(ConnectivityDirectionEnum.ForwardFixflowON == dir)) {
|
flowMark = FORWARDFLOW_MARK;
|
|
} else if ((ConnectivityDirectionEnum.BackflowON == dir) ||
|
(ConnectivityDirectionEnum.BackFixflowON == dir)) {
|
flowMark = BACKFLOW_MARK;
|
} else if (ConnectivityDirectionEnum.Nondeterminate == dir) {
|
flowMark = NONFLOW_MARK;
|
} else {
|
flowMark = UNFLOW_MARK;
|
}
|
|
sb.append(cid).append(',');
|
sb.append(oid).append(',');
|
sb.append(ownerId).append(',');
|
sb.append(flowMark).append('\n');
|
|
++count;
|
if (count % MAX_BATCHSIZE == 0) {
|
reader.unread(sb.toString().toCharArray());
|
cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
|
sb.delete(0, sb.length());
|
}
|
}
|
|
reader.unread(sb.toString().toCharArray());
|
cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader);
|
createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName);
|
|
logger.info("Execute Copy Count=" + count);
|
} catch (SQLException e) {
|
logger.info(e.getMessage(), e);
|
throw new IOException(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null);
|
}
|
return true;
|
}
|
|
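/**
|
* Drops any leftover table with the given name and re-creates it as a
|
* session-local TEMP table; the COPY loaders fill it before
|
* createTargetTableIndexAndDropTemp materializes the final table.
|
*/
|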
private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException {
|
Statement stmt = connection.createStatement();
|
ResultSet rs = null;
|
try {
|
rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"});
|
if (rs.next()) {
|
stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE");
|
}
|
|
stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
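/**
|
* Materializes the staged rows with CREATE TABLE ... AS SELECT from the temp
|
* table, adds the (tid, oid) primary key, and drops the temp table.
|
*/
|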
private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException {
|
Statement stmt = connection.createStatement();
|
ResultSet rs = null;
|
try {
|
stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable);
|
rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"});
|
if (rs.next()) {
|
stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) +
|
" ADD PRIMARY KEY (tid, oid)");
|
}
|
stmt.execute("DROP TABLE " + tempTable);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
// site info: { ftp url, user, password, remote base path } -- getNDDList() and getNDDDash() read info[3]
|
String[] siteInfo = new String[]{"ftp://10.10.1.9:21", "DMMS", "DMMS000", "/tcdaas/ndddash/"};
|
|
private String[] getNDDList(String[] info) {
|
String ftp = info[0];
|
String uid = info[1];
|
String pwd = info[2];
|
String url = info[3]; // remote base path, e.g. "/tcdaas/ndddash/"
|
return getFileList(ftp, uid, pwd, url, "");
|
}
|
|
private byte[] getNDDDash(String[] info, String dirname, String filename) {
|
String ftp = info[0];
|
String uid = info[1];
|
String pwd = info[2];
|
|
// sanitize to block path traversal; String.replace() takes a literal, so the
|
// original "[.]" / "[/]" arguments never matched -- use replaceAll() instead
|
dirname = dirname.replaceAll("[.]", "_");
|
filename = filename.replaceAll("[/]", "_");
|
|
// keep only the last path segment of the directory name
|
String[] temp = dirname.split("/");
|
dirname = temp[temp.length - 1];
|
|
String url = info[3] + dirname + "/";
|
return getFile(ftp, uid, pwd, url, filename);
|
}
|
|
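/**
|
* Lists remote file names over FTP via Apache commons-net: connect to the
|
* host/port parsed from urlString, check the reply code, log in, switch to
|
* binary type and local passive mode, then list filePath + filter. Returns
|
* null when connect/login fails and an empty array on other errors.
|
*/
|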
private String[] getFileList(String urlString, String ftpUser, String ftpPwd, String filePath, String filter) {
|
FTPClient ftpClient = new FTPClient();
|
|
URL url;
|
|
// /tcdaas/dsbncard -- for feature D
|
// /tcdaas/mhole -- for feature D
|
// /tcdaas/featureimg -- for feature U/D attached and LIST
|
try {
|
url = new URL(urlString); // e.g. "ftp://20.20.1.3:21/"
|
ftpClient.connect(url.getHost(), url.getPort());
|
|
// FTPReply stores a set of constants for FTP reply codes;
|
// verify the connection succeeded before attempting to log in
|
int reply = ftpClient.getReplyCode();
|
if (!FTPReply.isPositiveCompletion(reply)) {
|
ftpClient.disconnect();
|
return null;
|
}
|
|
if (!ftpClient.login(ftpUser, ftpPwd)) {
|
ftpClient.disconnect();
|
return null;
|
}
|
|
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
|
// enter passive mode
|
ftpClient.enterLocalPassiveMode();
|
|
String[] filelist = ftpClient.listNames(filePath + (filter == null ? "" : filter));
|
ftpClient.disconnect();
|
return filelist;
|
} catch (MalformedURLException urlex) {
|
logger.warn(urlex.getMessage(), urlex);
|
} catch (Exception ex) {
|
logger.warn(ex.getMessage(), ex);
|
}
|
return new String[]{};
|
}
|
|
private byte[] getFile(String urlString, String ftpUser, String ftpPwd, String filePath, String fileName) {
|
FTPClient ftpClient = new FTPClient();
|
|
URL url;
|
byte[] result = null;
|
try {
|
url = new URL(urlString); // e.g. "ftp://20.20.1.3:21/"
|
ftpClient.connect(url.getHost(), url.getPort());
|
|
// verify the connection succeeded before attempting to log in
|
int reply = ftpClient.getReplyCode();
|
if (!FTPReply.isPositiveCompletion(reply)) {
|
ftpClient.disconnect();
|
return null;
|
}
|
|
if (!ftpClient.login(ftpUser, ftpPwd)) {
|
ftpClient.disconnect();
|
return null;
|
}
|
|
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
|
// enter passive mode
|
ftpClient.enterLocalPassiveMode();
|
|
String[] filelist = ftpClient.listNames(filePath + fileName);
|
if (filelist != null && filelist.length > 0) {
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
try {
|
if (ftpClient.retrieveFile(filePath + fileName, bos)) {
|
result = bos.toByteArray(); // negative bytes can be read back with (b) & 0xff
|
}
|
} finally {
|
bos.close();
|
}
|
}
|
|
ftpClient.disconnect();
|
} catch (MalformedURLException urlex) {
|
logger.warn(urlex.getMessage(), urlex);
|
result = null;
|
} catch (Exception ex) {
|
logger.warn(ex.getMessage(), ex);
|
result = null;
|
}
|
return result;
|
}
|
public String[] getNDDStrings(String[] info, String filename) {
|
String[] list = getNDDList(info);
|
if (list == null) return null; // connect or login failed upstream
|
|
List<String> lstXML = new ArrayList<String>();
|
for (int i = 0; i < list.length; i++) {
|
byte[] temp = getNDDDash(info, list[i], filename);
|
try {
|
if (temp != null) lstXML.add(new String(temp, "UTF-8"));
|
} catch (UnsupportedEncodingException ex) {
|
// this should never happen because "UTF-8" is hard-coded
|
throw new IllegalStateException(ex);
|
}
|
}
|
if (lstXML.size() > 0)
|
return lstXML.toArray(new String[0]);
|
|
return null;
|
}
|
|
|
private static Map<String, String> ditTyphoon = new HashMap<String, String>();
|
|
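/**
|
* Resolves a typhoon project name to its id, first from the static in-memory
|
* ditTyphoon cache and otherwise from the database, creating the project row
|
* on demand via readOrCreateTyphoonByName.
|
*/
|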
public String getTyphoonIDByName(Connection postsql, String typhoonName) throws SQLException {
|
if (ditTyphoon.containsKey(typhoonName)) {
|
return ditTyphoon.get(typhoonName);
|
} else {
|
return readOrCreateTyphoonByName(postsql, typhoonName);
|
}
|
}
|
public String readOrCreateTyphoonByName(Connection postsql, String typhoonName) throws SQLException {
|
// TODO: the row_created timestamp handling may need revisiting
|
String strSQLSelectProject = String.format("select typhoon_id,typhoon_name from ndd.typhoonproject where typhoon_name='%s'", typhoonName);
|
String strSQLInsertProject = String.format("insert into ndd.typhoonproject (typhoon_name,row_created) values ('%s',now())", typhoonName);
|
|
List<String[]> listDict = sqlExecQuery(postsql, strSQLSelectProject, new String[]{});
|
if (listDict != null && listDict.size() > 0) {
|
// cache and return the first matching project id
|
return addDict(listDict.get(0)[0], listDict.get(0)[1]);
|
}
|
|
// no match: insert the project, then re-read to pick up the generated id
|
logger.info(String.format("new project:%s", typhoonName));
|
sqlExec(postsql, strSQLInsertProject, new String[]{});
|
return readOrCreateTyphoonByName(postsql, typhoonName);
|
}
|
private synchronized static String addDict(String id, String typhoon) {
|
if (ditTyphoon.containsKey(typhoon)) {
|
return ditTyphoon.get(typhoon);
|
}
|
ditTyphoon.put(typhoon, id);
|
return id;
|
}
|
|
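/**
|
* Returns true only when ndd.schedule holds a row for this job name whose
|
* enabled column equals "1"; a missing row means the job must not run.
|
*/
|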
public boolean jobOnLine(Connection postsql, String jobname) throws SQLException {
|
String strSQLSelectSchedule = String.format("select enabled from ndd.schedule where name='%s'", jobname);
|
|
List<String[]> listDict = sqlExecQuery(postsql, strSQLSelectSchedule, new String[]{});
|
if (listDict == null || listDict.size() == 0) return false; // no schedule row: do not run
|
return listDict.get(0)[0].equals("1");
|
}
|
}
|