forked from geodmms/xdgnjobs

Dennis Kao
2013-11-25 a5ef5555b7b88724c432eea7a68b75d688507017
add color table
1 file added
452 lines changed
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleTransformColorOwner2CSVJob.java (+452)
xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleTransformColorOwner2CSVJob.java
New file
@@ -0,0 +1,452 @@
package com.ximple.eofms.jobs;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import au.com.bytecode.opencsv.CSVWriter;
import com.ximple.eofms.jobs.context.AbstractOracleJobContext;
import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.geotools.data.DataStore;
import org.geotools.data.Transaction;
import org.geotools.data.jdbc.JDBCUtils;
import org.geotools.data.postgis.PostgisNGDataStoreFactory;
import org.geotools.jdbc.JDBCDataStore;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class OracleTransformColorOwner2CSVJob extends AbstractOracleDatabaseJob {
    final static Log logger = LogFactory.getLog(OracleTransformColorOwner2CSVJob.class);
    public static final String FETCH_TPDATA = "SELECT TPID, TPNAME FROM BASEDB.TPDATA";
    public static final String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1 FROM BASEDB.CONNECTIVITY ORDER BY FSC";
    public static final String FETCH_FDRCOLOR = "SELECT FRREDERID, COLOR FROM BASEDB.FEEDER";
    public static final String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC";
    private static final String PGHOST = "PGHOST";
    private static final String PGDATBASE = "PGDATBASE"; // (sic) the lookup key is spelled without the second 'A'
    private static final String PGPORT = "PGPORT";
    private static final String PGSCHEMA = "PGSCHEMA";
    private static final String PGUSER = "PGUSER";
    private static final String PGPASS = "PGPASS";
    private static final String USEWKB = "USEWKB";
    private static final boolean useTpclidText = false;
    private static final int FETCHSIZE = 30;
    private static final int COMMITSIZE = 100;
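    /** Simple two-value holder; currently unused within this class. */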
    protected static class Pair {
        Object first;
        Object second;
        public Pair(Object first, Object second) {
            this.first = first;
            this.second = second;
        }
    }
    protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory();
    protected String _pgHost;
    protected String _pgDatabase;
    protected String _pgPort;
    protected String _pgSchema;
    protected String _pgUsername;
    protected String _pgPassword;
    protected String _pgUseWKB;
    protected Map<String, String> pgProperties;
    protected JDBCDataStore targetDataStore;
    private long queryTime = 0;
    private long queryTimeStart = 0;
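    /**
     * Reads the PostGIS connection settings (host, database, port, schema,
     * user, password, WKB flag) from the Quartz JobDataMap and fails the job
     * if any required value is missing.
     */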
    @Override
    protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException {
        super.extractJobConfiguration(jobDetail);
        JobDataMap dataMap = jobDetail.getJobDataMap();
        _pgHost = dataMap.getString(PGHOST);
        _pgDatabase = dataMap.getString(PGDATBASE);
        _pgPort = dataMap.getString(PGPORT);
        _pgSchema = dataMap.getString(PGSCHEMA);
        _pgUsername = dataMap.getString(PGUSER);
        _pgPassword = dataMap.getString(PGPASS);
        _pgUseWKB = dataMap.getString(USEWKB);
        Log logger = getLogger();
        /*
        logger.info("PGHOST=" + _myHost);
        logger.info("PGDATBASE=" + _myDatabase);
        logger.info("PGPORT=" + _myPort);
        logger.info("PGSCHEMA=" + _mySchema);
        logger.info("PGUSER=" + _myUsername);
        logger.info("PGPASS=" + _myPassword);
        logger.info("USEWKB=" + _myUseWKB);
        */
        if (_pgHost == null) {
            logger.warn("PGHOST is null");
            throw new JobExecutionException("Unknown PostGIS host.");
        }
        if (_pgDatabase == null) {
            logger.warn("PGDATABASE is null");
            throw new JobExecutionException("Unknown PostGIS database.");
        }
        if (_pgPort == null) {
            logger.warn("PGPORT is null");
            throw new JobExecutionException("Unknown PostGIS port.");
        }
        if (_pgSchema == null) {
            logger.warn("PGSCHEMA is null");
            throw new JobExecutionException("Unknown PostGIS schema.");
        }
        if (_pgUsername == null) {
            logger.warn("PGUSERNAME is null");
            throw new JobExecutionException("Unknown PostGIS username.");
        }
        if (_pgPassword == null) {
            logger.warn("PGPASSWORD is null");
            throw new JobExecutionException("Unknown PostGIS password.");
        }
        Map<String, String> remote = new TreeMap<String, String>();
        remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis");
        // remote.put("charset", "UTF-8");
        remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost);
        remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort);
        remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase);
        remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername);
        remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword);
        // remote.put( "namespace", null);
        pgProperties = remote;
    }
    @Override
    public Log getLogger() {
        return logger;
    }
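    /**
     * Quartz entry point: connects to the source Oracle and target PostGIS
     * data stores, then exports the connectivity owner and dynamic color
     * tables to CSV files.
     */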
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Every job has its own job detail
        JobDetail jobDetail = context.getJobDetail();
        // The name is defined in the job definition
        String jobName = jobDetail.getKey().getName();
        // Log the time the job started
        logger.info(jobName + " fired at " + new Date());
        extractJobConfiguration(jobDetail);
        createSourceDataStore();
        createTargetDataStore();
        if (getSourceDataStore() == null) {
            logger.warn("Cannot connect to the source Oracle database.");
            throw new JobExecutionException("Cannot connect to the source Oracle database.");
        }
        if (getTargetDataStore() == null) {
            logger.warn("Cannot connect to the target PostgreSQL database.");
            throw new JobExecutionException("Cannot connect to the target PostgreSQL database.");
        }
        if (isProfileMode()) {
            queryTime = 0;
        }
        long t1 = System.currentTimeMillis();
        String targetSchemaName;
        try {
            logger.info("-- step:clearOutputDatabase --");
            clearOutputDatabase();
            logger.info("-- step:transformOracleDMMSDB --");
            targetSchemaName = determineTargetSchemaName();
            OracleConvertPostGISJobContext jobContext =
                (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath,
                    isProfileMode(), isTransformed());
            jobContext.setSourceDataStore(getSourceDataStore());
            jobContext.setExecutionContext(context);
            long tStep = System.currentTimeMillis();
            fetchTPData(jobContext);
            logger.info("TPC DIST:" + jobContext.getDistId() + ":" +
                ((jobContext.getDistName() == null) ? "NULL" : jobContext.getDistName()));
            mergeConnectivityOwner(jobContext);
            if (isProfileMode()) {
                long tStepEnd = System.currentTimeMillis();
                logTimeDiff("Profile-Merge Connectivity Owner", tStep, tStepEnd);
            }
            tStep = System.currentTimeMillis();
            mergeDynamicColor(jobContext);
            if (isProfileMode()) {
                long tStepEnd = System.currentTimeMillis();
                logTimeDiff("Profile-Merge ColorTable", tStep, tStepEnd);
            }
            jobContext.closeOracleConnection();
            long t2 = System.currentTimeMillis();
            logTimeDiff("Total ", t1, t2);
        } catch (SQLException e) {
            disconnect();
            logger.warn(e.getMessage(), e);
            throw new JobExecutionException("Database error. " + e.getMessage(), e);
        } catch (IOException ex) {
            disconnect();
            logger.warn(ex.getMessage(), ex);
            throw new JobExecutionException("IO error. " + ex.getMessage(), ex);
        } finally {
            disconnect();
        }
        logger.warn(jobName + " end at " + new Date());
    }
    /**
     * Exports the connectivity owner data (BASEDB.CONNECTIVITY) to featureowner.csv.
     *
     * @param jobContext job context
     * @throws java.sql.SQLException sql exception
     * @throws java.io.IOException   csv write error
     */
    protected void mergeConnectivityOwner(AbstractOracleJobContext jobContext) throws SQLException, IOException {
        Connection connection = jobContext.getOracleConnection();
        ResultSet rs = null;
        Statement stmt = null;
        try {
            String targetSchemaName = determineTargetSchemaName();
            logger.info("target schema:" + targetSchemaName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_CONNFDR);
            rs.setFetchSize(50);
            CSVWriter writer = new CSVWriter(new FileWriter("featureowner.csv"), ',');
            writer.writeAll(rs, true);
            writer.flush();
            writer.close();
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
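    /**
     * Exports the dynamic color table (OCSDB.COLOR) to featurecolor.csv.
     *
     * @param jobContext job context
     * @throws java.sql.SQLException sql exception
     * @throws java.io.IOException   csv write error
     */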
    private void mergeDynamicColor(OracleConvertPostGISJobContext jobContext) throws SQLException, IOException {
        Connection connection = jobContext.getOracleConnection();
        ResultSet rs = null;
        Statement stmt = null;
        try {
            String targetSchemaName = determineTargetSchemaName();
            logger.info("target schema:" + targetSchemaName);
            stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            rs = stmt.executeQuery(FETCH_COLORTAB);
            rs.setFetchSize(50);
            CSVWriter writer = new CSVWriter(new FileWriter("featurecolor.csv"), ',');
            writer.writeAll(rs, true);
            writer.flush();
            writer.close();
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
        }
    }
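    /**
     * Executes the given SQL statements as a single JDBC batch against the
     * target PostGIS store, committing on success and rolling back on failure.
     */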
    private void batchExecuteSQL(ArrayList<String> sqlStmts) throws IOException {
        if (targetDataStore == null) return;
        Connection connection = null;
        Statement stmt = null;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            connection.setAutoCommit(false);
            stmt = connection.createStatement();
            for (String sqlStmt : sqlStmts) {
                stmt.addBatch(sqlStmt);
            }
            stmt.executeBatch();
            connection.commit();
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException rollbackEx) {
                    logger.warn(rollbackEx.getMessage(), rollbackEx);
                }
            }
        } finally {
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
    }
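    /**
     * Lists the tables in the target schema whose names match the
     * "fsc-<cid>%" pattern.
     */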
    private List<String> fetchTargetTableList(String targetSchemaName, int cid) throws IOException {
        ArrayList<String> result = new ArrayList<String>();
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            String[] types = {"TABLE"};
            rs = connection.getMetaData().getTables(null, targetSchemaName, "fsc-" + cid + "%", types);
            while (rs.next()) {
                String tableName = rs.getString("TABLE_NAME");
                logger.info("table:" + tableName);
                result.add(tableName);
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return result;
    }
    @Override
    protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) {
        return new OracleConvertPostGISJobContext(getDataPath(),
            getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform);
    }
    private void logTimeDiff(String message, long tBefore, long tCurrent) {
        logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " +
            (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec");
    }
    public DataStore getTargetDataStore() {
        return targetDataStore;
    }
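    /**
     * Builds the target PostGIS data store from the collected connection
     * properties, applying default connection-pool sizes when none are set.
     */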
    protected void createTargetDataStore() throws JobExecutionException {
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5");
        }
        if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) {
            pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1");
        }
        if (!dataStoreFactory.canProcess(pgProperties)) {
            getLogger().warn("cannot process properties-");
            throw new JobExecutionException("cannot process properties-");
        }
        try {
            targetDataStore = dataStoreFactory.createDataStore(pgProperties);
        } catch (IOException e) {
            getLogger().warn(e.getMessage(), e);
            throw new JobExecutionException(e.getMessage(), e);
        }
    }
    @Override
    protected void disconnect() {
        super.disconnect();
        if (targetDataStore != null) {
            targetDataStore.dispose();
            targetDataStore = null;
        }
    }
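    /**
     * Reads the repository version table and returns the schema currently
     * flagged as VSSTATUS_USING, or null when none is active.
     */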
    private String determineTargetSchemaName() throws IOException {
        if (targetDataStore == null) return null;
        Connection connection = null;
        Statement stmt = null;
        ResultSet rs = null;
        String targetSchema = null;
        try {
            connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT);
            rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"});
            if (!rs.next()) {
                throw new IOException("cannot find " + DataReposVersionManager.XGVERSIONTABLE_NAME);
            }
            rs.close();
            rs = null;
            StringBuilder sbSQL = new StringBuilder("SELECT ");
            sbSQL.append("vsschema, vsstatus FROM ");
            sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME)).append(' ');
            sbSQL.append("ORDER BY vsid");
            stmt = connection.createStatement();
            rs = stmt.executeQuery(sbSQL.toString());
            ArrayList<Object[]> tmpSchemas = new ArrayList<Object[]>();
            int i = 0;
            int current = -1;
            while (rs.next()) {
                Object[] values = new Object[2];
                values[0] = rs.getString("vsschema");
                values[1] = rs.getShort("vsstatus");
                tmpSchemas.add(values);
                if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) {
                    current = i;
                }
                i++;
            }
            if (current != -1) {
                Object[] values = tmpSchemas.get(current);
                targetSchema = (String) values[0];
            }
        } catch (SQLException e) {
            logger.warn(e.getMessage(), e);
        } finally {
            JDBCUtils.close(rs);
            JDBCUtils.close(stmt);
            JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null);
        }
        return targetSchema;
    }
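    /** Quotes a schema-qualified table name for use in PostgreSQL SQL. */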
    public String encodeSchemaTableName(String schemaName, String tableName) {
        return "\"" + schemaName + "\".\"" + tableName + "\"";
    }
    public final void accumulateQueryTime() {
        queryTime += System.currentTimeMillis() - queryTimeStart;
    }
    public long getQueryTime() {
        return queryTime;
    }
    public final void markQueryTime() {
        queryTimeStart = System.currentTimeMillis();
    }
    public final void resetQueryTime() {
        queryTime = 0;
    }
    private void clearOutputDatabase() {
        // No-op: this job only exports CSV files and leaves the target database untouched.
    }
}
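
For reference, a minimal sketch of how this job could be wired into a Quartz 2.x scheduler. The launcher class, identity names, and connection values below are hypothetical, and the Oracle-side keys read by AbstractOracleDatabaseJob are omitted because they are not visible in this file.

// Hypothetical launcher; not part of this commit.
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import com.ximple.eofms.jobs.OracleTransformColorOwner2CSVJob;

public class ColorOwner2CSVLauncher {
    public static void main(String[] args) throws Exception {
        // Keys match the constants the job reads in extractJobConfiguration();
        // all values here are placeholders.
        JobDetail job = JobBuilder.newJob(OracleTransformColorOwner2CSVJob.class)
                .withIdentity("colorOwner2Csv", "xdgnjobs")
                .usingJobData("PGHOST", "localhost")
                .usingJobData("PGDATBASE", "gisdb") // key spelling follows the job's constant
                .usingJobData("PGPORT", "5432")
                .usingJobData("PGSCHEMA", "public")
                .usingJobData("PGUSER", "postgres")
                .usingJobData("PGPASS", "secret")
                .usingJobData("USEWKB", "true")
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("colorOwner2CsvTrigger", "xdgnjobs")
                .startNow()
                .build();
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();
        scheduler.scheduleJob(job, trigger);
    }
}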