Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out the zstandard change with history. #6166

Open
wants to merge 70 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
18d4ae2
initial zstandard compression for parameters and results; also remove…
guzzijones Jun 22, 2023
cf051bb
fix test_executions_fixtures.py tests
guzzijones Jun 23, 2023
c02b350
index liveaction
guzzijones Jun 23, 2023
d38a955
fix action_chain unit testing
guzzijones Jun 23, 2023
cced8e3
fix execution1.yaml fixture for rule enforcement testing
guzzijones Jun 23, 2023
2532784
black reformat, fix log format string
guzzijones Jun 23, 2023
71a1a53
fix test_garbage_collector.py integration test
guzzijones Jun 23, 2023
9e0c5a5
black formatting
guzzijones Jun 23, 2023
d06a14d
black fixes for contrib
guzzijones Jun 23, 2023
2245b62
fix lint errors
guzzijones Jun 23, 2023
bdaaf4a
fix migration test for 3.5
guzzijones Jul 3, 2023
161a236
black format v35 unit test
guzzijones Jul 3, 2023
664a0fb
set actual byte size in result_size field instead of compressed size
guzzijones Jul 3, 2023
c7a6ce0
setting maxDiff to see errors
guzzijones Jul 3, 2023
02ccef1
remove traceback
guzzijones Jul 3, 2023
5e724d9
change log entry
guzzijones Jul 3, 2023
c963e39
use errors list instead of original execution.errors
guzzijones Jul 3, 2023
a164c59
test each error individually
guzzijones Jul 3, 2023
1128e96
benchmarks and integration test
guzzijones Jul 5, 2023
ea7a5a7
black fixes
guzzijones Jul 5, 2023
a554d38
fix lint error
guzzijones Jul 5, 2023
6b3b40e
fix output in integration test
guzzijones Jul 5, 2023
c69f5ad
add migration for liveaction
guzzijones Jul 5, 2023
0268596
remove sym link migration
guzzijones Jul 5, 2023
86243da
add migration script for execution db liveaction
guzzijones Jul 5, 2023
7c47bca
add config option to turn off zstandard compression
guzzijones Jul 6, 2023
2e7f5e5
black formatting
guzzijones Jul 6, 2023
92b81a9
fix version on migration script
guzzijones Jul 6, 2023
e4f29ee
update sample config
guzzijones Jul 6, 2023
514f779
initial header compression
guzzijones Jul 13, 2023
a18213c
add new conf setting for zstandard
guzzijones Jul 13, 2023
8bfd3aa
add ability to change compression via config setting
guzzijones Jul 13, 2023
10c6a38
black changes
guzzijones Jul 13, 2023
0083d83
change liveaction to liveaction_id; also move inquiry mask code into …
guzzijones Jul 14, 2023
5296042
remove liveaction from action execution api
guzzijones Jul 14, 2023
0192c1d
Update st2common/bin/migrations/v3.9/st2-migrate-liveaction-executiondb
guzzijones Jul 14, 2023
d855f39
Update st2common/bin/migrations/v3.9/st2-migrate-liveaction-executiondb
guzzijones Jul 14, 2023
f7cd0d2
black fixes
guzzijones Jul 14, 2023
115e83d
untested migration script
guzzijones Jul 14, 2023
48d9749
flake fixes
guzzijones Jul 14, 2023
090200e
fix test actionchain liveaction
guzzijones Jul 14, 2023
2c47689
import fixes
guzzijones Jul 14, 2023
0a342a1
fix inquiry ttl liveaction_id
guzzijones Jul 14, 2023
53367df
zipp <=3.16 for python 3.6 compatibility
guzzijones Jul 14, 2023
14cc5bd
zipp requirement for python 3.6
guzzijones Jul 14, 2023
7e2be10
pin zipp < 3.16
guzzijones Jul 14, 2023
af3548f
liveaction return inside actionexecution api
guzzijones Jul 17, 2023
a98d25b
fix unit tests to allow embedded liveaction
guzzijones Jul 17, 2023
ff89408
fix stream test
guzzijones Jul 17, 2023
1aaf114
add back array params test for alias execution
guzzijones Jul 17, 2023
130cc87
migration script change
guzzijones Jul 17, 2023
a12fcb5
compress and uncompress methods
guzzijones Jul 19, 2023
af0adf2
Update st2common/st2common/models/api/execution.py
guzzijones Jul 18, 2023
e28ddc1
fix import error compress and uncompress
guzzijones Jul 19, 2023
1b64e1c
add test for st2.inquiry.respond secret masking
guzzijones Jul 19, 2023
8d7491b
black formatting fix
guzzijones Jul 19, 2023
2401dfd
lint fixes
guzzijones Jul 19, 2023
4132b30
fix migration
guzzijones Jul 19, 2023
4df8a9a
add inheritance
guzzijones Jul 19, 2023
619496c
pymongo query for setting liveaction id
guzzijones Jul 19, 2023
82a1e30
working migration script
guzzijones Jul 19, 2023
766dab1
black fix
guzzijones Jul 19, 2023
2743518
add required fields for liveaction in case where liveaction cannot be…
guzzijones Jul 19, 2023
e6cac68
remove breakpoint
guzzijones Jul 19, 2023
1e10909
migrate paused inquiries
guzzijones Jul 31, 2023
78050d0
Update st2common/st2common/fields.py
guzzijones Jul 23, 2023
397e7dc
black fix migration script
guzzijones Aug 2, 2023
d4506dd
add pending actions to cover inquiries to migration
guzzijones Oct 10, 2023
450294a
black and lint fixes
guzzijones Oct 10, 2023
6b3bbd5
remove the liveaction payload -> id change but leave the zstandard co…
rebrowning Mar 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ in development

