Migration.py - Migration mit Scripten - Dokumente im Index ändern

Manchmal müssen bei einer Migration auch die Dokumente angepasst werden, das Ändern des Mapping reicht manchmal nicht. Wie das funktioniert, erkläre ich in meinem Blog.

Einleitung

Manchmal hat man in der Vergangenheit eine Entscheidung getroffen, die sich für den Fortschritt eines Projektes nicht hilfreich sind. Algorithmen passen nicht, Frameworks müssen geändert werden. Implementationen erweisen sich nicht als praxistauglich. Das Beste ist, wenn man das erkennt, sollte man frühzeitig Abhilfe schaffen und diese “Technical Debts” beseitigen. Ansonsten türmen sie sich zu einem riesigen Berg auf und alles, was ein Softwareprojekt ausmacht, scheitert irgendwann daran, dass es nicht mehr zu warten ist.

Es gibt verschiedene Stufen der Komplexität von Veränderungen. Wenn es nur den Sourcecode lokal betrifft, den man nicht ausgerollt hat, ist es simpel, die Probleme zu beseitigen, die man in der Phase erkannt hat. Ist die Software schon ausgerollt, muss man einen vernünftigen Update-Prozess haben, der die Dateien austauscht.

Wenn man aber Software mit Daten-Persistenzen implementiert und man selber keine Hoheit über die Daten hat, bewegt man sich auf recht hohem Niveau, das meistern zu müssen.

Bisher haben wir in OpenSearch nur das Mapping manipuliert. Wir haben uns entschieden, dass wir immer neue Indexe mit neuen Versionen anlegen, damit es möglichst einfach wird. Zudem haben wir immer ein Backup der Daten in dem Index. Faktisch ist es aber so, dass wir mit einem Mapping nie die Daten im “_source” Bereich des Dokumentes mit der Migration geändert hatten. Das kann aber durchaus die Anforderung sein.

Betrachten wir OpenSearch als Such-Index für Quelldaten, die in ihrer Ursprungspersistenz nicht in erwartbarer Geschwindigkeit durchsucht werden können, ist der Such-Index mit seinen Dokumenten eine synchron gehaltene Kopie der Attribute, die für die Suche relevant sind. Damit öffnen sich unmittelbar mehrere Szenarien, wenn wir von Migration sprechen:

  1. Das Mapping der Such-Attribute wird geändert, bzw. optimiert
  2. Die Dokument-Such-Attribute müssen anders gespeichert werden
  3. Es kommen neue Such-Attribute hinzu oder es werden welche entfernt

Punkt 1 können wir bereits. Punkt 2 ist eine neue Anforderung, die ich implementieren möchte. Der dritte Punkt ist eigentlich Punkt 1 plus einer Neuübertragung der Dokumente, auf die diese Änderung zutrifft.

Zu Punkt 2 gibt es zwei Lösungen:

    1. Man fügt ein weiteres Mapping für das gleiche Such-Attribut hinzu und kopiert den Wert transformiert in ein neues Dokument-Attribut
    2. Man ändert das Mapping des bestehenden Attributs und manipuliert bestehende Werte und speichert sie unter demselben Attribut-Namen

Technisch sind beide Methoden valide und können 1:1 implementiert werden. Aber 2.1 hat den Nachteil, dass man sich ggf. bei der Implementation der Suchabfragen vertun kann und je mehr Attribute indiziert werden, desto mehr Speicherplatz und Arbeitsspeicher kostet es.

Grundsätzlich, egal wie man sich entscheidet, könnte man das Ändern von Dokument-Attributen auch mit dem kompletten Neuaufbau eines Index bewerkstelligen. Also man legt eine neue Version an und überträgt alle Dokumente neu und bei der Übertragung werden die geänderten Werte so übertragen, wie es die neue Anforderung will.

Es gibt zwei Sachen, die das ggf. verhindern könnten:

  1. Die Ursprungsdaten liegen nicht mehr vor
  2. Das Übertragen der Ursprungsdaten ist unverhältnismäßig aufwendig.

