Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into feat/make_sla_work
Browse files Browse the repository at this point in the history
  • Loading branch information
TomMcL committed Aug 23, 2023
2 parents 72ba446 + f52d89d commit 1b311c2
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 42 deletions.
1 change: 1 addition & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pipeline {
skipDefaultCheckout true
timestamps()
timeout(time: 50, unit: 'MINUTES')
disableConcurrentBuilds(abortPrevious: true)
}
parameters {
string( name: 'VEGA_VERSION', defaultValue: 'feature/integrate-sla-rebased-2',
Expand Down
1 change: 1 addition & 0 deletions examples/nullchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
sell_specs=[("PEGGED_REFERENCE_MID", i * 2, i) for i in range(1, 10)],
is_amendment=False,
)

vega.submit_order(
trading_key=MM_WALLET.name,
market_id=market_id,
Expand Down
21 changes: 14 additions & 7 deletions examples/visualisations/loss_socialisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ def run(self, pause: bool = False, test: bool = False):
asset=asset_id,
amount=self.TRADER_MINT,
)
self.vega.wait_for_total_catchup()
self.vega.wait_fn(60)
self.vega.wait_for_total_catchup()

logging.info(
f"Trader A Party: public_key = {self.vega.wallet.public_key(name=TRADER_A.key_name, wallet_name=TRADER_A.wallet_name)}"
"Trader A Party: public_key ="
f" {self.vega.wallet.public_key(name=TRADER_A.key_name, wallet_name=TRADER_A.wallet_name)}"
)
logging.info(
f"Trader B Party: public_key = {self.vega.wallet.public_key(name=TRADER_B.key_name, wallet_name=TRADER_B.wallet_name)}"
"Trader B Party: public_key ="
f" {self.vega.wallet.public_key(name=TRADER_B.key_name, wallet_name=TRADER_B.wallet_name)}"
)

if pause:
Expand Down Expand Up @@ -114,7 +116,8 @@ def run(self, pause: bool = False, test: bool = False):
# Pause to allow user to login to wallet on console
if pause:
input(
f"Paused at price 500. Trader B should have a short position. Press Enter to continue."
f"Paused at price 500. Trader B should have a short position. Press"
f" Enter to continue."
)

# Go through market movements
Expand All @@ -128,17 +131,20 @@ def run(self, pause: bool = False, test: bool = False):
spread=10,
volume=1,
)
self.vega.wait_for_total_catchup()
self.vega.wait_fn(60)
self.vega.wait_for_total_catchup()

if pause:
if price == 710:
input(
f"Paused at price {price}. Trader B should have been closed out. Trader A should have had loss-socialisation Press Enter to continue."
f"Paused at price {price}. Trader B should have been closed"
" out. Trader A should have had loss-socialisation Press Enter"
" to continue."
)
else:
input(
f"Paused at price {price}. Trader B margin should have increased. Press Enter to continue."
f"Paused at price {price}. Trader B margin should have"
" increased. Press Enter to continue."
)

