Parallelisation using Pyramid and Celery

Today, let’s see how you can use parallelisation to reduce your processing time.

But first, a word on languages and versions: for this article, I’m using Python 2.7, the Pyramid web framework 1.5 and Celery 3.

Parallelisation can greatly reduce your loading and processing times. Without it, a big job runs as a single task on one server. With parallelisation, you split that task into many smaller ones, which several workers, possibly spread over many virtual servers, process at the same time. There are many benefits, but the main one is avoiding long-held database locks and overloaded CPUs. This works particularly well on Amazon Web Services (AWS): with auto scaling set up, AWS can start a new node when the existing ones are close to overloading, and the tasks are then spread across all the nodes. I won’t go into too much detail as it’s not my area of expertise, but for those interested, you can find more information here.

## Example case

So let’s dive into the code. Let’s say you have a warehouse with many products. You want to process the products in chunks instead of processing the whole warehouse at once and crashing your server.

First of all, let’s import the Celery functions you will use:

```python
from celery import chain, group
```

You will split the big task, let’s say processWholeWarehouse(), into smaller processSomeProducts(start, end) tasks, each processing only 500 products, and append all these small tasks to a list:

```python
task_list = []
# warehouse_products is your list of products; len() gives its size
for x in range(0, len(warehouse_products), 500):
    start = x
    end = x + 500

    # Note: .s() is a Celery shortcut for .subtask() (a task signature)
    task_list.append(processSomeProducts.s(start, end))
```
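To check which (start, end) pairs the loop produces, here is a small stdlib-only sketch. `chunk_ranges` is a hypothetical helper, not part of Celery; it assumes `end` is exclusive, as in Python slicing, and clamps the last chunk to the number of products (slicing past the end of a list is harmless anyway).

```python
def chunk_ranges(total, chunk_size=500):
    """Return (start, end) pairs covering range(total) in steps of chunk_size.

    end is exclusive and clamped to total, so the last chunk may be smaller.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [(start, min(start + chunk_size, total))
            for start in range(0, total, chunk_size)]


# 1,234 products split into chunks of 500
print(chunk_ranges(1234))  # [(0, 500), (500, 1000), (1000, 1234)]
```

An empty warehouse gives an empty list, which is why the group below is only built when task_list is not empty.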

Once you have done that, create a group of tasks from this task_list:

```python
if task_list:
    product_group = group(task_list)
```

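Then, if you want to execute a callback function (processFinished) once all these tasks have finished, chain the group with it using Celery’s chain. Celery upgrades a group followed by another task in a chain to a chord, so processFinished receives the list of the group’s results as its first argument.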

```python
# Run this only when product_group exists, i.e. inside the `if task_list:` block
chain(product_group, processFinished.s()).apply_async()
```

The callback function is really useful if you need to update your database or trigger an event to tell your app that the tasks have been processed.
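To make the group-then-callback behaviour concrete, here is a local analogue built on Python 3’s `concurrent.futures` instead of Celery (on Python 2.7 it would need the `futures` backport). It is a swapped-in technique: threads on one machine rather than workers on many servers. `process_some_products`, `process_finished` and `run_group_then_callback` are hypothetical stand-ins, not Celery APIs. What it mirrors is the chord contract: every chunk can run concurrently, and the callback runs once, after all of them, with their results in order.

```python
from concurrent.futures import ThreadPoolExecutor


def process_some_products(start, end):
    """Hypothetical stand-in for processSomeProducts: returns how many items it handled."""
    return end - start


def process_finished(results):
    """Hypothetical stand-in for processFinished: receives every chunk's result, in order."""
    return {"chunks": len(results), "products": sum(results)}


def run_group_then_callback(ranges, max_workers=4):
    # Like a group: all chunks are submitted at once and may run concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_some_products, s, e) for s, e in ranges]
        # Like a chord: wait for every chunk, keeping results in submission order.
        results = [f.result() for f in futures]
    # The callback runs exactly once, after the whole group has finished.
    return process_finished(results)


print(run_group_then_callback([(0, 500), (500, 1000), (1000, 1234)]))
# {'chunks': 3, 'products': 1234}
```

In Celery the callback is also only called if every task in the group succeeds, which is another reason to handle errors inside each chunk task.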

### Bonus point (the famous one ;))

Don’t forget to add the @celery.task decorator to your function. You can pass it options such as acks_late=True, which acknowledges the task’s message only after the task has run, so the message is redelivered if your worker crashes mid-task, and max_retries=5, which caps how many times the task can be retried. You can also call .retry() when you catch an exception to requeue your task:

```python
# `celery` is your Celery application instance
@celery.task(acks_late=True, max_retries=5)
def processSomeProducts(start, end):
    try:
        # Your actions on the products from start to end
        return True
    except Exception as exc:
        # Error management/logging
        # Will retry in 60 seconds, at most max_retries times
        raise processSomeProducts.retry(exc=exc, countdown=60)
```
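Here is a rough, stdlib-only Python 3 simulation of what the two decorator options buy you; it is not Celery code. `consume` mimics a queue where each message is acknowledged either before the handler runs (Celery’s default) or only after it returns (`acks_late=True`), and `run_with_retries` mimics `.retry(countdown=60)` with `max_retries=5`. `WorkerCrash`, `consume`, `crash_once_on`, `run_with_retries` and `flaky` are hypothetical names made up for this sketch; in real life, redelivery is done by the broker when the crashed worker’s connection drops, not by a loop like this.

```python
import time
from collections import deque


class WorkerCrash(Exception):
    """Hypothetical: simulates a worker process dying in the middle of a task."""


def consume(queue, handler, acks_late):
    """Drain `queue` and return the messages that were handled successfully.

    acks_late=False: the message is removed (acknowledged) before handler runs,
    so a crash loses it. acks_late=True: it is removed only after handler
    returns, so a crash leaves it queued and it is delivered again.
    """
    done = []
    while queue:
        message = queue[0]
        if not acks_late:
            queue.popleft()          # early ack: the message is gone for good
        try:
            handler(message)
        except WorkerCrash:
            continue                 # another worker picks up whatever is still queued
        if acks_late:
            queue.popleft()          # late ack: only now is the message removed
        done.append(message)
    return done


def run_with_retries(func, max_retries=5, countdown=60, sleep=time.sleep):
    """Call func(); on an exception wait `countdown` seconds and call it again.

    Allows at most max_retries retries (max_retries + 1 attempts in total),
    then re-raises the last exception.
    """
    retries = 0
    while True:
        try:
            return func()
        except Exception:
            if retries >= max_retries:
                raise
            retries += 1
            sleep(countdown)


def crash_once_on(bad):
    """Hypothetical handler factory: crashes the first time it sees `bad`."""
    seen = set()

    def handler(message):
        if message == bad and message not in seen:
            seen.add(message)
            raise WorkerCrash()
    return handler


print(consume(deque(["a", "b", "c"]), crash_once_on("b"), acks_late=False))  # ['a', 'c']
print(consume(deque(["a", "b", "c"]), crash_once_on("b"), acks_late=True))   # ['a', 'b', 'c']

attempts = []


def flaky():
    """Hypothetical task body that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError("temporary failure")
    return True


waits = []
print(run_with_retries(flaky, countdown=60, sleep=waits.append))  # True
print(len(attempts), waits)  # 3 [60, 60]
```

Two consequences follow. With late acknowledgement a task can run more than once, so processSomeProducts should be safe to repeat on the same chunk. And max_retries=5 allows up to six executions (the first attempt plus five retries) before Celery stops retrying and the task fails; it counts .retry() calls, not redeliveries caused by a crashed worker.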

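It’s a good idea to do the same for processFinished(). Since it runs as the callback of the group, it receives the list of values returned by the processSomeProducts() tasks as its first argument, so its signature needs a parameter for them (for example processFinished(results)).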

You can find more documentation on these Celery functions here, and some documentation about acks_late and retry.