# 2023-05-18 Implementation Plan: Rapid iteration of ingestion server index configuration

Author: @aetherunbound

## Reviewers

- [x] @sarayourfriend
- [x] @stacimc
- [x] @krysal

## Project links

## Overview
This document describes a DAG (and associated dependencies) which can be used to generate a new index in Elasticsearch without requiring a data refresh, ingestion server deployment, or any interaction with the ingestion server.
## Expected Outcomes
Once the implementation is complete, a maintainer should be able to run this DAG to create a new index in either Elasticsearch environment with an altered configuration. The maintainer should only need to specify the pieces of the configuration that differ from the default configuration, although they can supply a full configuration if they wish. The DAG will report when it is complete and what the name of the new index is.
## Dependencies
The following are prerequisites for the DAG:
- Installation of the `elasticsearch` Airflow provider package
- Airflow Connections for each Elasticsearch cluster
### Elasticsearch provider
The `elasticsearch` provider will need to be added to the list of dependencies
for our catalog Docker image. This will be done first and deployed on its own,
as the connections will be needed for the rest of the work. In order to add
this provider, the production requirements will need to be changed from
`apache-airflow[amazon,postgres,http]` to
`apache-airflow[amazon,postgres,http,elasticsearch]`.
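As a sketch, the relevant change to the production requirements would look like the following (any version pins and surrounding file contents are omitted):

```diff
-apache-airflow[amazon,postgres,http]
+apache-airflow[amazon,postgres,http,elasticsearch]
```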
### Airflow connections
Once the provider is installed, Airflow Connections will need to be created
for each Elasticsearch cluster. The connections will be named
`elasticsearch_production` and `elasticsearch_staging` and reference each
respective environment. These will be created by hand in the Airflow UI. For
local development, defaults will be added to the `env.template` file.
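Since Airflow can resolve connections from `AIRFLOW_CONN_{CONN_ID}` environment variables, the local defaults in `env.template` might look something like the following sketch (the URIs are illustrative placeholders, not real cluster addresses):

```
# Hypothetical local defaults; Airflow parses these URIs into connections.
AIRFLOW_CONN_ELASTICSEARCH_PRODUCTION=elasticsearch://elasticsearch:9200
AIRFLOW_CONN_ELASTICSEARCH_STAGING=elasticsearch://elasticsearch:9200
```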
## Infrastructure
No infrastructure changes should be necessary for this implementation, beyond
the addition of the `elasticsearch` provider package and the Airflow
connections described above.
## Other projects or work
This work does not depend on any other projects and can be implemented at any time. In order to be utilized, however, a query parameter on the API will need to be used to determine which index to query. This has been implemented in #2073.
## Outlined Steps
Once the prerequisites above have been met, the DAGs can be created. The DAGs
will be named `create_new_es_index_{environment}`, and will have the
parameters and steps described below. A dynamic DAG generation function will
be used to create one DAG per environment (`staging` and `production`). Each
DAG will have a schedule of `None` so that it is only run when triggered. It
should also have `max_active_runs` set to `1` so that only one instance of
the DAG can be running at a time.
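As a sketch of what that generation might look like, assuming Airflow 2.4+ style arguments and a simple factory registered at module import time (the function name and defaults here are illustrative, not the final implementation):

```python
# A minimal sketch of the dynamic DAG generation described above.
from datetime import datetime

from airflow.models import DAG


def create_new_es_index_dag(environment: str) -> DAG:
    return DAG(
        dag_id=f"create_new_es_index_{environment}",
        start_date=datetime(2023, 5, 1),
        schedule=None,  # Only run when manually triggered
        max_active_runs=1,  # Only one instance of the DAG can run at a time
        catchup=False,
        tags=["elasticsearch"],
    )


# Register one DAG per environment at module import time.
for environment in ("staging", "production"):
    globals()[f"create_new_es_index_{environment}"] = create_new_es_index_dag(environment)
```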
### Parameters
- `media_type`: The media type for which the index is being created. Presently
  this would only be `image` or `audio`.
- `index_config`: A JSON object containing the configuration for the new
  index. The values in this object will be merged with the existing
  configuration, where the value specified at a leaf key in the object will
  override the existing value (see Merging policy below). This can also be the
  entire index configuration, in which case the existing configuration will be
  replaced entirely (see the `override_config` parameter below).
- `index_suffix`: (Optional) The name suffix of the new index to create. This
  will be a string, and will be used to name the index in Elasticsearch in the
  form `{media_type}-{index_suffix}`. If not provided, the suffix will be a
  timestamp of the form `YYYYMMDDHHMMSS`.
- `source_index`: (Optional) The existing index on Elasticsearch to use as the
  basis for the new index. If not provided, the index aliased to `media_type`
  will be used (e.g. `image` for the `image` media type). In production, the
  data refresh process creates the index and aliases it to the media type. In
  staging, the process for creating the indices which can be iterated on here
  will be defined in a follow-up IP, #1987.
- `override_config`: (Optional) A boolean value which can be toggled to
  replace the existing index configuration entirely with the new
  configuration. If `True`, the `index_config` parameter will be used as the
  entire configuration. If `False`, the `index_config` parameter will be
  merged with the existing configuration. Defaults to `False`.
- `query`: (Optional) An Elasticsearch query to use to filter the documents to
  be copied to the new index. If not provided, all documents will be copied.
  See the reindex API endpoint for the request body and
  `create_and_populate_filtered_index` on the ingestion server for an example
  of how this is used.
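As a sketch, these could be declared as DAG-level `params` using Airflow's `Param` class for light validation (the defaults and schema choices here are illustrative, not the final implementation):

```python
# Illustrative declaration of the parameters described above.
from airflow.models.param import Param

CREATE_NEW_ES_INDEX_PARAMS = {
    "media_type": Param(default="image", enum=["image", "audio"]),
    # Partial or full index configuration, merged per the merging policy below
    "index_config": Param(default={}, type="object"),
    # Defaults to a YYYYMMDDHHMMSS timestamp when not supplied
    "index_suffix": Param(default=None, type=["null", "string"]),
    # Defaults to the index aliased to the media type when not supplied
    "source_index": Param(default=None, type=["null", "string"]),
    # When True, index_config replaces the existing configuration entirely
    "override_config": Param(default=False, type="boolean"),
    # Optional Elasticsearch query limiting which documents are copied
    "query": Param(default=None, type=["null", "object"]),
}
```

This dictionary would then be passed as the `params` argument when each DAG is instantiated.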
### DAG
Once the parameters are provided, the DAG will execute the following steps:
1. (For `production` only) Check that there are no other reindexing jobs
   currently underway. This can be done by checking whether the
   `{media_type}_data_refresh` and `create_filtered_{media_type}_index` DAGs
   are currently running. If either of them is, this DAG should fail
   immediately with an appropriate error message. **Note**: We will also need
   to add the `create_new_es_index_production` DAG to the checks prior to
   running the data refresh (where `create_filtered_{media_type}_index` is
   currently checked) in the `image_data_refresh` and `audio_data_refresh`
   DAGs. This will prevent the data refresh from running concurrently while
   the new index is being created.
2. Create the index configuration. If `override_config` is supplied, the
   `index_config` parameter will be used as the entire configuration. If not,
   the `index_config` parameter will be merged with the existing index
   settings. See Merging policy for how this merge will be performed. Below
   are the steps for gathering the current index settings:
   1. Get the current index information. This will be done using the
      `ElasticsearchPythonHook`, specifically the `indices.get` function.
   2. Extract the relevant settings from the response. They come back in a
      form that differs from what is required by the `indices.create` function
      of the Elasticsearch Python API. The `indices.get` function returns data
      of the form:

      ```json
      {
        "<index-name>": {
          "settings": {
            "index": {
              "number_of_shards": "18",
              "number_of_replicas": 0,
              "refresh_interval": "-1",
              ...
              "analysis": { ... }
            }
          },
          "mappings": {...}
        }
      }
      ```

      However, the `indices.create` function expects data of the form:

      ```json
      {
        "settings": {
          "index": {
            "number_of_shards": 18,
            "number_of_replicas": 0,
            "refresh_interval": "-1"
          },
          "analysis": {...}
        },
        "mappings": {...}
      }
      ```

      Specifically note that the returned `settings.index.analysis`
      configuration section needs to be moved to `settings.analysis` instead,
      and that most of the other index settings (besides those denoted
      explicitly here, e.g. `number_of_shards`, `number_of_replicas`, and
      `refresh_interval`) can be discarded. The returned information will need
      to be extracted and converted into the form expected by `indices.create`
      before it is merged with the new configuration (see the sketch after
      this list).
3. Create the new index with the configuration generated in step 2 using the
   `ElasticsearchPythonHook`'s `indices.create` function. The index name will
   either be a combination of the `media_type` and `index_suffix` parameters
   (`{media_type}-{index_suffix}`, e.g. `image-my-special-suffix`), or the
   `media_type` and a timestamp (`{media_type}-{timestamp}`, e.g.
   `image-20200102030405`) (example from the existing ingestion server code).
4. Initiate a reindex using the `source_index` as the source and the `reindex`
   function of the Elasticsearch Python API (example from the ingestion
   server). Rather than setting `wait_for_completion=True` on the ES Python
   API reindex call and having this step halt until the reindex is complete,
   this step will set `wait_for_completion=False` and continue to the next
   step. The task ID returned from Elasticsearch for the reindex operation
   will be stored in an XCom variable. This step will have `refresh=True` on
   the function call to ensure the index will refresh immediately after the
   reindex completes, making the data available to searches. It will also
   need `slices="auto"` so the indexing is parallelized.
5. Use a sensor and the previously emitted task ID to wait for the reindex to
   complete. This will check the task using the Task API. This could be done
   either using a subsequent `wait_for_completion` call within a simple
   `PythonSensor` using the Elasticsearch hook with a low timeout which is
   polled repeatedly, or by using the `status` value of the response to
   determine if polling should continue. Elasticsearch documentation notes
   that `status` may not always be available, so we'd need to determine if the
   reindex task returns a `status` field first (a sketch of this flow follows
   the list).
6. Once the reindex & refresh are complete, report via Slack that the new
   index is ready. This message should include both the index name and the
   Elasticsearch environment.
### Merging policy
The configuration should be merged such that the leaf key overwrites the entire value present in the default configuration at that key. For example, if the default configuration is:
```json
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1
    }
  }
}
```
An override value of `{"settings": {"index": {"number_of_shards": 2}}}` will
overwrite the `number_of_shards` value, but leave the `number_of_replicas`
value unchanged. The resulting configuration will be:
```json
{
  "settings": {
    "index": {
      "number_of_shards": 2,
      "number_of_replicas": 1
    }
  }
}
```
This will be a naive merge, and list values will not be appended automatically. For instance:
```json
{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "type": "stemmer_override",
        "rules": [
          "animals => animal",
          "animal => animal",
          "anime => anime",
          "animate => animate",
          "animated => animate",
          "universe => universe"
        ]
      }
    }
  }
}
```
with an override of
```json
{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "rules": ["crim => cribble"]
      }
    }
  }
}
```
will result in
```json
{
  "analysis": {
    "filter": {
      "stem_overrides": {
        "type": "stemmer_override",
        "rules": ["crim => cribble"]
      }
    }
  }
}
```
The `jsonmerge` Python library provides a good example of the naive approach,
although we will want to avoid adding an extra dependency for this step if
possible.
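For illustration, a minimal sketch of such a naive recursive merge (not the final implementation):

```python
# A naive recursive merge: dictionaries are merged key by key, while any
# non-dict value (including lists) at a leaf key replaces the existing value
# outright, matching the behavior shown in the examples above.
def merge_configurations(base: dict, override: dict) -> dict:
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_configurations(merged[key], value)
        else:
            merged[key] = value
    return merged


merged = merge_configurations(
    {"settings": {"index": {"number_of_shards": 1, "number_of_replicas": 1}}},
    {"settings": {"index": {"number_of_shards": 2}}},
)
# -> {"settings": {"index": {"number_of_shards": 2, "number_of_replicas": 1}}}
```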
## Alternatives

### Using the ingestion server
The first idea for this approach was to use the ingestion server directly,
altering the `es_mappings.py` file either dynamically or via a mechanism
similar to the dag-sync script on the catalog. This might have been made
easier by a move of this service from EC2 to ECS. This approach was abandoned
when @sarayourfriend suggested interacting with Elasticsearch directly.
## Design
No new design work is required for this plan.
## Parallelizable streams
The addition of the Elasticsearch provider is a blocker for the rest of the work described here; all work must be done serially.
## Blockers
There are no external blockers to this project.
## API version changes
No API version changes should be necessary.
## Accessibility
No accessibility concerns are expected.
## Rollback
Rollback would likely involve deleting the DAG. The provider dependency and Elasticsearch connections are likely to be useful in the future, so it may behoove us to keep them even if the DAG is deleted.
## Risks
This approach does allow maintainers to affect production resources, so the same care should be taken when triggering this DAG as with triggering a deployment. The described DAG will only ever create new indices, so there is no risk of losing or affecting the production indices. The DAG will also fail if any other DAGs which perform a reindex are running, which will prevent saturating Elasticsearch resources in production.