# System Design

## System Event Detector

The System Event Detector tracks events emitted by the protocol state contract running on the anchor chain and forwards them to a callback queue with the appropriate routing key, determined by the event signature and type, among other attributes.
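As a rough illustration of this flow, the following minimal sketch (assuming web3.py v6 and pika; the RPC URL, contract address, ABI, and exchange/routing-key names are all placeholders, not the actual detector implementation) polls the contract for events and forwards them with a type-derived routing key:

```python
import time

import pika
from web3 import Web3

# Hypothetical ABI for one tracked event; the real contract defines more.
EPOCH_RELEASED_ABI = [{
    'type': 'event', 'name': 'EpochReleased', 'anonymous': False,
    'inputs': [
        {'name': 'epochId', 'type': 'uint256', 'indexed': True},
        {'name': 'begin', 'type': 'uint256', 'indexed': False},
        {'name': 'end', 'type': 'uint256', 'indexed': False},
    ],
}]

w3 = Web3(Web3.HTTPProvider('https://rpc.anchor-chain.example'))  # placeholder RPC
contract = w3.eth.contract(
    address='0x0000000000000000000000000000000000000000',  # placeholder address
    abi=EPOCH_RELEASED_ABI,
)

channel = pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()

last_block = w3.eth.block_number
while True:
    current = w3.eth.block_number
    if current > last_block:
        logs = contract.events.EpochReleased().get_logs(
            fromBlock=last_block + 1, toBlock=current,
        )
        for log in logs:
            # Routing key derived from the event type, so consumers bound to
            # the topic exchange receive only the event types they handle.
            channel.basic_publish(
                exchange='event-detector-exchange',  # assumed exchange name
                routing_key=f'powerloom-event-detector.{log.event}',
                body=Web3.to_json(log),
            )
        last_block = current
    time.sleep(2)
```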
## Process Hub Core

The Process Hub Core, defined in `process_hub_core.py`, serves as the primary process manager in the snapshotter.

- Operated by the CLI tool `processhub_cmd.py`, it is responsible for starting and managing the `SystemEventDetector` and `ProcessorDistributor` processes.
- Additionally, it spawns the base snapshot and aggregator workers required for processing tasks from the `powerloom-backend-callback` queue. The number of workers and their configuration path can be adjusted in `config/settings.json`.
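The supervision pattern this implies can be sketched as follows (a minimal illustration using `multiprocessing`, with placeholder entry points and a naive respawn policy; the actual Process Hub Core is considerably more involved):

```python
import multiprocessing
import time


def system_event_detector():
    # Placeholder for the detector loop described above.
    while True:
        time.sleep(60)


def processor_distributor():
    # Placeholder for the distributor loop described below.
    while True:
        time.sleep(60)


def main():
    # Map each managed component to its entry point, mirroring how the
    # Process Hub Core owns the detector and distributor processes.
    targets = {
        'SystemEventDetector': system_event_detector,
        'ProcessorDistributor': processor_distributor,
    }
    procs = {}
    for name, fn in targets.items():
        procs[name] = multiprocessing.Process(target=fn, name=name)
        procs[name].start()
    # Naive supervision loop: respawn any child that has died.
    while True:
        for name, proc in procs.items():
            if not proc.is_alive():
                procs[name] = multiprocessing.Process(target=targets[name], name=name)
                procs[name].start()
        time.sleep(5)


if __name__ == '__main__':
    main()
```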
## Processor Distributor

The Processor Distributor, defined in `processor_distributor.py`, is initiated using the `processhub_cmd.py` CLI.

- It loads the preloader, base snapshotting, and aggregator configuration from the settings file.
- It reads the events forwarded by the event detector to the `f'powerloom-event-detector:{settings.namespace}:{settings.instance_id}'` RabbitMQ queue, which is bound to a topic exchange as configured in `settings.rabbitmq.setup.event_detector.exchange` (code reference: RabbitMQ exchanges and queue setup in pooler).
- It creates and distributes processing messages based on the preloader configuration in `config/preloader.json`, the project configuration in `config/projects.json`, and `config/aggregator.json`.
  - For `EpochReleased` events, it forwards such messages to the base snapshot builders for the data source contracts configured in `config/projects.json`, using the current epoch information contained in the event.
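For orientation, here is a minimal sketch of binding such a queue to a topic exchange and consuming the detector's events (assuming the pika client; the namespace, instance ID, and exchange name are placeholder values normally read from `config/settings.json`):

```python
import json

import pika

NAMESPACE, INSTANCE_ID = 'NAMESPACE', 'INSTANCE-ID'  # placeholders
QUEUE = f'powerloom-event-detector:{NAMESPACE}:{INSTANCE_ID}'
EXCHANGE = 'event-detector-exchange'  # assumed exchange name

channel = pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type='topic', durable=True)
channel.queue_declare(queue=QUEUE, durable=True)
# The '#' wildcard catches every routing key the detector publishes under this prefix.
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=f'{QUEUE}.#')


def on_event(ch, method, properties, body):
    event = json.loads(body)
    # Dispatch on the routing key / event type, e.g. route EpochReleased
    # events to the base snapshot builders.
    print(method.routing_key, event)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue=QUEUE, on_message_callback=on_event)
channel.start_consuming()
```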
## Delegation Workers for Preloaders

Preloaders often fetch and cache large volumes of data, such as all of the transaction receipts for a block on the data source blockchain. In such cases, a single worker is often insufficient to fetch the data in time for base snapshot generation and the subsequent aggregate snapshot generation to reach consensus.
To address this, workers are defined as `delegate_tasks` in `config/preloader.json`. The Process Hub Core then launches a specific number of workers, as defined in the primary settings file, `config/settings.json`, under the key `callback_worker_config.num_delegate_workers`.
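For illustration, the relevant part of the settings file might look like this (values are placeholders; consult the actual `config/settings.json` schema):

```json
{
  "callback_worker_config": {
    "num_delegate_workers": 4
  }
}
```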
Delegation workers operate over a simple request-response queue architecture on RabbitMQ.
One of the preloaders bundled with this snapshotter peer is tasked with fetching all the transaction receipts within a given epoch's block range. Because of the volume of data to be fetched, it delegates this work to a set of delegation workers:

- The preloader: `snapshotter/utils/preloaders/tx_receipts/preloader.py`
- The delegation workers: `snapshotter/utils/preloaders/tx_receipts/delegated_worker/tx_receipts.py`
This functionality is common to all preloaders that utilize delegate workers: it lives in the generic class `DelegatorPreloaderAsyncWorker`, which all such preloaders inherit from. This is where the workload is dispatched to the delegation workers.
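A simplified sketch of that dispatch step (using the synchronous pika client for brevity; the exchange and queue names, message fields, and the up-front reply-queue declaration are assumptions, not the actual worker code):

```python
import json
import uuid

import pika

channel = pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()

# Temporary, exclusive, auto-named queue on which responses will arrive.
reply_queue = channel.queue_declare(queue='', exclusive=True).method.queue

# Each unit of work (here, one transaction hash whose receipt must be
# fetched) is tagged with a unique request ID so that responses can be
# matched back to requests.
tx_hashes = ['0xabc', '0xdef']  # placeholder workload
request_ids = set()
for tx_hash in tx_hashes:
    request_id = str(uuid.uuid4())
    request_ids.add(request_id)
    channel.basic_publish(
        exchange='',  # default exchange: route directly to the worker queue
        routing_key='powerloom-delegated-worker:tx_receipts',  # assumed name
        properties=pika.BasicProperties(reply_to=reply_queue),
        body=json.dumps({'requestId': request_id, 'txHash': tx_hash}),
    )
```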
After sending out the workloads, each tagged with a unique request ID, the delegator sets up a temporary exclusive queue to which only the delegation workers assigned to that task type push their responses.
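Continuing the sketch above, the delegator then drains that exclusive queue until every request ID has been answered:

```python
# Collect responses from the exclusive reply queue declared earlier.
responses = {}


def on_response(ch, method, properties, body):
    payload = json.loads(body)
    responses[payload['requestId']] = payload['result']
    ch.basic_ack(delivery_tag=method.delivery_tag)
    if request_ids <= set(responses):  # every workload has been answered
        ch.stop_consuming()


channel.basic_consume(queue=reply_queue, on_message_callback=on_response)
channel.start_consuming()
```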
The corresponding response is pushed by the delegation workers in the generic class `DelegateAsyncWorker`, which all such workers should inherit from:
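Sketched from the worker's side, under the same assumptions as above:

```python
import json

import pika

channel = pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()
channel.queue_declare(queue='powerloom-delegated-worker:tx_receipts')  # assumed name


def fetch_tx_receipt(tx_hash):
    # Placeholder for the actual work, e.g. an eth_getTransactionReceipt RPC call.
    return {'transactionHash': tx_hash, 'status': 1}


def on_request(ch, method, properties, body):
    task = json.loads(body)
    receipt = fetch_tx_receipt(task['txHash'])
    # Reply on the queue named in the request's reply_to property, echoing
    # the request ID so the delegator can match the response to its request.
    ch.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        body=json.dumps({'requestId': task['requestId'], 'result': receipt}),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue='powerloom-delegated-worker:tx_receipts',
                      on_message_callback=on_request)
channel.start_consuming()
```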
## Callback Workers

The callback workers are the ones that build the base snapshots and aggregation snapshots. As explained above, they are launched by the Process Hub Core according to the configurations in `config/projects.json` and `config/aggregator.json`.
They listen for new messages on a RabbitMQ topic exchange, with topic queues initialized and bound against it as configured in the settings file.
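A minimal sketch of such a topic-queue initialization (the exchange, queue, and routing-key names here are illustrative, not the actual values from the configuration):

```python
import pika

channel = pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()

EXCHANGE = 'powerloom-backend-callback-exchange'  # assumed name
channel.exchange_declare(exchange=EXCHANGE, exchange_type='topic', durable=True)

# One queue per worker type, bound with a topic pattern so the processor
# distributor can target base snapshot and aggregate workers separately.
for worker_type in ('snapshot', 'aggregate'):
    queue = f'powerloom-backend-cb:{worker_type}'  # assumed naming scheme
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_bind(exchange=EXCHANGE, queue=queue,
                       routing_key=f'powerloom-backend-callback.{worker_type}.#')
```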
Upon receiving a message from the processor distributor once preloading is complete, the workers do most of the heavy lifting along with some sanity checks, and then call the `compute()` callback function on the project's configured snapshot worker class to transform the dependent data points cached by the preloaders into the final base snapshots.
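For orientation, a project's snapshot worker class might look roughly like this (the class shape and `compute()` signature are assumptions for illustration; consult the actual snapshotter interfaces):

```python
from typing import Any


class ExampleSnapshotProcessor:
    """Hypothetical snapshot worker: turns preloaded data into a base snapshot."""

    async def compute(self, epoch: Any, redis_conn: Any, rpc_helper: Any) -> dict:
        # Read the data points the preloaders cached for this epoch
        # (the key scheme here is illustrative only).
        cached = await redis_conn.get(f'preloaded:{epoch.begin}:{epoch.end}')
        # ... sanity checks and per-project transformation logic ...
        return {
            'epochBegin': epoch.begin,
            'epochEnd': epoch.end,
            'snapshot': cached,
        }
```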
## RPC Helper

Extracting data from the blockchain state and generating the snapshot can be a complex task. The `RpcHelper`, defined in `utils/rpc.py`, provides a set of helper functions to make this process easier. It handles all the retry and caching logic so that developers can focus on efficiently building their use cases.
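As an illustration of the kind of retry logic such a helper encapsulates, here is a hedged sketch using the tenacity library (the class, method, and retry parameters are assumptions, not the actual `RpcHelper` API):

```python
from tenacity import retry, stop_after_attempt, wait_exponential
from web3 import Web3


class SimpleRpcHelper:
    """Hypothetical helper: retries transient RPC failures with backoff."""

    def __init__(self, rpc_url: str):
        self._w3 = Web3(Web3.HTTPProvider(rpc_url))

    @retry(stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=0.5, max=8))
    def get_block(self, block_number: int) -> dict:
        # Any exception (timeout, rate limit, node hiccup) triggers an
        # exponential-backoff retry, up to five attempts, before surfacing.
        return dict(self._w3.eth.get_block(block_number))
```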
## Core API

This component is one of the most important, as it allows you to access the finalized protocol state on the smart contract running on the anchor chain. Find it in `core_api.py`.
In summary:

- It provides a reliable gateway to the datasets finalized on the protocol state for a data market.
- It abstracts away the parts that deal with the varying interfaces and APIs of
  - decentralized storage protocols like IPFS and Filecoin
  - other value-added providers like Web3 Storage on such layers
- It also maintains a cache for redundancy and availability of the datasets.
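A hedged sketch of this gateway-plus-cache pattern (the framework choice, endpoint path, and fetch helper are assumptions for illustration, not the actual `core_api.py` routes):

```python
from fastapi import FastAPI, HTTPException

app = FastAPI()
_cache: dict = {}  # in-memory stand-in for the actual caching layer


async def fetch_finalized_snapshot(project_id: str, epoch_id: int) -> dict:
    # Placeholder: look up the finalized CID on the protocol state contract,
    # then retrieve the payload from IPFS/Filecoin or a provider such as
    # Web3 Storage.
    raise HTTPException(status_code=404, detail='snapshot not found')


@app.get('/data/{project_id}/{epoch_id}')
async def get_finalized_data(project_id: str, epoch_id: int) -> dict:
    key = f'{project_id}:{epoch_id}'
    if key in _cache:  # serve from cache for availability and redundancy
        return _cache[key]
    snapshot = await fetch_finalized_snapshot(project_id, epoch_id)
    _cache[key] = snapshot
    return snapshot
```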
Its API endpoints are consumed by decoupled frontend adapter logic that ultimately supports the building of rich data products for smart contracts and other web3-based applications.
In the All about Data section of the docs, you can find out more about how this API is used by a frontend adapter to serve the Uniswap V2, Uniswap V3, and Aave V3 dashboards.