Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #58 from tanghaodong25/master
Browse files Browse the repository at this point in the history
fix several issues when running TPC-DS workload
  • Loading branch information
Haodong Tang committed Oct 25, 2019
2 parents 617dbd8 + 511df29 commit 9e75dd2
Show file tree
Hide file tree
Showing 31 changed files with 870 additions and 1,539 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.nio.channels.FileLock;
import java.sql.Connection;
import org.sqlite.SQLiteConfig;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -34,136 +33,142 @@ public void createTable(String root_dir) {
+ " UNIQUE(shuffleId, device)\n"
+ ");\n";

synchronized (file) {
FileOutputStream fos = null;
Connection conn = null;
Statement stmt = null;
try {
fos = new FileOutputStream(file);
fos.getChannel().lock();
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
stmt.execute(sql);
FileOutputStream fos = null;
FileLock fl = null;
Connection conn = null;
Statement stmt = null;
try {
fos = new FileOutputStream(file);
fl = fos.getChannel().lock();
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
stmt.execute(sql);

sql = "CREATE TABLE IF NOT EXISTS devices (\n"
+ " device text UNIQUE,\n"
+ " mount_count int\n"
+ ");";
stmt.execute(sql);
sql = "CREATE TABLE IF NOT EXISTS devices (\n"
+ " device text UNIQUE,\n"
+ " mount_count int\n"
+ ");";
stmt.execute(sql);
} catch (SQLException e) {
System.out.println("createTable failed:" + e.getMessage());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
System.out.println("createTable failed:" + e.getMessage());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (fl != null) {
fl.release();
}
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("Metastore DB connected: " + url);
}

public void insertRecord(String shuffleId, String device) {
String sql = "INSERT OR IGNORE INTO metastore(shuffleId,device) VALUES('" + shuffleId + "','" + device + "')";
synchronized (file) {
FileOutputStream fos = null;
Connection conn = null;
Statement stmt = null;
FileOutputStream fos = null;
FileLock fl = null;
Connection conn = null;
Statement stmt = null;
try {
fos = new FileOutputStream(file);
fl = fos.getChannel().lock();
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
stmt.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fos = new FileOutputStream(file);
fos.getChannel().lock();
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
stmt.executeUpdate(sql);
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1);
} catch (IOException e) {
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (fl != null) {
fl.release();
}
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

public String getShuffleDevice(String shuffleId){
String sql = "SELECT device FROM metastore where shuffleId = ?";
String res = "";
synchronized (file) {
FileOutputStream fos = null;
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
FileOutputStream fos = null;
FileLock fl = null;
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
fos = new FileOutputStream(file);
fl = fos.getChannel().lock();
conn = DriverManager.getConnection(url);
stmt = conn.prepareStatement(sql);
stmt.setString(1, shuffleId);
rs = stmt.executeQuery();
while (rs.next()) {
res = rs.getString("device");
}
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fos = new FileOutputStream(file);
fos.getChannel().lock();
conn = DriverManager.getConnection(url);
stmt = conn.prepareStatement(sql);
stmt.setString(1, shuffleId);
rs = stmt.executeQuery();
while (rs.next()) {
res = rs.getString("device");
}
if (rs != null) rs.close();
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1);
} catch (IOException e) {
}
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
if (rs != null) rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (fl != null) {
fl.release();
}
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return res;
Expand All @@ -175,69 +180,71 @@ public String getUnusedDevice(ArrayList<String> full_device_list){
HashMap<String, Integer> device_count = new HashMap<String, Integer>();
String device = "";
int count;
synchronized (file) {
FileOutputStream fos = null;
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
fos = new FileOutputStream(file);
fos.getChannel().lock();
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
device_list.add(rs.getString("device"));
device_count.put(rs.getString("device"), rs.getInt("mount_count"));
}
full_device_list.removeAll(device_list);
if (full_device_list.size() == 0) {
// reuse old device, picked the device has smallest mount_count
device = getDeviceWithMinCount(device_count);
if (device != null && device.length() == 0) {
throw new SQLException();
}
count = (Integer) device_count.get(device) + 1;
sql = "UPDATE devices SET mount_count = " + count + " WHERE device = '" + device + "'\n";
} else {
device = full_device_list.get(0);
count = 1;
sql = "INSERT OR IGNORE INTO devices(device, mount_count) VALUES('" + device + "', " + count + ")\n";
FileOutputStream fos = null;
FileLock fl = null;
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
fos = new FileOutputStream(file);
fl = fos.getChannel().lock();
SQLiteConfig config = new SQLiteConfig();
config.setBusyTimeout("30000");
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
device_list.add(rs.getString("device"));
device_count.put(rs.getString("device"), rs.getInt("mount_count"));
}
full_device_list.removeAll(device_list);
if (full_device_list.size() == 0) {
// reuse old device, picked the device has smallest mount_count
device = getDeviceWithMinCount(device_count);
if (device != null && device.length() == 0) {
throw new SQLException();
}
count = (Integer) device_count.get(device) + 1;
sql = "UPDATE devices SET mount_count = " + count + " WHERE device = '" + device + "'\n";
} else {
device = full_device_list.get(0);
count = 1;
sql = "INSERT OR IGNORE INTO devices(device, mount_count) VALUES('" + device + "', " + count + ")\n";
}

System.out.println(sql);
System.out.println(sql);

stmt.executeUpdate(sql);
stmt.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (rs != null) rs.close();
} catch (SQLException e) {
e.printStackTrace();
System.exit(-1);
} catch (IOException e) {
}
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
if (rs != null) rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
try {
if (fl != null) {
fl.release();
}
try {
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
if (fos != null) {
fos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("Metastore DB: get unused device, should be " + device + ".");
Expand Down
Loading

0 comments on commit 9e75dd2

Please sign in to comment.