# 2023-05-18 Implementation Plan: Rapid iteration of ingestion server index configuration
Author: @aetherunbound
## Reviewers

- [x] @sarayourfriend
- [x] @stacimc
- [x] @krysal
## Project links
## Overview
This document describes a DAG (and associated dependencies) which can be used to generate a new index in Elasticsearch without requiring a data refresh, ingestion server deployment, or any interaction with the ingestion server.
## Expected Outcomes

Once the implementation is complete, a maintainer should be able to run this DAG to create a new index in either Elasticsearch environment with an altered configuration. The maintainer should only need to specify the pieces of the configuration that differ from the default configuration, although they can supply a full configuration if they wish. The DAG will report when it is complete and what the name of the new index is.
## Dependencies

The following are prerequisites for the DAG:

- Installation of the `elasticsearch` Airflow provider package
- Airflow Connections for each Elasticsearch cluster
### Elasticsearch provider

The `elasticsearch` provider will need to be added to the list of dependencies for our catalog Docker image. This will be done first and deployed on its own, as the connections will be needed for the rest of the work.

In order to add this provider, the production requirements will need to be changed from `apache-airflow[amazon,postgres,http]` to `apache-airflow[amazon,postgres,http,elasticsearch]`.
### Airflow connections

Once the provider is installed, Airflow Connections will need to be created for each Elasticsearch cluster. The connections will be named `elasticsearch_production` and `elasticsearch_staging` and will reference their respective environments. These will be created by hand in the Airflow UI. For local development, defaults will be added to the `env.template` file.
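For illustration, the local defaults might look something like the following in `env.template`. The host, port, and URI scheme here are placeholders rather than the actual values; Airflow maps `AIRFLOW_CONN_<CONN_ID>` environment variables to Connections with the matching (lowercased) connection ID.

```shell
# Hypothetical local defaults; both connections point at the local
# Elasticsearch container. Host, port, and scheme are placeholders.
AIRFLOW_CONN_ELASTICSEARCH_PRODUCTION=http://es:9200
AIRFLOW_CONN_ELASTICSEARCH_STAGING=http://es:9200
```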
## Infrastructure
No infrastructure changes should be necessary for this implementation, beyond
the addition of the elasticsearch provider package and the Airflow connections
described above.
## Other projects or work

This work does not depend on any other projects and can be implemented at any time. To be utilized, however, it requires a query parameter on the API for determining which index to query; this has been implemented in #2073.
## Outlined Steps

Once the prerequisites above have been met, the DAGs can be created. The DAGs will be named `create_new_es_index_{environment}` and will have the parameters and steps described below. A dynamic DAG generation function will be used to create one DAG per environment (staging and production). Each DAG will have a schedule of `None` so that it is only run when triggered, and `max_active_runs` set to 1 so that only one instance of the DAG can be running at a time.
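As a sketch of the dynamic generation, the factory below only builds the keyword arguments for each DAG to illustrate the settings; the real implementation would pass these to `airflow.DAG`. The function name is illustrative, not from the plan.

```python
# Sketch: one create_new_es_index DAG per Elasticsearch environment.
# In the real implementation these kwargs would construct airflow.DAG objects.

ENVIRONMENTS = ["staging", "production"]


def build_dag_kwargs(environment: str) -> dict:
    """Return the arguments used to construct the DAG for one environment."""
    return {
        "dag_id": f"create_new_es_index_{environment}",
        # Only run when manually triggered
        "schedule": None,
        # Only one run at a time
        "max_active_runs": 1,
    }


all_dags = {env: build_dag_kwargs(env) for env in ENVIRONMENTS}
```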
### Parameters

- `media_type`: The media type for which the index is being created. Presently this would only be `image` or `audio`.
- `index_config`: A JSON object containing the configuration for the new index. The values in this object will be merged with the existing configuration, where the value specified at a leaf key in the object will override the existing value (see Merging policy below). This can also be the entire index configuration, in which case the existing configuration will be replaced entirely (see the `override_config` parameter below).
- `index_suffix`: (Optional) The name suffix of the new index to create. This will be a string, and will be used to name the index in Elasticsearch in the form `{media_type}-{index_suffix}`. If not provided, the suffix will be a timestamp of the form `YYYYMMDDHHMMSS`.
- `source_index`: (Optional) The existing index in Elasticsearch to use as the basis for the new index. If not provided, the index aliased to `media_type` will be used (e.g. `image` for the `image` media type). In production, the data refresh process creates the index and aliases it to the media type. In staging, the process for creating the indices which can be iterated on here will be defined in a follow-up IP, #1987.
- `override_config`: (Optional) A boolean value which can be toggled to replace the existing index configuration entirely with the new configuration. If `True`, the `index_config` parameter will be used as the entire configuration. If `False`, the `index_config` parameter will be merged with the existing configuration. Defaults to `False`.
- `query`: (Optional) An Elasticsearch query to use to filter the documents to be copied to the new index. If not provided, all documents will be copied. See the reindex API endpoint for the request body and `create_and_populate_filtered_index` on the ingestion server for an example of how this is used.
### DAG

Once the parameters are provided, the DAG will execute the following steps:

1. (For `production` only) Check that there are no other reindexing jobs currently underway. This can be done by checking whether the `{media_type}_data_refresh` and `create_filtered_{media_type}_index` DAGs are currently running. If either of them is, this DAG should fail immediately with an appropriate error message. **Note**: We will also need to add the `create_new_es_index_production` DAG to the checks performed prior to running the data refresh (where `create_filtered_{media_type}_index` is currently checked) in the `image_data_refresh` and `audio_data_refresh` DAGs. This will prevent the data refresh from running concurrently while the new index is being created.
2. Create the index configuration. If `override_config` is supplied, the `index_config` parameter will be used as the entire configuration. If not, the `index_config` parameter will be merged with the existing index settings. See Merging policy for how this merge will be performed. Below are the steps for gathering the current index settings:
   1. Get the current index information. This will be done using the `ElasticsearchPythonHook`, specifically the `indices.get` function.
   2. Extract the relevant settings from the response. They come back in a form that differs from what is required by the `indices.create` function of the Elasticsearch Python API. The `indices.get` function returns data of the form:

```json
{
  "<index-name>": {
    "settings": {
      "index": {
        "number_of_shards": "18",
        "number_of_replicas": 0,
        "refresh_interval": "-1",
        ...
        "analysis": { ... }
      }
    },
    "mappings": { ... }
  }
}
```
However, the `indices.create` function expects data of the form:

```json
{
  "settings": {
    "index": {
      "number_of_shards": 18,
      "number_of_replicas": 0,
      "refresh_interval": "-1"
    },
    "analysis": { ... }
  },
  "mappings": { ... }
}
```

Specifically note that the returned `settings.index.analysis` configuration section needs to be moved to `settings.analysis` instead, and that most of the other index settings (besides those denoted explicitly here, e.g. `number_of_shards`, `number_of_replicas`, and `refresh_interval`) can be discarded. The returned information will need to be extracted and converted into the form expected by `indices.create` before it is merged with the new configuration.
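The extraction described above could look roughly like the following. The function name and the exact set of preserved settings are assumptions based on the description, not a definitive implementation.

```python
# Settings preserved from the existing index; all other index-level
# settings returned by indices.get are discarded.
KEPT_INDEX_SETTINGS = ("number_of_shards", "number_of_replicas", "refresh_interval")


def extract_index_config(get_response: dict, index_name: str) -> dict:
    """Convert an indices.get response into the shape indices.create expects."""
    source = get_response[index_name]
    index_settings = source["settings"]["index"]
    return {
        "settings": {
            "index": {
                key: index_settings[key]
                for key in KEPT_INDEX_SETTINGS
                if key in index_settings
            },
            # analysis moves from settings.index.analysis to settings.analysis
            "analysis": index_settings.get("analysis", {}),
        },
        "mappings": source.get("mappings", {}),
    }
```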
3. Create the new index with the configuration generated in the previous step, using the `ElasticsearchPythonHook`'s `indices.create` function. The index name will either be a combination of the `media_type` and `index_suffix` parameters (`{media_type}-{index_suffix}`, e.g. `image-my-special-suffix`) or the `media_type` and a timestamp (`{media_type}-{timestamp}`, e.g. `image-20200102030405`) (example from the existing ingestion server code).
4. Initiate a reindex using the `source_index` as the source and the `reindex` function of the Elasticsearch Python API (example from the ingestion server). Rather than setting `wait_for_completion=True` on the ES Python API reindex call and having this step halt until the reindex is complete, this step will set `wait_for_completion=False` and continue to the next step. The task ID returned from Elasticsearch for the reindex operation will be stored in an XCom variable. This step will set `refresh=True` on the function call to ensure the index is refreshed immediately after the reindex completes, making the data available to searches. It will also need `slices="auto"` so the indexing is parallelized.
5. Use a sensor and the previously emitted task ID to wait for the reindex to complete. This will check the task using the Task API. It could be done either by making a subsequent `wait_for_completion` call with a low timeout inside a simple `PythonSensor` (using the Elasticsearch hook) which is polled repeatedly, or by using the `status` value of the response to determine whether polling should continue. The Elasticsearch documentation notes that `status` may not always be available, so we would need to first determine whether the reindex task returns a `status` field.
6. Once the reindex & refresh are complete, report via Slack that the new index is ready. This message should include both the index name and the Elasticsearch environment.
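The sensor's poke condition might boil down to something like the sketch below. Checking the top-level `completed` flag of the Tasks API response is an assumption consistent with the caveat above that `status` may not always be present; the function name is illustrative.

```python
def reindex_is_complete(task_response: dict) -> bool:
    """Poke condition for the reindex sensor.

    ``task_response`` is the body returned by the Elasticsearch Tasks API
    for the stored reindex task ID. The top-level ``completed`` flag is
    set once the task finishes; ``status`` may not always be present.
    """
    if task_response.get("completed"):
        # Surface any failures rather than silently reporting success
        failures = task_response.get("response", {}).get("failures", [])
        if failures:
            raise RuntimeError(f"Reindex completed with failures: {failures}")
        return True
    return False
```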
### Merging policy

The configuration should be merged such that the leaf key overwrites the entire value present in the default configuration at that key. For example, if the default configuration is:

```json
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1
    }
  }
}
```

An override value of `{"settings": {"index": {"number_of_shards": 2}}}` will overwrite the `number_of_shards` value, but leave the `number_of_replicas` value unchanged. The resulting configuration will be:

```json
{
  "settings": {
    "index": {
      "number_of_shards": 2,
      "number_of_replicas": 1
    }
  }
}
```
This will be a naive merge, and list values will not be appended automatically. For instance, a default configuration of:

```json
{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "type": "stemmer_override",
        "rules": [
          "animals => animal",
          "animal => animal",
          "anime => anime",
          "animate => animate",
          "animated => animate",
          "universe => universe"
        ]
      }
    }
  }
}
```

with an override of

```json
{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "rules": ["crim => cribble"]
      }
    }
  }
}
```

will result in

```json
{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "type": "stemmer_override",
        "rules": ["crim => cribble"]
      }
    }
  }
}
```
The jsonmerge Python library provides a good example of the naive approach, although we will want to avoid adding an extra dependency for this step if possible.
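A naive merge with these semantics fits in a few lines of dependency-free Python; this is a sketch, and the function name is illustrative:

```python
def merge_configurations(base: dict, override: dict) -> dict:
    """Recursively merge ``override`` into ``base``.

    Dictionaries are merged key by key; any non-dict value (including
    lists) at a leaf key replaces the base value entirely.
    """
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_configurations(merged[key], value)
        else:
            # Leaf values (including lists) overwrite, never append
            merged[key] = value
    return merged
```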
## Alternatives

### Using the ingestion server

The first idea for this approach was to use the ingestion server directly, and alter the `es_mappings.py` file either dynamically or through a mechanism similar to the `dag-sync` script on the catalog. This might have been made easier by a move of this service from EC2 to ECS. This approach was abandoned when @sarayourfriend suggested interacting with Elasticsearch directly.
## Design
No new design work is required for this plan.
## Parallelizable streams
The addition of the Elasticsearch provider is a blocker for the rest of the work described here; all work must be done serially.
## Blockers
There are no external blockers to this project.
## API version changes
No API version changes should be necessary.
## Accessibility
No accessibility concerns are expected.
## Rollback
Rollback would likely involve deleting the DAG. The provider dependency and Elasticsearch connections are likely to be useful in the future, so it may behoove us to keep them even if the DAG is deleted.
## Risks

This approach does allow maintainers to affect production resources, so the same care should be taken when triggering this DAG as when triggering a deployment. The described DAG will only ever create new indices, so there is no risk of losing or affecting the production indices. The DAG will also fail if any other DAGs which perform a reindex are running, which prevents saturating Elasticsearch resources in production.