use std::collections::{hash_map::Entry, HashMap};
use chrono::{DateTime, Duration, Utc};
use futures::FutureExt;
use lazysort::SortedBy;
use once_cell::sync::Lazy;
use tracing::trace;
use crate::{
constants::{LAST_VIDEOS_IDS_EXTRA_PROP, NOTIFICATIONS_STORAGE_KEY, NOTIFICATION_ITEMS_COUNT},
models::{
common::{
eq_update, resources_update_with_vector_content, Loadable, ResourceLoadable,
ResourcesAction,
},
ctx::{CtxError, CtxStatus},
},
runtime::{
msg::{Action, ActionCtx, Event, Internal, Msg},
Effect, EffectFuture, Effects, Env, EnvFutureExt,
},
types::{
addon::{AggrRequest, ExtraType},
library::LibraryBucket,
notifications::{NotificationItem, NotificationsBucket},
profile::Profile,
resource::{MetaItem, MetaItemId, VideoId},
},
};
/// Minimum amount of time that has to pass since `NotificationsBucket::last_updated`
/// before `update_notifications` forces a new last-videos catalog request (6 hours).
static REQUEST_LAST_VIDEOS_EVERY: Lazy<Duration> = Lazy::new(|| Duration::hours(6));
/// Reducer for the notifications part of the context.
///
/// Reacts to `msg` by mutating `notifications` and `notification_catalogs` and
/// returns the [`Effects`] that should be dispatched next:
///
/// - `ActionCtx::PullNotifications` / `ActionCtx::DismissNotificationItem` are
///   forwarded to their `Internal` counterparts.
/// - `Internal::PullNotifications` forces a new last-videos catalog request when
///   the bucket was never updated or the last update is at least
///   `REQUEST_LAST_VIDEOS_EVERY` old (and there is at least one library item that
///   should pull notifications), then recomputes the notification items.
/// - `Logout` and a successful `CtxAuthResult` matching the request in
///   `CtxStatus::Loading` reset the catalogs and the bucket; the latter also
///   schedules a new `Internal::PullNotifications`.
/// - `Internal::ResourceRequestResult` feeds the addon response into the catalogs
///   and, if they changed, recomputes the notification items.
/// - `Internal::DismissNotificationItem` removes the notifications of an item.
/// - `Internal::NotificationsChanged` persists the bucket to storage.
///
/// Any other message yields no effects.
pub fn update_notifications<E: Env + 'static>(
    notifications: &mut NotificationsBucket,
    notification_catalogs: &mut Vec<ResourceLoadable<Vec<MetaItem>>>,
    profile: &Profile,
    library: &LibraryBucket,
    status: &CtxStatus,
    msg: &Msg,
) -> Effects {
    match msg {
        Msg::Action(Action::Ctx(ActionCtx::PullNotifications)) => {
            Effects::msg(Msg::Internal(Internal::PullNotifications)).unchanged()
        }
        Msg::Internal(Internal::PullNotifications) => {
            // Sample the clock once so that the decision, the logged reason and the
            // stored `last_updated` all refer to the very same instant.
            let now = E::now();

            let (reason, should_make_request) = match notifications.last_updated {
                Some(last_updated) if last_updated + *REQUEST_LAST_VIDEOS_EVERY <= now => (
                    format!(
                        "`true` since {last_updated} + {hours} hours <= {now}",
                        hours = REQUEST_LAST_VIDEOS_EVERY.num_hours(),
                    ),
                    true,
                ),
                None => ("`true` since last updated is `None`".to_string(), true),
                Some(last_updated) => (
                    format!(
                        "`false` since {last_updated} + {hours} hours > {now}",
                        hours = REQUEST_LAST_VIDEOS_EVERY.num_hours(),
                    ),
                    false,
                ),
            };

            tracing::debug!(
                name = "Notifications",
                reason = reason,
                last_updated = notifications.last_updated.as_ref().map(ToString::to_string),
                hours = REQUEST_LAST_VIDEOS_EVERY.num_hours(),
                "Should last-videos addon resource be called? {should_make_request}"
            );

            // Most recently modified library items first.
            let sorted_library_items_id_types = library
                .items
                .values()
                .filter(|library_item| library_item.should_pull_notifications())
                .sorted_by(|a, b| b.mtime.cmp(&a.mtime))
                .map(|library_item| (library_item.id.to_owned(), library_item.r#type.to_owned()))
                .collect::<Vec<_>>();

            let notifications_catalog_resource_effects =
                if !sorted_library_items_id_types.is_empty() && should_make_request {
                    trace!(
                        "Sorted by `mtime` LibraryItem id and type: {:?}",
                        sorted_library_items_id_types
                    );

                    let catalog_resource_effects = resources_update_with_vector_content::<E, _>(
                        notification_catalogs,
                        ResourcesAction::force_request(
                            &AggrRequest::CatalogsFiltered(vec![ExtraType::Ids {
                                extra_name: LAST_VIDEOS_IDS_EXTRA_PROP.name.to_owned(),
                                id_types: sorted_library_items_id_types,
                                limit: Some(NOTIFICATION_ITEMS_COUNT),
                            }]),
                            &profile.addons,
                        ),
                    );

                    notifications.last_updated = Some(now);

                    catalog_resource_effects
                } else {
                    Effects::none().unchanged()
                };

            // Recompute the items from whatever catalogs are currently available.
            let notification_items_effects = update_notification_items::<E>(
                &mut notifications.items,
                notification_catalogs,
                library,
            );

            // No `NotificationsChanged` is emitted from this arm; the combined
            // effects are always reported as unchanged.
            notifications_catalog_resource_effects
                .join(notification_items_effects)
                .unchanged()
        }
        Msg::Action(Action::Ctx(ActionCtx::DismissNotificationItem(id))) => Effects::msg(
            Msg::Internal(Internal::DismissNotificationItem(id.to_owned())),
        )
        .unchanged(),
        Msg::Action(Action::Ctx(ActionCtx::Logout)) | Msg::Internal(Internal::Logout) => {
            reset_notifications::<E>(notifications, notification_catalogs, profile).unchanged()
        }
        Msg::Internal(Internal::CtxAuthResult(auth_request, result)) => match (status, result) {
            // Only react to the result of the authentication request that is
            // currently being loaded.
            (CtxStatus::Loading(loading_auth_request), Ok(_))
                if loading_auth_request == auth_request =>
            {
                let pull_notifications_effects =
                    Effects::msg(Msg::Internal(Internal::PullNotifications)).unchanged();

                reset_notifications::<E>(notifications, notification_catalogs, profile)
                    .join(pull_notifications_effects)
                    .unchanged()
            }
            _ => Effects::none().unchanged(),
        },
        Msg::Internal(Internal::ResourceRequestResult(request, result)) => {
            let notification_catalogs_effects = resources_update_with_vector_content::<E, _>(
                notification_catalogs,
                ResourcesAction::ResourceRequestResult { request, result },
            );

            let notification_items_effects = if notification_catalogs_effects.has_changed {
                update_notification_items::<E>(
                    &mut notifications.items,
                    notification_catalogs,
                    library,
                )
            } else {
                Effects::none().unchanged()
            };

            // Persist the bucket only when the items really changed.
            let notifications_effects = if notification_items_effects.has_changed {
                Effects::msg(Msg::Internal(Internal::NotificationsChanged))
            } else {
                Effects::none().unchanged()
            };

            notification_catalogs_effects
                .join(notification_items_effects)
                .join(notifications_effects)
        }
        Msg::Internal(Internal::DismissNotificationItem(id)) => {
            dismiss_notification_item::<E>(library, notifications, id)
        }
        Msg::Internal(Internal::NotificationsChanged) => {
            Effects::one(push_notifications_to_storage::<E>(notifications)).unchanged()
        }
        _ => Effects::none().unchanged(),
    }
}

/// Clears `notification_catalogs` and replaces `notifications` with a fresh, empty
/// bucket for the uid of `profile`.
///
/// Returns the joined effects of both updates; `Internal::NotificationsChanged` is
/// included only when the bucket actually differed from the fresh one.
fn reset_notifications<E: Env + 'static>(
    notifications: &mut NotificationsBucket,
    notification_catalogs: &mut Vec<ResourceLoadable<Vec<MetaItem>>>,
    profile: &Profile,
) -> Effects {
    let notification_catalogs_effects = eq_update(notification_catalogs, vec![]);

    let next_notifications = NotificationsBucket::new::<E>(profile.uid(), vec![]);
    let notifications_effects = if *notifications != next_notifications {
        *notifications = next_notifications;
        Effects::msg(Msg::Internal(Internal::NotificationsChanged))
    } else {
        Effects::none().unchanged()
    };

    notification_catalogs_effects.join(notifications_effects)
}
/// Recomputes the notification items from the loaded catalogs and the library.
///
/// For every library item that should pull notifications:
///
/// - if one of the catalogs that finished loading (`Ready` or `Err`) contains a
///   meta item with the same id and at least one video, a notification is created
///   for each of its videos released after the item's `last_watched` and not later
///   than the current time;
/// - otherwise the notifications already stored for that item are carried over,
///   dropping the ones which no longer pass the same release check.
///
/// `notification_items` is replaced through `eq_update`, so the returned
/// [`Effects`] are the ones `eq_update` produces for the old and new maps.
fn update_notification_items<E: Env + 'static>(
    notification_items: &mut HashMap<MetaItemId, HashMap<VideoId, NotificationItem>>,
    notification_catalogs: &[ResourceLoadable<Vec<MetaItem>>],
    library: &LibraryBucket,
) -> Effects {
    // Only catalogs that finished loading, successfully or with an error.
    let selected_catalogs = notification_catalogs
        .iter()
        .filter(|catalog| {
            matches!(
                &catalog.content,
                Some(Loadable::Ready(_)) | Some(Loadable::Err(_))
            )
        })
        .collect::<Vec<_>>();

    // Single clock snapshot so every video is judged against the same instant.
    let now = E::now();

    // Shared rule deciding whether a video release should (still) produce a
    // notification; returns the release date when it should.
    let should_retain_video_released = |last_watched: Option<&DateTime<Utc>>,
                                        video_released: Option<&DateTime<Utc>>|
     -> Option<DateTime<Utc>> {
        match (last_watched, video_released) {
            (Some(last_watched), Some(video_released)) => {
                // Released after the last watch, but not in the future.
                if last_watched < video_released && video_released <= &now {
                    Some(*video_released)
                } else {
                    None
                }
            }
            // The item was never watched: the release date is kept as is.
            (None, Some(video_released)) => Some(*video_released),
            _ => None,
        }
    };

    let mut next_notification_items: HashMap<MetaItemId, HashMap<VideoId, NotificationItem>> =
        HashMap::new();

    for (meta_id, library_item) in library.items.iter() {
        if !library_item.should_pull_notifications() {
            continue;
        }

        // First meta item with this id, and with videos, among the loaded catalogs.
        let found_meta_item = selected_catalogs.iter().find_map(|catalog| {
            catalog
                .content
                .as_ref()
                .and_then(|content| content.ready())
                .and_then(|content| {
                    content.iter().find(|meta_item| {
                        &meta_item.preview.id == meta_id && !meta_item.videos.is_empty()
                    })
                })
        });

        let meta_item = match found_meta_item {
            Some(meta_item) => meta_item,
            None => {
                // No fresh data for this item: carry over its current notifications,
                // filtered by the release rule.
                // NOTE(review): when every existing notification is filtered out an
                // empty map is still inserted for `meta_id`, unlike the branch below
                // which removes empty entries - confirm whether that is intended.
                match notification_items.get(meta_id) {
                    Some(existing_notifications) if !existing_notifications.is_empty() => {
                        let filtered_current_notifs = existing_notifications
                            .iter()
                            .filter_map(|(video_id, notif_item)| {
                                if should_retain_video_released(
                                    library_item.state.last_watched.as_ref(),
                                    Some(&notif_item.video_released),
                                )
                                .is_some()
                                {
                                    Some((video_id.to_owned(), notif_item.to_owned()))
                                } else {
                                    None
                                }
                            })
                            .collect();

                        next_notification_items.insert(meta_id.to_owned(), filtered_current_notifs);
                    }
                    _ => {}
                }

                continue;
            }
        };

        let meta_notifs = next_notification_items
            .entry(meta_id.to_owned())
            .or_default();

        for video in meta_item.videos_iter() {
            // Both the last watched and the release dates are required here.
            let (last_watched, video_released) =
                match (&library_item.state.last_watched, video.released) {
                    (Some(last_watched), Some(video_released)) => (last_watched, video_released),
                    _ => continue,
                };

            if should_retain_video_released(Some(last_watched), Some(&video_released)).is_none() {
                continue;
            }

            // Keep the first notification seen for a given video id.
            if let Entry::Vacant(vacant) = meta_notifs.entry(video.id.to_owned()) {
                vacant.insert(NotificationItem {
                    meta_id: library_item.id.to_owned(),
                    video_id: video.id.to_owned(),
                    video_released,
                });
            }
        }

        // No video qualified: do not keep an empty record for this meta item.
        if meta_notifs.is_empty() {
            next_notification_items.remove(meta_id);
        }
    }

    eq_update(notification_items, next_notification_items)
}
fn push_notifications_to_storage<E: Env + 'static>(notifications: &NotificationsBucket) -> Effect {
let ids = notifications.items.keys().cloned().collect();
EffectFuture::Sequential(
E::set_storage(NOTIFICATIONS_STORAGE_KEY, Some(notifications))
.map(move |result| match result {
Ok(_) => Msg::Event(Event::NotificationsPushedToStorage { ids }),
Err(error) => Msg::Event(Event::Error {
error: CtxError::from(error),
source: Box::new(Event::NotificationsPushedToStorage { ids }),
}),
})
.boxed_env(),
)
.into()
}
/// Dismisses every notification stored for the meta item `id`.
///
/// Nothing happens (no effects, unchanged) when there are no notifications for
/// `id`. Otherwise the entry is removed and the returned effects carry, in order:
/// `Internal::NotificationsChanged`, an `Internal::UpdateLibraryItem` with
/// `last_watched` set to the current time (only if the library has an item with
/// this id) and the `Event::NotificationsDismissed` event.
fn dismiss_notification_item<E: Env + 'static>(
    library: &LibraryBucket,
    notifications: &mut NotificationsBucket,
    id: &str,
) -> Effects {
    // Guard: there was nothing to dismiss.
    if notifications.items.remove(id).is_none() {
        return Effects::none().unchanged();
    }

    let library_item_effects = if let Some(existing_item) = library.items.get(id) {
        // Mark the item as watched right now via a copy sent for update.
        let mut updated_item = existing_item.to_owned();
        updated_item.state.last_watched = Some(E::now());

        Effects::msg(Msg::Internal(Internal::UpdateLibraryItem(updated_item))).unchanged()
    } else {
        Effects::none().unchanged()
    };

    let dismissed_event_effects = Effects::msg(Msg::Event(Event::NotificationsDismissed {
        id: id.to_owned(),
    }));

    Effects::msg(Msg::Internal(Internal::NotificationsChanged))
        .join(library_item_effects)
        .join(dismissed_event_effects)
}