Skip to content

Commit

Permalink
Snapshotter identity check (#46)
Browse files Browse the repository at this point in the history
* feat: do not proceed with init script if snapshotter is not active

* feat: snapshotter active status check at runtime

* build image for testnet_pretask

* chore: ping reporting service every 30 secs

* feat+chore: won't process further epochs is snapshotters is disabled, reduce default rpc polling interval to 10 seconds

* chore: made PROTOCOL_STATE_CONTRACT and PROST_RPC URL mandatory, remove default values

---------

Co-authored-by: Akshay Dahiya <[email protected]>
Co-authored-by: Swaroop Hegde <[email protected]>
  • Loading branch information
3 people authored Sep 6, 2023
1 parent 20b4604 commit 0e65170
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 21 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
release:
types: ['published']
push:
branches: [ "dockerify" ]
branches: [ "dockerify", "testnet_pretask" ]
# Publish semver tags as releases.
tags: [ 'v*.*.*' ]
pull_request:
Expand All @@ -25,7 +25,8 @@ env:
jobs:
build:

runs-on: ubuntu-latest
runs-on:
group: larger-runners
permissions:
contents: read
packages: write
Expand Down
7 changes: 7 additions & 0 deletions init_processes.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#!/bin/bash

poetry run python -m snapshotter.snapshotter_id_ping
ret_status=$?

if [ $ret_status -ne 0 ]; then
echo "Snapshotter identity check failed on protocol smart contract"
exit 1
fi
echo 'starting processes...';
pm2 start pm2.config.js

Expand Down
19 changes: 16 additions & 3 deletions snapshotter/core_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping
from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key
from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key
from snapshotter.utils.redis.redis_keys import active_status_key
from snapshotter.utils.rpc import RpcHelper


Expand Down Expand Up @@ -97,11 +98,23 @@ async def startup_boilerplate():
await app.state.ipfs_singleton.init_sessions()
app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client

# Health check endpoint that returns 200 OK


# Health check endpoint
@app.get('/health')
async def health_check():
async def health_check(
request: Request,
response: Response,
):
redis_conn: aioredis.Redis = request.app.state.redis_pool
_ = await redis_conn.get(active_status_key)
if _:
active_status = bool(int(_))
if not active_status:
response.status_code = 503
return {
'status': 'error',
'message': 'Snapshotter is not active',
}
return {'status': 'OK'}

# get current epoch
Expand Down
5 changes: 4 additions & 1 deletion snapshotter/process_hub_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
import threading
import time
from urllib.parse import urljoin
import uuid
from multiprocessing import Process
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(self, name, **kwargs):
keepalive_expiry=300,
),
)
self._last_reporting_service_ping = 0
self._thread_shutdown_event = threading.Event()
self._shutdown_initiated = False

