package com.ximple.eofms.jobs; import java.io.IOException; import java.io.PushbackReader; import java.io.StringReader; import java.net.URL; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.TreeSet; import com.ximple.eofms.geoserver.config.XGeosDataConfig; import com.ximple.eofms.geoserver.config.XGeosDataConfigMapping; import com.ximple.eofms.jobs.context.AbstractOracleJobContext; import com.ximple.eofms.jobs.context.postgis.OracleConvertPostGISJobContext; import com.ximple.eofms.util.ConnectivityDirectionEnum; import com.ximple.eofms.util.DefaultColorTable; import com.ximple.eofms.util.PrintfFormat; import com.ximple.eofms.util.XGeosConfigDigesterUtils; import org.apache.commons.collections.MultiMap; import org.apache.commons.dbcp.DelegatingConnection; import org.apache.commons.digester3.Digester; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.geotools.data.DataStore; import org.geotools.data.Transaction; import org.geotools.data.jdbc.JDBCUtils; import org.geotools.data.postgis.PostgisNGDataStoreFactory; import org.geotools.jdbc.JDBCDataStore; import org.postgresql.PGConnection; import org.postgresql.copy.CopyManager; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.xml.sax.SAXException; public class OracleConvertThemes2PostGISJob extends AbstractOracleDatabaseJob { final static Log logger = LogFactory.getLog(GeoserverIntegrateConfigJob.class); private static final String SKIPCONFIGJOB = "SKIPCONFIGJOB"; private static final String MASTERMODE = "MASTERMODE"; private static final String EPSG = "EPSG:"; private static final String XGEOSDATACONFIG_PATH = "xgeosdataconfig.xml"; // private static final int MAGIC_BLOCKSIZE = (64 * 1024 * 1024) - (32 * 1024); private static final String QUERY_VIEWDEFSQL = "SELECT table_name, view_definition FROM information_schema.views " + "WHERE table_schema = ? AND table_name LIKE "; private static final String CREATE_VIEWSQL = "CREATE OR REPLACE VIEW \"%s\" AS SELECT * FROM \"%s\".\"%s\""; private static final String EXTRAWHERE_VIEWSQL = " WHERE \"%s\".level = %s AND \"%s\".symweight = %s"; private static final String ALTER_VIEWSQL = "ALTER TABLE \"%s\" OWNER TO "; // private static final String GRANT_VIEWSQL = "GRANT SELECT ON TABLE \"%s\" TO public"; private static final int SRSID_TWD97_ZONE119 = 3825; private static final int SRSID_TWD97_ZONE121 = 3826; public static final String DEFAULT_STORENAME = "pgDMMS"; public static final String DEFAULT_GEODMMS_NAMESPACE = "http://tpc.ximple.com.tw/geodmms"; private static final String PGHOST = "PGHOST"; private static final String PGDATBASE = "PGDATBASE"; private static final String PGPORT = "PGPORT"; private static final String PGSCHEMA = "PGSCHEMA"; private static final String PGUSER = "PGUSER"; private static final String PGPASS = "PGPASS"; private static final String USEWKB = "USEWKB"; private static final boolean useTpclidText = false; private static final int FETCHSIZE = 30; private static final int COMMITSIZE = 100; public static final String FORWARDFLOW_MARK = "shape://ccarrow"; public static final String BACKFLOW_MARK = "shape://rccarrow"; public static final String UNFLOW_MARK = "shape://backslash"; public static final String NONFLOW_MARK = "shape://slash"; private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1, DIR FROM BASEDB.CONNECTIVITY ORDER BY FSC"; private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR WHERE TAG_BCOMPID = 0 ORDER BY TAG_SFSC"; private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)"; private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)"; public static final String FDYNCOLOR_SUFFIX = "_fdyncolor"; public static final String FOWNER_SUFFIX = "_fowner"; private static XGeosDataConfigMapping xgeosDataConfigMapping = null; protected JDBCDataStore targetDataStore; protected Map pgProperties; protected static PostgisNGDataStoreFactory dataStoreFactory = new PostgisNGDataStoreFactory(); protected String _pgHost; protected String _pgDatabase; protected String _pgPort; protected String _pgSchema; protected String _pgUsername; protected String _pgPassword; protected String _pgUseWKB; private long queryTime = 0; private long queryTimeStart = 0; private String currentThemeTable = null; private Short currentThemeStatus = -1; public Log getLogger() { return logger; } protected AbstractOracleJobContext prepareJobContext(String targetSchemaName, String filterPath, boolean profileMode, boolean useTransform) { return new OracleConvertPostGISJobContext(getDataPath(), getTargetDataStore(), targetSchemaName, filterPath, profileMode, useTransform); } protected void extractJobConfiguration(JobDetail jobDetail) throws JobExecutionException { super.extractJobConfiguration(jobDetail); JobDataMap dataMap = jobDetail.getJobDataMap(); _pgHost = dataMap.getString(PGHOST); _pgDatabase = dataMap.getString(PGDATBASE); _pgPort = dataMap.getString(PGPORT); _pgSchema = dataMap.getString(PGSCHEMA); _pgUsername = dataMap.getString(PGUSER); _pgPassword = dataMap.getString(PGPASS); _pgUseWKB = dataMap.getString(USEWKB); Log logger = getLogger(); if (_pgHost == null) { logger.warn("PGHOST is null"); throw new JobExecutionException("Unknown PostGIS host."); } if (_pgDatabase == null) { logger.warn("PGDATABASE is null"); throw new JobExecutionException("Unknown PostGIS database."); } if (_pgPort == null) { logger.warn("PGPORT is null"); throw new JobExecutionException("Unknown PostGIS port."); } if (_pgSchema == null) { logger.warn("PGSCHEMA is null"); throw new JobExecutionException("Unknown PostGIS schema."); } if (_pgUsername == null) { logger.warn("PGUSERNAME is null"); throw new JobExecutionException("Unknown PostGIS username."); } if (_pgPassword == null) { logger.warn("PGPASSWORD is null"); throw new JobExecutionException("Unknown PostGIS password."); } Map remote = new TreeMap(); remote.put(PostgisNGDataStoreFactory.DBTYPE.key, "postgis"); // remote.put("charset", "UTF-8"); remote.put(PostgisNGDataStoreFactory.HOST.key, _pgHost); remote.put(PostgisNGDataStoreFactory.PORT.key, _pgPort); remote.put(PostgisNGDataStoreFactory.DATABASE.key, _pgDatabase); remote.put(PostgisNGDataStoreFactory.USER.key, _pgUsername); remote.put(PostgisNGDataStoreFactory.PASSWD.key, _pgPassword); // remote.put( "namespace", null); pgProperties = remote; } protected XGeosDataConfigMapping getConfigMapping() { if (xgeosDataConfigMapping == null) { Digester digester = XGeosConfigDigesterUtils.getXGeosConfigDigester(); final URL configDataURL = XGeosDataConfigMapping.class.getResource(XGEOSDATACONFIG_PATH); try { xgeosDataConfigMapping = (XGeosDataConfigMapping) digester.parse(configDataURL); } catch (IOException e) { logger.warn(e.getMessage(), e); } catch (SAXException e) { logger.warn(e.getMessage(), e); } } return xgeosDataConfigMapping; } private void logTimeDiff(String message, long tBefore, long tCurrent) { logger.warn(message + ":use time = " + ((int) ((tCurrent - tBefore) / 60000.0)) + " min - " + (((int) ((tCurrent - tBefore) % 60000.0)) / 1000) + " sec"); } @Override public void execute(JobExecutionContext context) throws JobExecutionException { // Every job has its own job detail JobDetail jobDetail = context.getJobDetail(); // The name is defined in the job definition String jobName = jobDetail.getKey().getName(); // Log the time the job started logger.info(jobName + " fired at " + new Date()); extractJobConfiguration(jobDetail); createSourceDataStore(); createTargetDataStore(); if (getSourceDataStore() == null) { logger.warn("Cannot connect source oracle database."); throw new JobExecutionException("Cannot connect source oracle database."); } if (getTargetDataStore() == null) { logger.warn("Cannot connect source postgreSQL database."); throw new JobExecutionException("Cannot connect source postgreSQL database."); } if (isProfileMode()) { queryTime = 0; } long t1 = System.currentTimeMillis(); String targetThemeTable; try { logger.info("-- step:clearOutputDatabase --"); OracleConvertPostGISJobContext jobContext = null; if (checkConvertPWThemes()) { targetThemeTable = determineTargetThemeTableName(); jobContext = (OracleConvertPostGISJobContext) prepareJobContext("public", _filterPath, isProfileMode(), isTransformed()); jobContext.setSourceDataStore(getSourceDataStore()); jobContext.setElementLogging(checkElementLogging()); jobContext.setExecutionContext(context); long tStep = System.currentTimeMillis(); if (!convertPowerOwnerThemeWithCopyAPI(jobContext, targetThemeTable)) { convertPowerOwnerTheme(jobContext, targetThemeTable); } if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); } tStep = System.currentTimeMillis(); if (!convertDynamicColorThemeWithCopyAPI(jobContext, targetThemeTable)) convertDynamicColorTheme(jobContext, targetThemeTable); if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); } jobContext.closeOracleConnection(); updatePWThemeStatusToReady(targetThemeTable); } long t2 = System.currentTimeMillis(); // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); logTimeDiff("Total ", t1, t2); } catch (IOException ex) { disconnect(); logger.warn(ex.getMessage(), ex); throw new JobExecutionException("IO error. " + ex.getMessage(), ex); } finally { disconnect(); } logger.warn(jobName + " end at " + new Date()); createTargetDataStore(); if (getTargetDataStore() == null) { logger.warn("Cannot connect source postgreSQL database."); throw new JobExecutionException("Cannot connect source postgreSQL database."); } try { logger.info("-- step:resetThemesViewMapping --"); long tStep = System.currentTimeMillis(); resetThemesViewMapping(context); if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-resetThemesViewMapping", tStep, tStepEnd); } logger.info("-- step:resetGeoServerConfig --"); tStep = System.currentTimeMillis(); // resetGeoServerConfig(jobExecutionContext); if (isProfileMode()) { long tStepEnd = System.currentTimeMillis(); logTimeDiff("Profile-resetGeoServerConfig", tStep, tStepEnd); } } finally { disconnect(); } } /** * 重新建立所有重新建立所有PostGIS中的資料庫視景 * * @param executionContext 批次執行的關係 */ private void resetThemesViewMapping(JobExecutionContext executionContext) throws JobExecutionException { assert executionContext != null; Connection connection = null; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); String ownerName = _pgUsername; String currentTargetThemesName = retrieveCurrentThemeName(connection, DataReposVersionManager.VSSTATUS_READY); if (currentTargetThemesName == null) { logger.info("Cannot found themes that status is VSSTATUS_READY[" + DataReposVersionManager.VSSTATUS_READY + "]"); return; } ArrayList realTableNames = new ArrayList(); retrieveAllRealTableName(connection, currentTargetThemesName, realTableNames); resetThemesBaseView(connection, ownerName, currentTargetThemesName); if (currentThemeTable == null) { transferThemesVersionStatus(DataReposVersionManager.VSSTATUS_READY, DataReposVersionManager.VSSTATUS_LINKVIEW, false); } else { transferThemesVersionStatus(DataReposVersionManager.VSSTATUS_READY, currentThemeStatus, true); } /* updateCurrentThemeStatus(connection, currentTargetThemesName, DataReposVersionManager.VSSTATUS_LINKVIEW); */ // String[] featureNames = dataStore.getTypeNames(); // logger.info("featureNames[] size = " + featureNames.length); } catch (IOException e) { logger.warn(e.getMessage(), e); } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { if (connection != null) JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); // if (dataStore != null) dataStore.dispose(); } } private void retrieveAllRealTableName(Connection connection, String targetSchema, ArrayList realTableNames) throws SQLException { ResultSet rsMeta = null; try { rsMeta = connection.getMetaData().getTables("", targetSchema, "fsc%", new String[]{"TABLE"}); while (rsMeta.next()) { String tableName = rsMeta.getString(3); realTableNames.add(tableName); } rsMeta.close(); rsMeta = null; rsMeta = connection.getMetaData().getTables("", targetSchema, "index%", new String[]{"TABLE"}); while (rsMeta.next()) { String tableName = rsMeta.getString(3); realTableNames.add(tableName); } rsMeta.close(); rsMeta = null; rsMeta = connection.getMetaData().getTables("", targetSchema, "lndtpc%", new String[]{"TABLE"}); while (rsMeta.next()) { String tableName = rsMeta.getString(3); realTableNames.add(tableName); } } finally { if (rsMeta != null) rsMeta.close(); } } private void resetPostgisDataView(Connection connection, HashMap viewDefs, String ownerName, String schemaName, String tableName) throws SQLException { String[] splits = tableName.split("-"); if (splits.length > 3) { // feature table StringBuilder viewBuilder = new StringBuilder(); viewBuilder.append(splits[0]); viewBuilder.append('-'); viewBuilder.append(splits[1]); viewBuilder.append('-'); viewBuilder.append(splits[2]); viewBuilder.append(splits[3]); String viewName = viewBuilder.toString(); if (viewDefs.containsKey(viewName)) { String viewDef = viewDefs.get(viewName); int pos = viewDef.indexOf("FROM"); String subView = viewDef.substring(pos + 4); // String[] viewSources = subView.split("\\."); String[] viewSources = subView.split("(\\.\"|\")"); if (!viewSources[0].equalsIgnoreCase(schemaName)) { createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); } } else { createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); } } else { splits = tableName.split("_"); if (splits.length > 0) { StringBuilder viewBuilder = new StringBuilder(); viewBuilder.append(splits[0]); if (splits.length > 1) viewBuilder.append(splits[1]); if (splits.length > 2) viewBuilder.append(splits[2]); String viewName = viewBuilder.toString(); if (viewDefs.containsKey(viewName)) { String viewDef = viewDefs.get(viewName); int pos = viewDef.indexOf("FROM"); String subView = viewDef.substring(pos + 4); String[] viewSources = subView.split("(\\.\"|\")"); if (!viewSources[0].equalsIgnoreCase(schemaName)) { createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); } } else { createOrReplaceView(connection, schemaName, tableName, viewName, ownerName); } } } } private void resetThemesBaseView(Connection connection, String ownerName, String currentThemesName) throws SQLException { String viewName = "xpwtheme" + FDYNCOLOR_SUFFIX; String tableName = currentThemesName + FDYNCOLOR_SUFFIX; PrintfFormat pf = new PrintfFormat("CREATE OR REPLACE VIEW \"%s\" AS SELECT * FROM \"%s\""); String sql = pf.sprintf(new Object[]{viewName, tableName}); Statement stmt = connection.createStatement(); try { stmt.execute(sql); pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); sql = pf.sprintf(viewName); stmt.execute(sql); viewName = "xpwtheme" + FOWNER_SUFFIX; tableName = currentThemesName + FOWNER_SUFFIX; pf = new PrintfFormat("CREATE OR REPLACE VIEW \"%s\" AS SELECT * FROM \"%s\""); sql = pf.sprintf(new Object[]{viewName, tableName}); stmt.execute(sql); pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); sql = pf.sprintf(viewName); stmt.execute(sql); } catch (SQLException e) { // logger.warn(e.getMessage(), e); logger.info(sql == null ? "SQL=NULL" : "SQL=" + sql); throw e; } finally { stmt.close(); } } private void resetThemesPostgisDataView(Connection connection, String ownerName, String currentSchema, String viewName) throws SQLException { String themeViewName = viewName + "-oms"; // PrintfFormat pf = new PrintfFormat(CREATE_VIEWSQL); // String sql = pf.sprintf(new Object[]{viewName, schemaName, tableName}); ResultSet rs = null; Statement stmt = connection.createStatement(); try { StringBuilder sbSQL = new StringBuilder("CREATE OR REPLACE VIEW \""); sbSQL.append(themeViewName).append("\" AS SELECT "); rs = connection.getMetaData().getColumns(null, currentSchema, viewName, "%"); while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); sbSQL.append("t." + fieldName).append(", "); } sbSQL.append("fc.dyncolor, fo.fowner FROM "); if (currentSchema != null) sbSQL.append("\"").append(currentSchema).append("\".\"").append(viewName).append("\" AS t,"); else sbSQL.append("\"").append(viewName).append("\" AS t,"); sbSQL.append("xpwtheme").append(FDYNCOLOR_SUFFIX).append(" AS fc,"); sbSQL.append("xpwtheme").append(FOWNER_SUFFIX).append(" AS fo WHERE "); sbSQL.append("t.tid = fc.tid AND t.oid = fc.oid AND "); sbSQL.append("t.tid = fo.tid AND t.oid = fo.oid"); // sbSQL.delete(sbSQL.length() - 2, sbSQL.length()); String sql = sbSQL.toString(); stmt.execute(sql); sbSQL.delete(0, sbSQL.length()); PrintfFormat pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); sql = pf.sprintf(themeViewName); stmt.execute(sql); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void resetFlowThemesPostgisDataView(Connection connection, String ownerName, String currentSchema, String viewName) throws SQLException { String themeViewName = viewName + "-flow-oms"; ResultSet rs = null; Statement stmt = connection.createStatement(); try { StringBuilder sbSQL = new StringBuilder("CREATE OR REPLACE VIEW \""); sbSQL.append(themeViewName).append("\" AS SELECT "); rs = connection.getMetaData().getColumns(null, currentSchema, viewName, "%"); while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); sbSQL.append("t." + fieldName).append(", "); } sbSQL.append("fc.dyncolor, fo.fowner, fo.flow FROM "); if (currentSchema != null) sbSQL.append("\"").append(currentSchema).append("\".\"").append(viewName).append("\" AS t,"); else sbSQL.append("\"").append(viewName).append("\" AS t,"); sbSQL.append("xpwtheme").append(FDYNCOLOR_SUFFIX).append(" AS fc,"); sbSQL.append("xpwtheme").append(FOWNER_SUFFIX).append(" AS fo WHERE "); sbSQL.append("t.tid = fc.tid AND t.oid = fc.oid AND "); sbSQL.append("t.tid = fo.tid AND t.oid = fo.oid"); // sbSQL.delete(sbSQL.length() - 2, sbSQL.length()); String sql = sbSQL.toString(); stmt.execute(sql); sbSQL.delete(0, sbSQL.length()); PrintfFormat pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); sql = pf.sprintf(themeViewName); stmt.execute(sql); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private HashMap retrieveViewDef(Connection connection, String schemaName, String tablePattern) throws SQLException { PreparedStatement stmt = connection.prepareStatement(QUERY_VIEWDEFSQL + "'" + tablePattern + "'"); stmt.setString(1, schemaName); // stmt.setString(2, tablePattern); HashMap result = new HashMap(); ResultSet rs = stmt.executeQuery(); while (rs.next()) { String tableName = rs.getString(1); String viewDef = rs.getString(2); result.put(tableName, viewDef); } rs.close(); stmt.close(); return result; } private void createOrReplaceView(Connection connection, String schemaName, String tableName, String viewName, String ownerName) throws SQLException { PrintfFormat pf = new PrintfFormat(CREATE_VIEWSQL); String sql = pf.sprintf(new Object[]{viewName, schemaName, tableName}); Statement stmt = connection.createStatement(); try { stmt.execute(sql); pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); sql = pf.sprintf(viewName); stmt.execute(sql); } catch (SQLException e) { // logger.warn(e.getMessage(), e); logger.info(sql == null ? "SQL=NULL" : "SQL=" + sql); throw e; } finally { stmt.close(); } // connection.commit(); } private void createOrReplaceExtraView(Connection connection, String schemaName, String tableName, String viewName, String ownerName, XGeosDataConfig xgeosConfig) throws SQLException { PrintfFormat pf = new PrintfFormat(CREATE_VIEWSQL); String sql = pf.sprintf(new Object[]{viewName, schemaName, tableName}); PrintfFormat pfWhere = new PrintfFormat(EXTRAWHERE_VIEWSQL); sql += pfWhere.sprintf(new String[]{tableName, Short.toString(xgeosConfig.getLEV()), tableName, Short.toString(xgeosConfig.getWEIGHT())}); Statement stmt = connection.createStatement(); stmt.execute(sql); pf = new PrintfFormat(ALTER_VIEWSQL + ownerName); sql = pf.sprintf(viewName); stmt.execute(sql); stmt.close(); // connection.commit(); } private Timestamp retrieveCurrentThemeTimestamp(Connection connection, short status) throws SQLException { StringBuilder sbSQL = new StringBuilder("SELECT vpttimestamp, vptname, vptstatus FROM "); sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME); sbSQL.append(" WHERE vptstatus = "); sbSQL.append(status); sbSQL.append(" ORDER BY vptid"); Timestamp result = null; Statement stmt = null; ResultSet rs = null; try { stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); // get first result if (rs.next()) { result = rs.getTimestamp(1); } return result; } finally { if (rs != null) rs.close(); if (stmt != null) stmt.close(); } } private void updateCurrentThemeStatus(Connection connection, String themeTableName, short newStatus) throws SQLException { StringBuilder sbSQL = new StringBuilder("UPDATE "); sbSQL.append(DataReposVersionManager.XPTVERSIONTABLE_NAME).append(' '); sbSQL.append(" SET vptstatus = "); sbSQL.append(newStatus); sbSQL.append(", vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); sbSQL.append(themeTableName).append("'"); Statement stmt = null; try { stmt = connection.createStatement(); stmt.executeUpdate(sbSQL.toString()); } finally { if (stmt != null) stmt.close(); } } private boolean checkCurrentThemeStatus(Connection connection, short status) { try { return (retrieveCurrentThemeName(connection, status) != null); } catch (SQLException e) { logger.warn(e.getMessage(), e); return false; } } private String retrieveCurrentThemeName(Connection connection, short status) throws SQLException { StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("vptname, vpttimestamp, vptstatus FROM "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); sbSQL.append(" WHERE vptstatus = "); sbSQL.append(status); sbSQL.append("ORDER BY vptid"); String result = null; Statement stmt = null; ResultSet rs = null; try { stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); // get first result if (rs.next()) { result = rs.getString(1); } return result; } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } protected String[] retrieveTargetStoreAllViewNames(Connection connection) { try { final int TABLE_NAME_COL = 3; List list = new ArrayList(); DatabaseMetaData meta = connection.getMetaData(); // String[] tableType = { "TABLE", "VIEW" }; String[] tableType = { "VIEW" }; ResultSet tables = meta.getTables(null, _pgSchema, "%", tableType); while (tables.next()) { String tableName = tables.getString(TABLE_NAME_COL); list.add(tableName); /* if (allowTable(tableName)) { list.add(tableName); } */ } tables.close(); return (String[]) list.toArray(new String[list.size()]); } catch (SQLException e) { logger.warn(e.getMessage(), e); } return null; } public DataStore getTargetDataStore() { return targetDataStore; } protected void createTargetDataStore() throws JobExecutionException { if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } /* if (!isDriverFound()) { throw new JobExecutionException("Oracle JDBC Driver not found.-" + JDBC_DRIVER); } */ if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MAXCONN.key)) { pgProperties.put(PostgisNGDataStoreFactory.MAXCONN.key, "5"); } if (!pgProperties.containsKey(PostgisNGDataStoreFactory.MINCONN.key)) { pgProperties.put(PostgisNGDataStoreFactory.MINCONN.key, "1"); } if (!dataStoreFactory.canProcess(pgProperties)) { getLogger().warn("cannot process properties-"); throw new JobExecutionException("cannot process properties-"); } try { targetDataStore = dataStoreFactory.createDataStore(pgProperties); } catch (IOException e) { getLogger().warn(e.getMessage(), e); throw new JobExecutionException(e.getMessage(), e); } } protected void disconnect() { super.disconnect(); if (targetDataStore != null) { targetDataStore.dispose(); targetDataStore = null; } } private String determineTargetThemeTableName() throws IOException { if (targetDataStore == null) return null; Connection connection = null; Statement stmt = null; ResultSet rs = null; String targetTable = null; boolean needCreate = false; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); // Create XPTVERSIONTABLE_NAME needCreate = false; rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"}); if (!rs.next()) needCreate = true; if (needCreate) { createXPWThemeVersionTable(connection, _pgSchema); } rs.close(); rs = null; StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("vptname, vptstatus FROM "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); sbSQL.append("ORDER BY vptid"); stmt = connection.createStatement(); rs = stmt.executeQuery(sbSQL.toString()); ArrayList tmpTablenames = new ArrayList(); int i = 0; int current = -1; while (rs.next()) { Object[] values = new Object[2]; values[0] = rs.getString("vptname"); values[1] = rs.getShort("vptstatus"); tmpTablenames.add(values); if (((Short) values[1]) >= DataReposVersionManager.VSSTATUS_LINKVIEW) { current = i; currentThemeTable = (String) values[0]; currentThemeStatus = (Short) values[1]; } i++; } if (current == -1) { Object[] values = tmpTablenames.get(0); targetTable = (String) values[0]; } else if (current < (tmpTablenames.size() - 1)) { Object[] values = tmpTablenames.get(current + 1); targetTable = (String) values[0]; } else { Object[] values = tmpTablenames.get(0); targetTable = (String) values[0]; } sbSQL = new StringBuilder("UPDATE "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); sbSQL.append(" SET vptstatus = "); sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); sbSQL.append(" WHERE vptname = '"); sbSQL.append(targetTable).append("'"); int count = stmt.executeUpdate(sbSQL.toString()); if (count != 1) { logger.info("update status for " + targetTable + " update result count=" + count); } } catch (SQLException e) { logger.warn(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } return targetTable; } public String encodeSchemaTableName(String schemaName, String tableName) { if (schemaName == null) return "\"" + tableName + "\""; return "\"" + schemaName + "\".\"" + tableName + "\""; } private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { if (context == null) { getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI"); return false; } Connection connection = context.getOracleConnection(); Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); while (connectionPG instanceof DelegatingConnection) { connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); } if (!(connectionPG instanceof PGConnection)) { return false; } final int MAX_BATCHSIZE = 250; ResultSet rs = null; Statement stmt = null; try { // connectionPG.setAutoCommit(false); DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; String targetTempName = "tmp_" + targetTableName; logger.info("target table:" + targetTableName); stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); rs = stmt.executeQuery(FETCH_COLORTAB); rs.setFetchSize(MAX_BATCHSIZE); createOrClearTempTargetTable(connectionPG, targetTempName, "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); StringBuilder sb = new StringBuilder(); CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); PushbackReader reader = new PushbackReader(new StringReader(""), 10240); int count = 0; while (rs.next()) { int cid = rs.getInt(1); long oid = rs.getLong(2); int colorId = rs.getInt(3); String colorText = colorTable.getColorCode(colorId); if (cid > Short.MAX_VALUE) { logger.info("Wrong Color Table:" + cid + "-" + oid); continue; } sb.append(cid).append(','); sb.append(oid).append(','); sb.append(colorText).append("\n"); if (count % MAX_BATCHSIZE == 0) { reader.unread(sb.toString().toCharArray()); cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); sb.delete(0, sb.length()); } ++count; } reader.unread(sb.toString().toCharArray()); cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName, "tid, oid, dyncolor"); logger.info("Execute Copy Count=" + count); } catch (SQLException e) { logger.info(e.getMessage(), e); throw new IOException(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); } return true; } private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { if (context == null) { getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI"); return false; } Connection connection = context.getOracleConnection(); Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); while (connectionPG instanceof DelegatingConnection) { connectionPG = ((DelegatingConnection) connectionPG).getDelegate(); } if (!(connectionPG instanceof PGConnection)) { return false; } final int MAX_BATCHSIZE = 250; ResultSet rs = null; Statement stmt = null; try { // connectionPG.setAutoCommit(false); String targetTableName = targetTableBaseName + FOWNER_SUFFIX; String targetTempName = "tmp_" + targetTableName; logger.info("target table:" + targetTableName); stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); rs = stmt.executeQuery(FETCH_CONNFDR); rs.setFetchSize(MAX_BATCHSIZE); createOrClearTempTargetTable(connectionPG, targetTempName, "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); StringBuilder sb = new StringBuilder(); CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); PushbackReader reader = new PushbackReader(new StringReader(""), 10240); int count = 0; while (rs.next()) { int cid = rs.getInt(1); long oid = rs.getLong(2); int ownerId = rs.getInt(3); short dirId = (short) rs.getInt(4); String flowMark; ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); if ((ConnectivityDirectionEnum.ForwardflowON == dir) || (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { flowMark = FORWARDFLOW_MARK; } else if ((ConnectivityDirectionEnum.BackflowON == dir) || (ConnectivityDirectionEnum.BackFixflowON == dir)) { flowMark = BACKFLOW_MARK; } else if (ConnectivityDirectionEnum.Nondeterminate == dir) { flowMark = NONFLOW_MARK; } else { flowMark = UNFLOW_MARK; } if (cid > Short.MAX_VALUE) { logger.info("Wrong Connectivity Table:" + cid + "-" + oid); continue; } sb.append(cid).append(','); sb.append(oid).append(','); sb.append(ownerId).append(','); sb.append(flowMark).append('\n'); if (count % MAX_BATCHSIZE == 0) { reader.unread(sb.toString().toCharArray()); cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); sb.delete(0, sb.length()); } ++count; } reader.unread(sb.toString().toCharArray()); cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName, "tid, oid, fowner, flow"); logger.info("Execute Copy Count=" + count); } catch (SQLException e) { logger.info(e.getMessage(), e); throw new IOException(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); } return true; } private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { if (context == null) { getLogger().info("jobContext is null in convertPowerOwnerTheme"); return; } Connection connection = context.getOracleConnection(); Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); boolean found = false; ResultSet rs = null; Statement stmt = null; PreparedStatement pstmt = null; try { connectionPG.setAutoCommit(false); String targetTableName = targetTableBaseName + FOWNER_SUFFIX; logger.info("target table:" + targetTableName); stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); rs = stmt.executeQuery(FETCH_CONNFDR); rs.setFetchSize(50); createOrClearTargetTable(connectionPG, targetTableName, "(tid smallint not null, oid int not null, fowner smallint not null, flow varchar(20) not null)"); pstmt = connectionPG.prepareStatement("INSERT INTO " + encodeSchemaTableName(_pgSchema, targetTableName) + " (tid, oid, fowner, flow) VALUES (?, ?, ?, ?)" ); final int MAX_BATCHSIZE = 50; int count = 0; while (rs.next()) { int cid = rs.getInt(1); long oid = rs.getLong(2); int ownerId = rs.getInt(3); short dirId = (short) rs.getInt(4); pstmt.setShort(1, (short) cid); pstmt.setInt(2, (int) oid); pstmt.setShort(3, (short) ownerId); ConnectivityDirectionEnum dir = ConnectivityDirectionEnum.convertShort(dirId); if ((ConnectivityDirectionEnum.ForwardflowON == dir) || (ConnectivityDirectionEnum.ForwardFixflowON == dir)) { pstmt.setString(4, "shape://ccarrow"); } else if ((ConnectivityDirectionEnum.BackflowON == dir) || (ConnectivityDirectionEnum.BackFixflowON == dir)) { pstmt.setString(4, "shape://rccarrow"); } else { pstmt.setString(4, "shape://backslash"); } pstmt.addBatch(); if (count % MAX_BATCHSIZE == 0) { pstmt.executeBatch(); } ++count; } pstmt.executeBatch(); createTargetTableIndex(connectionPG, targetTableName); logger.info("Execute Update Count=" + count); } catch (SQLException e) { logger.info(e.getMessage(), e); throw new IOException(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(pstmt); JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); } } private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException { Statement stmt = connection.createStatement(); ResultSet rs = null; try { rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); if (rs.next()) { stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); } stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void createTargetTableIndex(Connection connection, String tableName) throws SQLException { Statement stmt = connection.createStatement(); ResultSet rs = null; try { rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); if (rs.next()) { stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " ADD PRIMARY KEY (tid, oid)"); } } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable, String fields) throws SQLException { Statement stmt = connection.createStatement(); ResultSet rs = null; try { boolean found = false; rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); if (rs.next()) { found = true; } JDBCUtils.close(rs); if (!found) { stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable); rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); if (rs.next()) { stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " ADD PRIMARY KEY (tid, oid)"); } } else { stmt.execute("TRUNCATE "+ tableName + " CASCADE"); stmt.execute("INSERT INTO " + tableName + "(" + fields + ") SELECT " + fields + " FROM " + tempTable); /* --insert into xpwtheme1_fdyncolor (tid, oid, dyncolor) select tid, oid, dyncolor from xpwtheme2_fdyncolor; --reindex table xpwtheme1_fdyncolor; --alter table xpwtheme1_fdyncolor drop constraint xpwtheme1_fdyncolor_pkey; --alter table xpwtheme1_fdyncolor ADD PRIMARY KEY (tid, oid); */ } stmt.execute("DROP TABLE " + tempTable); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException { Statement stmt = connection.createStatement(); ResultSet rs = null; try { rs = connection.getMetaData().getTables(null, null, tableName, new String[]{"TABLE"}); if (rs.next()) { stmt.execute("DROP TABLE " + encodeSchemaTableName(null, tableName) + "CASCADE"); } stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(null, tableName) + " " + sql); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); } } private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { if (context == null) { getLogger().info("jobContext is null in convertDynamicColorTheme"); return; } Connection connection = context.getOracleConnection(); Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); boolean found = false; ResultSet rs = null; Statement stmt = null; PreparedStatement pstmt = null; try { DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; logger.info("target table:" + targetTableName); stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); rs = stmt.executeQuery(FETCH_COLORTAB); rs.setFetchSize(50); createOrClearTargetTable(connectionPG, targetTableName, "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); pstmt = connectionPG.prepareStatement("INSERT INTO " + encodeSchemaTableName(_pgSchema, targetTableName) + " (tid, oid, dyncolor) VALUES (?, ?, ?)" ); final int MAX_BATCHSIZE = 50; int count = 0; while (rs.next()) { int cid = rs.getInt(1); long oid = rs.getLong(2); int colorId = rs.getInt(3); String colorText = colorTable.getColorCode(colorId); pstmt.setShort(1, (short) cid); pstmt.setInt(2, (int) oid); pstmt.setString(3, colorText); pstmt.addBatch(); if (count % MAX_BATCHSIZE == 0) { pstmt.executeBatch(); } ++count; } pstmt.executeBatch(); createTargetTableIndex(connectionPG, targetTableName); logger.info("Execute Update Count=" + count); } catch (SQLException e) { logger.info(e.getMessage(), e); throw new IOException(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(pstmt); JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); } } private void updatePWThemeStatusToReady(String targetSchema) { if (targetDataStore == null) return; Connection connection = null; Statement stmt = null; ResultSet rs = null; boolean needCreate = false; try { StringBuilder sbSQL = new StringBuilder("UPDATE "); sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); sbSQL.append(" SET vptstatus = "); sbSQL.append(DataReposVersionManager.VSSTATUS_READY); sbSQL.append(" , vpttimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); sbSQL.append(targetSchema).append("'"); connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); stmt = connection.createStatement(); int count = stmt.executeUpdate(sbSQL.toString()); if (count != 1) { logger.info("update status for " + targetSchema + " update result count=" + count); } } catch (SQLException e) { logger.warn(e.getMessage(), e); } catch (IOException e) { logger.warn(e.getMessage(), e); } finally { JDBCUtils.close(rs); JDBCUtils.close(stmt); JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } } private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException { Statement stmt = null; StringBuilder sql = new StringBuilder("CREATE TABLE "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); sql.append(" ( vptid serial PRIMARY KEY, "); sql.append(" vptname character varying(64) NOT NULL, "); sql.append(" vptstatus smallint NOT NULL, "); sql.append(" vpttimestamp timestamp with time zone ) "); try { stmt = connection.createStatement(); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("ALTER TABLE "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); sql.append(" OWNER TO ").append(_pgUsername); stmt.executeUpdate(sql.toString()); sql = new StringBuilder("GRANT ALL ON TABLE "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); sql.append(" TO public"); stmt.executeUpdate(sql.toString()); for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) { sql = new StringBuilder("INSERT INTO "); sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); sql.append(" (vptname, vptstatus) VALUES ('"); sql.append(schemaName).append("', "); sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); stmt.executeUpdate(sql.toString()); } } finally { if (stmt != null) stmt.close(); } } protected void transferThemesVersionStatus(Connection connection, short vsstatusBefore, short vsstatusAfter, boolean exclusive) throws JobExecutionException { try { String currentTargetTheme = retrieveCurrentThemeName(connection, vsstatusBefore); if (currentTargetTheme == null) { logger.info("Cannot found target schema in dataStore. status=" + vsstatusBefore); return; } String existTargetSchema = null; if (exclusive) existTargetSchema = retrieveCurrentThemeName(connection, vsstatusAfter); updateCurrentThemeStatus(connection, currentTargetTheme, vsstatusAfter); if ((exclusive) && (existTargetSchema != null)) { updateCurrentThemeStatus(connection, existTargetSchema, DataReposVersionManager.VSSTATUS_AVAILABLE); } } catch (SQLException e) { logger.warn(e.getMessage(), e); throw new JobExecutionException("Update " + DataReposVersionManager.XPTVERSIONTABLE_NAME + " has error-", e); } } protected void transferThemesVersionStatus(short vsstatusBefore, short vsstatusAfter, boolean exclusive) throws JobExecutionException { if (targetDataStore == null) return; Connection connection = null; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); transferThemesVersionStatus(connection, vsstatusBefore, vsstatusAfter, exclusive); } catch (IOException e) { logger.warn(e.getMessage(), e); } finally { JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); } } }