My contribution to Luigi

Data pipeline and workflow framework made in Python by Spotify

Posted by Yoone 1 year, 8 months ago in Open source

During my last internship at Aquto, I spent a few months rebuilding their data pipeline. I had the privilege of being given a lot of flexibility on this project, especially in the choice of technologies to be used, so I decided to make the most out of it.
I chose Luigi as the framework I would end up working with to build that pipeline. I found myself stuck during the development of that project, but I worked on a solution and decided to contribute it back to the framework.

Before getting into the nuts and bolts, let’s make sure we are on the same page: what is a data pipeline? A data pipeline is a workflow made of multiple tasks, some of which depend on others. Some tasks have a higher priority than others, and they can come with requirements. For instance, if task A depends on task B, then task B needs to be executed before task A. But if task A also depends on task C, then B and C can be executed simultaneously, since neither depends on the other, which saves time. Processing data in such an organized way is what makes a data pipeline.
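The A, B, C example can be sketched with Python’s standard graphlib module (Python 3.9 or later). This is only an illustration of dependency ordering, not how Luigi resolves its graph; the task names and the batches list are made up for the example.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on:
# A depends on both B and C, which have no dependencies themselves.
dependencies = {"A": {"B", "C"}}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

batches = []
while sorter.is_active():
    # Tasks whose dependencies are all done; they could run in parallel.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

print(batches)  # [['B', 'C'], ['A']]
```

Each inner list is a group of tasks that can be executed at the same time: B and C first, then A.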

The first step was to choose from roughly three tools to build the data pipeline:

  • AWS Data Pipeline was ruled out pretty quickly because of all the negative comments on AWS’ forums. I still checked it out, but it was mostly configuration through a web app, so not really what I was looking for.
  • A quick look at LinkedIn’s Azkaban documentation discouraged me from using it. First of all, it was outdated at the time and contained a few dead links. Then the way it worked was not intuitive at all, and I had to fill in a lot of settings files before even thinking about testing anything.
  • Spotify’s Luigi seemed really straightforward when I opened their documentation. I just installed it and started playing around.

I later discovered other workflow managers such as Airflow by Airbnb, but their complexity or their lack of features left me confident in my decision.

How does Luigi work?


Credits: github.com/spotify/luigi

There are two fundamental entities in Luigi: tasks and targets. A task contains the code to be executed and an “output”, which in turn contains one or more targets. Targets are meant to verify a state, for example whether a local file exists; they basically have a standard method, exists(), that returns a boolean. A task is only executed if its output is not already fulfilled, which is the case when at least one target returns False. The output is checked by Luigi’s scheduler before the execution of a task and again at the end of its execution to verify that everything ran without errors (throwing an exception is considered an immediate failure of the currently running task).

Tasks also have dependencies, which are references to other tasks. If a task needs to be executed, its dependencies will be executed first (if their output is not in a “fulfilled” state), and that can be done in parallel if multiple workers are enabled.
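As a rough mental model of these two rules (skip what is already fulfilled, run dependencies first), here is a toy sketch in plain Python. It does not use Luigi at all: ToyTarget, ToyTask and build are hypothetical names invented for illustration, and the real scheduler and workers are considerably more involved.

```python
class ToyTarget:
    """Stands in for a target: a single state check returning a boolean."""

    def __init__(self):
        self._exists = False

    def exists(self):
        return self._exists

    def create(self):
        self._exists = True


class ToyTask:
    """Stands in for a task: some work, one target, optional dependencies."""

    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)
        self.target = ToyTarget()

    def complete(self):
        return self.target.exists()

    def run(self, log):
        log.append(self.name)  # record the execution order
        self.target.create()   # fulfill the output


def build(task, log):
    """Run a task after its dependencies, skipping anything already fulfilled."""
    if task.complete():
        return
    for dependency in task.requires:
        build(dependency, log)
    task.run(log)
    if not task.complete():
        raise RuntimeError(f"{task.name} ran but its output is not fulfilled")


b, c = ToyTask("B"), ToyTask("C")
a = ToyTask("A", requires=[b, c])

log = []
build(a, log)
print(log)  # ['B', 'C', 'A']

build(a, log)  # everything is fulfilled now, so nothing runs again
print(log)  # ['B', 'C', 'A']
```

The sketch runs everything sequentially; parallel execution is left out to keep the ordering rule visible.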

Here is some self-explanatory code showcasing a simple workflow:

import luigi
from luigi.contrib.simulate import RunAnywayTarget


class TaskA(luigi.Task):
    num = luigi.IntParameter(default=1)  # Some param

    def output(self):
        """
        Forcing that task to run every time
        """
        return RunAnywayTarget(self)

    def run(self):
        # Doing stuff
        print('Num is', self.num)
        # ...

        self.output().done()  # Marking target as "done"


class TaskB(luigi.Task):
    def output(self):
        """
        This task's output only checks if the file
        /tmp/taskB.txt exists. It won't be executed if the
        file is already there.
        """
        return luigi.LocalTarget('/tmp/taskB.txt')

    def run(self):
        """
        This will create the file. This method is where the
        logic happens.
        """
        self.output().open('w').close()


class RootTask(luigi.WrapperTask):
    def requires(self):
        """
        This task is inheriting from luigi.WrapperTask. It's
        a standard task made only to be used as a root task
        since the dependency graph needs an entry point.
        """
        return [
            TaskA(),  # Executed with num=1 (default value)
            TaskA(num=2),
            TaskB(),
        ]


if __name__ == '__main__':
    """
    The following means that when that script is executed
    luigi is supposed to run RootTask.

    Alternative: luigi.run()
    Execution: python script.py TaskA  # to run TaskA
    """
    luigi.run(main_task_cls=RootTask)

As I mentioned in the introduction, I used Luigi during a project for the last company I did an internship with. I had to revamp their data warehouse structure and the data pipeline that was processing the data and running ETLs (Extract, Transform, Load). I’m mentioning this because, given the nature of the project, I needed some tasks to run whenever they were called. The most common reason for this is that some values were calculated based on the last hour, day, week or month. The whole pipeline was running every hour using cron jobs, and those values were updated with the latest data in “almost real time”.

The problem I faced

As most of this project’s tasks needed to be executed no matter what whenever the whole pipeline was executed, I immediately created a simple target storing a state boolean to know whether the task had been executed or not. The idea is very simple: we initialize the target to False, and at the end of the execution (the run() method) the boolean is simply set to True. This behavior is really basic, but it has a major design flaw that I didn’t foresee before I started parallelizing task execution.

When using multiple workers to run a pipeline, targets can (and most of the time will) be instantiated every time they are checked. This means that even if I set my state target to True at the end of my task’s execution, there is no guarantee that this very same instance will be the one checked the next time the output() method is called. On that account, I started thinking of a way to fix this issue. I should also mention that at this point Luigi did not provide any built-in solution to my problem, as the philosophy is that a target needs to verify an actual change somewhere to confirm that a task executed successfully.
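The flaw can be reproduced without Luigi or multiple processes. In the minimal sketch below, StateTarget and NaiveTask are hypothetical names made up for the illustration; the only assumption is that output() builds a new target on each call, as in the example workflow earlier.

```python
class StateTarget:
    """Naive target: remembers completion in an instance attribute."""

    def __init__(self):
        self.state = False  # initialized to False, as described above

    def exists(self):
        return self.state


class NaiveTask:
    def output(self):
        # A brand new target is built on every call.
        return StateTarget()

    def run(self):
        target = self.output()
        target.state = True  # only this particular instance is updated
        return target


task = NaiveTask()
marked = task.run()

print(marked.exists())         # True: the instance that was marked
print(task.output().exists())  # False: a fresh instance knows nothing about it
```

With several worker processes the situation is the same, except that the instances also live in separate memory spaces.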

The solution I found

I started working on a “RunAnywayTarget” located in a new module: “luigi.contrib.simulate”.

In an attempt to solve this problem, I first tried to keep storing all the information in memory, but this time using Python’s multiprocessing library for shared variables. I could successfully share a single variable acting as state across all RunAnywayTarget instances, but it was not practical since multiple tasks were going to use that same target. I hence started exploring the possibilities offered by shared lists and dictionaries.
Python’s multiprocessing module offers ready-to-use shared lists and dictionaries. I was pretty hyped when I discovered that, but I quickly realized that it was a dead end in my case. Shared lists and dictionaries need to be passed to every subprocess upon creation. That was a really big issue here because I didn’t have any control over Luigi’s workers, and I could not just modify the workers’ code to make my target work as I wanted. I had to find another solution.

The second idea I had was to simply use the wonderful filesystem I had at my disposal. The reason is that I just wanted something that did not depend on any particular process or subprocess. Each Luigi task has an identifier generated from the name of the task and the parameters it is executed with. The scheduler avoids duplicates by matching that very identifier. I figured that creating a file with a name based on a task identifier should then be reliable enough to determine whether that task has already run.
There was still one little problem though: how would the target know when to clean up, so that leftover files would not prevent Luigi from executing a task after its first execution? In other words, how could it avoid conflicts with previously executed pipelines? The solution was to combine my newfound idea with my previous one: a global multiprocessing value shared across all RunAnywayTarget instances. I used it to know whether the startup cleanup had already been performed. The cleanup is executed by the first encountered target only, thanks to the way multiprocessing values work.

The final implemented workflow that passed all the tests was the following:

  • When instantiated, the target checks if the shared state variable is 0 (the default value). If it is, it acquires the lock on the variable to avoid colliding with another target and sets the value to the current process’s PID.
  • The shared value (the PID recorded by the first target) is then appended to the temp directory path to ensure a truly unique path and avoid collisions with other pipelines. If that path already exists and its modification time is older than 24h (configurable parameter), it gets recursively deleted.
  • When the target is marked as “done” by calling the done() method, an empty file is created in the temporary directory, named after the MD5 hash of the task’s unique identifier. The task’s unique identifier is generated by Luigi and contains its name and arguments.
  • Whenever the exists() method is called, the file’s presence is checked.
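The four steps above can be sketched in plain Python as follows. This is a simplified illustration, not the code that was merged into Luigi: FileMarkerTarget, its attribute names and the directory name are hypothetical, and the task identifier is passed in as a plain string. It also assumes that worker processes are forked from the process that defined the class, so that they inherit the shared value.

```python
import hashlib
import multiprocessing
import os
import shutil
import tempfile
import time


class FileMarkerTarget:
    """Hypothetical file-based target following the steps listed above."""

    temp_dir = os.path.join(tempfile.gettempdir(), "pipeline-markers-sketch")
    max_age = 24 * 3600  # seconds before an old run directory is deleted
    run_id = multiprocessing.Value("i", 0)  # 0 means "not initialized yet"

    def __init__(self, task_id):
        self.task_id = task_id
        if self.run_id.value == 0:
            with self.run_id.get_lock():
                if self.run_id.value == 0:  # re-check once the lock is held
                    self.run_id.value = os.getpid()
        self._delete_stale_directory()

    def _run_dir(self):
        # One directory per pipeline run, named after the recorded PID.
        return os.path.join(self.temp_dir, str(self.run_id.value))

    def _delete_stale_directory(self):
        path = self._run_dir()
        limit = time.time() - self.max_age
        if os.path.isdir(path) and os.stat(path).st_mtime < limit:
            shutil.rmtree(path)

    def _marker_path(self):
        digest = hashlib.md5(self.task_id.encode("utf-8")).hexdigest()
        return os.path.join(self._run_dir(), digest)

    def exists(self):
        return os.path.isfile(self._marker_path())

    def done(self):
        os.makedirs(self._run_dir(), exist_ok=True)
        open(self._marker_path(), "w").close()  # create an empty marker file


target = FileMarkerTarget("TaskA(num=1)")
target.done()
# A different instance for the same task identifier sees the marker file.
print(FileMarkerTarget("TaskA(num=1)").exists())  # True
```

Because the state lives on disk rather than in an instance attribute, it no longer matters which instance, or which worker, performs the check.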

The documented code is available on GitHub.