package com.ximple.eofms.jobs; import java.io.FileWriter; import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.TreeMap; import au.com.bytecode.opencsv.CSVWriter; import au.com.bytecode.opencsv.ResultSetHelper; import au.com.bytecode.opencsv.ResultSetHelperService; import com.ximple.eofms.jobs.context.AbstractOracleJobContext; import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; import com.ximple.eofms.util.DefaultColorTable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.DataStore; import org.geotools.data.Transaction; import org.geotools.data.jdbc.JDBCUtils; import org.geotools.data.postgis.PostgisNGDataStoreFactory; import org.geotools.jdbc.JDBCDataStore; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class OracleTransformColorOwner2CSVJob extends AbstractOracleDatabaseJob { final static Log logger = LogFactory.getLog(OracleTransformColorOwner2CSVJob.class); public static String FETCH_TPDATA = "SELECT TPID, TPNAME FROM BASEDB.TPDATA"; public static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1 FROM BASEDB.CONNECTIVITY ORDER BY FSC"; public static String FETCH_FDRCOLOR = "SELECT FRREDERID, COLOR FROM BASEDB.FEEDER"; public static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC"; private static final String PGHOST = "PGHOST"; private static final String PGDATBASE = "PGDATBASE"; private static final String PGPORT = "PGPORT"; private static final String PGSCHEMA = "PGSCHEMA"; private static final String PGUSER = "PGUSER"; private static final String PGPASS = "PGPASS"; private static final String USEWKB = "USEWKB"; private static final boolean useTpclidText = false; private static final int FETCHSIZE = 100; private static final int COMMITSIZE = 100; protected static class Pair { Object first; Object second; public Pair(Object first, Object second) { this.first = first; this.second = second; } } protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); protected String _pgHost; protected String _pgDatabase; protected String _pgPort; protected String _pgSchema; protected String _pgUsername; protected String _pgPassword; protected String _pgUseWKB; protected Map pgProperties; protected JDBCDataStore targetDataStore; private long queryTime = 0; private long queryTimeStart = 0; protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); _pgDatabase = dataMap.getString(PGDATBASE); _pgPort = dataMap.getString(PGPORT); _pgSchema = dataMap.getString(PGSCHEMA); _pgUsername = dataMap.getString(PGUSER); _pgPassword = dataMap.getString(PGPASS); _pgUseWKB = dataMap.getString(USEWKB); Log logger = getLogger(); /* logger.info("PGHOST=" + _myHost); logger.info("PGDATBASE=" + _myDatabase); logger.info("PGPORT=" + _myPort); logger.info("PGSCHEMA=" + _mySchema); logger.info("PGUSER=" + _myUsername); logger.info("PGPASS=" + _myPassword); logger.info("USEWKB=" + _myUseWKB); */ if (_pgHost == null) { logger.warn("PGHOST is null"); throw new JobExecutionException("Unknown PostGIS host."); } if (_pgDatabase == null) { logger.warn("PGDATABASE is null"); throw new JobExecutionException("Unknown PostGIS database."); } if (_pgPort == null) { logger.warn("PGPORT is null"); throw new JobExecutionException("Unknown PostGIS port."); } if (_pgSchema == null) { logger.warn("PGSCHEMA is null"); throw new JobExecutionException("Unknown PostGIS schema."); } if (_pgUsername == null) { logger.warn("PGUSERNAME is null"); throw new JobExecutionException("Unknown PostGIS username."); } if (_pgPassword == null) { logger.warn("PGPASSWORD is null"); throw new JobExecutionException("Unknown PostGIS password."); } Map remote = new TreeMap(); remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); // remote.put("charset", "UTF-8"); remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); // remote.put( "namespace", null); pgProperties = remote; } @Override public Log getLogger() { return logger; } @Override public void execute(JobExecutionContext context) throws JobExecutionException { // Every job has its own job detail JobDetail jobDetail = context.getJobDetail(); // The name is defined in the job definition String jobName = jobDetail.getKey().getName(); // Log the time the job started logger.info(jobName + " fired at " + new Date()); extractJobConfiguration(jobDetail); createSourceDataStore(); createTargetDataStore(); if (getSourceDataStore() == null) { logger.warn("Cannot connect source oracle database."); throw new JobExecutionException("Cannot connect source oracle database."); } if (getTargetDataStore() == null) { logger.warn("Cannot connect source postgreSQL database."); throw new JobExecutionException("Cannot connect source postgreSQL database."); } if (isProfileMode()) { queryTime = 0; } long t1 = System.currentTimeMillis(); String targetSchemaName; try { logger.info("-- step:clearOutputDatabase --"); clearOutputDatabase(); logger.info("-- step:transformOracleDMMSDB --"); targetSchemaName = determineTargetSchemaName(); OracleConvertPostGISJobContext jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, isProfileMode(), isTransformed()); jobContext.setSourceDataStore(getSourceDataStore()); jobContext.setExecutionContext(context); long tStep = System.currentTimeMillis(); fetchTPData(jobContext); logger.info("TPC DIST:" + jobContext.getDistId() + ":" + ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName())); mergeConnectivityOwner(jobContext); if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-Merge Connectivity Owner", tStep, tStepEnd); } tStep = System.currentTimeMillis(); mergeDynamicColor(jobContext); if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-Merge ColorTable", tStep, tStepEnd); } jobContext.closeOracleConnection(); long t2 = System.currentTimeMillis(); // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); logTimeDiff("Total ", t1, t2); } catch (SQLException e) { disconnect(); logger.warn(e.getMessage(), e); throw new JobExecutionException("Database error. " + e.getMessage(), e); } catch (IOException ex) { disconnect(); logger.warn(ex.getMessage(), ex); throw new JobExecutionException("IO error. " + ex.getMessage(), ex); } finally { disconnect(); } logger.warn(jobName + " end at " + new Date()); } /** * Connectivity (Connectivity) * * @param jobContext job context * @throws java.sql.SQLException sql exception */ protected void mergeConnectivityOwner(AbstractOracleJobContext jobContext) throws SQLException, IOException { Connection connection = jobContext.getOracleConnection(); boolean found = false; ResultSet rs = null; Statement stmt = null; try { String targetSchemaName = determineTargetSchemaName(); logger.info("target schema:" + targetSchemaName); stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); rs = stmt.executeQuery(FETCH_CONNFDR); rs.setFetchSize(FETCHSIZE); ResultSetHelper resultService = new ResultSetHelperService(); final String[] header = new String[] { "tid", "oid", "owner" }; CSVWriter writer = new CSVWriter(new FileWriter("featureowner.csv"), ','); writer.writeNext(header); while (rs.next()) { writer.writeNext(resultService.getColumnValues(rs)); } writer.flush(); writer.close(); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void mergeDynamicColor(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException { Connection connection = jobContext.getOracleConnection(); boolean found = false; ResultSet rs = null; Statement stmt = null; try { String targetSchemaName = determineTargetSchemaName(); logger.info("target schema:" + targetSchemaName); stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); rs = stmt.executeQuery(FETCH_COLORTAB); rs.setFetchSize(FETCHSIZE); ResultSetHelper resultService = new ResultSetHelperService(); DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); final String[] header = new String[] { "tid", "oid", "dyncolor" }; CSVWriter writer = new CSVWriter(new FileWriter("featurecolor.csv"), ','); // writer.writeAll(rs, true); writer.writeNext(header); while (rs.next()) { int colorId = rs.getInt(3); String[] values = resultService.getColumnValues(rs); String colorText = colorTable.getColorCode(colorId); values[2] = colorText; writer.writeNext(values); } writer.flush(); writer.close(); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void batchExecuteSQL(ArrayList sqlStmts) throws IOException { if (targetDataStore == null) return; Connection connection = null; Statement stmt = null; // ResultSet rs = null; int[] results = null; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); connection.setAutoCommit(false); stmt = connection.createStatement(); for (String sqlStmt : sqlStmts) { stmt.addBatch(sqlStmt); } results = stmt.executeBatch(); connection.commit(); } catch (SQLException e) { if (results != null) { } logger.warn(e.getMessage(), e); } finally { // JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } } private List fetchTargetTableList(String targetSchemaName, int cid) throws IOException { ArrayList result = new ArrayList(); if (targetDataStore == null) return null; Connection connection = null; Statement stmt = null; ResultSet rs = null; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); String[] types = {"TABLE"}; rs = connection.getMetaData().getTables(null, targetSchemaName, "fsc-" + cid +"%", types); while (rs.next()) { String tableName = rs.getString("TABLE_NAME"); logger.info("table:" + tableName); result.add(tableName); } } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } return result; //To change body of created methods use File | Settings | File Templates. } @Override protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { return new OracleConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); } private void logTimeDiff(String message, long tBefore, long tCurrent) { logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); } public DataStore getTargetDataStore() { return targetDataStore; } protected void createTargetDataStore() throws JobExecutionException { if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); } if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); } if (!dataStoreFactory.canProcess(pgProperties)) { getLogger().warn("cannot process properties-"); throw new JobExecutionException("cannot process properties-"); } try { targetDataStore = dataStoreFactory.createDataStore(pgProperties); } catch (IOException e) { getLogger().warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } protected void disconnect() { super.disconnect(); if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } } private String determineTargetSchemaName() throws IOException { if (targetDataStore == null) return null; Connection connection = null; Statement stmt = null; ResultSet rs = null; String targetSchema = null; boolean needCreate = false; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); if (!rs.next()) needCreate = true; if (needCreate) { throw new IOException("cannot found " + DataReposVersionManager.XGVERSIONTABLE_NAME); } rs.close(); rs = null; StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("vsschema, vsstatus FROM "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' '); sbSQL.append("ORDER BY vsid"); stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); ArrayList tmpSchemas = new ArrayList(); int i = 0; int current = -1; while (rs.next()) { Object[] values = new Object[2]; values[0] = rs.getString("vsschema"); values[1] = rs.getShort("vsstatus"); tmpSchemas.add(values); if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { current = i; } i++; } if (current != -1) { Object[] values = tmpSchemas.get(current); targetSchema = (String) values[0]; } } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } return targetSchema; } public String encodeSchemaTableName(String schemaName, String tableName) { return "\"" + schemaName + "\".\"" + tableName + "\""; } public final void accumulateQueryTime() { queryTime += System.currentTimeMillis() - queryTimeStart; } public long getQueryTime() { return queryTime; } public final void markQueryTime() { queryTimeStart = System.currentTimeMillis(); } public final void resetQueryTime() { queryTime = 0; } private void clearOutputDatabase() { } }