add python3.10 support and make code python3.8+ #716

Status: Open. Wants to merge 22 commits into base: master.
26 changes: 12 additions & 14 deletions README.md
@@ -38,9 +38,9 @@ There are totally 29 workloads in HiBench. The workloads are divided into 6 cate
3. TeraSort (terasort)

TeraSort is a standard benchmark created by Jim Gray. Its input data is generated by the Hadoop TeraGen example program.

4. Repartition (micro/repartition)

This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. The workload randomly selects the post-shuffle partition for each record, performs shuffle write and read, and evenly repartitions the records. Two parameters provide options to eliminate data source and sink I/O: hibench.repartition.cacheinmemory (default: false) and hibench.repartition.disableOutput (default: false), which control 1) whether to cache the input in memory first and 2) whether to disable writing the result to storage (a configuration sketch follows this file's changes).

5. Sleep (sleep)
@@ -61,10 +61,10 @@ There are totally 29 workloads in HiBench. The workloads are divided into 6 cate
2. K-means clustering (Kmeans)

This workload tests K-means clustering (a well-known clustering algorithm for knowledge discovery and data mining) in spark.mllib. The input data set is generated by GenKMeansDataset based on Uniform Distribution and Gaussian Distribution. There is also an optimized K-means implementation based on DAL (Intel Data Analytics Library), which is available in the dal module of sparkbench.

3. Gaussian Mixture Model (GMM)

Gaussian Mixture Model represents a composite distribution whereby points are drawn from one of k Gaussian sub-distributions, each with its own probability. It's implemented in spark.mllib. The input data set is generated by GenKMeansDataset based on Uniform Distribution and Gaussian Distribution.

4. Logistic Regression (LR)

@@ -80,7 +80,7 @@ There are totally 29 workloads in HiBench. The workloads are divided into 6 cate

7. XGBoost (XGBoost)

XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible and portable. This workload is implemented with XGBoost4J-Spark API in spark.mllib and the input data set is generated by GradientBoostedTreeDataGenerator.

8. Linear Regression (Linear)

@@ -125,9 +125,9 @@ There are totally 29 workloads in HiBench. The workloads are divided into 6 cate

**Graph Benchmark:**

1. NWeight (nweight)

NWeight is an iterative graph-parallel algorithm implemented by Spark GraphX and pregel. The algorithm computes associations between two vertices that are n-hop away.


**Streaming Benchmarks:**
@@ -139,16 +139,16 @@ There are totally 29 workloads in HiBench. The workloads are divided into 6 cate
2. Repartition (streaming/repartition)

This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitions. It tests the efficiency of data shuffle in the streaming frameworks.

3. Stateful Wordcount (wordcount)

This workload counts words cumulatively received from Kafka every few seconds. This tests the stateful operator performance and Checkpoint/Acker cost in the streaming frameworks.

4. Fixwindow (fixwindow)

This workload performs a window-based aggregation. It tests the performance of window operations in the streaming frameworks.


### Supported Hadoop/Spark/Flink/Storm/Gearpump releases: ###

- Hadoop: Apache Hadoop 3.0.x, 3.1.x, 3.2.x, 2.x, CDH5, HDP
@@ -159,5 +159,3 @@ There are totally 29 workloads in HiBench. The workloads are divided into 6 cate
- Kafka: 0.8.2.2

---
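As a side note on the Repartition options above, here is a minimal sketch of how the two properties might be set. The property names and defaults come from the README text; the conf file path and the space-separated key/value layout are assumptions about HiBench's usual configuration format.

# e.g. in a workload conf file such as conf/workloads/micro/repartition.conf (path assumed for illustration)
hibench.repartition.cacheinmemory true
hibench.repartition.disableOutput true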


131 changes: 97 additions & 34 deletions bin/functions/execute_with_log.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -13,51 +13,76 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fnmatch
import os
import re
import subprocess
import sys
from time import sleep
from time import time

from terminalsize import get_terminal_size


def load_colors():
    # Parse color.enabled.sh into a name -> ANSI escape code mapping.
    color_script_fn = os.path.join(
        os.path.dirname(__file__),
        "color.enabled.sh",
    )
    with open(color_script_fn) as f:
        return {
            k: v.split("'")[1].replace(r"\e[", "\033[")
            for k, v in [
                x.strip().split("=")
                for x in f.readlines()
                if x.strip() and not x.strip().startswith("#")
            ]
        }

Color = load_colors()

if int(os.environ.get("HIBENCH_PRINTFULLLOG", 0)):
    Color["ret"] = os.linesep
else:
    Color["ret"] = "\r"

tab_matcher = re.compile("\t")
tabstop = 8


def replace_tab_to_space(s):
    # Substitute each tab with a number of spaces derived from its column offset.
    def tab_replacer(match):
        pos = match.start()
        length = pos % tabstop
        if not length:
            length += tabstop
        return " " * length

    return tab_matcher.sub(tab_replacer, s)


class _Matcher:
    # Hadoop MapReduce progress lines, e.g. "map = 50%, reduce = 0%"
    hadoop = re.compile(r"^.*map\s*=\s*(\d+)%,\s*reduce\s*=\s*(\d+)%.*$")
    # Newer Hadoop progress format, e.g. "map 50% reduce 0%"
    hadoop2 = re.compile(r"^.*map\s+\s*(\d+)%\s+reduce\s+\s*(\d+)%.*$")
    # Spark task-completion lines; captures the "(finished/total)" task counts
    spark = re.compile(
        r"^.*finished task \S+ in stage \S+ \(tid \S+\) in.*on.*\((\d+)/(\d+)\)\s*$",  # noqa: E501
    )

    def match(self, line):
        # Return an estimated progress percentage for a recognized log line, else None.
        for p in [self.hadoop, self.hadoop2]:
            m = p.match(line)
            if m:
                return (float(m.groups()[0]) + float(m.groups()[1])) / 2

        for p in [self.spark]:
            m = p.match(line)
            if m:
                return float(m.groups()[0]) / float(m.groups()[1]) * 100


matcher = _Matcher()


def show_with_progress_bar(line, progress, line_width):
"""
Show text with progress bar.
@@ -70,33 +95,45 @@ def show_with_progress_bar(line, progress, line_width):
    if len(line) < line_width:
        line = line + " " * (line_width - len(line))
    line = "{On_Yellow}{line_seg1}{On_Blue}{line_seg2}{Color_Off}{ret}".format(
        line_seg1=line[:pos],
        line_seg2=line[pos:],
        **Color,
    )
    sys.stdout.write(line)


def execute(workload_result_file, command_lines):
    # Run the benchmark command, tee its output to the log file, and render
    # progress in the terminal.
    proc = subprocess.Popen(
        " ".join(command_lines),
        shell=True,
        bufsize=1,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    count = 100
    last_time = 0
    log_file = open(workload_result_file, "w")
    # see http://stackoverflow.com/a/4417735/1442961
    lines_iterator = iter(proc.stdout.readline, b"")
    for line in lines_iterator:
        count += 1
        if (
            count > 100 or time() - last_time > 1
        ):  # refresh terminal size every 100 lines or at least once per second
            count, last_time = 0, time()
            width, height = get_terminal_size()
            width -= 1

        try:
            line = line.rstrip()
            log_file.write(line.decode("utf-8") + "\n")
            log_file.flush()
        except KeyboardInterrupt:
            proc.terminate()
            break
        line = line.decode("utf-8")
        line = replace_tab_to_space(line)
        # print("{Red}log=>{Color_Off}".format(**Color), line)
        lline = line.lower()

        def table_not_found_in_log(line):
@@ -116,29 +153,49 @@ def database_default_exist_in_log(line):
            return False

        def uri_with_key_not_found_in_log(line):
            uri_with_key_not_found = (
                "Could not find uri with key [dfs.encryption.key.provider.uri]"
            )
            if uri_with_key_not_found in line:
                return True
            else:
                return False

        if ("error" in lline) and lline.lstrip() == lline:
            # Bypass hive 'error's and KeyProviderCache error
            bypass_error_condition = (
                table_not_found_in_log(lline)
                or database_default_exist_in_log(lline)
                or uri_with_key_not_found_in_log(lline)
            )
            if not bypass_error_condition:
                # Highlight genuine error lines in red.
                COLOR = "Red"
                sys.stdout.write(
                    ("{%s}{line}{Color_Off}{ClearEnd}\n" % COLOR).format(
                        line=line,
                        **Color,
                    )
                )
        else:
            if len(line) >= width:
                line = line[: width - 4] + "..."
            progress = matcher.match(lline)
            if progress is not None:
                show_with_progress_bar(line, progress, width)
            else:
                sys.stdout.write(
                    "{line}{ClearEnd}{ret}".format(
                        line=line,
                        **Color,
                    )
                )
        sys.stdout.flush()
    print()
    log_file.close()
    try:
        proc.wait()
@@ -147,14 +204,20 @@ def uri_with_key_not_found_in_log(line):
        return 1
    return proc.returncode


def test_progress_bar():
    # Exercise the progress bar rendering with synthetic values.
    for i in range(101):
        show_with_progress_bar("test progress : %d" % i, i, 80)
        sys.stdout.flush()
        sleep(0.05)


if __name__ == "__main__":
    sys.exit(
        execute(
            workload_result_file=sys.argv[1],
            command_lines=sys.argv[2:],
        ),
    )
    # test_progress_bar()
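For a quick manual check of the updated script, an invocation along these lines should work from the HiBench root; the log path and the echoed command are only illustrative stand-ins, since the script is normally driven by HiBench's run scripts with a real workload command line:

python3 bin/functions/execute_with_log.py /tmp/result.log echo hello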