From 98fc790f9ec053d5367d7ce5851ef66d5a7ced5b Mon Sep 17 00:00:00 2001 From: Dennis Kao <ulysseskao@gmail.com> Date: Wed, 04 Dec 2013 09:33:15 +0800 Subject: [PATCH] add copyapi for theme --- xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java | 504 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 498 insertions(+), 6 deletions(-) diff --git a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java index c7e0cb9..2cc9b81 100644 --- a/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java +++ b/xdgnjobs/ximple-spatialjob/src/main/java/com/ximple/eofms/jobs/OracleConvertDgn2PostGISJob.java @@ -6,12 +6,15 @@ import java.io.FileNotFoundException; import java.io.FilenameFilter; import java.io.IOException; +import java.io.PushbackReader; +import java.io.StringReader; import java.math.BigDecimal; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -22,6 +25,7 @@ import java.util.Map; import java.util.TreeMap; +import com.ximple.eofms.util.DefaultColorTable; import org.apache.commons.collections.OrderedMap; import org.apache.commons.collections.OrderedMapIterator; import org.apache.commons.collections.map.LinkedMap; @@ -34,6 +38,8 @@ import org.geotools.feature.SchemaException; import org.geotools.jdbc.JDBCDataStore; import org.opengis.feature.IllegalAttributeException; +import org.postgresql.PGConnection; +import org.postgresql.copy.CopyManager; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; @@ -79,6 +85,15 @@ private static final int COMMITSIZE = 100; private static final String INDEXPATHNAME = "index"; private static final String OTHERPATHNAME = "other"; + + private static String FETCH_CONNFDR = "SELECT FSC, UFID, FDR1 FROM BASEDB.CONNECTIVITY ORDER BY FSC"; + private static String FETCH_COLORTAB = "SELECT TAG_SFSC, TAG_LUFID, COLOR FROM OCSDB.COLOR ORDER BY TAG_SFSC"; + + private static String CREATE_OWNERTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, owner smallint not null)"; + private static String CREATE_COLORTABLE = "CREATE TABLE s (tid smallint not null, oid int not null, dyncolor varchar(10) not null)"; + + public static final String FDYNCOLOR_SUFFIX = "_fdyncolor"; + public static final String FOWNER_SUFFIX = "_fowner"; protected static class Pair { Object first; @@ -209,11 +224,12 @@ } long t1 = System.currentTimeMillis(); - String targetSchemaName; + String targetSchemaName, targetThemeTable; try { logger.info("-- step:clearOutputDatabase --"); clearOutputDatabase(); targetSchemaName = determineTargetSchemaName(); + targetThemeTable = determineTargetThemeTableName(); if (checkConvertFile()) { logger.info("-- step:convertIndexDesignFile --"); @@ -234,11 +250,11 @@ } + OracleConvertPostGISJobContext jobContext = null; if (checkConvertDB()) { logger.info("-- step:convertOracleDB --"); - OracleConvertPostGISJobContext jobContext = - (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, + jobContext = (OracleConvertPostGISJobContext) prepareJobContext(targetSchemaName, _filterPath, isProfileMode(), isTransformed()); jobContext.setSourceDataStore(getSourceDataStore()); // jobContext.setConvertElementIn(_convertElementIn); @@ -317,12 +333,32 @@ createDummyFeatureFile(context); } + updateRepoStatusToReady(targetSchemaName); + + if (checkConvertPWThemes()) { + long tStep = System.currentTimeMillis(); + if (!convertPowerOwnerThemeWithCopyAPI(jobContext, targetThemeTable)) { + convertPowerOwnerTheme(jobContext, targetThemeTable); + } + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); + } + tStep = System.currentTimeMillis(); + if (!convertDynamicColorThemeWithCopyAPI(jobContext, targetThemeTable)) + convertDynamicColorTheme(jobContext, targetThemeTable); + if (isProfileMode()) { + long tStepEnd = System.currentTimeMillis(); + logTimeDiff("Profile-convertFeatureDesignFile", tStep, tStepEnd); + } + } + + updatePWThemeStatusToReady(targetThemeTable); + long t2 = System.currentTimeMillis(); // public static final String DATE_FORMAT_NOW = "yyyy-MM-dd HH:mm:ss"; // SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_NOW); logTimeDiff("Total ", t1, t2); - - updateRepoStatusToReady(targetSchemaName); } catch (SQLException e) { disconnect(); @@ -1174,12 +1210,12 @@ boolean needCreate = false; try { connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XGVERSIONTABLE_NAME rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XGVERSIONTABLE_NAME, new String[]{"TABLE"}); if (!rs.next()) needCreate = true; if (needCreate) createXGeosVersionTable(connection, _pgSchema); rs.close(); - rs = null; StringBuilder sbSQL = new StringBuilder("SELECT "); sbSQL.append("vsschema, vsstatus FROM "); @@ -1233,6 +1269,77 @@ return targetSchema; } + private String determineTargetThemeTableName() throws IOException { + if (targetDataStore == null) return null; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + String targetTable = null; + boolean needCreate = false; + try { + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + // Create XPTVERSIONTABLE_NAME + needCreate = false; + rs = connection.getMetaData().getTables(null, _pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME, new String[]{"TABLE"}); + if (!rs.next()) needCreate = true; + if (needCreate) + createXPWThemeVersionTable(connection, _pgSchema); + rs.close(); + + rs = null; + + StringBuilder sbSQL = new StringBuilder("SELECT "); + sbSQL.append("vptname, vptstatus FROM "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append("ORDER BY vptid"); + stmt = connection.createStatement(); + rs = stmt.executeQuery(sbSQL.toString()); + ArrayList<Object[]> tmpTablenames = new ArrayList<Object[]>(); + int i = 0; + int current = -1; + while (rs.next()) { + Object[] values = new Object[2]; + values[0] = rs.getString("vptname"); + values[1] = rs.getShort("vptstatus"); + tmpTablenames.add(values); + if ((((Short) values[1]) & DataReposVersionManager.VSSTATUS_USING) != 0) { + current = i; + } + i++; + } + + if (current == -1) { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } else if (current < (tmpTablenames.size() - 1)) { + Object[] values = tmpTablenames.get(current + 1); + targetTable = (String) values[0]; + } else { + Object[] values = tmpTablenames.get(0); + targetTable = (String) values[0]; + } + + sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_COVERT); + sbSQL.append(" WHERE vptname = '"); + sbSQL.append(targetTable).append("'"); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetTable + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + return targetTable; + } + public String encodeSchemaTableName(String schemaName, String tableName) { return "\"" + schemaName + "\".\"" + tableName + "\""; } @@ -1275,6 +1382,42 @@ } } + private void createXPWThemeVersionTable(Connection connection, String pgSchema) throws SQLException { + Statement stmt = null; + StringBuilder sql = new StringBuilder("CREATE TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" ( vptid serial PRIMARY KEY, "); + sql.append(" vptname character varying(64) NOT NULL, "); + sql.append(" vptstatus smallint NOT NULL, "); + sql.append(" vpttimestamp timestamp with time zone ) "); + try { + stmt = connection.createStatement(); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("ALTER TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" OWNER TO ").append(_pgUsername); + stmt.executeUpdate(sql.toString()); + + sql = new StringBuilder("GRANT ALL ON TABLE "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" TO public"); + stmt.executeUpdate(sql.toString()); + + for (String schemaName : DataReposVersionManager.DEFAULTXPTVERSIONTABLE_NAMES) { + sql = new StringBuilder("INSERT INTO "); + sql.append(encodeSchemaTableName(pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)); + sql.append(" (vptname, vptstatus) VALUES ('"); + sql.append(schemaName).append("', "); + sql.append(DataReposVersionManager.VSSTATUS_AVAILABLE).append(" )"); + stmt.executeUpdate(sql.toString()); + } + + } finally { + if (stmt != null) stmt.close(); + } + } + private void updateRepoStatusToReady(String targetSchema) { if (targetDataStore == null) return; Connection connection = null; @@ -1287,6 +1430,38 @@ sbSQL.append(" SET vsstatus = "); sbSQL.append(DataReposVersionManager.VSSTATUS_READY); sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vsschema = '"); + sbSQL.append(targetSchema).append("'"); + + connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + stmt = connection.createStatement(); + int count = stmt.executeUpdate(sbSQL.toString()); + if (count != 1) { + logger.info("update status for " + targetSchema + " update result count=" + + count); + } + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connection, Transaction.AUTO_COMMIT, null); + } + } + + private void updatePWThemeStatusToReady(String targetSchema) { + if (targetDataStore == null) return; + Connection connection = null; + Statement stmt = null; + ResultSet rs = null; + boolean needCreate = false; + try { + StringBuilder sbSQL = new StringBuilder("UPDATE "); + sbSQL.append(encodeSchemaTableName(_pgSchema, DataReposVersionManager.XPTVERSIONTABLE_NAME)).append(' '); + sbSQL.append(" SET vptstatus = "); + sbSQL.append(DataReposVersionManager.VSSTATUS_READY); + sbSQL.append(" , vstimestamp = CURRENT_TIMESTAMP WHERE vptname = '"); sbSQL.append(targetSchema).append("'"); connection = targetDataStore.getConnection(Transaction.AUTO_COMMIT); @@ -1352,4 +1527,321 @@ public final void resetQueryTime() { queryTime = 0; } + + private void convertDynamicColorTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, dyncolor) VALUES (?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setString(3, colorText); + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void convertPowerOwnerTheme(AbstractOracleJobContext context, String targetTableBaseName) throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerTheme"); + return; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + PreparedStatement pstmt = null; + try { + connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + + createOrClearTargetTable(connectionPG, targetTableName, + "(tid smallint not null, oid int not null, fowner smallint not null)"); + + pstmt = connectionPG.prepareStatement("INSERT INTO " + + encodeSchemaTableName(_pgSchema, targetTableName) + + " (tid, oid, fowner) VALUES (?, ?, ?)" ); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + pstmt.setShort(1, (short) cid); + pstmt.setInt(2, (int) oid); + pstmt.setShort(3, (short) ownerId); + pstmt.addBatch(); + + if (count % MAX_BATCHSIZE == 0) { + pstmt.executeBatch(); + } + ++count; + } + + pstmt.executeBatch(); + createTargetTableIndex(connectionPG, targetTableName); + + logger.info("Execute Update Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(pstmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + } + + private void createOrClearTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndex(Connection connection, String tableName) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private boolean convertDynamicColorThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertDynamicColorThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + if (!(connectionPG instanceof PGConnection)) { + return false; + } + + ResultSet rs = null; + Statement stmt = null; + try { + + DefaultColorTable colorTable = (DefaultColorTable) DefaultColorTable.getInstance(); + String targetTableName = targetTableBaseName + FDYNCOLOR_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_COLORTAB); + rs.setFetchSize(50); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, dyncolor varchar(10) not null)"); + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 4096); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int colorId = rs.getInt(3); + String colorText = colorTable.getColorCode(colorId); + + sb.append(cid).append(','); + sb.append(oid).append(",'"); + sb.append(colorText).append("'\n"); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private boolean convertPowerOwnerThemeWithCopyAPI(AbstractOracleJobContext context, String targetTableBaseName) + throws IOException { + if (context == null) { + getLogger().info("jobContext is null in convertPowerOwnerThemeWithCopyAPI"); + return false; + } + Connection connection = context.getOracleConnection(); + Connection connectionPG = targetDataStore.getConnection(Transaction.AUTO_COMMIT); + + boolean found = false; + ResultSet rs = null; + Statement stmt = null; + try { + connectionPG.setAutoCommit(false); + String targetTableName = targetTableBaseName + FOWNER_SUFFIX; + String targetTempName = "tmp_" + targetTableName; + logger.info("target table:" + targetTableName); + stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + rs = stmt.executeQuery(FETCH_CONNFDR); + rs.setFetchSize(50); + + createOrClearTempTargetTable(connectionPG, targetTempName, + "(tid smallint not null, oid int not null, fowner smallint not null)"); + + StringBuilder sb = new StringBuilder(); + + CopyManager cpMgr = ((PGConnection) connectionPG).getCopyAPI(); + PushbackReader reader = new PushbackReader(new StringReader(""), 4096); + + final int MAX_BATCHSIZE = 50; + int count = 0; + while (rs.next()) { + int cid = rs.getInt(1); + long oid = rs.getLong(2); + int ownerId = rs.getInt(3); + + sb.append(cid).append(','); + sb.append(oid).append(','); + sb.append(ownerId).append('\n'); + + if (count % MAX_BATCHSIZE == 0) { + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + sb.delete(0, sb.length()); + } + ++count; + } + + reader.unread(sb.toString().toCharArray()); + cpMgr.copyIn("COPY " + targetTempName + " FROM STDIN WITH CSV", reader); + createTargetTableIndexAndDropTemp(connectionPG, targetTableName, targetTempName); + + logger.info("Execute Copy Count=" + count); + } catch (SQLException e) { + logger.info(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + JDBCUtils.close(connectionPG, Transaction.AUTO_COMMIT, null); + } + return true; + } + + private void createOrClearTempTargetTable(Connection connection, String tableName, String sql) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("DROP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + "CASCADE"); + } + + stmt.executeUpdate("CREATE TEMP TABLE " + encodeSchemaTableName(_pgSchema, tableName) + " " + sql); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } + + private void createTargetTableIndexAndDropTemp(Connection connection, String tableName, String tempTable) throws SQLException { + Statement stmt = connection.createStatement(); + ResultSet rs = null; + try { + stmt.execute("CREATE TABLE " + tableName +" AS SELECT * FROM " + tempTable); + rs = connection.getMetaData().getTables(null, _pgSchema, tableName, new String[]{"TABLE"}); + if (rs.next()) { + stmt.execute("ALTER TABLE " + encodeSchemaTableName(_pgSchema, tableName) + + " ADD PRIMARY KEY (tid, oid)"); + } + stmt.execute("DROP TABLE " + tempTable); + } finally { + JDBCUtils.close(rs); + JDBCUtils.close(stmt); + } + } } -- Gitblit v0.0.0-SNAPSHOT