2024-03-28 Implementation Plan: Ingestion Server Removal¶
Author: @stacimc
Reviewers¶
[x] @sarayourfriend
[x] @AetherUnbound
Project links¶
Overview¶
The critical data refresh process is orchestrated by Airflow via the
<media_type>_data_refresh DAGs, but the bulk of the work is merely triggered
by Airflow on a remote ingestion server. Due to the reasons laid out in the
Project Proposal, we will be moving all operational pieces of the data refresh
into Airflow itself, and removing the ingestion server entirely.
The work which must be moved into Airflow can be split into these abstract steps, which will be discussed separately in this IP:
Copy Data: An FDW extension is used to connect the API database to the upstream (catalog) database. The entire contents of the upstream media table are copied into a new temp table in the API database. This temp table will later replace the main media table in the API.
Clean Data: Data clean up which includes removing denylisted tags and cleaning URLs. This step will become unnecessary when the Catalog Data Cleaning project is completed, which moves all data cleaning steps upstream into the initial provider ingestion process.
Create Index: Create a new Elasticsearch index, matching the configuration of the existing media index.
Distributed Reindex: Convert each record from the new temp table to the format required by an Elasticsearch document, and then reindex them into the newly created index. Because this step is by far the most time consuming, the reindex is distributed across multiple indexer workers. In the current implementation, the indexer workers are themselves running on EC2 instances (6 in production, 2 in staging). Each indexer worker serves a small API which can be used to trigger a reindexing task for an equal portion of the work. The workers are started by the ingestion-server when needed, and automatically spun down when complete.
Create and Populate Filtered Index: Create a new Elasticsearch index matching the configuration of the existing filtered index, and then reindex documents into it from the new media index, applying appropriate filtering for sensitive terms.
Reapply Constraints: Recreate indices and constraints from the original API table on the new temp table.
Promote Table: Drop the old media table in the API and rename the temp table and its indices, which has the effect of promoting them/replacing the old table.
Promote Index: Promote the new Elasticsearch index by unlinking the given alias from the existing index and moving it to the new one. (Used for both the main and filtered indices.)
Delete Index: Delete the old Elasticsearch index. (Used for both the main and filtered indices.)
In addition to the data refresh DAGs, some combination of these steps on the ingestion-server are also used by:
recreate_full_staging_indexDAGcreate_filtered_<media>_indexDAGsOur
load_sample_datascripts, which use the ingestion server to load sample data from the catalog into the API and Elasticsearch
This IP includes details for moving all steps into Airflow. The bulk of the plan
deals with the Distributed Reindex step, which is the only step that is not
straightforward to move.
Expected Outcomes¶
When this work is completed we will be able to:
Manage data refreshes entirely in Airflow, without the use of the ingestion server
Completely remove the staging and production ingestion servers, as well as all related code in the monorepo and infrastructure repos
Remove the
recreate_full_staging_indexDAG
Approach to the Distributed Reindex¶
The majority of the data refresh process can be implemented in Airflow with only slight refactoring, using our existing Postgres hooks/operators and reusable Elasticsearch tasks. The exception is the distributed reindex.
The simplest approach would be to remove the indexer workers entirely in favor of doing the reindexing in parallelized dynamic tasks in Airflow. However, currently the indexer workers and catalog EC2 instances are the same size (m5.xlarge), and each of the six production workers were observed to use up to ~25% CPU and ~52% memory utilization during reindexing. The expense of permanently increasing resources on the catalog instance to support reindexing (which may happen concurrently with popularity refreshes, ingestion, and all other processes that occur on the catalog) would significantly exceed the cost of maintaining remote workers that can be run only when needed.
Two other options were evaluated for this IP:
Keep the EC2 instances as they are now, but connect to them directly from Airflow. Rather than managing the 8 total EC2 instances directly, we will instead set up an EC2 Auto Scaling group for each environment, with an initial desired capacity of 0. The data refresh DAGs will spin up instances by increasing this desired capacity. (More details follow later in this document.)
Remove the EC2 instances entirely in favor of a new ECS task definition using the
indexer-workerimage, which would remove all API code and contain only the reindexing script. Airflow would spin up the appropriate number of ECS tasks when needed.
Comparison of EC2 and ECS¶
The following areas of comparison were considered when choosing between the two approaches:
Cost¶
Maintaining the EC2 worker instances, provided we continue to automatically stop the instances when not in use, would be cheaper than using ECS, given equivalent resources.
Since our existing indexer workers do not use all of the resources available, we can also consider reducing resources for the workers. For EC2, we are already using the cheapest possible offering that will comfortably accommodate our memory consumption. Since ECS is more flexible and allows scaling vCPU and RAM independently, it is possible that ECS could be cheaper than EC2 with careful adjustment of resources[1].
Notably both solutions are signifcantly cheaper than the current implementation, because the bulk of the cost comes from constantly running the two m5.xlarge EC2 instances for the ingestion servers.
Ease of infrastructure development¶
EC2 is also likelier the easier solution to implement, because it requires so few changes to the indexer-workers as they are currently configured. For ECS, less total infrastructure scaffolding is required, but it is a greater departure from the current setup.
Code maintainability¶
The ECS approach requires slightly less code, because it would allow us to completely remove all of the server logic needed in the EC2 instances. The Docker image need only contain the script for reindexing.
For the EC2 instances we would continue to serve a minimal API with the following endpoints:
healthcheckreindexing_task- Trigger the reindexing task for the given parameters (e.g., run the script)task/{task_id}- Get the status of the running task
That being said, the API needed for the indexer worker is simple and small. The vast majority of the complexity in the data refresh is in the ingestion server’s API, all of which would still be removed.
Supporting local testing & development¶
This is the most significant area in which EC2 is preferable to the ECS
approach. With the EC2 approach, assigning work to the workers requires starting
the worker instances and then triggering the reindexing task on each worker by
POSTing to its reindexing_task endpoint. Locally, we would simply POST to our
local Docker container rather than an EC2 instance. Minimal work is needed to
adapt the process to work in a local environment, and the DAG when run locally
is as close as possible to its production counterpart.
For the ECS approach, in production we must spin up ECS instances using
Airflow’s
EcsRunTaskOperator.
Simulating this in a local development environment without actually connecting
to AWS is less straightforward, and requires supporting a separate workflow for
local environments. For example we could have a branch operator that checks
the environment and proceeds to the EcsRunTaskOperator in production to spin up
ECS tasks, or a
DockerOperator
in local development to spin up a worker as a local Docker container.
This is still relatively complex, however, because the worker Docker containers must be launched from Airflow, which is itself dockerized. This requires either using a Docker-in-Docker approach, which should typically be avoided, or modifying the configuration to allow Airflow to launch sibling containers[2]. Requirements differ across operating systems, yet this must be tested and maintained across platforms because the data refresh is used as a critical step in setting up our local development environment.
Support for progress tracking¶
With the EC2 approach it is trivial to calculate the percentage_complete for
the reindexing task, and report to Airflow via the task status endpoint.
Airflow’s ECS Operators do not have out-of-the-box support for streaming the
Cloudwatch logs from the task while it is running, so we would have less insight
into the status of the task from Airflow.
Impact on Deployments¶
This is a great strength of both approaches, each of which would eliminate the need for deployments when changes are made to the data refresh code (including changes to the indexer workers).
Each time an ECS task is spun up on Fargate, it will pull the Docker image
configured in the task definition. By using the latest tag in this
configuration we can ensure that the latest indexer-worker image is pulled each
time, which means that any changes to the reindexing code will become live in
production as soon as the new docker image is published after merging on main.
The EC2 approach uses an ASG to achieve a similar result. Because the ASG will
actually terminate (rather than stop) the instances when their work is complete
and start new instances for each data refresh, we can pull the Docker image
with the latest tag in the user_data script so that the latest Docker image
is pulled each time a new data refresh starts. By using AWS Systems Manager
parameters instead of AMI IDs in the launch template, we can even use the ASG to
automatically use new AMI IDs without needing to deploy a new launch template
each time a system dependency is updated.
Conclusion¶
Both approaches allow us to eliminate the need for deployments in almost all cases, except for when changes are made to the task definition and launch template respectively.
The main advantage of the ECS approach is that it allows us to remove 100% of the server management code. However, it is more difficult to implement, likely more expensive, and requires a special secondary implementation for running locally that may be prone to cross-platform issues. The EC2 approach on the other hand is cheaper, quicker to implement, and gets us the vast majority of what we want in terms of removing complexity. Consequently I argue here that the EC2 approach is the one we should pursue.
Step-by-step plan¶
Because the data refresh process is critical, we will not remove the ingestion server or the existing data refresh DAGs until the new process has been performed successfully in staging and production. The new data refresh will be developed as separate DAGs alongside the current ones.
Create a new data refresh factory to generate new data refresh DAGs for each media type for staging and production, with the
<environment>_<media>_data_refreshDAG id. For simplicity of code review, these initial DAGs should only perform theCopy Data, andCreate Indexsteps, which we will perform entirely in Airflow.Create a new
catalog-indexer-workerin the Catalog, and build the new indexer worker image locally.Add the distributed reindexing step to the DAG (excludes infrastructure work).
Set up the necessary resources for the ASGs for staging and production in the catalog Terraform configuration.
Add all remaining steps to the data refresh DAGs:
Create and Populated Filtered Index,Reapply Constraints,Promote Table,Promote Index,Delete Index.Deploy the catalog and the new indexer-workers configuration.
Run the staging audio data refresh, followed by the staging image data refresh.
Once successful, run the production audio data refresh and image data refresh.
Update the
create_filtered_<media>_indexDAGs to remove use of the ingestion server.Drop the
recreate_full_staging_indexDAG, which can be removed entirely in favor of the staging data refresh.Update the
load_sample_data.shscript to run the DAG instead of using the ingestion server.Fully remove the ingestion server and related infrastructure.
There are few opportunities for multiple streams of work. Updating the
create_filtered_<media>_index DAGs can happen at any time.
Step details¶
While this may look daunting, it should be noted that with very few exceptions the work described below is refactoring of existing logic. Links to the source files are included for convenience.
Create the new data refresh DAG factory and move initial steps into Airflow¶
In this step, we’ll create a new data refresh DAG factory to generate data refresh DAGs for each existing media_type and environment. Currently these four will be generated:
staging_audio_data_refresh
staging_image_data_refresh
production_audio_data_refresh
production_image_data_refresh
Because the environment is added as a prefix, there will be no collision with
the existing DAG ids. In this initial step, we we will add only a small portion
of the logic in order to make the PR easier to review. The first steps are
already implemented in the current data refresh and can simply be copied:
Get the current record count from the target API table; this must be modified to take the
environmentas an argumentPerform concurrency checks on the other data refreshes and conflicting DAGs; this must be modified to include the now larger list of data refresh DAG ids
Get the name of the Elasticsearch index currently mapped to the
target_aliasGenerate the new index suffix
We will include new tasks to perform the initial few steps of the ingestion server’s work:
Copy Data: this should be a TaskGroup that will have multiple tasks for creating the FDW from the upstream DB to the downstream DB, running the copy_data query, and so on. It should fully replace the implementation of
refresh_api_tablein the ingestion server. All steps in this section are SQL queries that can be implemented using the existing PostgresHook and PGExecuteQueryOperator.Create Index: we can use our existing Elasticsearch tasks to create the new elasticsearch index with the index suffix generated in the previous task.
Implement new catalog indexer worker¶
In this step we will create the new catalog-indexer-worker Docker image. This step does not include adding the orchestration steps to the DAG, or the infrastructure work to actually create the ASGs.
First we will create a new indexer-worker directory under
catalog/dags/data_refresh, which will contain the contents of the new indexer
worker. This implementation already exists in the ingestion server. The
relevant pieces can be pulled out and refactored slightly to fit the new, much
smaller image. Broadly, this is the mapping of existing files to new files
needed:
api.pywill defined the API for the worker, and is refactored from the existingindexer_worker.py. It must be refactored to add task state and atask_statusendpoint, which takes atask_idand returns the status and progress of the given task.indexer.pywill contain the logic for the actual indexing task. It will be refactored from the existingindexer.py; specifically all we need is thereplicatefunction.elasticsearch_models.py, pulled from the file of the same name in the ingestion server. Defines a mapping from a database record to an Elasticsearch document.Utility files for helper functions for connecting to Elasticsearch and Postgres (e.g.
es_helpers.py)
The
Dockerfile
can be copied from the existing ingestion server. It should be updated to
reference the new file structure, and to expose only a single port, which should
be distinguished from the ports currently in use by the ingestion server (8001
and 8002). Other necessary files, including env.docker, .dockerignore,
Pipfile, and gunicorn.conf.py can all be copied in from the existing
ingestion server as well.
Finally we will update the monorepo’s root
docker-compose.yml
to add a new catalog-indexer-worker service. Its build context should point to
the nested data_refresh/indexer_worker directory, and it should map the
exposed port to enable the API to be reached by the catalog.
When this work is complete, it should be possible to run just catalog/shell
and curl the new indexer worker. The existing ingestion-server and
indexer-worker services are unaffected (it is still possible to run legacy data
refreshes locally and in production).
Implement distributed reindexing locally¶
In this step we will add tasks to the data refresh DAGs to orchestrate the
distributed reindex. At the end of this step, it will be possible to run a
distributed reindex locally, but because the infrastructure work to create the
ASGs is not complete, it can not be run on production yet. The following code
can all be refactored from
distributed_reindex_scheduler.py.
Use
describe_auto_scaling_groupsand filter by tags to select the appropriate ASG for the desired environment. (Skips in local env.)Use
set_desired_capacityto increase the desired capacity of the ASG to the desired number of workers, depending on the environment. This will cause the ASG to begin spinning up instances. (Skips in local env.)Use
describe_auto_scaling_groupsto poll the ASG until all instances have been started, and get the EC2 instance IDs. (Skips in local env.)Use dynamic task mapping to distribute reindexing across the indexer workers by first calculating
startandendindices that will split the records in the media table into even portions, depending on the number of workers available in the given environment. Then:POST to each worker’s
reindexing_taskendpoint thestart_indexandend_indexit should handleUse a Sensor to ping the worker’s
task/{task_id}endpoint until the task is complete, logging the progress as it goes
Use
terminate_instance_in_auto_scaling_groupto terminate the instance. Make sure to setShouldDecrementDesiredCapacitytoTrueto ensure that the ASG does not try to replace the instance. This task should use theNONE_SKIPPEDTriggerRule to ensure that the instances are terminated, even if there are upstream failures. (Skips in local env.)Finally, after all tasks have finished (regardless of success/failure), we should have a cleanup task that calls
set_desired_capacityto 0. Generally this should be a no-op, but if an instance crashes during reindexing (rather than simply failing during reindexing) the ASG will spin up a replacement and Airflow will not automatically clean it up. This task ensures that any dangling instances are terminated.
Note
It is not possible to retry a single indexer worker with this set up, because once a worker fails the instance is actually terminated (rather than simply stopped). If a task that triggers a reindex is cleared after an instance has been terminated, it will simply fail. The entire reindex must be restarted from the first step in this task group.
However, there is a valuable tradoff to this approach: it ensures that all of the indexer workers in a data refresh are identical, while still allowing us to avoid manual deployments every time the indexer logic changes. For example, imagine some changes to the reindexing logic are merged to main while a data refresh is actively underway, and a new Docker image is published. If one indexer worker failed, and it were possible to retry just that indexer worker, it would use the new Docker image – leading to inconsistency in the behavior of different workers within a single data refresh.
In future iterations this may also be solved by using AMIs with the Docker image baked in, and then preventing launch template version bumps while a data refresh is running.
Create the Terraform and Ansible resources needed to deploy the new indexer workers¶
In this step we will add the resources needed to actually configure the ASGs.
Some important notes:
Currently, the staging and production workers are split into separate environments (i.e., a staging deploy is used to deploy the 2 staging workers separately from the 6 production workers). It is more accurate to view all 8 workers as production instances (i.e., part of the production catalog deployment), which merely operate on different environments. As such all 8 should be part of the production deployment, but two separate ASGs which are given a “staging-indexer-worker-pool” and “production-indexer-worker-pool” tag, respectively, to indicate their intended environment.
The playbooks must be updated to check if any of the four new data refresh DAGs are running before deploying, as well.
The
user_datascript should be updated to pull the Docker image with thelatesttag.
Add remaining steps to the Data Refresh DAGs¶
The final steps can now be added to the DAGs:
Create and Populated Filtered Index: this should be implemented as a reusable TaskGroup. This work can already be implemented using our existing Elasticsearch tasks and replaces this function.Reapply ConstraintsandPromote Table: these SQL queries can be performed with the PostgresHook, and replaces this function.Promote IndexandDelete Index: these can be implemented using our existing Elasticsearch tasks, and replaces these functions.
Deploy the catalog and indexer workers¶
In this step we’ll actually deploy the new workers using the Ansible playbooks. The existing ingestion server and indexer workers remain untouched, so the legacy data refresh can continue to run.
Run the data refreshes¶
The next step is to run the staging_<x> data refreshes, which act on the
staging API DB and Elasticsearch cluster and are therefore lower risk. This will
be the first test of the new process. Once successful, we can run the
production_<x> refreshes.
Update create_filtered_<media>_index DAGs¶
These DAGs currently use the ingestion server to perform the
create_filtered_index steps in isolation. We can update these to use the
reusable TaskGroup implemented in an earlier step.
Remove the recreate_full_staging_index DAG¶
This DAG performs the second half of a data refresh (the distributed reindex and
promotion) in staging. Once the data refresh has been moved to Airflow, we can
add a
DAG param
to the data refresh that allows skipping the initial Copy Data steps. This DAG
will no longer be necessary and can then be deleted.
Update the load_sample_data scripts¶
Our load_sample_data.sh scripts currently use just commands to
run parts of the data refresh
on the local ingestion server, as part of setting up the local development
environment. We should update the scripts to instead use the Airflow CLI to
unpause the two data refresh DAGs and await their completion.
Remove the ingestion server¶
Once the new data refresh has been run successfully in production, we can finally remove all ingestion-server code from the monorepo and the infrastructure repo, and completely remove the associated EC2 instances.
Infrastructure¶
Infrastructure is a large part of this project. We will be adding 8 new EC2 instances but deprovisioning ten, as described above. There will be significant cost benefits to removing the two EC2 instances which are constantly running.
Tools & packages¶
No new tools or packages are required.
Other projects or work¶
Deploying Airflow with Ansible, part of the Move Airflow to openverse.org Project.
Catalog Data Cleaning project, which removes the Clean Data steps from the data refresh
Alternatives¶
The alternative options of using an ECS approach or performing the reindex entirely in Airflow are discussed at length in the Approach to the Distributed Reindex section.
It is also possible to use EC2 instances but manage them directly in Airflow using EC2 operators to start and stop the instances as needed. However, more infrastructure work is required in this approach, and we would require deployments whenever there are code changes in the indexer workers.
Blockers¶
Development can start immediately, but the infrastructure components of this project are blocked by completion of the effort to move Airflow to openverse.org, specifically this issue to run a (legacy) data refresh on the new instance.
Rollback¶
We will not remove the ingestion server or the existing data refresh DAGs until the new DAGs have been run successfully against both staging and production. We can rollback at any time by simply removing the new instances and DAGs.
Risks¶
There is minimal risk, as we will be able to test against production data in staging before running against production. However the data refresh does completely replace the production media tables and Elasticsearch indices, so there is always inherent risk that production data could be lost or malformed if something goes wrong. We will ensure production backups are available and monitor the first production data refreshes closely.