#![allow(clippy::collapsible_match)]
use crate::{
async_protocols::{blockchain_interface::DKGProtocolEngine, KeygenPartyId},
debug_logger::DebugLogger,
};
use codec::{Codec, Encode};
use sc_network::NetworkService;
use sc_network_sync::SyncingService;
use sp_consensus::SyncOracle;
use crate::signing_manager::SigningManager;
use dkg_primitives::types::SSID;
use futures::StreamExt;
use parking_lot::{Mutex, RwLock};
use sc_client_api::{Backend, FinalityNotification};
use sc_keystore::LocalKeystore;
use sp_api::ApiError;
use sp_arithmetic::traits::SaturatedConversion;
use sp_core::ecdsa;
use sp_runtime::traits::{Block, Get, Header, NumberFor};
use std::{
collections::{BTreeSet, HashMap},
marker::PhantomData,
sync::Arc,
};
use tokio::sync::mpsc::UnboundedSender;
use dkg_primitives::{
types::{DKGError, DKGMessage, NetworkMsgPayload, SessionId, SignedDKGMessage},
AuthoritySetId, DKGReport, MisbehaviourType,
};
use dkg_runtime_primitives::{
crypto::{AuthorityId, Public},
gossip_messages::MisbehaviourMessage,
utils::to_slice_33,
AggregatedMisbehaviourReports, AggregatedPublicKeys, AuthoritySet, BatchId, DKGApi,
MaxAuthorities, MaxProposalLength, MaxProposalsInBatch, MaxReporters, MaxSignatureLength,
StoredUnsignedProposalBatch, GENESIS_AUTHORITY_SET_ID,
};
pub use crate::constants::worker::*;
use crate::{
async_protocols::{remote::AsyncProtocolRemote, types::LocalKeyType, AsyncProtocolParameters},
dkg_modules::DKGModules,
error,
gossip_engine::GossipEngineIface,
gossip_messages::{
misbehaviour_report::{gossip_misbehaviour_report, handle_misbehaviour_report},
public_key_gossip::handle_public_key_broadcast,
},
keygen_manager::KeygenManager,
keystore::DKGKeystore,
metric_inc, metric_set,
metrics::Metrics,
utils::{find_authorities_change, generate_authority_mapping},
Client,
};
/// Convenience alias for state shared across async tasks behind a read-write lock.
pub type Shared<T> = Arc<RwLock<T>>;
/// Bag of dependencies used to construct a [`DKGWorker`] via `DKGWorker::new`.
pub struct WorkerParams<B, BE, C, GE>
where
    B: Block,
    GE: GossipEngineIface,
{
    /// Substrate client handle, used for runtime API calls and finality streams.
    pub client: Arc<C>,
    /// Backend handle for the chain database.
    pub backend: Arc<BE>,
    /// DKG-specific keystore wrapper.
    pub key_store: DKGKeystore,
    /// Gossip engine used to send/receive DKG protocol messages.
    pub gossip_engine: GE,
    /// Storage backend for DKG state (e.g. local keys per session).
    pub db_backend: Arc<dyn crate::db::DKGDbBackend>,
    /// Optional Prometheus metrics.
    pub metrics: Option<Metrics>,
    /// Optional local keystore (used for on-disk key material).
    pub local_keystore: Option<Arc<LocalKeystore>>,
    /// Shared slot for the latest processed finalized header.
    pub latest_header: Arc<RwLock<Option<B::Header>>>,
    /// Optional network service handle.
    pub network: Option<Arc<NetworkService<B, B::Hash>>>,
    /// Optional sync service, used to detect major sync and skip processing.
    pub sync_service: Option<Arc<SyncingService<B>>>,
    /// Optional channel back to a test client (test-only plumbing).
    pub test_bundle: Option<TestBundle>,
    pub _marker: PhantomData<B>,
}
/// The main DKG worker: drives keygen/signing sessions off finality
/// notifications and gossip messages. Cloneable; all mutable state is behind
/// `Arc`/`Shared` handles so clones observe the same state.
pub struct DKGWorker<B, BE, C, GE>
where
    B: Block,
    BE: Backend<B>,
    C: Client<B, BE>,
    GE: GossipEngineIface,
{
    pub client: Arc<C>,
    pub backend: Arc<BE>,
    pub key_store: DKGKeystore,
    pub gossip_engine: Arc<GE>,
    pub db: Arc<dyn crate::db::DKGDbBackend>,
    pub metrics: Arc<Option<Metrics>>,
    /// Best authorities for the current session, as (party index, key) pairs.
    pub best_authorities: Shared<Vec<(u16, Public)>>,
    /// Best authorities for the next (queued) session.
    pub next_best_authorities: Shared<Vec<(u16, Public)>>,
    /// Latest finalized header seen by `process_block_notification`.
    pub latest_header: Shared<Option<B::Header>>,
    /// Active validator set (rotated in `maybe_rotate_local_session`).
    pub current_validator_set: Shared<AuthoritySet<Public, MaxAuthorities>>,
    /// Queued (next-session) validator set.
    pub queued_validator_set: Shared<AuthoritySet<Public, MaxAuthorities>>,
    /// Aggregated public keys collected via gossip, keyed by session.
    pub aggregated_public_keys: Shared<AggregatedPublicKeysAndSigs>,
    /// Aggregated misbehaviour reports collected via gossip.
    pub aggregated_misbehaviour_reports: Shared<AggregatedMisbehaviourReportStore>,
    pub local_keystore: Shared<Option<Arc<LocalKeystore>>>,
    pub network: Option<Arc<NetworkService<B, B::Hash>>>,
    pub sync_service: Option<Arc<SyncingService<B>>>,
    pub test_bundle: Option<TestBundle>,
    pub logger: DebugLogger,
    pub dkg_modules: DKGModules<B, BE, C, GE>,
    pub signing_manager: SigningManager<B, BE, C, GE>,
    pub keygen_manager: KeygenManager<B, BE, C, GE>,
    // Channel through which async protocol errors are funneled back to the worker.
    pub(crate) error_handler_channel: ErrorHandlerChannel,
    _backend: PhantomData<(BE, MaxProposalLength)>,
}
/// Unbounded channel for reporting [`DKGError`]s back to the worker loop.
/// The receiver half is `take()`n exactly once by `spawn_error_handling_task`.
#[derive(Clone)]
pub(crate) struct ErrorHandlerChannel {
    pub tx: UnboundedSender<DKGError>,
    // `Option` so the single consumer can move the receiver out of the shared clone.
    rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<DKGError>>>>,
}
/// Test-only plumbing: lets the worker push results to an external test client.
#[derive(Clone)]
pub struct TestBundle {
    pub to_test_client: UnboundedSender<TestClientPayload>,
    pub current_test_id: Arc<RwLock<Option<uuid::Uuid>>>,
}
/// (test id, outcome, optional payload bytes) sent to the test client.
pub type TestClientPayload = (uuid::Uuid, Result<(), String>, Option<Vec<u8>>);
impl<B, BE, C, GE> Clone for DKGWorker<B, BE, C, GE>
where
B: Block,
BE: Backend<B> + 'static,
C: Client<B, BE> + 'static,
GE: GossipEngineIface,
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
backend: self.backend.clone(),
key_store: self.key_store.clone(),
db: self.db.clone(),
gossip_engine: self.gossip_engine.clone(),
metrics: self.metrics.clone(),
best_authorities: self.best_authorities.clone(),
next_best_authorities: self.next_best_authorities.clone(),
latest_header: self.latest_header.clone(),
current_validator_set: self.current_validator_set.clone(),
queued_validator_set: self.queued_validator_set.clone(),
aggregated_public_keys: self.aggregated_public_keys.clone(),
aggregated_misbehaviour_reports: self.aggregated_misbehaviour_reports.clone(),
local_keystore: self.local_keystore.clone(),
test_bundle: self.test_bundle.clone(),
network: self.network.clone(),
sync_service: self.sync_service.clone(),
logger: self.logger.clone(),
dkg_modules: self.dkg_modules.clone(),
signing_manager: self.signing_manager.clone(),
keygen_manager: self.keygen_manager.clone(),
error_handler_channel: self.error_handler_channel.clone(),
_backend: PhantomData,
}
}
}
/// Aggregated public keys (plus signatures) gathered per session via gossip.
pub type AggregatedPublicKeysAndSigs = HashMap<SessionId, AggregatedPublicKeys>;
/// Misbehaviour reports aggregated per (type, session, offender) triple.
pub type AggregatedMisbehaviourReportStore = HashMap<
    (MisbehaviourType, SessionId, AuthorityId),
    AggregatedMisbehaviourReports<AuthorityId, MaxSignatureLength, MaxReporters>,
