Blog

Ein Anwendungsfall für AWS Step Function: Serverless ETL

Mark Panahi
September 28, 2020

Mit Batch- und Looping-Techniken nutzte FloQast AWS Step Functions als serverloses ETL-Werkzeug unter Berücksichtigung der Grenzen der Datengröße. Floqast verwendet maschinelles Lernen, um automatische Abstimmungen ohne Konfiguration über unser AutoRec-Tool durchzuführen. Obwohl der Großteil der Abstimmungen automatisch durchgeführt wird, können Benutzer dennoch einige Anpassungen und zusätzliche Übereinstimmungen vornehmen. Wir wollen diese Anpassungen analysieren, um unseren Algorithmus zu verbessern, sodass der Benutzer beim nächsten Mal weniger Anpassungen vornehmen muss. Insbesondere möchten wir einen täglichen Bericht mit wichtigen Metrikdaten (z. B. falsch positive/negative Ergebnisse) für alle Benutzer erstellen, die an einem bestimmten Tag Abstimmungen durchgeführt haben. Wir möchten, dass der Prozess serverlos abläuft, aber aufgrund der 15-minütigen Laufzeitobergrenze können wir nicht eine einzige Lambda-Funktion verwenden, um alles zu erledigen. Daher haben wir uns entschieden, den Prozess in mehrere Lambdas aufzuteilen und sie miteinander zu verknüpfen Funktionen von AWS Step.

Der von uns erstellte Step Function-Workflow orchestriert Lambdas, die wiederum mit MongoDB-Sammlungen, DocumentDB-Sammlungen und S3-Dateien interagieren. Aber wie wir sehen werden, stießen wir auf Probleme mit der Datenmenge, die zu einem bestimmten Zeitpunkt durch den Workflow fließen kann. Wir haben dem Rechnung getragen, indem wir jeweils einen Datenstapel durchgesehen haben. Während Looping mit AWS Step Functions ist ein bekanntes Muster, dieser Blogbeitrag zeigt, wie man Looping mit einer Paging-Technik für MongoDB kombiniert.

Unser erster Versuch

Im einfachsten Sinne muss der Workflow eine (potenziell riesige) Liste von Datensätzen abrufen, einige Berechnungen für jeden dieser Datensätze mit Daten durchführen, auf die aus anderen Quellen verwiesen wird, und die Endergebnisse an einem anderen Ort speichern. Wir beginnen den Workflow mit dem Zeitraum, in dem wir Daten sammeln möchten, z. B.:

{
    "date_begin": "2020-08-18T00:00:00",
    "date_end": "2020-08-20T23:59:59"
}

Die Step-Funktion übergibt diese Parameter an das Query Jobs-Lambda, das eine Liste aller Benutzerabgleichsjobs, die in den Datumsbereich fallen, aus unserer MongoDB-Sammlung „Jobs“ abruft. Die Ausgabe wird dann in eine Schrittfunktion Iterator eingespeist, die gleichzeitig das Calc Metrics-Lambda für jeden Job aufruft. Diese Lambda-Funktion vergleicht Benutzertransaktionsübereinstimmungen aus einer DocumentDB-Sammlung mit Transaktionsübereinstimmungen, die von unserem in S3 gespeicherten Algorithmus generiert wurden, und schreibt die Ergebnisse in unsere MongoDB-Sammlung „Metrics“. Dies ist das von AWS generierte Workflow-Diagramm:

Dies ist der Code für das Query Jobs Lambda:

def lambda_handler(event, context):
    query = {}

    query['runDate'] = dict()
    query['runDate']['$gte'] = event['date_begin']
    query['runDate']['$lte'] = event['date_end']

    client = MongoClient(connection_string)
    db = client['Analytics']

    return list(db.jobs.find(query))

Das Problem bei diesem Ansatz besteht darin, dass die vom Query Jobs-Lambda zurückgegebenen Ergebnisse potenziell Tausende von Dokumenten sein können. AWS derzeit begrenzt die Ausgabegrößeeiner Step Function-Aufgabe auf 256 KB. Ohne besondere Behandlung wird dieses Limit leicht überschritten und führt dazu, dass die Ausführung der Step Function fehlschlägt:

Ein skalierbarer Ansatz

Die Lösung, die wir gefunden haben, war:

  1. Begrenzen Sie die Ergebnisse, die aus dem Query Jobs-Lambda kommen,
  2. Verarbeiten und speichern Sie die Vergleiche nur auf diesem begrenzten Set,
  3. Ermitteln Sie, ob es mehr Ergebnisse gibt,
  4. Kehren Sie zum Query Jobs-Lambda zurück, um den Vorgang zu wiederholen, bis keine Daten mehr vorhanden sind.

