added ids for companies
This commit is contained in:
@@ -429,6 +429,11 @@ async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::R
|
||||
}
|
||||
|
||||
/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
|
||||
/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
|
||||
///
|
||||
/// Two-phase processing:
|
||||
/// Phase 1: Process all common stocks first to generate company UUIDs
|
||||
/// Phase 2: Process dependent securities (warrants, options, corporate bonds) using company UUIDs
|
||||
async fn process_lei_figi_file_batched(
|
||||
input_path: &Path,
|
||||
common_log_path: &Path,
|
||||
@@ -449,11 +454,11 @@ async fn process_lei_figi_file_batched(
|
||||
let batch_size = 100;
|
||||
let mut processed_count = 0;
|
||||
|
||||
// === PHASE 1: Process common stocks and build company_id mapping ===
|
||||
logger::log_info(" Phase 1: Processing common stocks...").await;
|
||||
|
||||
let mut common_batch: Vec<CompanyData> = Vec::new();
|
||||
let mut warrants_batch: Vec<WarrantData> = Vec::new();
|
||||
let mut options_batch: Vec<OptionData> = Vec::new();
|
||||
let mut corporate_bonds_batch: Vec<CorporateBondData> = Vec::new();
|
||||
let mut government_bonds_batch: Vec<GovernmentBondData> = Vec::new();
|
||||
let mut company_id_map: HashMap<String, String> = HashMap::new(); // company_name -> company_id
|
||||
|
||||
for (line_num, line) in content.lines().enumerate() {
|
||||
if line.trim().is_empty() {
|
||||
@@ -471,54 +476,24 @@ async fn process_lei_figi_file_batched(
|
||||
}
|
||||
|
||||
// Group by security type
|
||||
let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) =
|
||||
group_securities(&figis);
|
||||
let (common_stocks, _, _, _, _) = group_securities(&figis);
|
||||
|
||||
// Collect entries for batching and update existing keys
|
||||
// Process common stocks
|
||||
if !common_stocks.is_empty() {
|
||||
if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
|
||||
if let Some(mut entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
|
||||
// Generate UUID for company if not already done
|
||||
if !company_id_map.contains_key(&entry.name) {
|
||||
let company_id = uuid::Uuid::new_v4().to_string();
|
||||
company_id_map.insert(entry.name.clone(), company_id.clone());
|
||||
entry.id = company_id;
|
||||
}
|
||||
|
||||
// Add to existing set immediately to prevent duplicates in same run
|
||||
existing_companies.insert(entry.name.clone());
|
||||
common_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !warrant_securities.is_empty() {
|
||||
for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
|
||||
// Add to existing set immediately
|
||||
let key = entry.company_id.clone();
|
||||
existing_warrants.insert(key);
|
||||
warrants_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !option_securities.is_empty() {
|
||||
for entry in prepare_option_entries(&option_securities, existing_options) {
|
||||
// Add to existing set immediately
|
||||
let key = entry.company_name.clone();
|
||||
existing_options.insert(key);
|
||||
options_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !corporate_bonds_securities.is_empty() {
|
||||
for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) {
|
||||
// Use underlying_company_name as the key (not issuer_company_name)
|
||||
let key = entry.underlying_company_name.clone();
|
||||
existing_corporate_bonds.insert(key);
|
||||
corporate_bonds_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !government_bonds_securities.is_empty() {
|
||||
for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) {
|
||||
// Use issuer_name as the key (not issuer_country_name)
|
||||
let key = entry.issuer_name.clone();
|
||||
existing_government_bonds.insert(key);
|
||||
government_bonds_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// Write batches when they reach size limit
|
||||
if common_batch.len() >= batch_size {
|
||||
write_batch_with_fsync(common_log_path, &common_batch).await?;
|
||||
@@ -526,6 +501,82 @@ async fn process_lei_figi_file_batched(
|
||||
common_batch.clear();
|
||||
}
|
||||
|
||||
processed_count += 1;
|
||||
if processed_count % 1000 == 0 {
|
||||
logger::log_info(&format!(" Phase 1: Processed {} LEI entries...", processed_count)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Write remaining common stocks batch
|
||||
if !common_batch.is_empty() {
|
||||
write_batch_with_fsync(common_log_path, &common_batch).await?;
|
||||
stats.companies_added += common_batch.len();
|
||||
}
|
||||
|
||||
logger::log_info(&format!(" Phase 1 complete: Generated {} company UUIDs", company_id_map.len())).await;
|
||||
|
||||
// === PHASE 2: Process dependent securities using company_id mapping ===
|
||||
logger::log_info(" Phase 2: Processing warrants, options, and corporate bonds...").await;
|
||||
|
||||
let mut warrants_batch: Vec<WarrantData> = Vec::new();
|
||||
let mut options_batch: Vec<OptionData> = Vec::new();
|
||||
let mut corporate_bonds_batch: Vec<CorporateBondData> = Vec::new();
|
||||
let mut government_bonds_batch: Vec<GovernmentBondData> = Vec::new();
|
||||
|
||||
processed_count = 0;
|
||||
|
||||
for (line_num, line) in content.lines().enumerate() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let entry: Value = serde_json::from_str(line)
|
||||
.context(format!("Failed to parse JSON on line {}", line_num + 1))?;
|
||||
|
||||
let figis: Vec<FigiData> = serde_json::from_value(entry["figis"].clone())
|
||||
.context("Invalid 'figis' field")?;
|
||||
|
||||
if figis.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Group by security type
|
||||
let (_, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) =
|
||||
group_securities(&figis);
|
||||
|
||||
if !warrant_securities.is_empty() {
|
||||
for entry in prepare_warrant_entries(&warrant_securities, existing_warrants, &company_id_map) {
|
||||
let key = entry.company_id.clone();
|
||||
existing_warrants.insert(key);
|
||||
warrants_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !option_securities.is_empty() {
|
||||
for entry in prepare_option_entries(&option_securities, existing_options, &company_id_map) {
|
||||
let key = entry.company_name.clone();
|
||||
existing_options.insert(key);
|
||||
options_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !corporate_bonds_securities.is_empty() {
|
||||
for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds, &company_id_map) {
|
||||
let key = entry.underlying_company_name.clone();
|
||||
existing_corporate_bonds.insert(key);
|
||||
corporate_bonds_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if !government_bonds_securities.is_empty() {
|
||||
for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) {
|
||||
let key = entry.issuer_name.clone();
|
||||
existing_government_bonds.insert(key);
|
||||
government_bonds_batch.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// Write batches when they reach size limit
|
||||
if warrants_batch.len() >= batch_size {
|
||||
write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
|
||||
stats.warrants_added += warrants_batch.len();
|
||||
@@ -552,16 +603,11 @@ async fn process_lei_figi_file_batched(
|
||||
|
||||
processed_count += 1;
|
||||
if processed_count % 1000 == 0 {
|
||||
logger::log_info(&format!(" Processed {} LEI entries...", processed_count)).await;
|
||||
logger::log_info(&format!(" Phase 2: Processed {} LEI entries...", processed_count)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Write remaining batches
|
||||
if !common_batch.is_empty() {
|
||||
write_batch_with_fsync(common_log_path, &common_batch).await?;
|
||||
stats.companies_added += common_batch.len();
|
||||
}
|
||||
|
||||
if !warrants_batch.is_empty() {
|
||||
write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
|
||||
stats.warrants_added += warrants_batch.len();
|
||||
@@ -582,6 +628,7 @@ async fn process_lei_figi_file_batched(
|
||||
stats.government_bonds_added += government_bonds_batch.len();
|
||||
}
|
||||
|
||||
logger::log_info(" Phase 2 complete").await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -639,9 +686,14 @@ fn prepare_common_stock_entry(
|
||||
}
|
||||
|
||||
/// Prepares warrant entries for batching
|
||||
/// Prepares warrant entries for batching
|
||||
///
|
||||
/// Groups warrant contracts by underlying company, using company_id from the company_id_map
|
||||
/// if the company exists, otherwise generates a new ID for the warrant.
|
||||
fn prepare_warrant_entries(
|
||||
warrant_securities: &[FigiData],
|
||||
existing_keys: &HashSet<String>,
|
||||
company_id_map: &HashMap<String, String>,
|
||||
) -> Vec<WarrantData> {
|
||||
let mut entries = Vec::new();
|
||||
|
||||
@@ -666,10 +718,10 @@ fn prepare_warrant_entries(
|
||||
continue;
|
||||
}
|
||||
|
||||
let company_id = format!("warrant_{}", std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_nanos());
|
||||
// Use company_id from map if company exists, otherwise generate new ID for warrant
|
||||
let company_id = company_id_map.get(&underlying_company)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
|
||||
let mut warrants_by_type: HashMap<String, WarrantDetails> = HashMap::new();
|
||||
|
||||
@@ -715,9 +767,14 @@ fn prepare_warrant_entries(
|
||||
///
|
||||
/// # Returns
|
||||
/// Vector of OptionData entries, one per unique underlying company
|
||||
/// Prepares option entries for batching
|
||||
///
|
||||
/// Groups option contracts by underlying company, using company_id from the company_id_map
|
||||
/// if the company exists, otherwise generates a new ID for the option.
|
||||
fn prepare_option_entries(
|
||||
option_securities: &[FigiData],
|
||||
existing_keys: &HashSet<String>,
|
||||
company_id_map: &HashMap<String, String>,
|
||||
) -> Vec<OptionData> {
|
||||
let mut entries = Vec::new();
|
||||
|
||||
@@ -742,6 +799,11 @@ fn prepare_option_entries(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Use company_id from map if company exists, otherwise generate new ID for option
|
||||
let company_id = company_id_map.get(&underlying_company)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
|
||||
// Build OptionContracts and extract strikes/expirations
|
||||
let mut option_contracts: HashMap<i64, (Vec<OptionContract>, Vec<OptionContract>)> = HashMap::new();
|
||||
let mut all_strikes: std::collections::HashSet<u64> = std::collections::HashSet::new();
|
||||
@@ -795,7 +857,7 @@ fn prepare_option_entries(
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let option_data = OptionData {
|
||||
company_id: underlying_company.clone(),
|
||||
company_id,
|
||||
company_name: underlying_company.clone(),
|
||||
expiration_dates,
|
||||
strikes,
|
||||
@@ -820,9 +882,14 @@ fn prepare_option_entries(
|
||||
///
|
||||
/// # Returns
|
||||
/// Vector of CorporateBondInfo entries, one per unique issuer
|
||||
/// Prepares corporate bond entries for batching
|
||||
///
|
||||
/// Groups corporate bonds by issuer (underlying_company_name), using company_id from the company_id_map
|
||||
/// if the company exists, otherwise generates a new ID for the bond.
|
||||
fn prepare_corporate_bond_entries(
|
||||
corporate_bond_securities: &[FigiData],
|
||||
existing_keys: &HashSet<String>,
|
||||
company_id_map: &HashMap<String, String>,
|
||||
) -> Vec<CorporateBondData> {
|
||||
let mut entries = Vec::new();
|
||||
|
||||
@@ -845,6 +912,11 @@ fn prepare_corporate_bond_entries(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Use company_id from map if company exists, otherwise generate new ID for bond
|
||||
let company_id = company_id_map.get(&issuer)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
|
||||
// Group by ISIN
|
||||
let bonds_by_isin = group_figis_by_isin(&figis);
|
||||
|
||||
@@ -859,6 +931,7 @@ fn prepare_corporate_bond_entries(
|
||||
}
|
||||
|
||||
let bond_info = CorporateBondData {
|
||||
company_id,
|
||||
underlying_company_name: issuer.clone(),
|
||||
bonds: bonds_by_isin,
|
||||
bond_details: bond_details_map,
|
||||
|
||||
Reference in New Issue
Block a user