feat: prepare for scanning

This commit is contained in:
Lys 2023-10-09 23:20:23 +03:00
parent b18d0dd747
commit 4d826ed2a4
Signed by: lyssieth
GPG key ID: C9CF3D614FAA3940
11 changed files with 402 additions and 149 deletions

5
Cargo.lock generated
View file

@ -1765,6 +1765,9 @@ name = "once_cell"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
dependencies = [
"parking_lot_core",
]
[[package]]
name = "opaque-debug"
@ -2209,8 +2212,10 @@ dependencies = [
"cfg-if",
"color-eyre",
"entities",
"futures-lite",
"md5",
"migration",
"once_cell",
"poem",
"poem-ext",
"quick-xml",

View file

@ -36,3 +36,5 @@ url-escape = "0.1.1"
sea-orm = { workspace = true }
entities = { workspace = true }
migration = { workspace = true }
once_cell = { version = "1.18.0", features = ["parking_lot"] }
futures-lite = "1.13.0"

View file

@ -20,6 +20,7 @@ use tracing_subscriber::{fmt, EnvFilter};
mod authentication;
mod random_types;
mod rest;
mod scan;
mod subsonic;
mod ui;
mod utils;

View file

@ -17,7 +17,6 @@ pub async fn get_album(
auth: Authentication,
Query(params): Query<GetAlbumParams>,
) -> SubsonicResponse {
let txn = txn.clone();
let u = utils::verify_user(txn.clone(), auth).await;
match u {
@ -25,7 +24,7 @@ pub async fn get_album(
Err(e) => return e,
}
let album = Album::find_by_id(params.id).one(&*txn).await;
let album = Album::find_by_id(params.id).one(&**txn).await;
let Ok(Some(album)) = album else {
match album {
Ok(Some(_)) => unreachable!("Some(album) covered by `let .. else`"),
@ -37,7 +36,7 @@ pub async fn get_album(
}
};
let tracks = album.find_related(Track).all(&*txn).await;
let tracks = album.find_related(Track).all(&**txn).await;
let tracks = match tracks {
Ok(t) => t,

View file

@ -1,9 +1,11 @@
use entities::prelude::MusicFolder;
use poem::web::Data;
use poem_ext::db::DbTxn;
use sea_orm::EntityTrait;
use crate::{
authentication::Authentication,
subsonic::{MusicFolder, SubsonicResponse},
subsonic::{Error, SubsonicResponse},
utils::{self},
};
@ -16,16 +18,11 @@ pub async fn get_music_folders(Data(txn): Data<&DbTxn>, auth: Authentication) ->
Err(e) => return e,
}
let folders = vec![
MusicFolder {
id: 0,
name: "Music".to_string(),
},
MusicFolder {
id: 1,
name: "Podcasts".to_string(),
},
];
let folders = MusicFolder::find().all(&**txn).await;
let Ok(folders) = folders else {
return SubsonicResponse::new_error(Error::RequestedDataWasNotFound(None));
};
SubsonicResponse::new_music_folders(folders)
}

View file

@ -12,6 +12,8 @@ mod get_album_list;
mod get_album;
// rest/stream
mod stream;
// rest/startScan
mod start_scan;
pub fn build() -> Box<dyn Endpoint<Output = poem::Response>> {
Route::new()
@ -22,5 +24,6 @@ pub fn build() -> Box<dyn Endpoint<Output = poem::Response>> {
.at("/getAlbumList2", get_album_list::get_album_list)
.at("/getAlbum", get_album::get_album)
.at("/stream", stream::stream)
.at("/startScan", start_scan::start_scan)
.boxed()
}

View file

@ -0,0 +1,46 @@
use poem::web::Data;
use poem_ext::db::DbTxn;
use tracing::warn;
use crate::{
authentication::Authentication,
subsonic::{Error, SubsonicResponse},
utils::{self},
};
#[poem::handler]
pub async fn start_scan(Data(txn): Data<&DbTxn>, auth: Authentication) -> SubsonicResponse {
let u = utils::verify_user(txn.clone(), auth).await;
match u {
Ok(u) => {
if !u.is_admin {
return SubsonicResponse::new_error(Error::UserIsNotAuthorizedForGivenOperation(
None,
));
}
}
Err(e) => return e,
}
crate::scan::start_scan();
let res = crate::scan::get_scan_status().await;
match res {
Ok(status) => {
if status.errors.is_empty() {
SubsonicResponse::new_scan_status(status.scanning, status.count)
} else {
warn!("Failed to start scan:");
for e in status.errors {
warn!("{e}");
}
SubsonicResponse::new_error(Error::Generic(None))
}
}
Err(e) => {
warn!("Failed to start scan: {e}");
SubsonicResponse::new_error(Error::Generic(None))
}
}
}

166
rave/src/scan.rs Normal file
View file

@ -0,0 +1,166 @@
use color_eyre::{Report, Result};
use futures_lite::StreamExt;
use once_cell::sync::Lazy;
use sea_orm::{ConnectOptions, Database, DatabaseTransaction, TransactionTrait};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;
use tracing::warn;
mod walk;
pub fn start_scan() {
tokio::spawn(scan());
}
static STATUS: Lazy<Arc<RwLock<ScanStatus>>> = Lazy::new(|| {
Arc::new(RwLock::new(ScanStatus {
scanning: false,
count: 0,
errors: Vec::new(),
}))
});
pub async fn get_scan_status() -> Result<ScanStatus> {
let stat = STATUS.read().await;
Ok(stat.clone())
}
#[derive(Debug, Clone)]
pub struct ScanStatus {
pub scanning: bool,
pub count: u64,
pub errors: Vec<Arc<Report>>,
}
async fn scan() {
{
let mut stat = STATUS.write().await;
stat.scanning = true;
}
let url = std::env::var("DATABASE_URL").expect("DATABASE_URL not set");
let conn = ConnectOptions::new(url);
let dbc = get_dbc(conn).await;
if dbc.is_none() {
return;
}
let dbc = dbc.expect("Failed to connect to database");
let root_dir = get_root_dir();
let mut walk = walk::WalkDir::new(root_dir);
let mut count = 0;
while let Some(res) = walk.next().await {
let Some(de) = check_dir_entry(res, &mut count).await else {
continue;
};
let Some(txn) = create_txn(&dbc, &mut count).await else {
continue;
};
if let Err(e) = handle_entry(&txn, de).await {
warn!("Failed to handle directory entry: {e}");
{
let mut write = STATUS.write().await;
write.errors.push(Arc::new(e));
}
let _ = txn.rollback().await;
count += 1;
continue;
}
let _ = txn.commit().await;
}
{
let mut stat = STATUS.write().await;
stat.scanning = false;
stat.count = count;
}
}
async fn handle_entry(tx: &DatabaseTransaction, entry: walk::DirEntry) -> Result<()> {
let path = entry.path();
let path = path
.to_str()
.ok_or_else(|| Report::msg("Failed to convert path to string"))?;
let file_type = entry.file_type().await?;
let meta = entry.metadata().await?;
// TODO: figure out how to scan. steal from Gonic if we have to :3
Ok(())
}
async fn create_txn(
dbc: &sea_orm::DatabaseConnection,
count: &mut u64,
) -> Option<DatabaseTransaction> {
let txn = match dbc.begin().await {
Ok(txn) => txn,
Err(e) => {
warn!("Failed to start database transaction: {e}");
{
let mut write = STATUS.write().await;
write.errors.push(Arc::new(Report::new(e)));
}
*count += 1;
return None;
}
};
Some(txn)
}
async fn check_dir_entry(
res: std::result::Result<Arc<tokio::fs::DirEntry>, std::io::Error>,
count: &mut u64,
) -> Option<Arc<tokio::fs::DirEntry>> {
let de = match res {
Ok(de) => de,
Err(e) => {
warn!("Failed to read directory entry: {e}");
{
let mut write = STATUS.write().await;
write.errors.push(Arc::new(Report::new(e)));
}
*count += 1;
return None;
}
};
Some(de)
}
async fn get_dbc(conn: ConnectOptions) -> Option<sea_orm::DatabaseConnection> {
let dbc = Database::connect(conn).await;
let Ok(dbc) = dbc else {
let e = dbc.expect_err("Failed to connect to database");
warn!("Failed to connect to database: {e}");
{
let mut stat = STATUS.write().await;
stat.scanning = false;
stat.errors
.push(Arc::new(Report::msg("Failed to connect to database")));
}
return None;
};
Some(dbc)
}
fn get_root_dir() -> PathBuf {
let root_dir = std::env::var("RAVE_STORAGE_DIR").expect("RAVE_STORAGE_DIR not set");
PathBuf::from(root_dir)
}

149
rave/src/scan/walk.rs Normal file
View file

@ -0,0 +1,149 @@
#![allow(dead_code)]
use std::{
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures_lite::{future::Boxed as BoxedFut, stream, Future, FutureExt, Stream, StreamExt};
use tokio::fs::{read_dir, ReadDir};
pub use tokio::io::Result;
pub type DirEntry = Arc<tokio::fs::DirEntry>;
type BoxStream = futures_lite::stream::Boxed<Result<DirEntry>>;
pub struct WalkDir {
root: PathBuf,
entries: BoxStream,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Filtering {
Ignore,
IgnoreDir,
Continue,
}
impl WalkDir {
pub fn new(root: impl AsRef<Path>) -> Self {
Self {
root: root.as_ref().to_path_buf(),
entries: walk_dir(
root,
None::<Box<dyn FnMut(DirEntry) -> BoxedFut<Filtering> + Send>>,
),
}
}
pub fn filter<F, Fut>(self, f: F) -> Self
where
F: FnMut(DirEntry) -> Fut + Send + 'static,
Fut: Future<Output = Filtering> + Send,
{
let root = self.root.clone();
Self {
root: self.root,
entries: walk_dir(root, Some(f)),
}
}
}
impl Stream for WalkDir {
type Item = Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let entries = Pin::new(&mut self.entries);
entries.poll_next(cx)
}
}
fn walk_dir<F, Fut>(root: impl AsRef<Path>, filter: Option<F>) -> BoxStream
where
F: FnMut(DirEntry) -> Fut + Send + 'static,
Fut: Future<Output = Filtering> + Send,
{
stream::unfold(
State::Start((root.as_ref().to_path_buf(), filter)),
move |state| async move {
match state {
State::Start((root, filter)) => match read_dir(root).await {
Err(e) => Some((Err(e), State::Done)),
Ok(rd) => walk(vec![rd], filter).await,
},
State::Walk((dirs, filter)) => walk(dirs, filter).await,
State::Done => None,
}
},
)
.boxed()
}
enum State<F> {
Start((PathBuf, Option<F>)),
Walk((Vec<ReadDir>, Option<F>)),
Done,
}
type UnfoldState<F> = (Result<DirEntry>, State<F>);
fn walk<F, Fut>(mut dirs: Vec<ReadDir>, filter: Option<F>) -> BoxedFut<Option<UnfoldState<F>>>
where
F: FnMut(DirEntry) -> Fut + Send + 'static,
Fut: Future<Output = Filtering> + Send,
{
async move {
if let Some(dir) = dirs.last_mut() {
match dir.next_entry().await {
Ok(Some(entry)) => walk_entry(Arc::new(entry), dirs, filter).await,
Ok(None) => {
dirs.pop();
walk(dirs, filter).await
}
Err(e) => Some((Err(e), State::Walk((dirs, filter)))),
}
} else {
None
}
}
.boxed()
}
fn walk_entry<F, Fut>(
entry: DirEntry,
mut dirs: Vec<ReadDir>,
mut filter: Option<F>,
) -> BoxedFut<Option<UnfoldState<F>>>
where
F: FnMut(DirEntry) -> Fut + Send + 'static,
Fut: Future<Output = Filtering> + Send,
{
async move {
match entry.file_type().await {
Err(e) => Some((Err(e), State::Walk((dirs, filter)))),
Ok(ft) => {
let filtering = match filter.as_mut() {
Some(filter) => filter(entry.clone()).await,
None => Filtering::Continue,
};
if ft.is_dir() {
let rd = match read_dir(entry.path()).await {
Err(e) => return Some((Err(e), State::Walk((dirs, filter)))),
Ok(rd) => rd,
};
if filtering != Filtering::IgnoreDir {
dirs.push(rd);
}
}
match filtering {
Filtering::Continue => Some((Ok(entry), State::Walk((dirs, filter)))),
Filtering::IgnoreDir | Filtering::Ignore => walk(dirs, filter).await,
}
}
}
}
.boxed()
}

View file

@ -1,11 +1,8 @@
#![allow(dead_code)] // TODO: Remove this
use std::fmt::Display;
use entities::{album, track};
use entities::{album, music_folder, track};
use poem::{http::StatusCode, IntoResponse, Response};
use serde::{ser::SerializeStruct, Serialize};
use time::OffsetDateTime;
use crate::authentication::VersionTriple;
@ -39,7 +36,7 @@ impl SubsonicResponse {
}
}
pub fn new_music_folders(music_folders: Vec<MusicFolder>) -> Self {
pub fn new_music_folders(music_folders: Vec<music_folder::Model>) -> Self {
Self::new(SubResponseType::MusicFolders { music_folders })
}
@ -67,6 +64,15 @@ impl SubsonicResponse {
value: Box::new(SubResponseType::Error(inner)),
}
}
pub fn new_scan_status(scanning: bool, count: u64) -> Self {
Self {
xmlns: "http://subsonic.org/restapi".to_string(),
status: ResponseStatus::Ok,
version: VersionTriple(1, 16, 1),
value: Box::new(SubResponseType::ScanStatus { scanning, count }),
}
}
}
#[derive(Debug, Clone, Serialize)]
@ -74,7 +80,7 @@ pub enum SubResponseType {
#[serde(rename = "musicFolders")]
MusicFolders {
#[serde(rename = "musicFolder")]
music_folders: Vec<MusicFolder>,
music_folders: Vec<music_folder::Model>,
},
#[serde(rename = "error")]
Error(Error),
@ -100,130 +106,16 @@ pub enum SubResponseType {
#[serde(flatten)]
songs: Vec<track::Model>,
},
#[serde(rename = "scanStatus")]
ScanStatus {
#[serde(rename = "scanning")]
scanning: bool,
#[serde(rename = "count")]
count: u64,
},
Empty,
}
#[derive(Debug, Clone, Serialize, Default)]
#[serde(default)]
pub struct AlbumId3 {
#[serde(rename = "@id", serialize_with = "crate::utils::album_id")]
pub id: i64,
#[serde(rename = "@name")]
pub name: String,
#[serde(rename = "@artist", skip_serializing_if = "Option::is_none")]
pub artist: Option<String>,
#[serde(rename = "@artistId", skip_serializing_if = "Option::is_none")]
pub artist_id: Option<i32>,
#[serde(rename = "@coverArt", skip_serializing_if = "Option::is_none")]
pub cover_art: Option<String>,
#[serde(rename = "@songCount")]
pub song_count: i32,
#[serde(rename = "@duration")]
pub duration: i32,
#[serde(rename = "@playCount", skip_serializing_if = "Option::is_none")]
pub play_count: Option<i64>,
#[serde(rename = "@created", skip_serializing_if = "Option::is_none")]
pub created: Option<OffsetDateTime>,
#[serde(rename = "@starred", skip_serializing_if = "Option::is_none")]
pub starred: Option<OffsetDateTime>,
#[serde(rename = "@year", skip_serializing_if = "Option::is_none")]
pub year: Option<i32>,
#[serde(rename = "@genre", skip_serializing_if = "Option::is_none")]
pub genre: Option<String>,
#[serde(rename = "@musicFolder", skip_serializing_if = "Option::is_none")]
pub folder_id: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Default)]
#[serde(default)]
pub struct Child {
#[serde(rename = "@id")]
pub id: String,
#[serde(rename = "@parent", skip_serializing_if = "Option::is_none")]
pub parent: Option<i32>,
#[serde(rename = "@isDir")]
pub is_dir: bool,
#[serde(rename = "@title")]
pub title: String,
#[serde(rename = "@album", skip_serializing_if = "Option::is_none")]
pub album: Option<String>,
#[serde(rename = "@artist", skip_serializing_if = "Option::is_none")]
pub artist: Option<String>,
#[serde(rename = "@track", skip_serializing_if = "Option::is_none")]
pub track: Option<i32>,
#[serde(rename = "@year", skip_serializing_if = "Option::is_none")]
pub year: Option<i32>,
#[serde(rename = "@genre", skip_serializing_if = "Option::is_none")]
pub genre: Option<String>,
#[serde(rename = "@coverArt", skip_serializing_if = "Option::is_none")]
pub cover_art: Option<String>,
#[serde(rename = "@size", skip_serializing_if = "Option::is_none")]
pub size: Option<i32>,
#[serde(rename = "@contentType", skip_serializing_if = "Option::is_none")]
pub content_type: Option<String>,
#[serde(rename = "@suffix", skip_serializing_if = "Option::is_none")]
pub suffix: Option<String>,
#[serde(
rename = "@transcodedContentType",
skip_serializing_if = "Option::is_none"
)]
pub transcoded_content_type: Option<String>,
#[serde(rename = "@transcodedSuffix", skip_serializing_if = "Option::is_none")]
pub transcoded_suffix: Option<String>,
#[serde(rename = "@duration", skip_serializing_if = "Option::is_none")]
pub duration: Option<i32>,
#[serde(rename = "@bitRate", skip_serializing_if = "Option::is_none")]
pub bit_rate: Option<i32>,
#[serde(rename = "@path", skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(rename = "@isVideo", skip_serializing_if = "Option::is_none")]
pub is_video: Option<bool>,
#[serde(rename = "@userRating", skip_serializing_if = "Option::is_none")]
pub user_rating: Option<i32>,
#[serde(rename = "@averageRating", skip_serializing_if = "Option::is_none")]
pub average_rating: Option<f32>,
#[serde(rename = "@playCount", skip_serializing_if = "Option::is_none")]
pub play_count: Option<i32>,
#[serde(rename = "@discNumber", skip_serializing_if = "Option::is_none")]
pub disc_number: Option<i32>,
#[serde(rename = "@created", skip_serializing_if = "Option::is_none")]
pub created: Option<OffsetDateTime>,
#[serde(rename = "@starred", skip_serializing_if = "Option::is_none")]
pub starred: Option<OffsetDateTime>,
#[serde(rename = "@albumId", skip_serializing_if = "Option::is_none")]
pub album_id: Option<String>,
#[serde(rename = "@artistId", skip_serializing_if = "Option::is_none")]
pub artist_id: Option<String>,
#[serde(rename = "@type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<MediaType>,
#[serde(rename = "@bookmarkPosition", skip_serializing_if = "Option::is_none")]
pub bookmark_position: Option<i32>,
#[serde(rename = "@originalWidth", skip_serializing_if = "Option::is_none")]
pub original_width: Option<i32>,
#[serde(rename = "@originalHeight", skip_serializing_if = "Option::is_none")]
pub original_height: Option<i32>,
}
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
pub enum MediaType {
#[serde(rename = "music")]
Music,
#[serde(rename = "video")]
Video,
#[serde(rename = "audiobook")]
Audiobook,
#[serde(rename = "podcast")]
Podcast,
}
#[derive(Debug, Clone, Serialize)]
pub struct MusicFolder {
#[serde(rename = "@id")]
pub id: i64,
#[serde(rename = "@name")]
pub name: String,
}
#[derive(Debug, Clone, Copy)]
pub enum ResponseStatus {
Ok,
@ -243,6 +135,7 @@ impl Serialize for ResponseStatus {
}
#[derive(Debug, Clone)]
#[allow(unused)]
pub enum Error {
Generic(Option<String>),
RequiredParameterMissing(Option<String>),

View file

@ -1,7 +1,6 @@
use entities::{prelude::User, user};
use poem_ext::db::DbTxn;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use serde::Serializer;
use tracing::error;
use crate::{
@ -42,10 +41,3 @@ pub async fn verify_user(
}
}
}
#[allow(clippy::trivially_copy_pass_by_ref)]
pub fn album_id<S: Serializer>(id: &i64, s: S) -> Result<S::Ok, S::Error> {
let str = format!("al-{id}");
s.serialize_str(&str)
}