From 835983808d74b674296c256983527bf913441f9f Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sat, 6 Jun 2026 23:24:56 +0530 Subject: [PATCH 1/2] feat: raw pipeline complete --- examples/raw/src/main.rs | 28 +-- inode/src/common/mod.rs | 11 ++ inode/src/p2p/cmds.rs | 23 ++- inode/src/p2p/node.rs | 28 ++- inode/src/p2p/service.rs | 369 ++++++++++++++++++++++++++++++++++++++- inode/src/p2p/types.rs | 2 +- inode/src/slm/core.rs | 107 ++++++++++-- 7 files changed, 534 insertions(+), 34 deletions(-) diff --git a/examples/raw/src/main.rs b/examples/raw/src/main.rs index 2d764ee..b3eda62 100644 --- a/examples/raw/src/main.rs +++ b/examples/raw/src/main.rs @@ -1,7 +1,6 @@ -use std::io::{self, Write}; - use anyhow::Result; use inode::p2p::{cmds::io_loop, node::InferenceNode, types::Mode}; +use std::env; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -15,17 +14,26 @@ async fn main() -> Result<()> { dotenvy::dotenv().ok(); - print!("Intiate at BOOTSTRAP node (Y/n): "); - io::stdout().flush().unwrap(); - - let mut io = String::new(); - io::stdin().read_line(&mut io).unwrap(); + let mut mode = Mode::Provider; + let mut args = env::args().skip(1); - let mut mode = Mode::Bootstrap; - if io.trim().to_lowercase() != "" { - mode = Mode::Provider; + while let Some(arg) = args.next() { + if arg == "bootstrap" { + mode = Mode::Bootstrap; + } } + // print!("Intiate at BOOTSTRAP node (Y/n): "); + // io::stdout().flush().unwrap(); + + // let mut io = String::new(); + // io::stdin().read_line(&mut io).unwrap(); + + // let mut mode = Mode::Bootstrap; + // if io.trim().to_lowercase() != "" { + // mode = Mode::Provider; + // } + let inode = InferenceNode::new(mode).await; io_loop(inode).await.unwrap(); diff --git a/inode/src/common/mod.rs b/inode/src/common/mod.rs index 5006a36..cefbef7 100644 --- a/inode/src/common/mod.rs +++ b/inode/src/common/mod.rs @@ -22,3 +22,14 @@ pub fn generate_entropy() -> (String, String) { (secret, hash) } + +pub fn random_split(v: Vec) -> (Vec, Vec) { + assert!(v.len() >= 2, "Vector must contain at least 2 elements"); + + let split_idx = rand::rng().random_range(1..v.len()); + + let left = v[..split_idx].to_vec(); + let right = v[split_idx..].to_vec(); + + (left, right) +} \ No newline at end of file diff --git a/inode/src/p2p/cmds.rs b/inode/src/p2p/cmds.rs index aafed80..ba349e3 100644 --- a/inode/src/p2p/cmds.rs +++ b/inode/src/p2p/cmds.rs @@ -30,6 +30,8 @@ const COMMANDS: &[&str] = &[ "mesh => map of topics -> peer", "\n", "slm => converse with the AI", + "adv => advertize a exec/verify session", + "finalize => finalize the winner response", ]; pub fn print_commands() { @@ -133,10 +135,29 @@ pub async fn handle_cmd(line: &str, inode: &Arc) -> Result<()> { "slm" => { let prompt = parts.collect::>().join(" "); - let res = inode.slm.converse(prompt.as_ref()).await.unwrap(); + let res = inode.service.slm.converse(prompt.as_ref()).await.unwrap(); println!("{}", res); } + "adv" => { + let topic = parts.next().unwrap().to_string(); + inode.service.adv(topic.clone(), None).await.unwrap(); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let prompt = "Hey hows it going, let have somemfun talk about decentralized computaion, say when a distributed swarm of LLMs".to_string(); + inode + .service + .ack(topic.clone(), None, Some(prompt)) + .await + .unwrap(); + } + + "finalize" => { + let topic = parts.next().unwrap().to_string(); + inode.service.finalize(None, Some(topic)).await.unwrap(); + } + _ => println!("Unknown command"), } Ok(()) diff --git a/inode/src/p2p/node.rs b/inode/src/p2p/node.rs index 48d80db..0d444b7 100644 --- a/inode/src/p2p/node.rs +++ b/inode/src/p2p/node.rs @@ -13,7 +13,10 @@ use tokio::sync::{mpsc::Receiver, Mutex}; use tracing::{debug, info}; use crate::{ - p2p::types::{IMsgType, Mode}, + p2p::{ + service::IService, + types::{IMsgType, Mode}, + }, slm::core::SlmClient, }; @@ -22,7 +25,7 @@ pub const PROVIDER_MESH: &str = "swarm/mesh"; pub struct InferenceNode { pub p2p: Arc, - pub slm: SlmClient, + pub service: IService, pub mode: Mode, pub local: Multiaddr, @@ -62,12 +65,18 @@ impl InferenceNode { } }; + let bootmesh = Arc::new(Mutex::new(HashMap::new())); let inode = Arc::new(InferenceNode { p2p: p2p_tx.clone(), - slm: SlmClient::new("http://localhost:11434".to_string()), + service: IService::new( + listen_addr.to_string(), + p2p_tx.clone(), + SlmClient::new("http://localhost:11434".to_string()), + bootmesh.clone(), + ), mode, local: listen_addr, - bootmesh: Arc::new(Mutex::new(HashMap::new())), + bootmesh, }); let events = inode.clone(); @@ -117,6 +126,10 @@ impl InferenceNode { let notification = event_rx.recv().await.unwrap(); let decoded = bincode::deserialize::(¬ification).unwrap(); + if self.mode == Mode::Bootstrap { + continue; + } + match decoded { GlobalEvent::Floodsub(event) => match event.msg_type { FloodsubMsgType::Publish => { @@ -129,7 +142,10 @@ impl InferenceNode { IMsgType::General(msg) => { debug!("FloodsubEvent: {topic} - {source}: {msg}") } - IMsgType::Service(_payload) => {} + IMsgType::Service(ipayload) => { + self.service.handle_incoming(ipayload).await.unwrap() + } + IMsgType::Bootmesh(mesh) => { let mut bootmesh = self.bootmesh.lock().await; *bootmesh = mesh; @@ -157,7 +173,7 @@ impl InferenceNode { let latest_mesh = self.p2p.floodsub_mesh().await.unwrap_or(HashMap::new()); if bootmesh == latest_mesh { - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(300)).await; continue; } diff --git a/inode/src/p2p/service.rs b/inode/src/p2p/service.rs index 725d393..5ff607c 100644 --- a/inode/src/p2p/service.rs +++ b/inode/src/p2p/service.rs @@ -1,4 +1,371 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use anyhow::Result; +use rand::{rng, seq::IteratorRandom}; +use rnet_p2p::{ + identity::traits::{core::INode, protocols::INodeFloodsubAPI}, + node::node::Node, + protocols::FLOODSUB, +}; use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; + +use crate::{ + common::random_split, + p2p::{node::PROVIDER_MESH, types::IMsgType}, + slm::core::SlmClient, +}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct IPayload { + stage: IStage, + leader: String, + source: String, + task_id: String, + + prompt: Option, + res: Option, + generator: Option, + verify_score: Option, + exec: Option>, + verifiers: Option>, +} #[derive(Debug, Serialize, Deserialize)] -pub struct IPayload {} +pub enum IStage { + Adv, + Exec, + Verf, + Final, +} + +pub struct SessionStorage { + pub prompt: String, + pub winner: Option, + pub execs: Vec, + pub verifiers: Vec, + pub responses: HashMap)>, +} + +#[derive(Debug)] +pub struct TaskWinner { + pub generator: String, + pub prompt: String, + pub response: String, + pub score: f32, +} + +pub struct IService { + pub local: String, + pub p2p: Arc, + pub slm: SlmClient, + pub bootmesh: Arc>>>, + + pub sessions: Arc>>, +} + +impl IService { + pub fn new( + local: String, + p2p: Arc, + slm: SlmClient, + bootmesh: Arc>>>, + ) -> Self { + Self { + local, + p2p, + slm, + bootmesh, + sessions: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn adv(&self, topic: String, ipayload: Option) -> Result<()> { + match ipayload.is_none() { + true => { + self.p2p.floodsub_subscribe(topic.clone()).await.unwrap(); + + let adv_payload = IPayload { + leader: self.local.clone(), + source: self.local.clone(), + task_id: topic.clone(), + stage: IStage::Adv, + + prompt: None, + res: None, + generator: None, + verify_score: None, + exec: None, + verifiers: None, + }; + + let ipayload = IMsgType::Service(adv_payload); + let fsub_payload = bincode::serialize(&ipayload).unwrap(); + + self.p2p + .floodsub_publish(PROVIDER_MESH.to_string(), fsub_payload) + .await + .unwrap(); + + let mut sessions = self.sessions.lock().await; + sessions.insert( + topic, + SessionStorage { + prompt: String::new(), + winner: None, + execs: vec![], + verifiers: vec![], + responses: HashMap::new(), + }, + ); + } + + false => { + let ipayload = ipayload.unwrap(); + info!("Exec/Vrify session starting in: {}\n", ipayload.task_id); + + // connect with the leader + debug!("Connecting with the leader: {}", ipayload.source); + self.p2p + .new_stream(&ipayload.source, vec![FLOODSUB.to_string()]) + .await + .unwrap(); + + // TODO: connect with a random peer too + self.p2p.floodsub_subscribe(ipayload.task_id).await.unwrap(); + } + } + + Ok(()) + } + + pub async fn ack( + &self, + topic: String, + ipayload: Option, + prompt: Option, + ) -> Result<()> { + match ipayload.is_none() { + true => { + let mesh = self.p2p.floodsub_mesh().await.unwrap(); + + let participants = mesh.get(&topic).unwrap().clone(); + let (execs, verifiers) = random_split(participants); + + println!("Execs: {:?}", execs); + println!("Verifiers: {:?}", verifiers); + + let ack_payload = IPayload { + stage: IStage::Exec, + leader: self.local.clone(), + source: self.local.clone(), + task_id: topic.clone(), + + prompt: prompt.clone(), + res: None, + generator: None, + verify_score: None, + exec: Some(execs.clone()), + verifiers: Some(verifiers.clone()), + }; + + let ipayload = IMsgType::Service(ack_payload); + let fsub_payload = bincode::serialize(&ipayload).unwrap(); + + self.p2p + .floodsub_publish(topic.clone(), fsub_payload) + .await + .unwrap(); + + // -------------- + let mut sessions = self.sessions.lock().await; + let task = sessions.get_mut(&topic).unwrap(); + + task.execs = execs; + task.verifiers = verifiers; + task.prompt = prompt.unwrap().clone(); + // -------------- + } + + false => { + let ipayload = ipayload.unwrap(); + + match ipayload.exec.clone().unwrap().contains(&self.local) { + true => { + info!("Selected as EXECUTOR node\n"); + tokio::time::sleep(Duration::from_secs(2)).await; + self.execution(ipayload).await.unwrap(); + } + false => { + info!("Selected as VERIFIER node, waiting for executors...\n"); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + } + + Ok(()) + } + + pub async fn execution(&self, ipayload: IPayload) -> Result<()> { + let prompt = ipayload.prompt.unwrap(); + let task_id = ipayload.task_id; + + let res = self.slm.converse(prompt.as_ref()).await.unwrap(); + + let exec_payload = IPayload { + stage: IStage::Verf, + leader: ipayload.leader, + source: self.local.clone(), + task_id: task_id.clone(), + + prompt: Some(prompt), + res: Some(res), + generator: Some(self.local.clone()), + verify_score: None, + exec: None, + verifiers: ipayload.verifiers, + }; + + let ipayload = IMsgType::Service(exec_payload); + let fsub_payload = bincode::serialize(&ipayload).unwrap(); + + self.p2p + .floodsub_publish(task_id, fsub_payload) + .await + .unwrap(); + + Ok(()) + } + + pub async fn verification(&self, ipayload: IPayload) -> Result<()> { + match ipayload.verifiers.unwrap().contains(&self.local) { + true => { + debug!("verifying..."); + let prompt = ipayload.prompt.unwrap(); + let res = ipayload.res.unwrap(); + let task_id = ipayload.task_id; + + let score = self.slm.verify(&prompt, &res).await.unwrap(); + + let ver_payload = IPayload { + stage: IStage::Final, + leader: ipayload.leader, + source: self.local.clone(), + task_id: task_id.clone(), + + prompt: Some(prompt), + res: Some(res), + generator: Some(ipayload.source), + verify_score: Some(score), + exec: None, + verifiers: None, + }; + + let ipayload = IMsgType::Service(ver_payload); + let fsub_payload = bincode::serialize(&ipayload).unwrap(); + + // After some jitter + let nonce = (200..900).choose(&mut rng()).unwrap(); + tokio::time::sleep(Duration::from_millis(nonce)).await; + + self.p2p + .floodsub_publish(task_id, fsub_payload) + .await + .unwrap(); + } + _ => {} + }; + + Ok(()) + } + + pub async fn finalize(&self, ipayload: Option, topic: Option) -> Result<()> { + match ipayload.is_none() { + false => { + let ipayload = ipayload.unwrap(); + + if ipayload.leader != self.local { + return Ok(()); + } + + info!("Received a score, from: {}\n", ipayload.source); + + let response = ipayload.res.unwrap(); + let score: f32 = ipayload.verify_score.unwrap().parse().unwrap(); + let generator = ipayload.generator.unwrap(); + + let mut sessions = self.sessions.lock().await; + let task = sessions.get_mut(&ipayload.task_id).unwrap(); + match task.responses.get_mut(&generator) { + None => { + task.responses.insert(generator, (response, vec![score])); + } + Some((_, scores)) => scores.push(score), + }; + } + true => { + let topic = topic.unwrap(); + let (prompt, responses) = { + let sessions = self.sessions.lock().await; + let task = sessions.get(&topic).unwrap(); + (task.prompt.clone(), task.responses.clone()) + }; + + let mut winner_generator = None; + let mut winning_response = None; + let mut highest_avg: f32 = -1.0; // Set below your strict 0.00 scoring floor + + for (generator, (res, scores)) in responses { + if scores.is_empty() { + warn!( + "Generator {} had no validation scores. Skipping.", + generator + ); + continue; + } + + let sum: f32 = scores.iter().sum(); + let avg = sum / (scores.len() as f32); + + if avg > highest_avg { + highest_avg = avg; + winner_generator = Some(generator); + winning_response = Some(res); + } + } + + let winner = TaskWinner { + generator: winner_generator.unwrap(), + response: winning_response.unwrap(), + prompt, + score: highest_avg, + }; + + info!("Task completed: {}, \nWinner: {:?}", topic, winner); + } + } + + Ok(()) + } + + pub async fn handle_incoming(&self, ipayload: IPayload) -> Result<()> { + match ipayload.stage { + IStage::Adv => self + .adv(ipayload.task_id.clone(), Some(ipayload)) + .await + .unwrap(), + + IStage::Exec => { + self.ack(ipayload.task_id.clone(), Some(ipayload), None) + .await + .unwrap(); + } + IStage::Verf => self.verification(ipayload).await.unwrap(), + IStage::Final => self.finalize(Some(ipayload), None).await.unwrap(), + } + + Ok(()) + } +} diff --git a/inode/src/p2p/types.rs b/inode/src/p2p/types.rs index ec72272..bdc87f6 100644 --- a/inode/src/p2p/types.rs +++ b/inode/src/p2p/types.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::p2p::service::IPayload; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum Mode { Provider, Bootstrap, diff --git a/inode/src/slm/core.rs b/inode/src/slm/core.rs index 84838d1..6347e48 100644 --- a/inode/src/slm/core.rs +++ b/inode/src/slm/core.rs @@ -1,9 +1,16 @@ use anyhow::Result; use reqwest::Client; +use serde::{Deserialize, Serialize}; +use tracing::warn; -pub struct SlmPayload { - pub req: String, - pub res: Option, +#[derive(Debug, Deserialize, Serialize)] +pub struct SlmRes { + pub res: String, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct SlmVer { + pub score: String, } pub struct SlmClient { @@ -13,19 +20,19 @@ pub struct SlmClient { impl SlmClient { pub fn new(url: String) -> Self { - let sys_prompt = "you are general purpose LLM, that I am using for language processing. - There is going to be a specific format in which we will converse. I will send in the req in json format, - and the res should also be in the json format. See the formats below. + let sys_prompt = r#"You are a high-precision inference engine operating in a strict P2P network. +Your output will be evaluated by a ruthless cryptographic validator node that scores responses from 0.00 to 0.99. + +To maximize your score, you MUST adhere to these rules: +1. Zero Fluff: Never use conversational filler (e.g., "Here is the answer", "Sure!"). +2. High Density: Answer the prompt accurately and comprehensively, but with maximum conciseness. +3. No Hallucinations: Stick strictly to known facts. If a prompt is nonsensical, state that directly. - You will always the receive the prompt in this schema: - {\"prompt\": \"prompt_sent_by_the_user\"} +We will converse in a strict JSON format. +You will receive the prompt as: {"prompt": "user_input"} +You MUST respond exactly as: {"res": "your_dense_accurate_response"} - And your response should be exactly in this schema: - {\"res\": \"response generated by you\"} - Do not include markdown blocks, text wrappers, or trailing characters outside the raw JSON structure. - - So now the req is next." - .to_string(); +Do not include markdown blocks, code wrappers, or any trailing text outside the raw JSON."#.to_string(); Self { url, sys_prompt } } @@ -59,6 +66,76 @@ impl SlmClient { .unwrap_or(raw_text) .trim(); - Ok(clean_json.to_string()) + let res: SlmRes = match serde_json::from_str(clean_json) { + Ok(parsed) => parsed, + Err(_) => { + warn!("Model generated invalid JSON: {}", raw_text); + return Ok("Invalid response for SLM".to_string()); + } + }; + + Ok(res.res) + } + + pub async fn verify(&self, prompt: &str, res: &str) -> Result { + let client = Client::new(); + + // Hyper-strict system prompt tailored for validation + let verify_sys_prompt = "You are a highly critical evaluator of AI outputs. + Your sole objective is to score the provided 'Response' based on how accurately, concisely, and completely it answers the 'Prompt'. + Scale: 0.00 to 0.99. + Rules: + 1. NEVER award a 1.0 or 1. + 2. Deduct points for hallucination. + 3. If the response is mediocre, score it below 0.5. + + You must output EXACTLY and ONLY valid JSON matching this schema: + {\"score\": \"0.XX\"} + Do not include markdown blocks, text wrappers, explanations, or any trailing characters outside the raw JSON structure."; + + let combined_prompt = format!( + "{}\n\nPrompt: {}\n\nResponse to evaluate: {}\n\nJSON Output:", + verify_sys_prompt, prompt, res + ); + + let payload = serde_json::json!({ + "model": "qwen2.5:1.5b".to_string(), // Assuming same model for eval + "prompt": combined_prompt, + "stream": false, + "options": { + "temperature": 0.0 // Ensure deterministic scoring + } + }); + + let response = client + .post(format!("{}/api/generate", self.url)) + .json(&payload) + .send() + .await?; + + let body: serde_json::Value = response.json().await?; + let raw_text = body["response"].as_str().unwrap_or("").trim(); + + // Strip markdown wrappers if the model ignores the strict prompt + let clean_json = raw_text + .strip_prefix("```json") + .unwrap_or(raw_text) + .strip_suffix("```") + .unwrap_or(raw_text) + .trim(); + + let ver: SlmVer = match serde_json::from_str(clean_json) { + Ok(parsed) => parsed, + Err(_) => { + warn!( + "Model generated invalid JSON during verification: {}", + raw_text + ); + // Default to a failing score if the validator node fails to follow schema constraints + return Ok("0.00".to_string()); + } + }; + + Ok(ver.score) } } From 20caba26f2165003969a16a977e6444324a463bf Mon Sep 17 00:00:00 2001 From: lla-dane Date: Sun, 7 Jun 2026 22:45:12 +0530 Subject: [PATCH 2/2] feat: automated the whole pipeline --- examples/raw/src/main.rs | 4 +- inode/src/common/mod.rs | 22 ++++++- inode/src/p2p/cmds.rs | 15 ++++- inode/src/p2p/node.rs | 4 -- inode/src/p2p/service.rs | 125 +++++++++++++++++++++++++++++++++++---- inode/src/slm/mod.rs | 2 +- 6 files changed, 149 insertions(+), 23 deletions(-) diff --git a/examples/raw/src/main.rs b/examples/raw/src/main.rs index b3eda62..c016a84 100644 --- a/examples/raw/src/main.rs +++ b/examples/raw/src/main.rs @@ -15,9 +15,9 @@ async fn main() -> Result<()> { dotenvy::dotenv().ok(); let mut mode = Mode::Provider; - let mut args = env::args().skip(1); + let args = env::args().skip(1); - while let Some(arg) = args.next() { + for arg in args { if arg == "bootstrap" { mode = Mode::Bootstrap; } diff --git a/inode/src/common/mod.rs b/inode/src/common/mod.rs index cefbef7..af89f1e 100644 --- a/inode/src/common/mod.rs +++ b/inode/src/common/mod.rs @@ -23,6 +23,16 @@ pub fn generate_entropy() -> (String, String) { (secret, hash) } +pub fn generate_random() -> String { + let secret: String = rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + + secret +} + pub fn random_split(v: Vec) -> (Vec, Vec) { assert!(v.len() >= 2, "Vector must contain at least 2 elements"); @@ -32,4 +42,14 @@ pub fn random_split(v: Vec) -> (Vec, Vec) { let right = v[split_idx..].to_vec(); (left, right) -} \ No newline at end of file +} + +pub fn last_n_chars(s: &str, n: usize) -> String { + s.chars() + .rev() + .take(n) + .collect::>() + .into_iter() + .rev() + .collect() +} diff --git a/inode/src/p2p/cmds.rs b/inode/src/p2p/cmds.rs index ba349e3..5335376 100644 --- a/inode/src/p2p/cmds.rs +++ b/inode/src/p2p/cmds.rs @@ -31,7 +31,10 @@ const COMMANDS: &[&str] = &[ "\n", "slm => converse with the AI", "adv => advertize a exec/verify session", + "ack => acknowlege the EXECS/VERIFIERS", "finalize => finalize the winner response", + "\n", + "pipe => Test out the automated pipeline", ]; pub fn print_commands() { @@ -142,9 +145,10 @@ pub async fn handle_cmd(line: &str, inode: &Arc) -> Result<()> { "adv" => { let topic = parts.next().unwrap().to_string(); inode.service.adv(topic.clone(), None).await.unwrap(); + } - tokio::time::sleep(Duration::from_secs(1)).await; - + "ack" => { + let topic = parts.next().unwrap().to_string(); let prompt = "Hey hows it going, let have somemfun talk about decentralized computaion, say when a distributed swarm of LLMs".to_string(); inode .service @@ -158,6 +162,13 @@ pub async fn handle_cmd(line: &str, inode: &Arc) -> Result<()> { inode.service.finalize(None, Some(topic)).await.unwrap(); } + "pipe" => { + let provider_count: usize = parts.next().unwrap().to_string().parse().unwrap(); + let winner = inode.service.pipeline(provider_count).await; + + println!("Winner:\n{:?}", winner); + } + _ => println!("Unknown command"), } Ok(()) diff --git a/inode/src/p2p/node.rs b/inode/src/p2p/node.rs index 0d444b7..5cc678d 100644 --- a/inode/src/p2p/node.rs +++ b/inode/src/p2p/node.rs @@ -149,8 +149,6 @@ impl InferenceNode { IMsgType::Bootmesh(mesh) => { let mut bootmesh = self.bootmesh.lock().await; *bootmesh = mesh; - - debug!("BOOTMESH"); } } } @@ -193,8 +191,6 @@ impl InferenceNode { // Wait 2 seconds for the new node to settle down tokio::time::sleep(Duration::from_secs(2)).await; - debug!("BOOTMESH"); - self.p2p .floodsub_publish(PROVIDER_MESH.to_string(), payload) .await diff --git a/inode/src/p2p/service.rs b/inode/src/p2p/service.rs index 5ff607c..05a6535 100644 --- a/inode/src/p2p/service.rs +++ b/inode/src/p2p/service.rs @@ -12,7 +12,7 @@ use tokio::sync::Mutex; use tracing::{debug, info, warn}; use crate::{ - common::random_split, + common::{generate_random, last_n_chars, random_split}, p2p::{node::PROVIDER_MESH, types::IMsgType}, slm::core::SlmClient, }; @@ -45,6 +45,7 @@ pub struct SessionStorage { pub winner: Option, pub execs: Vec, pub verifiers: Vec, + pub rtc_score_count: u32, pub responses: HashMap)>, } @@ -116,6 +117,7 @@ impl IService { winner: None, execs: vec![], verifiers: vec![], + rtc_score_count: 0, responses: HashMap::new(), }, ); @@ -123,7 +125,26 @@ impl IService { false => { let ipayload = ipayload.unwrap(); - info!("Exec/Vrify session starting in: {}\n", ipayload.task_id); + let task_id = ipayload.task_id; + + // Ask if the node wants to participate + // let mut io = String::new(); + // print!("Participate in Exec/Vrify => [{task_id}] - (Y/n): "); + + // io::stdout().flush().unwrap(); + // io::stdin().read_line(&mut io).unwrap(); + // io = io.trim().to_lowercase(); + + // if io != "" && io != "y" { + // return Ok(()); + // } + + let nonce = (4..6).choose(&mut rng()).unwrap(); + tokio::time::sleep(Duration::from_secs(nonce)).await; + + // --------------------------- + + info!("Exec/Vrify session starting in: {}\n", task_id); // connect with the leader debug!("Connecting with the leader: {}", ipayload.source); @@ -133,7 +154,7 @@ impl IService { .unwrap(); // TODO: connect with a random peer too - self.p2p.floodsub_subscribe(ipayload.task_id).await.unwrap(); + self.p2p.floodsub_subscribe(task_id).await.unwrap(); } } @@ -275,35 +296,48 @@ impl IService { .await .unwrap(); } - _ => {} + false => return Ok(()), }; Ok(()) } - pub async fn finalize(&self, ipayload: Option, topic: Option) -> Result<()> { + pub async fn finalize( + &self, + ipayload: Option, + topic: Option, + ) -> Option { match ipayload.is_none() { false => { let ipayload = ipayload.unwrap(); if ipayload.leader != self.local { - return Ok(()); + return None; } - info!("Received a score, from: {}\n", ipayload.source); + let source = ipayload.source; + let generator = ipayload.generator.unwrap(); + let score: f32 = ipayload.verify_score.unwrap().parse().unwrap(); + let task_id = ipayload.task_id; + + let source_last5 = last_n_chars(&source, 5); + let generator_last5 = last_n_chars(&generator, 5); + + info!( + "SCORED: (...{source_last5}) => (...{generator_last5}) - {score}: [{task_id}]" + ); let response = ipayload.res.unwrap(); - let score: f32 = ipayload.verify_score.unwrap().parse().unwrap(); - let generator = ipayload.generator.unwrap(); let mut sessions = self.sessions.lock().await; - let task = sessions.get_mut(&ipayload.task_id).unwrap(); + let task = sessions.get_mut(&task_id).unwrap(); match task.responses.get_mut(&generator) { None => { task.responses.insert(generator, (response, vec![score])); } Some((_, scores)) => scores.push(score), }; + task.rtc_score_count += 1; } true => { let topic = topic.unwrap(); @@ -343,11 +377,11 @@ impl IService { score: highest_avg, }; - info!("Task completed: {}, \nWinner: {:?}", topic, winner); + return Some(winner); } } - Ok(()) + None } pub async fn handle_incoming(&self, ipayload: IPayload) -> Result<()> { @@ -363,9 +397,74 @@ impl IService { .unwrap(); } IStage::Verf => self.verification(ipayload).await.unwrap(), - IStage::Final => self.finalize(Some(ipayload), None).await.unwrap(), + IStage::Final => { + self.finalize(Some(ipayload), None).await; + } } Ok(()) } + + pub async fn pipeline(&self, provider_count: usize) -> Option { + let task_id = generate_random(); + + self.adv(task_id.clone(), None).await.unwrap(); + + // TODO: WAIT FOR GETTING ENOUGH PARTIPANTS, AND A TIMEOUT + loop { + let mesh = self.p2p.floodsub_mesh().await.unwrap(); + let participants = mesh.get(&task_id).unwrap_or(&Vec::new()).clone(); + + if participants.len() != provider_count { + tokio::time::sleep(Duration::from_secs(3)).await; + debug!( + "Waiting on PARTICIPANTS: {}/{provider_count}", + participants.len() + ); + continue; + } + + println!(); + debug!("Participants QUOTA filled, moving on to ACK...\n"); + tokio::time::sleep(Duration::from_secs(4)).await; + + break; + } + + // Move on with ACK phase + let prompt = "Hey hows it going, let have somemfun talk about decentralized computaion, say when a distributed swarm of LLMs".to_string(); + self.ack(task_id.clone(), None, Some(prompt)).await.unwrap(); + + // Wait until we get all the scores + let (execs, verifiers) = { + let tasks = self.sessions.lock().await; + let task = tasks.get(&task_id).unwrap(); + + (task.execs.clone(), task.verifiers.clone()) + }; + let all_score_count = execs.len() * verifiers.len(); + + loop { + let rtc_score_count = { + let tasks = self.sessions.lock().await; + let task = tasks.get(&task_id).unwrap(); + + task.rtc_score_count as usize + }; + + if rtc_score_count != all_score_count { + debug!("Waiting on SCORES: {rtc_score_count}/{all_score_count}"); + tokio::time::sleep(Duration::from_secs(2)).await; + continue; + } + + println!(); + debug!("Got all the scores, finalizing the winner...\n"); + tokio::time::sleep(Duration::from_secs(2)).await; + + break; + } + + self.finalize(None, Some(task_id)).await + } } diff --git a/inode/src/slm/mod.rs b/inode/src/slm/mod.rs index 689cad3..5a7ca06 100644 --- a/inode/src/slm/mod.rs +++ b/inode/src/slm/mod.rs @@ -1 +1 @@ -pub mod core; \ No newline at end of file +pub mod core;