Deswegen möchte ich gerne für die Migration diese Methode der Datenmanipulation von Dokumenten direkt im Index implementieren. OpenSearch bietet dafür ein Scripting an, um diese Art der Änderung von Daten ohne Roundtrip zum Client durchführen zu müssen.

Migration mit semantischen Versionen

Eine meiner Fehlentscheidungen war, dass ich die Versionen der Scripte als float speicherte. D.h. im migration_history Index ist die Version als Zahl, diese bedauerlicherweise auch untypisch notiert.

Eine Version 01.001 bedeutet eigentlich 1.1 und nicht 1.001. Jeder Abschnitt einer semantischen Version ist ein Integer und führende Nullen sind zu ignorieren. Das triviale float(version) erlaubt also sowieso nur zwei Versions-Ebenen und zum anderen wird die zweite Stufe auch noch falsch gespeichert.

Dieses Erbstück meiner Faulheit, will ich entfernen und das bedeutet, dass der migration_history Index selbst migriert werden muss und ich auch keine Quelldaten habe, um die History neu aufzubauen (ja doch, die der Version 0.0 - aber nur in OpenSearch gespeichert).

Ich muss also zum einen das Mapping von migration_ìndex ändern, zum anderen muss die Implementation angepasst werden. Abschließend müssen bestehende History-Datensätze geändert werden.

Da das alles ziemlich tiefgreifende Änderungen sind, habe ich mir weitere Hilfsfunktionen gebaut. Zum Beispiel in OpenSearchBackup.py das Backup eines bestehenden Indexes in eine Sicherungskopie. Das Restore dazu. Außerdem in os-reset-migration.py eine Funktion, die eine komplette Migration vollständig löscht (um von Vorne anfangen zu können).

Das Backup/Restore und Reset bespreche ich nicht gesondert. Das sollte man sich im aktuellen Commit anschauen. Die Funktionen sind nicht wirklich kompliziert, aber nehmen einem die manuelle Arbeit deutlich ab.

Das geänderte Mapping für migration_history ist einfach:

...
            "mappings": {
                "properties": {
                    "installed_rank": {"type": "integer"},
                    "version": { "type": "keyword" },
...

Die version wird nun als keyword behandelt. Aber das ändert ja die Dokumente nicht. Bisherige Dokumente mit ihren float-Versions werden bei einem ReIndex 1:1 wieder mit float kopiert. Das keyword als Mapping ändert daran gar nichts.

Man könnte das Kopieren so gestalten, dass man nun alle History-Datensätze liest, den Wert von float auf den transformierten String wandeln (also 1.001 -> “1.1”, 1.002 -> “1.2”, usw) und das Dokument im Zielindex (neue Version) speichern.

Aber das ist eine sehr teure Angelegenheit. Erstens müsste man das statisch implementieren. Dann müssen alle Dokumente geladen und wieder gespeichert werden (eine Menge Roundtrips zwischen Cluster und Client) und es skaliert nicht, wenn wir viele Nodes und Replikations-Knoten haben.

Die ReIndex-API unterstützt deswegen das Scripting während des Kopierens von Index-Dokumenten in einen anderen Index, ganz ohne den Client damit zu involvieren. Mehrere Script-Sprachen stehen da zur Verfügung und Painless wird meistens dafür empfohlen.

Wir erweitern in OpenSearchHelper.py also das Kopieren von Dokumenten um einen weieren Parameter:

def os_copy_documents(client: OpenSearch, index_from: str, index_to: str, script = None):
    reindex = {
        "source": {
            "index": index_from
        },
        "dest": {
            "index": index_to,
            "op_type": "create"
        }
    }

    # Add the script object to the reindex body
    if script:
        if 'script' in script:
            reindex['script'] = script['script']
        else:
            reindex['script'] = script

    response = client.reindex(body=reindex, requests_per_second=10_000, refresh=True)

    print(f"Copied {response['created']} documents from {index_from} to {index_to}")

    if response['failures']:
        print(f"Failures on copy: {response['failures']}")
        raise RuntimeError(f"Failure on copying document from  {index_from} to {index_to}")
    return response

Wird ein script mit übergeben, fügen wir es dem Request-Body hinzu und lassen OpenSearch den Rest machen. Für jedes zu kopierende Dokument ruft OpenSearch in jedem Node des Clusters in allen verfügbaren Lucene-Prozessen (Shards) eine compilierte Version des Scripts auf. Das Script bekommt das Dokument in einem Kontext Dictionary geliefert (ctx).

Painless

Aber wie schreibt man Scripte in Painless? Mit einer Menge Pain. Leider.

An sich ist Painless ein Groovy Derivat mit einigen Änderungen. OpenSearch erlaubt nicht den Zugriff auf alle Klassen der Java-Runtime der Nodes. Bei den Klassen, die verfügbar sind, sind auch nicht alle Methoden aufrufbar. OpenSearch benutzt dafür eine Whitelist. Die zwei wichtigsten Gründe sind: Performance-Probleme bei einigen Methoden (wie die String-Methoden, die reguläre Ausdrücke verwenden, wie split) und Sicherheitsprobleme (zum Beispiel ist ein Zugriff auf das Filesystem natürlich unterbunden).

Aber in der Syntax gibt es weitere Unterschiede. Painless erwartet “;” als Zeilenabschluss, was in Groovy nicht notwendig ist. Reguläre Ausdrücke über Pattern.compile sind in Painless statisch vorkompiliert. Das bedeutet, dass es nicht möglich ist, dynamisch reguläre Ausdrücke zu erzeugen. Es müssen Konstanten sein. Dafür hat Painless sogar eine eigene Syntax, die in Groovy nicht existiert.

Dann hat OpenSearch keinen Debugger und die Fehlermeldungen sind manchmal krude und Testen ist auch nicht einfach.

Wie geht man also voran? Man kann trotzdem Groovy verwenden, um ein Script vorab zu testen. Man darf ja auch in Groovy trotzdem ; ans Ende von Zeilen schreiben. Mit etwas Ausprobieren findet man raus, welche Klassen und Methoden erlaubt sind (und es gibt bei ElasticSearch eine API Referenz).

Dann gibt es aber noch ein anderes Problem. Das Script in das JSON Objekt unserer Migrations-Datei einfügen ist leider kaum lesbar. Denn JSON erlaubt keine Zeilenumbrüche in Zeichenketten (nur als Escapes). Ich habe also die Migration so umgebaut, dass man “Dateizeiger” in das Script aufnehmen kann. Das sieht so aus:

{
    "run": [
        {
            "call": "create index",
            "index_name": "migration_history",
            "body": "migration_history",
            "reindex_body": {
                "script": {
                    "lang": "painless",
                    "inline": "script://S01.000__Reindex_Version.groovy"
                }
            }
        }
    ],
...

Es gibt also den neuen Parameter reindex_body (den wir später an die Kopierfunktion übergeben) und diese besondere Notation script://. Immer wenn Migration.py so etwas sieht, versucht es dazu eine Datei zu laden und das genau an die Stelle als Zeichenkette einzufügen.

Im Prinzip schon so, wie wir das mit dem body Parameter gemacht haben. Nur laden wir den Inhalt aus einer weiteren Datei. Diese Datei ist für unser Script nicht wieder ein JSON, sondern eben Groovy.

Damit haben wir nun zwei Dinge erledigt: Wir können ganz normale Zeilenumbrüche verwenden und wir könenn das Script einfach mit einer IDE bearbeiten und durch einen Compiler jagen oder sogar ausführen.

Jetzt die 1 Millionen Dollar Frage: Wie sieht unser Script aus?

Die lauffähige Version in Groovy geschrieben, sieht so aus:

def normalizeVer (String ver) {
    def pZeros = ~'^0+(?!$)'
    def pDotSplit = ~'\\.'
    if (ver == null || ver.length() == 0) {
        return "0"
    }
    if (ver.startsWith(".")) {
        ver = "0" + ver
    }
    String norm = ""
    for (String v : pDotSplit.split(ver)) {
        if (norm.length() > 0) {
            norm = norm + "."
        }

        if (v=='') {
            norm = norm + "0"
        } else {
            norm = norm + pZeros.matcher(v).replaceFirst('')
        }
    }
    return norm;
}
ctx['_source']['version'] = normalizeVer(ctx['_source']['version'].toString())

Man könnte es fast in Groovy ausführen, wenn uns nicht ctx fehlen würde, dass nur in der Laufzeitumgebung von OpenSearch magisch zur Verfügung steht.

Also Mocken wir das, als würden wir einen Test schreiben:

def ctx = ["_source": ["version": 1.001]]

Hm, aber das dürfen wir OpenSearch nicht so mitgeben (Die Syntax ist sowieso noch zu Groovy, es fehlen ; am Ende der Zeilen), weil wir das ctx Dictionary nicht so in OpenSearch defnieren können (dort existiert es ja).

Also erfinde ich zwei spezielle Marker, um den Test-Code vom OpenSearch-Script zu trennen:

def ctx = ["_source": ["version": 1.001]]

//!script
def normalizeVer (String ver) {
    def pZeros = ~'^0+(?!$)'
    def pDotSplit = ~'\\.'
    if (ver == null || ver.length() == 0) {
        return "0"
    }
    if (ver.startsWith(".")) {
        ver = "0" + ver
    }
    String norm = ""
    for (String v : pDotSplit.split(ver)) {
        if (norm.length() > 0) {
            norm = norm + "."
        }

        if (v=='') {
            norm = norm + "0"
        } else {
            norm = norm + pZeros.matcher(v).replaceFirst('')
        }
    }
    return norm;
}
ctx['_source']['version'] = normalizeVer(ctx['_source']['version'].toString());
//!endscript

assert "1.1" == ctx['_source']['version']

Alles was zwischen //!script und //!endscript liegt, wird als Script eingefügt, der Rest verworfen.

Ok, das würde immer noch nicht funktionieren. Denn ; fehlt und das Definieren der regulären Ausdrücke funktioniert nur so in Groovy aber nicht in Painless.

Also habe ich einen ganz einfachen Code-Wandler geschrieben, der die Zeilen von Groovy nach Painless konvertiert.

Die Painless Variante von Regulären Ausdrücken ist übrigens:

    def pZeros = /^0+(?!$)/
    def pDotSplit = /\\./

Der Konverter wird nur ausgeführt, wenn die Endung der Datei .groovy ist. Wenn die Endung .painless ist, kopiert Migration.py das Script 1:1 in die Kopier-Funktion.

Das waren jetzt wirklich eine Menge Details und insgesamt habe ich schon ein paar Tage dafür gebraucht, bis es so funktioniert.

Dazu habe ich Migration.py so umgebaut, dass es nicht mehr mit float, sondern nur noch mit packaging.Version arbeitet, damit wir semantische Versionen unterstützen (das auch mit mehr Stufen als nur 1.2).

Prolog

Das war eine wirklich große Umstellung, aber das Migration.py hat dabei eine Menge gewonnen. Nicht nur Komplexität, sondern auch die Flexibilität Dokumente umzuwandeln und mit Scripten zu arbeiten. Zugleich haben wir ein kleines Werkzeug, wo wir die Painless-Scripte in Groovy-IDEs testen zu können.

Die neuen Module zum Erstellen von Index-Kopien (als eine Art Backup) mit Restore-Funktion und das komplette Löschen schon Indexen, die über eine Migration erstellt wurden, runden das Toolset noch ab. So ist es viel einfacher eigene Migrationen für eigene Softwareprojekte in Python zusammen mit OpenSearch zu entwickeln.