zurück
News

Multithread-Zugriffe mit .NET auf das Dateisystem - ein Erfahrungsbericht

ZEITSPRUNG-BLOG
|
Andreas Kürti
|
17.9.2019

Im vergangenen Monat stand ich vor der Herausforderung, eine Vielzahl von Dateien in einem Verzeichnis effizient zu verarbeiten. Während der Verarbeitung kommen ununterbrochen neue Dateien hinzu, sodass durchgängig eine Wartschlange an Dateien zur Verarbeitung bereit steht.

Andreas Kürti zu seinen Erfahrungen mit dem Multithread-Zugriff in .NET

Tech
Interview
.NET
Entwicklung
Quellcode

Einleitung: Die Problemlösung im Singlethread

Wie die Verarbeitung im Einzelnen aussieht, ist für das eigentliche Problem nicht relevant – wichtig ist an dieser Stelle nur, dass die Verarbeitung je nach Datei zwischen 5 und 120 Sekunden dauern kann. Somit steht das Programm im Zweifel für bis zu 2 Minuten still, während wir auf die Verarbeitung einer einzelnen Datei warten. Soweit, so einfach. Die simple Lösung zur Verarbeitung von Dateien in einem Ordner könnte beispielhaft folgendermaßen aussehen:

class FileProcessor
{
     public void ProcessFolder(string folderPath)
   {
       var files =Directory.GetFiles(folderPath);
       foreach (string file in files)
       {
           this.ProcessSingleFile(file);
       }
   }

   private void ProcessSingleFile(string filePath)
   {
       Console.WriteLine("Verarbeite " + filePath);
       // do some operations here which can last long.
       // we simulate this with thread.sleep
       var waitTime = this.getRandomTimespan();
       Thread.Sleep(waitTime);
       Console.WriteLine("Die Verarbeitung dauerte " + waitTime.ToString() + "ms.");
       // finished, remove file
      File.Delete(filePath);
   }
   /// <summary>
   /// </summary>
   /// <returns>A random timespan in ms</returns>
   private int getRandomTimespan()
   {
       Random rnd = new Random();
       return rnd.Next(1, 120) * 1000;
   }
}

Code-Snippet 1: Einfache Verarbeitung von Dateien in einem Verzeichnis mit einer zufälligen Wartezeit pro Datei.

Für eine dauerhafte Verarbeitung des Ordners bei neu ein treffenden Dateien würde sich die Implementierung der Filesystem Watcher-Komponente empfehlen, welche ich an dieser Stelle der Einfachheit halber weglasse. Wir gehen einfach davon aus, dass die Anwendung von sich aus bei neuen Dateien in unserem Verzeichnis den Process Folder-Aufruf durchführt.

Wichtig: Hier lauert schon auch bei Singlethread-Anwendungen der erste Stolperstein: Wird Process Folder mehrmals parallel mit den selben Pfadangaben aufgerufen, werden wir an dieser Stelle auch Dateien mehrmals verarbeiten. Ursache für einen mehrfachen Aufruf kann bspw. der Event Listener von Filesystem Watcher sein, welcher auch während unserer Verarbeitung weiter bei Dateisystemänderungen den Event triggern könnte.

Dieses Problem muss bereits an dieser Stelle gelöst werden, auch wenn wir Dateien nur in einem einzelnen Hauptthread verarbeiten wollen. Zur Lösung bietet uns .NET bereits eine schlanke Anweisung namens lock. Mit lock übergeben wir ein Status-Objekt, welches von .NET zur Prüfung und Markierung eines Sperrstatus verwendet werden kann. Das Objekt muss dabei instanziert sein und darf beim Aufruf von lock nicht null sein, andernfalls wirft uns die Anwendung zur Laufzeit eine Exception.

private object lockFolder = new object();
  public voidProcessFolder(stringfolderPath)
  {
     lock (this.lockFolder)
     {
        var files = Directory.GetFiles(folderPath);
        foreach (string file in files)
        {
           this.ProcessSingleFile(file);
        }
     }
  }
Code-Snippet2: Lock-Anweisung für die Verarbeitung unseres Verzeichnisses

Der Umbau auf Multithreading

