From a5ef5555b7b88724c432eea7a68b75d688507017 Mon Sep 17 00:00:00 2001 From: Dennis Kao <ulysseskao@gmail.com> Date: Mon, 25 Nov 2013 00:29:39 +0800 Subject: [PATCH] add color table --- xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleTransformColorOwner2CSVJob.java | 452 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 452 insertions(+), 0 deletions(-) diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleTransformColorOwner2CSVJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleTransformColorOwner2CSVJob.java new file mode 100644 index 0000000..55f09c4 --- /dev/null +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleTransformColorOwner2CSVJob.java @@ -0,0 +1,452 @@ +package com.ximple.eofms.jobs; + +import java.io.FileWriter; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import au.com.bytecode.opencsv.CSVWriter; +import com.ximple.eofms.jobs.context.AbstractOracleJobContext; +import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.geotools.data.DataStore; +import org.geotools.data.Transaction; +import org.geotools.data.jdbc.JDBCUtils; +import org.geotools.data.postgis.PostgisNGDataStoreFactory; +import org.geotools.jdbc.JDBCDataStore; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class OracleTransformColorOwner2CSVJob extends AbstractOracleDatabaseJob { + final static Log logger = LogFactory.getLog(OracleTransformColorOwner2CSVJob.class); + + public static String FETCH_TPDATA = "SELECT TPID, TPNAME FROM BASEDB.TPDATA"; + public static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1 FROM BASEDB.CONNECTIVITY ORDER BY FSC"; + public static String FETCH_FDRCOLOR = "SELECT FRREDERID, COLOR FROM BASEDB.FEEDER"; + public static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC"; + + private static final String PGHOST = "PGHOST"; + private static final String PGDATBASE = "PGDATBASE"; + private static final String PGPORT = "PGPORT"; + private static final String PGSCHEMA = "PGSCHEMA"; + private static final String PGUSER = "PGUSER"; + private static final String PGPASS = "PGPASS"; + private static final String USEWKB = "USEWKB"; + + private static final boolean useTpclidText = false; + + private static final int FETCHSIZE = 30; + private static final int COMMITSIZE = 100; + + protected static class Pair { + Object first; + Object second; + + public Pair(Object first, Object second) { + this.first = first; + this.second = second; + } + } + + protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); + + protected String _pgHost; + protected String _pgDatabase; + protected String _pgPort; + protected String _pgSchema; + protected String _pgUsername; + protected String _pgPassword; + protected String _pgUseWKB; + + protected Map<String, String> pgProperties; + protected JDBCDataStore targetDataStore; + + private long queryTime = 0; + private long queryTimeStart = 0; + + protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { + super.extractJobConfiguration(jobDetail); + JobDataMap dataMap = jobDetail.getJobDataMap(); + _pgHost = dataMap.getString(PGHOST); + _pgDatabase = dataMap.getString(PGDATBASE); + _pgPort = dataMap.getString(PGPORT); + _pgSchema = dataMap.getString(PGSCHEMA); + _pgUsername = dataMap.getString(PGUSER); + _pgPassword = dataMap.getString(PGPASS); + _pgUseWKB = dataMap.getString(USEWKB); + + Log logger = getLogger(); + /* + logger.info("PGHOST=" + _myHost); + logger.info("PGDATBASE=" + _myDatabase); + logger.info("PGPORT=" + _myPort); + logger.info("PGSCHEMA=" + _mySchema); + logger.info("PGUSER=" + _myUsername); + logger.info("PGPASS=" + _myPassword); + logger.info("USEWKB=" + _myUseWKB); + */ + + if (_pgHost == null) { + logger.warn("PGHOST is null"); + throw new JobExecutionException("Unknown PostGIS host."); + } + if (_pgDatabase == null) { + logger.warn("PGDATABASE is null"); + throw new JobExecutionException("Unknown PostGIS database."); + } + if (_pgPort == null) { + logger.warn("PGPORT is null"); + throw new JobExecutionException("Unknown PostGIS port."); + } + if (_pgSchema == null) { + logger.warn("PGSCHEMA is null"); + throw new JobExecutionException("Unknown PostGIS schema."); + } + if (_pgUsername == null) { + logger.warn("PGUSERNAME is null"); + throw new JobExecutionException("Unknown PostGIS username."); + } + if (_pgPassword == null) { + logger.warn("PGPASSWORD is null"); + throw new JobExecutionException("Unknown PostGIS password."); + } + + Map<String, String> remote = new TreeMap<String, String>(); + remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); + // remote.put("charset", "UTF-8"); + remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); + remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); + remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); + remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); + remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); + // remote.put( "namespace", null); + pgProperties = remote; + } + + @Override + public Log getLogger() { + return logger; + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + // Every job has its own job detail + JobDetail jobDetail = context.getJobDetail(); + + // The name is defined in the job definition + String jobName = jobDetail.getKey().getName(); + + // Log the time the job started + logger.info(jobName + " fired at " + new Date()); + extractJobConfiguration(jobDetail); + + createSourceDataStore(); + createTargetDataStore(); + if (getSourceDataStore() == null) { + logger.warn("Cannot connect source oracle database."); + throw new JobExecutionException("Cannot connect source oracle database."); + } + + if (getTargetDataStore() == null) { + logger.warn("Cannot connect source postgreSQL database."); + throw new JobExecutionException("Cannot connect source postgreSQL database."); + } + + if (isProfileMode()) { + queryTime = 0; + } + + long t1 = System.currentTimeMillis(); + String targetSchemaName; + try { + logger.info("-- step:clearOutputDatabase --"); + clearOutputDatabase(); + + logger.info("-- step:transformOracleDMMSDB --"); + targetSchemaName = determineTargetSchemaName(); + + OracleConvertPostGISJobContext jobContext = + (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + isProfileMode(), isTransformed()); + jobContext.setSourceDataStore(getSourceDataStore()); + jobContext.setExecutionContext(context); + + long tStep = System.currentTimeMillis(); + + fetchTPData(jobContext); + logger.info("TPC DIST:" + jobContext.getDistId() + ":" + + ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); + + mergeConnectivityOwner(jobContext); + + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-Merge Connectivity Owner", tStep, tStepEnd); + } + + tStep = System.currentTimeMillis(); + mergeDynamicColor(jobContext); + + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-Merge ColorTable", tStep, tStepEnd); + } + + jobContext.closeOracleConnection(); + + long t2 = System.currentTimeMillis(); + // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; + // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); + logTimeDiff("Total ", t1, t2); + + } catch (SQLException e) { + disconnect(); + logger.warn(e.getMessage(), e); + throw new JobExecutionException("Database error. " + e.getMessage(), e); + } catch (IOException ex) { + disconnect(); + logger.warn(ex.getMessage(), ex); + throw new JobExecutionException("IO error. " + ex.getMessage(), ex); + } finally { + disconnect(); + } + logger.warn(jobName + " end at " + new Date()); + } + + /** + * Connectivity (Connectivity) + * + * @param jobContext job context + * @throws java.sql.SQLException sql exception + */ + protected void mergeConnectivityOwner(AbstractOracleJobContext jobContext) throws SQLException, IOException { + Connection connection = jobContext.getOracleConnection(); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + try { + String targetSchemaName = determineTargetSchemaName(); + logger.info("target schema:" + targetSchemaName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + CSVWriter writer = new CSVWriter(new FileWriter("featureowner.csv"), ','); + writer.writeAll(rs, true); + writer.flush(); + writer.close(); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void mergeDynamicColor(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException { + Connection connection = jobContext.getOracleConnection(); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + try { + String targetSchemaName = determineTargetSchemaName(); + logger.info("target schema:" + targetSchemaName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + CSVWriter writer = new CSVWriter(new FileWriter("featurecolor.csv"), ','); + writer.writeAll(rs, true); + writer.flush(); + writer.close(); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void batchExecuteSQL(ArrayList<String> sqlStmts) throws IOException { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + // ResultSet rs = null; + int[] results = null; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + connection.setAutoCommit(false); + stmt = connection.createStatement(); + for (String sqlStmt : sqlStmts) { + stmt.addBatch(sqlStmt); + } + results = stmt.executeBatch(); + connection.commit(); + } catch (SQLException e) { + if (results != null) { + } + logger.warn(e.getMessage(), e); + } finally { + // JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + + private List<String> fetchTargetTableList(String targetSchemaName, int cid) throws IOException { + ArrayList<String> result = new ArrayList<String>(); + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + String[] types = {"TABLE"}; + rs = connection.getMetaData().getTables(null, targetSchemaName, "fsc-" + cid +"%", types); + while (rs.next()) { + String tableName = rs.getString("TABLE_NAME"); + logger.info("table:" + tableName); + result.add(tableName); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + + return result; //To change body of created methods use File | Settings | File Templates. + } + + + @Override + protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { + return new OracleConvertPostGISJobContext(getDataPath(), + getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); + } + + private void logTimeDiff(String message, long tBefore, long tCurrent) { + logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); + } + + public DataStore getTargetDataStore() { + return targetDataStore; + } + + protected void createTargetDataStore() throws JobExecutionException { + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); + } + + if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { + pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); + } + + if (!dataStoreFactory.canProcess(pgProperties)) { + getLogger().warn("cannot process properties-"); + throw new JobExecutionException("cannot process properties-"); + } + try { + targetDataStore = dataStoreFactory.createDataStore(pgProperties); + } catch (IOException e) { + getLogger().warn(e.getMessage(), e); + throw new JobExecutionException(e.getMessage(), e); + } + } + + protected void disconnect() { + super.disconnect(); + if (targetDataStore != null) { + targetDataStore.dispose(); + targetDataStore = null; + } + } + + private String determineTargetSchemaName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetSchema = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) { + throw new IOException("cannot found " + DataReposVersionManager.XGVERSIONTABLE_NAME); + } + rs.close(); + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vsschema, vsstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vsid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vsschema"); + values[1] = rs.getShort("vsstatus"); + tmpSchemas.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current != -1) { + Object[] values = tmpSchemas.get(current); + targetSchema = (String) values[0]; + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetSchema; + } + + public String encodeSchemaTableName(String schemaName, String tableName) { + return "\"" + schemaName + "\".\"" + tableName + "\""; + } + + public final void accumulateQueryTime() { + queryTime += System.currentTimeMillis() - queryTimeStart; + } + + public long getQueryTime() { + return queryTime; + } + + public final void markQueryTime() { + queryTimeStart = System.currentTimeMillis(); + } + + public final void resetQueryTime() { + queryTime = 0; + } + + private void clearOutputDatabase() { + } +} -- Gitblit v0.0.0-SNAPSHOT