if test:
Expand Down Expand Up @@ -181,6 +187,7 @@ def run(self, pause: bool = False, test: bool = False):
with VegaServiceNull(
run_with_console=args.console,
warn_on_raw_data_access=False,
retain_log_files=True,
) as vega:
vis = LossSocialisationVisualisation(vega=vega)
vis.run(pause=args.pause, test=True)
1 change: 0 additions & 1 deletion scripts/run-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@ set -x
pytest -s -v -m integration \
--junitxml ${RESULT_DIR}/integration-test-results.xml \
--log-cli-level "${LOG_LEVEL}" \
-n "${PARALLEL_WORKERS}" \
-k "${TEST_FUNCTION}"
11 changes: 6 additions & 5 deletions vega_sim/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def num_from_padded_int(to_convert: Union[str, int], decimals: int) -> float:
def wait_for_datanode_sync(
trading_data_client: VegaTradingDataClientV2,
core_data_client: VegaCoreClient,
max_retries: int = 100,
max_retries: int = 650,
) -> None:
"""Waits for Datanode to catch up to vega core client.
Note: Will wait for datanode 'latest' time to catch up to core time
Expand All @@ -77,8 +77,8 @@ def wait_for_datanode_sync(
10, 0.5, lambda: trading_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
while core_time > trading_time:
logging.debug(f"Sleeping in wait_for_datanode_sync for {0.05 * 1.03**attempts}")
time.sleep(0.01 * 1.03**attempts)
logging.debug(f"Sleeping in wait_for_datanode_sync for {0.005 * 1.1**attempts}")
time.sleep(0.0005 * 1.1**attempts)
try:
trading_time = retry(
10,
Expand All @@ -98,18 +98,19 @@ def wait_for_datanode_sync(

def wait_for_core_catchup(
core_data_client: VegaCoreClient,
max_retries: int = 20,
max_retries: int = 200,
) -> None:
"""Waits for core node to fully execute everything in it's backlog.
Note that this operates by a rough cut of requesting time twice and checking for it
being unchanged, so only works on nullchain where we control time. May wait forever
in a standard tendermint chain
"""
attempts = 1

core_time = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
time.sleep(0.01)
time.sleep(0.0001)
core_time_two = retry(
10, 0.5, lambda: core_data_client.GetVegaTime(GetVegaTimeRequest()).timestamp
)
Expand Down
81 changes: 52 additions & 29 deletions vega_sim/null_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ def find_free_port(existing_set: Optional[Set[int]] = None):
while ret_sock in existing_set:
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ret_sock = s.getsockname()[1]

num_tries += 1
Expand Down Expand Up @@ -386,6 +385,26 @@ def manage_vega_processes(
data_node_docker_volume = docker_client.volumes.create()
data_node_container = docker_client.containers.run(
"timescale/timescaledb:2.8.0-pg14",
command=[
"-c",
"max_connections=50",
"-c",
"logging_collector=true",
"-c",
"log_destination=stderr",
"-c",
"work_mem=5MB",
"-c",
"huge_pages=off",
"-c",
"shared_memory_type=sysv",
"-c",
"dynamic_shared_memory_type=sysv",
"-c",
"shared_buffers=2GB",
"-c",
"temp_buffers=5MB",
],
detach=True,
ports={5432: port_config[Ports.DATA_NODE_POSTGRES]},
volumes=[f"{data_node_docker_volume.name}:/var/lib/postgresql/data"],
Expand Down Expand Up @@ -593,14 +612,38 @@ def sighandler(signal, frame):
if signal is None:
logging.debug("VegaServiceNull exited normally")
else:
logging.debug(f"VegaServiceNull exited after trap the {signal} signal")
logging.debug(f"VegaServiceNull exited after trapping the {signal} signal")

logger.debug("Received signal from parent process")

logger.info("Starting termination for processes")
for name, process in processes.items():
logger.debug(f"Terminating process {name}(pid: {process.pid})")
process.terminate()

for name, process in processes.items():
attempts = 0
while process.poll() is None:
logger.debug(f"Process {name} still not terminated")
time.sleep(1)
attempts += 1
if attempts > 60:
logger.warning(
"Gracefully terminating process timed-out. Killing process"
f" {name}."
)
process.kill()
logger.debug(f"Process {name} stopped with {process.poll()}")
if process.poll() == 0:
logger.debug(f"Process {name} terminated.")
if process.poll() == -9:
logger.debug(f"Process {name} killed.")

if use_docker_postgres:

def kill_docker_container() -> None:
try:
data_node_container.kill()
data_node_container.stop()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
logger.debug(
Expand All @@ -615,9 +658,12 @@ def kill_docker_container() -> None:
retry(10, 1.0, kill_docker_container)

removed = False
for _ in range(10):
logger.debug(f"Removing volume {data_node_docker_volume.name}")
for _ in range(20):
if data_node_container.status == "running":
time.sleep(3)
continue
try:
logging.info(f"Removing volume {data_node_docker_volume.name}")
data_node_docker_volume.remove(force=True)
removed = True
break
Expand All @@ -637,30 +683,6 @@ def kill_docker_container() -> None:
logging.exception(
"Docker volume failed to cleanup, will require manual cleaning"
)

logger.debug("Starting termination for processes")
for name, process in processes.items():
logger.debug(f"Terminating process {name}(pid: {process.pid})")
process.terminate()

for name, process in processes.items():
attempts = 0
while process.poll() is None:
logger.debug(f"Process {name} still not terminated")
time.sleep(1)
attempts += 1
if attempts > 60:
logger.warning(
"Gracefully terminating process timed-out. Killing process"
f" {name}."
)
process.kill()
logger.debug(f"Process {name} stopped with {process.poll()}")
if process.poll() == 0:
logger.debug(f"Process {name} terminated.")
if process.poll() == -9:
logger.debug(f"Process {name} killed.")

if not retain_log_files and os.path.exists(tmp_vega_dir):
shutil.rmtree(tmp_vega_dir)

Expand All @@ -675,6 +697,7 @@ def kill_docker_container() -> None:
[
signal.SIGKILL, # The process was explicitly killed by somebody wielding the kill program.
signal.SIGTERM, # The process was explicitly killed by somebody wielding the terminate program.
signal.SIGCHLD,
]
)
sighandler(None, None)
Expand Down

0 comments on commit 1b311c2

Please sign in to comment.