2023-05-18 Implementation Plan: Rapid iteration of ingestion server index configuration#

Author: @aetherunbound

Reviewers#

Overview#

This document describes a DAG (and associated dependencies) which can be used to generate a new index in Elasticsearch without requiring a data refresh, ingestion server deployment, or any interaction with the ingestion server.

Expected Outcomes#

Once the implementation is complete, a maintainer should be able to run this DAG to create a new index on either elasticsearch environment with an altered configuration. The maintainer should only need to specify the pieces of the configuration that are different from the default configuration, although they can supply a full configuration if they wish. The DAG will report when it is complete and what the name of the new index is.

Dependencies#

The following are prerequisites for the DAG:

Elasticsearch provider#

The elasticsearch provider will need to be added to the list of dependencies for our catalog Docker image. This will be done first and deployed on its own, as the connections will be needed for the rest of the work.

In order to add this provider, the production requirements will need to be changed from apache-airflow[amazon,postgres,http] to apache-airflow[amazon,postgres,http,elasticsearch].

Airflow connections#

Once the provider is installed, Airflow Connections will need to be created for each Elasticsearch cluster. The connections will be named elasticsearch_production and elasticsearch_staging and reference each respective environment. These will be created by hand in the Airflow UI. For local development, defaults will be added to the env.template file.

Infrastructure#

No infrastructure changes should be necessary for this implementation, beyond the addition of the elasticsearch provider package and the Airflow connections described above.

Other projects or work#

This work does not depend on any other projects and can be implemented at any time. In order to be utilized however, a query parameter on the API will need to be used for determining which index to query. This has been implemented in #2073.

Outlined Steps#

Once the prerequisites above have been filled, the DAGs can be created. The DAGs will be named create_new_es_index_{environment}, and will have the parameters and steps described below. A dynamic DAG generation function will be used to create one DAG per environment (staging and production). It will have a schedule of None so that it is only run when triggered. It should also have max_active_runs set to 1 so that only one instance of the DAG can be running at a time.

Parameters#

  1. media_type: The media type for which the index is being created. Presently this would only be image or audio.

  2. index_config: A JSON object containing the configuration for the new index. The values in this object will be merged with the existing configuration, where the value specified at a leaf key in the object will override the existing value (see Merging policy below). This can also be the entire index configuration, in which case the existing configuration will be replaced entirely (see override_config parameter below).

  3. index_suffix: (Optional) The name suffix of the new index to create. This will be a string, and will be used to name the index in Elasticsearch of the form {media_type}-{index_suffix}. If not provided, the suffix will be a timestamp of the form YYYYMMDDHHMMSS.

  4. source_index: (Optional) The existing index on Elasticsearch to use as the basis for the new index. If not provided, the index aliased to media_type will be used (e.g. image for the image media type). In production, the data refresh process creates the index and aliases it to the media type. In staging, the process for creating the indices which can be iterated on here will be defined in a follow-up IP, #1987.

  5. override_config: (Optional) A boolean value which can be toggled to replace the existing index configuration entirely with the new configuration. If True, the index_config parameter will be used as the entire configuration. If False, the index_config parameter will be merged with the existing configuration. Defaults to False.

  6. query: (Optional) An Elasticsearch query to use to filter the documents to be copied to the new index. If not provided, all documents will be copied. See the reindex API endpoint for the request body and create_and_populate_filtered_index on the ingestion server for an example of how this is used.

DAG#