Zu diesem Zweck begrenzen wir die Anzahl der Dokumente, die vom Query Jobs-Lambda für einen bestimmten Aufruf zurückgegeben werden, indem wir ein Paginierungsschema einführen. Anstatt alle Dokumente auszuwählen, setzen wir ein Abfragelimit und spezielle Batch-ID um die aktuelle Seite anzugeben, die abgerufen werden soll. Wenn es mehr Seiten gibt, gibt das Lambda die Batch-ID Identifizieren der nächsten Seite mit Dokumenten, die verwendet werden sollen, wenn das Lambda erneut aufgerufen wird. Das Batch-ID ähnelt dem lastEvaluatedKey, der verwendet wird für Paginierung in DynamoDB. Wir verwenden die interne MongoDB _id Feld aus der Sammlung „Jobs“ in sortierter Reihenfolge als Batch-ID.Wir starten den Workflow nun zusätzlich zum Datumsbereich mit einer Chargengröße:

{
    "date_begin": "2020-08-18T00:00:00",
    "date_end": "2020-08-20T23:59:59",
    "batch_size": 20
}

In diesem Beispiel begrenzen wir die Batchgröße auf 20. Der neue Code unten zeigt das Query Jobs-Lambda mit zusätzlichen Paging-Funktionen. Wir stellen auch sicher, dass runDate richtig ist indiziert in unserer „Jobs“ -Kollektion.

def lambda_handler(event, context):
    query = {}

    batch_id = event['batch_id']

    query['runDate'] = dict()
    query['runDate']['$gte'] = event['date_begin']
    query['runDate']['$lte'] = event['date_end']

    if batch_id:
        query['_id'] = dict()
        query['_id']['$gt'] = ObjectId(batch_id)

    client = MongoClient(connection_string)
    db = client['Analytics']

    jobs = list(db.jobs.find(query).sort('_id', 1).limit(event['batch_size']))

    event['batch_id'] = None
    # if there are more results, set the batch_id for the next iteration in the sfn
    if len(jobs) >= event['batch_size']:
        event['batch_id'] = jobs[-1]['_id']

    return dict(
        jobs = jobs, 
        queryParams = event
    )

Zusätzlich zu den angeforderten Dokumenten gibt das Lambda auch die Abfrageparameter zurück, die in der nächsten Iteration verwendet werden sollen. Dazu gehören die Batch-ID für den nächsten Datenstapel. Im Auswahlstatus „Nach weiteren Daten suchen“ wird angezeigt, dass ein Batch-ID existiert und beginnt eine weitere Iteration. Sehen Sie sich den Workflow-Schnipsel unten an. In der nächsten Iteration wird Batch-ID wird mit dem letzten ausgefüllt _id aus den vorherigen Ergebnissen gesehen und schnappen Sie sich die nächste Charge von 20. Die Abfragekriterien im Lambda Query Jobs verwenden eine strikte Größer-als-Klausel ($gt) für _id wenn Batch-ID existiert. Auf diese Weise wird die Abfrage garantiert dort fortgesetzt, wo sie beim letzten Mal aufgehört hat.Sobald die Anzahl der Dokumente, die die Abfrage zurückgibt, geringer ist als die Stapelgröße, nein Batch-ID ist gesetzt. Der Auswahlstatus „Nach weiteren Daten suchen“ erkennt dies und beendet den Workflow.

"Check For More Data": {
    "Type": "Choice",
    "Choices": [
        {
            "Variable": "$.data.Payload.queryParams.batch_id",
            "IsNull": false,
            "Next": "Next Batch"
        },
        {
            "Variable": "$.data.Payload.queryParams.batch_id",
            "IsNull": true,
            "Next": "Success"
        }
    ],
    "Default": "Success"
},
"Next Batch": {
    "Type": "Pass",
    "Parameters": {
        "queryParams.$": "$.data.Payload.queryParams"
    },
    "Next": "Query Jobs"
}

Da hast du es! Jetzt können wir eine Menge abgefragter Daten durch einen Workflow weitergeben, ohne dass das Ding explodiert!

Was wir gelernt haben

Wie immer hängt die Wahl des ETL-Tools (oder eines beliebigen AWS-Tools) vom Lösung ab. In unserem Fall haben wir gelernt, dass wir einen bestehenden Step Function-Workflow anpassen können, um ein moderates Volumen zu bewältigen, ohne den Aufwand einer Migration zu einem vollwertigen serverlosen ETL-Tool wie AWS-Kleber. In unserem Fall wussten wir, dass wir es immer mit Daten in der Größenordnung von Tausenden von Dokumenten zu tun haben würden. Ein Anwendungsfall mit Millionen von Dokumenten würde stattdessen einen anderen Ansatz erfordern. Eine weitere Einschränkung, die wir hier nicht behandelt haben, ist eine Limit von 25.000 zur Größe der Ausführungshistorie von Step Functions. Neustart des Workflows als neue Hinrichtung löst diese Einschränkung.