Data Pipelines with Luigi, Part 2

In our previous post, we showed how Luigi helps you build a simple task dependency graph by providing two simple abstractions: a Task class and an output class. Our example kept a regularly updated count of all the companies in the UK. That count is pretty cool, but what if we wanted to get way more awesome and calculate how the overall number of companies in the UK has changed over the past year? We would need a CSV for each month of the year.

Calculating how the overall number of companies in the UK has changed over the past year

We can do that pretty easily with our current code; we just need to add a third task.

import datetime as dt

import luigi

class AnnualCompanyCountDelta(luigi.Task):
    year = luigi.Parameter()  # e.g. "2014", passed on the command line

    def requires(self):
        # One CompanyCount task per month, dated the first day of that month.
        tasks = []
        for month in range(1, 13):
            tasks.append(CompanyCount(dt.date(int(self.year), month, 1)))
        return tasks

    # not shown: output(), run()

A couple of interesting things are happening here. First, we are passing in a year as a parameter. Luigi intelligently accepts defined parameters as command line arguments, so no boilerplate code is needed. Second, the task uses the year to dynamically generate its requirements: for each month in this year, run CompanyCount for that month. This triggers a download for that date’s data if we don’t have it already. (Note: in order for this to work, we’ll have to add a date parameter to our previous tasks.)
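To make the date logic concrete, here is a minimal sketch in plain Python, independent of Luigi. The helper names month_starts and annual_delta are hypothetical, and the counts are made up for illustration. It assumes the delta is simply the last month's count minus the first month's, which is only one possible reading of the run() method not shown above.

```python
import datetime as dt


def month_starts(year):
    """Return the first day of each month in `year` as date objects."""
    return [dt.date(int(year), month, 1) for month in range(1, 13)]


def annual_delta(monthly_counts):
    """Change in company count between the first and last month.

    `monthly_counts` maps each month-start date to that month's count.
    """
    ordered = sorted(monthly_counts)  # dates sort chronologically
    return monthly_counts[ordered[-1]] - monthly_counts[ordered[0]]


if __name__ == "__main__":
    dates = month_starts("2014")  # a plain Parameter value is a string
    # Made-up counts, purely for illustration.
    counts = {d: 1000 + 10 * i for i, d in enumerate(dates)}
    print(len(dates), annual_delta(counts))
```

Converting the year with int() keeps the helper working whether the value is a string or an integer.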

Let’s try running this using the central scheduler. To start a scheduler, we just run “luigid” from the command line in the background. Next we run our task, leaving off the --local-scheduler option this time, which tells Luigi to use the central scheduler. The central scheduler is useful in production because it ensures that two instances of the same task never run simultaneously, but it’s also great in development because it comes with a dynamic visualizer that lets you easily see dependencies between tasks. To see the visualizer, we can visit localhost:8082.

This graph is dynamically created by the Luigi central scheduler during task execution.

Our simple command spawned 24 subtasks, a download and company count task for each month of 2014. The colours represent task status, so you can see all of our previous tasks have run and the delta task is still in progress. If a task fails, it’s marked as red.

Now that you’ve seen the central scheduler, let’s talk a bit about how Luigi integrates with other tools like MySQL. In a real-life scenario, we’re probably going to want to store the company data in a database. To do this, we define a new task, CompaniesToMySQL, which is responsible for copying our data into a table. In this way, we can reuse the CompanyDownload task that we created previously and run this task completely separately from our analytics tasks.

import luigi
from luigi.contrib import sqla
from sqlalchemy import String

class CompaniesToMySQL(sqla.CopyToTable):
    date = luigi.DateParameter()
    columns = [(["name", String(100)], {}), ...]
    connection_string = "mysql://localhost/test"  # or something
    table = "companies"  # name of the table to store data

    def requires(self):
        return CompanyDownload(self.date)

    def rows(self):
        # get_unique_rows() is our own helper (not shown); it uses self.input()
        for row in self.get_unique_rows():
            yield row

You’ll notice that this looks very different from the tasks you’ve seen before. This is because our task doesn’t inherit directly from the vanilla Luigi task; instead we’re using the luigi.contrib.sqla module. Its CopyToTable task provides powerful abstractions on top of a base Luigi task when working with SQLAlchemy. It assumes the output of the task will be an SQLAlchemy table, which you specify using the connection string (a URL to the database), table, and columns.

Also, instead of a run method, we override the rows method, which returns a generator of row tuples. This simplifies things, because you can do all the processing you want in the rows method and hand the rows you want inserted to the task, which writes them to the table in chunks.
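As a rough illustration of that generator pattern, here is a small standard-library sketch. It is not Luigi's code: unique_rows is a hypothetical stand-in for the get_unique_rows helper we didn't show, and chunked only imitates the idea of consuming a row generator in batches. The company names are made up.

```python
import csv
import io


def unique_rows(lines):
    """Yield each distinct CSV row once, as a tuple, in first-seen order."""
    seen = set()
    for fields in csv.reader(lines):
        row = tuple(fields)
        if row and row not in seen:  # skip blank lines and repeats
            seen.add(row)
            yield row


def chunked(rows, size):
    """Group an iterable of rows into lists of at most `size` rows."""
    chunk = []
    for row in rows:
        chunk.append(row)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:  # emit the final, possibly short, chunk
        yield chunk


if __name__ == "__main__":
    data = io.StringIO("Acme Ltd\nBolt plc\nAcme Ltd\nCargo LLP\n")
    for batch in chunked(unique_rows(data), size=2):
        print(batch)
```

Because both functions are generators, rows are read and batched lazily, so the whole file never has to sit in memory at once.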

Now you’re probably feeling ready to try out Luigi on some of your own data, but before you go, one last thing to think about: what happens when something falls over? As you might have guessed, Luigi handles errors intelligently: because tasks are individual units of work, if something breaks, you don’t have to re-run everything. Instead, you can simply re-run the task, and any dependencies that already finished will be skipped. Luigi can also be configured to email you the stack trace when a task fails.
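The skipping behaviour comes from a simple idea: a task counts as complete once its output exists. The toy model below sketches that idea with the standard library only. ToyTask and build are hypothetical names, and this is a deliberately simplified stand-in, not how Luigi's scheduler is actually implemented.

```python
import os
import tempfile


class ToyTask:
    """A unit of work that is complete once its output file exists."""

    def __init__(self, name, out_dir, requires=(), fail=False):
        self.name = name
        self.path = os.path.join(out_dir, name + ".done")
        self.requires = list(requires)
        self.fail = fail  # set to True to simulate a broken task

    def complete(self):
        return os.path.exists(self.path)

    def run(self):
        if self.fail:
            raise RuntimeError(self.name + " failed")
        with open(self.path, "w") as handle:
            handle.write("ok\n")


def build(task, ran):
    """Run `task` after its dependencies, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires:
        build(dep, ran)
    task.run()
    ran.append(task.name)  # record only tasks that actually ran


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        download = ToyTask("download", out_dir)
        count = ToyTask("count", out_dir, requires=[download], fail=True)

        first = []
        try:
            build(count, first)
        except RuntimeError:
            pass  # "count" broke, but "download" already wrote its output

        count.fail = False  # pretend the bug is fixed
        second = []
        build(count, second)
        print(first, second)
```

On the second attempt only the task that failed runs again, because the download's output file is already on disk.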

We hope this post gave you a sense of what Luigi can do! Here are some links for you to peruse as you start using the library.

– The Luigi Docs

– The Luigi Mailing List

– The Source Code