-
Notifications
You must be signed in to change notification settings - Fork 7
/
exp_tpcc_recovery.py
98 lines (84 loc) · 4.3 KB
/
exp_tpcc_recovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from tpcc_parameters import *
import time
import subprocess
import os
import sys
CHECKPOINT_WITH_MAIN_JSON = """
{"operators": {"l": {"type": "Checkpoint", "withMain": true}}, "edges": [["l","l"]]}
"""
CHECKPOINT_JSON = """
{"operators": {"l": {"type": "Checkpoint", "withMain": false}}, "edges": [["l","l"]]}
"""
def clearFileSystemCache():
sys.stdout.write("Clearing file system cache...")
sys.stdout.flush()
subprocess.call('sudo sh -c "echo 3 > /proc/sys/vm/drop_caches"', shell=True)
print " done."
class RecoveryBenchmark(benchmark.TPCCBenchmark):
def __init__(self, benchmarkGroupId, benchmarkRunId, buildSettings, **kwargs):
benchmark.TPCCBenchmark.__init__(self, benchmarkGroupId, benchmarkRunId, buildSettings, **kwargs)
self.createCheckpoint = kwargs["createCheckpoint"] if kwargs.has_key("createCheckpoint") else False
self.outputFile = os.path.join(self._dirResults, "recoverytime.txt")
def benchAfterLoad(self):
# persist the main after load
self.fireQuery(CHECKPOINT_WITH_MAIN_JSON)
def benchBeforeStop(self):
if self.createCheckpoint:
print "Creating Checkpoint"
self.fireQuery(CHECKPOINT_JSON)
print "Done"
def benchAfter(self):
""" benchmark run is finished, hyrise server stopped, tables persisted
restart with --recoverAndExit flag and take the time """
if self._remote or self._manual:
print "cannot benchmark recovery on manual or remote build!"
return
env = {
"HYRISE_DB_PATH" : self._dirHyriseDB,
"LD_LIBRARY_PATH" : self._dirBinary+":/usr/local/lib64/"
}
server = os.path.join(self._dirBinary, "hyrise-server_%s" % self._buildSettings["BLD"])
logdef = os.path.join(self._dirBinary, "log.properties")
threadstring = ""
if (self._serverThreads > 0):
threadstring = "--threads=%s" % self._serverThreads
# run recovery `n` times and take the average
n = 5
avgRecoveryTime = 0.0
for i in range(n):
clearFileSystemCache()
proc = subprocess.Popen([server, "--port=%s" % self._port, "--logdef=%s" % logdef, threadstring, "--recoverAndExit"],
cwd=self._dirBinary,
env=env,
stdout=open("/dev/null") if not self._stdout else None,
stderr=open("/dev/null") if not self._stderr else None)
proc.wait() # wait for server to terminate
recoveryTime = int(open(os.path.join(self._dirBinary, "recoverytime.txt")).read())
print "Run #%i: Recovery time was %1.5fs" % (i, recoveryTime / 1000.0 / 1000.0)
avgRecoveryTime += recoveryTime / n
print "Average Recovery time was %1.5fs" % (avgRecoveryTime / 1000.0 / 1000.0)
open(self.outputFile, "w").write(str(avgRecoveryTime))
groupId = "tpcc_recovery"
sLogger = benchmark.Settings("Logger", BLD="release", PERSISTENCY="BUFFEREDLOGGER", WITH_GROUP_COMMIT=1)
sLoggerCP = benchmark.Settings("LoggerWithCheckpoint", BLD="release", PERSISTENCY="BUFFEREDLOGGER", WITH_GROUP_COMMIT=1)
sNVRAM = benchmark.Settings("NVRAM", BLD="release", PERSISTENCY="NVRAM", NVRAM_FOLDER=os.path.join("/mnt/pmfs", os.environ["USER"]), WITH_GROUP_COMMIT=1)
clear_dir(os.path.join(os.getcwd(), "builds", "Logger", "persistency"))
clear_dir(os.path.join(os.getcwd(), "builds", "LoggerWithCheckpoint", "persistency"))
reset_nvram_directory()
clearFileSystemCache()
for runtime in [0, 20, 40]:
runId = "deltaFilltime%s" % runtime
kwArgs = kwargs.copy()
kwArgs["runtime"] = runtime
kwArgs["numUsers"] = 20
kwArgs["warmuptime"] = 0
kwArgs["scheduler"] = "WSCoreBoundQueuesScheduler"
bLogger = RecoveryBenchmark(groupId, runId, sLogger, **kwArgs)
bLoggerCP = RecoveryBenchmark(groupId, runId, sLoggerCP, createCheckpoint=True, **kwArgs)
bNVRAM = RecoveryBenchmark(groupId, runId, sNVRAM, **kwArgs)
bLogger.run()
clear_dir(os.path.join(os.getcwd(), "builds", "Logger", "persistency"))
bLoggerCP.run()
clear_dir(os.path.join(os.getcwd(), "builds", "LoggerWithCheckpoint", "persistency"))
bNVRAM.run()
reset_nvram_directory()