1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use enclose::enclose;
use futures::FutureExt;
use std::collections::hash_map::Entry;

use crate::constants::STREAMS_STORAGE_KEY;
use crate::models::common::{Loadable, ResourceLoadable};
use crate::models::ctx::{CtxError, CtxStatus};
use crate::runtime::msg::{Action, ActionCtx, CtxAuthResponse, Event, Internal, Msg};
use crate::runtime::{Effect, EffectFuture, Effects, Env, EnvFutureExt};
use crate::types::streams::{StreamsBucket, StreamsItem, StreamsItemKey};

pub fn update_streams<E: Env + 'static>(
    streams: &mut StreamsBucket,
    status: &CtxStatus,
    msg: &Msg,
) -> Effects {
    match msg {
        Msg::Action(Action::Ctx(ActionCtx::Logout)) | Msg::Internal(Internal::Logout) => {
            let next_streams = StreamsBucket::default();
            if *streams != next_streams {
                *streams = next_streams;
                Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
            } else {
                Effects::none().unchanged()
            }
        }
        Msg::Internal(Internal::StreamLoaded {
            stream,
            stream_request: Some(stream_request),
            meta_item:
                ResourceLoadable {
                    request: meta_request,
                    content: Some(meta_content),
                },
        }) if !meta_content.is_loading() => {
            let meta_id = &meta_request.path.id;
            let video_id = &stream_request.path.id;

            let key = StreamsItemKey {
                meta_id: meta_id.to_owned(),
                video_id: video_id.to_owned(),
            };
            let last_stream_item = match meta_content {
                Loadable::Ready(meta_item) => streams.last_stream_item(video_id, meta_item),
                _ => streams.items.get(&key),
            };
            let last_stream_state = last_stream_item.and_then(|item| item.adjusted_state(stream));

            let streams_item = StreamsItem {
                stream: stream.to_owned(),
                r#type: meta_request.path.r#type.to_owned(),
                meta_id: meta_id.to_owned(),
                video_id: video_id.to_owned(),
                meta_transport_url: meta_request.base.to_owned(),
                stream_transport_url: stream_request.base.to_owned(),
                state: last_stream_state,
                mtime: E::now(),
            };

            streams.items.insert(key, streams_item);
            Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
        }
        Msg::Internal(Internal::StreamStateChanged {
            state,
            stream_request: Some(stream_request),
            meta_request: Some(meta_request),
        }) => {
            let meta_id = &meta_request.path.id;
            let video_id = &stream_request.path.id;

            let key = StreamsItemKey {
                meta_id: meta_id.to_owned(),
                video_id: video_id.to_owned(),
            };
            let entry = streams
                .items
                .entry(key)
                .and_modify(|item| item.state = Some(state.to_owned()));
            match entry {
                Entry::Occupied(_) => Effects::msg(Msg::Internal(Internal::StreamsChanged(false))),
                _ => Effects::none().unchanged(),
            }
        }
        Msg::Internal(Internal::StreamsChanged(persisted)) if !persisted => {
            Effects::one(push_streams_to_storage::<E>(streams)).unchanged()
        }
        Msg::Internal(Internal::CtxAuthResult(auth_request, result)) => match (status, result) {
            (CtxStatus::Loading(loading_auth_request), Ok(CtxAuthResponse { auth, .. }))
                if loading_auth_request == auth_request =>
            {
                let next_streams = StreamsBucket::new(Some(auth.user.id.to_owned()));
                if *streams != next_streams {
                    *streams = next_streams;
                    Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
                } else {
                    Effects::none().unchanged()
                }
            }
            _ => Effects::none().unchanged(),
        },
        _ => Effects::none().unchanged(),
    }
}

fn push_streams_to_storage<E: Env + 'static>(streams: &StreamsBucket) -> Effect {
    EffectFuture::Sequential(
        E::set_storage(STREAMS_STORAGE_KEY, Some(&streams))
            .map(enclose!((streams.uid => uid) move |result| match result {
                Ok(_) => Msg::Event(Event::StreamsPushedToStorage { uid }),
                Err(error) => Msg::Event(Event::Error {
                    error: CtxError::from(error),
                    source: Box::new(Event::StreamsPushedToStorage { uid }),
                })
            }))
            .boxed_env(),
    )
    .into()
}