removed id creation on scrape

This commit is contained in:
2026-01-14 14:28:16 +01:00
parent 4ea0c78d3d
commit 93fbefc9d4
11 changed files with 107 additions and 226 deletions

View File

@@ -275,12 +275,26 @@ async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Resu
Ok(())
}
/// Loads checkpoint and replays log, returning set of existing keys
async fn load_checkpoint_and_replay(
/// Generic function to load checkpoint and replay log with custom key extraction
///
/// This function handles the common pattern of loading and merging checkpoint and log files,
/// with custom key extraction logic provided by a closure.
///
/// # Arguments
/// * `checkpoint_path` - Path to checkpoint file
/// * `log_path` - Path to log file
/// * `key_extractor` - Closure that extracts a key from a JSON entry
///
/// # Returns
/// HashSet of extracted keys
async fn load_checkpoint_and_replay_generic<F>(
checkpoint_path: &Path,
log_path: &Path,
key_field: &str,
) -> anyhow::Result<HashSet<String>> {
key_extractor: F,
) -> anyhow::Result<HashSet<String>>
where
F: Fn(&Value) -> Option<String>,
{
let mut keys = HashSet::new();
// Load checkpoint if it exists
@@ -290,12 +304,12 @@ async fn load_checkpoint_and_replay(
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(key) = entry[key_field].as_str() {
keys.insert(key.to_string());
if let Some(key) = key_extractor(&entry) {
keys.insert(key);
}
}
}
@@ -308,12 +322,12 @@ async fn load_checkpoint_and_replay(
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(key) = entry[key_field].as_str() {
keys.insert(key.to_string());
if let Some(key) = key_extractor(&entry) {
keys.insert(key);
}
}
}
@@ -322,64 +336,36 @@ async fn load_checkpoint_and_replay(
Ok(keys)
}
/// Loads checkpoint and replays log, returning set of existing keys (simple field extraction)
async fn load_checkpoint_and_replay(
checkpoint_path: &Path,
log_path: &Path,
key_field: &str,
) -> anyhow::Result<HashSet<String>> {
load_checkpoint_and_replay_generic(checkpoint_path, log_path, |entry| {
entry[key_field].as_str().map(|s| s.to_string())
}).await
}
/// Loads checkpoint and replays log for nested structures (warrants/options)
async fn load_checkpoint_and_replay_nested(
checkpoint_path: &Path,
log_path: &Path,
) -> anyhow::Result<HashSet<String>> {
let mut keys = HashSet::new();
// Load checkpoint if it exists
if checkpoint_path.exists() {
let content = tokio_fs::read_to_string(checkpoint_path).await
.context("Failed to read checkpoint")?;
load_checkpoint_and_replay_generic(checkpoint_path, log_path, |entry| {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
if !underlying.is_empty() && !type_field.is_empty() {
keys.insert(format!("{}::{}", underlying, type_field));
}
}
if !underlying.is_empty() && !type_field.is_empty() {
Some(format!("{}::{}", underlying, type_field))
} else {
None
}
}
// Replay log if it exists
if log_path.exists() {
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
if !underlying.is_empty() && !type_field.is_empty() {
keys.insert(format!("{}::{}", underlying, type_field));
}
}
}
}
Ok(keys)
}).await
}
/// Creates a checkpoint by copying log to checkpoint atomically
@@ -454,11 +440,10 @@ async fn process_lei_figi_file_batched(
let batch_size = 100;
let mut processed_count = 0;
// === PHASE 1: Process common stocks and build company_id mapping ===
// === PHASE 1: Process common stocks ===
logger::log_info(" Phase 1: Processing common stocks...").await;
let mut common_batch: Vec<CompanyData> = Vec::new();
let mut company_id_map: HashMap<String, String> = HashMap::new(); // company_name -> company_id
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() {
@@ -480,14 +465,7 @@ async fn process_lei_figi_file_batched(
// Process common stocks
if !common_stocks.is_empty() {
if let Some(mut entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
// Generate UUID for company if not already done
if !company_id_map.contains_key(&entry.name) {
let company_id = uuid::Uuid::new_v4().to_string();
company_id_map.insert(entry.name.clone(), company_id.clone());
entry.id = company_id;
}
if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
// Add to existing set immediately to prevent duplicates in same run
existing_companies.insert(entry.name.clone());
common_batch.push(entry);
@@ -513,9 +491,9 @@ async fn process_lei_figi_file_batched(
stats.companies_added += common_batch.len();
}
logger::log_info(&format!(" Phase 1 complete: Generated {} company UUIDs", company_id_map.len())).await;
logger::log_info(" Phase 1 complete").await;
// === PHASE 2: Process dependent securities using company_id mapping ===
// === PHASE 2: Process dependent securities (warrants, options, corporate bonds) ===
logger::log_info(" Phase 2: Processing warrants, options, and corporate bonds...").await;
let mut warrants_batch: Vec<WarrantData> = Vec::new();
@@ -545,15 +523,15 @@ async fn process_lei_figi_file_batched(
group_securities(&figis);
if !warrant_securities.is_empty() {
for entry in prepare_warrant_entries(&warrant_securities, existing_warrants, &company_id_map) {
let key = entry.company_id.clone();
for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
let key = entry.company_name.clone();
existing_warrants.insert(key);
warrants_batch.push(entry);
}
}
if !option_securities.is_empty() {
for entry in prepare_option_entries(&option_securities, existing_options, &company_id_map) {
for entry in prepare_option_entries(&option_securities, existing_options) {
let key = entry.company_name.clone();
existing_options.insert(key);
options_batch.push(entry);
@@ -561,7 +539,7 @@ async fn process_lei_figi_file_batched(
}
if !corporate_bonds_securities.is_empty() {
for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds, &company_id_map) {
for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) {
let key = entry.underlying_company_name.clone();
existing_corporate_bonds.insert(key);
corporate_bonds_batch.push(entry);
@@ -671,13 +649,8 @@ fn prepare_common_stock_entry(
let grouped_by_isin = group_figis_by_isin(figi_infos);
let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();
let id = format!("company_{}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos());
Some(CompanyData {
id,
name,
primary_isin,
securities: grouped_by_isin,
@@ -688,12 +661,10 @@ fn prepare_common_stock_entry(
/// Prepares warrant entries for batching
/// Prepares warrant entries for batching
///
/// Groups warrant contracts by underlying company, using company_id from the company_id_map
/// if the company exists, otherwise generates a new ID for the warrant.
/// Groups warrant contracts by underlying company.
fn prepare_warrant_entries(
warrant_securities: &[FigiData],
existing_keys: &HashSet<String>,
company_id_map: &HashMap<String, String>,
) -> Vec<WarrantData> {
let mut entries = Vec::new();
@@ -718,18 +689,12 @@ fn prepare_warrant_entries(
continue;
}
// Use company_id from map if company exists, otherwise generate new ID for warrant
let company_id = company_id_map.get(&underlying_company)
.cloned()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let mut warrants_by_type: HashMap<String, WarrantDetails> = HashMap::new();
for (warrant_type, figi) in contracts {
let (_, issuer, _) = parse_warrant_name(&figi.name);
let warrant_detail = WarrantDetails {
company_id: company_id.clone(),
company_name: underlying_company.clone(),
issuer_company_name: issuer,
warrant_type: warrant_type.clone(),
@@ -745,7 +710,6 @@ fn prepare_warrant_entries(
}
let warrant_info = WarrantData {
company_id,
company_name: underlying_company.clone(),
warrants: warrants_by_type,
};
@@ -769,12 +733,10 @@ fn prepare_warrant_entries(
/// Vector of OptionData entries, one per unique underlying company
/// Prepares option entries for batching
///
/// Groups option contracts by underlying company, using company_id from the company_id_map
/// if the company exists, otherwise generates a new ID for the option.
/// Groups option contracts by underlying company.
fn prepare_option_entries(
option_securities: &[FigiData],
existing_keys: &HashSet<String>,
company_id_map: &HashMap<String, String>,
) -> Vec<OptionData> {
let mut entries = Vec::new();
@@ -799,11 +761,6 @@ fn prepare_option_entries(
continue;
}
// Use company_id from map if company exists, otherwise generate new ID for option
let company_id = company_id_map.get(&underlying_company)
.cloned()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
// Build OptionContracts and extract strikes/expirations
let mut option_contracts: HashMap<i64, (Vec<OptionContract>, Vec<OptionContract>)> = HashMap::new();
let mut all_strikes: std::collections::HashSet<u64> = std::collections::HashSet::new();
@@ -857,7 +814,6 @@ fn prepare_option_entries(
.collect::<Vec<_>>();
let option_data = OptionData {
company_id,
company_name: underlying_company.clone(),
expiration_dates,
strikes,
@@ -884,12 +840,10 @@ fn prepare_option_entries(
/// Vector of CorporateBondInfo entries, one per unique issuer
/// Prepares corporate bond entries for batching
///
/// Groups corporate bonds by issuer (underlying_company_name), using company_id from the company_id_map
/// if the company exists, otherwise generates a new ID for the bond.
/// Groups corporate bonds by issuer (underlying_company_name).
fn prepare_corporate_bond_entries(
corporate_bond_securities: &[FigiData],
existing_keys: &HashSet<String>,
company_id_map: &HashMap<String, String>,
) -> Vec<CorporateBondData> {
let mut entries = Vec::new();
@@ -912,11 +866,6 @@ fn prepare_corporate_bond_entries(
continue;
}
// Use company_id from map if company exists, otherwise generate new ID for bond
let company_id = company_id_map.get(&issuer)
.cloned()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
// Group by ISIN
let bonds_by_isin = group_figis_by_isin(&figis);
@@ -931,7 +880,6 @@ fn prepare_corporate_bond_entries(
}
let bond_info = CorporateBondData {
company_id,
underlying_company_name: issuer.clone(),
bonds: bonds_by_isin,
bond_details: bond_details_map,