>;
impl<B, BE, C, GE> DKGWorker<B, BE, C, GE>
where
    B: Block + Codec,
    BE: Backend<B> + Unpin + 'static,
    GE: GossipEngineIface + 'static,
    C: Client<B, BE> + 'static,
    C::Api: DKGApi<B, AuthorityId, NumberFor<B>, MaxProposalLength, MaxAuthorities>,
{
    /// Builds a worker from [`WorkerParams`], wiring together the keygen and
    /// signing managers, the error channel, and the DKG modules.
    ///
    /// Note: `dkg_modules.initialize` is called with a clone of the worker at
    /// the end, so modules hold a handle to the shared worker state.
    pub fn new(worker_params: WorkerParams<B, BE, C, GE>, logger: DebugLogger) -> Self {
        let WorkerParams {
            client,
            backend,
            key_store,
            db_backend,
            gossip_engine,
            metrics,
            local_keystore,
            latest_header,
            network,
            sync_service,
            test_bundle,
            ..
        } = worker_params;
        // Clock shares `latest_header`, so both managers see the same block height.
        let clock = Clock { latest_header: latest_header.clone() };
        let signing_manager = SigningManager::<B, BE, C, GE>::new(logger.clone(), clock.clone());
        let keygen_manager = KeygenManager::new(logger.clone(), clock);
        let dkg_modules = DKGModules::default();
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        // The receiver is parked in an Option so the error-handling task can take it once.
        let error_handler_channel = ErrorHandlerChannel { tx, rx: Arc::new(Mutex::new(Some(rx))) };
        let this = DKGWorker {
            client,
            backend,
            key_store,
            db: db_backend,
            keygen_manager,
            gossip_engine: Arc::new(gossip_engine),
            metrics: Arc::new(metrics),
            best_authorities: Arc::new(RwLock::new(vec![])),
            next_best_authorities: Arc::new(RwLock::new(vec![])),
            current_validator_set: Arc::new(RwLock::new(AuthoritySet::empty())),
            queued_validator_set: Arc::new(RwLock::new(AuthoritySet::empty())),
            latest_header,
            aggregated_public_keys: Arc::new(RwLock::new(HashMap::new())),
            aggregated_misbehaviour_reports: Arc::new(RwLock::new(HashMap::new())),
            local_keystore: Arc::new(RwLock::new(local_keystore)),
            test_bundle,
            error_handler_channel,
            logger,
            network,
            sync_service,
            signing_manager,
            dkg_modules,
            _backend: PhantomData,
        };
        this.dkg_modules.initialize(this.clone());
        this
    }
}
/// The kind of async protocol being set up: genesis keygen, standard (rotation)
/// keygen, or signing of a specific unsigned proposal batch.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ProtoStageType {
    KeygenGenesis,
    KeygenStandard,
    Signing { unsigned_proposal_hash: [u8; 32] },
}
/// Result of the `should_execute_new_keygen` runtime query: whether a keygen
/// should run, and whether it should be forced regardless of other conditions.
#[derive(Debug, Copy, Clone)]
pub struct AnticipatedKeygenExecutionStatus {
    pub execute: bool,
    pub force_execute: bool,
}
impl<B, BE, C, GE> DKGWorker<B, BE, C, GE>
where
B: Block,
BE: Backend<B> + Unpin + 'static,
GE: GossipEngineIface + 'static,
C: Client<B, BE> + 'static,
C::Api: DKGApi<B, AuthorityId, NumberFor<B>, MaxProposalLength, MaxAuthorities>,
{
    /// Assembles the [`AsyncProtocolParameters`] for a keygen or signing
    /// protocol run. The protocol itself is NOT started here — the keygen or
    /// signing manager starts it later using the returned params.
    ///
    /// * `best_authorities` — (party id, key) pairs participating in this run.
    /// * `party_i` — our own party index within `best_authorities`.
    /// * `stage` — keygen (genesis/standard) vs signing; for signing, the
    ///   active local key for `session_id` is loaded from the DB.
    /// * `ssid` — signing-set id for the remote handle.
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
    pub(crate) fn generate_async_proto_params(
        &self,
        best_authorities: Vec<(KeygenPartyId, Public)>,
        authority_public_key: Public,
        party_i: KeygenPartyId,
        session_id: SessionId,
        stage: ProtoStageType,
        associated_block: NumberFor<B>,
        ssid: SSID,
    ) -> Result<
        AsyncProtocolParameters<
            DKGProtocolEngine<
                B,
                BE,
                C,
                GE,
                MaxProposalLength,
                MaxAuthorities,
                BatchId,
                MaxProposalsInBatch,
                MaxSignatureLength,
            >,
            MaxAuthorities,
        >,
        DKGError,
    > {
        let best_authorities = Arc::new(best_authorities);
        let authority_public_key = Arc::new(authority_public_key);
        let now = self.get_latest_block_number();
        let associated_block_id: u64 = associated_block.saturated_into();
        let authority_mapping = generate_authority_mapping(&best_authorities, stage);
        // Remote handle through which the protocol is controlled/observed.
        let status_handle = AsyncProtocolRemote::new(
            now,
            session_id,
            self.logger.clone(),
            associated_block_id,
            ssid,
            stage,
            authority_mapping,
        );
        // Only signing needs an existing local key; keygen produces one.
        let active_local_key = match stage {
            ProtoStageType::KeygenGenesis => None,
            ProtoStageType::KeygenStandard => None,
            ProtoStageType::Signing { .. } => {
                let (active_local_key, _) = self.fetch_local_keys(session_id);
                active_local_key
            },
        };
        self.logger.debug(format!(
            "Active local key enabled for stage {:?}? {}",
            stage,
            active_local_key.is_some()
        ));
        let params = AsyncProtocolParameters {
            engine: DKGProtocolEngine {
                backend: self.backend.clone(),
                latest_header: self.latest_header.clone(),
                client: self.client.clone(),
                keystore: self.key_store.clone(),
                db: self.db.clone(),
                gossip_engine: self.gossip_engine.clone(),
                aggregated_public_keys: self.aggregated_public_keys.clone(),
                current_validator_set: self.current_validator_set.clone(),
                local_keystore: self.local_keystore.clone(),
                vote_results: Arc::new(Default::default()),
                is_genesis: stage == ProtoStageType::KeygenGenesis,
                metrics: self.metrics.clone(),
                test_bundle: self.test_bundle.clone(),
                logger: self.logger.clone(),
                _pd: Default::default(),
            },
            session_id,
            db: self.db.clone(),
            keystore: self.key_store.clone(),
            current_validator_set: self.current_validator_set.clone(),
            best_authorities,
            party_i,
            authority_public_key,
            batch_id_gen: Arc::new(Default::default()),
            handle: status_handle,
            logger: self.logger.clone(),
            local_key: active_local_key,
            associated_block_id,
        };
        // Both arms return the same params; the match only selects the log line.
        match &stage {
            ProtoStageType::Signing { unsigned_proposal_hash } => {
                self.logger.debug(format!("Signing protocol for proposal hash {unsigned_proposal_hash:?} will start later in the signing manager"));
                Ok(params)
            },
            ProtoStageType::KeygenGenesis | ProtoStageType::KeygenStandard => {
                self.logger.debug(format!(
                    "Protocol for stage {stage:?} will start later in the keygen manager"
                ));
                Ok(params)
            },
        }
    }
fn fetch_local_keys(
&self,
current_session_id: SessionId,
) -> (Option<LocalKeyType>, Option<LocalKeyType>) {
let next_session_id = current_session_id + 1;
let active_local_key = self.db.get_local_key(current_session_id).ok().flatten();
let next_local_key = self.db.get_local_key(next_session_id).ok().flatten();
(active_local_key, next_local_key)
}
pub async fn get_party_index(&self, header: &B::Header) -> Option<u16> {
let public = self.get_authority_public_key();
let best_authorities = self.get_best_authorities(header).await;
for elt in best_authorities {
if elt.1 == public {
return Some(elt.0)
}
}
None
}
pub async fn get_next_party_index(&self, header: &B::Header) -> Option<u16> {
let public = self.get_authority_public_key();
let next_best_authorities = self.get_next_best_authorities(header).await;
for elt in next_best_authorities {
if elt.1 == public {
return Some(elt.0)
}
}
None
}
pub async fn get_keygen_threshold(&self, header: &B::Header) -> u16 {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().keygen_threshold(at).unwrap_or_default()
})
.await
}
pub async fn get_next_keygen_threshold(&self, header: &B::Header) -> u16 {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().next_keygen_threshold(at).unwrap_or_default()
})
.await
}
pub async fn get_signature_threshold(&self, header: &B::Header) -> u16 {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().signature_threshold(at).unwrap_or_default()
})
.await
}
pub async fn get_next_signature_threshold(&self, header: &B::Header) -> u16 {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().next_signature_threshold(at).unwrap_or_default()
})
.await
}
pub async fn get_dkg_pub_key(&self, header: &B::Header) -> (AuthoritySetId, Vec<u8>) {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().dkg_pub_key(at).unwrap_or_default()
})
.await
}
pub async fn dkg_pub_key_is_unset(&self, header: &B::Header) -> bool {
self.get_dkg_pub_key(header).await.1.is_empty()
}
pub async fn get_next_dkg_pub_key(
&self,
header: &B::Header,
) -> Option<(AuthoritySetId, Vec<u8>)> {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().next_dkg_pub_key(at).unwrap_or_default()
})
.await
}
#[allow(dead_code)]
pub async fn get_keygen_jailed(
&self,
header: &B::Header,
set: &[AuthorityId],
) -> Vec<AuthorityId> {
let at = header.hash();
let set = set.to_vec();
self.exec_client_function(move |client| {
client.runtime_api().get_keygen_jailed(at, set).unwrap_or_default()
})
.await
}
pub async fn get_best_authorities(&self, header: &B::Header) -> Vec<(u16, AuthorityId)> {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().get_best_authorities(at).unwrap_or_default()
})
.await
}
pub async fn get_next_best_authorities(&self, header: &B::Header) -> Vec<(u16, AuthorityId)> {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().get_next_best_authorities(at).unwrap_or_default()
})
.await
}
pub async fn validator_set(
&self,
header: &B::Header,
) -> Option<(AuthoritySet<Public, MaxAuthorities>, AuthoritySet<Public, MaxAuthorities>)> {
Self::validator_set_inner(&self.logger, header, &self.client).await
}
async fn validator_set_inner(
logger: &DebugLogger,
header: &B::Header,
client: &Arc<C>,
) -> Option<(AuthoritySet<Public, MaxAuthorities>, AuthoritySet<Public, MaxAuthorities>)> {
let new = if let Some((new, queued)) = find_authorities_change::<B>(header) {
Some((new, queued))
} else {
let at = header.hash();
let current_authority_set = exec_client_function(client, move |client| {
client.runtime_api().authority_set(at).ok()
})
.await;
let queued_authority_set = exec_client_function(client, move |client| {
client.runtime_api().queued_authority_set(at).ok()
})
.await;
match (current_authority_set, queued_authority_set) {
(Some(current), Some(queued)) => Some((current, queued)),
_ => None,
}
};
logger.trace(format!("🕸️ active validator set: {new:?}"));
new
}
    /// Diagnostic check: logs any of OUR keystore public keys that are missing
    /// from the given active validator set. Never fails the caller — it always
    /// returns `Ok(())` unless reading the keystore itself errors.
    fn verify_validator_set(
        &self,
        block: &NumberFor<B>,
        mut active: AuthoritySet<Public, MaxAuthorities>,
    ) -> Result<(), error::Error> {
        // Drain into sets so the difference below is key-based, not positional.
        let active: BTreeSet<Public> = active.authorities.drain(..).collect();
        let store: BTreeSet<Public> = self.key_store.public_keys()?.drain(..).collect();
        // Keys we hold locally that are NOT in the active set.
        let missing: Vec<_> = store.difference(&active).cloned().collect();
        if !missing.is_empty() {
            self.logger.debug(format!(
                "🕸️ for block {block:?}, public key missing in validator set is: {missing:?}"
            ));
        }
        Ok(())
    }
    /// Core per-block driver. Skips stale headers and major-sync periods, then:
    /// - if no DKG key is on chain yet, tries to enact genesis authorities and
    ///   runs the keygen manager;
    /// - otherwise rotates the local session if due, then runs the keygen and
    ///   signing managers.
    async fn process_block_notification(&self, header: &B::Header) {
        // Ignore anything at or below the height we already processed.
        if let Some(latest_header) = self.latest_header.read().clone() {
            if latest_header.number() >= header.number() {
                self.logger.debug(
                    format!("🕸️ Latest header {} is greater than or equal to current header {}, returning...",
                        latest_header.number(),
                        header.number()
                    )
                );
                return
            }
        }
        self.logger
            .debug(format!("🕸️ Processing block notification for block {}", header.number()));
        metric_set!(self, dkg_latest_block_height, header.number());
        // Record this header BEFORE any async work so re-entrant calls see it.
        *self.latest_header.write() = Some(header.clone());
        self.logger.debug(format!("🕸️ Latest header is now: {:?}", header.number()));
        // Don't run session logic while the node is still catching up.
        if let Some(sync_service) = &self.sync_service {
            if sync_service.is_major_syncing() {
                self.logger.debug("🕸️ Chain not fully synced, skipping block processing!");
                return
            }
        }
        if self.dkg_pub_key_is_unset(header).await {
            self.logger
                .debug("🕸️ Maybe enacting genesis authorities since dkg pub key is empty");
            self.maybe_enact_genesis_authorities(header).await;
            self.keygen_manager.on_block_finalized(header, self).await;
        } else {
            self.maybe_rotate_local_session(header).await;
            self.keygen_manager.on_block_finalized(header, self).await;
            if let Err(e) = self.signing_manager.on_block_finalized(header, self).await {
                self.logger
                    .error(format!("🕸️ Error running signing_manager.on_block_finalized: {e:?}"));
            }
        }
    }
    /// If the active set is the genesis authority set, adopts it as the current
    /// validator set and refreshes the cached best/next-best authorities.
    /// No-op (with logging) otherwise.
    async fn maybe_enact_genesis_authorities(&self, header: &B::Header) {
        if let Some((active, _queued)) = self.validator_set(header).await {
            if active.id == GENESIS_AUTHORITY_SET_ID {
                self.logger.debug(format!("🕸️ GENESIS SESSION ID {:?}", active.id));
                metric_set!(self, dkg_validator_set_id, active.id);
                // Diagnostic only — result deliberately ignored.
                let _ = self.verify_validator_set(header.number(), active.clone());
                *self.current_validator_set.write() = active.clone();
                *self.best_authorities.write() = self.get_best_authorities(header).await;
                *self.next_best_authorities.write() = self.get_next_best_authorities(header).await;
            } else {
                self.logger.debug(format!("🕸️ NOT IN GENESIS SESSION ID {:?}", active.id));
            }
        } else {
            self.logger.debug("🕸️ No active validators");
        }
    }
    /// Rotates local session state to match the chain, but ONLY once the
    /// on-chain DKG key's set id has caught up with our queued set id —
    /// otherwise rotating early would desync local and on-chain sessions.
    async fn maybe_rotate_local_session(&self, header: &B::Header) {
        if let Some((active, queued)) = self.validator_set(header).await {
            self.logger.debug(format!("🕸️ ACTIVE SESSION ID {:?}", active.id));
            metric_set!(self, dkg_validator_set_id, active.id);
            // Diagnostic only — result deliberately ignored.
            let _ = self.verify_validator_set(header.number(), active.clone());
            let (set_id, _) = self.get_dkg_pub_key(header).await;
            let queued_authority_set_id = self.queued_validator_set.read().id;
            self.logger.debug(format!("🕸️ CURRENT SET ID: {set_id:?}"));
            self.logger
                .debug(format!("🕸️ QUEUED AUTHORITY SET ID: {queued_authority_set_id:?}"));
            // Gate: only rotate once the chain's key belongs to our queued session.
            if set_id != queued_authority_set_id {
                self.logger.debug(format!("🕸️ Queued authority set id {queued_authority_set_id} is not the same as the on chain authority set id {set_id}, will not rotate the local sessions."));
                return
            }
            // Promote queued -> current; next-best becomes best.
            *self.current_validator_set.write() = active;
            *self.queued_validator_set.write() = queued;
            *self.best_authorities.write() = self.next_best_authorities.read().clone();
            *self.next_best_authorities.write() = self.get_next_best_authorities(header).await;
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.reset_session_metrics();
            }
            // Fresh session, fresh local log buffer.
            self.logger.clear_local_logs();
        } else {
            self.logger.info(
                "🕸️ No update to local session found, not rotating local sessions".to_string(),
            );
        }
    }
