commented unused function
@@ -6,6 +6,8 @@ use super::{types::*};
 use reqwest::Client as HttpClient;
 use reqwest::header::{HeaderMap, HeaderValue};
 use serde_json::{json, Value};
+use csv::{ReaderBuilder, StringRecord, WriterBuilder};
+use chrono::NaiveDate;
 use std::collections::{HashMap, HashSet};
 use std::path::{Path};
 use std::time::Instant;
@@ -203,12 +205,12 @@ impl OpenFigiClient {
 name: item["name"].as_str().unwrap_or("").to_string(),
 ticker: item["ticker"].as_str().unwrap_or("").to_string(),
 exch_code: item["exchCode"].as_str().unwrap_or("").to_string(),
-compositeFIGI: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
-securityType: sec_type.to_string(),
-marketSector: market_sec.to_string(),
-shareClassFIGI: item["shareClassFIGI"].as_str().unwrap_or("").to_string(),
-securityType2: item["securityType2"].as_str().unwrap_or("").to_string(),
-securityDescription: item["securityDescription"].as_str().unwrap_or("").to_string(),
+composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
+security_type: sec_type.to_string(),
+market_sector: market_sec.to_string(),
+share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(),
+security_type2: item["securityType2"].as_str().unwrap_or("").to_string(),
+security_description: item["securityDescription"].as_str().unwrap_or("").to_string(),
 };
 
 all_figi_infos.push(figi_info);
@@ -318,12 +320,43 @@ async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
 /// Returns the extracted date in format "DDMMYYYY" from the filename.
 /// If no GLEIF file is found, returns None.
 async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<Option<String>> {
+// First check for subdirectories named as DDMMYYYY and pick the most recent date
+let mut dir_entries = tokio_fs::read_dir(gleif_cache_dir)
+.await
+.context("Failed to read gleif cache directory")?;
+
+let mut found_dates: Vec<NaiveDate> = Vec::new();
+
+while let Some(entry) = dir_entries.next_entry().await? {
+let path = entry.path();
+if path.is_dir() {
+if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
+// Expect folder name in DDMMYYYY
+if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
+if let Ok(nd) = NaiveDate::parse_from_str(name, "%d%m%Y") {
+found_dates.push(nd);
+}
+}
+}
+}
+}
+
+if !found_dates.is_empty() {
+found_dates.sort();
+if let Some(most_recent) = found_dates.last() {
+let date_str = most_recent.format("%d%m%Y").to_string();
+let msg = format!(" Found GLEIF data dated (from subdirs): {}", date_str);
+logger::log_info(&msg).await;
+return Ok(Some(date_str));
+}
+}
+
+// Fallback: look for CSV files in the directory as before
 let mut entries = tokio_fs::read_dir(gleif_cache_dir)
 .await
 .context("Failed to read gleif cache directory")?;
 
 let mut csv_files = Vec::new();
 
 while let Some(entry) = entries.next_entry().await? {
 let path = entry.path();
 if let Some(filename) = path.file_name() {
@@ -333,20 +366,20 @@ async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<O
 }
 }
 }
 
 if csv_files.is_empty() {
 return Ok(None);
 }
 
 // Sort files in reverse order (most recent first) based on date in filename
 csv_files.sort();
 csv_files.reverse();
 
 let most_recent = &csv_files[0];
 let date = extract_gleif_date_from_filename(most_recent);
 
 let msg = format!(" Found GLEIF data dated: {}", date);
 
 logger::log_info(&msg).await;
 Ok(Some(date))
 }
