From 4d826ed2a4451de7318fbd91dbe6e7e319a7a49b Mon Sep 17 00:00:00 2001 From: Lyssieth Date: Mon, 9 Oct 2023 23:20:23 +0300 Subject: [PATCH] feat: prepare for scanning --- Cargo.lock | 5 + rave/Cargo.toml | 2 + rave/src/main.rs | 1 + rave/src/rest/get_album.rs | 5 +- rave/src/rest/get_music_folders.rs | 19 ++-- rave/src/rest/mod.rs | 3 + rave/src/rest/start_scan.rs | 46 ++++++++ rave/src/scan.rs | 166 +++++++++++++++++++++++++++++ rave/src/scan/walk.rs | 149 ++++++++++++++++++++++++++ rave/src/subsonic.rs | 147 ++++--------------------- rave/src/utils.rs | 8 -- 11 files changed, 402 insertions(+), 149 deletions(-) create mode 100644 rave/src/rest/start_scan.rs create mode 100644 rave/src/scan.rs create mode 100644 rave/src/scan/walk.rs diff --git a/Cargo.lock b/Cargo.lock index 58ace3c..7d50b94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1765,6 +1765,9 @@ name = "once_cell" version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +dependencies = [ + "parking_lot_core", +] [[package]] name = "opaque-debug" @@ -2209,8 +2212,10 @@ dependencies = [ "cfg-if", "color-eyre", "entities", + "futures-lite", "md5", "migration", + "once_cell", "poem", "poem-ext", "quick-xml", diff --git a/rave/Cargo.toml b/rave/Cargo.toml index 062eed7..6409d06 100644 --- a/rave/Cargo.toml +++ b/rave/Cargo.toml @@ -36,3 +36,5 @@ url-escape = "0.1.1" sea-orm = { workspace = true } entities = { workspace = true } migration = { workspace = true } +once_cell = { version = "1.18.0", features = ["parking_lot"] } +futures-lite = "1.13.0" diff --git a/rave/src/main.rs b/rave/src/main.rs index da47fd0..b40df53 100644 --- a/rave/src/main.rs +++ b/rave/src/main.rs @@ -20,6 +20,7 @@ use tracing_subscriber::{fmt, EnvFilter}; mod authentication; mod random_types; mod rest; +mod scan; mod subsonic; mod ui; mod utils; diff --git a/rave/src/rest/get_album.rs b/rave/src/rest/get_album.rs index 8d145f2..2c48d4a 100644 --- a/rave/src/rest/get_album.rs +++ b/rave/src/rest/get_album.rs @@ -17,7 +17,6 @@ pub async fn get_album( auth: Authentication, Query(params): Query, ) -> SubsonicResponse { - let txn = txn.clone(); let u = utils::verify_user(txn.clone(), auth).await; match u { @@ -25,7 +24,7 @@ pub async fn get_album( Err(e) => return e, } - let album = Album::find_by_id(params.id).one(&*txn).await; + let album = Album::find_by_id(params.id).one(&**txn).await; let Ok(Some(album)) = album else { match album { Ok(Some(_)) => unreachable!("Some(album) covered by `let .. else`"), @@ -37,7 +36,7 @@ pub async fn get_album( } }; - let tracks = album.find_related(Track).all(&*txn).await; + let tracks = album.find_related(Track).all(&**txn).await; let tracks = match tracks { Ok(t) => t, diff --git a/rave/src/rest/get_music_folders.rs b/rave/src/rest/get_music_folders.rs index eb9febf..75a77df 100644 --- a/rave/src/rest/get_music_folders.rs +++ b/rave/src/rest/get_music_folders.rs @@ -1,9 +1,11 @@ +use entities::prelude::MusicFolder; use poem::web::Data; use poem_ext::db::DbTxn; +use sea_orm::EntityTrait; use crate::{ authentication::Authentication, - subsonic::{MusicFolder, SubsonicResponse}, + subsonic::{Error, SubsonicResponse}, utils::{self}, }; @@ -16,16 +18,11 @@ pub async fn get_music_folders(Data(txn): Data<&DbTxn>, auth: Authentication) -> Err(e) => return e, } - let folders = vec![ - MusicFolder { - id: 0, - name: "Music".to_string(), - }, - MusicFolder { - id: 1, - name: "Podcasts".to_string(), - }, - ]; + let folders = MusicFolder::find().all(&**txn).await; + + let Ok(folders) = folders else { + return SubsonicResponse::new_error(Error::RequestedDataWasNotFound(None)); + }; SubsonicResponse::new_music_folders(folders) } diff --git a/rave/src/rest/mod.rs b/rave/src/rest/mod.rs index f004787..138c38f 100644 --- a/rave/src/rest/mod.rs +++ b/rave/src/rest/mod.rs @@ -12,6 +12,8 @@ mod get_album_list; mod get_album; // rest/stream mod stream; +// rest/startScan +mod start_scan; pub fn build() -> Box> { Route::new() @@ -22,5 +24,6 @@ pub fn build() -> Box> { .at("/getAlbumList2", get_album_list::get_album_list) .at("/getAlbum", get_album::get_album) .at("/stream", stream::stream) + .at("/startScan", start_scan::start_scan) .boxed() } diff --git a/rave/src/rest/start_scan.rs b/rave/src/rest/start_scan.rs new file mode 100644 index 0000000..fbf28b4 --- /dev/null +++ b/rave/src/rest/start_scan.rs @@ -0,0 +1,46 @@ +use poem::web::Data; +use poem_ext::db::DbTxn; +use tracing::warn; + +use crate::{ + authentication::Authentication, + subsonic::{Error, SubsonicResponse}, + utils::{self}, +}; + +#[poem::handler] +pub async fn start_scan(Data(txn): Data<&DbTxn>, auth: Authentication) -> SubsonicResponse { + let u = utils::verify_user(txn.clone(), auth).await; + + match u { + Ok(u) => { + if !u.is_admin { + return SubsonicResponse::new_error(Error::UserIsNotAuthorizedForGivenOperation( + None, + )); + } + } + Err(e) => return e, + } + crate::scan::start_scan(); + + let res = crate::scan::get_scan_status().await; + + match res { + Ok(status) => { + if status.errors.is_empty() { + SubsonicResponse::new_scan_status(status.scanning, status.count) + } else { + warn!("Failed to start scan:"); + for e in status.errors { + warn!("{e}"); + } + SubsonicResponse::new_error(Error::Generic(None)) + } + } + Err(e) => { + warn!("Failed to start scan: {e}"); + SubsonicResponse::new_error(Error::Generic(None)) + } + } +} diff --git a/rave/src/scan.rs b/rave/src/scan.rs new file mode 100644 index 0000000..7d2fb06 --- /dev/null +++ b/rave/src/scan.rs @@ -0,0 +1,166 @@ +use color_eyre::{Report, Result}; +use futures_lite::StreamExt; +use once_cell::sync::Lazy; +use sea_orm::{ConnectOptions, Database, DatabaseTransaction, TransactionTrait}; +use std::{path::PathBuf, sync::Arc}; +use tokio::sync::RwLock; +use tracing::warn; + +mod walk; + +pub fn start_scan() { + tokio::spawn(scan()); +} + +static STATUS: Lazy>> = Lazy::new(|| { + Arc::new(RwLock::new(ScanStatus { + scanning: false, + count: 0, + errors: Vec::new(), + })) +}); + +pub async fn get_scan_status() -> Result { + let stat = STATUS.read().await; + + Ok(stat.clone()) +} + +#[derive(Debug, Clone)] +pub struct ScanStatus { + pub scanning: bool, + pub count: u64, + pub errors: Vec>, +} + +async fn scan() { + { + let mut stat = STATUS.write().await; + + stat.scanning = true; + } + + let url = std::env::var("DATABASE_URL").expect("DATABASE_URL not set"); + let conn = ConnectOptions::new(url); + let dbc = get_dbc(conn).await; + + if dbc.is_none() { + return; + } + let dbc = dbc.expect("Failed to connect to database"); + + let root_dir = get_root_dir(); + let mut walk = walk::WalkDir::new(root_dir); + + let mut count = 0; + + while let Some(res) = walk.next().await { + let Some(de) = check_dir_entry(res, &mut count).await else { + continue; + }; + + let Some(txn) = create_txn(&dbc, &mut count).await else { + continue; + }; + + if let Err(e) = handle_entry(&txn, de).await { + warn!("Failed to handle directory entry: {e}"); + + { + let mut write = STATUS.write().await; + write.errors.push(Arc::new(e)); + } + let _ = txn.rollback().await; + + count += 1; + continue; + } + + let _ = txn.commit().await; + } + + { + let mut stat = STATUS.write().await; + + stat.scanning = false; + stat.count = count; + } +} + +async fn handle_entry(tx: &DatabaseTransaction, entry: walk::DirEntry) -> Result<()> { + let path = entry.path(); + let path = path + .to_str() + .ok_or_else(|| Report::msg("Failed to convert path to string"))?; + let file_type = entry.file_type().await?; + let meta = entry.metadata().await?; + + // TODO: figure out how to scan. steal from Gonic if we have to :3 + + Ok(()) +} + +async fn create_txn( + dbc: &sea_orm::DatabaseConnection, + count: &mut u64, +) -> Option { + let txn = match dbc.begin().await { + Ok(txn) => txn, + Err(e) => { + warn!("Failed to start database transaction: {e}"); + + { + let mut write = STATUS.write().await; + write.errors.push(Arc::new(Report::new(e))); + } + + *count += 1; + return None; + } + }; + Some(txn) +} + +async fn check_dir_entry( + res: std::result::Result, std::io::Error>, + count: &mut u64, +) -> Option> { + let de = match res { + Ok(de) => de, + Err(e) => { + warn!("Failed to read directory entry: {e}"); + + { + let mut write = STATUS.write().await; + write.errors.push(Arc::new(Report::new(e))); + } + + *count += 1; + return None; + } + }; + Some(de) +} + +async fn get_dbc(conn: ConnectOptions) -> Option { + let dbc = Database::connect(conn).await; + let Ok(dbc) = dbc else { + let e = dbc.expect_err("Failed to connect to database"); + warn!("Failed to connect to database: {e}"); + + { + let mut stat = STATUS.write().await; + stat.scanning = false; + stat.errors + .push(Arc::new(Report::msg("Failed to connect to database"))); + } + + return None; + }; + Some(dbc) +} + +fn get_root_dir() -> PathBuf { + let root_dir = std::env::var("RAVE_STORAGE_DIR").expect("RAVE_STORAGE_DIR not set"); + PathBuf::from(root_dir) +} diff --git a/rave/src/scan/walk.rs b/rave/src/scan/walk.rs new file mode 100644 index 0000000..307e2f0 --- /dev/null +++ b/rave/src/scan/walk.rs @@ -0,0 +1,149 @@ +#![allow(dead_code)] + +use std::{ + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures_lite::{future::Boxed as BoxedFut, stream, Future, FutureExt, Stream, StreamExt}; + +use tokio::fs::{read_dir, ReadDir}; + +pub use tokio::io::Result; +pub type DirEntry = Arc; + +type BoxStream = futures_lite::stream::Boxed>; + +pub struct WalkDir { + root: PathBuf, + entries: BoxStream, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Filtering { + Ignore, + IgnoreDir, + Continue, +} + +impl WalkDir { + pub fn new(root: impl AsRef) -> Self { + Self { + root: root.as_ref().to_path_buf(), + entries: walk_dir( + root, + None:: BoxedFut + Send>>, + ), + } + } + + pub fn filter(self, f: F) -> Self + where + F: FnMut(DirEntry) -> Fut + Send + 'static, + Fut: Future + Send, + { + let root = self.root.clone(); + Self { + root: self.root, + entries: walk_dir(root, Some(f)), + } + } +} + +impl Stream for WalkDir { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let entries = Pin::new(&mut self.entries); + entries.poll_next(cx) + } +} + +fn walk_dir(root: impl AsRef, filter: Option) -> BoxStream +where + F: FnMut(DirEntry) -> Fut + Send + 'static, + Fut: Future + Send, +{ + stream::unfold( + State::Start((root.as_ref().to_path_buf(), filter)), + move |state| async move { + match state { + State::Start((root, filter)) => match read_dir(root).await { + Err(e) => Some((Err(e), State::Done)), + Ok(rd) => walk(vec![rd], filter).await, + }, + State::Walk((dirs, filter)) => walk(dirs, filter).await, + State::Done => None, + } + }, + ) + .boxed() +} + +enum State { + Start((PathBuf, Option)), + Walk((Vec, Option)), + Done, +} + +type UnfoldState = (Result, State); + +fn walk(mut dirs: Vec, filter: Option) -> BoxedFut>> +where + F: FnMut(DirEntry) -> Fut + Send + 'static, + Fut: Future + Send, +{ + async move { + if let Some(dir) = dirs.last_mut() { + match dir.next_entry().await { + Ok(Some(entry)) => walk_entry(Arc::new(entry), dirs, filter).await, + Ok(None) => { + dirs.pop(); + walk(dirs, filter).await + } + Err(e) => Some((Err(e), State::Walk((dirs, filter)))), + } + } else { + None + } + } + .boxed() +} + +fn walk_entry( + entry: DirEntry, + mut dirs: Vec, + mut filter: Option, +) -> BoxedFut>> +where + F: FnMut(DirEntry) -> Fut + Send + 'static, + Fut: Future + Send, +{ + async move { + match entry.file_type().await { + Err(e) => Some((Err(e), State::Walk((dirs, filter)))), + Ok(ft) => { + let filtering = match filter.as_mut() { + Some(filter) => filter(entry.clone()).await, + None => Filtering::Continue, + }; + if ft.is_dir() { + let rd = match read_dir(entry.path()).await { + Err(e) => return Some((Err(e), State::Walk((dirs, filter)))), + Ok(rd) => rd, + }; + if filtering != Filtering::IgnoreDir { + dirs.push(rd); + } + } + match filtering { + Filtering::Continue => Some((Ok(entry), State::Walk((dirs, filter)))), + Filtering::IgnoreDir | Filtering::Ignore => walk(dirs, filter).await, + } + } + } + } + .boxed() +} diff --git a/rave/src/subsonic.rs b/rave/src/subsonic.rs index 5818494..103352e 100644 --- a/rave/src/subsonic.rs +++ b/rave/src/subsonic.rs @@ -1,11 +1,8 @@ -#![allow(dead_code)] // TODO: Remove this - use std::fmt::Display; -use entities::{album, track}; +use entities::{album, music_folder, track}; use poem::{http::StatusCode, IntoResponse, Response}; use serde::{ser::SerializeStruct, Serialize}; -use time::OffsetDateTime; use crate::authentication::VersionTriple; @@ -39,7 +36,7 @@ impl SubsonicResponse { } } - pub fn new_music_folders(music_folders: Vec) -> Self { + pub fn new_music_folders(music_folders: Vec) -> Self { Self::new(SubResponseType::MusicFolders { music_folders }) } @@ -67,6 +64,15 @@ impl SubsonicResponse { value: Box::new(SubResponseType::Error(inner)), } } + + pub fn new_scan_status(scanning: bool, count: u64) -> Self { + Self { + xmlns: "http://subsonic.org/restapi".to_string(), + status: ResponseStatus::Ok, + version: VersionTriple(1, 16, 1), + value: Box::new(SubResponseType::ScanStatus { scanning, count }), + } + } } #[derive(Debug, Clone, Serialize)] @@ -74,7 +80,7 @@ pub enum SubResponseType { #[serde(rename = "musicFolders")] MusicFolders { #[serde(rename = "musicFolder")] - music_folders: Vec, + music_folders: Vec, }, #[serde(rename = "error")] Error(Error), @@ -100,130 +106,16 @@ pub enum SubResponseType { #[serde(flatten)] songs: Vec, }, + #[serde(rename = "scanStatus")] + ScanStatus { + #[serde(rename = "scanning")] + scanning: bool, + #[serde(rename = "count")] + count: u64, + }, Empty, } -#[derive(Debug, Clone, Serialize, Default)] -#[serde(default)] -pub struct AlbumId3 { - #[serde(rename = "@id", serialize_with = "crate::utils::album_id")] - pub id: i64, - #[serde(rename = "@name")] - pub name: String, - #[serde(rename = "@artist", skip_serializing_if = "Option::is_none")] - pub artist: Option, - #[serde(rename = "@artistId", skip_serializing_if = "Option::is_none")] - pub artist_id: Option, - #[serde(rename = "@coverArt", skip_serializing_if = "Option::is_none")] - pub cover_art: Option, - #[serde(rename = "@songCount")] - pub song_count: i32, - #[serde(rename = "@duration")] - pub duration: i32, - #[serde(rename = "@playCount", skip_serializing_if = "Option::is_none")] - pub play_count: Option, - #[serde(rename = "@created", skip_serializing_if = "Option::is_none")] - pub created: Option, - #[serde(rename = "@starred", skip_serializing_if = "Option::is_none")] - pub starred: Option, - #[serde(rename = "@year", skip_serializing_if = "Option::is_none")] - pub year: Option, - #[serde(rename = "@genre", skip_serializing_if = "Option::is_none")] - pub genre: Option, - #[serde(rename = "@musicFolder", skip_serializing_if = "Option::is_none")] - pub folder_id: Option, -} - -#[derive(Debug, Clone, Serialize, Default)] -#[serde(default)] -pub struct Child { - #[serde(rename = "@id")] - pub id: String, - #[serde(rename = "@parent", skip_serializing_if = "Option::is_none")] - pub parent: Option, - #[serde(rename = "@isDir")] - pub is_dir: bool, - #[serde(rename = "@title")] - pub title: String, - #[serde(rename = "@album", skip_serializing_if = "Option::is_none")] - pub album: Option, - #[serde(rename = "@artist", skip_serializing_if = "Option::is_none")] - pub artist: Option, - #[serde(rename = "@track", skip_serializing_if = "Option::is_none")] - pub track: Option, - #[serde(rename = "@year", skip_serializing_if = "Option::is_none")] - pub year: Option, - #[serde(rename = "@genre", skip_serializing_if = "Option::is_none")] - pub genre: Option, - #[serde(rename = "@coverArt", skip_serializing_if = "Option::is_none")] - pub cover_art: Option, - #[serde(rename = "@size", skip_serializing_if = "Option::is_none")] - pub size: Option, - #[serde(rename = "@contentType", skip_serializing_if = "Option::is_none")] - pub content_type: Option, - #[serde(rename = "@suffix", skip_serializing_if = "Option::is_none")] - pub suffix: Option, - #[serde( - rename = "@transcodedContentType", - skip_serializing_if = "Option::is_none" - )] - pub transcoded_content_type: Option, - #[serde(rename = "@transcodedSuffix", skip_serializing_if = "Option::is_none")] - pub transcoded_suffix: Option, - #[serde(rename = "@duration", skip_serializing_if = "Option::is_none")] - pub duration: Option, - #[serde(rename = "@bitRate", skip_serializing_if = "Option::is_none")] - pub bit_rate: Option, - #[serde(rename = "@path", skip_serializing_if = "Option::is_none")] - pub path: Option, - #[serde(rename = "@isVideo", skip_serializing_if = "Option::is_none")] - pub is_video: Option, - #[serde(rename = "@userRating", skip_serializing_if = "Option::is_none")] - pub user_rating: Option, - #[serde(rename = "@averageRating", skip_serializing_if = "Option::is_none")] - pub average_rating: Option, - #[serde(rename = "@playCount", skip_serializing_if = "Option::is_none")] - pub play_count: Option, - #[serde(rename = "@discNumber", skip_serializing_if = "Option::is_none")] - pub disc_number: Option, - #[serde(rename = "@created", skip_serializing_if = "Option::is_none")] - pub created: Option, - #[serde(rename = "@starred", skip_serializing_if = "Option::is_none")] - pub starred: Option, - #[serde(rename = "@albumId", skip_serializing_if = "Option::is_none")] - pub album_id: Option, - #[serde(rename = "@artistId", skip_serializing_if = "Option::is_none")] - pub artist_id: Option, - #[serde(rename = "@type", skip_serializing_if = "Option::is_none")] - pub r#type: Option, - #[serde(rename = "@bookmarkPosition", skip_serializing_if = "Option::is_none")] - pub bookmark_position: Option, - #[serde(rename = "@originalWidth", skip_serializing_if = "Option::is_none")] - pub original_width: Option, - #[serde(rename = "@originalHeight", skip_serializing_if = "Option::is_none")] - pub original_height: Option, -} - -#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] -pub enum MediaType { - #[serde(rename = "music")] - Music, - #[serde(rename = "video")] - Video, - #[serde(rename = "audiobook")] - Audiobook, - #[serde(rename = "podcast")] - Podcast, -} - -#[derive(Debug, Clone, Serialize)] -pub struct MusicFolder { - #[serde(rename = "@id")] - pub id: i64, - #[serde(rename = "@name")] - pub name: String, -} - #[derive(Debug, Clone, Copy)] pub enum ResponseStatus { Ok, @@ -243,6 +135,7 @@ impl Serialize for ResponseStatus { } #[derive(Debug, Clone)] +#[allow(unused)] pub enum Error { Generic(Option), RequiredParameterMissing(Option), diff --git a/rave/src/utils.rs b/rave/src/utils.rs index fe8489c..41c7572 100644 --- a/rave/src/utils.rs +++ b/rave/src/utils.rs @@ -1,7 +1,6 @@ use entities::{prelude::User, user}; use poem_ext::db::DbTxn; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; -use serde::Serializer; use tracing::error; use crate::{ @@ -42,10 +41,3 @@ pub async fn verify_user( } } } - -#[allow(clippy::trivially_copy_pass_by_ref)] -pub fn album_id(id: &i64, s: S) -> Result { - let str = format!("al-{id}"); - - s.serialize_str(&str) -}