Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions examples/raw/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::io::{self, Write};

use anyhow::Result;
use inode::p2p::{cmds::io_loop, node::InferenceNode, types::Mode};
use std::env;
use tracing_subscriber::EnvFilter;

#[tokio::main]
Expand All @@ -15,17 +14,26 @@ async fn main() -> Result<()> {

dotenvy::dotenv().ok();

print!("Intiate at BOOTSTRAP node (Y/n): ");
io::stdout().flush().unwrap();

let mut io = String::new();
io::stdin().read_line(&mut io).unwrap();
let mut mode = Mode::Provider;
let args = env::args().skip(1);

let mut mode = Mode::Bootstrap;
if io.trim().to_lowercase() != "" {
mode = Mode::Provider;
for arg in args {
if arg == "bootstrap" {
mode = Mode::Bootstrap;
}
}

// print!("Intiate at BOOTSTRAP node (Y/n): ");
// io::stdout().flush().unwrap();

// let mut io = String::new();
// io::stdin().read_line(&mut io).unwrap();

// let mut mode = Mode::Bootstrap;
// if io.trim().to_lowercase() != "" {
// mode = Mode::Provider;
// }

let inode = InferenceNode::new(mode).await;
io_loop(inode).await.unwrap();

Expand Down
31 changes: 31 additions & 0 deletions inode/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,34 @@ pub fn generate_entropy() -> (String, String) {

(secret, hash)
}

pub fn generate_random() -> String {
let secret: String = rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect();

secret
}

pub fn random_split(v: Vec<String>) -> (Vec<String>, Vec<String>) {
assert!(v.len() >= 2, "Vector must contain at least 2 elements");

let split_idx = rand::rng().random_range(1..v.len());

let left = v[..split_idx].to_vec();
let right = v[split_idx..].to_vec();

(left, right)
}

pub fn last_n_chars(s: &str, n: usize) -> String {
s.chars()
.rev()
.take(n)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect()
}
34 changes: 33 additions & 1 deletion inode/src/p2p/cmds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const COMMANDS: &[&str] = &[
"mesh => map of topics -> peer",
"\n",
"slm => converse with the AI",
"adv <topic> => advertize a exec/verify session",
"ack <topic> => acknowlege the EXECS/VERIFIERS",
"finalize <topic> => finalize the winner response",
"\n",
"pipe <privider_count> => Test out the automated pipeline",
];

pub fn print_commands() {
Expand Down Expand Up @@ -133,10 +138,37 @@ pub async fn handle_cmd(line: &str, inode: &Arc<InferenceNode>) -> Result<()> {
"slm" => {
let prompt = parts.collect::<Vec<_>>().join(" ");

let res = inode.slm.converse(prompt.as_ref()).await.unwrap();
let res = inode.service.slm.converse(prompt.as_ref()).await.unwrap();
println!("{}", res);
}

"adv" => {
let topic = parts.next().unwrap().to_string();
inode.service.adv(topic.clone(), None).await.unwrap();
}

"ack" => {
let topic = parts.next().unwrap().to_string();
let prompt = "Hey hows it going, let have somemfun talk about decentralized computaion, say when a distributed swarm of LLMs".to_string();
inode
.service
.ack(topic.clone(), None, Some(prompt))
.await
.unwrap();
}

"finalize" => {
let topic = parts.next().unwrap().to_string();
inode.service.finalize(None, Some(topic)).await.unwrap();
}

"pipe" => {
let provider_count: usize = parts.next().unwrap().to_string().parse().unwrap();
let winner = inode.service.pipeline(provider_count).await;

println!("Winner:\n{:?}", winner);
}

_ => println!("Unknown command"),
}
Ok(())
Expand Down
32 changes: 22 additions & 10 deletions inode/src/p2p/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use tokio::sync::{mpsc::Receiver, Mutex};
use tracing::{debug, info};

use crate::{
p2p::types::{IMsgType, Mode},
p2p::{
service::IService,
types::{IMsgType, Mode},
},
slm::core::SlmClient,
};

Expand All @@ -22,7 +25,7 @@ pub const PROVIDER_MESH: &str = "swarm/mesh";

pub struct InferenceNode {
pub p2p: Arc<Node>,
pub slm: SlmClient,
pub service: IService,

pub mode: Mode,
pub local: Multiaddr,
Expand Down Expand Up @@ -62,12 +65,18 @@ impl InferenceNode {
}
};

let bootmesh = Arc::new(Mutex::new(HashMap::new()));
let inode = Arc::new(InferenceNode {
p2p: p2p_tx.clone(),
slm: SlmClient::new("http://localhost:11434".to_string()),
service: IService::new(
listen_addr.to_string(),
p2p_tx.clone(),
SlmClient::new("http://localhost:11434".to_string()),
bootmesh.clone(),
),
mode,
local: listen_addr,
bootmesh: Arc::new(Mutex::new(HashMap::new())),
bootmesh,
});

let events = inode.clone();
Expand Down Expand Up @@ -117,6 +126,10 @@ impl InferenceNode {
let notification = event_rx.recv().await.unwrap();
let decoded = bincode::deserialize::<GlobalEvent>(&notification).unwrap();

if self.mode == Mode::Bootstrap {
continue;
}

match decoded {
GlobalEvent::Floodsub(event) => match event.msg_type {
FloodsubMsgType::Publish => {
Expand All @@ -129,12 +142,13 @@ impl InferenceNode {
IMsgType::General(msg) => {
debug!("FloodsubEvent: {topic} - {source}: {msg}")
}
IMsgType::Service(_payload) => {}
IMsgType::Service(ipayload) => {
self.service.handle_incoming(ipayload).await.unwrap()
}

IMsgType::Bootmesh(mesh) => {
let mut bootmesh = self.bootmesh.lock().await;
*bootmesh = mesh;

debug!("BOOTMESH");
}
}
}
Expand All @@ -157,7 +171,7 @@ impl InferenceNode {
let latest_mesh = self.p2p.floodsub_mesh().await.unwrap_or(HashMap::new());

if bootmesh == latest_mesh {
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(300)).await;
continue;
}

Expand All @@ -177,8 +191,6 @@ impl InferenceNode {
// Wait 2 seconds for the new node to settle down
tokio::time::sleep(Duration::from_secs(2)).await;

debug!("BOOTMESH");

self.p2p
.floodsub_publish(PROVIDER_MESH.to_string(), payload)
.await
Expand Down
Loading
Loading