Expand Down Expand Up @@ -263,7 +265,7 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None):
name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes',
mapping=proc_id_map,
)
if settings.reporting.service_url:
if settings.reporting.service_url and int(time.time()) - self._last_reporting_service_ping >= 30:
try:
self._httpx_client.post(
url=urljoin(settings.reporting.service_url, '/ping'),
Expand All @@ -276,6 +278,7 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None):
self._logger.error(
'Error while pinging reporting service: {}', e,
)
self._last_reporting_service_ping = int(time.time())
self._logger.error(
(
'Caught thread shutdown notification event. Deleting process'
Expand Down
21 changes: 19 additions & 2 deletions snapshotter/processor_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from snapshotter.utils.models.data_models import PreloaderAsyncFutureDetails
from snapshotter.utils.models.data_models import SnapshotterStates
from snapshotter.utils.models.data_models import SnapshotterStateUpdate
from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent
from snapshotter.utils.models.message_models import EpochBase
from snapshotter.utils.models.message_models import PayloadCommitFinalizedMessage
from snapshotter.utils.models.message_models import PowerloomCalculateAggregateMessage
Expand All @@ -50,6 +51,7 @@
from snapshotter.utils.models.message_models import PowerloomSnapshotSubmittedMessage
from snapshotter.utils.models.settings_model import AggregateOn
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import active_status_key
from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key
from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping
from snapshotter.utils.redis.redis_keys import project_finalized_data_zset
Expand Down Expand Up @@ -662,7 +664,15 @@ async def _on_rabbitmq_message(self, message: IncomingMessage):
int(time.time()),
)
asyncio.ensure_future(self._cleanup_older_epoch_status(_.epochId))
await self._epoch_release_processor(message)

_ = await self._redis_conn.get(active_status_key)
if _:
active_status = bool(int(_))
if not active_status:
self._logger.error('System is not active, ignoring released Epoch')
else:
await self._epoch_release_processor(message)

elif message_type == 'SnapshotSubmitted':
await self._distribute_callbacks_aggregate(
message,
Expand All @@ -674,7 +684,14 @@ async def _on_rabbitmq_message(self, message: IncomingMessage):
)
elif message_type == 'ProjectsUpdated':
await self._update_all_projects(message)

elif message_type == 'SnapshottersUpdated':
msg_cast = SnapshottersUpdatedEvent.parse_raw(message.body)
if msg_cast.snapshotterAddress == settings.instance_id:
if self._redis_conn:
await self._redis_conn.set(
active_status_key,
int(msg_cast.allowed),
)
else:
self._logger.error(
(
Expand Down
43 changes: 43 additions & 0 deletions snapshotter/snapshotter_id_ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import asyncio
import sys
from web3 import Web3
from snapshotter.auth.helpers.redis_conn import RedisPoolCache
from snapshotter.settings.config import settings
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.redis.redis_keys import active_status_key
from snapshotter.utils.rpc import RpcHelper


async def main():
aioredis_pool = RedisPoolCache(pool_size=1000)
await aioredis_pool.populate()
redis_conn = aioredis_pool._aioredis_pool
anchor_rpc = RpcHelper(settings.anchor_chain_rpc)
protocol_abi = read_json_file(settings.protocol_state.abi)
protocol_state_contract = anchor_rpc.get_current_node()['web3_client'].eth.contract(
address=Web3.toChecksumAddress(
settings.protocol_state.address,
),
abi=protocol_abi,
)
snapshotters_arr_query = await anchor_rpc.web3_call(
[
protocol_state_contract.functions.getAllSnapshotters(),
],
redis_conn
)
allowed_snapshotters = snapshotters_arr_query[0]
if settings.instance_id in allowed_snapshotters:
print('Snapshotting allowed...')
await redis_conn.set(
active_status_key,
int(True)
)
sys.exit(0)
else:
print('Snapshotting not allowed...')
sys.exit(1)


if __name__ == '__main__':
asyncio.run(main())
11 changes: 11 additions & 0 deletions snapshotter/system_event_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from snapshotter.utils.models.data_models import EventBase
from snapshotter.utils.models.data_models import ProjectsUpdatedEvent
from snapshotter.utils.models.data_models import SnapshotFinalizedEvent
from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent
from snapshotter.utils.rabbitmq_helpers import RabbitmqThreadedSelectLoopInteractor
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import event_detector_last_processed_block
Expand Down Expand Up @@ -119,12 +120,15 @@ def __init__(self, name, **kwargs):
'EpochReleased': self.contract.events.EpochReleased._get_event_abi(),
'SnapshotFinalized': self.contract.events.SnapshotFinalized._get_event_abi(),
'ProjectsUpdated': self.contract.events.ProjectsUpdated._get_event_abi(),
'SnapshottersUpdated': self.contract.events.SnapshottersUpdated._get_event_abi(),
}

EVENT_SIGS = {
'EpochReleased': 'EpochReleased(uint256,uint256,uint256,uint256)',
'SnapshotFinalized': 'SnapshotFinalized(uint256,uint256,string,string,uint256)',
'ProjectsUpdated': 'ProjectsUpdated(string,bool,uint256)',
'SnapshottersUpdated': 'SnapshottersUpdated(address,bool)',

}

self.event_sig, self.event_abi = get_event_sig_and_abi(
Expand Down Expand Up @@ -187,6 +191,13 @@ async def get_events(self, from_block: int, to_block: int):
timestamp=int(time.time()),
)
events.append((log.event, event))
elif log.event == 'SnapshottersUpdated':
event = SnapshottersUpdatedEvent(
snapshotterAddress=log.args.snapshotterAddress,
allowed=log.args.allowed,
timestamp=int(time.time()),
)
events.append((log.event, event))

self._logger.info('Events: {}', events)
return events
Expand Down
5 changes: 5 additions & 0 deletions snapshotter/utils/models/data_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ class ProjectsUpdatedEvent(EventBase):
enableEpochId: int


class SnapshottersUpdatedEvent(EventBase):
snapshotterAddress: str
allowed: bool


class SnapshotSubmittedEvent(EventBase):
snapshotCid: str
epochId: int
Expand Down
2 changes: 2 additions & 0 deletions snapshotter/utils/redis/redis_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

snapshot_submission_window_key = 'snapshotSubmissionWindow'

active_status_key = f'snapshotterActiveStatus:{settings.namespace}'

# project finalzed data zset


Expand Down
27 changes: 14 additions & 13 deletions snapshotter_autofill.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@ if [ -z "$SIGNER_ACCOUNT_ADDRESS" ]; then
exit 1;
fi

if [ -z "$PROST_RPC_URL" ]; then
echo "PROST_RPC_URL not found, please set this in your .env!";
exit 1;
fi

if [ -z "$PROTOCOL_STATE_CONTRACT" ]; then
echo "PROTOCOL_STATE_CONTRACT not found, please set this in your .env!";
exit 1;
fi

echo "Found SOURCE RPC URL ${SOURCE_RPC_URL}";

echo "Found SIGNER ACCOUNT ADDRESS ${SIGNER_ACCOUNT_ADDRESS}";

if [ "$PROST_RPC_URL" ]; then
echo "Found PROST_RPC_URL ${PROST_RPC_URL}";
fi

if [ "$IPFS_URL" ]; then
echo "Found IPFS_URL ${IPFS_URL}";
fi

if [ "$PROTOCOL_STATE_CONTRACT" ]; then
echo "Found PROTOCOL_STATE_CONTRACT ${PROTOCOL_STATE_CONTRACT}";
fi

if [ "$SLACK_REPORTING_URL" ]; then
echo "Found SLACK_REPORTING_URL ${SLACK_REPORTING_URL}";
Expand All @@ -46,13 +49,11 @@ cp config/auth_settings.example.json config/auth_settings.json
cp config/settings.example.json config/settings.json

export namespace=UNISWAPV2
export prost_rpc_url="${PROST_RPC_URL:-https://rpc-prost1b.powerloom.io}"

export ipfs_url="${IPFS_URL:-/dns/ipfs/tcp/5001}"
export ipfs_api_key="${IPFS_API_KEY:-}"
export ipfs_api_secret="${IPFS_API_SECRET:-}"

export protocol_state_contract="${PROTOCOL_STATE_CONTRACT:-0x102Af943b34FAC403a6ACB8e463f44bE164aa942}"
export slack_reporting_url="${SLACK_REPORTING_URL:-}"
export powerloom_reporting_url="${POWERLOOM_REPORTING_URL:-}"

Expand All @@ -65,10 +66,10 @@ if [ -z "$IPFS_URL" ]; then
fi

echo "Using Namespace: ${namespace}"
echo "Using Prost RPC URL: ${prost_rpc_url}"
echo "Using Prost RPC URL: ${PROST_RPC_URL}"
echo "Using IPFS URL: ${ipfs_url}"
echo "Using IPFS API KEY: ${ipfs_api_key}"
echo "Using protocol state contract: ${protocol_state_contract}"
echo "Using protocol state contract: ${PROTOCOL_STATE_CONTRACT}"
echo "Using slack reporting url: ${slack_reporting_url}"
echo "Using powerloom reporting url: ${powerloom_reporting_url}"

Expand All @@ -78,7 +79,7 @@ sed -i'.backup' "s#account-address#$SIGNER_ACCOUNT_ADDRESS#" config/settings.jso

sed -i'.backup' "s#https://rpc-url#$SOURCE_RPC_URL#" config/settings.json

sed -i'.backup' "s#https://prost-rpc-url#$prost_rpc_url#" config/settings.json
sed -i'.backup' "s#https://prost-rpc-url#$PROST_RPC_URL#" config/settings.json

sed -i'.backup' "s#ipfs-writer-url#$ipfs_url#" config/settings.json
sed -i'.backup' "s#ipfs-writer-key#$ipfs_api_key#" config/settings.json
Expand All @@ -88,7 +89,7 @@ sed -i'.backup' "s#ipfs-reader-url#$ipfs_url#" config/settings.json
sed -i'.backup' "s#ipfs-reader-key#$ipfs_api_key#" config/settings.json
sed -i'.backup' "s#ipfs-reader-secret#$ipfs_api_secret#" config/settings.json

sed -i'.backup' "s#protocol-state-contract#$protocol_state_contract#" config/settings.json
sed -i'.backup' "s#protocol-state-contract#$PROTOCOL_STATE_CONTRACT#" config/settings.json

sed -i'.backup' "s#https://slack-reporting-url#$slack_reporting_url#" config/settings.json

Expand Down

0 comments on commit 0e65170

Please sign in to comment.