@@ -434,7 +467,7 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 let dir = DataPaths::new(".")?;
 let gleif_cache_dir = dir.cache_gleif_dir();
 let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
 
 // Determine the GLEIF date to use
 let date = if let Some(d) = gleif_date {
 let msg = format!("Using provided GLEIF date: {}", d);
@@ -443,7 +476,7 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 } else {
 // Find the most recent GLEIF file
 logger::log_info("Searching for most recent GLEIF file...").await;
-match find_most_recent_gleif_date(gleif_cache_dir).await? {
+match find_most_recent_gleif_date(&gleif_cache_dir).await? {
 Some(d) => d,
 None => {
 let err = "No GLEIF CSV file found in cache/gleif directory";
@@ -452,7 +485,10 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 },
 }
 };
 
+// Creat date-based subdirectory in the gleif cache
+let gleif_date_dir = gleif_cache_dir.join(&date);
+
 // Create date-based subdirectory in the mapping cache
 let msg = format!("Creating date directory for: {}", date);
 logger::log_info(&msg).await;
@@ -500,6 +536,7 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 let mut processed = sector_maps.values().map(|m| m.len()).sum::<usize>();
 let total = leis.len();
 let mut no_hit_leis = Vec::new(); // Track LEIs with no data found (no_hit)
+let mut leis_to_delete_batch = Vec::new(); // Batch delete every 100 LEIs
 
 let msg = format!("Total LEIs to process: {}, already processed: {}", total, processed);
 
@@ -535,12 +572,18 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 let no_hit_msg = format!(" no_hit: LEI {} returned no FIGIs", lei);
 logger::log_warn(&no_hit_msg).await;
 no_hit_leis.push(lei.clone());
+leis_to_delete_batch.push(lei.clone());
 
-// Remove immediately from GLEIF CSV to prevent progress loss on interrupt
-if let Err(e) = remove_lei_from_gleif_csv_single(gleif_cache_dir, &lei).await {
-let warn_msg = format!("Warning: Failed to remove LEI {} from GLEIF CSV: {}", lei, e);
-eprintln!("{}", warn_msg);
-logger::log_warn(&warn_msg).await;
+// Delete every 100 no_hit LEIs to prevent progress loss on interrupt
+if leis_to_delete_batch.len() >= 100 {
+let batch_msg = format!("Batch deleting {} LEIs from GLEIF CSV...", leis_to_delete_batch.len());
+logger::log_info(&batch_msg).await;
+if let Err(e) = remove_leis_batch_from_gleif_csv(&gleif_date_dir, &leis_to_delete_batch).await {
+let warn_msg = format!("Warning: Failed to batch remove LEIs from GLEIF CSV: {}", e);
+eprintln!("{}", warn_msg);
+logger::log_warn(&warn_msg).await;
+}
+leis_to_delete_batch.clear();
 }
 
 continue;
@@ -554,7 +597,7 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 let mut uncategorized_figis = Vec::new();
 
 for figi_info in all_figi_infos {
-let sector = figi_info.marketSector.clone();
+let sector = figi_info.market_sector.clone();
 
 if sector.is_empty() {
 // Case 2: Hit but no marketSecDes - save to uncategorized
@@ -610,9 +653,20 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<Str
 tokio::time::sleep(Duration::from_millis(100)).await;
 }
 
+// Delete any remaining LEIs in the batch
+if !leis_to_delete_batch.is_empty() {
+let batch_msg = format!("Final batch: Deleting {} LEIs from GLEIF CSV...", leis_to_delete_batch.len());
+logger::log_info(&batch_msg).await;
+if let Err(e) = remove_leis_batch_from_gleif_csv(gleif_cache_dir, &leis_to_delete_batch).await {
+let warn_msg = format!("Warning: Failed to delete final batch from GLEIF CSV: {}", e);
+eprintln!("{}", warn_msg);
+logger::log_warn(&warn_msg).await;
+}
+}
+
 // Log final summary for no_hit LEIs (they've already been removed incrementally)
 if !no_hit_leis.is_empty() {
-let no_hit_summary = format!("no_hit (removed incrementally from GLEIF): {} LEIs", no_hit_leis.len());
+let no_hit_summary = format!("no_hit (removed in batches from GLEIF): {} LEIs", no_hit_leis.len());
 println!("{}", no_hit_summary);
 logger::log_info(&no_hit_summary).await;
 }
@@ -696,81 +750,10 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) ->
 Ok(())
 }
 
-/// Removes a single invalid LEI from the GLEIF CSV file immediately.
+/// Removes multiple invalid LEIs from the GLEIF CSV file in a single batch operation.
 ///
-/// This function is called after each no_hit detection to prevent progress loss on interrupt.
-/// It reads the GLEIF CSV, filters out the specific LEI, and overwrites the file.
+/// This function is more efficient than removing LEIs one at a time.
+/// It reads the GLEIF CSV once, filters out all specified LEIs, and overwrites the file once.
-///
-/// # Arguments
-///
-/// * `gleif_cache_dir` - Path to the cache/gleif directory
-/// * `lei` - The LEI string to remove
-///
-/// # Returns
-/// Ok(()) if successful, Err if file operations fail.
-async fn remove_lei_from_gleif_csv_single(gleif_cache_dir: &Path, lei: &str) -> anyhow::Result<()> {
-// Find the most recent GLEIF CSV file
-let mut entries = tokio_fs::read_dir(gleif_cache_dir)
-.await
-.context("Failed to read gleif cache directory")?;
-
-let mut csv_files = Vec::new();
-
-while let Some(entry) = entries.next_entry().await? {
-let path = entry.path();
-if let Some(filename) = path.file_name() {
-let filename_str = filename.to_string_lossy();
-if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") {
-csv_files.push(path);
-}
-}
-}
-
-if csv_files.is_empty() {
-return Ok(());
-}
-
-// Sort and get the most recent
-csv_files.sort();
-csv_files.reverse();
-let gleif_file = &csv_files[0];
-
-// Read the CSV file
-let content = tokio_fs::read_to_string(gleif_file)
-.await
-.context("Failed to read GLEIF CSV")?;
-
-// Filter out line with this LEI
-let filtered_lines: Vec<&str> = content
-.lines()
-.filter(|line| {
-// GLEIF CSV format: ISIN,LEI
-let parts: Vec<&str> = line.split(',').collect();
-if parts.len() >= 2 {
-parts[1] != lei
-} else {
-true // Keep lines that don't match the format (e.g., header)
-}
-})
-.collect();
-
-// Only write if something was actually removed
-if filtered_lines.len() < content.lines().count() {
-let new_content = filtered_lines.join("\n") + "\n";
-tokio_fs::write(gleif_file, new_content)
-.await
-.context("Failed to write filtered GLEIF CSV")?;
-}
-
-Ok(())
-}
-
-/// Removes invalid LEIs from the GLEIF CSV file.
-///
-/// When an API call succeeds but returns no data (no_hit), the LEI is considered invalid
-/// and should be removed from the GLEIF CSV to prevent re-scraping on future runs.
-///
-/// This function reads the GLEIF CSV, filters out the specified LEIs, and overwrites the file.
 ///
 /// # Arguments
 ///
