Oracle to Vertica Custom ETL Using Java

Moderator: NorbertKrupa

Post Reply
jmaamo
Newbie
Newbie
Posts: 2
Joined: Wed May 29, 2013 9:47 pm

Oracle to Vertica Custom ETL Using Java

Post by jmaamo » Wed May 29, 2013 10:29 pm

Here's the updated code as of June 5, 2013.

Feel free to post your comments. I will attempt to rewrite this code in the future with Multi-threading and see what happens.

import java.util.Properties;
import java.util.Date;
import java.util.*;
import java.sql.Timestamp;
import java.sql.*;
import oracle.jdbc.driver.*;
import oracle.sql.*;

/*
DROP SEQUENCE rp_etl_tables_s;
CREATE SEQUENCE rp_etl_tables_s START WITH 1000;
DROP TABLE rp_etl_tables;
CREATE TABLE rp_etl_tables
(table_id INTEGER NOT NULL PRIMARY KEY,
data_source_mapping_id INTEGER NOT NULL,
source_table_name VARCHAR2(100),
target_table_name VARCHAR2(100),
commit_size NUMBER,
fetch_size NUMBER,
load_type CHAR(1), -- [C=Complete; F=Fast]
load_start_date DATE,
load_end_date DATE,
load_status CHAR(1) -- [I=set when a table's load starts; S=set when the load ends]
);


--------------------------
To populate control table:
--------------------------
insert into rp_etl_tables
select
rp_etl_tables_s.nextval,
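NULL, -- NOTE: data_source_mapping_id is declared NOT NULL above, so this insert fails as written; supply a real mapping id here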
etl.table_name,
etl.table_name,
2000,
1000,
'C',
NULL,
NULL,
NULL
from
(select table_name from user_tables
union
select mview_name table_name from user_mviews
union
select view_name table_name from user_views
order by 1) etl


=======
To run:
=======
SET CLASSPATH=C:\DevWork\VerticaPPMIntegration\lib\ojdbc6.jar;C:\DevWork\VerticaPPMIntegration\lib\vertica-jdk5-6.0.1-0.jar;.
java -Xms1280m -Xmx4096m -XX:MaxPermSize=256m -XX:+UseConcMarkSweepGC KcrtRequestsLoad

*/


/**
 * Copies tables from an Oracle schema into Vertica, driven by the
 * {@code rp_etl_tables} control table that lives in the Oracle schema.
 *
 * <p>For every control row whose {@code load_end_date} is still NULL the loader:
 * <ol>
 *   <li>stamps {@code load_start_date} and status {@code I} on the control row,</li>
 *   <li>reads the source table's columns from {@code USER_TAB_COLUMNS},</li>
 *   <li>creates the target table in Vertica,</li>
 *   <li>copies all rows in batches of {@code commit_size}, committing after each batch,</li>
 *   <li>stamps {@code load_end_date} and status {@code S} on the control row.</li>
 * </ol>
 *
 * <p>Connection settings are hard-coded below; edit them before running.
 * The class is single-threaded and keeps no mutable static state.
 */
public class KcrtRequestsLoad {

    private static final String SOURCE_URL = "jdbc:oracle:thin:@216.55.171.999:1521:xxxxm01";
    private static final String SOURCE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
    private static final String SOURCE_USERNAME = "xxxxx";
    private static final String SOURCE_PASSWORD = "yyyyy";

    private static final String TARGET_URL = "jdbc:vertica://100.200.100.35:5433/VMart";
    private static final String TARGET_DRIVER_CLASS = "com.vertica.jdbc.Driver";
    private static final String TARGET_USERNAME = "aaaaaa";
    private static final String TARGET_PASSWORD = "ccccc";

    private static final String ETL_TABLES_SQL =
            "SELECT * FROM rp_etl_tables WHERE load_end_date IS NULL";
    private static final String UPDATE_ETL_TABLE_START_SQL =
            "UPDATE rp_etl_tables SET load_start_date = TO_DATE(?,'YYYY-MM-DD HH24:MI:SS'), load_status = ? WHERE source_table_name = ?";
    private static final String UPDATE_ETL_TABLE_END_SQL =
            "UPDATE rp_etl_tables SET load_end_date = TO_DATE(?,'YYYY-MM-DD HH24:MI:SS'), load_status = ? WHERE source_table_name = ?";

    // The table name is bound as a parameter rather than concatenated into the SQL text.
    private static final String SOURCE_COLUMNS_SQL =
            "SELECT COLUMN_NAME, DECODE(DATA_TYPE,"
            + "'VARCHAR2','VARCHAR', "
            + "'NUMBER','NUMERIC', "
            + "DATA_TYPE ) DATA_TYPE,"
            + "DATA_LENGTH, DATA_PRECISION, NULLABLE "
            + "FROM USER_TAB_COLUMNS WHERE TABLE_NAME = ? ORDER BY COLUMN_ID";

    private static final String STATUS_STARTED = "I";
    private static final String STATUS_FINISHED = "S";

    /** One column of a source table as described by {@code USER_TAB_COLUMNS}. */
    private static final class ColumnDef {
        final String name;
        final String dataType;
        final String dataLength;
        final boolean nullable;

        ColumnDef(String name, String dataType, String dataLength, boolean nullable) {
            this.name = name;
            this.dataType = dataType;
            this.dataLength = dataLength;
            this.nullable = nullable;
        }
    }

    /**
     * Entry point: opens both connections, loads every pending table and closes
     * the connections again.
     *
     * <p>A {@link SQLException} during the load is printed to stderr and ends the
     * run; open transactions are rolled back first.
     *
     * @param args unused
     * @throws Exception if a JDBC driver class cannot be loaded
     */
    public static void main(String[] args) throws Exception {
        Connection sourceConn = null;
        Connection targetConn = null;

        try {
            sourceConn = openConnection(SOURCE_DRIVER_CLASS, SOURCE_URL, SOURCE_USERNAME, SOURCE_PASSWORD);
            // commit() is only valid with auto-commit off; the loader commits explicitly.
            sourceConn.setAutoCommit(false);

            // To reset the control table before a full reload, call clearEtlTable(sourceConn) here.

            targetConn = openConnection(TARGET_DRIVER_CLASS, TARGET_URL, TARGET_USERNAME, TARGET_PASSWORD);
            targetConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
            targetConn.setAutoCommit(false);

            loadPendingTables(sourceConn, targetConn);
        } catch (SQLException e) {
            e.printStackTrace();
            // Make the fate of any half-finished transaction explicit instead of
            // leaving it to each driver's close() behaviour.
            rollbackQuietly(targetConn);
            rollbackQuietly(sourceConn);
        } finally {
            // Close each connection independently; either one may be null.
            closeQuietly(targetConn);
            closeQuietly(sourceConn);
        }
    }

    /** Loads the driver class and opens a connection with the given credentials. */
    private static Connection openConnection(String driverClass, String url, String user, String password)
            throws ClassNotFoundException, SQLException {
        Class.forName(driverClass);
        Properties props = new Properties();
        props.put("user", user);
        props.put("password", password);
        return DriverManager.getConnection(url, props);
    }

    /** Walks the control table and loads every table that has no end date yet. */
    private static void loadPendingTables(Connection sourceConn, Connection targetConn) throws SQLException {
        PreparedStatement etlTablesStatement = null;
        ResultSet etlTables = null;
        try {
            etlTablesStatement = sourceConn.prepareStatement(ETL_TABLES_SQL);
            etlTables = etlTablesStatement.executeQuery();

            while (etlTables.next()) {
                String loadStartDateStr = currentTimestampString();

                String sourceTableName = etlTables.getString("SOURCE_TABLE_NAME");
                String targetTableName = etlTables.getString("TARGET_TABLE_NAME");
                int commitSize = etlTables.getInt("COMMIT_SIZE");
                int fetchSize = etlTables.getInt("FETCH_SIZE");

                updateLoadStatus(sourceConn, UPDATE_ETL_TABLE_START_SQL,
                        loadStartDateStr, STATUS_STARTED, sourceTableName);
                // Commit right away so the "started" marker is visible while the load runs.
                sourceConn.commit();

                copyTable(sourceConn, targetConn, sourceTableName, targetTableName, commitSize, fetchSize);

                String loadEndDateStr = currentTimestampString();
                updateLoadStatus(sourceConn, UPDATE_ETL_TABLE_END_SQL,
                        loadEndDateStr, STATUS_FINISHED, sourceTableName);

                System.out.println(sourceTableName + " " + commitSize + " " + fetchSize + " "
                        + loadStartDateStr + " " + loadEndDateStr);

                targetConn.commit();
                sourceConn.commit();
            }
        } finally {
            closeQuietly(etlTables);
            closeQuietly(etlTablesStatement);
        }
    }

    /** Creates the target table from the source metadata and copies all rows into it. */
    private static void copyTable(Connection sourceConn, Connection targetConn,
            String sourceTableName, String targetTableName, int commitSize, int fetchSize)
            throws SQLException {
        List<ColumnDef> columns = readSourceColumns(sourceConn, sourceTableName);
        if (columns.isEmpty()) {
            // Without columns no valid DDL or INSERT can be built; fail with a clear message.
            throw new SQLException("No columns found in USER_TAB_COLUMNS for table " + sourceTableName);
        }

        Statement createTable = null;
        try {
            createTable = targetConn.createStatement();
            createTable.executeUpdate(buildCreateTableSql(targetTableName, columns));
        } finally {
            closeQuietly(createTable);
        }

        copyRows(sourceConn, targetConn, sourceTableName, targetTableName, columns, commitSize, fetchSize);
    }

    /** Reads the column list of a source table, in column-id order. */
    private static List<ColumnDef> readSourceColumns(Connection sourceConn, String sourceTableName)
            throws SQLException {
        List<ColumnDef> columns = new ArrayList<ColumnDef>();
        PreparedStatement metaStatement = null;
        ResultSet meta = null;
        try {
            metaStatement = sourceConn.prepareStatement(SOURCE_COLUMNS_SQL);
            metaStatement.setString(1, sourceTableName);
            meta = metaStatement.executeQuery();
            while (meta.next()) {
                columns.add(new ColumnDef(
                        meta.getString("COLUMN_NAME"),
                        meta.getString("DATA_TYPE"),
                        meta.getString("DATA_LENGTH"),
                        !"N".equals(meta.getString("NULLABLE"))));
            }
        } finally {
            closeQuietly(meta);
            closeQuietly(metaStatement);
        }
        return columns;
    }

    /**
     * Builds the {@code CREATE TABLE} statement for the target.
     *
     * <p>Only VARCHAR columns get an explicit length; every other type name is
     * passed through as reported by the source query.
     * NOTE(review): types such as CHAR or NUMBER with precision are therefore
     * created with the target's defaults -- confirm this is acceptable.
     */
    private static String buildCreateTableSql(String targetTableName, List<ColumnDef> columns) {
        StringBuilder ddl = new StringBuilder("CREATE TABLE ").append(targetTableName).append(" (");
        for (int i = 0; i < columns.size(); i++) {
            ColumnDef column = columns.get(i);
            if (i > 0) {
                ddl.append(",\n");
            }
            ddl.append(column.name).append(' ').append(column.dataType);
            if ("VARCHAR".equals(column.dataType)) {
                ddl.append('(').append(column.dataLength).append(')');
            }
            if (!column.nullable) {
                ddl.append(" NOT NULL");
            }
        }
        return ddl.append(')').toString();
    }

    /** Returns the column names joined with {@code ", "}. */
    private static String joinColumnNames(List<ColumnDef> columns) {
        StringBuilder names = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                names.append(", ");
            }
            names.append(columns.get(i).name);
        }
        return names.toString();
    }

    /** Builds {@code INSERT INTO t (c1, c2, ...) VALUES (?, ?, ...)} for the target table. */
    private static String buildInsertSql(String targetTableName, List<ColumnDef> columns) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(targetTableName)
                .append(" (").append(joinColumnNames(columns)).append(") VALUES (");
        for (int i = 0; i < columns.size(); i++) {
            sql.append(i == 0 ? "?" : ", ?");
        }
        return sql.append(')').toString();
    }

    /**
     * Streams all rows of the source table into the target table.
     *
     * <p>Rows are batched and flushed (executed and committed) every
     * {@code commitSize} rows; a non-positive {@code commitSize} flushes after
     * every row. Values are transferred as strings, as in the original loader.
     */
    private static void copyRows(Connection sourceConn, Connection targetConn,
            String sourceTableName, String targetTableName, List<ColumnDef> columns,
            int commitSize, int fetchSize) throws SQLException {
        // Select the columns explicitly so their order is guaranteed to match the INSERT list.
        String selectSql = "SELECT " + joinColumnNames(columns) + " FROM " + sourceTableName;
        int columnCount = columns.size();

        PreparedStatement select = null;
        ResultSet rows = null;
        PreparedStatement insert = null;
        try {
            select = sourceConn.prepareStatement(selectSql);
            if (fetchSize > 0) {
                // Set before executing so the very first fetch already uses the hint.
                select.setFetchSize(fetchSize);
            }
            rows = select.executeQuery();
            insert = targetConn.prepareStatement(buildInsertSql(targetTableName, columns));

            int pendingRows = 0;
            while (rows.next()) {
                for (int i = 1; i <= columnCount; i++) {
                    insert.setString(i, rows.getString(i));
                }
                insert.addBatch();
                pendingRows++;
                if (pendingRows >= commitSize) {
                    flushBatch(insert, targetConn);
                    pendingRows = 0;
                }
            }
            if (pendingRows > 0) {
                flushBatch(insert, targetConn);
            }
        } finally {
            closeQuietly(rows);
            closeQuietly(select);
            closeQuietly(insert);
        }
    }

    /**
     * Executes the pending batch and commits the target connection.
     *
     * <p>A {@link BatchUpdateException} is reported on stdout and the load
     * continues (best effort, as in the original loader).
     */
    private static void flushBatch(PreparedStatement insert, Connection targetConn) throws SQLException {
        try {
            insert.executeBatch();
        } catch (BatchUpdateException e) {
            System.out.println("Error message: " + e.getMessage());
        }
        targetConn.commit();
        insert.clearBatch();
    }

    /** Runs one of the control-table UPDATE statements for the given source table. */
    private static void updateLoadStatus(Connection sourceConn, String updateSql,
            String timestamp, String status, String sourceTableName) throws SQLException {
        PreparedStatement update = null;
        try {
            update = sourceConn.prepareStatement(updateSql);
            update.setString(1, timestamp);
            update.setString(2, status);
            update.setString(3, sourceTableName);
            update.executeUpdate();
        } finally {
            closeQuietly(update);
        }
    }

    /** Returns the current time as {@code yyyy-MM-dd HH:mm:ss}, matching the TO_DATE mask used above. */
    private static String currentTimestampString() {
        // Timestamp.toString() yields "yyyy-mm-dd hh:mm:ss.fffffffff"; keep the first 19 characters.
        return new Timestamp(System.currentTimeMillis()).toString().substring(0, 19);
    }

    /**
     * Resets the load dates and status of every row in the control table so that
     * all tables are picked up again by the next run.
     *
     * @param sourceConn open connection to the schema holding {@code rp_etl_tables}
     * @throws SQLException if the update or the commit fails
     */
    private static void clearEtlTable(Connection sourceConn) throws SQLException {
        String updateEtlTableSQL =
                "UPDATE rp_etl_tables SET load_start_date = NULL, load_end_date = NULL, load_status = NULL";

        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = sourceConn.prepareStatement(updateEtlTableSQL);
            preparedStatement.executeUpdate();
            if (!sourceConn.getAutoCommit()) {
                sourceConn.commit();
            }
        } finally {
            closeQuietly(preparedStatement);
        }
    }

    /** Rolls back the connection if it is open; failures are reported but not propagated. */
    private static void rollbackQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            if (!conn.isClosed() && !conn.getAutoCommit()) {
                conn.rollback();
            }
        } catch (SQLException e) {
            System.out.println("Rollback failed: " + e.getMessage());
        }
    }

    /** Closes the connection if non-null; failures are reported but not propagated. */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            System.out.println("Close failed: " + e.getMessage());
        }
    }

    /** Closes the statement if non-null; failures are reported but not propagated. */
    private static void closeQuietly(Statement statement) {
        if (statement == null) {
            return;
        }
        try {
            statement.close();
        } catch (SQLException e) {
            System.out.println("Close failed: " + e.getMessage());
        }
    }

    /** Closes the result set if non-null; failures are reported but not propagated. */
    private static void closeQuietly(ResultSet resultSet) {
        if (resultSet == null) {
            return;
        }
        try {
            resultSet.close();
        } catch (SQLException e) {
            System.out.println("Close failed: " + e.getMessage());
        }
    }
}
Last edited by jmaamo on Wed Jun 05, 2013 10:11 pm, edited 3 times in total.
June N. Maamo
Results Positive, Inc.
http://www.resultspositive.com/

User avatar
Julie
Master
Master
Posts: 221
Joined: Thu Apr 19, 2012 9:29 pm

Re: Oracle to Vertica Data Load Using Java

Post by Julie » Thu May 30, 2013 12:43 pm

jmaamo, thanks for the great post! I love your statement:
Caution: the code may contain bugs that I do not plan on fixing.
Sometimes I think Vertica should include the same comment in their release notes doc :lol:
Thanks,
Juliette

Post Reply

Return to “JDBC”