Wenn sich nun herausstellt, dass dieser eine Ordner schneller mit neuen Dateien befüllt wird, als wir die vorhandene Dateiliste verarbeiten können, ist der nächste Schritt das Einbauen einer parallelen Verarbeitung mehrerer Dateien.

Wie in .NET häufig, gibt es mehrere Lösungsansätze. Für die Entwickler, die sich mit dem Thema nicht befasst haben, wird eine Recherche wohl die Thread-Klasse zum Vorschein bringen, mit der wir einen neuen Thread starten und die durchzuführende Aufgabe in einem neuen Thread einfach durch Übergabe des Funktionsnamens (und -Parameters) festlegen können.

Zunächst initiieren wir erstmal nur einen Thread, der sequentiell eine Datei abarbeitet. Damit legen wir das Grundgerüst für den weiteren Umbau. Unsere ProcessingFile-Methode, welche vom Thread ausgeführt wird, muss hierbei nur minimal umgebaut werden, der Übergabeparameter der Funktion ProcessFile muss nun jedoch vom Typ object sein.

public void ProcessFolder(string folderPath)
   {
       lock (this.lockFolder)
       {
           var files =Directory.GetFiles(folderPath);
           foreach (string file in files)
           {
               // start 1 thread
               ThreadmyThread = new Thread(
                   new ParameterizedThreadStart(this.ProcessSingleFile)
               );
              myThread.Start(file);
               while (myThread.IsAlive)
               {
                   // wait for finish before repeating the loop
               }
           }
       }
   }

   private void ProcessSingleFile(object filePathObject)
   {
       var filePath =filePathObject.ToString();
       // ...  
   }
Code-Snippet 3: Wir lagern ProcessSingleFile in eineseparaten Thread aus.

Die Erweiterung auf mehrere Threads ist nunmehr ein Leichtes. Was für einen Thread funktioniert, kann für viele Threads genauso funktionieren. Hierfür nehmen wir eine Liste von Threads. Wir suchen in der Schleife immer nach dem nächsten freien „Slot“, erstellen dann den Thread mit der nächsten Datei und starten diesen. Anhand der Größe der Liste können wir die Anzahl der parallelen Threads steuern. Wir warten pro Datei auf den nächsten verfügbaren Thread und arbeiten uns so durch die Liste der Dateien durch.

public void ProcessFolder(string folderPath)
{
   lock (this.lockFolder)
   {
       var files =Directory.GetFiles(folderPath);
       // thread-limit is list-capacity, can be changed
       var myThreads = new List<Thread>(new Thread[3]);
       foreach (string file in files)
       {
           var fileAssignedToThread = false;
           do
           {
               // check all threads in our list
               for (int i = 0; i < myThreads.Count; i++)
               {
                   // check if current thread-element is idle
                   if (myThreads[i] == null || myThreads[i].IsAlive == false)
                   {
                       // assign task to the next free thread
                      myThreads[i] = new Thread(
                          new ParameterizedThreadStart(this.ProcessSingleFile)
                       );
                       // start thread
                      myThreads[i].Start(file);
                       // mark file as assigned
                      fileAssignedToThread = true;
                       // leavefor-loop
                       break;
                   }
               }
               // only continue with next file in for each, after we
               // have assigned the current file to a free thread!
           } while (fileAssignedToThread == false);
       }
   }
}
Code-Snippet4: Implementierung eines Multithread-Aufrufs mit bis zu 3 laufenden Threads.

Hiermit haben wir eine schlanke beliebig erweiterbare Multithread-Lösung geschaffen. Wir können nur mittels Änderung eines Wertes die Anzahl der parallelen Threads erhöhen.

CPU-Last: zu hoch!

Das Resultat unserer Beispiellösung bis dato kann sich sehen lassen. Es funktioniert zuverlässig und ist leicht wartbar. Erst einige Zeit später stellt sich heraus, dass die Lösung zu viel CPU-Ressourcen verbraucht. In unserem Beispiel mit 3 Threads lag meine CPU-Last auf meiner potenten Entwickler-Maschine bei immerhin 13% Dauerlast. Mithilfe der CPU-Profilerstellung in .NET lässt sich relativ schnell feststellen, wo die meiste CPU-Zeit bei der Multithread-Verarbeitung benötigt wird, nämlich bei der Abfrage des nächsten freien Threads: Knapp 80% der CPU-Zeit wird nur für die Prüfung auf einen freien Thread verwendet.