async fn handle_finality_notification(&self, notification: FinalityNotification<B>) {
self.logger.trace(format!("🕸️ Finality notification: {notification:?}"));
self.process_block_notification(¬ification.header).await;
}
    /// Verifies a signed DKG message against the current/queued authority
    /// sets. Thin wrapper over the associated `_inner` function using this
    /// worker's logger, latest header, and client.
    #[cfg_attr(
        feature = "debug-tracing",
        dkg_logging::instrument(target = "dkg", skip_all, ret, err, fields(signed_dkg_message))
    )]
    async fn verify_signature_against_authorities(
        &self,
        signed_dkg_msg: SignedDKGMessage<Public>,
    ) -> Result<DKGMessage<Public>, DKGError> {
        Self::verify_signature_against_authorities_inner(
            &self.logger,
            signed_dkg_msg,
            &self.latest_header,
            &self.client,
        )
        .await
    }
pub async fn verify_signature_against_authorities_inner(
logger: &DebugLogger,
signed_dkg_msg: SignedDKGMessage<Public>,
latest_header: &Arc<RwLock<Option<B::Header>>>,
client: &Arc<C>,
) -> Result<DKGMessage<Public>, DKGError> {
let dkg_msg = signed_dkg_msg.msg;
let encoded = dkg_msg.encode();
let signature = signed_dkg_msg.signature.ok_or(DKGError::GenericError {
reason: "Signature not found in signed_dkg_msg".into(),
})?;
let mut authorities: Option<(Vec<AuthorityId>, Vec<AuthorityId>)> = None;
let latest_header = { latest_header.read().clone() };
if let Some(header) = latest_header {
authorities = Self::validator_set_inner(logger, &header, client)
.await
.map(|a| (a.0.authorities.into(), a.1.authorities.into()));
}
if authorities.is_none() {
return Err(DKGError::GenericError { reason: "No authorities".into() })
}
let check_signers = |xs: &[AuthorityId]| {
return dkg_runtime_primitives::utils::verify_signer_from_set_ecdsa(
xs.iter()
.map(|x| {
let slice_33 =
to_slice_33(&x.encode()).expect("AuthorityId encoding failed!");
ecdsa::Public::from_raw(slice_33)
})
.collect(),
&encoded,
&signature,
)
.1
};
if check_signers(&authorities.clone().expect("Checked for empty authorities above").0) ||
check_signers(&authorities.expect("Checked for empty authorities above").1)
{
Ok(dkg_msg)
} else {
Err(DKGError::GenericError {
reason: "Message signature is not from a registered authority or next authority"
.into(),
})
}
}
#[cfg_attr(
feature = "debug-tracing",
dkg_logging::instrument(target = "dkg", skip_all, fields(dkg_error))
)]
pub async fn handle_dkg_error(&self, dkg_error: DKGError) {
self.logger.error(format!("Received error: {dkg_error:?}"));
metric_inc!(self, dkg_error_counter);
let (offenders, session_id) = match dkg_error {
DKGError::KeygenMisbehaviour { ref bad_actors, session_id, .. } => {
metric_inc!(self, dkg_keygen_misbehaviour_error);
(bad_actors.clone(), session_id)
},
DKGError::KeygenTimeout { ref bad_actors, session_id, .. } => {
metric_inc!(self, dkg_keygen_timeout_error);
(bad_actors.clone(), session_id)
},
DKGError::SignMisbehaviour { ref bad_actors, session_id, .. } => {
metric_inc!(self, dkg_sign_misbehaviour_error);
(bad_actors.clone(), session_id)
},
_ => Default::default(),
};
self.logger
.error(format!("Bad Actors : {offenders:?}, Session Id : {session_id:?}"));
for offender in offenders {
match dkg_error {
DKGError::KeygenMisbehaviour { bad_actors: _, .. } =>
self.handle_dkg_report(DKGReport::KeygenMisbehaviour { offender, session_id })
.await,
DKGError::KeygenTimeout { .. } =>
self.handle_dkg_report(DKGReport::KeygenMisbehaviour { offender, session_id })
.await,
DKGError::SignMisbehaviour { bad_actors: _, .. } =>
self.handle_dkg_report(DKGReport::SignMisbehaviour { offender, session_id })
.await,
_ => (),
}
}
}
    /// Routes an incoming signed DKG message by payload kind:
    /// - keygen messages -> keygen manager;
    /// - offline/vote messages -> signing manager;
    /// - public-key / misbehaviour broadcasts -> signature-verified first,
    ///   then handled; verification or handling errors are logged, not
    ///   propagated (the function still returns `Ok`).
    #[cfg_attr(
        feature = "debug-tracing",
        dkg_logging::instrument(target = "dkg", skip_all, ret, err, fields(dkg_msg))
    )]
    async fn process_incoming_dkg_message(
        &self,
        dkg_msg: SignedDKGMessage<Public>,
    ) -> Result<(), DKGError> {
        metric_inc!(self, dkg_inbound_messages);
        self.logger
            .info(format!("Processing incoming DKG message: {:?}", dkg_msg.msg.session_id,));
        match &dkg_msg.msg.payload {
            // Protocol messages are delivered unverified; the managers handle them.
            NetworkMsgPayload::Keygen(_) => {
                self.keygen_manager.deliver_message(dkg_msg);
                Ok(())
            },
            NetworkMsgPayload::Offline(..) | NetworkMsgPayload::Vote(..) => {
                self.signing_manager.deliver_message(dkg_msg);
                Ok(())
            },
            // Broadcasts must be authenticated against the authority sets first.
            NetworkMsgPayload::PublicKeyBroadcast(_) => {
                match self.verify_signature_against_authorities(dkg_msg).await {
                    Ok(dkg_msg) => {
                        match handle_public_key_broadcast(self, dkg_msg).await {
                            Ok(()) => (),
                            Err(err) => self
                                .logger
                                .error(format!("🕸️ Error while handling DKG message {err:?}")),
                        };
                    },
                    Err(err) => self.logger.error(format!(
                        "Error while verifying signature against authorities: {err:?}"
                    )),
                }
                Ok(())
            },
            NetworkMsgPayload::MisbehaviourBroadcast(_) => {
                match self.verify_signature_against_authorities(dkg_msg).await {
                    Ok(dkg_msg) => {
                        match handle_misbehaviour_report(self, dkg_msg).await {
                            Ok(()) => (),
                            Err(err) => self.logger.error(format!(
                                "🕸️ Error while handling misbehaviour message {err:?}"
                            )),
                        };
                    },
                    Err(err) => self.logger.error(format!(
                        "Error while verifying signature against authorities: {err:?}"
                    )),
                }
                Ok(())
            },
        }
    }
