339 lines
11 KiB
Rust
339 lines
11 KiB
Rust
use serde::{de::DeserializeOwned, Serialize, Deserialize};
|
|
use std::{
|
|
collections::HashMap,
|
|
sync::Arc,
|
|
time::{Duration, Instant},
|
|
path::PathBuf,
|
|
};
|
|
use tokio::sync::RwLock;
|
|
use tokio::fs;
|
|
|
|
/// A snapshot of an HTTP response, storable both in the in-memory cache and
/// (via serde/JSON) in the optional on-disk cache layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedHttpResponse {
    /// Raw response body bytes.
    pub data: Vec<u8>,
    /// MIME content type of the body (presumably taken from the
    /// `Content-Type` header — confirm against the caller that fills it).
    pub content_type: String,
    /// Response headers as name -> value pairs.
    pub headers: HashMap<String, String>,
    /// HTTP status code (e.g. 200).
    pub status_code: u16,
    /// When this response was cached. `Instant` has no serde support, so the
    /// `instant_serde` module encodes it relative to "now" — approximate
    /// across process restarts, but good enough for expiry checks.
    #[serde(with = "instant_serde")]
    pub cached_at: Instant,
    /// Absolute deadline after which the entry is considered stale
    /// (see `is_expired`).
    #[serde(with = "instant_serde")]
    pub expires_at: Instant,
}
|
|
|
|
// Custom serializer for Instant (can't be serialized directly)
|
|
mod instant_serde {
|
|
use super::*;
|
|
use serde::{Deserializer, Serializer};
|
|
|
|
pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: Serializer,
|
|
{
|
|
// Convert to duration since app start - this is approximate but works for our use case
|
|
let duration_since_start = instant.elapsed();
|
|
serializer.serialize_u64(duration_since_start.as_secs())
|
|
}
|
|
|
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
|
|
where
|
|
D: Deserializer<'de>,
|
|
{
|
|
let secs = <u64 as Deserialize>::deserialize(deserializer)?;
|
|
// For loaded items, set as if they were cached "now" minus the stored duration
|
|
// This isn't perfect but works for expiration checking
|
|
Ok(Instant::now() - Duration::from_secs(secs))
|
|
}
|
|
}
|
|
|
|
// Simplified cache interface - removed trait object complexity
|
|
// Each cache type will implement these methods directly
|
|
|
|
/// A single generic cache entry: raw serialized bytes plus an absolute
/// expiry deadline.
#[derive(Debug)]
struct CacheEntry {
    /// JSON-serialized payload, produced by `MemoryCache::set`.
    data: Vec<u8>,
    /// The entry is stale once `Instant::now()` passes this deadline.
    expires_at: Instant,
}
|
|
|
|
impl CacheEntry {
|
|
fn new(data: Vec<u8>, ttl: Duration) -> Self {
|
|
Self {
|
|
data,
|
|
expires_at: Instant::now() + ttl,
|
|
}
|
|
}
|
|
|
|
fn is_expired(&self) -> bool {
|
|
Instant::now() > self.expires_at
|
|
}
|
|
}
|
|
|
|
/// Two-tier cache: generic JSON-serialized entries and HTTP responses kept
/// in memory, with an optional persistent on-disk layer for the HTTP
/// responses (see `with_disk_cache`).
pub struct MemoryCache {
    /// Generic entries keyed by caller-supplied string.
    store: Arc<RwLock<HashMap<String, CacheEntry>>>,
    /// Cached HTTP responses keyed by URL.
    http_store: Arc<RwLock<HashMap<String, CachedHttpResponse>>>,
    /// Maximum entry count per store before eviction kicks in
    /// (each store is capped independently).
    max_size: usize,
    /// When set, HTTP responses are also persisted as JSON files here.
    cache_dir: Option<PathBuf>,
}
|
|
|
|
impl MemoryCache {
|
|
pub fn new(max_size: usize) -> Self {
|
|
Self {
|
|
store: Arc::new(RwLock::new(HashMap::new())),
|
|
http_store: Arc::new(RwLock::new(HashMap::new())),
|
|
max_size,
|
|
cache_dir: None,
|
|
}
|
|
}
|
|
|
|
pub fn with_disk_cache(mut self, cache_dir: PathBuf) -> Self {
|
|
self.cache_dir = Some(cache_dir);
|
|
self
|
|
}
|
|
|
|
fn get_cache_file_path(&self, url: &str) -> Option<PathBuf> {
|
|
self.cache_dir.as_ref().map(|dir| {
|
|
// Create a safe filename from URL
|
|
let hash = {
|
|
use std::collections::hash_map::DefaultHasher;
|
|
use std::hash::{Hash, Hasher};
|
|
let mut hasher = DefaultHasher::new();
|
|
url.hash(&mut hasher);
|
|
hasher.finish()
|
|
};
|
|
dir.join(format!("cache_{}.json", hash))
|
|
})
|
|
}
|
|
|
|
async fn cleanup_expired(&self) {
|
|
let mut store = self.store.write().await;
|
|
let now = Instant::now();
|
|
store.retain(|_, entry| entry.expires_at > now);
|
|
}
|
|
|
|
async fn ensure_capacity(&self) {
|
|
let mut store = self.store.write().await;
|
|
|
|
if store.len() >= self.max_size {
|
|
// Remove oldest entries if we're at capacity
|
|
// Collect keys to remove to avoid borrow issues
|
|
let mut to_remove: Vec<String> = Vec::new();
|
|
{
|
|
let entries: Vec<_> = store.iter().collect();
|
|
let mut sorted_entries = entries;
|
|
sorted_entries.sort_by_key(|(_, entry)| entry.expires_at);
|
|
|
|
let remove_count = sorted_entries.len().saturating_sub(self.max_size / 2);
|
|
for (key, _) in sorted_entries.into_iter().take(remove_count) {
|
|
to_remove.push(key.clone());
|
|
}
|
|
}
|
|
|
|
// Now remove the keys
|
|
for key in to_remove {
|
|
store.remove(&key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl MemoryCache {
|
|
pub async fn get<T>(&self, key: &str) -> Option<T>
|
|
where
|
|
T: DeserializeOwned + Send + 'static,
|
|
{
|
|
// Clean up expired entries periodically
|
|
if rand::random::<f32>() < 0.1 {
|
|
self.cleanup_expired().await;
|
|
}
|
|
|
|
let store = self.store.read().await;
|
|
if let Some(entry) = store.get(key) {
|
|
if !entry.is_expired() {
|
|
if let Ok(value) = serde_json::from_slice(&entry.data) {
|
|
return Some(value);
|
|
}
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
pub async fn set<T>(&self, key: &str, value: &T, ttl: Duration)
|
|
where
|
|
T: Serialize + Send + Sync,
|
|
{
|
|
if let Ok(data) = serde_json::to_vec(value) {
|
|
self.ensure_capacity().await;
|
|
|
|
let mut store = self.store.write().await;
|
|
store.insert(key.to_string(), CacheEntry::new(data, ttl));
|
|
}
|
|
}
|
|
|
|
pub async fn remove(&self, key: &str) {
|
|
let mut store = self.store.write().await;
|
|
store.remove(key);
|
|
}
|
|
|
|
pub async fn clear(&self) {
|
|
let mut store = self.store.write().await;
|
|
store.clear();
|
|
}
|
|
|
|
pub async fn len(&self) -> usize {
|
|
let store = self.store.read().await;
|
|
store.len()
|
|
}
|
|
|
|
pub async fn invalidate_prefix(&self, prefix: &str) {
|
|
let mut store = self.store.write().await;
|
|
store.retain(|key, _| !key.starts_with(prefix));
|
|
|
|
let mut http_store = self.http_store.write().await;
|
|
http_store.retain(|key, _| !key.starts_with(prefix));
|
|
}
|
|
|
|
// HTTP Response Caching Methods
|
|
|
|
pub async fn get_http_response(&self, url: &str) -> Option<CachedHttpResponse> {
|
|
// Clean up expired entries periodically
|
|
if rand::random::<f32>() < 0.1 {
|
|
self.cleanup_expired_http().await;
|
|
}
|
|
|
|
// 1. Check memory cache first (fastest)
|
|
{
|
|
let store = self.http_store.read().await;
|
|
println!("🔍 Memory cache lookup for: {}", url);
|
|
println!("🔍 Memory cache has {} entries", store.len());
|
|
|
|
if let Some(response) = store.get(url) {
|
|
if !response.is_expired() {
|
|
println!("🔍 Memory cache HIT - found valid entry");
|
|
return Some(response.clone());
|
|
} else {
|
|
println!("🔍 Memory cache entry expired");
|
|
}
|
|
}
|
|
}
|
|
|
|
// 2. Check disk cache (persistent)
|
|
if let Some(cache_path) = self.get_cache_file_path(url) {
|
|
println!("🔍 Checking disk cache at: {:?}", cache_path);
|
|
|
|
if let Ok(file_content) = fs::read(&cache_path).await {
|
|
if let Ok(cached_response) = serde_json::from_slice::<CachedHttpResponse>(&file_content) {
|
|
if !cached_response.is_expired() {
|
|
println!("🔍 Disk cache HIT - loading into memory");
|
|
|
|
// Load back into memory cache for faster future access
|
|
let mut store = self.http_store.write().await;
|
|
store.insert(url.to_string(), cached_response.clone());
|
|
|
|
return Some(cached_response);
|
|
} else {
|
|
println!("🔍 Disk cache entry expired, removing file");
|
|
let _ = fs::remove_file(&cache_path).await;
|
|
}
|
|
} else {
|
|
println!("🔍 Failed to parse disk cache file");
|
|
}
|
|
} else {
|
|
println!("🔍 No disk cache file found");
|
|
}
|
|
}
|
|
|
|
println!("🔍 Cache MISS - no valid entry found");
|
|
None
|
|
}
|
|
|
|
pub async fn set_http_response(&self, url: &str, response: CachedHttpResponse) {
|
|
self.ensure_http_capacity().await;
|
|
|
|
// Store in memory cache
|
|
let mut store = self.http_store.write().await;
|
|
println!("🔍 Storing in memory cache: {}", url);
|
|
println!("🔍 Memory cache will have {} entries after insert", store.len() + 1);
|
|
store.insert(url.to_string(), response.clone());
|
|
drop(store); // Release the lock before async disk operation
|
|
|
|
// Store in disk cache (async, non-blocking)
|
|
if let Some(cache_path) = self.get_cache_file_path(url) {
|
|
println!("🔍 Storing to disk cache: {:?}", cache_path);
|
|
|
|
// Ensure cache directory exists
|
|
if let Some(parent) = cache_path.parent() {
|
|
let _ = fs::create_dir_all(parent).await;
|
|
}
|
|
|
|
// Serialize and save to disk
|
|
match serde_json::to_vec(&response) {
|
|
Ok(serialized) => {
|
|
if let Err(e) = fs::write(&cache_path, serialized).await {
|
|
println!("🔍 Failed to write disk cache: {}", e);
|
|
} else {
|
|
println!("🔍 Successfully saved to disk cache");
|
|
}
|
|
}
|
|
Err(e) => {
|
|
println!("🔍 Failed to serialize for disk cache: {}", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn cleanup_expired_http(&self) {
|
|
let mut store = self.http_store.write().await;
|
|
let now = Instant::now();
|
|
store.retain(|_, response| response.expires_at > now);
|
|
}
|
|
|
|
async fn ensure_http_capacity(&self) {
|
|
let mut store = self.http_store.write().await;
|
|
|
|
if store.len() >= self.max_size {
|
|
// Remove oldest entries if we're at capacity
|
|
let mut to_remove: Vec<String> = Vec::new();
|
|
{
|
|
let entries: Vec<_> = store.iter().collect();
|
|
let mut sorted_entries = entries;
|
|
sorted_entries.sort_by_key(|(_, response)| response.cached_at);
|
|
|
|
let remove_count = sorted_entries.len().saturating_sub(self.max_size / 2);
|
|
for (key, _) in sorted_entries.into_iter().take(remove_count) {
|
|
to_remove.push(key.clone());
|
|
}
|
|
}
|
|
|
|
// Now remove the keys
|
|
for key in to_remove {
|
|
store.remove(&key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl CachedHttpResponse {
|
|
pub fn new(
|
|
data: Vec<u8>,
|
|
content_type: String,
|
|
headers: HashMap<String, String>,
|
|
status_code: u16,
|
|
ttl: Duration
|
|
) -> Self {
|
|
let now = Instant::now();
|
|
Self {
|
|
data,
|
|
content_type,
|
|
headers,
|
|
status_code,
|
|
cached_at: now,
|
|
expires_at: now + ttl,
|
|
}
|
|
}
|
|
|
|
pub fn is_expired(&self) -> bool {
|
|
Instant::now() > self.expires_at
|
|
}
|
|
}
|
|
|
|
// Add rand dependency for periodic cleanup
|
|
// This is a simple implementation - in production you might want to use a more sophisticated cache like moka
|