use futures::SinkExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tracing::{debug, error, info};

use crate::proto::codec::IrcCodec;
use crate::proto::message::{Command, IrcMessage};

/// A handle to send messages to the server.
///
/// Cheaply cloneable — pass it wherever you need to write.
#[derive(Clone)]
pub struct Sender {
    /// Producer side of the outbound queue; the receiving end is drained by
    /// the writer task spawned in `connect`.
    tx: mpsc::UnboundedSender<IrcMessage>,
}

impl Sender {
    /// Send a raw `IrcMessage` to the server.
    ///
    /// Sending is best-effort: the message is queued for the writer task and
    /// this method never blocks or fails. If the writer task has already shut
    /// down, the message is dropped.
    pub fn send(&self, msg: IrcMessage) {
        // The channel send only fails once the receiving side (the writer
        // task) is gone. Keep that non-fatal, but leave a trace so dropped
        // messages are diagnosable. The message itself is deliberately not
        // logged here.
        if self.tx.send(msg).is_err() {
            debug!("Outbound message dropped: writer task has shut down");
        }
    }

    /// Send a PRIVMSG to a channel or user.
    ///
    /// `target` is the channel or nick, `text` is the message body.
    pub fn privmsg(&self, target: &str, text: &str) {
        self.send(IrcMessage::new(
            Command::Privmsg,
            vec![target.to_string(), text.to_string()],
        ));
    }

    /// Join a channel.
    pub fn join(&self, channel: &str) {
        self.send(IrcMessage::new(Command::Join, vec![channel.to_string()]));
    }

    /// Part a channel.
    ///
    /// When `reason` is `Some`, it is appended as a second parameter.
    pub fn part(&self, channel: &str, reason: Option<&str>) {
        let mut params = vec![channel.to_string()];
        if let Some(r) = reason {
            params.push(r.to_string());
        }
        self.send(IrcMessage::new(Command::Part, params));
    }

    /// Change nick.
    pub fn nick(&self, new_nick: &str) {
        self.send(IrcMessage::new(Command::Nick, vec![new_nick.to_string()]));
    }
}

/// Establish a TCP connection and return:
/// - A `Sender` handle for writing messages
/// - An `mpsc::UnboundedReceiver<IrcMessage>` for reading incoming messages
///
/// Two background tasks are spawned:
/// - A **writer task**: drains the sender channel and writes to the TCP stream
/// - A **reader task**: reads from the TCP stream and forwards to the inbox
///
/// This split means the caller never has to hold a lock to send a message.
pub async fn connect( addr: &str, ) -> Result<(Sender, mpsc::UnboundedReceiver), std::io::Error> { info!("Connecting to {}", addr); let stream = TcpStream::connect(addr).await?; info!("TCP connected to {}", addr); let framed = Framed::new(stream, IrcCodec::new()); let (mut sink, mut stream) = futures::StreamExt::split(framed); // Channel for outbound messages (caller → writer task) let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); // Channel for inbound messages (reader task → caller) let (in_tx, in_rx) = mpsc::unbounded_channel::(); // Writer task: takes messages from out_rx and sends them to the server tokio::spawn(async move { while let Some(msg) = out_rx.recv().await { debug!("--> {}", crate::proto::serializer::serialize(&msg)); if let Err(e) = sink.send(msg).await { error!("Write error: {}", e); break; } } debug!("Writer task shut down"); }); // Reader task: receives messages from the server and forwards to in_tx tokio::spawn(async move { use futures::StreamExt; while let Some(result) = stream.next().await { match result { Ok(msg) => { debug!("<-- {}", crate::proto::serializer::serialize(&msg)); if in_tx.send(msg).is_err() { break; // Receiver dropped, shut down } } Err(e) => { error!("Read error: {}", e); break; } } } debug!("Reader task shut down"); }); Ok((Sender { tx: out_tx }, in_rx)) }