package com.ximple.eofms.jobs;
|
|
import java.io.IOException;
|
import java.sql.Connection;
|
import java.sql.ResultSet;
|
import java.sql.SQLException;
|
import java.sql.Statement;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.TreeMap;
|
|
import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
|
import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext;
|
import com.ximple.eofms.util.DefaultColorTable;
|
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.LogFactory;
|
import org.geotools.data.DataStore;
|
import org.geotools.data.Transaction;
|
import org.geotools.data.jdbc.JDBCUtils;
|
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
|
import org.geotools.jdbc.JDBCDataStore;
|
import org.quartz.JobDataMap;
|
import org.quartz.JobDetail;
|
import org.quartz.JobExecutionContext;
|
import org.quartz.JobExecutionException;
|
|
public class OracleTransformColorOwnerJob extends AbstractOracleDatabaseJob {
|
final static Log logger = LogFactory.getLog(OracleTransformColorOwnerJob.class);
|
|
public static String FETCH_TPDATA = "SELECT TPID, TPNAME FROM BASEDB.TPDATA";
|
public static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1 FROM BASEDB.CONNECTIVITY ORDER BY FSC";
|
public static String FETCH_FDRCOLOR = "SELECT FRREDERID, COLOR FROM BASEDB.FEEDER";
|
public static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";
|
|
private static final String PGHOST = "PGHOST";
|
private static final String PGDATBASE = "PGDATBASE";
|
private static final String PGPORT = "PGPORT";
|
private static final String PGSCHEMA = "PGSCHEMA";
|
private static final String PGUSER = "PGUSER";
|
private static final String PGPASS = "PGPASS";
|
private static final String USEWKB = "USEWKB";
|
|
private static final boolean useTpclidText = false;
|
|
private static final int FETCHSIZE = 30;
|
private static final int COMMITSIZE = 100;
|
|
protected static class Pair {
|
Object first;
|
Object second;
|
|
public Pair(Object first, Object second) {
|
this.first = first;
|
this.second = second;
|
}
|
}
|
|
protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
|
|
protected String _pgHost;
|
protected String _pgDatabase;
|
protected String _pgPort;
|
protected String _pgSchema;
|
protected String _pgUsername;
|
protected String _pgPassword;
|
protected String _pgUseWKB;
|
|
protected Map<String, String> pgProperties;
|
protected JDBCDataStore targetDataStore;
|
|
private long queryTime = 0;
|
private long queryTimeStart = 0;
|
|
protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
|
super.extractJobConfiguration(jobDetail);
|
JobDataMap dataMap = jobDetail.getJobDataMap();
|
_pgHost = dataMap.getString(PGHOST);
|
_pgDatabase = dataMap.getString(PGDATBASE);
|
_pgPort = dataMap.getString(PGPORT);
|
_pgSchema = dataMap.getString(PGSCHEMA);
|
_pgUsername = dataMap.getString(PGUSER);
|
_pgPassword = dataMap.getString(PGPASS);
|
_pgUseWKB = dataMap.getString(USEWKB);
|
|
Log logger = getLogger();
|
/*
|
logger.info("PGHOST=" + _myHost);
|
logger.info("PGDATBASE=" + _myDatabase);
|
logger.info("PGPORT=" + _myPort);
|
logger.info("PGSCHEMA=" + _mySchema);
|
logger.info("PGUSER=" + _myUsername);
|
logger.info("PGPASS=" + _myPassword);
|
logger.info("USEWKB=" + _myUseWKB);
|
*/
|
|
if (_pgHost == null) {
|
logger.warn("PGHOST is null");
|
throw new JobExecutionException("Unknown PostGIS host.");
|
}
|
if (_pgDatabase == null) {
|
logger.warn("PGDATABASE is null");
|
throw new JobExecutionException("Unknown PostGIS database.");
|
}
|
if (_pgPort == null) {
|
logger.warn("PGPORT is null");
|
throw new JobExecutionException("Unknown PostGIS port.");
|
}
|
if (_pgSchema == null) {
|
logger.warn("PGSCHEMA is null");
|
throw new JobExecutionException("Unknown PostGIS schema.");
|
}
|
if (_pgUsername == null) {
|
logger.warn("PGUSERNAME is null");
|
throw new JobExecutionException("Unknown PostGIS username.");
|
}
|
if (_pgPassword == null) {
|
logger.warn("PGPASSWORD is null");
|
throw new JobExecutionException("Unknown PostGIS password.");
|
}
|
|
Map<String, String> remote = new TreeMap<String, String>();
|
remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
|
// remote.put("charset", "UTF-8");
|
remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
|
remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
|
remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
|
remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
|
remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
|
// remote.put( "namespace", null);
|
pgProperties = remote;
|
}
|
|
@Override
|
public Log getLogger() {
|
return logger;
|
}
|
|
@Override
|
public void execute(JobExecutionContext context) throws JobExecutionException {
|
// Every job has its own job detail
|
JobDetail jobDetail = context.getJobDetail();
|
|
// The name is defined in the job definition
|
String jobName = jobDetail.getKey().getName();
|
|
// Log the time the job started
|
logger.info(jobName + " fired at " + new Date());
|
extractJobConfiguration(jobDetail);
|
|
createSourceDataStore();
|
createTargetDataStore();
|
if (getSourceDataStore() == null) {
|
logger.warn("Cannot connect source oracle database.");
|
throw new JobExecutionException("Cannot connect source oracle database.");
|
}
|
|
if (getTargetDataStore() == null) {
|
logger.warn("Cannot connect source postgreSQL database.");
|
throw new JobExecutionException("Cannot connect source postgreSQL database.");
|
}
|
|
if (isProfileMode()) {
|
queryTime = 0;
|
}
|
|
long t1 = System.currentTimeMillis();
|
String targetSchemaName;
|
try {
|
logger.info("-- step:clearOutputDatabase --");
|
clearOutputDatabase();
|
|
logger.info("-- step:transformOracleDMMSDB --");
|
targetSchemaName = determineTargetSchemaName();
|
|
OracleConvertPostGISJobContext jobContext =
|
(OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
|
isProfileMode(), isTransformed());
|
jobContext.setSourceDataStore(getSourceDataStore());
|
jobContext.setExecutionContext(context);
|
|
long tStep = System.currentTimeMillis();
|
|
fetchTPData(jobContext);
|
logger.info("TPC DIST:" + jobContext.getDistId() + ":" +
|
((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName()));
|
|
mergeConnectivityOwner(jobContext);
|
|
if (isProfileMode()) {
|
long tStepEnd = System.currentTimeMillis();
|
logTimeDiff("Profile-Merge Connectivity Owner", tStep, tStepEnd);
|
}
|
|
tStep = System.currentTimeMillis();
|
mergeDynamicColor(jobContext);
|
|
if (isProfileMode()) {
|
long tStepEnd = System.currentTimeMillis();
|
logTimeDiff("Profile-Merge ColorTable", tStep, tStepEnd);
|
}
|
|
jobContext.closeOracleConnection();
|
|
long t2 = System.currentTimeMillis();
|
// public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss";
|
// SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW);
|
logTimeDiff("Total ", t1, t2);
|
|
} catch (SQLException e) {
|
disconnect();
|
logger.warn(e.getMessage(), e);
|
throw new JobExecutionException("Database error. " + e.getMessage(), e);
|
} catch (IOException ex) {
|
disconnect();
|
logger.warn(ex.getMessage(), ex);
|
throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
|
} finally {
|
disconnect();
|
}
|
logger.warn(jobName + " end at " + new Date());
|
}
|
|
/**
|
* Connectivity (Connectivity)
|
*
|
* @param jobContext job context
|
* @throws java.sql.SQLException sql exception
|
*/
|
protected void mergeConnectivityOwner(AbstractOracleJobContext jobContext) throws SQLException, IOException {
|
Connection connection = jobContext.getOracleConnection();
|
|
boolean found = false;
|
ResultSet rs = null;
|
Statement stmt = null;
|
try {
|
String targetSchemaName = determineTargetSchemaName();
|
logger.info("target schema:" + targetSchemaName);
|
stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
rs = stmt.executeQuery(FETCH_CONNFDR);
|
rs.setFetchSize(50);
|
int lastClass = -1;
|
boolean changeType = false;
|
List<String> tables = null;
|
ArrayList<String> sqlBatchStmts = new ArrayList<String>();
|
final int MAX_BATCHSIZE = 50;
|
int count = 0;
|
while (rs.next()) {
|
int cid = rs.getInt(1);
|
long oid = rs.getLong(2);
|
int ownerId = rs.getInt(3);
|
if (lastClass != cid) {
|
logger.info("change type to :" + cid);
|
}
|
changeType = (lastClass != cid);
|
if (changeType) {
|
tables = fetchTargetTableList(targetSchemaName, cid);
|
if (tables == null)
|
logger.info("tables is null." + cid);
|
}
|
if (tables != null) {
|
for (String t : tables) {
|
String sqlStmt = generatrTargetOwnerSql(targetSchemaName, t, cid, oid, ownerId);
|
sqlBatchStmts.add(sqlStmt);
|
}
|
}
|
|
if (MAX_BATCHSIZE < sqlBatchStmts.size()) {
|
batchExecuteSQL(sqlBatchStmts);
|
count += sqlBatchStmts.size();
|
sqlBatchStmts.clear();
|
}
|
lastClass = cid;
|
}
|
|
if (!sqlBatchStmts.isEmpty()) {
|
batchExecuteSQL(sqlBatchStmts);
|
count += sqlBatchStmts.size();
|
}
|
logger.info("Execute Update Count=" + count);
|
// } catch (SQLException e)
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
private String generatrTargetOwnerSql(String schemaName, String t, int cid, long oid, int ownerId) {
|
StringBuilder sb = new StringBuilder("UPDATE ");
|
sb.append(schemaName).append(".\"").append(t).append("\"");
|
sb.append(" SET fowner = ").append(ownerId);
|
sb.append(" WHERE oid=").append(oid);
|
return sb.toString();
|
}
|
|
private void updateTargetOwner(Connection connection,
|
String schemaName, String t, int cid, long oid, int ownerId)
|
throws SQLException, IOException {
|
if (connection == null) return;
|
Statement stmt = null;
|
ResultSet rs = null;
|
try {
|
stmt = connection.createStatement();
|
stmt.executeUpdate("UPDATE " + schemaName + "." + t + " SET fowner = " + ownerId + " WHERE oid=" + oid);
|
} catch (SQLException e) {
|
logger.warn(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
private void mergeDynamicColor(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException {
|
Connection connection = jobContext.getOracleConnection();
|
|
boolean found = false;
|
ResultSet rs = null;
|
Statement stmt = null;
|
try {
|
String targetSchemaName = determineTargetSchemaName();
|
logger.info("target schema:" + targetSchemaName);
|
stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
rs = stmt.executeQuery(FETCH_COLORTAB);
|
rs.setFetchSize(50);
|
int lastClass = -1;
|
boolean changeType = false;
|
List<String> tables = null;
|
ArrayList<String> sqlBatchStmts = new ArrayList<String>();
|
final int MAX_BATCHSIZE = 50;
|
int count = 0;
|
while (rs.next()) {
|
int cid = rs.getInt(1);
|
long oid = rs.getLong(2);
|
int colorId = rs.getInt(3);
|
if (lastClass != cid) {
|
logger.info("change type to :" + cid);
|
}
|
changeType = (lastClass != cid);
|
if (changeType) {
|
tables = fetchTargetTableList(targetSchemaName, cid);
|
if (tables == null)
|
logger.info("tables is null." + cid);
|
}
|
if (tables != null) {
|
for (String t : tables) {
|
String sqlStmt = generatrTargetDynamicColorSql(targetSchemaName, t, cid, oid, colorId);
|
sqlBatchStmts.add(sqlStmt);
|
}
|
}
|
if (MAX_BATCHSIZE < sqlBatchStmts.size()) {
|
batchExecuteSQL(sqlBatchStmts);
|
count += sqlBatchStmts.size();
|
sqlBatchStmts.clear();
|
}
|
lastClass = cid;
|
}
|
if (!sqlBatchStmts.isEmpty()) {
|
batchExecuteSQL(sqlBatchStmts);
|
count += sqlBatchStmts.size();
|
}
|
logger.info("Execute Update Count=" + count);
|
// } catch (SQLException e)
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
private String generatrTargetDynamicColorSql(String schemaName, String t, int cid, long oid, int colorId) {
|
DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
|
String colorText = colorTable.getColorCode(colorId);
|
StringBuilder sb = new StringBuilder("UPDATE ");
|
sb.append(schemaName).append(".\"").append(t).append("\"");
|
sb.append(" SET dyncolor = '").append(colorText).append("'");
|
sb.append(" WHERE oid=").append(oid);
|
return sb.toString();
|
}
|
|
private void updateTargetDynamicColor(Connection connection, String schemaName,
|
String t, int cid, long oid, int colorId) {
|
if (connection == null) return;
|
DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance();
|
Statement stmt = null;
|
ResultSet rs = null;
|
try {
|
stmt = connection.createStatement();
|
String colorText = colorTable.getColorCode(colorId);
|
stmt.executeUpdate("UPDATE " + schemaName + "." + t + " SET dyncolor = '" + colorText + "' WHERE oid=" + oid);
|
} catch (SQLException e) {
|
logger.warn(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
}
|
}
|
|
private void batchExecuteSQL(ArrayList<String> sqlStmts) throws IOException {
|
if (targetDataStore == null) return;
|
Connection connection = null;
|
Statement stmt = null;
|
// ResultSet rs = null;
|
int[] results = null;
|
try {
|
connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
connection.setAutoCommit(false);
|
stmt = connection.createStatement();
|
for (String sqlStmt : sqlStmts) {
|
stmt.addBatch(sqlStmt);
|
}
|
results = stmt.executeBatch();
|
connection.commit();
|
} catch (SQLException e) {
|
if (results != null) {
|
}
|
logger.warn(e.getMessage(), e);
|
} finally {
|
// JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
|
}
|
}
|
|
|
private List<String> fetchTargetTableList(String targetSchemaName, int cid) throws IOException {
|
ArrayList<String> result = new ArrayList<String>();
|
if (targetDataStore == null) return null;
|
Connection connection = null;
|
Statement stmt = null;
|
ResultSet rs = null;
|
try {
|
connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
String[] types = {"TABLE"};
|
rs = connection.getMetaData().getTables(null, targetSchemaName, "fsc-" + cid +"%", types);
|
while (rs.next()) {
|
String tableName = rs.getString("TABLE_NAME");
|
logger.info("table:" + tableName);
|
result.add(tableName);
|
}
|
} catch (SQLException e) {
|
logger.warn(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
|
}
|
|
return result; //To change body of created methods use File | Settings | File Templates.
|
}
|
|
|
@Override
|
protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) {
|
return new OracleConvertPostGISJobContext(getDataPath(),
|
getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
|
}
|
|
private void logTimeDiff(String message, long tBefore, long tCurrent) {
|
logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
|
(((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
|
}
|
|
public DataStore getTargetDataStore() {
|
return targetDataStore;
|
}
|
|
protected void createTargetDataStore() throws JobExecutionException {
|
if (targetDataStore != null) {
|
targetDataStore.dispose();
|
targetDataStore = null;
|
}
|
|
if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
|
pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
|
}
|
|
if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
|
pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
|
}
|
|
if (!dataStoreFactory.canProcess(pgProperties)) {
|
getLogger().warn("cannot process properties-");
|
throw new JobExecutionException("cannot process properties-");
|
}
|
try {
|
targetDataStore = dataStoreFactory.createDataStore(pgProperties);
|
} catch (IOException e) {
|
getLogger().warn(e.getMessage(), e);
|
throw new JobExecutionException(e.getMessage(), e);
|
}
|
}
|
|
protected void disconnect() {
|
super.disconnect();
|
if (targetDataStore != null) {
|
targetDataStore.dispose();
|
targetDataStore = null;
|
}
|
}
|
|
private String determineTargetSchemaName() throws IOException {
|
if (targetDataStore == null) return null;
|
Connection connection = null;
|
Statement stmt = null;
|
ResultSet rs = null;
|
String targetSchema = null;
|
boolean needCreate = false;
|
try {
|
connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
|
rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
|
if (!rs.next()) needCreate = true;
|
if (needCreate) {
|
throw new IOException("cannot found " + DataReposVersionManager.XGVERSIONTABLE_NAME);
|
}
|
rs.close();
|
rs = null;
|
|
StringBuilder sbSQL = new StringBuilder("SELECT ");
|
sbSQL.append("vsschema, vsstatus FROM ");
|
sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
|
sbSQL.append("ORDER BY vsid");
|
stmt = connection.createStatement();
|
rs = stmt.executeQuery(sbSQL.toString());
|
ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
|
int i = 0;
|
int current = -1;
|
while (rs.next()) {
|
Object[] values = new Object[2];
|
values[0] = rs.getString("vsschema");
|
values[1] = rs.getShort("vsstatus");
|
tmpSchemas.add(values);
|
if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
|
current = i;
|
}
|
i++;
|
}
|
|
if (current != -1) {
|
Object[] values = tmpSchemas.get(current);
|
targetSchema = (String) values[0];
|
}
|
} catch (SQLException e) {
|
logger.warn(e.getMessage(), e);
|
} finally {
|
JDBCUtils.close(rs);
|
JDBCUtils.close(stmt);
|
JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
|
}
|
return targetSchema;
|
}
|
|
public String encodeSchemaTableName(String schemaName, String tableName) {
|
return "\"" + schemaName + "\".\"" + tableName + "\"";
|
}
|
|
public final void accumulateQueryTime() {
|
queryTime += System.currentTimeMillis() - queryTimeStart;
|
}
|
|
public long getQueryTime() {
|
return queryTime;
|
}
|
|
public final void markQueryTime() {
|
queryTimeStart = System.currentTimeMillis();
|
}
|
|
public final void resetQueryTime() {
|
queryTime = 0;
|
}
|
|
private void clearOutputDatabase() {
|
}
|
}
|