Python 3.6 is no longer supported; Stackstorm requires at least Python 3.8.

* implemented zstandard compression for parameters and results. #5995
contributed by @guzzijones12

Fixed
~~~~~
* Restore Pack integration testing (it was inadvertently skipped) and stop testing against `bionic` and `el7`. #6135
Expand Down
3 changes: 3 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ connection_timeout = 3000
db_name = st2
# host of db server
host = 127.0.0.1
# compression for parameter and result storage in liveaction and execution models
# Valid values: zstandard, none
parameter_result_compression = zstandard
# password for db login
password = None
# port of db server
Expand Down
1 change: 1 addition & 0 deletions conf/st2.dev.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Config used by local development environment (tools/launch.dev.sh)
[database]
host = 127.0.0.1
parameter_result_compression = zstandard

[api]
# Host and port to bind the API server.
Expand Down
14 changes: 12 additions & 2 deletions contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,22 @@ def start_workflow(self, action_parameters):
wf_def, self.execution, st2_ctx, notify_cfg=notify_cfg
)
except wf_exc.WorkflowInspectionError as e:
_, ex, tb = sys.exc_info()
status = ac_const.LIVEACTION_STATUS_FAILED
result = {"errors": e.args[1], "output": None}
result = {
"errors": e.args[1],
"output": None,
"traceback": "".join(traceback.format_tb(tb, 20)),
}
return (status, result, self.context)
except Exception as e:
_, ex, tb = sys.exc_info()
status = ac_const.LIVEACTION_STATUS_FAILED
result = {"errors": [{"message": six.text_type(e)}], "output": None}
result = {
"errors": [{"message": six.text_type(e)}],
"output": None,
"traceback": "".join(traceback.format_tb(tb, 20)),
}
return (status, result, self.context)

