[DajSięPoznać#7] Elasticsearch Aggregations

Wstęp

Po zdobyciu danych i wrzuceniu ich do Elasticsearcha można wykonać na nich sporo ciekawych operacji statystycznych przy użyciu mechanizmu agregacji Elasticsearcha. Która dzielnica ma najwięcej ogłoszeń ? Gdzie ceny za metr są najwyższe ? Ile ogłoszeń ma cenę powyżej pół miliona zł ?

F# i serializacja do JSON

F# przy aplikacjach webowych może sprawić pewne problemy, ponieważ niektóre specyficzne dla niego konstrukcje językowe są serializowane do JSON-a w sposób nieintuicyjny. Przykładowo Option<bool> po serializacji zwraca obiekt z zagnieżdżoną tablicą wartości, a przecież oczekujemy czegoś innego.

W obliczu takich problemów warto doinstalować nugetem Newtonsoft.Json.FSharp, który rozwiązuje popularne problemy serializacji obiektów F#.

ns

Przykładowo rozwiązanie problemu typów opcjonalnych jest następujące:

JsonConvert.SerializeObject(advert, [| OptionConverter() :> JsonConverter |])

 

Agregacje w Elasticsearch

Elasticsearch przyjmuje dane w postaci JSONów, budując w oparciu o nie indeksy. W związu z tym, oprócz zaawansowanych możliwości filtrowania (z przeszukiwaniem pełnotekstowym na czele), mamy też możliwość agregowania takich danych, analogicznie do operacji GROUP BY z SQL.

W uproszczeniu pipeline agregacji wygląda następująco:

Untitled Diagram

gdzie zapytanie filtruje wyniki, bucketing ma za zasadnie pogrupowanie danych (np. według klucza, według zakresów itd.), a operacje metryczne zwracają statystyki (avg, min, max itd) dla każdego bucketu. Co ciekawe zapytania agregujące można wykonywać równolegle z zapytaniami filtrującymi (mniejsza ilość round-tripów do serwera ES), dlatego w przykładach wszędzie ustawiamy size na 0 –  dotyczy wyników wyszukiwania.

Bucketing z metryką

Przykładem zapytania grupującego jest zebranie statystyk cen za metr kwadratowy dla każdej dzielnicy.

{
   "size":0,
   "aggs":{
      "group_by_district":{
         "terms":{
            "field":"District"
         },
         "aggs":{
            "price_stats":{
               "stats":{
                  "field":"PricePerMeter"
               }
            }
         }
      }
   }
}

Zapytanie takie najpierw pogrupuje dane względem dzielnic, a następnie wykona podzapytanie agregujące dla każdej dzielnicy. Operacja stats wykona podstawowe operacje statystyczne. Wynik operacji będzie następujący:

tests

Jeżeli nie wykonamy podzapytania, to bucketing poprzestanie na zliczeniu dokumentów w każdej grupie.

Bucketing na podstawie zakresów

{
   "size":0,
    "aggs" : {
        "price_ranges" : {
            "range" : {
                "field" : "PricePerMeter",
                "ranges" : [
                    { "to" : 5000 },
                    { "from" : 5000, "to" : 7000 },
                    { "from" : 7000, "to" : 10000 },
                    { "from" : 10000}
                ]
            }
        }
    }
}

 

Kontroler WebAPI do agregacji

Metoda będzie przyjmowała komplet parametrów potrzebnych do zbudowania agregacji i na ich podstawie podjęta zostanie decyzja co do typu agregacji.

member x.Get(field: string, [<FromUri>]query:QueryModel) : IHttpActionResult =   
             
    let client = GetElasticClient()
    let searchRequest = new SearchRequest()
    let dictionary = new Dictionary<string, IAggregationContainer>()

    let container = match (query.firstThreshold, query.lastThreshold, query.step) with
                                | (0,0,0) -> x.TermsAggregation(field, query.size, query.statsField) 
                                | (f,l,s) -> x.RangeAggregation(field, x.Ranges(f,l,s))

    if not(query.statsField = null) then            
        container.Aggregations <- x.CreateStatsAggregation(query.statsField)

    dictionary.Add("outer_aggregation", container)

    searchRequest.Aggregations <- new AggregationDictionary(dictionary)

    let aggs = client.Search<AdvertBase>(searchRequest).Aggs

    match aggs.Terms("outer_aggregation").Buckets.ToArray() with
            | [||] -> x.Ok(aggs.Range("outer_aggregation").Buckets.ToArray()) :> _
            | terms -> x.Ok(terms) :> _

Same agregacje w sposób obiektowy buduje się podobnie. Najciekawsza z nich (range) wygląda następująco:

member x.RangeAggregation(fieldName, ranges) = 
    let aggregator = new Nest.RangeAggregation(fieldName)
    let field = new Field()
    field.Name <- fieldName

    aggregator.Field <- field 
    aggregator.Ranges <- ranges

    let container = new AggregationContainer()
    container.Range <- aggregator
           
    container

member x.CreateRange(low:Option<int>, high:Option<int>) =
    let range = new Nest.Range()
    if low.IsSome then
        range.From <- new Nullable<float>(float(low.Value))
    if high.IsSome then
        range.To <- new Nullable<float>(float(high.Value))
    range

member x.Ranges(first, last, step) =
    seq{
        yield x.CreateRange(None, Some(first)) :> IRange
        for thr in first .. step .. (last-step) 
            do yield x.CreateRange(Some(thr), Some(thr+step)) :> IRange
        yield x.CreateRange(Some(last), None) :> IRange
    }
Advertisements

One thought on “[DajSięPoznać#7] Elasticsearch Aggregations

  1. Pingback: [DajSięPoznać] Podsumowanie | When the smoke is going down

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s