async fn handle_dkg_report(&self, dkg_report: DKGReport) {
let (offender, session_id, misbehaviour_type) = match dkg_report {
DKGReport::KeygenMisbehaviour { offender, session_id } => {
self.logger.info(format!(
"🕸️ DKG Keygen misbehaviour @ Session ({session_id}) by {offender}"
));
(offender, session_id, MisbehaviourType::Keygen)
},
DKGReport::SignMisbehaviour { offender, session_id } => {
self.logger.info(format!(
"🕸️ DKG Signing misbehaviour @ Session ({session_id}) by {offender}"
));
(offender, session_id, MisbehaviourType::Sign)
},
};
let misbehaviour_msg =
MisbehaviourMessage { misbehaviour_type, session_id, offender, signature: vec![] };
let gossip = gossip_misbehaviour_report(self, misbehaviour_msg).await;
if gossip.is_err() {
self.logger.info("🕸️ DKG gossip_misbehaviour_report failed!");
}
}
pub fn authenticate_msg_origin(
&self,
is_main_round: bool,
authorities: (Vec<Public>, Vec<Public>),
msg: &[u8],
signature: &[u8],
) -> Result<Public, DKGError> {
let get_keys = |accts: &[Public]| {
accts
.iter()
.map(|x| {
ecdsa::Public(to_slice_33(&x.encode()).unwrap_or_else(|| {
panic!("Failed to convert account id to ecdsa public key")
}))
})
.collect::<Vec<ecdsa::Public>>()
};
let maybe_signers =
if is_main_round { get_keys(&authorities.0) } else { get_keys(&authorities.1) };
let (maybe_signer, success) = dkg_runtime_primitives::utils::verify_signer_from_set_ecdsa(
maybe_signers,
msg,
signature,
);
if !success {
return Err(DKGError::GenericError {
reason: "Message signature is not from a registered authority".to_string(),
})
}
let signer = maybe_signer.ok_or(DKGError::GenericError {
reason: "verify_signer_from_set_ecdsa could not determin signer!".to_string(),
})?;
Ok(Public::from(signer))
}
pub async fn should_execute_new_keygen(
&self,
header: &B::Header,
) -> AnticipatedKeygenExecutionStatus {
let at = header.hash();
let (execute, force_execute) = self
.exec_client_function(move |client| {
client.runtime_api().should_execute_new_keygen(at).unwrap_or_default()
})
.await;
AnticipatedKeygenExecutionStatus { execute, force_execute }
}
pub async fn get_unsigned_proposal_batches(
&self,
header: &B::Header,
) -> Result<
Vec<
StoredUnsignedProposalBatch<
BatchId,
MaxProposalLength,
MaxProposalsInBatch,
<<B as Block>::Header as Header>::Number,
>,
>,
ApiError,
> {
let at = header.hash();
self.exec_client_function(move |client| {
client.runtime_api().get_unsigned_proposal_batches(at)
})
.await
}
pub async fn exec_client_function<F, T>(&self, function: F) -> T
where
for<'a> F: FnOnce(&'a C) -> T,
T: Send + 'static,
F: Send + 'static,
{
let client = &self.client;
exec_client_function(client, function).await
}
    /// Blocks until the first finality notification whose header yields a
    /// resolvable validator set, seeds the local authority state from it, and
    /// processes that block. Returns once initialization succeeds (or the
    /// stream ends without ever resolving a validator set).
    async fn initialization(&mut self) {
        let mut stream = self.client.finality_notification_stream();
        while let Some(notif) = stream.next().await {
            if let Some((active, queued)) = self.validator_set(&notif.header).await {
                *self.best_authorities.write() = self.get_best_authorities(&notif.header).await;
                *self.current_validator_set.write() = active;
                *self.queued_validator_set.write() = queued;
                // Process this first usable block like any other finalized block.
                self.handle_finality_notification(notif.clone()).await;
                self.logger.debug("Initialization complete");
                return
            }
        }
    }
    /// Main entry point: initializes, then races the three long-running tasks
    /// (finality handling, gossip intake, error handling). Each task only ever
    /// returns with an error, so reaching the log line below means the worker
    /// terminated prematurely.
    pub async fn run(mut self) {
        crate::deadlock_detection::deadlock_detect();
        self.initialization().await;
        self.logger.debug("Starting DKG Iteration loop");
        let finality_notification_task = self.finality_notification_task();
        let gossip_engine_stream_task = self.gossip_engine_message_stream_task();
        let error_handling_task = self.spawn_error_handling_task();
        // First task to finish wins the select; the others are dropped.
        let res = tokio::select! {
            res0 = finality_notification_task => res0,
            res1 = gossip_engine_stream_task => res1,
            res2 = error_handling_task => res2,
        };
        self.logger
            .error(format!("DKG Worker finished prematurely. The cause: {res:?}"));
    }
