Data Source
| Property | Value |
|---|---|
| Source Type | File-based custodian data feed |
| Delivery Method | Azure Blob Storage — hot tier |
| File Format | Fixed-width / CSV per object type |
| Authentication | redwood-hot-storage-config (Azure Key Vault) |
| Blob Container | Configured per account in schwab_sync_config |
| Sync Modes | full · incremental |
| Processing Package | datafeed-sync-schwab (internal Python wheel) |
| DAG | redwood_schwab_data_sync |
| DAG File | redwood_schwab_sync_dag.py |
Schwab delivers daily data files organized by master account and object type. Files land in Azure Blob hot storage. The sync DAG downloads, parses, and loads each file into the corresponding PostgreSQL table. Incremental mode uses a SHA-256 file hash stored in schwab_sync_log to skip already-imported files. Full mode replaces all rows for that account/object combination.
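Below is a sketch of the per-file decision. It is a minimal illustration, not the datafeed-sync-schwab implementation. file_sha256 and plan_file_action are hypothetical names. The known_hashes set stands in for a lookup against schwab_sync_log.file_hash.

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # iter() with a sentinel stops when read() returns b"" at EOF.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def plan_file_action(sync_mode: str, file_hash: str, known_hashes: set) -> str:
    """Hypothetical decision helper.

    incremental: skip files whose hash is already logged, otherwise append.
    full: always replace the account/object rows.
    """
    if sync_mode == "incremental":
        return "skip" if file_hash in known_hashes else "append"
    if sync_mode == "full":
        return "replace"
    raise ValueError(f"unknown sync_mode: {sync_mode!r}")
```

Chunked reads keep memory flat for large custodian files. The 64-character hex digest fits the file_hash VARCHAR(64) column.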
Objects Being Synced
Config, Logs, Monitoring & Schedule
Configuration Table — schwab_sync_config
| Column | Type | Description |
|---|---|---|
| master_account | VARCHAR(50) | Schwab master account identifier |
| object | VARCHAR(10) | Object code (acc, trn, rps, sec, etc.) |
| sync_mode | VARCHAR(20) | full or incremental |
| destination_table | VARCHAR(100) | Target PostgreSQL table name |
| enabled | BOOLEAN | Whether this config row is active |
| description | TEXT | Human-readable label |
| archival_policy | VARCHAR(50) | Archival tier reference |
| last_sync_date | TIMESTAMPTZ | Timestamp of last successful sync |
| database_schema | VARCHAR(50) | PostgreSQL schema (default: public) |
| created_at / updated_at | TIMESTAMPTZ | Audit timestamps |
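This sketch shows one way a sync run might turn schwab_sync_config rows into work items. It assumes the rows have already been fetched from PostgreSQL. SyncConfig, active_configs and qualified_table are hypothetical helpers; only the column names come from the table above.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class SyncConfig:
    """Subset of one schwab_sync_config row (hypothetical model)."""
    master_account: str
    object: str                 # object code, e.g. "acc", "trn"
    sync_mode: str              # "full" or "incremental"
    destination_table: str
    enabled: bool
    database_schema: Optional[str] = "public"


def active_configs(rows: Iterable[SyncConfig]) -> List[SyncConfig]:
    """Keep enabled rows only, in a stable account/object order."""
    return sorted(
        (row for row in rows if row.enabled),
        key=lambda row: (row.master_account, row.object),
    )


def qualified_table(cfg: SyncConfig) -> str:
    """Return schema.table, falling back to the documented default schema.

    Plain string formatting for illustration only; real SQL should quote
    identifiers (for example with psycopg's sql.Identifier).
    """
    schema = cfg.database_schema or "public"
    return f"{schema}.{cfg.destination_table}"
```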
Log Table — schwab_sync_log
| Column | Type | Description |
|---|---|---|
| log_id | BIGINT PK | Auto-increment log identifier |
| master_account | VARCHAR(50) | Account identifier |
| filename | VARCHAR(255) | Source filename |
| filedate | DATE | Date extracted from filename |
| file_hash | VARCHAR(64) | SHA-256 hash, used for incremental deduplication |
| number_of_records | INTEGER | Row count loaded |
| destination_table | VARCHAR(100) | Target table name |
| status | VARCHAR(20) | synced · skipped · error · pending |
| database_archived_date | TIMESTAMPTZ | When DB rows were archived to cold storage |
| file_archived_date | TIMESTAMPTZ | When source file was archived |
| sync_started_at / sync_completed_at | TIMESTAMPTZ | Run window |
| error_message | TEXT | Error detail when status = error |
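The import row count verification listed under Monitoring & Alerting could feed this table roughly as follows. finalize_log_entry is a hypothetical helper that works on a plain dict keyed by schwab_sync_log column names. The way the real package compares counts is not documented here.

```python
from datetime import datetime, timezone


def finalize_log_entry(entry: dict, parsed_rows: int, loaded_rows: int) -> dict:
    """Return a copy of a schwab_sync_log entry with its outcome filled in.

    Hypothetical rule: a load is "synced" only when the number of rows
    written matches the number parsed from the source file.
    """
    result = dict(entry)
    result["number_of_records"] = loaded_rows
    result["sync_completed_at"] = datetime.now(timezone.utc)  # TIMESTAMPTZ
    if loaded_rows == parsed_rows:
        result["status"] = "synced"
        result["error_message"] = None
    else:
        result["status"] = "error"
        result["error_message"] = (
            f"row count mismatch: parsed {parsed_rows}, loaded {loaded_rows}"
        )
    return result
```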
Monitoring & Alerting
| Property | Value |
|---|---|
| Notifier | airflow_email_notifier — send_dag_notification |
| Subject prefix | [Redwood-Schwab] |
| On success | Yes |
| On failure | Yes |
| Retries | 0 |
| Execution timeout | 4 hours |
| In-pipeline validation | File hash deduplication · import row count verification · auto-table creation |
Email recipients: prashant.surana@collation.ai · monitor@collation.ai · mschwartz@sequoia-financial.com · handerson@sequoia-financial.com · vzivich@sequoia-financial.com · hpatel@sequoia-financial.com · karishni.mehta@collation.ai
DAG Schedule
Max active runs: 1 · Catchup: disabled · Start date: 2026-02-25
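The monitoring and schedule settings above map onto standard Airflow DAG arguments. This sketch assumes Airflow 2.x keyword names. notify is a hypothetical placeholder for airflow_email_notifier.send_dag_notification, whose signature is not documented here. The 4-hour limit is assumed to be the per-task execution_timeout; if it is meant as a whole-run limit, dagrun_timeout would be the matching DAG argument. The schedule expression for redwood_schwab_data_sync is not stated above, so it is left out.

```python
from datetime import datetime, timedelta


def notify(context: dict) -> None:
    """Hypothetical stand-in for airflow_email_notifier.send_dag_notification.

    Airflow passes a context dict to DAG-level callbacks; the real notifier
    would send the [Redwood-Schwab] email to the configured recipients.
    """


DEFAULT_ARGS = {
    "retries": 0,                              # no automatic retries
    "execution_timeout": timedelta(hours=4),   # assumed per-task limit
}

DAG_KWARGS = {
    "dag_id": "redwood_schwab_data_sync",
    "start_date": datetime(2026, 2, 25),  # naive: Airflow's default timezone applies
    "catchup": False,
    "max_active_runs": 1,
    "default_args": DEFAULT_ARGS,
    "on_success_callback": notify,
    "on_failure_callback": notify,
}
```

In a DAG file these would be passed as DAG(**DAG_KWARGS, schedule=...) on Airflow 2.4 or later, with the schedule filled in from the actual deployment.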
Bronze Layer — Database Schema
Bronze tables live in the public schema of the Redwood PostgreSQL database. Every table includes filename VARCHAR(255), __master_account VARCHAR(255), imported_at TIMESTAMP, and last_sync_at TIMESTAMP.
schwab_acc — Account Master
CREATE TABLE public.schwab_acc (
  -- Account identifiers
  account_id VARCHAR(255),
  mstracct_number VARCHAR(255),
  master_account_name VARCHAR(255),
  custdian_id VARCHAR(255),
  account_status VARCHAR(255),
  acct_type VARCHAR(255),
  acct_regis VARCHAR(255),
  date_opened DATE,
  date_of_birth DATE,
  -- Taxpayer info
  taxpayer_first_name VARCHAR(255),
  taxpayer_middle_name VARCHAR(255),
  taxpayer_last_name VARCHAR(255),
  txpyr_title VARCHAR(255),
  ssntin_number VARCHAR(255),
  organization_primary_name VARCHAR(255),
  -- Contact
  phone_nbr VARCHAR(255),
  business_ph VARCHAR(255),
  mobile_phone_number VARCHAR(255),
  email_address VARCHAR(255),
  -- Mailing address
  mailing_address_line_1 VARCHAR(255),
  mailing_address_line_2 VARCHAR(255),
  mailing_address_line_3 VARCHAR(255),
  account_mailing_city VARCHAR(255),
  st_cd VARCHAR(255),
  acct_mail_zip VARCHAR(255),
  acct_mailing_ctry_code VARCHAR(255),
  -- Account titles & managed account
  account_title_line_1 VARCHAR(255),
  account_title_line_2 VARCHAR(255),
  account_title_line_3 VARCHAR(255),
  alias_name VARCHAR(255),
  managed_account_investment_strategy VARCHAR(255),
  managed_account_money_manager VARCHAR(255),
  mngdacct_pltfrmcd VARCHAR(255),
  -- Restriction codes, version markers, flags ...
  rstr_rsn1 BIGINT,
  rstr_rsn2 BIGINT,
  rstr_rsn3 VARCHAR(255),
  -- Source tracking
  filename VARCHAR(255),
  business_date DATE,
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_trn — Transaction Detail
CREATE TABLE public.schwab_trn (
  record_id UUID NOT NULL,
  account_id VARCHAR(255),
  mstracct_number VARCHAR(255),
  -- Trade dates
  trade_date DATE,
  settlmnt_date DATE,
  tran_date DATE,
  business_date DATE,
  tr_cd VARCHAR(255),
  transaction_category VARCHAR(255),
  transaction_detail_description VARCHAR(255),
  debit_credit VARCHAR(255),
  -- Amounts
  quantity NUMERIC,
  price NUMERIC,
  net_amount NUMERIC,
  gross_amount NUMERIC,
  commission NUMERIC,
  accrued_interest NUMERIC,
  -- Fees
  exchange_processing_fee NUMERIC,
  step_in_fee NUMERIC,
  prime_broker_fee NUMERIC,
  broker_service_fee NUMERIC,
  research_fee NUMERIC,
  exec_brok_comm NUMERIC,
  markupmarkdown NUMERIC,
  redemption_fee NUMERIC,
  trade_away_fee NUMERIC,
  other_fee NUMERIC,
  state_tax_withholding NUMERIC,
  federal_tefra_withholding NUMERIC,
  -- Security identifiers
  ticker_symbol VARCHAR(255),
  cusip VARCHAR(255),
  isin VARCHAR(255),
  sedol VARCHAR(255),
  schwab_sec_nbr VARCHAR(255),
  options_display_symbol VARCHAR(255),
  -- Broker / execution
  broker_code VARCHAR(255),
  broker_name VARCHAR(255),
  stock_exchg VARCHAR(255),
  order_id VARCHAR(255),
  order_number VARCHAR(255),
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_rps — Position Snapshot
CREATE TABLE public.schwab_rps (
  row_id INTEGER NOT NULL,
  account_id VARCHAR(255),
  mstracct_number VARCHAR(255),
  business_date DATE,
  -- Security
  ticker_symbol VARCHAR(255),
  cusip VARCHAR(255),
  isin VARCHAR(255),
  sedol VARCHAR(255),
  schwab_sec_nbr VARCHAR(255),
  options_display_symbol VARCHAR(255),
  underlying_ticker_symbol VARCHAR(255),
  -- Quantities by settlement type
  quantity_settled NUMERIC,
  quantity_unsettledlong NUMERIC,
  quantity_unsettledshort NUMERIC,
  quantity_settledunsettled NUMERIC,
  quantity_cash_settled NUMERIC,
  quantity_margin_settled NUMERIC,
  quantity_short_settled NUMERIC,
  non_tradable_quantity NUMERIC,
  -- Pricing
  closing_price NUMERIC,
  closing_price_unfactored NUMERIC,
  market_value_settledunsettled NUMERIC,
  factor NUMERIC,
  factor_date DATE,
  tips_factor NUMERIC,
  asset_backed_factor NUMERIC,
  prod_code VARCHAR(255),
  l_s VARCHAR(255),  -- Long/Short indicator
  cash_balance_type VARCHAR(255),
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_rps2 — Position Summary
CREATE TABLE public.schwab_rps2 (
  account_id VARCHAR(255),
  mstracct_number VARCHAR(255),
  acct_type VARCHAR(255),
  acct_regis VARCHAR(255),
  business_date DATE,
  -- Market values
  net_mv_positions NUMERIC,
  net_mv_plus_cash NUMERIC,
  mkt_value_long NUMERIC,
  mkt_value_short NUMERIC,
  eqty_excl_option NUMERIC,
  eqty_incl_option NUMERIC,
  eqty_percentage NUMERIC,
  -- Cash & balances
  cash_balance_settled_only NUMERIC,
  cash_margin_bal_settled NUMERIC,
  bank_cash NUMERIC,
  bank_sweep_ibf NUMERIC,
  money_mkt_funds NUMERIC,
  -- Margin
  margin_balance NUMERIC,
  mrgn_buying_pwr NUMERIC,
  maintenance_call NUMERIC,
  net_credit_debit NUMERIC,
  available_to_pay NUMERIC,
  daily_margin_int NUMERIC,
  mtd_margin_int NUMERIC,
  mnth_end_div_pay NUMERIC,
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_sec — Security Master
CREATE TABLE public.schwab_sec (
  mstracct_number VARCHAR(255),
  business_date DATE,
  -- Identifiers
  ticker_symbol VARCHAR(255),
  cusip VARCHAR(255),
  isin VARCHAR(255),
  sedol VARCHAR(255),
  schwab_sec_nbr VARCHAR(255),
  item_issue_id VARCHAR(255),
  -- Description
  security_description_line_1 VARCHAR(255),
  security_description_line_2 VARCHAR(255),
  security_description_line_3 VARCHAR(255),
  -- Pricing & valuation
  closing_price NUMERIC,
  closing_price_unfactored NUMERIC,
  par_value NUMERIC,
  face_value_amt NUMERIC,
  factor NUMERIC,
  factor_date DATE,
  security_valuation_unit NUMERIC,
  -- Bond fields
  maturity_date DATE,
  call_date DATE,
  call_price NUMERIC,
  issue_date DATE,
  interest_rate NUMERIC,
  next_pay_date DATE,
  next_put_date DATE,
  next_put_price NUMERIC,
  -- Options
  opt_expr_date DATE,
  options_multiplier NUMERIC,
  strike_price NUMERIC,
  -- Ratings
  sp_rating VARCHAR(255),
  moodys_rating VARCHAR(255),
  dividendinterest_frequency VARCHAR(255),
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_rld / schwab_rlu / schwab_rly — Realized Gains/Losses
Three tables share the same structure: rld = closed lots · rlu = unrealized open lots · rly = year-to-date.
CREATE TABLE public.schwab_rld (  -- also rlu, rly
  account_id VARCHAR(255),
  mstracct_number VARCHAR(255),
  business_date DATE,
  trans_code VARCHAR(255),
  ticker_symbol VARCHAR(255),
  cusip VARCHAR(255),
  isin VARCHAR(255),
  -- Dates
  closing_date DATE,
  acquired_date DATE,
  orgpurch_date DATE,
  settle_date DATE,
  closing_quantity NUMERIC,
  -- Cost basis
  cost_per_share NUMERIC,
  cost_basis_unamortized NUMERIC,
  adjusted_cost_per_share NUMERIC,
  adjusted_cost_basis_amortized NUMERIC,
  original_purchase_price NUMERIC,
  proceeds NUMERIC,
  proceeds_per_share NUMERIC,
  -- Gains/losses
  gainloss_dollars NUMERIC,
  gainloss_percentage NUMERIC,
  adjusted_gainloss_dollars NUMERIC,
  adjusted_gainloss_percentage NUMERIC,
  transaction_gainloss_dollars NUMERIC,
  disallowed_loss_amount BOOLEAN,
  adj_disallowed_loss_amount NUMERIC,
  days_held VARCHAR(255),
  tax_code VARCHAR(255),
  federal_tax_withholding VARCHAR(255),
  state_tax_withholding NUMERIC,
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_uln / schwab_ult — Unrealized Long Positions / Transactions
CREATE TABLE public.schwab_uln (
  account_id VARCHAR(255),
  mstracct_number VARCHAR(255),
  business_date DATE,
  ticker_symbol VARCHAR(255),
  cusip VARCHAR(255),
  isin VARCHAR(255),
  prod_code VARCHAR(255),
  current_quantity NUMERIC,
  l_s VARCHAR(255),
  current_market_value NUMERIC,
  accrued_interest NUMERIC,
  acquired_date DATE,
  orgpurch_date DATE,
  org_purchase_price NUMERIC,
  cost_per_share NUMERIC,
  cost_basis_unamortized NUMERIC,
  adjusted_cost_per_share NUMERIC,
  adjusted_cost_basis_amortized NUMERIC,
  adjusted_cost_incldg_unpd_amort NUMERIC,
  original_cost_basis NUMERIC,
  transaction_cost NUMERIC,
  transaction_cost_per_share NUMERIC,
  unrealized_gainloss NUMERIC,
  disallowed_loss_amount NUMERIC,
  days_held VARCHAR(255),
  yield_tomaturity NUMERIC,
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
schwab_upn / schwab_upt — Unrealized Positions / Transactions
Similar to uln/ult, but a position-level summary without lot-level detail. Columns include current_quantity, current_market_value, cost_per_share, adjusted_cost_basis_amortized, unrealized_gainloss, and principal_paydown_factor.
schwab_rtu / schwab_rty — Realized Transactions (Unrealized / YTD)
Same structure as schwab_rld, minus the employee-option fields. rtu holds open trade lots; rty holds the year-to-date summary.
schwab_tcf — Trade Confirm (FIX Messages)
CREATE TABLE public.schwab_tcf (
  account VARCHAR(255),
  master VARCHAR(255),
  account_type VARCHAR(255),
  bus_date DATE,
  trd_date DATE,
  settl_dt DATE,
  action VARCHAR(255),
  txdesc VARCHAR(255),
  symbol VARCHAR(255),
  cusip VARCHAR(255),
  quantity NUMERIC,
  price NUMERIC,
  principal NUMERIC,
  total_amount NUMERIC,
  accrued_int NUMERIC,
  -- Fees
  schwab_comm NUMERIC,
  exec_brok_comm NUMERIC,
  broker_svc_fee NUMERIC,
  prime_brok_fee NUMERIC,
  step_in_fee NUMERIC,
  exch_proc_fee NUMERIC,
  transact_fee NUMERIC,
  research_fee NUMERIC,
  markupmarkdown NUMERIC,
  state_tax NUMERIC,
  other_fees NUMERIC,
  cancel VARCHAR(255),
  -- FIX protocol fields: message_1 through message_150 (VARCHAR(255) each)
  -- account_nameaddress_1..6, account_title_1..3
  filename VARCHAR(255),
  __master_account VARCHAR(255),
  imported_at TIMESTAMP,
  last_sync_at TIMESTAMP
);
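Every bronze table carries the same four source-tracking columns, so a loader can stamp them uniformly before insert. Here is a minimal sketch. add_tracking_columns is a hypothetical helper. Setting imported_at and last_sync_at to the same naive UTC timestamp is an assumption, since the columns are TIMESTAMP without time zone and the document does not say how the two differ.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional

TRACKING_COLUMNS = ("filename", "__master_account", "imported_at", "last_sync_at")


def add_tracking_columns(
    rows: Iterable[Dict[str, object]],
    filename: str,
    master_account: str,
    now: Optional[datetime] = None,
) -> List[Dict[str, object]]:
    """Return copies of parsed rows with the shared tracking columns set."""
    if now is None:
        # Assumption: store UTC as a naive value to match TIMESTAMP columns.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return [
        {
            **row,
            "filename": filename,
            "__master_account": master_account,
            "imported_at": now,
            "last_sync_at": now,
        }
        for row in rows
    ]
```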
Archiving Policy
| Setting | Value |
|---|---|
| DAG | redwood_schwab_file_archiver |
| DAG File | redwood_schwab_archiver.py |
| Schedule | 0 14 * * * — Daily 2:00 PM UTC |
| Retention threshold | 7 days (Airflow Variable: schwab_archive_older_than_days) |
| Grouping | By date extracted from filename |
| ZIP prefix | schwab (Airflow Variable: schwab_archive_zip_prefix) |
| Cold storage path | {cold_container}/{account}/{YYYYMMDD}/{zip_file} |
| Temp workspace | /tmp/schwab_sync (Airflow Variable: schwab_temp_dir) |
| Hot storage secret | redwood-hot-storage-config |
| Cold storage secret | redwood-cold-storage-config |
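The retention threshold and cold-storage layout from the table above can be expressed as two small helpers. Both are hypothetical. Treating "older than 7 days" as strictly more than seven days before the run date is an assumption about the boundary.

```python
from datetime import date, timedelta


def is_archivable(file_date: date, run_date: date, older_than_days: int = 7) -> bool:
    """True when file_date is strictly more than older_than_days before run_date.

    older_than_days mirrors the schwab_archive_older_than_days Variable.
    """
    return file_date < run_date - timedelta(days=older_than_days)


def cold_blob_path(cold_container: str, account: str, file_date: date, zip_file: str) -> str:
    """Build {cold_container}/{account}/{YYYYMMDD}/{zip_file}."""
    return f"{cold_container}/{account}/{file_date:%Y%m%d}/{zip_file}"
```

Inside the DAG, the threshold would presumably come from Variable.get("schwab_archive_older_than_days", default_var=7), converted to int.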
Archive Process
- fetch_credentials — Loads hot and cold storage configs from Azure Key Vault
- create_temp_folder — Creates per-run temp directory
- archive_{account} (dynamic, one per master account) — Identifies files older than threshold → groups by date → creates per-date ZIP → uploads to cold storage → verifies remote copy → deletes originals from hot storage
- cleanup_temp_storage — Removes temp folder (trigger_rule: all_done)
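The select, group and zip portion of archive_{account} can be sketched with the standard library. The sketch makes two assumptions, neither of which comes from the document. First, the file date is the first standalone 8-digit YYYYMMDD run in the filename that parses as a valid date. Second, each ZIP is named {prefix}_{account}_{YYYYMMDD}.zip. The upload, remote verification and hot-storage delete steps depend on the Azure client in use and are left out.

```python
import re
import zipfile
from collections import defaultdict
from datetime import date, datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional

# Assumption: dates appear in filenames as standalone 8-digit YYYYMMDD runs.
_DIGITS8 = re.compile(r"(?<!\d)\d{8}(?!\d)")


def filename_date(name: str) -> Optional[date]:
    """Return the first 8-digit run in name that parses as YYYYMMDD, else None."""
    for match in _DIGITS8.finditer(name):
        try:
            return datetime.strptime(match.group(), "%Y%m%d").date()
        except ValueError:
            continue  # e.g. an 8-digit account number, not a date
    return None


def group_archivable(names: Iterable[str], cutoff: date) -> Dict[date, List[str]]:
    """Group filenames by file date, keeping only dates before cutoff."""
    groups: Dict[date, List[str]] = defaultdict(list)
    for name in names:
        file_date = filename_date(name)
        if file_date is not None and file_date < cutoff:
            groups[file_date].append(name)
    return {d: sorted(files) for d, files in groups.items()}


def zip_group(src_dir: Path, names: Iterable[str], out_dir: Path,
              prefix: str, account: str, file_date: date) -> Path:
    """Write one per-date ZIP (hypothetical naming) into out_dir and return its path."""
    zip_path = out_dir / f"{prefix}_{account}_{file_date:%Y%m%d}.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name in names:
            zf.write(src_dir / name, arcname=name)
    return zip_path
```

In a real run, cutoff would be the run date minus schwab_archive_older_than_days, prefix would come from schwab_archive_zip_prefix, and out_dir would be a folder under the schwab_temp_dir workspace.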