feat(sdk): Refresh well-known cache in the background
Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
This commit is contained in:
committed by
Andy Balaam
parent
38f34e66eb
commit
c6d1cf20b6
@@ -613,7 +613,7 @@ impl ClientBuilder {
|
||||
None => NotSet,
|
||||
};
|
||||
let well_known = match well_known {
|
||||
Some(well_known) => Cached(Some(well_known.into())),
|
||||
Some(well_known) => Cached(TtlValue::new(Some(well_known.into()))),
|
||||
None => NotSet,
|
||||
};
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ use ruma::api::{
|
||||
SupportedVersions,
|
||||
client::discovery::get_authorization_server_metadata::v1::AuthorizationServerMetadata,
|
||||
};
|
||||
use tokio::sync::{Mutex as AsyncMutex, RwLock};
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
|
||||
use crate::HttpError;
|
||||
|
||||
@@ -35,7 +35,7 @@ pub(crate) struct ClientCaches {
|
||||
/// - The versions fetched from an *authenticated* request to the server.
|
||||
pub(crate) supported_versions: Cache<SupportedVersions>,
|
||||
/// Well-known information.
|
||||
pub(super) well_known: RwLock<CachedValue<Option<WellKnownResponse>>>,
|
||||
pub(super) well_known: Cache<Option<WellKnownResponse>>,
|
||||
pub(crate) server_metadata: AsyncMutex<TtlCache<String, AuthorizationServerMetadata>>,
|
||||
}
|
||||
|
||||
|
||||
@@ -391,7 +391,7 @@ impl ClientInner {
|
||||
http_client: HttpClient,
|
||||
base_client: BaseClient,
|
||||
supported_versions: CachedValue<TtlValue<SupportedVersions>>,
|
||||
well_known: CachedValue<Option<WellKnownResponse>>,
|
||||
well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
|
||||
respect_login_well_known: bool,
|
||||
event_cache: OnceCell<EventCache>,
|
||||
send_queue: Arc<SendQueueData>,
|
||||
@@ -404,7 +404,7 @@ impl ClientInner {
|
||||
) -> Arc<Self> {
|
||||
let caches = ClientCaches {
|
||||
supported_versions: Cache::with_value(supported_versions),
|
||||
well_known: well_known.into(),
|
||||
well_known: Cache::with_value(well_known),
|
||||
server_metadata: Mutex::new(TtlCache::new()),
|
||||
};
|
||||
|
||||
@@ -542,7 +542,7 @@ impl Client {
|
||||
) -> Result<()> {
|
||||
self.set_homeserver(homeserver_url);
|
||||
self.reset_well_known().await?;
|
||||
if let Some(well_known) = self.load_or_fetch_well_known().await? {
|
||||
if let Some(well_known) = self.well_known().await {
|
||||
self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
|
||||
}
|
||||
Ok(())
|
||||
@@ -2374,58 +2374,104 @@ impl Client {
|
||||
Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
|
||||
}
|
||||
|
||||
/// Load well-known from storage, or fetch it from network and cache it.
|
||||
async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
|
||||
match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
|
||||
Ok(Some(stored)) => {
|
||||
if let Some(well_known) =
|
||||
stored.into_well_known().and_then(|value| value.into_data())
|
||||
{
|
||||
return Ok(well_known);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// fallthrough: cache is empty
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("error when loading cached well-known: {err}");
|
||||
// fallthrough to network.
|
||||
}
|
||||
/// Get the well-known file of the homeserver from the cache.
|
||||
///
|
||||
/// If the data in the cache has expired, this will trigger a background
|
||||
/// task to refresh it.
|
||||
async fn well_known_cached(
|
||||
&self,
|
||||
) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
|
||||
let well_known_cache = &self.inner.caches.well_known;
|
||||
|
||||
let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
|
||||
cached
|
||||
} else if let Some(stored) = self
|
||||
.state_store()
|
||||
.get_kv_data(StateStoreDataKey::WellKnown)
|
||||
.await?
|
||||
.and_then(|value| value.into_well_known())
|
||||
{
|
||||
// Copy the data from the store in the in-memory cache.
|
||||
well_known_cache.set_value(stored.clone());
|
||||
|
||||
stored
|
||||
} else {
|
||||
return Ok(CachedValue::NotSet);
|
||||
};
|
||||
|
||||
// Spawn a task to refresh the cache if it has expired.
|
||||
if value.has_expired() {
|
||||
debug!("spawning task to refresh well-known cache");
|
||||
|
||||
let client = self.clone();
|
||||
self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
|
||||
client.refresh_well_known_cache().await;
|
||||
});
|
||||
}
|
||||
|
||||
let well_known = self.fetch_client_well_known().await.map(Into::into);
|
||||
Ok(CachedValue::Cached(value.into_data_unchecked()))
|
||||
}
|
||||
|
||||
/// Refresh the well-known file of the homeserver in the cache.
|
||||
async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
|
||||
let well_known_cache = &self.inner.caches.well_known;
|
||||
|
||||
let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
// There is already a refresh in progress, wait for it to finish.
|
||||
let guard = well_known_cache.refresh_lock.lock().await;
|
||||
|
||||
// A refresh can't fail because we ignore failures, so there shouldn't be an
|
||||
// error in the refresh lock.
|
||||
|
||||
// Reuse the data if it was cached and it hasn't expired.
|
||||
if let CachedValue::Cached(value) = well_known_cache.value()
|
||||
&& !value.has_expired()
|
||||
{
|
||||
return value.into_data_unchecked();
|
||||
}
|
||||
|
||||
// The data wasn't cached or has expired, we need to make another request.
|
||||
guard
|
||||
}
|
||||
};
|
||||
|
||||
let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
|
||||
|
||||
// Attempt to cache the result in storage.
|
||||
if let Err(err) = self
|
||||
.state_store()
|
||||
.set_kv_data(
|
||||
StateStoreDataKey::WellKnown,
|
||||
StateStoreDataValue::WellKnown(TtlValue::new(well_known.clone())),
|
||||
StateStoreDataValue::WellKnown(well_known.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("error when caching well-known: {err}");
|
||||
}
|
||||
|
||||
Ok(well_known)
|
||||
well_known_cache.set_value(well_known.clone());
|
||||
|
||||
well_known.into_data_unchecked()
|
||||
}
|
||||
|
||||
async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
|
||||
let well_known = &self.inner.caches.well_known;
|
||||
if let CachedValue::Cached(val) = &*well_known.read().await {
|
||||
return Ok(val.clone());
|
||||
/// Get the well-known file of the homeserver by fetching it from the server
|
||||
/// or the cache.
|
||||
async fn well_known(&self) -> Option<WellKnownResponse> {
|
||||
match self.well_known_cached().await {
|
||||
Ok(CachedValue::Cached(value)) => {
|
||||
return value;
|
||||
}
|
||||
Ok(CachedValue::NotSet) => {
|
||||
// The cache is empty, make a request.
|
||||
}
|
||||
Err(error) => {
|
||||
warn!("error when loading cached well-known: {error}");
|
||||
// Fallthrough to make a request.
|
||||
}
|
||||
}
|
||||
|
||||
let mut guarded_well_known = well_known.write().await;
|
||||
if let CachedValue::Cached(val) = &*guarded_well_known {
|
||||
return Ok(val.clone());
|
||||
}
|
||||
|
||||
let well_known = self.load_or_fetch_well_known().await?;
|
||||
|
||||
*guarded_well_known = CachedValue::Cached(well_known.clone());
|
||||
|
||||
Ok(well_known)
|
||||
self.refresh_well_known_cache().await
|
||||
}
|
||||
|
||||
/// Get information about the homeserver's advertised RTC foci by fetching
|
||||
@@ -2449,7 +2495,7 @@ impl Client {
|
||||
/// # anyhow::Ok(()) };
|
||||
/// ```
|
||||
pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
|
||||
let well_known = self.get_or_load_and_cache_well_known().await?;
|
||||
let well_known = self.well_known().await;
|
||||
|
||||
Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
|
||||
}
|
||||
@@ -2460,7 +2506,7 @@ impl Client {
|
||||
/// in the cache. This functions makes it possible to force reset it.
|
||||
pub async fn reset_well_known(&self) -> Result<()> {
|
||||
// Empty the in-memory caches.
|
||||
self.inner.caches.well_known.write().await.take();
|
||||
self.inner.caches.well_known.reset();
|
||||
|
||||
// Empty the store cache.
|
||||
Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
|
||||
@@ -3117,7 +3163,7 @@ impl Client {
|
||||
.clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
|
||||
.await?,
|
||||
self.inner.caches.supported_versions.value(),
|
||||
self.inner.caches.well_known.read().await.clone(),
|
||||
self.inner.caches.well_known.value(),
|
||||
self.inner.respect_login_well_known,
|
||||
self.inner.event_cache.clone(),
|
||||
self.inner.send_queue_data.clone(),
|
||||
@@ -3976,12 +4022,27 @@ pub(crate) mod tests {
|
||||
// Now, reset the cache, and observe the endpoints being called again once.
|
||||
client.reset_well_known().await.unwrap();
|
||||
|
||||
server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
|
||||
server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
|
||||
|
||||
// Hits network again.
|
||||
assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
|
||||
// Hits in-memory cache again.
|
||||
assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
|
||||
|
||||
// Force an expiry of the data.
|
||||
let well_known = client.well_known().await;
|
||||
let mut ttl_value = TtlValue::new(well_known);
|
||||
ttl_value.expire();
|
||||
client.inner.caches.well_known.set_value(ttl_value);
|
||||
|
||||
// Call the method again to trigger a cache refresh background task.
|
||||
client.well_known().await;
|
||||
|
||||
// We wait for the task to finish, the endpoint should have been called again.
|
||||
// We need to wait a bit because the first requests using the server name of the
|
||||
// user will fail, only the requests using the homeserver URL will succeed.
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
|
||||
Reference in New Issue
Block a user