async fn finality_notification_task(&self) -> Result<(), DKGError> {
let mut stream = self.client.finality_notification_stream();
while let Some(notification) = stream.next().await {
dkg_logging::debug!("Going to handle Finality notification");
self.handle_finality_notification(notification).await;
}
self.logger.error("Finality notification stream ended");
Err(DKGError::CriticalError { reason: "Finality notification stream ended".to_string() })
}
async fn gossip_engine_message_stream_task(&self) -> Result<(), DKGError> {
let mut stream =
self.gossip_engine.get_stream().expect("keygen gossip stream already taken");
while let Some(msg) = stream.recv().await {
let msg_hash = crate::debug_logger::raw_message_to_hash(msg.msg.payload.payload());
self.logger.debug(format!(
"Going to handle message for session {} | hash: {msg_hash}",
msg.msg.session_id
));
self.logger.checkpoint_message_raw(msg.msg.payload.payload(), "CP1-incoming");
match self.process_incoming_dkg_message(msg).await {
Ok(_) => {},
Err(e) => {
self.logger.error(format!("Error processing keygen message: {e:?}"));
},
}
}
Err(DKGError::CriticalError { reason: "Gossip engine stream ended".to_string() })
}
async fn spawn_error_handling_task(&self) -> Result<(), DKGError> {
let mut error_handler_rx = self
.error_handler_channel
.rx
.lock()
.take()
.expect("Error handler tx already taken");
while let Some(error) = error_handler_rx.recv().await {
self.logger.debug("Going to handle Error");
self.handle_dkg_error(error).await;
}
Err(DKGError::CriticalError { reason: "Error handler stream ended".to_string() })
}
}
/// Convenience accessors over a [`DKGKeystore`].
#[auto_impl::auto_impl(&mut, &, Arc)]
pub trait KeystoreExt {
    /// Access to the underlying DKG keystore.
    fn get_keystore(&self) -> &DKGKeystore;
    /// Returns this node's DKG authority public key.
    ///
    /// # Panics
    /// Panics if the keystore cannot be read or holds no authority key.
    fn get_authority_public_key(&self) -> Public {
        self.get_keystore()
            .authority_id(
                &self.get_keystore().public_keys().expect("Could not find authority public key"),
            )
            // `expect` replaces `unwrap_or_else(|| panic!(..))` — same message, simpler.
            .expect("Could not find authority public key")
    }
    /// Returns this node's sr25519 public key.
    ///
    /// # Panics
    /// Panics if no matching sr25519 key is present in the keystore.
    fn get_sr25519_public_key(&self) -> sp_core::sr25519::Public {
        self.get_keystore()
            .sr25519_public_key(&self.get_keystore().sr25519_public_keys().unwrap_or_default())
            .expect("Could not find sr25519 key in keystore")
    }
}
/// The worker exposes its own keystore for the `KeystoreExt` helpers.
impl<B, BE, C, GE> KeystoreExt for DKGWorker<B, BE, C, GE>
where
    B: Block,
    BE: Backend<B>,
    GE: GossipEngineIface,
    C: Client<B, BE>,
    MaxProposalLength: Get<u32>,
    MaxAuthorities: Get<u32>,
{
    fn get_keystore(&self) -> &DKGKeystore {
        &self.key_store
    }
}
/// A keystore trivially provides itself.
impl KeystoreExt for DKGKeystore {
    fn get_keystore(&self) -> &DKGKeystore {
        self
    }
}
/// Access to the shared "latest finalized header" slot plus a derived
/// block-number accessor.
#[auto_impl::auto_impl(&mut, &, Arc)]
pub trait HasLatestHeader<B: Block>: Send + Sync + 'static {
    /// Shared handle to the most recently processed finalized header, if any.
    fn get_latest_header(&self) -> &Arc<RwLock<Option<B::Header>>>;
    /// Gets the latest block number, or 0 if no header has been stored yet.
    fn get_latest_block_number(&self) -> NumberFor<B> {
        // Read the number in place — the old code cloned the entire header
        // just to extract its number.
        self.get_latest_header()
            .read()
            .as_ref()
            .map(|header| *header.number())
            .unwrap_or_else(|| NumberFor::<B>::from(0u32))
    }
}
impl<B, BE, C, GE> HasLatestHeader<B> for DKGWorker<B, BE, C, GE>
where
    B: Block,
    BE: Backend<B> + 'static,
    GE: GossipEngineIface,
    C: Client<B, BE> + 'static,
    MaxProposalLength: Get<u32>,
    MaxAuthorities: Get<u32>,
{
    fn get_latest_header(&self) -> &Arc<RwLock<Option<B::Header>>> {
        &self.latest_header
    }
    // NOTE: `get_latest_block_number` is intentionally NOT overridden here —
    // the previous override duplicated the trait's default implementation
    // verbatim, so the default is used instead.
}
/// Minimal `HasLatestHeader` implementor: just the shared latest-header slot,
/// handed to the keygen/signing managers as their notion of "current time".
pub struct Clock<B: Block> {
    pub latest_header: Arc<RwLock<Option<B::Header>>>,
}
impl<B: Block> Clone for Clock<B> {
fn clone(&self) -> Self {
Self { latest_header: self.latest_header.clone() }
}
}
impl<B: Block> HasLatestHeader<B> for Clock<B> {
    fn get_latest_header(&self) -> &Arc<RwLock<Option<B::Header>>> {
        &self.latest_header
    }
}
/// Runs `function` against a clone of `client` on tokio's blocking thread
/// pool, so potentially slow runtime-API calls don't stall the async runtime.
///
/// # Panics
/// Panics if the blocking task itself panics or is cancelled (the `JoinError`
/// is surfaced via `expect`).
async fn exec_client_function<B, C, BE, F, T>(client: &Arc<C>, function: F) -> T
where
    for<'a> F: FnOnce(&'a C) -> T,
    B: Block,
    BE: Backend<B>,
    C: Client<B, BE> + 'static,
    T: Send + 'static,
    F: Send + 'static,
{
    // Clone the Arc so the closure can own a `'static` handle.
    let client = client.clone();
    tokio::task::spawn_blocking(move || function(&client))
        .await
        .expect("Failed to spawn blocking task")
}