Once the parameters are provided, the DAG will execute the following steps:

  1. (For production only) Check that there are no other reindexing jobs currently underway. This can be done by checking whether the {media_type}_data_refresh and create_filtered_{media_type}_index DAGs are currently running. If either of them are, this DAG should fail immediately with an appropriate error message. Note: We will also need to add the create_new_es_index_production DAG to the checks prior to running the data refresh (where create_filtered_{media_type}_index is currently checked) in the image_data_refresh and audio_data_refresh DAGs. This will prevent the data refresh from running concurrently while the new index is being created.

  2. Create the index configuration. If override_config is supplied, the index_config parameter will be used as the entire configuration. If not, the index_config parameter will be merged with the existing index settings. See Merging policy for how this merge will be performed. Below are the steps for gathering the current index settings:

    1. Get the current index information. This will be done using the ElasticsearchPythonHook, specifically the indices.get function.

    2. Extract the relevant settings from the response. They come back in a form that differs from what is required by the indices.create function of the Elasticsearch Python API. The indices.get function returns data of the form:

      {
        "<index-name>": {
          "settings": {
            "index": {
              "number_of_shards": "18",
              "number_of_replicas": 0,
              "refresh_interval": "-1",
              ...
              "analysis": {
                ...
              }
            }
          },
          "mappings": {...}
        }
      }
      

      However, the indices.create function expects data of the form:

      {
        "settings": {
          "index": {
            "number_of_shards": 18,
            "number_of_replicas": 0,
            "refresh_interval": "-1"
          },
          "analysis": {...},
        },
        "mapping": {...}
      }
      

      Specifically note that the returned settings.index.analysis configuration section needs to be moved to settings.analysis instead, and that most of the other index settings (besides those denoted explicitly here, e.g. number_of_shards, number_of_replicas, and refresh_interval) can be discarded. The returned information will need to be extracted and converted into the form expected by indices.create before it is merged with the new configuration.

  3. Create the new index with the configuration generated in step 1 using the ElasticsearchPythonHook’s indices.create function. The index name will either be a combination of the media_type and index_suffix parameters ({media_type}-{index_suffix}, e.g. image-my-special-suffix), or the media_type and a timestamp ({media_type}-{timestamp}, e.g. image-20200102030405). (example from the existing ingestion server code).

  4. Initiate a reindex using the source_index as the source and the reindex function of the Elasticsearch Python API (example from the ingestion server). Rather than setting wait_for_completion=True on the ES Python API reindex call and having this step halt until the reindex is complete, this step will set wait_for_completion=False and continue to the next step. The task ID returned from Elasticsearch for the reindex operation will be stored in an XCom variable. This step will have refresh=True on the function call to ensure the reindex will immediately refresh after completion to make the data available to searches. It will also need slices='auto' so the indexing is parallelized.

  5. Use a sensor and the previously emitted task ID to wait for the reindex to complete. This will check the task using the Task API. This could be done either using a subsequent wait_for_completion call within a simple PythonSensor using the Elasticsearch hook with a low timeout which is polled repeatedly, or by using the status value of the response to determine if polling should continue. Elasticsearch documentation notes that status may not always be available, so we’d need to determine if the reindex task returns a status field first.

  6. Once the reindex & refresh are complete, report that the new index is ready via Slack. This message should include both the index name and the Elasticsearch environment.

Merging policy#

The configuration should be merged such that the leaf key overwrites the entire value present in the default configuration at that key. For example, if the default configuration is:

{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1
    }
  }
}

An override value of {"settings": {"index": {"number_of_shards": 2}}} will overwrite the number_of_shards value, but leave the number_of_replicas value unchanged. The resulting configuration will be:

{
  "settings": {
    "index": {
      "number_of_shards": 2,
      "number_of_replicas": 1
    }
  }
}

This will be a naive merge, and list values will not be appended automatically. For instance:

{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "type": "stemmer_override",
        "rules": [
          "animals => animal",
          "animal => animal",
          "anime => anime",
          "animate => animate",
          "animated => animate",
          "universe => universe"
        ]
      }
    }
  }
}

with an override of

{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "rules": ["crim => cribble"]
      }
    }
  }
}

will result in

{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "type": "stemmer_override",
        "rules": ["crim => cribble"]
      }
    }
  }
}

The jsonmerge Python library provides a good example of the naive approach, although we will want to avoid adding an extra dependency for this step is possible.

Alternatives#

Using the ingestion server#

The first idea for this approach was to use the ingestion server directly, and alter the es_mappings.py file either dynamically or in a mechanism similar to the dag-sync script on the catalog. This might have been made easier by a move of this service from EC2 to ECS. This approach was abandoned when @sarayourfriend suggested interacting with Elasticsearch directly.

Design#

No new design work is required for this plan.

Parallelizable streams#

The addition of the Elasticsearch provider is a blocker for the rest of the work described here; all work must be done serially.

Blockers#

There are no external blockers to this project.

API version changes#

No API version changes should be necessary.

Accessibility#

No accessibility concerns are expected.

Rollback#

Rollback would likely involve deleting the DAG. The provider dependency and Elasticsearch connections are likely to be useful in the future, so it may behoove us to keep them even if the DAG is deleted.

Risks#

This approach does allow for maintainers to affect production resources, so the same care should be taken when triggering this DAG as with triggering a deployment. The described DAG will only ever create new indices, and so there is no risk losing or affecting the production indices. The DAG will also fail if any other DAGs which perform a reindex are running, which will prevent saturating Elasticsearch resources in production.

Prior art#