Skip to content

Commit

Permalink
Move DAV Push logic to dav crate
Browse files Browse the repository at this point in the history
  • Loading branch information
lennart-k committed Jan 15, 2025
1 parent 985d4d0 commit 658e6eb
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 81 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ quote = "1.0"
proc-macro2 = "1.0"
heck = "0.5"
darling = "0.20"
reqwest = { version = "0.12", features = [
"rustls-tls",
"charset",
"http2",
], default-features = false }

[dependencies]
rustical_store = { workspace = true }
rustical_store_sqlite = { workspace = true }
Expand Down Expand Up @@ -134,11 +140,6 @@ rpassword.workspace = true
argon2.workspace = true
pbkdf2.workspace = true
password-hash.workspace = true
reqwest = { version = "0.12", features = [
"rustls-tls",
"charset",
"http2",
], default-features = false }
rustical_xml.workspace = true
reqwest.workspace = true
rustical_dav.workspace = true
quick-xml.workspace = true
2 changes: 2 additions & 0 deletions crates/dav/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ log = { workspace = true }
derive_more = { workspace = true }
tracing = { workspace = true }
tracing-actix-web = { workspace = true }
reqwest.workspace = true
tokio.workspace = true
2 changes: 2 additions & 0 deletions crates/dav/src/push/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod prop;
mod push_notifier;
mod push_register;

pub use prop::*;
pub use push_notifier::push_notifier;
pub use push_register::*;
68 changes: 68 additions & 0 deletions crates/dav/src/push/push_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::xml::multistatus::PropstatElement;
use actix_web::http::StatusCode;
use rustical_store::{CollectionOperation, CollectionOperationType, SubscriptionStore};
use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot};
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use tracing::{error, info};

#[derive(XmlSerialize, Debug)]
struct PushMessageProp {
#[xml(ns = "crate::namespace::NS_DAV")]
topic: String,
#[xml(ns = "crate::namespace::NS_DAV")]
sync_token: Option<String>,
}

#[derive(XmlSerialize, XmlRootTag, Debug)]
#[xml(root = b"push-message", ns = "crate::namespace::NS_DAVPUSH")]
#[xml(ns_prefix(crate::namespace::NS_DAVPUSH = b"", crate::namespace::NS_DAV = b"D",))]
struct PushMessage {
#[xml(ns = "crate::namespace::NS_DAV")]
propstat: PropstatElement<PushMessageProp>,
}

pub async fn push_notifier(
mut recv: Receiver<CollectionOperation>,
sub_store: Arc<impl SubscriptionStore>,
) {
while let Some(message) = recv.recv().await {
if let Ok(subscribers) = sub_store.get_subscriptions(&message.topic).await {
let status = match message.r#type {
CollectionOperationType::Object => StatusCode::OK,
CollectionOperationType::Delete => StatusCode::NOT_FOUND,
};
let push_message = PushMessage {
propstat: PropstatElement {
prop: PushMessageProp {
topic: message.topic,
sync_token: message.sync_token,
},
status,
},
};
let mut output: Vec<_> = b"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n".into();
let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4);
if let Err(err) = push_message.serialize_root(&mut writer) {
error!("Could not serialize push message: {}", err);
continue;
}
let payload = String::from_utf8(output).unwrap();
for subscriber in subscribers {
info!(
"Sending a push message to {}: {}",
subscriber.push_resource, payload
);
let client = reqwest::Client::new();
if let Err(err) = client
.post(subscriber.push_resource)
.body(payload.to_owned())
.send()
.await
{
error!("{err}");
}
}
}
}
}
78 changes: 4 additions & 74 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use crate::config::Config;
use actix_web::http::{KeepAlive, StatusCode};
use actix_web::http::KeepAlive;
use actix_web::HttpServer;
use anyhow::Result;
use app::make_app;
use clap::{Parser, Subcommand};
use commands::{cmd_gen_config, cmd_pwhash};
use config::{DataStoreConfig, SqliteDataStoreConfig};
use rustical_dav::xml::multistatus::PropstatElement;
use rustical_dav::push::push_notifier;
use rustical_store::auth::StaticUserStore;
use rustical_store::{AddressbookStore, CalendarStore, CollectionOperation, SubscriptionStore};
use rustical_store_sqlite::addressbook_store::SqliteAddressbookStore;
use rustical_store_sqlite::calendar_store::SqliteCalendarStore;
use rustical_store_sqlite::{create_db_pool, SqliteStore};
use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot};
use setup_tracing::setup_tracing;
use std::fs;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use tracing::{error, info};

mod app;
mod commands;
Expand Down Expand Up @@ -65,27 +63,6 @@ async fn get_data_stores(
})
}

// TODO: Move this code somewhere else :)

#[derive(XmlSerialize, Debug)]
struct PushMessageProp {
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
topic: String,
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
sync_token: Option<String>,
}

#[derive(XmlSerialize, XmlRootTag, Debug)]
#[xml(root = b"push-message", ns = "rustical_dav::namespace::NS_DAVPUSH")]
#[xml(ns_prefix(
rustical_dav::namespace::NS_DAVPUSH = b"",
rustical_dav::namespace::NS_DAV = b"D",
))]
struct PushMessage {
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
propstat: PropstatElement<PushMessageProp>,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
Expand All @@ -102,57 +79,10 @@ async fn main() -> Result<()> {

setup_tracing(&config.tracing);

let (addr_store, cal_store, subscription_store, mut update_recv) =
let (addr_store, cal_store, subscription_store, update_recv) =
get_data_stores(!args.no_migrations, &config.data_store).await?;

let subscription_store_clone = subscription_store.clone();
tokio::spawn(async move {
let subscription_store = subscription_store_clone.clone();
while let Some(message) = update_recv.recv().await {
if let Ok(subscribers) =
subscription_store.get_subscriptions(&message.topic).await
{
let status = match message.r#type {
rustical_store::CollectionOperationType::Object => StatusCode::OK,
rustical_store::CollectionOperationType::Delete => {
StatusCode::NOT_FOUND
}
};
let push_message = PushMessage {
propstat: PropstatElement {
prop: PushMessageProp {
topic: message.topic,
sync_token: message.sync_token,
},
status,
},
};
let mut output: Vec<_> =
b"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n".into();
let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4);
if let Err(err) = push_message.serialize_root(&mut writer) {
error!("Could not serialize push message: {}", err);
continue;
}
let payload = String::from_utf8(output).unwrap();
for subscriber in subscribers {
info!(
"Sending a push message to {}: {}",
subscriber.push_resource, payload
);
let client = reqwest::Client::new();
if let Err(err) = client
.post(subscriber.push_resource)
.body(payload.to_owned())
.send()
.await
{
error!("{err}");
}
}
}
}
});
tokio::spawn(push_notifier(update_recv, subscription_store.clone()));

let user_store = Arc::new(match config.auth {
config::AuthConfig::Static(config) => StaticUserStore::new(config),
Expand Down

0 comments on commit 658e6eb

Please sign in to comment.