📡 FCC Database Lookup Script – Help Us Map the Airwaves
Are you tired of blindly guessing who’s broadcasting on a frequency? Our new open-source script downloads public FCC licensing data and loads it into a PostgreSQL database, letting you correlate start and stop frequencies from a CSV file with the actual license holders.
This tool is ideal for researchers, TSCM professionals, RF engineers, and anyone analyzing the radio spectrum for surveillance detection, signal attribution, or regulatory tracking.
🧰 What It Does
Our FCC script:
- Downloads and unzips the latest public FCC license data (e.g., microwave, LM, market, tower, owner).
- Prepares the files for upload into PostgreSQL.
- Enables searching for matched owners based on a CSV list of start and stop frequencies with lat/lon data (see the example CSV and invocation just after this list).
- Will support waterfall or FFT-based frequency snapshot uploads (with location correlation) soon.
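For reference, the matching script expects a simple CSV of captured signal ranges plus the capture location, in the same format as the signals.csv sample at the end of this post:

start_freq_mhz,stop_freq_mhz,latitude,longitude
462.500,462.600,40.7128,-74.0060

A typical run then looks like this (flag names follow the argparse setup in match_signals.py below; the defaults are a 5 km radius and ±0.05 MHz tolerance):

python match_signals.py --input signals.csv --radius 5 --tolerance 0.05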
🔧 Current Status
✅ Download + Decompress – Fully working
✅ PostgreSQL Upload – Partially working
❌ Data Mapping & Table Relationships – Needs your help!
The core upload logic connects to a local PostgreSQL instance and begins parsing the data, but the relational mapping between different FCC files (e.g., linking frequency blocks to entities) is still under construction. This is the critical part that enables proper matching between signals and licensed entities.
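To make the goal concrete, here is a minimal sketch of the kind of lookup the finished mapping should support. It mirrors the join that match_signals.py (below) already attempts; the table and column names come from the schema created by import_fcc_to_db.py, and the literal frequency range is just an example:

import psycopg2

# Sketch: find licensees whose assigned frequency falls inside an observed range.
# The join key (uls_file_number) is exactly the relational mapping that still needs validation.
conn = psycopg2.connect(dbname="fcc_data", user="postgres", password="...", host="localhost", port="5432")
cur = conn.cursor()
cur.execute("""
    SELECT f.frequency, f.call_sign, e.entity_name, e.licensee_name
    FROM fc_frequencies f
    LEFT JOIN fc_entities e ON f.uls_file_number = e.uls_file_number
    WHERE f.frequency BETWEEN %s AND %s
""", (462.5, 462.6))
for frequency, call_sign, entity_name, licensee_name in cur.fetchall():
    print(frequency, call_sign, licensee_name or entity_name or "Unknown")
cur.close()
conn.close()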
🚀 Get Involved
If you’re a developer with experience in:
- Python data pipelines
- FCC licensing formats
- PostgreSQL optimization
- Spectrum analysis tools
…then we encourage you to fork the repo, improve the mapping logic, and submit your updates!
📩 You can also email your contributions or questions to [Your Email Address], or post comments directly on this blog.
📂 Future Ideas
- Integration with [BB60C / SDR] to cross-reference live signals with licensed operators
- Support for signal classification / band identification
- TSCM-oriented reports showing nearby transmitters and active licenses
- Geo-fence tools to detect illegal or spoofed emitters
🧠 Why This Matters
We believe spectrum intelligence should be transparent and accessible. The FCC has the data — we just need better tools to use it. Whether you’re tracking unauthorized transmissions or trying to prove who’s behind a mysterious broadcast, this tool will help get you there.
🛠️ Let’s build it together.
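A quick note on prerequisites: besides a running PostgreSQL server, the scripts below import pandas, psycopg2, SQLAlchemy, requests, geopandas, and shapely. An assumed install one-liner (package names taken straight from the import statements; psycopg2-binary is the usual drop-in wheel for psycopg2):

pip install pandas psycopg2-binary SQLAlchemy requests geopandas shapely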
import_fcc_to_db.py
#############################################
import os
import pandas as pd
import psycopg2
from psycopg2 import sql
from sqlalchemy import create_engine
import zipfile
import requests
from datetime import datetime
import getpass
import sys
import time
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Print environment info
logging.info(f"Python version: {sys.version}")
logging.info(f"Pandas version: {pd.__version__}")
# Configuration
FCC_BASE_URL = "https://data.fcc.gov/download/pub/uls/complete/"
LOCAL_DIR = "fcc_data"
def get_db_config():
"""Get database configuration with password prompt."""
logging.info("Database Configuration")
password = getpass.getpass("Enter PostgreSQL password: ")
return {
'dbname': 'fcc_data',
'user': 'postgres',
'password': password,
'host': 'localhost',
'port': '5432'
}
def test_db_connection(config):
"""Test database connection and create database if it doesn't exist."""
try:
conn = psycopg2.connect(
dbname='postgres',
user=config['user'],
password=config['password'],
host=config['host'],
port=config['port']
)
conn.autocommit = True
cur = conn.cursor()
cur.execute("SELECT 1 FROM pg_database WHERE datname = 'fcc_data'")
if not cur.fetchone():
logging.info("Creating fcc_data database...")
cur.execute("CREATE DATABASE fcc_data")
logging.info("Database created successfully!")
cur.close()
conn.close()
conn = psycopg2.connect(**config)
conn.close()
logging.info("Successfully connected to fcc_data database!")
return True
except psycopg2.OperationalError as e:
logging.error(f"Error connecting to database: {e}")
logging.error("Please make sure:\n1. PostgreSQL is running\n2. The password is correct\n3. The postgres user has permission to create databases")
return False
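# ULS 'complete' archives to pull: land mobile (commercial and private), microwave, cellular, tower, market, and ownership data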
FCC_FILES = [
"a_LMcomm.zip",
"a_LMpriv.zip",
"a_micro.zip",
"a_cell.zip",
"a_tower.zip",
"a_market.zip",
"owner.zip"
]
def create_directories():
"""Create directories for data if they don't exist."""
os.makedirs(LOCAL_DIR, exist_ok=True)
logging.info(f"Created directory {LOCAL_DIR} if it didn't exist")
def download_file(url, local_path):
"""Download a file from a URL to a local path."""
logging.info(f"Downloading {url} to {local_path}...")
try:
response = requests.get(url, stream=True)
response.raise_for_status()
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logging.info(f"Downloaded {local_path}")
except requests.RequestException as e:
logging.error(f"Error downloading {url}: {e}")
raise
def unzip_file(zip_path, extract_dir):
"""Unzip a file to a specified directory."""
logging.info(f"Extracting {zip_path} to {extract_dir}...")
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
logging.info(f"Extracted {zip_path}")
except zipfile.BadZipFile as e:
logging.error(f"Error extracting {zip_path}: {e}")
raise
def ensure_fcc_files():
"""Check for FCC files and download/extract if missing."""
for fcc_file in FCC_FILES:
local_zip = os.path.join(LOCAL_DIR, fcc_file)
if not os.path.exists(local_zip):
url = FCC_BASE_URL + fcc_file
download_file(url, local_zip)
extract_dir = os.path.join(LOCAL_DIR, fcc_file.replace(".zip", ""))
if not os.path.exists(extract_dir) or not os.listdir(extract_dir):
os.makedirs(extract_dir, exist_ok=True)
unzip_file(local_zip, extract_dir)
def dms_to_decimal(degrees, minutes, seconds, direction):
"""Convert DMS (degrees, minutes, seconds) to decimal degrees."""
try:
decimal = float(degrees) + float(minutes)/60 + float(seconds)/3600
if direction in ['S', 'W']:
decimal = -decimal
return decimal
except (ValueError, TypeError):
return None
def preprocess_file(file_path, delimiter, expected_columns):
"""Preprocess a .dat file to ensure consistent column counts, processing all rows."""
logging.info(f"Preprocessing {file_path}")
temp_file = file_path + '.tmp'
skipped_lines = []
line_number = 0
processed_lines = 0
# Log first few rows to inspect field counts
with open(file_path, 'r', encoding='latin1', errors='replace') as f:
logging.info(f"Inspecting field counts for {file_path} (first 5 rows):")
for i in range(5):
line = f.readline().strip()
if not line:
break
fields = line.split(delimiter)
logging.info(f"Row {i+1}: {len(fields)} fields - {line[:100]}...")
# Process all lines
with open(file_path, 'r', encoding='latin1', errors='replace') as f_in:
with open(temp_file, 'w', encoding='latin1') as f_out:
for line in f_in:
line_number += 1
fields = line.strip().split(delimiter)
# Pad rows with fewer fields for CO.dat
if file_path.endswith('CO.dat') and len(fields) < expected_columns:
fields.extend([''] * (expected_columns - len(fields)))
if len(fields) != expected_columns:
skipped_lines.append((line_number, line.strip(), len(fields)))
continue
f_out.write('|'.join(fields) + '\n')
processed_lines += 1
if skipped_lines:
logging.warning(f"Skipped {len(skipped_lines)} lines in {file_path} due to incorrect column count:")
for ln, line, field_count in skipped_lines[:5]:
logging.warning(f"Line {ln}: {line[:100]}... (expected {expected_columns} fields, saw {field_count})")
if len(skipped_lines) > 5:
logging.warning(f"...and {len(skipped_lines) - 5} more lines skipped.")
logging.info(f"Processed {processed_lines} rows from {file_path}")
return temp_file
def create_database_schema(db_config):
"""Create the database schema for FCC data with comprehensive fields."""
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS fc_entities (
id SERIAL PRIMARY KEY,
uls_file_number VARCHAR(20),
call_sign VARCHAR(10),
entity_name VARCHAR(500),
licensee_name VARCHAR(500),
address VARCHAR(1000),
po_box VARCHAR(50),
city VARCHAR(100),
state VARCHAR(2),
zip_code VARCHAR(20),
frn VARCHAR(20) UNIQUE,
phone VARCHAR(20),
fax VARCHAR(20),
email VARCHAR(100),
suffix VARCHAR(10),
applicant_type_code VARCHAR(10),
applicant_type_other VARCHAR(50),
status_code VARCHAR(10),
status_date DATE,
file_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS fc_frequencies (
id SERIAL PRIMARY KEY,
uls_file_number VARCHAR(20),
call_sign VARCHAR(10),
frequency FLOAT,
bandwidth VARCHAR(20),
emission_designator VARCHAR(20),
manufacturer VARCHAR(100),
model VARCHAR(100),
file_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS fc_locations (
id SERIAL PRIMARY KEY,
uls_file_number VARCHAR(20),
call_sign VARCHAR(10),
latitude FLOAT,
longitude FLOAT,
elevation FLOAT,
city VARCHAR(100),
state VARCHAR(2),
county VARCHAR(100),
location_type VARCHAR(50),
file_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS fc_markets (
id SERIAL PRIMARY KEY,
uls_file_number VARCHAR(20),
call_sign VARCHAR(10),
market_code VARCHAR(10),
state VARCHAR(2),
market_name VARCHAR(100),
file_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS fc_licenses (
id SERIAL PRIMARY KEY,
uls_file_number VARCHAR(20),
call_sign VARCHAR(10),
license_status VARCHAR(10),
radio_service_code VARCHAR(10),
grant_date DATE,
expired_date DATE,
cancellation_date DATE,
renewal_date DATE,
operator_class VARCHAR(10),
file_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS fc_amateur (
id SERIAL PRIMARY KEY,
uls_file_number VARCHAR(20),
call_sign VARCHAR(10),
operator_class VARCHAR(10),
group_code VARCHAR(10),
region_code VARCHAR(10),
trustee_call_sign VARCHAR(10),
trustee_indicator VARCHAR(10),
physician_certification VARCHAR(10),
ve_signature VARCHAR(10),
file_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
    CREATE TABLE IF NOT EXISTS fc_unlicensed_bands (
        id SERIAL PRIMARY KEY,
        start_freq_mhz FLOAT,
        stop_freq_mhz FLOAT,
        description TEXT,
        typical_devices TEXT,
        UNIQUE (start_freq_mhz, stop_freq_mhz)
    );
    -- Populate unlicensed bands (common Part 15 frequencies); the UNIQUE constraint keeps re-runs from duplicating rows
    INSERT INTO fc_unlicensed_bands (start_freq_mhz, stop_freq_mhz, description, typical_devices)
    VALUES
        (902.0, 928.0, 'ISM Band (Industrial, Scientific, Medical)', 'Cordless phones, RFID, IoT devices'),
        (2400.0, 2483.5, 'Wi-Fi/Bluetooth Band', 'Wi-Fi routers, Bluetooth devices'),
        (5725.0, 5850.0, 'Wi-Fi Band (5 GHz)', 'Wi-Fi routers, wireless cameras'),
        (13.553, 13.567, 'RFID/NFC Band', 'RFID tags, NFC devices')
    ON CONFLICT (start_freq_mhz, stop_freq_mhz) DO NOTHING;
CREATE INDEX IF NOT EXISTS idx_fc_entities_uls ON fc_entities(uls_file_number);
CREATE INDEX IF NOT EXISTS idx_fc_entities_call_sign ON fc_entities(call_sign);
CREATE INDEX IF NOT EXISTS idx_fc_frequencies_uls ON fc_frequencies(uls_file_number);
CREATE INDEX IF NOT EXISTS idx_fc_frequencies_call_sign ON fc_frequencies(call_sign);
CREATE INDEX IF NOT EXISTS idx_fc_locations_uls ON fc_locations(uls_file_number);
CREATE INDEX IF NOT EXISTS idx_fc_locations_call_sign ON fc_locations(call_sign);
CREATE INDEX IF NOT EXISTS idx_fc_markets_uls ON fc_markets(uls_file_number);
CREATE INDEX IF NOT EXISTS idx_fc_markets_call_sign ON fc_markets(call_sign);
CREATE INDEX IF NOT EXISTS idx_fc_licenses_uls ON fc_licenses(uls_file_number);
CREATE INDEX IF NOT EXISTS idx_fc_amateur_uls ON fc_amateur(uls_file_number);
CREATE INDEX IF NOT EXISTS idx_fc_frequencies_freq ON fc_frequencies(frequency);
CREATE INDEX IF NOT EXISTS idx_fc_locations_coords ON fc_locations(latitude, longitude);
""")
conn.commit()
cur.close()
conn.close()
logging.info("Database schema created successfully")
def load_fcc_data_to_db(db_config):
"""Load all FCC data into PostgreSQL database."""
# Note: For large datasets, consider using PostgreSQL COPY for better performance
engine = create_engine(f'postgresql://{db_config["user"]}:{db_config["password"]}@{db_config["host"]}:{db_config["port"]}/{db_config["dbname"]}')
# Field positions for different file types (0-based index)
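    # NOTE: these positions are best-effort readings of the ULS .dat layouts and are a big part of
    # the data-mapping work that still needs help (see the notes at the top of this post) - please validate them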
field_positions = {
'EN.dat': {
'default': {
'uls_file_number': 2,
'call_sign': 3,
'entity_name': 6,
'first_name': 7,
'mi': 8,
'last_name': 9,
'suffix': 10,
'phone': 11,
'fax': 12,
'email': 13,
'address': 14,
'po_box': 15,
'city': 16,
'state': 17,
'zip_code': 18,
'frn': 19,
'applicant_type_code': 20,
'applicant_type_other': 21,
'status_code': 22,
'status_date': 23
},
'a_tower': {
'uls_file_number': 2,
'call_sign': 3,
'entity_name': 6,
'first_name': 7,
'mi': 8,
'last_name': 9,
'suffix': 10,
'phone': 11,
'fax': 12,
'email': 13,
'address': 14,
'po_box': 15,
'city': 16,
'state': 17,
'zip_code': 18,
'frn': 19,
'applicant_type_code': 20,
'applicant_type_other': 21,
'status_code': 22,
'status_date': 23
}
},
'FR.dat': {
'uls_file_number': 2,
'frequency': 10,
'bandwidth': 16,
'emission_designator': 8,
'manufacturer': 22,
'model': 23
},
'LO.dat': {
'uls_file_number': 2,
'call_sign': 3,
'city': 11,
'county': 12,
'state': 13,
'elevation': 16,
'lat_degrees': 17,
'lat_minutes': 18,
'lat_seconds': 19,
'lat_direction': 20,
'lon_degrees': 21,
'lon_minutes': 22,
'lon_seconds': 23,
'lon_direction': 24,
'location_type': 6
},
'MA.dat': {
'uls_file_number': 2,
'call_sign': 3,
'market_code': 4,
'market_name': 7,
'state': 8
},
'CO.dat': {
'default': {
'uls_file_number': 2,
'call_sign': 3,
'owner_name': 6
},
'a_tower': {
'uls_file_number': 2,
'call_sign': None,
'lat_degrees': 6,
'lat_minutes': 7,
'lat_seconds': 8,
'lat_direction': 9,
'lon_degrees': 11,
'lon_minutes': 12,
'lon_seconds': 13,
'lon_direction': 14,
'elevation': 15,
'location_type': 5
}
},
'AM.dat': { # Assumed to be _A.dat
'uls_file_number': 2,
'call_sign': 3,
'operator_class': 4,
'group_code': 5,
'region_code': 6,
'trustee_call_sign': 7,
'trustee_indicator': 8,
'physician_certification': 9,
've_signature': 10
},
'HD.dat': {
'uls_file_number': 2,
'call_sign': 3,
'license_status': 4,
'radio_service_code': 5,
'grant_date': 6,
'expired_date': 7,
'cancellation_date': 8,
'renewal_date': 9,
'operator_class': 10
}
}
# Expected column counts
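    # Pipe-delimited field counts per .dat type; like the positions above, these still need
    # validation against the official ULS record definitions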
expected_columns = {
'EN.dat': {
'default': 30,
'a_tower': 30 # Assume same as default unless validated otherwise
},
'FR.dat': 30,
'LO.dat': 51,
'MA.dat': 9,
'CO.dat': {
'default': 8,
'a_tower': 18
},
'AM.dat': 15,
'HD.dat': 20
}
# Files to process
relevant_files = ['EN.dat', 'FR.dat', 'LO.dat', 'MA.dat', 'CO.dat', 'AM.dat', 'HD.dat']
for fcc_file in FCC_FILES:
extract_dir = os.path.join(LOCAL_DIR, fcc_file.replace(".zip", ""))
if not os.path.exists(extract_dir):
logging.warning(f"Directory {extract_dir} does not exist, skipping")
continue
logging.info(f"Processing directory: {extract_dir}")
for file in os.listdir(extract_dir):
if not file.endswith(".dat") or file not in relevant_files:
logging.info(f"Skipping {file} (not relevant for TSCM)")
continue
file_path = os.path.join(extract_dir, file)
try:
logging.info(f"Reading {file}...")
with open(file_path, 'r', encoding='latin1', errors='replace') as f:
first_line = f.readline().strip()
delimiter = '|' # FCC standard
if '|' not in first_line:
logging.warning(f"Expected '|' delimiter in {file}, found none. First line: {first_line[:100]}")
# Determine expected columns
if isinstance(expected_columns[file], dict):
expected_cols = expected_columns[file]['a_tower' if 'a_tower' in extract_dir and file in ['EN.dat', 'CO.dat'] else 'default']
else:
expected_cols = expected_columns[file]
# Preprocess the file
temp_file_path = preprocess_file(file_path, delimiter, expected_cols)
# Print sample raw rows
with open(temp_file_path, 'r', encoding='latin1', errors='replace') as f:
                    # Grab up to five preprocessed rows for logging
                    raw_rows = []
                    for _ in range(5):
                        line = f.readline()
                        if not line:
                            break
                        raw_rows.append(line.strip())
logging.info(f"Sample raw rows from {file} (after preprocessing):")
for i, row in enumerate(raw_rows):
logging.info(f"Row {i+1}: {row[:100]}...")
# Read the entire file
df = pd.read_csv(temp_file_path,
sep=delimiter,
header=None,
low_memory=False,
encoding='latin1',
on_bad_lines='warn')
logging.info(f"Processing {file} with {len(df)} rows...")
# Log raw data
logging.info(f"Raw data from {file} (first 5 rows, first 20 columns):")
logging.info(df.iloc[:5, :20].to_string())
if file == 'EN.dat':
pos = field_positions[file]['a_tower' if 'a_tower' in extract_dir else 'default']
required_columns = max([p for p in pos.values() if p is not None]) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
licensee_parts = df[[pos['first_name'], pos['mi'], pos['last_name'], pos['suffix']]].astype(str)
licensee_name = licensee_parts.agg(lambda x: ' '.join(x[x != 'nan']), axis=1)
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df[pos['call_sign']],
'entity_name': df[pos['entity_name']],
'licensee_name': licensee_name,
'address': df[pos['address']],
'po_box': df[pos['po_box']],
'city': df[pos['city']],
'state': df[pos['state']],
'zip_code': df[pos['zip_code']],
'frn': df[pos['frn']],
'phone': df[pos['phone']],
'fax': df[pos['fax']],
'email': df[pos['email']],
'suffix': df[pos['suffix']],
'applicant_type_code': df[pos['applicant_type_code']],
'applicant_type_other': df[pos['applicant_type_other']],
'status_code': df[pos['status_code']],
'status_date': pd.to_datetime(df[pos['status_date']], errors='coerce')
})
df_clean['file_name'] = file
df_clean['state'] = df_clean['state'].str[:2]
for col in df_clean.select_dtypes(include='object').columns:
df_clean[col] = df_clean[col].str.strip()
invalid_states = df_clean[df_clean['state'].str.len() > 2]
if not invalid_states.empty:
logging.warning(f"Found invalid state values in {file}: {invalid_states['state'].head().tolist()}")
logging.info(f"Sample uls_file_number from EN.dat: {df_clean['uls_file_number'].head().tolist()}")
logging.info(f"Sample city from EN.dat: {df_clean['city'].head().tolist()}")
df_clean.to_sql('fc_entities', engine, if_exists='append', index=False)
elif file == 'FR.dat':
pos = field_positions[file]
required_columns = max(pos.values()) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': None,
'frequency': df[pos['frequency']],
'bandwidth': df[pos['bandwidth']],
'emission_designator': df[pos['emission_designator']],
'manufacturer': df.get(pos['manufacturer'], pd.Series([None] * len(df))),
'model': df.get(pos['model'], pd.Series([None] * len(df)))
})
df_clean['file_name'] = file
df_clean['frequency'] = pd.to_numeric(df_clean['frequency'], errors='coerce')
if df_clean['frequency'].max() > 1000000: # Hz
df_clean['frequency'] = df_clean['frequency'] / 1000000
elif df_clean['frequency'].max() > 1000: # kHz
df_clean['frequency'] = df_clean['frequency'] / 1000
invalid_frequencies = df_clean[df_clean['frequency'] > 100000]
if not invalid_frequencies.empty:
logging.warning(f"Found {len(invalid_frequencies)} invalid frequencies in FR.dat: {invalid_frequencies['frequency'].head().tolist()}")
df_clean['emission_designator'] = df_clean['emission_designator'].astype(str)
logging.info(f"Sample uls_file_number from FR.dat: {df_clean['uls_file_number'].head().tolist()}")
df_clean.to_sql('fc_frequencies', engine, if_exists='append', index=False)
elif file == 'LO.dat':
pos = field_positions[file]
required_columns = max(pos.values()) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df[pos['call_sign']],
'latitude': df.apply(lambda row: dms_to_decimal(
row[pos['lat_degrees']], row[pos['lat_minutes']], row[pos['lat_seconds']], row[pos['lat_direction']]), axis=1),
'longitude': df.apply(lambda row: dms_to_decimal(
row[pos['lon_degrees']], row[pos['lon_minutes']], row[pos['lon_seconds']], row[pos['lon_direction']]), axis=1),
'elevation': df[pos['elevation']],
'city': df[pos['city']],
'county': df[pos['county']],
'state': df[pos['state']],
'location_type': df[pos['location_type']]
})
df_clean['file_name'] = file
for col in ['latitude', 'longitude', 'elevation']:
df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
df_clean['state'] = df_clean['state'].str[:2]
invalid_coords = df_clean[(df_clean['latitude'].abs() > 90) | (df_clean['longitude'].abs() > 180)]
if not invalid_coords.empty:
logging.warning(f"Found {len(invalid_coords)} invalid coordinates in LO.dat: {invalid_coords[['latitude', 'longitude']].head().to_dict()}")
logging.info(f"Sample uls_file_number from LO.dat: {df_clean['uls_file_number'].head().tolist()}")
logging.info(f"Sample city from LO.dat: {df_clean['city'].head().tolist()}")
df_clean.to_sql('fc_locations', engine, if_exists='append', index=False)
elif file == 'MA.dat':
pos = field_positions[file]
required_columns = max(pos.values()) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df[pos['call_sign']],
'market_code': df[pos['market_code']],
'state': df.get(pos.get('state'), None),
'market_name': df[pos['market_name']]
})
df_clean['file_name'] = file
for col in df_clean.select_dtypes(include='object').columns:
df_clean[col] = df_clean[col].str.strip()
invalid_states = df_clean[df_clean['state'].str.len() > 2]
if not invalid_states.empty:
logging.warning(f"Found invalid state codes in MA.dat: {invalid_states['state'].head().tolist()}")
logging.info(f"Sample uls_file_number from MA.dat: {df_clean['uls_file_number'].head().tolist()}")
df_clean.to_sql('fc_markets', engine, if_exists='append', index=False)
elif file == 'CO.dat':
pos = field_positions[file]['a_tower' if 'a_tower' in extract_dir else 'default']
required_columns = max([p for p in pos.values() if p is not None]) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
if 'a_tower' in extract_dir:
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df[pos['call_sign']] if pos['call_sign'] is not None else None,
'latitude': df.apply(lambda row: dms_to_decimal(
row[pos['lat_degrees']], row[pos['lat_minutes']], row[pos['lat_seconds']], row[pos['lat_direction']]), axis=1),
'longitude': df.apply(lambda row: dms_to_decimal(
row[pos['lon_degrees']], row[pos['lon_minutes']], row[pos['lon_seconds']], row[pos['lon_direction']]), axis=1),
'elevation': df[pos['elevation']],
'city': None,
'county': None,
'state': None,
'location_type': df[pos['location_type']]
})
df_clean['file_name'] = file
for col in ['latitude', 'longitude', 'elevation']:
df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
invalid_coords = df_clean[(df_clean['latitude'].abs() > 90) | (df_clean['longitude'].abs() > 180)]
if not invalid_coords.empty:
logging.warning(f"Found {len(invalid_coords)} invalid coordinates in CO.dat: {invalid_coords[['latitude', 'longitude']].head().to_dict()}")
df_clean.to_sql('fc_locations', engine, if_exists='append', index=False)
else:
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df.get(pos.get('call_sign'), None),
'entity_name': df[pos['owner_name']],
'licensee_name': df[pos['owner_name']]
})
df_clean['file_name'] = file
for col in df_clean.select_dtypes(include='object').columns:
df_clean[col] = df_clean[col].str.strip()
df_clean.to_sql('fc_entities', engine, if_exists='append', index=False)
logging.info(f"Sample uls_file_number from CO.dat: {df_clean['uls_file_number'].head().tolist()}")
elif file == 'AM.dat':
pos = field_positions[file]
required_columns = max(pos.values()) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df[pos['call_sign']],
'operator_class': df[pos['operator_class']],
'group_code': df[pos['group_code']],
'region_code': df[pos['region_code']],
'trustee_call_sign': df[pos['trustee_call_sign']],
'trustee_indicator': df[pos['trustee_indicator']],
'physician_certification': df[pos['physician_certification']],
've_signature': df[pos['ve_signature']]
})
df_clean['file_name'] = file
for col in df_clean.select_dtypes(include='object').columns:
df_clean[col] = df_clean[col].str.strip()
logging.info(f"Sample uls_file_number from AM.dat: {df_clean['uls_file_number'].head().tolist()}")
df_clean.to_sql('fc_amateur', engine, if_exists='append', index=False)
elif file == 'HD.dat':
pos = field_positions[file]
required_columns = max(pos.values()) + 1
if df.shape[1] < required_columns:
logging.warning(f"Dataframe of {file} has {df.shape[1]} columns, expected at least {required_columns}. Padding with NaN.")
df = df.reindex(columns=range(required_columns), fill_value=pd.NA)
df_clean = pd.DataFrame({
'uls_file_number': df[pos['uls_file_number']],
'call_sign': df[pos['call_sign']],
'license_status': df[pos['license_status']],
'radio_service_code': df[pos['radio_service_code']],
'grant_date': pd.to_datetime(df[pos['grant_date']], errors='coerce'),
'expired_date': pd.to_datetime(df[pos['expired_date']], errors='coerce'),
'cancellation_date': pd.to_datetime(df[pos['cancellation_date']], errors='coerce'),
'renewal_date': pd.to_datetime(df[pos['renewal_date']], errors='coerce'),
'operator_class': df[pos['operator_class']]
})
df_clean['file_name'] = file
for col in df_clean.select_dtypes(include='object').columns:
df_clean[col] = df_clean[col].str.strip()
logging.info(f"Sample uls_file_number from HD.dat: {df_clean['uls_file_number'].head().tolist()}")
df_clean.to_sql('fc_licenses', engine, if_exists='append', index=False)
logging.info(f"Successfully loaded {file}")
# Retry deleting the temporary file
for attempt in range(3):
try:
os.remove(temp_file_path)
break
except PermissionError as e:
logging.warning(f"Attempt {attempt + 1}: Failed to delete {temp_file_path}: {e}. Retrying after delay...")
time.sleep(1)
else:
logging.warning(f"Could not delete {temp_file_path} after multiple attempts. It may need to be deleted manually.")
except Exception as e:
logging.error(f"Error loading {file_path}: {str(e)}")
logging.error(f"First line of file: {first_line[:100]}...")
try:
conn = psycopg2.connect(**db_config)
conn.rollback()
conn.close()
engine.dispose()
new_engine = create_engine(
f'postgresql://{db_config["user"]}:{db_config["password"]}@{db_config["host"]}:{db_config["port"]}/{db_config["dbname"]}',
pool_pre_ping=True
)
                    engine = new_engine  # subsequent to_sql() calls use the fresh engine
logging.info(f"Transaction rolled back and engine reinitialized, continuing with next file.")
except Exception as rollback_e:
logging.error(f"Failed to roll back transaction: {rollback_e}")
raise
                if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
for attempt in range(3):
try:
os.remove(temp_file_path)
break
except PermissionError as e:
logging.warning(f"Attempt {attempt + 1}: Failed to delete {temp_file_path}: {e}. Retrying after delay...")
time.sleep(1)
else:
logging.warning(f"Could not delete {temp_file_path} after multiple attempts. It may need to be deleted manually.")
continue
def update_call_signs(db_config):
"""Update call_sign in fc_frequencies, fc_locations, fc_markets, and fc_amateur using uls_file_number from fc_entities."""
try:
conn = psycopg2.connect(**db_config)
conn.autocommit = True
cur = conn.cursor()
cur.execute("""
SELECT COUNT(*)
FROM fc_frequencies f
JOIN fc_entities e ON f.uls_file_number = e.uls_file_number
WHERE f.call_sign IS NULL;
""")
matches_before = cur.fetchone()[0]
logging.info(f"Found {matches_before} fc_frequencies records to update with call_sign")
cur.execute("""
UPDATE fc_frequencies f
SET call_sign = e.call_sign
FROM fc_entities e
WHERE f.uls_file_number = e.uls_file_number
AND f.call_sign IS NULL;
""")
cur.execute("""
UPDATE fc_locations l
SET call_sign = e.call_sign
FROM fc_entities e
WHERE l.uls_file_number = e.uls_file_number
AND l.call_sign IS NULL;
""")
cur.execute("""
UPDATE fc_markets m
SET call_sign = e.call_sign
FROM fc_entities e
WHERE m.uls_file_number = e.uls_file_number
AND m.call_sign IS NULL;
""")
cur.execute("""
UPDATE fc_amateur a
SET call_sign = e.call_sign
FROM fc_entities e
WHERE a.uls_file_number = e.uls_file_number
AND a.call_sign IS NULL;
""")
cur.execute("""
SELECT COUNT(*)
FROM fc_frequencies
WHERE call_sign IS NOT NULL;
""")
matches_after = cur.fetchone()[0]
logging.info(f"After update, {matches_after} fc_frequencies records have call_sign")
logging.info("Successfully updated call_signs in fc_frequencies, fc_locations, fc_markets, and fc_amateur.")
except Exception as e:
logging.error(f"Error updating call_signs: {str(e)}")
raise
finally:
        # Close the cursor/connection only if they were successfully created
        if 'cur' in locals():
            cur.close()
        if 'conn' in locals():
            conn.close()
def main():
"""Main function to import FCC data into PostgreSQL (full mode)."""
logging.info("Starting FCC data import to PostgreSQL (FULL MODE: Processing all rows)...")
db_config = get_db_config()
if not test_db_connection(db_config):
return
create_directories()
ensure_fcc_files()
logging.info("Creating database schema...")
create_database_schema(db_config)
logging.info("Loading data into database (full mode)...")
load_fcc_data_to_db(db_config)
logging.info("Updating call_signs (full mode)...")
update_call_signs(db_config)
logging.info("FCC data import completed (full mode)!")
if __name__ == "__main__":
main()
match_signals.py
##############################################
import os
import pandas as pd
import psycopg2
import json
from datetime import datetime
import argparse
import geopandas as gpd
from shapely.geometry import Point
# Configuration
OUTPUT_DIR = "output"
DEFAULT_SIGNALS_CSV = "signals.csv"
DEFAULT_RADIUS_KM = 5.0
DEFAULT_TOLERANCE_MHZ = 0.05
OUTPUT_JSON = os.path.join(OUTPUT_DIR, f"signal_matches_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
def get_db_config():
"""Get database configuration with password prompt."""
password = input("Enter PostgreSQL password: ")
return {
'dbname': 'fcc_data',
'user': 'postgres',
'password': password,
'host': 'localhost',
'port': '5432'
}
def parse_emission_designator(emission):
"""Parse emission designator to infer device type."""
if not emission or not isinstance(emission, str) or len(emission) < 4:
print(f"Invalid emission designator: {emission}")
return "Unknown", "Unknown"
try:
# Parse bandwidth (first 4 characters, e.g., '20K0')
bandwidth = emission[:4].strip()
modulation = emission[4:5].strip() if len(emission) > 4 else ''
signal_type = emission[5:6].strip() if len(emission) > 5 else ''
        # Parse bandwidth: the letter is both the unit and the decimal point
        # (H = Hz, K = kHz, M = MHz, G = GHz), e.g. '20K0' means 20.0 kHz
        bw_value = "Unknown"
        if 'H' in bandwidth:
            bw_value = float(bandwidth.replace('H', '.')) / 1000
        elif 'K' in bandwidth:
            bw_value = float(bandwidth.replace('K', '.'))
        elif 'M' in bandwidth:
            bw_value = float(bandwidth.replace('M', '.')) * 1000
        elif 'G' in bandwidth:
            bw_value = float(bandwidth.replace('G', '.')) * 1000000
        elif bandwidth.replace('.', '').isdigit():
            bw_value = float(bandwidth)
# Parse modulation
modulation_types = {
'F': 'FM',
'G': 'Phase Modulation',
'D': 'Digital',
'A': 'AM',
'W': 'Digital (wideband)',
'N': 'None'
}
mod_type = modulation_types.get(modulation, 'Unknown')
# Parse signal type
signal_types = {
'3': 'Voice',
'7': 'Data',
'9': 'Composite',
'0': 'None',
'E': 'Telephony'
}
sig_type = signal_types.get(signal_type, 'Unknown')
# Infer device type
device = "Unknown device"
if mod_type == 'FM' and sig_type == 'Voice':
device = "Two-way radio (e.g., walkie-talkie)"
elif mod_type == 'Digital' and sig_type == 'Data':
device = "Digital transmitter (e.g., IoT, SCADA)"
elif mod_type == 'AM':
device = "AM transmitter (e.g., aviation radio)"
elif mod_type == 'Digital' and 'W' in emission:
device = "Wideband digital device (e.g., spread spectrum)"
return f"{bw_value} kHz, {mod_type}, {sig_type}", device
except Exception as e:
print(f"Error parsing emission designator {emission}: {str(e)}")
return "Unknown", "Unknown"
def get_county_state(lat, lon):
"""Map lat/lon to county and state."""
try:
point = Point(lon, lat)
counties_gdf = gpd.read_file("counties.geojson")
county = counties_gdf[counties_gdf.geometry.contains(point)]
if not county.empty:
return county.iloc[0].get('COUNTYNAME', None), county.iloc[0].get('STATE', None)
return None, None
except Exception as e:
print(f"Error mapping lat/lon: {e}")
return None, None
def query_matches(db_config, signals_df, radius_km, tolerance_mhz):
"""Query PostgreSQL for frequency range matches with tolerance."""
conn = psycopg2.connect(**db_config)
cur = conn.cursor()
results = []
sig_lat = signals_df['latitude'].iloc[0]
sig_lon = signals_df['longitude'].iloc[0]
county, state = get_county_state(sig_lat, sig_lon)
print(f"Capture location: ({sig_lat}, {sig_lon}), County: {county}, State: {state}")
print(f"Using frequency tolerance: ±{tolerance_mhz} MHz")
for idx, signal in signals_df.iterrows():
start_freq = signal['start_freq_mhz']
stop_freq = signal['stop_freq_mhz']
print(f"\nProcessing signal range: {start_freq} - {stop_freq} MHz")
# Apply tolerance to the frequency range
search_start = start_freq - tolerance_mhz
search_stop = stop_freq + tolerance_mhz
print(f"Searching with tolerance: {search_start} - {search_stop} MHz")
signal_result = {
"signal": {
"start_freq_mhz": start_freq,
"stop_freq_mhz": stop_freq,
"latitude": sig_lat,
"longitude": sig_lon
},
"matches": {
"transmitter": [],
"county": [],
"state": [],
"market": [],
"unlicensed": []
}
}
# Check unlicensed bands with tolerance
cur.execute("""
SELECT start_freq_mhz, stop_freq_mhz, description, typical_devices
FROM fc_unlicensed_bands
WHERE (start_freq_mhz - %s <= %s AND stop_freq_mhz + %s >= %s)
OR (start_freq_mhz - %s <= %s AND stop_freq_mhz + %s >= %s)
""", (tolerance_mhz, stop_freq, tolerance_mhz, start_freq, tolerance_mhz, start_freq, tolerance_mhz, stop_freq))
unlicensed_matches = cur.fetchall()
for match in unlicensed_matches:
ul_start, ul_stop, desc, devices = match
signal_result['matches']['unlicensed'].append({
"frequency_range_mhz": f"{ul_start}-{ul_stop}",
"description": desc,
"typical_devices": devices,
"potential_threat": "Possible unlicensed device (e.g., Wi-Fi, Bluetooth)"
})
# Query licensed frequencies with tolerance, join with fc_entities on uls_file_number
cur.execute("""
SELECT f.call_sign as abc_call_sign, f.frequency, f.bandwidth as abc_bandwidth,
f.emission_designator as abc_emission_designator, f.manufacturer, f.model,
f.file_name as abc_file_name, f.uls_file_number,
e.entity_name, e.licensee_name
FROM fc_frequencies f
LEFT JOIN fc_entities e ON f.uls_file_number = e.uls_file_number
WHERE f.frequency BETWEEN %s AND %s
""", (search_start, search_stop))
freq_matches = cur.fetchall()
print(f"Found {len(freq_matches)} licensed frequency matches (with tolerance)")
for match in freq_matches:
call_sign, freq, bandwidth, emission, manufacturer, model, file_name, uls_file_number, entity_name, licensee_name = match
emission_info, device_type = parse_emission_designator(emission)
# Check if the match is outside the exact range but within tolerance
within_exact_range = start_freq <= freq <= stop_freq
match_note = "Exact match" if within_exact_range else f"Match within tolerance (±{tolerance_mhz} MHz)"
# Flag potential TSCM threats
threat_flag = "Low risk"
bandwidth_float = float(bandwidth) if bandwidth and bandwidth != 'null' and bandwidth.replace('.', '').replace('-', '').isdigit() else None
if emission_info and "Voice" in emission_info and bandwidth_float and bandwidth_float < 25:
threat_flag = "High risk: Possible audio bug (narrowband FM voice)"
elif bandwidth_float and bandwidth_float < 25:
threat_flag = "Medium risk: Narrowband signal (possible bug)"
elif file_name and "LMcomm" in file_name:
threat_flag = "Medium risk: Business/security comms (potential surveillance)"
elif freq >= 450 and freq <= 470: # UHF business band, common for surveillance
threat_flag = "Medium risk: UHF business band (common for surveillance)"
match_record = {
"call_sign": call_sign,
"licensee": licensee_name or entity_name or "Unknown",
"frequency_mhz": freq,
"bandwidth": bandwidth,
"emission_info": emission_info,
"inferred_device": device_type,
"manufacturer": manufacturer,
"model": model,
"source_file": file_name,
"uls_file_number": uls_file_number,
"potential_threat": threat_flag,
"match_note": match_note
}
# Check transmitter proximity
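            # earth_distance()/ll_to_earth() require PostgreSQL's cube and earthdistance extensions to be enabled in fcc_data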
if call_sign:
cur.execute("""
SELECT latitude, longitude, elevation, city, state, location_type
FROM fc_locations
WHERE call_sign = %s
AND latitude IS NOT NULL
AND longitude IS NOT NULL
AND earth_distance(
ll_to_earth(%s, %s),
ll_to_earth(latitude, longitude)
) / 1000 <= %s
""", (call_sign, sig_lat, sig_lon, radius_km))
loc_matches = cur.fetchall()
if loc_matches:
for loc in loc_matches:
lat, lon, elev, city, state_loc, loc_type = loc
distance = ((sig_lat - lat)**2 + (sig_lon - lon)**2)**0.5 * 111 # Approx km
tx_record = match_record.copy()
tx_record.update({
"tx_lat": lat,
"tx_lon": lon,
"distance_km": distance,
"city": city,
"state": state_loc,
"location_type": loc_type
})
signal_result['matches']['transmitter'].append(tx_record)
# Check county/market
if call_sign:
cur.execute("""
SELECT market_code, state, market_name
FROM fc_markets
WHERE call_sign = %s
""", (call_sign,))
market_matches = cur.fetchall()
for market in market_matches:
market_code, state_code, market_name = market
if county and county.lower() in (market_name or '').lower():
county_record = match_record.copy()
county_record['county'] = county
signal_result['matches']['county'].append(county_record)
if state_code == state:
state_record = match_record.copy()
state_record['state'] = state_code
signal_result['matches']['state'].append(state_record)
if market_code:
market_record = match_record.copy()
market_record['market_code'] = market_code
signal_result['matches']['market'].append(market_record)
            # Include as a market-level match if this record produced no geo-specific match above
            if not any(m.get('uls_file_number') == match_record['uls_file_number'] and m.get('frequency_mhz') == match_record['frequency_mhz']
                       for matches in signal_result['matches'].values() for m in matches):
                signal_result['matches']['market'].append(match_record)
results.append(signal_result)
cur.close()
conn.close()
return results
def main():
"""Query FCC data for signal matches with tolerance."""
parser = argparse.ArgumentParser(description="TSCM Signal Classifier")
parser.add_argument('--radius', type=float, default=DEFAULT_RADIUS_KM, help='Match radius in kilometers')
parser.add_argument('--input', type=str, default=DEFAULT_SIGNALS_CSV, help='Path to input CSV')
parser.add_argument('--tolerance', type=float, default=DEFAULT_TOLERANCE_MHZ, help='Frequency tolerance in MHz (e.g., 0.05)')
args = parser.parse_args()
print(f"Using input CSV: {args.input}")
print(f"Match radius: {args.radius} km")
print(f"Frequency tolerance: ±{args.tolerance} MHz")
if not os.path.exists(args.input):
print(f"Error: {args.input} not found.")
return
signals_df = pd.read_csv(args.input)
required_columns = ['start_freq_mhz', 'stop_freq_mhz', 'latitude', 'longitude']
if not all(col in signals_df.columns for col in required_columns):
print(f"Error: {args.input} must contain columns: {required_columns}")
return
if not signals_df[['latitude', 'longitude']].nunique().eq(1).all():
print("Warning: Multiple lat/lon values found. Using first row's location.")
db_config = get_db_config()
matches = query_matches(db_config, signals_df, args.radius, args.tolerance)
if matches:
os.makedirs(OUTPUT_DIR, exist_ok=True)
with open(OUTPUT_JSON, 'w') as f:
json.dump(matches, f, indent=2)
print(f"Matches saved to {OUTPUT_JSON}")
print(f"Found {sum(len(m['matches']['transmitter']) + len(m['matches']['county']) + len(m['matches']['state']) + len(m['matches']['market']) + len(m['matches']['unlicensed']) for m in matches)} total matches.")
else:
print("No matches found.")
if __name__ == "__main__":
main()
signals.csv
########################
start_freq_mhz,stop_freq_mhz,latitude,longitude
462.500,462.600,40.7128,-74.0060
850.000,850.200,40.7128,-74.0060