@@ -779,8 +762,10 @@ async fn remove_lei_from_gleif_csv_single(gleif_cache_dir: &Path, lei: &str) ->
 ///
 /// # Returns
 /// Ok(()) if successful, Err if file operations fail.
-async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[String]) -> anyhow::Result<()> {
-logger::log_info(&format!("Removing {} invalid LEIs from GLEIF CSV...", leis_to_remove.len())).await;
+async fn remove_leis_batch_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[String]) -> anyhow::Result<()> {
+if leis_to_remove.is_empty() {
+return Ok(());
+}
 
 // Find the most recent GLEIF CSV file
 let mut entries = tokio_fs::read_dir(gleif_cache_dir)
@@ -788,7 +773,7 @@ async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[St
 .context("Failed to read gleif cache directory")?;
 
 let mut csv_files = Vec::new();
 
 while let Some(entry) = entries.next_entry().await? {
 let path = entry.path();
 if let Some(filename) = path.file_name() {
@@ -800,53 +785,151 @@ async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[St
 }
 
 if csv_files.is_empty() {
-logger::log_warn("No GLEIF CSV files found for removal operation").await;
+logger::log_warn("No GLEIF CSV files found for batch removal operation").await;
 return Ok(());
 }
 
-// Sort and get the most recent
+// Prefer an original (non-_clean) GLEIF CSV if available; otherwise use the most recent file.
 csv_files.sort();
 csv_files.reverse();
-let gleif_file = &csv_files[0];
-let debug_msg = format!("Reading GLEIF file: {}", gleif_file.display());
+let mut gleif_file: &std::path::PathBuf = &csv_files[0];
+// Try to find the most recent filename that does NOT end with "_clean.csv"
+if let Some(non_clean) = csv_files.iter().find(|p| {
+p.file_name()
+.and_then(|n| n.to_str())
+.map(|s| !s.to_lowercase().ends_with("_clean.csv"))
+.unwrap_or(false)
+}) {
+gleif_file = non_clean;
+}
+
+// Prepare clean file path: insert "_clean" before extension
+let orig_path = gleif_file;
+let file_name = orig_path.file_name().and_then(|n| n.to_str()).unwrap_or("gleif.csv");
+let mut stem = orig_path.file_stem().and_then(|s| s.to_str()).unwrap_or("isin-lei").to_string();
+let parent = orig_path.parent().unwrap_or_else(|| Path::new("."));
+// Avoid creating a double "_clean_clean.csv". If stem already ends with "_clean", keep it.
+if stem.to_lowercase().ends_with("_clean") {
+// stem is already clean; keep same filename (no double suffix)
+// e.g., stem="isin-lei-24112025_clean" -> clean_name="isin-lei-24112025_clean.csv"
+} else {
+stem = format!("{}_clean", stem);
+}
+
+let clean_name = format!("{}.csv", stem);
+let clean_path = parent.join(&clean_name);
+
+// If a clean file already exists, operate on it; otherwise read original and write clean file
+let source_path = if clean_path.exists() { &clean_path } else { orig_path };
+
+let debug_msg = format!("Reading GLEIF source for batch removal: {} (writing to {})", source_path.display(), clean_path.display());
 logger::log_info(&debug_msg).await;
 
-// Read the CSV file
-let content = tokio_fs::read_to_string(gleif_file)
-.await
-.context("Failed to read GLEIF CSV")?;
-
-let original_lines = content.lines().count();
-
-// Convert LEIs to remove into a HashSet for faster lookup
-let remove_set: std::collections::HashSet<_> = leis_to_remove.iter().cloned().collect();
-
-// Filter out lines with LEIs to remove
-let filtered_lines: Vec<&str> = content
-.lines()
-.filter(|line| {
-// GLEIF CSV format: ISIN,LEI
-let parts: Vec<&str> = line.split(',').collect();
-if parts.len() >= 2 {
-!remove_set.contains(parts[1])
-} else {
-true // Keep lines that don't match the format (e.g., header)
+// Cleanup any accidental double-clean files in the same directory: if a file ends with
+// "_clean_clean.csv" replace it with single "_clean.csv" or remove it if target exists.
+if let Ok(mut dir_entries) = tokio_fs::read_dir(parent).await {
+while let Ok(Some(entry)) = dir_entries.next_entry().await {
+if let Some(name) = entry.file_name().to_str().map(|s| s.to_string()) {
+if name.to_lowercase().ends_with("_clean_clean.csv") {
+let offending = entry.path();
+let candidate = offending.file_name().and_then(|n| n.to_str()).unwrap_or("");
+let target_name = candidate.replacen("_clean_clean.csv", "_clean.csv", 1);
+let target_path = parent.join(target_name);
+
+if !target_path.exists() {
+// Rename offending -> target
+let _ = tokio_fs::rename(&offending, &target_path).await;
+let msg = format!("Renamed {} -> {}", offending.display(), target_path.display());
+logger::log_info(&msg).await;
+} else {
+// Target exists already; remove offending duplicate
+let _ = tokio_fs::remove_file(&offending).await;
+let msg = format!("Removed duplicate {}", offending.display());
+logger::log_info(&msg).await;
+}
+}
 }
-})
-.collect();
+}
+}
 
-let removed_count = original_lines - filtered_lines.len();
-
-// Write back the filtered content
-let new_content = filtered_lines.join("\n") + "\n";
-tokio_fs::write(gleif_file, new_content)
+// Read file into memory and parse with csv crate for robust handling of quoted fields
+let content = tokio_fs::read_to_string(source_path)
 .await
-.context("Failed to write filtered GLEIF CSV")?;
+.context("Failed to read GLEIF CSV source")?;
 
-let success_msg = format!("✓ Removed {} invalid LEIs from GLEIF CSV (was {} lines, now {} lines)", leis_to_remove.len(), original_lines, filtered_lines.len());
+// Convert LEIs to remove into a HashSet (normalized)
+let remove_set: std::collections::HashSet<String> = leis_to_remove
+.iter()
+.map(|s| s.trim().trim_matches('"').to_uppercase())
+.collect();
+
+// Build CSV reader: try with headers first; allow flexible records
+let mut reader = ReaderBuilder::new()
+.has_headers(true)
+.flexible(true)
+.from_reader(content.as_bytes());
+
+// Remember headers (if present) and then iterate records.
+let headers_record = match reader.headers() {
+Ok(h) => Some(h.clone()),
+Err(_) => None,
+};
+
+// We'll collect kept records and count original rows
+let mut kept_records: Vec<StringRecord> = Vec::new();
+let mut original_count: usize = 0;
+let mut removed_count: usize = 0;
+
+// For robustness, search all columns for a matching LEI instead of relying on a single column index.
+for result in reader.records() {
+let record = result.context("Failed to parse CSV record")?;
+original_count += 1;
+
+// Check every field for a match in the remove set
+let mut matched = false;
+for field in record.iter() {
+let norm = field.trim().trim_matches('"').to_uppercase();
+if remove_set.contains(&norm) {
+matched = true;
+break;
+}
+}
+
+if matched {
+removed_count += 1;
+} else {
+kept_records.push(record.clone());
+}
+}
+
+let new_count = kept_records.len();
+
+// Write back using csv writer to preserve quoting/format into clean file
+let mut wtr = WriterBuilder::new().has_headers(true).from_writer(vec![]);
+// If original had headers, write them back
+if let Some(headers) = headers_record {
+wtr.write_record(headers.iter())?;
+}
+
+for rec in &kept_records {
+wtr.write_record(rec.iter())?;
+}
+
+let out_bytes = wtr.into_inner().context("Failed to finalize CSV writer")?;
+let out_str = String::from_utf8(out_bytes).context("CSV output not valid UTF-8")?;
+
+tokio_fs::write(&clean_path, out_str)
+.await
+.context("Failed to write filtered GLEIF CSV clean file")?;
+
+let success_msg = format!(
+"✓ Batch attempted to remove {} LEIs from GLEIF CSV (was {} records, now {} records, removed {} rows) -> {}",
+leis_to_remove.len(), original_count, new_count, removed_count, clean_path.display()
+);
 println!("{}", success_msg);
 logger::log_info(&success_msg).await;
 
 Ok(())
 }
 
@@ -907,7 +990,7 @@ pub async fn load_or_build_all_securities(
 let mut option_securities = Vec::new();
 
 for figi_info in figi_infos {
-match figi_info.securityType.as_str() {
+match figi_info.security_type.as_str() {
 "Common Stock" => common_stocks.push(figi_info.clone()),
 "Equity WRT" => warrant_securities.push(figi_info.clone()),
 "Equity Option" => option_securities.push(figi_info.clone()),
@@ -36,7 +36,7 @@ const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/
 /// - Not an equity (ETF, bond, etc.)
 /// - Missing critical fields
 /// - Network or JSON parsing errors
-pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result<PrimaryInfo> {
+/*pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result<PrimaryInfo> {
 let url = format!(
 "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile",
 ticker
@@ -149,7 +149,7 @@ pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result<PrimaryInfo> {
 exchange_mic,
 currency,
 })
-}
+}*/
 
 /// Fetches earnings events for a ticker using a dedicated ScrapeTask.
 ///
@@ -546,6 +546,67 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
 let parsed_filename = parse_gleif_filename(&filename);
 logger::log_info(&format!("Corporate Scraper: Downloaded file: {} -> {}", filename, parsed_filename)).await;
 
+// Determine date (DDMMYYYY) from parsed filename: "isin-lei-DDMMYYYY.csv"
+let mut date_str = String::new();
+if let Some(start_idx) = parsed_filename.find("isin-lei-") {
+let rest = &parsed_filename[start_idx + 9..];
+if rest.len() >= 8 {
+date_str = rest[0..8].to_string();
+}
+}
+
+// If we parsed a date, use/create a date folder under cache/gleif and operate inside it; otherwise use cache root.
+let date_dir = if !date_str.is_empty() {
+let p = gleif_cache_dir.join(&date_str);
+// Ensure the date folder exists (create if necessary)
+if let Err(e) = std::fs::create_dir_all(&p) {
+let msg = format!("Failed to create date directory {:?}: {}", p, e);
+logger::log_warn(&msg).await;
+None
+} else {
+Some(p)
+}
+} else {
+None
+};
+
+// Choose the directory where we'll look for existing files and where we'll save the new ones
+let target_dir = date_dir.clone().unwrap_or_else(|| gleif_cache_dir.to_path_buf());
+
+// If the date folder exists (or was created), prefer any *_clean.csv inside it and return that immediately
+if let Some(ref ddir) = date_dir {
+if let Ok(entries) = std::fs::read_dir(ddir) {
+for entry in entries.flatten() {
+if let Some(name) = entry.file_name().to_str() {
+if name.to_lowercase().ends_with("_clean.csv") {
+let path = ddir.join(name);
+logger::log_info(&format!("Found existing clean GLEIF CSV: {}", path.display())).await;
+return Ok(Some(path.to_string_lossy().to_string()));
+}
+}
+}
+}
+}
+
+// If no clean file found in the date folder (or date folder doesn't exist), check whether the csv/zip already exist in the target dir
+let csv_candidate_name = parsed_filename.replace(".zip", ".csv");
+let csv_candidate = target_dir.join(&csv_candidate_name);
+let zip_candidate = target_dir.join(&parsed_filename);
+
+if csv_candidate.exists() {
+logger::log_info(&format!("Found existing GLEIF CSV: {}", csv_candidate.display())).await;
+return Ok(Some(csv_candidate.to_string_lossy().to_string()));
+}
+if zip_candidate.exists() {
+// If zip exists but csv does not, extract later; for now prefer returning csv path (may be created by extraction step)
+let inferred_csv = target_dir.join(csv_candidate_name);
+if inferred_csv.exists() {
+logger::log_info(&format!("Found existing extracted CSV next to ZIP: {}", inferred_csv.display())).await;
+return Ok(Some(inferred_csv.to_string_lossy().to_string()));
+}
+// otherwise we'll overwrite/extract into target_dir below
+}
+
 let bytes = match resp.bytes().await {
 Ok(b) => b,
 Err(e) => {
@@ -555,9 +616,13 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
 return Ok(None);
 }
 };
+// Ensure target directory exists (create if it's the date folder and was absent earlier)
+if let Some(ref ddir) = date_dir {
+let _ = std::fs::create_dir_all(ddir);
+}
 
-let zip_path = gleif_cache_dir.join(&parsed_filename);
-let csv_path = gleif_cache_dir.join(parsed_filename.replace(".zip", ".csv"));
+let zip_path = target_dir.join(&parsed_filename);
+let csv_path = target_dir.join(parsed_filename.replace(".zip", ".csv"));
 
 if let Err(e) = tokio::fs::write(&zip_path, &bytes).await {
 let msg = format!("Failed to write ZIP file: {}", e);
@@ -616,19 +681,16 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
 if let Err(e) = csv_file.read_to_end(&mut csv_bytes) {
 let msg = format!("Failed to extract CSV: {}", e);
 logger::log_error(&msg).await;
-println!("{}", msg);
 return Ok(None);
 }
 
 if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await {
 let msg = format!("Failed to save CSV file: {}", e);
 logger::log_error(&msg).await;
-println!("{}", msg);
 return Ok(None);
 }
 
 let msg = format!("✓ ISIN/LEI CSV extracted: {:?}", csv_path);
-println!("{}", msg);
 logger::log_info(&msg).await;
 
 Ok(Some(csv_path.to_string_lossy().to_string()))
@@ -178,7 +178,7 @@ pub async fn save_prices_by_source(
 }
 
 /// Update available_exchanges.json with fetch results
-pub async fn update_available_exchange(
+/*pub async fn update_available_exchange(
 paths: &DataPaths,
 isin: &str,
 ticker: &str,
@@ -205,7 +205,7 @@ pub async fn update_available_exchange(
 }
 
 save_available_exchanges(paths, isin, exchanges).await
-}
+}*/
 
 /// Infer currency from ticker suffix
 fn infer_currency_from_ticker(ticker: &str) -> String {
@@ -53,12 +53,18 @@ pub struct FigiInfo {
 pub name: String,
 pub ticker: String,
 pub exch_code: String,
-pub compositeFIGI: String,
-pub securityType: String,
-pub marketSector: String,
-pub shareClassFIGI: String,
-pub securityType2: String,
-pub securityDescription: String,
+#[serde(rename = "compositeFIGI")]
+pub composite_figi: String,
+#[serde(rename = "securityType")]
+pub security_type: String,
+#[serde(rename = "marketSector")]
+pub market_sector: String,
+#[serde(rename = "shareClassFIGI")]
+pub share_class_figi: String,
+#[serde(rename = "securityType2")]
+pub security_type2: String,
+#[serde(rename = "securityDescription")]
+pub security_description: String,
 }
 
 /// Company Info
@@ -77,11 +83,11 @@ pub struct CompanyInfo{
 /// # Attributes
 /// * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
 /// * figi: metadata with ISIN as key
-#[derive(Debug, Clone, Serialize, Deserialize)]
+/*#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CompanyMetadata {
 pub lei: String,
 pub figi: Option<Vec<FigiInfo>>,
-}
+}*/
 
 /// Warrant Info
 ///
@@ -112,13 +118,13 @@ pub struct OptionInfo {
 pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+/*#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct PrimaryInfo {
 pub isin: String,
 pub name: String,
 pub exchange_mic: String,
 pub currency: String,
-}
+}*/
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct AvailableExchange {
@@ -133,28 +139,4 @@ pub struct AvailableExchange {
 pub discovered_at: Option<String>, // When this exchange was first discovered
 #[serde(default)]
 pub fetch_count: u32, // How many times successfully fetched
-}
-
-impl AvailableExchange {
-pub fn new(ticker: String, exchange_mic: String, currency: String) -> Self {
-Self {
-exchange_mic,
-ticker,
-has_daily: false,
-has_5min: false,
-last_successful_fetch: None,
-currency,
-discovered_at: Some(chrono::Local::now().format("%Y-%m-%d").to_string()),
-fetch_count: 0,
-}
-}
-
-pub fn record_success(&mut self, has_daily: bool, has_5min: bool) {
-let today = chrono::Local::now().format("%Y-%m-%d").to_string();
-
-self.has_daily |= has_daily;
-self.has_5min |= has_5min;
-self.last_successful_fetch = Some(today);
-self.fetch_count += 1;
-}
 }