package com.ximple.eofms.jobs;
|
|
import java.io.IOException;
|
import java.sql.Connection;
|
import java.sql.ResultSet;
|
import java.sql.SQLException;
|
import java.sql.Statement;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.Map;
|
import java.util.TreeMap;
|
|
import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
|
import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext;
|
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.LogFactory;
|
import org.geotools.data.DataStore;
|
import org.geotools.data.Transaction;
|
import org.geotools.data.jdbc.JDBCUtils;
|
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
|
import org.geotools.jdbc.JDBCDataStore;
|
import org.quartz.JobDataMap;
|
import org.quartz.JobDetail;
|
import org.quartz.JobExecutionContext;
|
import org.quartz.JobExecutionException;
|
|
public class OracleClearExchangeJob extends AbstractOracleDatabaseJob {
|
final static Log logger = LogFactory.getLog(OracleClearExchangeJob.class);
|
|
public static String FETCH_TPDATA = "SELECT TPID, TPNAME FROM BASEDB.TPDATA";
|
|
private static final String PGHOST = "PGHOST";
|
private static final String PGDATBASE = "PGDATBASE";
|
private static final String PGPORT = "PGPORT";
|
private static final String PGSCHEMA = "PGSCHEMA";
|
private static final String PGUSER = "PGUSER";
|
private static final String PGPASS = "PGPASS";
|
private static final String USEWKB = "USEWKB";
|
|
private static final boolean useTpclidText = false;
|
|
private static final int FETCHSIZE = 100;
|
private static final int COMMITSIZE = 100;
|
|
protected static class Pair {
|
Object first;
|
Object second;
|
|
public Pair(Object first, Object second) {
|
this.first = first;
|
this.second = second;
|
}
|
}
|
|
protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
|
|
protected String _pgHost;
|
protected String _pgDatabase;
|
protected String _pgPort;
|
protected String _pgSchema;
|
protected String _pgUsername;
|
protected String _pgPassword;
|
protected String _pgUseWKB;
|
|
protected Map<String, String> pgProperties;
|
protected JDBCDataStore targetDataStore;
|
|
private long queryTime = 0;
|
private long queryTimeStart = 0;
|
|
protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
|
super.extractJobConfiguration(jobDetail);
|
JobDataMap dataMap = jobDetail.getJobDataMap();
|
_pgHost = dataMap.getString(PGHOST);
|
_pgDatabase = dataMap.getString(PGDATBASE);
|
_pgPort = dataMap.getString(PGPORT);
|
_pgSchema = dataMap.getString(PGSCHEMA);
|
_pgUsername = dataMap.getString(PGUSER);
|
_pgPassword = dataMap.getString(PGPASS);
|
_pgUseWKB = dataMap.getString(USEWKB);
|
|
Log logger = getLogger();
|
/*
|
logger.info("PGHOST=" + _myHost);
|
logger.info("PGDATBASE=" + _myDatabase);
|
logger.info("PGPORT=" + _myPort);
|
logger.info("PGSCHEMA=" + _mySchema);
|
logger.info("PGUSER=" + _myUsername);
|
logger.info("PGPASS=" + _myPassword);
|
logger.info("USEWKB=" + _myUseWKB);
|
*/
|
|
if (_pgHost == null) {
|
logger.warn("PGHOST is null");
|
throw new JobExecutionException("Unknown PostGIS host.");
|
}
|
if (_pgDatabase == null) {
|
logger.warn("PGDATABASE is null");
|
throw new JobExecutionException("Unknown PostGIS database.");
|
}
|
if (_pgPort == null) {
|
logger.warn("PGPORT is null");
|
throw new JobExecutionException("Unknown PostGIS port.");
|
}
|
if (_pgSchema == null) {
|
logger.warn("PGSCHEMA is null");
|
throw new JobExecutionException("Unknown PostGIS schema.");
|
}
|
if (_pgUsername == null) {
|
logger.warn("PGUSERNAME is null");
|
throw new JobExecutionException("Unknown PostGIS username.");
|
}
|
if (_pgPassword == null) {
|
logger.warn("PGPASSWORD is null");
|
throw new JobExecutionException("Unknown PostGIS password.");
|
}
|
|
Map<String, String> remote = new TreeMap<String, String>();
|
remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
|
// remote.put("charset", "UTF-8");
|
remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
|
remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
|
remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
|
remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
|
remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
|
// remote.put( "namespace", null);
|
pgProperties = remote;
|
}
|
|
@Override
|
public Log getLogger() {
|
return logger;
|
}
|
|
@Override
|
public void execute(JobExecutionContext context) throws JobExecutionException {
|
// Every job has its own job detail
|
JobDetail jobDetail = context.getJobDetail();
|
|
// The name is defined in the job definition
|
String jobName = jobDetail.getKey().getName();
|
|
// Log the time the job started
|
logger.info(jobName + " fired at " + new Date());
|
extractJobConfiguration(jobDetail);
|
|
createSourceDataStore();
|
if (getSourceDataStore() == null) {
|
logger.warn("Cannot connect source oracle database.");
|
throw new JobExecutionException("Cannot connect source oracle database.");
|
}
|
|
if (isProfileMode()) {
|
queryTime = 0;
|
}
|
|
long t1 = System.currentTimeMillis();
|
String targetSchemaName;
|
try {
|
logger.info("-- step:clearOutputDatabase --");
|
clearOutputDatabase();
|
|
logger.info("-- step:transformOracleDMMSDB --");
|
targetSchemaName = "";
|
|
OracleConvertPostGISJobContext jobContext =
|
(OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
|
isProfileMode(), isTransformed());
|
jobContext.setSourceDataStore(getSourceDataStore());
|
jobContext.setExecutionContext(context);
|
|
long tStep = System.currentTimeMillis();
|
|
fetchTPData(jobContext);
|
logger.info("TPC DIST:" + jobContext.getDistId() + ":" +
|
((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName()));
|
|
clearExchangeData(jobContext);
|
|
if (isProfileMode()) {
|
long tStepEnd = System.currentTimeMillis();
|
logTimeDiff("Profile-Merge Connectivity Owner", tStep, tStepEnd);
|
}
|
|
tStep = System.currentTimeMillis();
|
|
if (isProfileMode()) {
|
long tStepEnd = System.currentTimeMillis();
|
logTimeDiff("Profile-Merge ColorTable", tStep, tStepEnd);
|
}
|
|
jobContext.closeOracleConnection();
|
|
long t2 = System.currentTimeMillis();
|
// public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
|
// SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW);
|
logTimeDiff("Total ", t1, t2);
|
|
} catch (SQLException e) {
|
disconnect();
|
logger.warn(e.getMessage(), e);
|
throw new JobExecutionException("Database error. " + e.getMessage(), e);
|
} catch (IOException ex) {
|
disconnect();
|
logger.warn(ex.getMessage(), ex);
|
throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
|
} finally {
|
disconnect();
|
}
|
logger.warn(jobName + " end at " + new Date());
|
}
|
|
private void clearExchangeData(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException {
|
Connection connection = jobContext.getOracleConnection();
|
|
ResultSet rsMeta = connection.getMetaData().getTables(null, "CMMS_POSTDB", "GEO_EXCHANGE",
|
new String[]{"TABLE"});
|
|
boolean found = false;
|
try {
|
while (rsMeta.next()) {
|
found = true;
|
break;
|
}
|
// } catch (SQLException e)
|
} finally {
|
if (rsMeta != null) {
|
rsMeta.close();
|
rsMeta = null;
|
}
|
}
|
|
if (!found) {
|
logger.info("Cannot Found GEO_EXCHANGE in CMMS_POSTDB.");
|
return;
|
}
|
|
Statement stmt = null;
|
try {
|
stmt = connection.createStatement();
|
int count = stmt.executeUpdate("DELETE FROM \"CMMS_POSTDB\".\"GEO_EXCHANGE\" WHERE ISEXCHANGE=1");
|
logger.info("DELETE GEO_EXCHANGE UPDATE SIZE=" + count);
|
} finally {
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
@Override
|
protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) {
|
return new OracleConvertPostGISJobContext(getDataPath(),
|
getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
|
}
|
|
private void logTimeDiff(String message, long tBefore, long tCurrent) {
|
logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
|
(((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
|
}
|
|
public DataStore getTargetDataStore() {
|
return targetDataStore;
|
}
|
|
protected void createTargetDataStore() throws JobExecutionException {
|
if (targetDataStore != null) {
|
targetDataStore.dispose();
|
targetDataStore = null;
|
}
|
|
if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
|
pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
|
}
|
|
if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
|
pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
|
}
|
|
if (!dataStoreFactory.canProcess(pgProperties)) {
|
getLogger().warn("cannot process properties-");
|
throw new JobExecutionException("cannot process properties-");
|
}
|
try {
|
targetDataStore = dataStoreFactory.createDataStore(pgProperties);
|
} catch (IOException e) {
|
getLogger().warn(e.getMessage(), e);
|
throw new JobExecutionException(e.getMessage(), e);
|
}
|
}
|
|
protected void disconnect() {
|
super.disconnect();
|
if (targetDataStore != null) {
|
targetDataStore.dispose();
|
targetDataStore = null;
|
}
|
}
|
|
private String determineTargetSchemaName() throws IOException {
|
if (targetDataStore == null) return null;
|
Connection connection = null;
|
Statement stmt = null;
|
ResultSet rs = null;
|
String targetSchema = null;
|
boolean needCreate = false;
|
try {
|
connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
|
if (!rs.next()) needCreate = true;
|
if (needCreate) {
|
throw new IOException("cannot found " + DataReposVersionManager.XGVERSIONTABLE_NAME);
|
}
|
rs.close();
|
rs = null;
|
|
StringBuilder sbSQL = new StringBuilder("SELECT ");
|
sbSQL.append("vsschema, vsstatus FROM ");
|
sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
|
sbSQL.append("ORDER BY vsid");
|
stmt = connection.createStatement();
|
rs = stmt.executeQuery(sbSQL.toString());
|
ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
|
int i = 0;
|
int current = -1;
|
while (rs.next()) {
|
Object[] values = new Object[2];
|
values[0] = rs.getString("vsschema");
|
values[1] = rs.getShort("vsstatus");
|
tmpSchemas.add(values);
|
if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
|
current = i;
|
}
|
i++;
|
}
|
|
if (current != -1) {
|
Object[] values = tmpSchemas.get(current);
|
targetSchema = (String) values[0];
|
}
|
} catch (SQLException e) {
|
logger.warn(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
|
}
|
return targetSchema;
|
}
|
|
public String encodeSchemaTableName(String schemaName, String tableName) {
|
return "\"" + schemaName + "\".\"" + tableName + "\"";
|
}
|
|
public final void accumulateQueryTime() {
|
queryTime += System.currentTimeMillis() - queryTimeStart;
|
}
|
|
public long getQueryTime() {
|
return queryTime;
|
}
|
|
public final void markQueryTime() {
|
queryTimeStart = System.currentTimeMillis();
|
}
|
|
public final void resetQueryTime() {
|
queryTime = 0;
|
}
|
|
private void clearOutputDatabase() {
|
}
|
}
|