Das ist bei näherer Betrachtung unseres Codes einleuchtend: Nachdem wir zu Beginn alle Threads zugewiesen haben, was wohl innerhalb weniger Millisekunden der Fall ist, verbringen wir die restliche Zeit bis zum Freiwerden eines Threads ununterbrochen damit, dass wir unsere Liste auf das Freiwerden des nächstens Threads abprüfen. Wenn wir davon ausgehen, dass im Produktionsumfeld mit bis zu 24-48 Threads über mehrere Verzeichnisse gearbeitet wird, ist die Tragweite dieses Performance-Problems schnell klar und bedarf einer Lösung vor dem produktiven Einsatz.

Diverse Lösungsansätze und einhergehenden Probleme

Es gibt mehrere Lösungsansätze, welche ich zum Teil auch testweise implementiert hatte:

Thread-Sleep in der Schleife ausführen, wenn keinfreier Thread gefunden wurde

Funktioniert zwar bedingt, löst aber unser eigentliches Problem nicht. Außerdem wird je nach Zeitspanne des Thread-Sleeps nicht die maximale Performance aus der Verarbeitung der einzelnen Dateien herausgeholt. Die Option fällt somit raus.

Nutzung von Events / Callbacks, die vom Thread bei Erledigung ausgeführt werden

Klingt in der Theorie nicht schlecht. Allerdings kennen der EventListener bzw. die Callback-Funktion erstmal nur den Stand des eigenen erledigten Threads. Diese Funktionen wissen nicht, welche anderen Threads aktuell aktiv sind und – viel wichtiger – welche Dateien bereits in der Verarbeitung sind oder just einem Thread zugeordnet werden. Die Implementierung mittels Callback oder EventListener bei Multithreading gestaltet sich somit als aufwendig. Eine zusätzliche Implementierung eines File-Locks sowie die Aufhebung des Ordner-Locks zum erneuten Einlesen der verfügbaren Dateien ist erforderlich. Diese Implementierungen müssen wiederum thread-safe sein. Das kostet vor allen Dingen Zeit und macht den Code schwierig zu warten.

Die Lösung ist die Nutzung der ThreadPool-Klasse

Wieso überhaupt das Rad neu erfinden, wenn wir mit .NET bereits mächtige wie auch erprobte Werkzeuge und Komponenten zur Verfügung haben?

Mit der Klasse ThreadPooler halten wir im .NET Framework bereits ein auf unser Bedürfnis abgestimmtes Werkzeug inkl. den erforderlichen Methoden. Auch hier können wir die Anzahl der parallel laufenden Threads selbst bestimmen (der Threadpool ermittelt auch selbst die theoretisch maximal mögliche Anzahl an Threads) sowie die Erledigung eines oder aller Threads zu kontrollieren.

Für die Implementierung müssen wir jedoch ein wenig umbauen. Um die Beendigung der Threads überwachen zu können, erwartet der Threadpool, dass wir jedem Thread ein Status-Objekt vom Typ ManualResetEvent als Parameter mitgeben und das Ende der Verarbeitung signalisieren, wenn unsere Aufgabe erledigt ist.

Dazu müssen wir unser ManuelResetEvent-Objekt zusätzlich zum Dateinamen an den Thread übergeben. Hierzu definieren wir uns eine kleine Struktur mit den notwendigen Eigenschaften:

private struct ThreadStartParameter
{
 public string filePath;
 public ManualResetEvent threadFinished;
}
private List<ThreadStartParameter>myThreadStates = new List<ThreadStartParameter>(new ThreadStartParameter[3]);
Code-Snippet5: In der Struktur haben wir alle erforderlichen Eigenschaften je Thread, diewir in einer Liste für alle Threads speichern.

Wir setzen diese Werte direkt vor dem Start des neuen Threads und hinterlegen diese dann in unserer Liste der ThreadStartParameter myThreadStates. Den einzelnen Thread rufen wir nur noch mit dem Thread-Index auf. Anhand des Indexes können wir auf den einzelnen Eintrag in unserer Liste myThreadStates zugreifen. Schauen wir uns zunächst die Funktion ProcessFile an:

