Skip to content

Dokumentation für **/ce/run/pipeline/*.py

Modul: open_ticket_ai\src\ce\run\pipeline\context.py

class PipelineContext

Kontextobjekt, das zwischen den Pipeline-Stufen übergeben wird. Diese generische class dient als Container zum Teilen von Zustand und Daten über verschiedene Stufen einer Verarbeitungspipeline hinweg. Sie nutzt Pydantic zur Datenvalidierung und -serialisierung.

Der generische Typparameter DataT muss eine Unterklasse von BaseModel sein, um die Typsicherheit für die Hauptdatennutzlast zu gewährleisten.

Parameter:

  • data (DataT) - Die Hauptdatennutzlast, die durch die Pipeline verarbeitet wird. Muss eine Pydantic-model-Instanz sein, die dem generischen Typ entspricht.
  • meta_info (MetaInfo) - Metadaten über die Pipeline-Ausführung, einschließlich Statusinformationen und operativen Details.
#### Methode def stop_pipeline(self)

Signalisiert der Pipeline, die Verarbeitung anzuhalten. Diese Methode bietet eine kontrollierte Möglichkeit für Pipeline-Stufen, anzuzeigen, dass die Verarbeitung gestoppt werden soll. Sie aktualisiert die Status-Metadaten des Kontexts auf STOPPED, was nachfolgende Stufen prüfen können, um frühzeitig zu beenden.

Hinweis: Diese Methode ändert den Zustand des Kontexts, gibt aber keinen Wert zurück.


Modul: open_ticket_ai\src\ce\run\pipeline\meta_info.py

class MetaInfo

Speichert Metadaten über den Ausführungszustand der Pipeline. Dieses model erfasst den aktuellen Status einer Pipeline zusammen mit Fehlerinformationen, falls Fehler auftreten.

Parameter:

  • status () (Standard: RUNNING) - Aktueller Ausführungsstatus der Pipeline. Standardmäßig RUNNING.
  • error_message () - Detaillierte Fehlermeldung, falls die Pipeline fehlgeschlagen ist. None, wenn erfolgreich.
  • failed_pipe () - Bezeichner der spezifischen Pipe, die den Fehler verursacht hat. None, wenn erfolgreich.

Modul: open_ticket_ai\src\ce\run\pipeline\pipe.py

class Pipe

Schnittstelle für alle Pipeline-Komponenten. Diese abstrakte Basisklasse definiert die gemeinsame Schnittstelle, die alle Pipeline-Komponenten implementieren müssen. Sie erbt von Providable, um die automatische Registrierung in einer Komponenten-Registry zu ermöglichen, und von ABC, um die Implementierung abstrakter Methoden zu erzwingen.

Unterklassen müssen die process-Methode implementieren, um ihre spezifische Datenverarbeitungslogik innerhalb der Pipeline zu definieren.

Attribute: Erbt Attribute von Providable für die Registry-Verwaltung. InputDataType (type[InputDataT]): Der Typ des Eingabedaten-model, das von dieser Pipe-Komponente erwartet wird. OutputDataType (type[OutputDataT]): Der Typ des Ausgabedaten-model, das von dieser Pipe-Komponente erzeugt wird.

#### Methode def process(self, context: PipelineContext[InputDataT]) -> PipelineContext[OutputDataT]

Verarbeitet ein Pipeline-Kontextobjekt und gibt den modifizierten Kontext zurück. Diese Methode definiert die Kernverarbeitungslogik für eine Pipeline-Komponente. Sie nimmt ein PipelineContext-Objekt entgegen, das den gemeinsamen Pipeline-Zustand enthält, führt Transformationen oder Operationen auf diesem Kontext aus und gibt den aktualisierten Kontext für die nächste Komponente in der Pipeline zurück.

Args: context: Der aktuelle Pipeline-Kontext, der gemeinsame Zustandsdaten enthält.

Returns: Das aktualisierte PipelineContext-Objekt nach der Verarbeitung.

Raises: Implementierungsspezifische Ausnahmen können von Unterklassen ausgelöst werden, um Verarbeitungsfehler oder ungültige Zustände anzuzeigen.


Modul: open_ticket_ai\src\ce\run\pipeline\pipeline.py

Definiert die Pipeline-Klasse zur Ausführung einer Sequenz von Pipes. Die Pipeline ist eine spezialisierte Pipe, die mehrere Pipes nacheinander ausführt. Sie verwaltet den Kontext und den Status während der gesamten Ausführung und behandelt Fehler sowie Stopp-Anfragen entsprechend.

class Pipeline

Eine Pipeline, die eine Sequenz von Pipes sequenziell ausführt. Diese class verwaltet den Ausführungsfluss mehrerer Pipes und behandelt dabei Statusübergänge, Fehlerweitergabe und Stopp-Anfragen während der Verarbeitung.

Parameter:

  • pipes () - Liste von Pipe-Objekten, die nacheinander ausgeführt werden sollen.
#### Methode def __init__(self, config: PipelineConfig, pipes: List[Pipe])

Initialisiert die Pipeline mit Konfiguration und Pipe-Sequenz.

Parameter:

  • config () - Konfigurationseinstellungen für die Pipeline.
  • pipes () - Geordnete Liste von Pipe-Instanzen, die ausgeführt werden sollen.
#### Methode def execute(self, context: PipelineContext) -> PipelineContext

Führt alle Pipes sequenziell mit Fehlerbehandlung und Statusweitergabe aus. Verarbeitet jede Pipe nacheinander und dabei:

  • Validiert Eingabedaten mithilfe des Eingabe-model jeder Pipe
  • Behandelt STOPPED-Statusanfragen von Pipes
  • Fängt und protokolliert Ausnahmen während der Pipe-Ausführung
  • Aktualisiert den Kontextstatus entsprechend (RUNNING, SUCCESS, FAILED, STOPPED)

Parameter:

  • context () - Der Pipeline-Kontext, der den Ausführungszustand und die Daten enthält.

Returns: () - Aktualisierter PipelineContext, der den endgültigen Ausführungszustand nach der Verarbeitung widerspiegelt.

#### Methode def process(self, context: PipelineContext) -> PipelineContext

Verarbeitet den Kontext durch die gesamte Pipeline-Sequenz. Implementiert die abstrakte Methode aus der Pipe-Basisklasse. Delegiert die eigentliche Pipeline-Verarbeitung an die execute()-Methode.

Parameter:

  • context () - Der Pipeline-Kontext, der den Ausführungszustand und die Daten enthält.

Returns: () - Aktualisierter PipelineContext nach der Verarbeitung durch alle Pipes.


Modul: open_ticket_ai\src\ce\run\pipeline\status.py

class PipelineStatus

Repräsentiert die möglichen Zustände einer Pipeline-Ausführung. Dieses Enum definiert die verschiedenen Status, die eine Pipeline während ihres Lebenszyklus haben kann.

Parameter:

  • RUNNING () - Zeigt an, dass die Pipeline gerade ausgeführt wird.
  • SUCCESS () - Zeigt an, dass die Pipeline erfolgreich und ohne Fehler abgeschlossen wurde.
  • STOPPED () - Zeigt an, dass die Pipeline absichtlich angehalten wurde (kontrollierter Stopp).
  • FAILED () - Zeigt an, dass die Pipeline aufgrund eines unerwarteten Fehlers beendet wurde.