return self._handle_workflow_return_value(wf_ex_db)
Expand Down
44 changes: 39 additions & 5 deletions contrib/runners/orquesta_runner/tests/unit/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,21 @@ def test_fail_start_task_input_value_type(self):
workflow_execution=str(wf_ex_db.id)
)[0]
self.assertEqual(tk_ex_db.status, wf_statuses.FAILED)
self.assertDictEqual(tk_ex_db.result, {"errors": expected_errors})
self.assertEqual(
tk_ex_db.result["errors"][0]["type"], expected_errors[0]["type"]
)
self.assertEqual(
tk_ex_db.result["errors"][0]["message"], expected_errors[0]["message"]
)
self.assertEqual(
tk_ex_db.result["errors"][0]["task_id"], expected_errors[0]["task_id"]
)
self.assertEqual(
tk_ex_db.result["errors"][0]["route"], expected_errors[0]["route"]
)

lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(lv_ac_db.result, expected_result)

ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
Expand Down Expand Up @@ -522,13 +532,37 @@ def test_fail_next_task_input_value_type(self):
# Assert workflow execution and task2 execution failed.
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(str(wf_ex_db.id))
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)
self.assertListEqual(
self.sort_workflow_errors(wf_ex_db.errors), expected_errors
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["type"],
expected_errors[0]["type"],
)
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["message"],
expected_errors[0]["message"],
)
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["task_id"],
expected_errors[0]["task_id"],
)
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["route"],
expected_errors[0]["route"],
)

tk2_ex_db = wf_db_access.TaskExecution.query(task_id="task2")[0]
self.assertEqual(tk2_ex_db.status, wf_statuses.FAILED)
self.assertDictEqual(tk2_ex_db.result, {"errors": expected_errors})
self.assertEqual(
tk2_ex_db.result["errors"][0]["type"], expected_errors[0]["type"]
)
self.assertEqual(
tk2_ex_db.result["errors"][0]["message"], expected_errors[0]["message"]
)
self.assertEqual(
tk2_ex_db.result["errors"][0]["task_id"], expected_errors[0]["task_id"]
)
self.assertEqual(
tk2_ex_db.result["errors"][0]["route"], expected_errors[0]["route"]
)

lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
Expand Down
6 changes: 5 additions & 1 deletion contrib/runners/orquesta_runner/tests/unit/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ def test_notify_task_list_nonexistent_task(self):
}

self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(lv_ac_db.result, expected_result)
self.assertEqual(
lv_ac_db.result["errors"][0]["message"],
expected_result["errors"][0]["message"],
)
self.assertIsNone(lv_ac_db.result["output"], expected_result["output"])

