Extract-Transform-Load flows

The diagrams below show how data flows through Openverse, and the various transformation steps that might happen at each stage.

Legend:

  • Blue boxes represent places where data sits at rest.

  • Purple boxes represent transformation steps. Each box links to a longer description below.

Current flow

        flowchart TD

%% DBs
A[(API DB)]
ES[(Elasticsearch)]

%% Graph definitions
    subgraph AF [Airflow]
    P{{Provider}} --> M(MediaStore):::ETL
    M --> T{{TSV}}
    T --> IC1(Index constraints):::ETL
    IC1 --> C[(Catalog DB)]
    C --> B(Batched update):::ETL --> C
    end

    subgraph IS [Ingestion Server]
    %% ETLs
    ING(Deleted media removal):::ETL
    IC2(Index constraints):::ETL
    ESF(Index creation & deleted media filtering):::ETL

    C --> ING
    ING --> TT[(Temp Table)]
    TT --> CF(Cleaning & filtering):::ETL --> TT
    end

    TT --> A
    ESF --> ES
    A --> IC2 --> A
    A --> ESF
    subgraph DJA [Django API]
    %% ETLs
    DL(Dead link filtering):::ETL
    AS(API serializers):::ETL
    R[(Redis)] --> DL --> R
    end

    %% Connections for the API
    ES --> DL
    DL --> A
    A --> AS

    subgraph F [Frontend]
    ATG(Attribution generation):::ETL
    end

    %% External connections
    AS --> ATG
    AS --> CL[External Client]
    ATG --> CL

%% Style definitions
    style AF fill:#00c7d4
    style IS fill:#ffc83d
    style DJA fill:orange
    style F fill:lightgreen
    classDef ETL fill:#fa7def


%% Reference definitions
    click M "#mediastore-transformations"
    click IC1 "#index-constraints-catalog"
    click B "#batched-update"
    click ING "#deleted-media-removal"
    click CF "#cleaning-filtering"
    click IC2 "#index-constraints-api"
    click ESF "#es-index-creation-filtering"
    click DL "#dead-link-filtering"
    click AS "#api-serializers"
    click ATG "#attribution-generation"
    

Note

The Temp Table referenced above is actually a table within the API database that gets swapped for the live production table once the processing steps on it are complete. This prevents us from having to operate on live data.
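
The swap itself can be thought of as a pair of renames inside a transaction. The sketch below only generates the statements involved; the table names are illustrative, not Openverse's actual identifiers.

```python
def table_swap_statements(media_type: str) -> list[str]:
    """Sketch of swapping a temp table into place atomically.

    Hypothetical names: the live table is assumed to share the media type's
    name, and the temp/old tables get simple prefixes.
    """
    live = media_type                      # e.g. "image"
    temp = f"temp_import_{media_type}"     # table the processing steps ran on
    old = f"old_{media_type}"              # parking spot for the previous live table
    return [
        "BEGIN;",
        f"ALTER TABLE {live} RENAME TO {old};",
        f"ALTER TABLE {temp} RENAME TO {live};",
        "COMMIT;",
        f"DROP TABLE {old};",
    ]
```

Because both renames happen in one transaction, readers only ever see a complete table, which is what lets the processing steps run without touching live data.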

MediaStore transformations

These transformations occur as part of the provider ingestion scripts as records are being saved from the provider’s API response into a TSV. The exhaustive list of shared transformations can be found in the MediaStore definition, as well as individual transformations for images and audio.

A non-exhaustive list includes:

  • Filtering out tags based on licenses or tags which add no useful search information

  • Removing integer values that would exceed the Postgres integer maximum

  • Validating filetype and collapsing common types

  • Formatting tags appropriately (see the media properties definition)

  • Enriching meta_data with the license URLs

  • Validating URLs

  • Determining source when it is not explicitly provided
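
A few of the steps above can be sketched in a single cleanup function. This is illustrative only, not the real MediaStore code; the field names and the overflow/URL/source handling are assumptions based on the list above.

```python
from urllib.parse import urlparse

PG_INT_MAX = 2_147_483_647  # Postgres 4-byte integer maximum

def clean_record(record: dict) -> dict:
    """Illustrative sketch of MediaStore-style cleanup on one record."""
    cleaned = dict(record)
    # Null out integer values that would overflow a Postgres integer column
    for field in ("width", "height", "filesize"):
        value = cleaned.get(field)
        if isinstance(value, int) and value > PG_INT_MAX:
            cleaned[field] = None
    # Reject URLs that lack a scheme or host
    parsed = urlparse(cleaned.get("url") or "")
    if not (parsed.scheme and parsed.netloc):
        cleaned["url"] = None
    # Default source to the provider when not explicitly given
    if not cleaned.get("source"):
        cleaned["source"] = cleaned.get("provider")
    return cleaned
```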

Index constraints (catalog)

As the data from a saved provider TSV gets inserted into the database, it must also comply with the existing indices. The indices for each primary media table can be found here: image, audio. In general, these include:

  • Uniqueness on provider + foreign_identifier combination

  • Uniqueness on identifier

  • Uniqueness on url
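
The provider + foreign_identifier uniqueness constraint is what makes loading idempotent: a re-ingested record updates the existing row instead of duplicating it. The helper below builds such an upsert statement as a sketch; whether the actual loader uses `ON CONFLICT` in this exact form is an assumption.

```python
def upsert_sql(table: str, columns: list[str]) -> str:
    """Sketch of an upsert keyed on the catalog's (provider, foreign_identifier)
    uniqueness constraint. Uses psycopg-style %(name)s placeholders."""
    cols = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}"
        for c in columns
        if c not in ("provider", "foreign_identifier")
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT (provider, foreign_identifier) DO UPDATE SET {updates};"
    )
```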

Batched update

Updates against the catalog database may also be initiated using the batched update DAG. These updates may be automated (such as the popularity refresh process) or manual (such as data correction).
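
The core idea of a batched update is to walk the table in fixed-size id ranges so no single UPDATE locks too many rows at once. Below is a minimal sketch of that pattern; the real DAG is considerably more involved, and the column and parameter names here are hypothetical. SQLite stands in for Postgres so the sketch is runnable.

```python
import sqlite3

def batched_update(conn, table, set_clause, where_clause, batch_size):
    """Sketch: apply an UPDATE in bounded id ranges, committing per batch."""
    lo, hi = conn.execute(f"SELECT MIN(id), MAX(id) FROM {table}").fetchone()
    if lo is None:
        return 0  # empty table
    updated = 0
    start = lo
    while start <= hi:
        end = start + batch_size - 1
        cur = conn.execute(
            f"UPDATE {table} SET {set_clause} "
            f"WHERE id BETWEEN ? AND ? AND ({where_clause})",
            (start, end),
        )
        updated += cur.rowcount
        conn.commit()  # release locks between batches
        start = end + 1
    return updated
```

Committing between batches is the point: a popularity refresh or data correction can run against a live table without holding a long transaction open.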

Deleted media removal

Presently, we filter out deleted media from each of the primary media tables while the SELECT {columns} FROM {upstream_table} query is being run.
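
In other words, deleted records are excluded at copy time by an anti-join against a deleted-media table. The sketch below generates such a query; the deleted-table name and the `NOT IN` formulation are illustrative, not the literal query the ingestion server runs.

```python
def copy_query(columns: list[str], upstream_table: str, deleted_table: str) -> str:
    """Sketch of the filtered SELECT used when copying upstream data,
    skipping any identifier present in the deleted-media table."""
    cols = ", ".join(columns)
    return (
        f"SELECT {cols} FROM {upstream_table} "
        f"WHERE identifier NOT IN (SELECT identifier FROM {deleted_table});"
    )
```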

Cleaning & filtering

We also iterate through the copied data in batches and update the values in the new table based on several cleaning/filtering steps (all of which can be seen in the ingestion server’s cleanup definitions). At present, these operations include:

  • Ensuring URLs have a scheme

  • Filtering tags out (similar to the MediaStore transformations)

  • Filtering machine-generated tags below a certain confidence value
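
The steps above can be sketched as two small helpers. The 0.9 confidence threshold and the `accuracy` field name are assumptions for illustration; the actual values live in the ingestion server's cleanup definitions.

```python
def ensure_scheme(url: str, default: str = "https") -> str:
    """Prepend a scheme when the stored URL lacks one."""
    return url if "://" in url else f"{default}://{url}"

def clean_tags(tags: list[dict], min_accuracy: float = 0.9) -> list[dict]:
    """Drop machine-generated tags below a confidence threshold; tags
    without an accuracy value (human-supplied) are kept."""
    kept = []
    for tag in tags:
        accuracy = tag.get("accuracy")
        if accuracy is not None and accuracy < min_accuracy:
            continue  # low-confidence machine tag
        kept.append(tag)
    return kept
```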

Index constraints (API)

Once the ingestion server data is copied and cleaned/filtered, the API indices are applied to the new table. There are several more indices in the API database than in the catalog database, many of which exist for query optimization. These include (all are btree unless specified):

  • Category

  • Foreign identifier

  • Last synced with source

  • Provider

  • Source
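
As DDL, the list above amounts to one btree index per column. The statement shapes and index names below are illustrative, not the actual migration output.

```python
def index_ddl(table: str) -> list[str]:
    """Sketch: btree index DDL mirroring the API's query-optimization indices.
    Column names are assumed from the list above."""
    columns = [
        "category",
        "foreign_identifier",
        "last_synced_with_source",
        "provider",
        "source",
    ]
    return [
        f"CREATE INDEX {table}_{c}_idx ON {table} USING btree ({c});"
        for c in columns
    ]
```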

ES index creation & filtering

An additional filtering step happens when the records are indexed from the API database into Elasticsearch. In addition to filtering out sensitive terms (using two separate indices) and deleted media, transformations also occur when constructing the ES documents. The exhaustive list can be seen in the elasticsearch_models module, which includes:

Common

  • Computing the authority boost

  • Computing the popularity

  • Parsing a description from the media, if available

  • Parsing tags

  • Determining the mature field

Images

  • Computing the aspect ratio

  • Computing the size

Audio

  • Computing the duration
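
To make the image-specific derivations concrete, here is a sketch of computing the aspect_ratio and size fields from a row's raw dimensions. The category names and every threshold are invented for illustration; the actual cutoffs are defined in the elasticsearch_models module.

```python
def image_es_fields(width, height, filesize):
    """Sketch: derived fields added to an image's ES document.
    Thresholds and bucket names are hypothetical."""
    fields = {}
    if width and height:
        ratio = width / height
        if ratio > 1.2:
            fields["aspect_ratio"] = "wide"
        elif ratio < 0.8:
            fields["aspect_ratio"] = "tall"
        else:
            fields["aspect_ratio"] = "square"
    if filesize is not None:
        # Bucket raw byte counts into coarse, filterable size categories
        if filesize < 500 * 1024:
            fields["size"] = "small"
        elif filesize < 3 * 1024 * 1024:
            fields["size"] = "medium"
        else:
            fields["size"] = "large"
    return fields
```

Pre-computing buckets like these at index time lets ES filter on `aspect_ratio` or `size` directly instead of doing range math per query.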

API serializers

An additional transformation occurs after a record is retrieved from the database (and once it has been validated by the dead link filtering). This step includes any transformations required to hydrate the database record and convert it into the JSON object sent to the requester.

Serializations can be found in the codebase under api/api/serializers.
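
Conceptually, a serializer turns a database row into the response payload, renaming fields and deriving values along the way. The field names and the license URL construction below are assumptions for illustration, not the actual serializer behavior in api/api/serializers.

```python
import json

def serialize_image(record: dict) -> str:
    """Sketch: hydrate a DB row into the JSON shape returned to clients."""
    payload = {
        "id": record["identifier"],
        "url": record["url"],
        "license": record["license"],
        # Derived field: the row stores only license slug + version
        "license_url": (
            f"https://creativecommons.org/licenses/"
            f"{record['license']}/{record['license_version']}/"
        ),
        "tags": [t["name"] for t in record.get("tags") or []],
    }
    return json.dumps(payload)
```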

Attribution generation

The frontend also modifies results before displaying them to a user. This includes generating attributions in multiple formats. The full attribution generation code can be found in attribution-html.ts.
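
As a rough plain-text analogue of what the frontend produces (the real implementation in attribution-html.ts emits HTML and handles many more cases), attribution generation combines the work's title, creator, and license into a sentence:

```python
def attribution_text(title, creator, license_slug, version):
    """Sketch: plain-text attribution line; the real code also links the
    title, creator, and license, and localizes the output."""
    work = f'"{title}"' if title else "This work"
    by = f" by {creator}" if creator else ""
    return f"{work}{by} is licensed under {license_slug.upper()} {version}."
```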

Proposed

The projects #430, #431, and #3925 all intend to modify the above process. Below is the diagram of what this process might look like after all steps are taken. Most blocks reference the same sections above, with the exception being Deleted media & tag filtering.

        flowchart TD

%% DBs
A[(API DB)]
ES[(Elasticsearch)]

%% Graph definitions
    subgraph AF [Airflow]
    %% ETLs
    M(MediaStore):::ETL
    IC1(Index constraints):::ETL
    B(Batched update):::ETL
    ING(Deleted media & tag filtering):::ETL
    IC2(Index constraints):::ETL
    ESF(Index creation & deleted media filtering):::ETL

    P{{Provider}} --> M
    M --> T{{TSV}}
    T --> IC1
    IC1 --> C[(Catalog DB)]
    C --> B --> C
    C --> ING
    end

    %% Connections for the data refresh
    ESF --> ES
    ING --> A
    A --> IC2 --> A
    A --> ESF

    subgraph DJA [Django API]
    %% ETLs
    DL(Dead link filtering):::ETL
    AS(API serializers):::ETL
    R[(Redis)] --> DL --> R
    end

    %% Connections for the API
    ES --> DL
    DL --> A
    A --> AS

    subgraph F [Frontend]
    ATG(Attribution generation):::ETL
    end

    %% External connections
    AS --> ATG
    AS --> CL[External Client]
    ATG --> CL

%% Style definitions
    style AF fill:#00c7d4,margin-left:12em
    style DJA fill:orange
    style F fill:lightgreen
    classDef ETL fill:#fa7def

%% Reference definitions
    click M "#mediastore-transformations"
    click IC1 "#index-constraints-catalog"
    click B "#batched-update"
    click ING "#deleted-media-tag-filtering"
    click IC2 "#index-constraints-api"
    click ESF "#es-index-creation-filtering"
    click DL "#dead-link-filtering"
    click AS "#api-serializers"
    click ATG "#attribution-generation"
    

Deleted media & tag filtering

The data normalization project (#430) intends to remove the cleanup steps from the ingestion server. The subsequent ingestion server removal project (#3925) will remove the ingestion server entirely. This will be fleshed out more as part of #4456, but the filtering steps (e.g. demographic tags, tags below a confidence level, etc.) will happen within the pipeline before the API indexes values into Elasticsearch.