1use axum::extract::Request;
2
3use axum::middleware::{Next, from_fn_with_state};
4use axum::{Router, routing::get};
5use axum::{extract::State, response::IntoResponse};
6use clap::{Parser, Subcommand};
7use firebase_auth::FirebaseAuth;
8use http::StatusCode;
9use socketioxide::SocketIo;
10use socketioxide::layer::SocketIoLayer;
11use sqlx::postgres::PgPoolOptions;
12use sqlx::{PgPool, Postgres};
13use sqlx_migrator::cli::MigrationCommand;
14use sqlx_migrator::migrator::{Migrate, Migrator};
15use sqlx_migrator::{Info, Plan};
16use std::path::Path;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tower::ServiceBuilder;
20use tower_http::cors::CorsLayer;
21use tracing::{error, info};
22use tracing_subscriber::filter::{EnvFilter, LevelFilter};
23
24mod app;
25mod auth;
26mod document;
27mod migrations;
28mod rpc;
29mod socket;
30mod user;
31
32use app::AppStatus;
33
34fn web_port() -> String {
36 dotenvy::var("PORT").unwrap_or("8000".to_string())
37}
38
39fn automerge_io_port() -> String {
43 dotenvy::var("AUTOMERGE_IO_PORT").unwrap_or("3000".to_string())
44}
45
46#[derive(Parser, Debug)]
47#[command(name = "catcolab")]
48#[command(about = "CatColab server and migration CLI", version)]
49struct Cli {
50 #[command(subcommand)]
51 command: Option<Command>,
52}
53
54#[derive(Subcommand, Debug)]
55enum Command {
56 Migrator(MigrationCommand),
58 Serve,
60}
61
62#[tokio::main]
63async fn main() {
64 let env_filter = EnvFilter::builder()
65 .with_default_directive(LevelFilter::INFO.into())
66 .from_env_lossy();
67
68 tracing_subscriber::fmt().with_env_filter(env_filter).init();
69
70 let db = PgPoolOptions::new()
71 .max_connections(10)
72 .connect(&dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` should be set"))
73 .await
74 .expect("Failed to connect to database");
75
76 let cli = Cli::parse();
77
78 let mut migrator = Migrator::default();
79 migrator
80 .add_migrations(migrations::migrations())
81 .expect("Failed to load migrations");
82
83 match cli.command.unwrap_or(Command::Serve) {
84 Command::Migrator(cmd) => {
85 let mut conn = db.acquire().await.expect("Failed to acquire DB connection");
86
87 cmd.run(&mut *conn, Box::new(migrator)).await.unwrap();
88 return;
89 }
90
91 Command::Serve => {
92 let (io_layer, io) = SocketIo::new_layer();
93
94 let (status_tx, status_rx) = watch::channel(AppStatus::Starting);
95 let state = app::AppState {
96 automerge_io: io,
97 db: db.clone(),
98 app_status: status_rx.clone(),
99 };
100
101 let firebase_auth = Arc::new(
106 FirebaseAuth::new(
107 &dotenvy::var("FIREBASE_PROJECT_ID")
108 .expect("`FIREBASE_PROJECT_ID` should be set"),
109 )
110 .await,
111 );
112
113 socket::setup_automerge_socket(state.clone());
114
115 tokio::try_join!(
116 run_migrator_apply(db.clone(), migrator, status_tx.clone()),
117 run_web_server(state.clone(), firebase_auth.clone()),
118 run_automerge_socket(io_layer),
119 )
120 .unwrap();
121 }
122 }
123}
124
125async fn run_migrator_apply(
126 db: PgPool,
127 migrator: Migrator<Postgres>,
128 status_tx: watch::Sender<AppStatus>,
129) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
130 status_tx.send(AppStatus::Migrating)?;
131 info!("Applying database migrations...");
132
133 let mut conn = db.acquire().await?;
134 migrator.run(&mut conn, &Plan::apply_all()).await.unwrap();
135
136 status_tx.send(AppStatus::Running)?;
137 sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
138 info!("Migrations complete");
139
140 Ok(())
141}
142
143async fn app_status_gate(
144 State(status_rx): State<watch::Receiver<AppStatus>>,
145 req: Request,
146 next: Next,
147) -> impl IntoResponse {
148 let status = status_rx.borrow().clone();
150 match status {
151 AppStatus::Running => next.run(req).await,
152 AppStatus::Failed(reason) => {
153 (StatusCode::INTERNAL_SERVER_ERROR, format!("App failed to start: {reason}"))
154 .into_response()
155 }
156 AppStatus::Starting | AppStatus::Migrating => {
157 (StatusCode::SERVICE_UNAVAILABLE, "Server not ready yet").into_response()
158 }
159 }
160}
161
162async fn auth_middleware(
163 State(firebase_auth): State<Arc<FirebaseAuth>>,
164 mut req: Request<axum::body::Body>,
165 next: axum::middleware::Next,
166) -> impl IntoResponse {
167 match auth::authenticate_from_request(&firebase_auth, &req) {
168 Ok(Some(user)) => {
169 req.extensions_mut().insert(user);
170 }
171 Ok(_) => {}
172 Err(err) => {
173 error!("Authentication error: {err}");
174 }
175 }
176
177 next.run(req).await
178}
179
180async fn status_handler(State(status_rx): State<watch::Receiver<AppStatus>>) -> String {
181 match status_rx.borrow().clone() {
182 AppStatus::Starting => "Starting".into(),
183 AppStatus::Migrating => "Migrating".into(),
184 AppStatus::Running => "Running".into(),
185 AppStatus::Failed(reason) => format!("Failed: {reason}"),
186 }
187}
188
189use axum::routing::get_service;
190use tower_http::services::{ServeDir, ServeFile};
191
192async fn run_web_server(
193 state: app::AppState,
194 firebase_auth: Arc<FirebaseAuth>,
195) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
196 let port = web_port();
197 let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")).await?;
198
199 let rpc_router = rpc::router();
200 let (qubit_service, qubit_handle) = rpc_router.to_service(state.clone());
201
202 let rpc_with_mw = ServiceBuilder::new()
203 .layer(from_fn_with_state(state.app_status.clone(), app_status_gate))
204 .layer(from_fn_with_state(firebase_auth, auth_middleware))
205 .service(qubit_service);
206
207 let status_router = Router::new()
210 .route("/status", get(status_handler))
211 .with_state(state.app_status.clone());
212
213 let mut app = Router::new().merge(status_router).nest_service("/rpc", rpc_with_mw);
214
215 if let Some(spa_dir) = spa_directory() {
216 let index = Path::new(&spa_dir).join("index.html");
217 let spa_service =
218 get_service(ServeDir::new(&spa_dir).not_found_service(ServeFile::new(index)));
219
220 info!("Serving frontend from directory: {spa_dir}");
221 app = app.nest_service("/", spa_service);
222 } else {
223 info!("frontend directory not found; keeping default text route at /");
224 app = app.route("/", get(|| async { "Hello! The CatColab server is running" }));
225 }
226
227 app = app.layer(CorsLayer::permissive());
228
229 info!("Web server listening at port {port}");
230
231 axum::serve(listener, app).await?;
232 qubit_handle.stop().ok();
233
234 Ok(())
235}
236
237fn spa_directory() -> Option<String> {
238 if let Ok(candidate) = dotenvy::var("SPA_DIR") {
242 let path = Path::new(&candidate);
243 if path.exists() && path.is_dir() {
244 return Some(candidate);
245 }
246 }
247 None
248}
249
250async fn run_automerge_socket(
251 io_layer: SocketIoLayer,
252) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
253 let port = automerge_io_port();
254 let listener = tokio::net::TcpListener::bind(format!("localhost:{port}")).await?;
255 let app = Router::new().layer(io_layer);
256 info!("Automerge socket listening at port {port}");
257 axum::serve(listener, app).await?;
258 Ok(())
259}