private void ProcessSingleFile(object threadStartParameterIndex)
{
   var filePath = this.myThreadStates[(int)threadStartParameterIndex].filePath;
   Console.WriteLine("Verarbeite [Thread " +threadStartParameterIndex.ToString() +  "]: " + filePath);
   // do some operations here which can last long.
   // we simulate this with thread.sleep
   var waitTime = this.getRandomTimespan();
   Thread.Sleep(waitTime);
   Console.WriteLine("Die Verarbeitung [Thread " + threadStartParameterIndex.ToString() + "] dauerte " +waitTime.ToString() + "ms.");
   // finished, remove file
   File.Delete(filePath);
   // mark thread as finished
   this.myThreadStates[(int)threadStartParameterIndex].threadFinished.Set();
}
Code-Snippet 6: Umbau der ProcessFile-Methode. Anhand desübergeben Thread-Indexes können wir auf die Eigenschaften in unserer Listezugreifen und die Werte lesen/setzen.

Zunächst holen wir uns die zu verarbeitende Datei aus myThreadStates. Am Ende der Verarbeitung senden wir das Signal an den WaitHandle, dass dieser Thread fertig ist. Der ProcessFolder wurde folgendermaßen umgebaut:

public void ProcessFolder(string folderPath)
{
   lock (this.lockFolder)
   {
       var files = Directory.GetFiles(folderPath);
       // thread-limit is list-capacity, can be changed        
       foreach (string file in files)
       {
           var fileAssignedToThread = false;
           do
           {
               // check all threads in our list
               for (int i = 0; i < this.myThreadStates.Count; i++)
               {
                   // check if current thread-element is idle
                   if (this.myThreadStates[i].threadFinished == null || this.myThreadStates[i].threadFinished.WaitOne(0) == true)
                   {
                       // assign task to the next free thread,
                       // store thread information in thread-states
                       this.myThreadStates[i] = new ThreadStartParameter() { filePath =file, threadFinished = newManualResetEvent(false) };
                       // start thread, give thread index to
                      ThreadPool.QueueUserWorkItem(new WaitCallback(this.ProcessSingleFile), i);
                       // mark file as assigned
                      fileAssignedToThread = true;
                       // leave for-loop
                       break;
                   }
               }
               if (fileAssignedToThread == false)
               {
                   // we could not assign a free thread, let's wait for the next                    //free.
                   // get all AutoResetEvents from List/Structure
                   var allAutoResetEvents = new List<ManualResetEvent>();
                   foreach (ThreadStartParameter tsp in this.myThreadStates)
                   {
                      allAutoResetEvents.Add(tsp.threadFinished);
                   }
                   WaitHandle.WaitAny(allAutoResetEvents.ToArray());
               }
               // only continue with next file in for each, after we
               // have assigned the current file to a free thread!
           } while (fileAssignedToThread == false);
       }
   }
}
Code-Snippet7: Nutzung von ThreadPool und WaitHandle in ProcessFolder

Mittels der Übergabe unserer Liste der ManualResetEvents an die WaitHandle.WaitAny-Methode können wir im Anschlussan die Verteilung der Aufgaben auf die Erledigung des ersten Threads warten.

Ein erneuter Testlauf mit dem Umbau auf den ThreadPooler gibt eine deutliche Reduktion der CPU-Last auf 0-1%. Eine bemerkenswerte Reduktion bei gleichbleibender Performance und Leistung.

Fazit

Das .NET-Framework bietet mit dem ThreadPool nicht nur ein leicht zu implementierendes Werkzeug für Aufgaben im Bereich Multithreading, sondern kann auch ressourcenarm eingesetzt werden. Mithilfe der WaitHandler haben wir volle Kontrolle über die laufenden Threads und können die Zuordnung neuer Threads zurückstellen, bis ein freier Thread zur Verfügung steht.

Insbesondere für Dateioperationen, in denen ein exklusiver Dateizugriff gewährleistet sein muss, bietet die oben gezeigte Vorgehensweise eine threadsichere Verarbeitung an, in der man als Entwickler auch bei vielen parallelen Zugriffen den Überblick behält.

Lesen Sie auch