Bronze Layer · Data Source

Schwab

File-based custodian data feed delivered via Azure Blob Storage. 15 object types covering accounts, positions, transactions, realized gains/losses, and security master data.

15 Tables · 12:00 PM UTC Daily · PostgreSQL Target DB · Daily Archive
I

Data Source

Property | Value
Source Type | File-based custodian data feed
Delivery Method | Azure Blob Storage (hot tier)
File Format | Fixed-width / CSV per object type
Authentication | redwood-hot-storage-config (Azure Key Vault)
Blob Container | Configured per account in schwab_sync_config
Sync Modes | full, incremental
Processing Package | datafeed-sync-schwab (internal Python wheel)
DAG | redwood_schwab_data_sync
DAG File | redwood_schwab_sync_dag.py

Schwab delivers daily data files organized by master account and object type. Files land in Azure Blob hot storage. The sync DAG downloads, parses, and loads each file into the corresponding PostgreSQL table. Incremental mode uses a SHA-256 file hash stored in schwab_sync_log to skip already-imported files. Full mode replaces all rows for that account/object combination.
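
The incremental check described above can be sketched as follows. This is a minimal illustration, not the code in datafeed-sync-schwab: the function names are hypothetical, and known_hashes is an in-memory stand-in for the file_hash values already recorded in schwab_sync_log.

```python
import hashlib
from pathlib import Path


def sha256_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()  # 64 hex characters, fits VARCHAR(64)


def should_import(path: Path, sync_mode: str, known_hashes: set) -> bool:
    """Decide whether a downloaded file needs to be loaded.

    known_hashes is a hypothetical stand-in for hashes already logged
    in schwab_sync_log.
    """
    if sync_mode == "full":
        return True  # full mode always reloads the account/object rows
    return sha256_of_file(path) not in known_hashes
```

Hashing the file contents rather than comparing filenames means a re-delivered file with identical bytes is skipped, while a corrected file under the same name is loaded again.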

II

Objects Being Synced

schwab_acc | Account master: registration, taxpayer, address, managed account
schwab_trn | Transaction detail: trades, dividends, interest, cash movements
schwab_rps | Position snapshot: quantities by settlement type, market value
schwab_rps2 | Position summary: margin, equity, buying power, cash balances
schwab_sec | Security master: ratings, maturity, coupon, factor, valuation
schwab_rld | Realized gains/losses: closed lot detail with cost basis
schwab_rlu | Realized gains/losses: unrealized portion (open lots)
schwab_rly | Realized gains/losses: year-to-date summary
schwab_rtu | Realized transactions: unrealized (open trade lots)
schwab_rty | Realized transactions: year-to-date
schwab_uln | Unrealized long positions: cost basis, gains/losses by lot
schwab_ult | Unrealized long transactions
schwab_upn | Unrealized positions summary
schwab_upt | Unrealized position transactions
schwab_tcf | Trade confirm: FIX protocol fields, commissions, fees

III

Config, Logs, Monitoring & Schedule

Configuration Table — schwab_sync_config

Column | Type | Description
master_account | VARCHAR(50) | Schwab master account identifier
object | VARCHAR(10) | Object code (acc, trn, rps, sec, etc.)
sync_mode | VARCHAR(20) | full or incremental
destination_table | VARCHAR(100) | Target PostgreSQL table name
enabled | BOOLEAN | Whether this config row is active
description | TEXT | Human-readable label
archival_policy | VARCHAR(50) | Archival tier reference
last_sync_date | TIMESTAMPTZ | Timestamp of last successful sync
database_schema | VARCHAR(50) | PostgreSQL schema (default: public)
created_at / updated_at | TIMESTAMPTZ | Audit timestamps
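
As a rough illustration of how schwab_sync_config rows might drive a run, the sketch below filters on enabled and resolves the schema-qualified destination table. The SyncConfigRow class and the plan_run function are hypothetical; only the column names and the two sync modes come from the table above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SyncConfigRow:
    """Subset of schwab_sync_config columns needed to plan a run."""
    master_account: str
    object: str
    sync_mode: str
    destination_table: str
    enabled: bool
    database_schema: str = "public"  # documented default schema


def plan_run(rows):
    """Return (master_account, object, sync_mode, qualified_table) per enabled row."""
    plan = []
    for row in rows:
        if not row.enabled:
            continue  # disabled rows are ignored
        if row.sync_mode not in ("full", "incremental"):
            raise ValueError(f"unknown sync_mode: {row.sync_mode!r}")
        qualified = f"{row.database_schema}.{row.destination_table}"
        plan.append((row.master_account, row.object, row.sync_mode, qualified))
    return plan
```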

Log Table — schwab_sync_log

Column | Type | Description
log_id | BIGINT PK | Auto-increment log identifier
master_account | VARCHAR(50) | Account identifier
filename | VARCHAR(255) | Source filename
filedate | DATE | Date extracted from filename
file_hash | VARCHAR(64) | SHA-256 hash, used for incremental deduplication
number_of_records | INTEGER | Row count loaded
destination_table | VARCHAR(100) | Target table name
status | VARCHAR(20) | synced, skipped, error, or pending
database_archived_date | TIMESTAMPTZ | When DB rows were archived to cold storage
file_archived_date | TIMESTAMPTZ | When source file was archived
sync_started_at / sync_completed_at | TIMESTAMPTZ | Run window
error_message | TEXT | Error detail if status = error

Monitoring & Alerting

Property | Value
Notifier | airflow_email_notifier · send_dag_notification
Subject prefix | [Redwood-Schwab]
On success | Yes
On failure | Yes
Retries | 0
Execution timeout | 4 hours
In-pipeline validation | File hash deduplication · import row count verification · auto-table creation

Email recipients: prashant.surana@collation.ai · monitor@collation.ai · mschwartz@sequoia-financial.com · handerson@sequoia-financial.com · vzivich@sequoia-financial.com · hpatel@sequoia-financial.com · karishni.mehta@collation.ai

DAG Schedule

redwood_schwab_data_sync · 12:00 PM UTC · 0 12 * * * · Daily
redwood_schwab_file_archiver · 2:00 PM UTC · 0 14 * * * · Daily (2h after sync)

Max active runs: 1 · Catchup: disabled · Start date: 2026-02-25
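
The scheduling and retry settings above could be collected as keyword arguments for an Airflow DAG, roughly as sketched below. The dict layout is an assumption: the actual DAG file may pass these values differently, the 4-hour limit may be a DAG-level rather than task-level timeout, and older Airflow releases use schedule_interval instead of schedule. The daily_cron_hour helper is hypothetical and only handles simple daily expressions.

```python
from datetime import datetime, timedelta, timezone

# Values from the tables above; the structure itself is an assumption.
SYNC_DAG_KWARGS = {
    "dag_id": "redwood_schwab_data_sync",
    "schedule": "0 12 * * *",  # daily at 12:00 PM UTC
    "start_date": datetime(2026, 2, 25, tzinfo=timezone.utc),
    "catchup": False,  # no backfill of missed intervals
    "max_active_runs": 1,  # never overlap two runs
    "default_args": {
        "retries": 0,
        "execution_timeout": timedelta(hours=4),
    },
}
ARCHIVER_SCHEDULE = "0 14 * * *"  # daily at 2:00 PM UTC


def daily_cron_hour(expr: str) -> int:
    """Return the hour field of a simple 'M H * * *' daily cron expression."""
    minute, hour, dom, month, dow = expr.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("not a daily schedule")
    return int(hour)


# The archiver is scheduled this many hours after the sync starts.
gap_hours = daily_cron_hour(ARCHIVER_SCHEDULE) - daily_cron_hour(SYNC_DAG_KWARGS["schedule"])
```

Note that the 2-hour gap is shorter than the 4-hour execution timeout, so a slow sync run can still be in progress when the archiver starts.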

IV

Bronze Layer — Database Schema

Schema: All tables reside in the public schema of the Redwood PostgreSQL database. Every table includes filename VARCHAR(255), __master_account VARCHAR(255), imported_at TIMESTAMP, and last_sync_at TIMESTAMP.
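
The four shared tracking columns could be attached to each parsed record before loading, along the lines of the sketch below. The function name is hypothetical, and two details are assumptions: that timestamps are stored as naive UTC values (the columns are TIMESTAMP without a zone), and that imported_at and last_sync_at receive the same value on first load.

```python
from datetime import datetime, timezone
from typing import Optional


def add_tracking_columns(record: dict, filename: str, master_account: str,
                         now: Optional[datetime] = None) -> dict:
    """Return a copy of a parsed record with the shared tracking columns set."""
    # Assumption: naive UTC, because the target columns are TIMESTAMP.
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        **record,
        "filename": filename,
        "__master_account": master_account,
        "imported_at": now,
        "last_sync_at": now,
    }
```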

schwab_acc — Account Master

CREATE TABLE public.schwab_acc (
    -- Account identifiers
    account_id                          VARCHAR(255),
    mstracct_number                     VARCHAR(255),
    master_account_name                 VARCHAR(255),
    custdian_id                         VARCHAR(255),
    account_status                      VARCHAR(255),
    acct_type                           VARCHAR(255),
    acct_regis                          VARCHAR(255),
    date_opened                         DATE,
    date_of_birth                       DATE,
    -- Taxpayer info
    taxpayer_first_name                 VARCHAR(255),
    taxpayer_middle_name                VARCHAR(255),
    taxpayer_last_name                  VARCHAR(255),
    txpyr_title                         VARCHAR(255),
    ssntin_number                       VARCHAR(255),
    organization_primary_name           VARCHAR(255),
    -- Contact
    phone_nbr                           VARCHAR(255),
    business_ph                         VARCHAR(255),
    mobile_phone_number                 VARCHAR(255),
    email_address                       VARCHAR(255),
    -- Mailing address
    mailing_address_line_1              VARCHAR(255),
    mailing_address_line_2              VARCHAR(255),
    mailing_address_line_3              VARCHAR(255),
    account_mailing_city                VARCHAR(255),
    st_cd                               VARCHAR(255),
    acct_mail_zip                       VARCHAR(255),
    acct_mailing_ctry_code              VARCHAR(255),
    -- Account titles & managed account
    account_title_line_1                VARCHAR(255),
    account_title_line_2                VARCHAR(255),
    account_title_line_3                VARCHAR(255),
    alias_name                          VARCHAR(255),
    managed_account_investment_strategy VARCHAR(255),
    managed_account_money_manager       VARCHAR(255),
    mngdacct_pltfrmcd                   VARCHAR(255),
    -- Restriction codes, version markers, flags ...
    rstr_rsn1 BIGINT, rstr_rsn2 BIGINT, rstr_rsn3 VARCHAR(255),
    -- Source tracking
    filename         VARCHAR(255),
    business_date    DATE,
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_trn — Transaction Detail

CREATE TABLE public.schwab_trn (
    record_id                        UUID NOT NULL,
    account_id                       VARCHAR(255),
    mstracct_number                  VARCHAR(255),
    -- Trade dates
    trade_date                       DATE,
    settlmnt_date                    DATE,
    tran_date                        DATE,
    business_date                    DATE,
    tr_cd                            VARCHAR(255),
    transaction_category             VARCHAR(255),
    transaction_detail_description   VARCHAR(255),
    debit_credit                     VARCHAR(255),
    -- Amounts
    quantity                         NUMERIC,
    price                            NUMERIC,
    net_amount                       NUMERIC,
    gross_amount                     NUMERIC,
    commission                       NUMERIC,
    accrued_interest                 NUMERIC,
    -- Fees
    exchange_processing_fee          NUMERIC,
    step_in_fee                      NUMERIC,
    prime_broker_fee                 NUMERIC,
    broker_service_fee               NUMERIC,
    research_fee                     NUMERIC,
    exec_brok_comm                   NUMERIC,
    markupmarkdown                   NUMERIC,
    redemption_fee                   NUMERIC,
    trade_away_fee                   NUMERIC,
    other_fee                        NUMERIC,
    state_tax_withholding            NUMERIC,
    federal_tefra_withholding        NUMERIC,
    -- Security identifiers
    ticker_symbol                    VARCHAR(255),
    cusip                            VARCHAR(255),
    isin                             VARCHAR(255),
    sedol                            VARCHAR(255),
    schwab_sec_nbr                   VARCHAR(255),
    options_display_symbol           VARCHAR(255),
    -- Broker / execution
    broker_code                      VARCHAR(255),
    broker_name                      VARCHAR(255),
    stock_exchg                      VARCHAR(255),
    order_id                         VARCHAR(255),
    order_number                     VARCHAR(255),
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_rps — Position Snapshot

CREATE TABLE public.schwab_rps (
    row_id                        INTEGER NOT NULL,
    account_id                    VARCHAR(255),
    mstracct_number               VARCHAR(255),
    business_date                 DATE,
    -- Security
    ticker_symbol                 VARCHAR(255),
    cusip                         VARCHAR(255),
    isin                          VARCHAR(255),
    sedol                         VARCHAR(255),
    schwab_sec_nbr                VARCHAR(255),
    options_display_symbol        VARCHAR(255),
    underlying_ticker_symbol      VARCHAR(255),
    -- Quantities by settlement type
    quantity_settled              NUMERIC,
    quantity_unsettledlong        NUMERIC,
    quantity_unsettledshort       NUMERIC,
    quantity_settledunsettled     NUMERIC,
    quantity_cash_settled         NUMERIC,
    quantity_margin_settled       NUMERIC,
    quantity_short_settled        NUMERIC,
    non_tradable_quantity         NUMERIC,
    -- Pricing
    closing_price                 NUMERIC,
    closing_price_unfactored      NUMERIC,
    market_value_settledunsettled NUMERIC,
    factor                        NUMERIC,
    factor_date                   DATE,
    tips_factor                   NUMERIC,
    asset_backed_factor           NUMERIC,
    prod_code                     VARCHAR(255),
    l_s                           VARCHAR(255),  -- Long/Short indicator
    cash_balance_type             VARCHAR(255),
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_rps2 — Position Summary

CREATE TABLE public.schwab_rps2 (
    account_id                VARCHAR(255),
    mstracct_number           VARCHAR(255),
    acct_type                 VARCHAR(255),
    acct_regis                VARCHAR(255),
    business_date             DATE,
    -- Market values
    net_mv_positions          NUMERIC,
    net_mv_plus_cash          NUMERIC,
    mkt_value_long            NUMERIC,
    mkt_value_short           NUMERIC,
    eqty_excl_option          NUMERIC,
    eqty_incl_option          NUMERIC,
    eqty_percentage           NUMERIC,
    -- Cash & balances
    cash_balance_settled_only NUMERIC,
    cash_margin_bal_settled   NUMERIC,
    bank_cash                 NUMERIC,
    bank_sweep_ibf            NUMERIC,
    money_mkt_funds           NUMERIC,
    -- Margin
    margin_balance            NUMERIC,
    mrgn_buying_pwr           NUMERIC,
    maintenance_call          NUMERIC,
    net_credit_debit          NUMERIC,
    available_to_pay          NUMERIC,
    daily_margin_int          NUMERIC,
    mtd_margin_int            NUMERIC,
    mnth_end_div_pay          NUMERIC,
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_sec — Security Master

CREATE TABLE public.schwab_sec (
    mstracct_number           VARCHAR(255),
    business_date             DATE,
    -- Identifiers
    ticker_symbol             VARCHAR(255),
    cusip                     VARCHAR(255),
    isin                      VARCHAR(255),
    sedol                     VARCHAR(255),
    schwab_sec_nbr            VARCHAR(255),
    item_issue_id             VARCHAR(255),
    -- Description
    security_description_line_1 VARCHAR(255),
    security_description_line_2 VARCHAR(255),
    security_description_line_3 VARCHAR(255),
    -- Pricing & valuation
    closing_price             NUMERIC,
    closing_price_unfactored  NUMERIC,
    par_value                 NUMERIC,
    face_value_amt            NUMERIC,
    factor                    NUMERIC,
    factor_date               DATE,
    security_valuation_unit   NUMERIC,
    -- Bond fields
    maturity_date             DATE,
    call_date                 DATE,
    call_price                NUMERIC,
    issue_date                DATE,
    interest_rate             NUMERIC,
    next_pay_date             DATE,
    next_put_date             DATE,
    next_put_price            NUMERIC,
    -- Options
    opt_expr_date             DATE,
    options_multiplier        NUMERIC,
    strike_price              NUMERIC,
    -- Ratings
    sp_rating                 VARCHAR(255),
    moodys_rating             VARCHAR(255),
    dividendinterest_frequency VARCHAR(255),
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_rld / schwab_rlu / schwab_rly — Realized Gains/Losses

Three tables share the same structure: rld = closed lots · rlu = unrealized open lots · rly = year-to-date.

CREATE TABLE public.schwab_rld (  -- also rlu, rly
    account_id                    VARCHAR(255),
    mstracct_number               VARCHAR(255),
    business_date                 DATE,
    trans_code                    VARCHAR(255),
    ticker_symbol                 VARCHAR(255),
    cusip                         VARCHAR(255),
    isin                          VARCHAR(255),
    -- Dates
    closing_date                  DATE,
    acquired_date                 DATE,
    orgpurch_date                 DATE,
    settle_date                   DATE,
    closing_quantity              NUMERIC,
    -- Cost basis
    cost_per_share                NUMERIC,
    cost_basis_unamortized        NUMERIC,
    adjusted_cost_per_share       NUMERIC,
    adjusted_cost_basis_amortized NUMERIC,
    original_purchase_price       NUMERIC,
    proceeds                      NUMERIC,
    proceeds_per_share            NUMERIC,
    -- Gains/losses
    gainloss_dollars              NUMERIC,
    gainloss_percentage           NUMERIC,
    adjusted_gainloss_dollars     NUMERIC,
    adjusted_gainloss_percentage  NUMERIC,
    transaction_gainloss_dollars  NUMERIC,
    disallowed_loss_amount        BOOLEAN,
    adj_disallowed_loss_amount    NUMERIC,
    days_held                     VARCHAR(255),
    tax_code                      VARCHAR(255),
    federal_tax_withholding       VARCHAR(255),
    state_tax_withholding         NUMERIC,
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_uln / schwab_ult — Unrealized Long Positions / Transactions

CREATE TABLE public.schwab_uln (
    account_id                      VARCHAR(255),
    mstracct_number                 VARCHAR(255),
    business_date                   DATE,
    ticker_symbol                   VARCHAR(255),
    cusip                           VARCHAR(255),
    isin                            VARCHAR(255),
    prod_code                       VARCHAR(255),
    current_quantity                NUMERIC,
    l_s                             VARCHAR(255),
    current_market_value            NUMERIC,
    accrued_interest                NUMERIC,
    acquired_date                   DATE,
    orgpurch_date                   DATE,
    org_purchase_price              NUMERIC,
    cost_per_share                  NUMERIC,
    cost_basis_unamortized          NUMERIC,
    adjusted_cost_per_share         NUMERIC,
    adjusted_cost_basis_amortized   NUMERIC,
    adjusted_cost_incldg_unpd_amort NUMERIC,
    original_cost_basis             NUMERIC,
    transaction_cost                NUMERIC,
    transaction_cost_per_share      NUMERIC,
    unrealized_gainloss             NUMERIC,
    disallowed_loss_amount          NUMERIC,
    days_held                       VARCHAR(255),
    yield_tomaturity                NUMERIC,
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

schwab_upn / schwab_upt — Unrealized Positions / Transactions

Similar to uln/ult — position-level summary without lot-level detail. Includes current_quantity, current_market_value, cost_per_share, adjusted_cost_basis_amortized, unrealized_gainloss, principal_paydown_factor.

schwab_rtu / schwab_rty — Realized Transactions (Unrealized / YTD)

Same structure as schwab_rld minus employee-option fields. rtu = open trade lots, rty = year-to-date summary.

schwab_tcf — Trade Confirm (FIX Messages)

CREATE TABLE public.schwab_tcf (
    account         VARCHAR(255),
    master          VARCHAR(255),
    account_type    VARCHAR(255),
    bus_date        DATE,
    trd_date        DATE,
    settl_dt        DATE,
    action          VARCHAR(255),
    txdesc          VARCHAR(255),
    symbol          VARCHAR(255),
    cusip           VARCHAR(255),
    quantity        NUMERIC,
    price           NUMERIC,
    principal       NUMERIC,
    total_amount    NUMERIC,
    accrued_int     NUMERIC,
    -- Fees
    schwab_comm     NUMERIC,
    exec_brok_comm  NUMERIC,
    broker_svc_fee  NUMERIC,
    prime_brok_fee  NUMERIC,
    step_in_fee     NUMERIC,
    exch_proc_fee   NUMERIC,
    transact_fee    NUMERIC,
    research_fee    NUMERIC,
    markupmarkdown  NUMERIC,
    state_tax       NUMERIC,
    other_fees      NUMERIC,
    cancel          VARCHAR(255),
    -- FIX protocol fields: message_1 through message_150 (VARCHAR(255) each)
    -- account_nameaddress_1..6, account_title_1..3
    filename         VARCHAR(255),
    __master_account VARCHAR(255),
    imported_at      TIMESTAMP,
    last_sync_at     TIMESTAMP
);

V

Archiving Policy

Setting | Value
DAG | redwood_schwab_file_archiver
DAG File | redwood_schwab_archiver.py
Schedule | 0 14 * * * (daily, 2:00 PM UTC)
Retention threshold | 7 days (Airflow Variable: schwab_archive_older_than_days)
Grouping | By date extracted from filename
ZIP prefix | schwab (Airflow Variable: schwab_archive_zip_prefix)
Cold storage path | {cold_container}/{account}/{YYYYMMDD}/{zip_file}
Temp workspace | /tmp/schwab_sync (Airflow Variable: schwab_temp_dir)
Hot storage secret | redwood-hot-storage-config
Cold storage secret | redwood-cold-storage-config

Archive Process

  1. fetch_credentials — Loads hot and cold storage configs from Azure Key Vault
  2. create_temp_folder — Creates per-run temp directory
  3. archive_{account} (dynamic, one per master account) — Identifies files older than threshold → groups by date → creates per-date ZIP → uploads to cold storage → verifies remote copy → deletes originals from hot storage
  4. cleanup_temp_storage — Removes temp folder (trigger_rule: all_done)
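
The selection and grouping part of step 3 can be sketched as below. This is an illustration under stated assumptions, not the archiver's code: both function names are hypothetical, a file is treated as eligible only when its date is strictly older than the threshold, and the ZIP file name pattern ({prefix}_{YYYYMMDD}.zip) is invented for the example. Only the path template and the 7-day default come from the table above.

```python
from collections import defaultdict
from datetime import date, timedelta


def group_files_for_archive(files, today: date, older_than_days: int = 7) -> dict:
    """Group (filename, filedate) pairs older than the threshold by file date."""
    cutoff = today - timedelta(days=older_than_days)
    groups = defaultdict(list)
    for filename, filedate in files:
        if filedate < cutoff:  # assumption: boundary day is not yet archived
            groups[filedate].append(filename)
    return dict(groups)


def cold_storage_path(cold_container: str, account: str, filedate: date,
                      zip_prefix: str = "schwab") -> str:
    """Build {cold_container}/{account}/{YYYYMMDD}/{zip_file} for one date group."""
    stamp = filedate.strftime("%Y%m%d")
    zip_file = f"{zip_prefix}_{stamp}.zip"  # hypothetical naming pattern
    return f"{cold_container}/{account}/{stamp}/{zip_file}"
```

Each date group would then be zipped, uploaded to its cold storage path, and verified before the originals are deleted from hot storage, as the step list describes.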