Moved data capturing into the cache folder

This commit is contained in:
2025-12-05 22:32:42 +01:00
parent 58a498e694
commit a6823dc938
6 changed files with 230 additions and 80 deletions

1
.gitignore vendored
View File

@@ -32,6 +32,7 @@ target/
**/*.jsonl
**/*.csv
**/*.zip
**/*.log
#/economic_events*
#/economic_event_changes*

View File

@@ -1,15 +1,18 @@
use crate::util::directories::DataPaths;
// src/corporate/openfigi.rs
use super::{types::*};
use reqwest::Client as HttpClient;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::Instant;
use tokio::time::{sleep, Duration};
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};
#[derive(Clone)]
@@ -103,7 +106,7 @@ impl OpenFigiClient {
.map(|isin| json!({
"idType": "ID_ISIN",
"idValue": isin,
"marketSecDes": "Equity",
//"marketSecDes": "Equity",
}))
.collect();
@@ -195,9 +198,69 @@ impl OpenFigiClient {
}
}
/// Extracts the date from a GLEIF CSV filename in the format "isin-lei-DDMMYYYY.csv".
///
/// # Arguments
///
/// * `filename` - The GLEIF CSV filename (e.g., "isin-lei-24112025.csv")
///
/// # Returns
///
/// A string in the format "DDMMYYYY" (e.g., "24112025") if successfully parsed, otherwise the original filename.
/// Extracts the date from a GLEIF CSV filename in the format "isin-lei-DDMMYYYY.csv".
///
/// # Arguments
///
/// * `filename` - The GLEIF CSV filename (e.g., "isin-lei-24112025.csv")
///
/// # Returns
///
/// A string in the format "DDMMYYYY" (e.g., "24112025") if successfully parsed,
/// otherwise the original filename unchanged.
fn extract_gleif_date_from_filename(filename: &str) -> String {
    // Pattern: isin-lei-DDMMYYYY.csv
    if let Some(start_idx) = filename.find("isin-lei-") {
        let rest = &filename[start_idx + 9..]; // Skip "isin-lei-" (9 ASCII bytes)
        // Inspect raw bytes instead of slicing first: `&rest[0..8]` could panic on
        // a non-char boundary if the name contains multi-byte characters, and
        // `char::is_numeric` would wrongly accept Unicode numerals that are not
        // valid in a DDMMYYYY date. `is_ascii_digit` on bytes avoids both.
        let bytes = rest.as_bytes();
        if bytes.len() >= 8 && bytes[..8].iter().all(|b| b.is_ascii_digit()) {
            // Safe: the first 8 bytes are ASCII digits, so [..8] is a char boundary.
            return rest[..8].to_string();
        }
    }
    // Fallback: return the input untouched if it does not match the pattern.
    filename.to_string()
}
/// Finds the most recent GLEIF CSV file in the cache/gleif directory.
///
/// Returns the extracted date in format "DDMMYYYY" from the filename.
/// If no GLEIF file is found, returns None.
/// Finds the most recent GLEIF CSV file in the cache/gleif directory.
///
/// Filenames embed their date as DDMMYYYY, which does NOT sort chronologically
/// as text (the day occupies the most significant position), so a plain
/// lexicographic sort of filenames can pick e.g. 24112025 (24 Nov) over
/// 01122025 (01 Dec). Each candidate's date is therefore re-keyed to YYYYMMDD
/// before comparison.
///
/// # Returns
///
/// The extracted date in format "DDMMYYYY" from the newest file, or `None` if
/// no GLEIF file is found.
///
/// # Errors
///
/// Returns an error if the gleif cache directory cannot be read.
async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<Option<String>> {
    let mut entries = tokio_fs::read_dir(gleif_cache_dir)
        .await
        .context("Failed to read gleif cache directory")?;

    // Best candidate so far as (sort_key, date). sort_key is YYYYMMDD, so plain
    // string comparison orders chronologically.
    let mut newest: Option<(String, String)> = None;

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if let Some(filename) = path.file_name() {
            let filename_str = filename.to_string_lossy();
            if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") {
                let date = extract_gleif_date_from_filename(&filename_str);
                // Re-key DDMMYYYY -> YYYYMMDD. Names whose date portion did not
                // parse keep their raw text as a fallback key (mirrors the old
                // lexicographic behavior for non-conforming files).
                let key = if date.len() == 8 && date.bytes().all(|b| b.is_ascii_digit()) {
                    format!("{}{}{}", &date[4..8], &date[2..4], &date[0..2])
                } else {
                    date.clone()
                };
                if newest.as_ref().map_or(true, |(best_key, _)| key > *best_key) {
                    newest = Some((key, date));
                }
            }
        }
    }

    match newest {
        Some((_, date)) => {
            println!(" Found GLEIF data dated: {}", date);
            Ok(Some(date))
        }
        None => Ok(None),
    }
}
/// Builds a LEI-to-FigiInfo map from the LEI-ISIN mapping, filtering for equities via OpenFIGI.
///
/// The mapping is stored in cache/glei_openfigi/{GLEIF_DATE}/lei_to_figi.jsonl, where GLEIF_DATE
/// is extracted from the GLEIF CSV filename (format: DDMMYYYY). If no specific date is provided,
/// the most recent GLEIF file in cache/gleif is used.
///
/// Attempts to load existing entries from the date-based jsonl file (JSON Lines format,
/// one LEI entry per line: {"lei": "ABC", "figis": [FigiInfo...]}). For any missing LEIs (compared to
/// `lei_to_isins`), fetches their FigiInfos and appends to the .jsonl file incrementally.
///
@@ -209,6 +272,7 @@ impl OpenFigiClient {
/// # Arguments
///
/// * `lei_to_isins` - HashMap of LEI to Vec<ISIN> (used for fetching missing entries).
/// * `gleif_date` - Optional date in format "DDMMYYYY". If None, uses the most recent GLEIF file.
///
/// # Returns
///
@@ -218,12 +282,29 @@ impl OpenFigiClient {
///
/// Returns an error if file I/O fails, JSON serialization/deserialization fails,
/// or if OpenFIGI queries fail during fetching.
pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let data_dir = Path::new("data/corporate/by_lei");
tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?;
let path = data_dir.join("lei_to_figi.jsonl");
let mut lei_to_figis: HashMap<String, Vec<FigiInfo>> = load_lei_to_figi_jsonl(&path)?;
pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let dir = DataPaths::new(".")?;
let gleif_cache_dir = dir.cache_gleif_dir();
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
// Determine the GLEIF date to use
let date = if let Some(d) = gleif_date {
d.to_string()
} else {
// Find the most recent GLEIF file
match find_most_recent_gleif_date(gleif_cache_dir).await? {
Some(d) => d,
None => return Err(anyhow!("No GLEIF CSV file found in cache/gleif directory")),
}
};
// Create date-based subdirectory in the mapping cache
let date_dir = map_cache_dir.join(&date);
tokio_fs::create_dir_all(&date_dir).await.context("Failed to create date directory")?;
let path = date_dir.join("lei_to_figi.jsonl");
println!("Using LEI→FIGI mapping at: {}", path.display());
let mut lei_to_figis: HashMap<String, Vec<FigiInfo>> = load_lei_to_figi_jsonl(&path).await?;
let client = OpenFigiClient::new()?;
if !client.has_key {
@@ -258,7 +339,7 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>
}
// Append to .jsonl incrementally
append_lei_to_figi_jsonl(&path, &lei, &figis).context("Failed to append to JSONL")?;
append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?;
// Insert into in-memory map
lei_to_figis.insert(lei.clone(), figis);
@@ -290,18 +371,16 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>
/// # Errors
///
/// Returns an error if the file cannot be opened or if any line fails to parse as JSON.
fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
async fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let mut map = HashMap::new();
if !path.exists() {
return Ok(map);
}
let file = File::open(path).context("Failed to open JSONL file for reading")?;
let reader = BufReader::new(file);
let content = tokio_fs::read_to_string(path).await.context("Failed to read JSONL file")?;
for (line_num, line) in reader.lines().enumerate() {
let line = line.context(format!("Failed to read line {}", line_num + 1))?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() {
continue;
}
@@ -328,20 +407,24 @@ fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result<HashMap<String, Vec<Fig
/// # Errors
///
/// Returns an error if the file cannot be opened for append or if serialization fails.
fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.context("Failed to open JSONL file for append")?;
/// Appends one LEI entry (`{"lei": ..., "figis": [...]}`) as a single line to
/// the JSON Lines file at `path`, creating the file if it does not yet exist.
///
/// # Errors
///
/// Returns an error if the file cannot be opened for append or if serialization fails.
async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> {
    // Serialize the record up front so a serialization failure never touches the file.
    let record = serde_json::to_string(&json!({
        "lei": lei,
        "figis": figis,
    }))
    .context("Failed to serialize entry")?;

    // One JSON document per line (JSON Lines framing).
    let line = format!("{}\n", record);

    let mut out = tokio_fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await
        .context("Failed to open JSONL file for append")?;

    out.write_all(line.as_bytes())
        .await
        .context("Failed to write to JSONL file")?;

    Ok(())
}
@@ -373,19 +456,22 @@ pub async fn load_or_build_all_securities(
HashMap<String, HashMap<String, OptionInfo>>
)> {
// Load existing data
let mut companies = load_from_cache("data/corporate/by_name/common_stocks.json").await?
let mut commons = load_from_cache("data/corporate/by_name/common_stocks.json").await?
.unwrap_or_else(HashMap::new);
let mut warrants = load_from_cache("data/corporate/by_name/warrants.json").await?
.unwrap_or_else(HashMap::new);
let mut options = load_from_cache("data/corporate/by_name/options.json").await?
.unwrap_or_else(HashMap::new);
/*let mut preferred = load_from_cache("data/corporate/by_name/preferred.json").await?
.unwrap_or_else(HashMap::new);*/
println!("Loaded existing data:");
println!(" - Companies: {}", companies.len());
println!(" - Companies: {}", commons.len());
println!(" - Warrants: {}", warrants.len());
println!(" - Options: {}", options.len());
let mut stats = ProcessingStats::new(companies.len(), warrants.len(), options.len());
let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len());
println!("Processing {} LEI entries from FIGI data...", figi_to_lei.len());
@@ -410,7 +496,7 @@ pub async fn load_or_build_all_securities(
// Process common stocks -> companies
if !common_stocks.is_empty() {
process_common_stocks(&mut companies, &common_stocks, &mut stats);
process_common_stocks(&mut commons, &common_stocks, &mut stats);
}
// Process warrants
@@ -424,14 +510,14 @@ pub async fn load_or_build_all_securities(
}
}
stats.print_summary(companies.len(), warrants.len(), options.len());
stats.print_summary(commons.len(), warrants.len(), options.len());
// Save all three HashMaps
save_to_cache("data/corporate/by_name/common_stocks.json", &companies).await?;
save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?;
save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?;
save_to_cache("data/corporate/by_name/options.json", &options).await?;
Ok((companies, warrants, options))
Ok((commons, warrants, options))
}
/// Statistics tracker for processing
@@ -795,7 +881,8 @@ pub async fn load_figi_type_lists() -> anyhow::Result<()> {
let client = OpenFigiClient::new()?;
// Create cache directory
let cache_dir = Path::new("data/openfigi");
let dir = DataPaths::new(".")?;
let cache_dir = dir.cache_openfigi_dir();
tokio_fs::create_dir_all(cache_dir).await
.context("Failed to create data/openfigi directory")?;

View File

@@ -1,7 +1,7 @@
// src/corporate/scraper.rs
use super::{types::*, helpers::*, openfigi::*};
//use crate::corporate::openfigi::OpenFigiClient;
use crate::{webdriver::webdriver::*};
use crate::{webdriver::webdriver::*, util::directories::DataPaths, util::logger};
use fantoccini::{Client, Locator};
use scraper::{Html, Selector};
use chrono::{DateTime, Duration, NaiveDate, Utc};
@@ -490,11 +490,19 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow
pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download";
if let Err(e) = std::fs::create_dir_all("data/gleif") {
println!("Failed to create data directory: {e}");
// Initialize DataPaths and create cache/gleif directory
let paths = DataPaths::new(".")?;
let gleif_cache_dir = paths.cache_gleif_dir();
if let Err(e) = std::fs::create_dir_all(&gleif_cache_dir) {
let msg = format!("Failed to create cache/gleif directory: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
logger::log_info("Corporate Scraper: Downloading ISIN/LEI mapping from GLEIF...").await;
// Download ZIP and get the filename from Content-Disposition header
let client = match reqwest::Client::builder()
.user_agent(USER_AGENT)
@@ -503,7 +511,9 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
{
Ok(c) => c,
Err(e) => {
println!("Failed to create HTTP client: {e}");
let msg = format!("Failed to create HTTP client: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
};
@@ -511,11 +521,15 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let resp = match client.get(url).send().await {
Ok(r) if r.status().is_success() => r,
Ok(resp) => {
println!("Server returned HTTP {}", resp.status());
let msg = format!("Server returned HTTP {}", resp.status());
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
Err(e) => {
println!("Failed to download ISIN/LEI ZIP: {e}");
let msg = format!("Failed to download ISIN/LEI ZIP: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
};
@@ -528,21 +542,30 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
.and_then(|s| s.split("filename=").nth(1).map(|f| f.trim_matches('"').to_string()))
.unwrap_or_else(|| "isin_lei.zip".to_string());
// Parse timestamp from filename and convert to DDMMYYYY format
let parsed_filename = parse_gleif_filename(&filename);
logger::log_info(&format!("Corporate Scraper: Downloaded file: {} -> {}", filename, parsed_filename)).await;
let bytes = match resp.bytes().await {
Ok(b) => b,
Err(e) => {
println!("Failed to read ZIP bytes: {e}");
let msg = format!("Failed to read ZIP bytes: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
};
let zip_path = format!("data/gleif/{}", filename);
let csv_path = format!("data/gleif/{}", filename.replace(".zip", ".csv"));
let zip_path = gleif_cache_dir.join(&parsed_filename);
let csv_path = gleif_cache_dir.join(parsed_filename.replace(".zip", ".csv"));
if let Err(e) = tokio::fs::write(&zip_path, &bytes).await {
println!("Failed to write ZIP file: {e}");
let msg = format!("Failed to write ZIP file: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
logger::log_info(&format!("Corporate Scraper: Saved ZIP to {:?}", zip_path)).await;
// Extract CSV
let archive = match std::fs::File::open(&zip_path)
@@ -550,11 +573,15 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
{
Ok(Ok(a)) => a,
Ok(Err(e)) => {
println!("Invalid ZIP: {e}");
let msg = format!("Invalid ZIP: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
Err(e) => {
println!("Cannot open ZIP file: {e}");
let msg = format!("Cannot open ZIP file: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
};
@@ -568,7 +595,9 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
}) {
Some(i) => i,
None => {
println!("ZIP did not contain a CSV file");
let msg = "ZIP did not contain a CSV file";
logger::log_error(msg).await;
println!("{}", msg);
return Ok(None);
}
};
@@ -576,23 +605,58 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let mut csv_file = match archive.by_index(idx) {
Ok(f) => f,
Err(e) => {
println!("Failed to read CSV entry: {e}");
let msg = format!("Failed to read CSV entry: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
};
let mut csv_bytes = Vec::new();
if let Err(e) = csv_file.read_to_end(&mut csv_bytes) {
println!("Failed to extract CSV: {e}");
let msg = format!("Failed to extract CSV: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await {
println!("Failed to save CSV file: {e}");
let msg = format!("Failed to save CSV file: {}", e);
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None);
}
Ok(Some(csv_path))
let msg = format!("✓ ISIN/LEI CSV extracted: {:?}", csv_path);
println!("{}", msg);
logger::log_info(&msg).await;
Ok(Some(csv_path.to_string_lossy().to_string()))
}
/// Parse GLEIF filename and convert timestamp to DDMMYYYY format
/// Example: "isin-lei-20251124T080254.csv" -> "isin-lei-24112025.csv"
/// Parse GLEIF filename and convert timestamp to DDMMYYYY format.
/// Example: "isin-lei-20251124T080254.csv" -> "isin-lei-24112025.csv"
///
/// Falls back to returning the input unchanged when the expected
/// "isin-lei-YYYYMMDD..." pattern is not present.
fn parse_gleif_filename(filename: &str) -> String {
    // Try to find pattern: isin-lei-YYYYMMDDTHHMMSS.zip/csv
    if let Some(start_idx) = filename.find("isin-lei-") {
        let rest = &filename[start_idx + 9..]; // After "isin-lei-" (9 ASCII bytes)
        // Inspect raw bytes instead of slicing first: `&rest[0..8]` could panic
        // on a non-char boundary for multi-byte characters, and `char::is_numeric`
        // would wrongly accept Unicode numerals. (The previous inner
        // `date_part.len() == 8` re-check was redundant and has been dropped.)
        let bytes = rest.as_bytes();
        if bytes.len() >= 8 && bytes[..8].iter().all(|b| b.is_ascii_digit()) {
            // rest[0..8] is YYYYMMDD; reorder to DDMMYYYY.
            let year = &rest[0..4];
            let month = &rest[4..6];
            let day = &rest[6..8];
            // Keep a .zip suffix; anything else becomes .csv, matching how the
            // download path derives the CSV name from the ZIP name.
            let extension = if filename.ends_with(".zip") { ".zip" } else { ".csv" };
            return format!("isin-lei-{}{}{}{}", day, month, year, extension);
        }
    }
    // Fallback: return original filename if parsing fails
    filename.to_string()
}

View File

@@ -61,7 +61,7 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
// 3. Build FIGI → LEI map
logger::log_info("Corporate Update: Building FIGI → LEI map...").await;
let figi_to_lei:HashMap<String, Vec<FigiInfo>> = match build_lei_to_figi_infos(&lei_to_isins).await {
let figi_to_lei:HashMap<String, Vec<FigiInfo>> = match build_lei_to_figi_infos(&lei_to_isins, None).await {
Ok(map) => {
let msg = format!("Corporate Update: Built FIGI map with {} entries", map.len());
println!("{}", msg);

View File

@@ -7,39 +7,10 @@ const EXTRACTION_JS: &str = include_str!("extraction_script.js");
/// Navigates the WebDriver session to the finanzen.net economic-calendar page.
///
/// # Arguments
///
/// * `client` - An active fantoccini WebDriver client.
///
/// # Errors
///
/// Returns an error if navigation fails.
pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> {
    client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?;
    // NOTE(review): overlay dismissal and importance-tab selection are currently
    // disabled; kept commented out for possible re-enable.
    //dismiss_overlays(client).await?;
    /*if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await {
    tab.click().await?;
    println!("High importance tab selected");
    sleep(Duration::from_secs(2)).await;
    }*/
    Ok(())
}
/*pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
for _ in 0..10 {
let removed: bool = client
.execute(
r#"(() => {
const iframe = document.querySelector('iframe[title="Contentpass First Layer"]');
if (iframe && iframe.parentNode) {
iframe.parentNode.removeChild(iframe);
return true;
}
return false;
})()"#,
vec![],
)
.await?
.as_bool()
.unwrap_or(false);
if removed { break; }
sleep(Duration::from_millis(500)).await;
}
Ok(())
}*/
pub async fn set_date_range(client: &Client, start: &str, end: &str) -> anyhow::Result<()> {
let script = format!(
r#"

View File

@@ -7,6 +7,10 @@ pub struct DataPaths {
data_dir: PathBuf,
cache_dir: PathBuf,
logs_dir: PathBuf,
// Cache data subdirectories
cache_gleif_dir: PathBuf,
cache_openfigi_dir: PathBuf,
cache_gleif_openfigi_map_dir: PathBuf,
// Economic data subdirectories
economic_events_dir: PathBuf,
economic_changes_dir: PathBuf,
@@ -25,6 +29,11 @@ impl DataPaths {
let cache_dir = base_dir.join("cache");
let logs_dir = base_dir.join("logs");
// Cache subdirectories
let cache_gleif_dir = cache_dir.join("gleif");
let cache_openfigi_dir = cache_dir.join("openfigi");
let cache_gleif_openfigi_map_dir = cache_dir.join("glei_openfigi");
// Economic subdirectories
let economic_events_dir = data_dir.join("economic").join("events");
let economic_changes_dir = economic_events_dir.join("changes");
@@ -39,6 +48,9 @@ impl DataPaths {
fs::create_dir_all(&data_dir)?;
fs::create_dir_all(&cache_dir)?;
fs::create_dir_all(&logs_dir)?;
fs::create_dir_all(&cache_gleif_dir)?;
fs::create_dir_all(&cache_openfigi_dir)?;
fs::create_dir_all(&cache_gleif_openfigi_map_dir)?;
fs::create_dir_all(&economic_events_dir)?;
fs::create_dir_all(&economic_changes_dir)?;
fs::create_dir_all(&corporate_events_dir)?;
@@ -50,6 +62,9 @@ impl DataPaths {
data_dir,
cache_dir,
logs_dir,
cache_gleif_dir,
cache_openfigi_dir,
cache_gleif_openfigi_map_dir,
economic_events_dir,
economic_changes_dir,
corporate_events_dir,
@@ -74,6 +89,18 @@ impl DataPaths {
&self.logs_dir
}
pub fn cache_gleif_dir(&self) -> &Path {
&self.cache_gleif_dir
}
pub fn cache_openfigi_dir(&self) -> &Path {
&self.cache_openfigi_dir
}
pub fn cache_gleif_openfigi_map_dir(&self) -> &Path {
&self.cache_gleif_openfigi_map_dir
}
/// Get the economic events directory
pub fn economic_events_dir(&self) -> &Path {
&self.economic_events_dir