def test_notify_task_list_item_value(self):
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,13 @@ def _do_run(self, runner):
):
queries.setup_query(runner.liveaction.id, runner.runner_type, context)
except:
LOG.exception("Failed to run action.")
_, ex, tb = sys.exc_info()
# mark execution as failed.
status = action_constants.LIVEACTION_STATUS_FAILED
LOG.exception(
"Failed to run action. traceback: %s"
% "".join(traceback.format_tb(tb, 20))
)
# include the error message and traceback to try and provide some hints.
result = {
"error": str(ex),
Expand Down
57 changes: 34 additions & 23 deletions st2actions/st2actions/policies/concurrency_by_attr.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@

from __future__ import absolute_import

import six

from st2common.constants import action as action_constants
from st2common import log as logging
from st2common.fields import JSONDictEscapedFieldCompatibilityField
from st2common.persistence import action as action_access
from st2common.services import action as action_service
from st2common.policies.concurrency import BaseConcurrencyApplicator
Expand All @@ -41,31 +40,43 @@ def __init__(
)
self.attributes = attributes or []

def _get_filters(self, target):
filters = {
("parameters__%s" % k): v
for k, v in six.iteritems(target.parameters)
if k in self.attributes
}

filters["action"] = target.action
filters["status"] = None

return filters

def _apply_before(self, target):
# Get the count of scheduled and running instances of the action.
filters = self._get_filters(target)

# Get the count of scheduled instances of the action.
filters["status"] = action_constants.LIVEACTION_STATUS_SCHEDULED
scheduled = action_access.LiveAction.count(**filters)
scheduled_filters = {
"status": action_constants.LIVEACTION_STATUS_SCHEDULED,
"action": target.action,
}
scheduled = [i for i in action_access.LiveAction.query(**scheduled_filters)]

# Get the count of running instances of the action.
filters["status"] = action_constants.LIVEACTION_STATUS_RUNNING
running = action_access.LiveAction.count(**filters)
running_filters = {
"status": action_constants.LIVEACTION_STATUS_RUNNING,
"action": target.action,
}
running = [i for i in action_access.LiveAction.query(**running_filters)]
running.extend(scheduled)
count = 0
target_parameters = JSONDictEscapedFieldCompatibilityField().parse_field_value(
target.parameters
)
target_key_value_policy_attributes = {
k: v for k, v in target_parameters.items() if k in self.attributes
}

count = scheduled + running
for i in running:
running_event_parameters = (
JSONDictEscapedFieldCompatibilityField().parse_field_value(i.parameters)
)
# list of event parameter values that are also in policy
running_event_policy_item_key_value_attributes = {
k: v
for k, v in running_event_parameters.items()
if k in self.attributes
}
if (
running_event_policy_item_key_value_attributes
== target_key_value_policy_attributes
):
count += 1

# Mark the execution as scheduled if threshold is not reached or delayed otherwise.
if count < self.threshold:
Expand Down
37 changes: 13 additions & 24 deletions st2api/st2api/controllers/v1/actionexecutions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from st2common.exceptions import apivalidation as validation_exc
from st2common.exceptions import param as param_exc
from st2common.exceptions import trace as trace_exc
from st2common.fields import JSONDictEscapedFieldCompatibilityField
from st2common.models.api.action import LiveActionAPI
from st2common.models.api.action import LiveActionCreateAPI
from st2common.models.api.base import cast_argument_value
Expand Down Expand Up @@ -416,36 +417,18 @@ def get(

:rtype: ``str``
"""
# NOTE: Here we intentionally use as_pymongo() to avoid mongoengine layer even for old style
# data
# NOTE: we need to use to_python() to uncompress the data
try:
result = (
self.access.impl.model.objects.filter(id=id)
.only("result")
.as_pymongo()[0]
self.access.impl.model.objects.filter(id=id).only("result")[0].result
)
except IndexError:
raise NotFoundException("Execution with id %s not found" % (id))

if isinstance(result["result"], dict):
# For backward compatibility we also support old non JSON field storage format
if pretty_format:
response_body = orjson.dumps(
result["result"], option=orjson.OPT_INDENT_2
)
else:
response_body = orjson.dumps(result["result"])
if pretty_format:
response_body = orjson.dumps(result, option=orjson.OPT_INDENT_2)
else:
# For new JSON storage format we just use raw value since it's already JSON serialized
# string
response_body = result["result"]

if pretty_format:
# Pretty format is not a default behavior since it adds quite some overhead (e.g.
# 10-30ms for non pretty format for 4 MB json vs ~120 ms for pretty formatted)
response_body = orjson.dumps(
orjson.loads(result["result"]), option=orjson.OPT_INDENT_2
)
response_body = orjson.dumps(result)

response = Response()
response.headers["Content-Type"] = "text/json"
Expand Down Expand Up @@ -634,8 +617,14 @@ def post(self, spec_api, id, requester_user, no_merge=False, show_secrets=False)

# Merge in any parameters provided by the user
new_parameters = {}
original_parameters = getattr(existing_execution, "parameters", b"{}")
original_params_decoded = (
JSONDictEscapedFieldCompatibilityField().parse_field_value(
original_parameters
)
)
if not no_merge:
new_parameters.update(getattr(existing_execution, "parameters", {}))
new_parameters.update(original_params_decoded)
new_parameters.update(spec_api.parameters)

# Create object for the new execution
Expand Down
Loading
Loading