Dokumentation für **/ce/run/pipeline/*.py
Modul: open_ticket_ai\src\ce\run\pipeline\context.py
class PipelineContext
Kontextobjekt, das zwischen den Pipeline-Stufen übergeben wird. Diese generische class
dient als Container zum Teilen von Zustand und Daten über verschiedene Stufen einer Verarbeitungspipeline hinweg. Sie nutzt Pydantic zur Datenvalidierung und -serialisierung.
Der generische Typparameter DataT
muss eine Unterklasse von BaseModel
sein, um die Typsicherheit für die Hauptdatennutzlast zu gewährleisten.
Parameter:
data
(DataT
) - Die Hauptdatennutzlast, die durch die Pipeline verarbeitet wird. Muss eine Pydantic-model
-Instanz sein, die dem generischen Typ entspricht.meta_info
(MetaInfo
) - Metadaten über die Pipeline-Ausführung, einschließlich Statusinformationen und operativen Details.
#### Methode def stop_pipeline(self)
Signalisiert der Pipeline, die Verarbeitung anzuhalten. Diese Methode bietet eine kontrollierte Möglichkeit für Pipeline-Stufen, anzuzeigen, dass die Verarbeitung gestoppt werden soll. Sie aktualisiert die Status-Metadaten des Kontexts auf STOPPED
, was nachfolgende Stufen prüfen können, um frühzeitig zu beenden.
Hinweis: Diese Methode ändert den Zustand des Kontexts, gibt aber keinen Wert zurück.
Modul: open_ticket_ai\src\ce\run\pipeline\meta_info.py
class MetaInfo
Speichert Metadaten über den Ausführungszustand der Pipeline. Dieses model
erfasst den aktuellen Status einer Pipeline zusammen mit Fehlerinformationen, falls Fehler auftreten.
Parameter:
status
() (Standard:RUNNING
) - Aktueller Ausführungsstatus der Pipeline. StandardmäßigRUNNING
.error_message
() - Detaillierte Fehlermeldung, falls die Pipeline fehlgeschlagen ist.None
, wenn erfolgreich.failed_pipe
() - Bezeichner der spezifischenPipe
, die den Fehler verursacht hat.None
, wenn erfolgreich.
Modul: open_ticket_ai\src\ce\run\pipeline\pipe.py
class Pipe
Schnittstelle für alle Pipeline-Komponenten. Diese abstrakte Basisklasse definiert die gemeinsame Schnittstelle, die alle Pipeline-Komponenten implementieren müssen. Sie erbt von Providable
, um die automatische Registrierung in einer Komponenten-Registry zu ermöglichen, und von ABC
, um die Implementierung abstrakter Methoden zu erzwingen.
Unterklassen müssen die process
-Methode implementieren, um ihre spezifische Datenverarbeitungslogik innerhalb der Pipeline zu definieren.
Attribute: Erbt Attribute von Providable
für die Registry-Verwaltung. InputDataType (type[InputDataT]): Der Typ des Eingabedaten-model
, das von dieser Pipe
-Komponente erwartet wird. OutputDataType (type[OutputDataT]): Der Typ des Ausgabedaten-model
, das von dieser Pipe
-Komponente erzeugt wird.
#### Methode def process(self, context: PipelineContext[InputDataT]) -> PipelineContext[OutputDataT]
Verarbeitet ein Pipeline-Kontextobjekt und gibt den modifizierten Kontext zurück. Diese Methode definiert die Kernverarbeitungslogik für eine Pipeline-Komponente. Sie nimmt ein PipelineContext
-Objekt entgegen, das den gemeinsamen Pipeline-Zustand enthält, führt Transformationen oder Operationen auf diesem Kontext aus und gibt den aktualisierten Kontext für die nächste Komponente in der Pipeline zurück.
Args: context: Der aktuelle Pipeline-Kontext, der gemeinsame Zustandsdaten enthält.
Returns: Das aktualisierte PipelineContext
-Objekt nach der Verarbeitung.
Raises: Implementierungsspezifische Ausnahmen können von Unterklassen ausgelöst werden, um Verarbeitungsfehler oder ungültige Zustände anzuzeigen.
Modul: open_ticket_ai\src\ce\run\pipeline\pipeline.py
Definiert die Pipeline
-Klasse zur Ausführung einer Sequenz von Pipes
. Die Pipeline
ist eine spezialisierte Pipe
, die mehrere Pipes
nacheinander ausführt. Sie verwaltet den Kontext und den Status während der gesamten Ausführung und behandelt Fehler sowie Stopp-Anfragen entsprechend.
class Pipeline
Eine Pipeline, die eine Sequenz von Pipes
sequenziell ausführt. Diese class
verwaltet den Ausführungsfluss mehrerer Pipes
und behandelt dabei Statusübergänge, Fehlerweitergabe und Stopp-Anfragen während der Verarbeitung.
Parameter:
pipes
() - Liste vonPipe
-Objekten, die nacheinander ausgeführt werden sollen.
#### Methode def __init__(self, config: PipelineConfig, pipes: List[Pipe])
Initialisiert die Pipeline
mit Konfiguration und Pipe
-Sequenz.
Parameter:
config
() - Konfigurationseinstellungen für die Pipeline.pipes
() - Geordnete Liste vonPipe
-Instanzen, die ausgeführt werden sollen.
#### Methode def execute(self, context: PipelineContext) -> PipelineContext
Führt alle Pipes
sequenziell mit Fehlerbehandlung und Statusweitergabe aus. Verarbeitet jede Pipe
nacheinander und dabei:
- Validiert Eingabedaten mithilfe des Eingabe-
model
jederPipe
- Behandelt
STOPPED
-Statusanfragen vonPipes
- Fängt und protokolliert Ausnahmen während der
Pipe
-Ausführung - Aktualisiert den Kontextstatus entsprechend (RUNNING, SUCCESS, FAILED, STOPPED)
Parameter:
context
() - Der Pipeline-Kontext, der den Ausführungszustand und die Daten enthält.
Returns: () - Aktualisierter PipelineContext
, der den endgültigen Ausführungszustand nach der Verarbeitung widerspiegelt.
#### Methode def process(self, context: PipelineContext) -> PipelineContext
Verarbeitet den Kontext durch die gesamte Pipeline-Sequenz. Implementiert die abstrakte Methode aus der Pipe
-Basisklasse. Delegiert die eigentliche Pipeline-Verarbeitung an die execute()
-Methode.
Parameter:
context
() - Der Pipeline-Kontext, der den Ausführungszustand und die Daten enthält.
Returns: () - Aktualisierter PipelineContext
nach der Verarbeitung durch alle Pipes
.
Modul: open_ticket_ai\src\ce\run\pipeline\status.py
class PipelineStatus
Repräsentiert die möglichen Zustände einer Pipeline-Ausführung. Dieses Enum definiert die verschiedenen Status, die eine Pipeline während ihres Lebenszyklus haben kann.
Parameter:
RUNNING
() - Zeigt an, dass die Pipeline gerade ausgeführt wird.SUCCESS
() - Zeigt an, dass die Pipeline erfolgreich und ohne Fehler abgeschlossen wurde.STOPPED
() - Zeigt an, dass die Pipeline absichtlich angehalten wurde (kontrollierter Stopp).FAILED
() - Zeigt an, dass die Pipeline aufgrund eines unerwarteten Fehlers beendet wurde.