Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ docker-build: ## 🐳 Build the Docker image
-t ghcr.io/lambdaclass/ethlambda:$(DOCKER_TAG) .
@echo

LEAN_SPEC_COMMIT_HASH:=4edcf7bc9271e6a70ded8aff17710d68beac4266
LEAN_SPEC_COMMIT_HASH:=b39472e73f8a7d603cc13d14426eed14c6eff6f1

leanSpec:
git clone https://github.com/leanEthereum/leanSpec.git --single-branch
Expand Down
9 changes: 8 additions & 1 deletion bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ struct CliOptions {
/// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0")
#[arg(long)]
node_id: String,
/// Whether this node acts as a committee aggregator
#[arg(long, default_value = "false")]
is_aggregator: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -104,7 +107,10 @@ async fn main() {
let store = Store::from_anchor_state(backend, genesis_state);

let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel();
let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys);
// Use first validator ID for subnet subscription
let first_validator_id = validator_keys.keys().min().copied();
let blockchain =
BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator);

let p2p_handle = tokio::spawn(start_p2p(
node_p2p_key,
Expand All @@ -113,6 +119,7 @@ async fn main() {
blockchain,
p2p_rx,
store.clone(),
first_validator_id,
));

ethlambda_rpc::start_rpc_server(metrics_socket, store)
Expand Down
66 changes: 55 additions & 11 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethlambda_state_transition::is_proposer;
use ethlambda_storage::Store;
use ethlambda_types::{
ShortRoot,
attestation::{Attestation, AttestationData, SignedAttestation},
attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation},
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
primitives::{H256, ssz::TreeHash},
signature::ValidatorSecretKey,
Expand All @@ -30,6 +30,8 @@ pub enum P2PMessage {
PublishAttestation(SignedAttestation),
/// Publish a block to the gossip network.
PublishBlock(SignedBlockWithAttestation),
/// Publish an aggregated attestation to the gossip network.
PublishAggregatedAttestation(SignedAggregatedAttestation),
/// Fetch a block by its root hash.
FetchBlock(H256),
}
Expand All @@ -38,14 +40,23 @@ pub struct BlockChain {
handle: GenServerHandle<BlockChainServer>,
}

/// Seconds in a slot. Each slot has 4 intervals of 1 second each.
/// Seconds in a slot.
pub const SECONDS_PER_SLOT: u64 = 4;
/// Milliseconds in a slot.
pub const MILLISECONDS_PER_SLOT: u64 = 4_000;
/// Milliseconds per interval (800ms ticks).
pub const MILLISECONDS_PER_INTERVAL: u64 = 800;
/// Number of intervals per slot (5 intervals of 800ms = 4 seconds).
pub const INTERVALS_PER_SLOT: u64 = 5;
/// Number of attestation committees per slot.
pub const ATTESTATION_COMMITTEE_COUNT: u64 = 1;

impl BlockChain {
pub fn spawn(
store: Store,
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
validator_keys: HashMap<u64, ValidatorSecretKey>,
is_aggregator: bool,
) -> BlockChain {
let genesis_time = store.config().genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
Expand All @@ -54,6 +65,7 @@ impl BlockChain {
p2p_tx,
key_manager,
pending_blocks: HashMap::new(),
is_aggregator,
pending_block_parents: HashMap::new(),
}
.start();
Expand Down Expand Up @@ -85,6 +97,20 @@ impl BlockChain {
.await
.inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation"));
}

/// Sends an aggregated attestation to the BlockChain for processing.
pub async fn notify_new_aggregated_attestation(
&mut self,
attestation: SignedAggregatedAttestation,
) {
let _ = self
.handle
.cast(CastMessage::NewAggregatedAttestation(attestation))
.await
.inspect_err(
|err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"),
);
}
}

/// GenServer that sequences all blockchain updates.
Expand All @@ -104,16 +130,19 @@ struct BlockChainServer {
// chain at lookup time, since a cached ancestor may itself have become pending with
// a deeper missing parent after the entry was created.
pending_block_parents: HashMap<H256, H256>,

/// Whether this node acts as a committee aggregator.
is_aggregator: bool,
}

impl BlockChainServer {
fn on_tick(&mut self, timestamp: u64) {
let genesis_time = self.store.config().genesis_time;

// Calculate current slot and interval
// Calculate current slot and interval from seconds
let time_since_genesis = timestamp.saturating_sub(genesis_time);
let slot = time_since_genesis / SECONDS_PER_SLOT;
let interval = time_since_genesis % SECONDS_PER_SLOT;
let interval = (time_since_genesis % SECONDS_PER_SLOT) * 1000 / MILLISECONDS_PER_INTERVAL;

// Update current slot metric
metrics::update_current_slot(slot);
Expand All @@ -126,7 +155,12 @@ impl BlockChainServer {
.flatten();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some());
store::on_tick(
&mut self.store,
timestamp,
proposer_validator_id.is_some(),
self.is_aggregator,
);

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
Expand All @@ -138,7 +172,7 @@ impl BlockChainServer {
self.produce_attestations(slot);
}

// Update safe target slot metric (updated by store.on_tick at interval 2)
// Update safe target slot metric (updated by store.on_tick at interval 3)
metrics::update_safe_target_slot(self.store.safe_target_slot());
}

Expand Down Expand Up @@ -374,15 +408,21 @@ impl BlockChainServer {
}

fn on_gossip_attestation(&mut self, attestation: SignedAttestation) {
let _ = store::on_gossip_attestation(&mut self.store, attestation)
let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}
}

#[derive(Clone, Debug)]
enum CastMessage {
NewBlock(SignedBlockWithAttestation),
NewAttestation(SignedAttestation),
NewAggregatedAttestation(SignedAggregatedAttestation),
Tick,
}

Expand Down Expand Up @@ -414,11 +454,12 @@ impl GenServer for BlockChainServer {
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_secs());
// Schedule the next tick at the start of the next second
let millis_to_next_sec =
((timestamp.as_secs() as u128 + 1) * 1000 - timestamp.as_millis()) as u64;
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL);
send_after(
Duration::from_millis(millis_to_next_sec),
Duration::from_millis(ms_to_next_interval),
handle.clone(),
message,
);
Expand All @@ -427,6 +468,9 @@ impl GenServer for BlockChainServer {
self.on_block(signed_block);
}
CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation),
CastMessage::NewAggregatedAttestation(attestation) => {
self.on_gossip_aggregated_attestation(attestation);
}
}
CastResponse::NoReply
}
Expand Down
Loading
Loading