Given a query and some optional parameters, es_search gets results from HTTP requests to Elasticsearch and returns a data.table representation of those results.
Usage
es_search(
es_host,
es_index,
size = 10000,
query_body = "{}",
scroll = "5m",
max_hits = Inf,
n_cores = ceiling(parallel::detectCores()/2),
break_on_duplicates = TRUE,
ignore_scroll_restriction = FALSE,
intermediates_dir = getwd()
)
Arguments
- es_host
A string identifying an Elasticsearch host. This should be of the form [transfer_protocol][hostname]:[port]. For example, 'http://myindex.thing.com:9200'.
- es_index
The name of an Elasticsearch index to be queried. Note that passing NULL is not supported. Technically, not passing an index to Elasticsearch is legal and results in searching over all indexes. To be sure that this very expensive query is not executed by accident, uptasticsearch forbids this. If you want to execute a query over all indexes in the cluster, set this argument to "_all".
- size
Number of records per page of results. See the Elasticsearch docs for more. Note that this will be reset to 0 if you submit a query_body with an "aggs" request in it. See also max_hits.
- query_body
String with a valid Elasticsearch query. Default is an empty query.
- scroll
How long should the scroll context be held open? This should be a duration string like "1m" (for one minute) or "15s" (for 15 seconds). The scroll context will be refreshed every time you ask Elasticsearch for another record, so this parameter should just be the amount of time you expect to pass between requests. See the Elasticsearch scroll/pagination docs for more information.
- max_hits
Integer. If specified, es_search will stop pulling data as soon as it has pulled this many hits. Default is Inf, meaning that all possible hits will be pulled.
- n_cores
Number of cores to distribute fetching and processing over.
- break_on_duplicates
Boolean, defaults to TRUE. es_search uses the size of the final object it returns to check whether or not some data were lost during processing. If you have duplicates in the source data, you will have to set this flag to FALSE and just trust that no data have been lost. Sorry :( .
- ignore_scroll_restriction
There is a cost associated with keeping an Elasticsearch scroll context open. By default, this function does not allow scroll values which exceed one hour. This is done to prevent costly mistakes made by novice Elasticsearch users. If you understand the cost of keeping the context open for a long time and would like to pass a scroll value longer than an hour, set ignore_scroll_restriction to TRUE.
- intermediates_dir
When scrolling over search results, this function writes intermediate results to disk. By default, `es_search` will create a temporary directory in whatever working directory the function is called from. If you want to change this behavior, provide a path here. `es_search` will create and write to a temporary directory under whatever path you provide.
Examples
if (FALSE) { # \dontrun{
###=== Example 1: Get low-scoring food survey results ===###
query_body <- '{"query":{"filtered":{"filter":{"bool":{"must":[
{"exists":{"field":"customer_comments"}},
{"terms":{"overall_satisfaction":["very low","low"]}}]}}},
"query":{"match_phrase":{"customer_comments":"food"}}}}'
# Execute the query, parse into a data.table
commentDT <- es_search(es_host = 'http://mydb.mycompany.com:9200'
, es_index = "survey_results"
, query_body = query_body
, scroll = "1m"
, n_cores = 4)
###=== Example 2: Time series agg features ===###
# Create query that will give you daily summary stats for revenue
query_body <- '{"query":{"filtered":{"filter":{"bool":{"must":[
{"exists":{"field":"pmt_amount"}}]}}}},
"aggs":{"timestamp":{"date_histogram":{"field":"timestamp","interval":"day"},
"aggs":{"revenue":{"extended_stats":{"field":"pmt_amount"}}}}},"size":0}'
# Execute the query and get the result
resultDT <- es_search(es_host = "http://es.custdb.mycompany.com:9200"
, es_index = 'ticket_sales'
, query_body = query_body)
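###=== Example 3: Cap hits and redirect intermediate files ===###
# An illustrative sketch, not from the package's original examples: the host
# and index names below are hypothetical. It shows max_hits capping how many
# records are pulled and intermediates_dir sending the scroll intermediates
# to a temporary directory instead of the current working directory.
limitedDT <- es_search(es_host = "http://es.custdb.mycompany.com:9200"
                       , es_index = 'ticket_sales'
                       , query_body = "{}"
                       , max_hits = 100
                       , intermediates_dir = tempdir())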
} # }