As noted in part 1 of this series, I wanted to transform the bulk JSON of all NASA publications into three databases - one for the published documents, one for the authors, and one for the organizations.

Along the way, I also wanted to transform some of the information into pieces I felt would be most useful for my own uses. Below is the abbreviated python block with the logic.

First, I hard-code the columns I eventually want, some of which are directly entries in the JSON.

AUTHOR_COLUMNS = ['first','last','middle']
ORG_COLUMNS = ['code','name']
DOC_COLUMNS = ['stiType','disseminated','distribution','created','distributionDate', 'downloadLink', 'authorIdsOrdered',
               'modified','onlyAbstract','stiTypeDetails','status','title','orgId','abstract']

Next book the pandas DataFrames:

df_authors = pd.DataFrame(columns = AUTHOR_COLUMNS)
df_orgs = pd.DataFrame(columns = ORG_COLUMNS)
df_docs = pd.DataFrame(columns = DOC_COLUMNS)

Now loop over one chunk of the bulk JSON, being careful to use a generator here so the whole file isn’t read into memory.

for entry_id, entry in read_bulk_file_chunk(start,stop):
    # First track authors athat are currently untracked
    d_authors = authors_from_entry(entry) # helper to extract all author info for this JSON entry which returns a dict
    for author in d_authors:
        if author['id'] not in df_authors.index: # if not tracking already, add it
            # Be careful to parse names with "first, last" vs something else
            if len(author['name'].split(',')) < 2:
                d_author = {
                    'first': author['name'],
                    'middle': None,
                    'last': None
                }
            else:
                d_author = {
                    'first': author['name'].split(',')[1].strip().split(' ')[0],
                    'last': author['name'].split(',')[0].strip(),
                    'middle': ' '.join(author['name'].split(',')[1].strip().split(' ')[1:])
                }
            # Sanity check you are only using columns you intend to use
            if set(d_author.keys()) != set(AUTHOR_COLUMNS):
                raise KeyError(f'The keys of d_author do not match the AUTHOR_COLUMNS.{chr(10)}{set(d_author.keys())} vs{chr(10)}{set(AUTHOR_COLUMNS)}')
            # Add the author
            df_authors = df_authors.append(pd.Series(d_author, name=author['id']))

    # Now do the same with organizations
    d_org = copy.deepcopy(entry['center'])
    if d_org['id'] not in df_orgs.index:
        d_org_id = d_org.pop('id')
        if set(d_org.keys()) != set(ORG_COLUMNS):
            raise KeyError(f'The keys of d_org do not match the ORG_COLUMNS.{chr(10)}{set(d_org.keys())} vs{chr(10)}{set(ORG_COLUMNS)}')
        df_orgs = df_orgs.append(pd.Series(d_org, name=d_org_id))

    # Now track document
    # columns in first layer of entry -> ['id','stiType','dissemination','distributions','created','distributionDate', 'abstract', 'modified','onlyAbstract','stiTypeDetails','status','title',
    # columns that need to be made -> ['downloadLink', 'authorIdsOrdered','orgId']
    d_doc = {}
    for k in DOC_COLUMNS:
        if k in entry:
            d_doc[k] = entry[k]
        else:
            d_doc[k] = None

    d_doc['authorIdsOrdered'] = [t['id'] for t in d_authors]
    d_doc['orgId'] = entry['center']['id']
    if entry['disseminated'] == 'DOCUMENT_AND_METADATA':
        d_doc['downloadLink'] = f'https://ntrs.nasa.gov/api/citations/{entry_id}/downloads/{entry_id}.txt'
    else:
        d_doc['downloadLink'] = None

    if set(d_doc.keys()) != set(DOC_COLUMNS): # Should never be raised because of last for loop
        raise KeyError(f'The keys of d_doc do not match the DOC_COLUMNS. {set(d_doc.keys()) ^ set(DOC_COLUMNS)}')
    
    df_docs = df_docs.append(pd.Series(d_doc, name=entry['id']))

Now clean up by setting the index to the ids and saving out to CSV.

df_docs.index.name = 'id'
df_authors.index.name = 'id'
df_orgs.index.name = 'id'

df_docs.to_csv(f'data/temp/documentDB.csv')
df_authors.to_csv(f'data/temp/authorDB.csv')
df_orgs.to_csv(f'data/temp/organizationDB.csv')

One other thing I like to do is keep a “meta database” that stores information about when I last created and updated CSV files. Here’s the function I use that gets run at the very end,

def update_meta_db(dbname):
    _columns = ['creation','update']
    # If meta DB doesn't exist, make it
    if not os.path.exists('data/meta.csv'):
        df = pd.DataFrame(columns=_columns)
        df.index.name = 'name'
    else:
        df = pd.read_csv('data/meta.csv', index_col='name')
        
    now = datetime.datetime.now().strftime("%m/%d/%Y")
    # If DB already being tracked, update the line
    if dbname in df.index:
        df.iloc[dbname, 'update'] = now
    else: # If the DB to track isn't in the meta DB, track it.
        df = df.append(pd.Series({'creation':now, 'update':now}, name=dbname))

    df.to_csv('data/meta.csv')

I should note that the JSON has to be run over multiple threads to run in a reasonable amount of time (~45 min on 10 threads). This requires breaking the JSON into chunks, saving out separate CSV files per chunk, and combining them back together, accounting for duplicates. For fun, I also implemented some fancy per-worker terminal progress bars with tqdm to track progress. I’ll probably do a separate blog post on that as well as some of these other tricks.

However, I just learned what “Dask” is last week and realized I could’ve just used that! Oh the tech I was missing in physics-land…