On today’s Internet 2.0 there are all sorts of data feeds available for consumption. From APIs to RSS feeds, it seems like nearly every site has a machine-readable output. There are many reasons why you’d want to collect this information, which I won’t go into, so in this post I’m going to walk you through an application which consumes RSS feeds. I’ll be using the Python scripting language, and I’ll show you an evolution of ways to go about the task.
Our application is going to work like this: grab the list of feeds from our database, fetch each feed over HTTP, parse the RSS into items, and store those items back in the database.
Database manipulation and RSS feed parsing are outside the scope of this tutorial, so we’ll start off by defining some empty functions that handle all this:
def get_feed_list():
    """ Returns a list of tuples: (id, feed_url) """
    pass

def get_feed_contents(feed_url):
    """ Gets feed over HTTP, returns RSS XML """
    pass

def parse_feed(feed_rss):
    """ Parses the feed and returns a list of items """
    pass

def store_feed_items(id, items):
    """ Takes a feed_id and a list of items and stores them in the DB """
    pass
We’re going to have all these in a module called “functions”, which can just be a file called functions.py in the same directory (this works as-is in Python versions before 3.0).
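As an illustration, here’s one way two of those stubs might be fleshed out using only the standard library. This is my own sketch, not part of the original post: it uses Python 3 syntax and assumes a plain RSS 2.0 structure; a real parser would also handle dates, encodings, GUIDs and so on.

```python
# A sketch of fleshing out two of the stubs (Python 3, stdlib only).
import urllib.request
import xml.etree.ElementTree as ET

def get_feed_contents(feed_url):
    """ Gets feed over HTTP, returns RSS XML as bytes """
    with urllib.request.urlopen(feed_url, timeout=30) as resp:
        return resp.read()

def parse_feed(feed_rss):
    """ Parses RSS 2.0 and returns a list of (title, link) items """
    root = ET.fromstring(feed_rss)
    items = []
    for item in root.iter("item"):
        title = item.findtext("title", default="")
        link = item.findtext("link", default="")
        items.append((title, link))
    return items
```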
This is the way most people would do it at first. It’s so simple that I’ll just post the sample code:
import functions

for id, feed_url in functions.get_feed_list():
    rss = functions.get_feed_contents(feed_url)
    items = functions.parse_feed(rss)
    functions.store_feed_items(id, items)
Pretty simple, huh? But there are fundamental problems. Feeds are usually slow to fetch, meaning your program will spend a lot of time waiting for feeds to come in before it can parse them. Your program will also be spending time parsing feeds when it could be fetching others from the internet. Consequently this program will be as slow as molasses. It’s like eating a bowl of peas one at a time – you’d rather just shovel them in, wouldn’t you? Enter: threading.
So we reckon: “If we use threads, will this make things faster?” Answer: yes. However, there are quite a few ways of going about it. We’ll start off with this:
import threading, functions, time

def thread(id, feed_url):
    rss = functions.get_feed_contents(feed_url)
    items = functions.parse_feed(rss)
    functions.store_feed_items(id, items)

for id, feed_url in functions.get_feed_list():
    t = threading.Thread(target=thread, kwargs={"id": id, "feed_url": feed_url})
    t.start()

while threading.activeCount() > 1: time.sleep(1)
Problem: this is just going to create as many threads as there are feeds, all at once, and then wait for them to finish. If you have hundreds of feeds you’ll have hundreds of threads, which will eat memory, hammer your bandwidth and possibly run into per-process thread limits.
So what do we do? Well, let’s set a limit on the number of concurrent threads:
import threading, functions, time

THREAD_LIMIT = 20

def thread(id, feed_url):
    rss = functions.get_feed_contents(feed_url)
    items = functions.parse_feed(rss)
    functions.store_feed_items(id, items)

for id, feed_url in functions.get_feed_list():
    while threading.activeCount() > THREAD_LIMIT:
        time.sleep(1)
    t = threading.Thread(target=thread, kwargs={"id": id, "feed_url": feed_url})
    t.start()

while threading.activeCount() > 1: time.sleep(1)
Spot the difference? We have another while loop right inside the for loop. This makes our main thread hang back whenever the thread limit is reached, only spawning a new thread once another has finished.
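As an aside, the same cap on concurrency can be had without the one-second sleep-polling by using a Semaphore. This is my own sketch in Python 3 syntax (the worker body is a stand-in for the fetch/parse/store work), not code from the original post:

```python
# Capping concurrent threads with a Semaphore instead of polling activeCount().
import threading

THREAD_LIMIT = 20
slots = threading.Semaphore(THREAD_LIMIT)  # at most 20 workers at once
results = []
lock = threading.Lock()

def worker(n):
    try:
        with lock:
            results.append(n * n)  # stand-in for fetch/parse/store
    finally:
        slots.release()  # free a slot for the next thread

threads = []
for n in range(100):
    slots.acquire()  # blocks while THREAD_LIMIT workers are running
    t = threading.Thread(target=worker, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
```

The main loop blocks in acquire() rather than waking up once a second, so new work starts the moment a slot frees up.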
There’s another problem though, and that’s with the model. In this model, we’re continually creating new threads that live for a short time, then exit. This isn’t efficient. It would be much better to create a pool of threads which we can then re-use. Let’s kick this up a notch.
So in this version we’re going to do a few new things: we’ll create a Queue object, populate it with the list of URLs, and then spawn a fixed pool of threads which pull jobs from that queue until it’s empty. I’ll start off with the sample code then walk you through it:
import threading, functions, time, Queue

THREAD_LIMIT = 50
jobs = Queue.Queue(0)  # arg1 of 0 means "no item limit"

def thread():
    while True:  # forever
        try:
            id, feed_url = jobs.get(False)  # False means "don't wait for items to appear"
        except Queue.Empty:
            # Nothing left to do, time to die
            return
        rss = functions.get_feed_contents(feed_url)
        items = functions.parse_feed(rss)
        functions.store_feed_items(id, items)

for info in functions.get_feed_list():
    jobs.put(info)

for n in xrange(THREAD_LIMIT):
    t = threading.Thread(target=thread)
    t.start()

while threading.activeCount() > 1: time.sleep(1)  # Wait for threads to finish
Lines to note:

The False parameter to jobs.get() means that once the queue is empty we’re not interested in waiting around. This raises the Queue.Empty exception, and we terminate the thread. Our threads run in a loop, fetching, parsing and storing, until there is no more work, then they exit.

This model will work just fine for the majority of people. However, there are (still) problems. They are:

Problem #1: every worker thread talks to the database, so your database layer has to be thread-safe and you can end up holding a lot of connections open at once.

Problem #2: the heavy lifting (parsing and storing) is scattered across the worker threads, where we have little control over it.
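For comparison, modern Python (2.6+/3.2+) ships this exact worker-pool pattern as concurrent.futures.ThreadPoolExecutor. This is my own Python 3 sketch with stand-in feed functions, not the post’s code:

```python
# The pool-of-threads model, as provided by the standard library today.
from concurrent.futures import ThreadPoolExecutor

def fetch_parse_store(job):
    feed_id, feed_url = job
    # stand-in for get_feed_contents / parse_feed / store_feed_items
    return (feed_id, len(feed_url))

jobs = [(1, "http://example.com/a.rss"), (2, "http://example.com/bb.rss")]

# The executor creates the thread pool, distributes the jobs and tears the
# pool down when the with-block exits; map() preserves job order.
with ThreadPoolExecutor(max_workers=50) as pool:
    results = list(pool.map(fetch_parse_store, jobs))
```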
Problem #2 isn’t so serious, but it would be better to have more control over the heavy lifting. Problem #1, however, definitely needs addressing. The solution is to shift all processing inline to the master thread, so the workers only fetch.
Here’s the code:
import threading, functions, time, Queue

THREAD_LIMIT = 50
jobs = Queue.Queue(0)  # arg1 of 0 means "no item limit"
rss_to_process = Queue.Queue(THREAD_LIMIT)  # We set a limit on this, I'll
                                            # explain later

def thread():
    while True:  # forever
        try:
            id, feed_url = jobs.get(False)  # False means "don't wait for
                                            # items to appear"
        except Queue.Empty:
            # Nothing left to do, time to die
            return
        rss = functions.get_feed_contents(feed_url)
        rss_to_process.put((id, rss), True)  # This will block if our
                                             # processing queue is too large

for info in functions.get_feed_list():  # Load them up
    jobs.put(info)

for n in xrange(THREAD_LIMIT):  # Unleash the hounds
    t = threading.Thread(target=thread)
    t.start()

while threading.activeCount() > 1 or not rss_to_process.empty():
    # That condition means we want to do this loop if there are threads
    # running OR there's stuff to process
    try:
        id, rss = rss_to_process.get(True, 1)  # Wait for up to a second
                                               # for a result
    except Queue.Empty:
        continue
    items = functions.parse_feed(rss)
    functions.store_feed_items(id, items)
Notes:

The size limit on rss_to_process is the explanation I promised earlier: if the main thread falls behind on parsing, the bounded queue makes the fetcher threads block on put(), so raw RSS can’t pile up in memory faster than we can process it. Note also that rss_to_process.get(True, 1) blocks with a one-second timeout, so the master thread never spins while waiting for results, and only the master thread ever touches the database.
…and there you have it, a fully-fledged multithreaded data collector. Not bad for a few hours’ work. It’s not finished, though, as there are plenty of things you’d want to add to it. For example:
Graceful shutdown: catch SIGINT with the signal module, or wrap our final loop in a big try/except block catching KeyboardInterrupt. The handler would need to empty the job and processing queues, which will cause your threads to exit gracefully, and then your program will exit too.

Multiprocessing: throw some os.fork() calls in, forcing your program to multiprocess, which could then take full advantage of multiple cores. You can do this either by dividing up your work queue at the start, or by moving the heavy lifting out of the main thread and into separate processes. Your main thread could then communicate with these “worker” processes via shared memory or sockets (my preference) and collect the results. Make sure your functions.parse_feed can produce picklable objects.

Progress reporting: jobs.qsize() tells you how much work is left at any point.
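The graceful-shutdown idea above can be sketched like this. The drain helper is my own name, not from the post, and the raised KeyboardInterrupt is a stand-in for the user pressing Ctrl-C (Python 3 syntax):

```python
# On Ctrl-C, drain the job queue so worker threads hit Queue.Empty
# and exit on their own.
import queue

def drain(q):
    """Empty a queue without blocking; returns how many items were dropped."""
    dropped = 0
    while True:
        try:
            q.get(False)
            dropped += 1
        except queue.Empty:
            return dropped

jobs = queue.Queue(0)
for n in range(10):
    jobs.put(n)

try:
    raise KeyboardInterrupt  # stand-in for the user pressing Ctrl-C
except KeyboardInterrupt:
    dropped = drain(jobs)
```

Once the queue is empty, each worker's next jobs.get(False) raises Queue.Empty and the thread returns, so the program winds down without killing threads mid-job.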
Comments and suggestions welcome, and you can feel free to use the contact form.
The code contained in this tutorial is not guaranteed to work, or even compile. It has never been executed and is not tested for syntax errors or other bugs. However it is semantically accurate and is provided “as-is”. I am not responsible for any loss of business, crashes, errors, marital crisis or nuclear wars caused as a result of using this code. Copy and paste at your own risk.
Great post Dave, I am just wondering who wrote it ? 😀
hey Jason ,, we are all one big happy DaveN family here you know that 🙂
DaveN
” There are many reasons why you’d want to collect this information, which I won’t go in to”
Synonymise, make unique, throw them up on domains. Get indexed and use it as a 3rd party link source + add some adsense for good measure?
Makes my single-threaded PHP script look quite feeble 🙁
My app is just harvesting search/tech feeds at the moment, will need to take a few bugs (including dupe posts despite MD5 hash comparison) out of it before putting it to better uses:
http://www.evilgreenmonkey.com/egmreader
🙂
Rob
@evilgreenmonkey:
I would advise you use the similar_text() function to identify dupe posts. It’ll do you much better than your current method. You can also use the GUID value, perhaps, I can’t remember how accurate it is in practise.
As for your threading issue, obviously PHP doesn’t support threads, however I can offer some advice.
Solution 1:
If you use PHP5 you can use the curl_multi_* functions to thread data collection. It’s not truly threaded however (it actually uses a technique called asynchronous socket IO), so you’ll either need to collect all your data first or perform your processing inline with curl_multi_exec() magic. This really needs a thorough understanding of async socket IO to be utilised properly though.
Solution 2:
You can obviously do multiprocessing with PHP in CLI mode using pcntl_fork(). Forked programming is really easy (look up the double-fork paradigm first though), however you’ll have difficulty with IPC (interprocess communication). This takes the place of my Queue objects above. I usually implement my own socket communication protocol, however you can cheat a bit by using PEAR::HTTP_Server and PEAR::HTTP_Client (or curl), or even the PEAR XML-RPC client and server. It’s a bit heavyweight but not loads. You’d basically code the same program above, but instead of your parent process waiting on the Queue objects, it would set up a server that waits for communications from its spawned children, which either ask for work or return results. If you’re on a multicore machine you might as well get the children to do the data processing – or even better, spawn proper worker children.
I may well write another post on this subject in a few days if it takes my fancy.
Thanks Rob, I’ll take those suggestions on board. Might be simpler for me to re-code it in Perl, although will have to dust off the old Camel book. 😀
Because this app will be primarily network IO bound, the Twisted framework http://twistedmatrix.com/trac/ is ideal for this app over threading.
Perl is my language of choice. I love it, though I must say Python is a killer language for its balance between extreme functionality and simple syntax, meaning coding is so damn quick!
Wow awesome article Dave. Dugg.
But does this threaded code take advantage of SMP? The CPython implementation runs all threads in the same parent process. It kinda sucks if you are writing anything more complex than a calculator. jkg, but all new CPUs have at least 2 cores, and by the end of the year most will have 4. So for serious stuff you are limited to 1 core.
Shouldn’t the “Queue.put(info)” on line 17 in Implementation 3 and line 19 in Implementation 4 be “jobs.put(info)”?
Otherwise, nice example!
Is line 17 supposed to say:
jobs.put(info)
rather than:
Queue.put(info)
Just wondering 🙂
A couple of corrections for you. In implementation 3, line 17, and implementation 4, line 19, you have:
Queue.put(info)
These should be:
jobs.put(info)
It’s worthwhile to actually run the code before posting. These little typos are easy to miss.
You guys are so stuipd and gay yous hould use Ruby becuase it is the best! Rails is the most awesomest langauge strucutre to use because it makes things so simple, you only need to intall Rails then put a page together with some ezy code!
Perl! Boo!
Agreed. Perl! Boo!
Of course, I don’t use python either. But I don’t feel as embarrassed for not knowing python as I do about perl.
Cool.
For more free Python scripts have a look at http://www.TheScriptLibrary.com
Thanks for the tutorial and sharing the script. I will have to code it in PHP due to my inertia in learning a new language like Python.
Great post. I am not much a python guy. So can you please explain the diff between impl 2 and 3 a bit more detailed?
impl 3 is supposed to have a thread-pool class that creates THREAD_LIMIT threads initially and reuses them without new().
but what I saw seems impl 3 has only a queue for rss address list, then process the list for every THREAD_LIMIT items..
impl 2 and 3 are using the same one
t = threading.Thread(target=thread)…
Can you please clarify it a bit more? Sorry if it is too dumb question.
Perl yah!
Perl 6 BIG yahhhh
If you wanna code in Python then feel free – You can do it
If you wanna code in PHP then feel free – You can do it.
But none of you can call yourself a coder till you can code in brainfuck which (of course) can run under Perl 6 🙂
Got to love it
Aye, I’m well versed in Brainfuck, never actually written any code in it though 😛 I dunno what it is with Perl, I’ve just never liked it. Probably because I perceive it to be a memory hog and the OO features are a bit “hacked on”, a bit like PHP. That and the syntax blows chunks (sometimes) 😛
Just a quick question..
Which plugin do you use to display code, i have tried many but none seem to work with wordpress 2
http://wp-plugins.net/plugin/codesnippet/
Actually it’s http://blog.igeek.info/wp-plugins/igsyntax-hiliter/
Good example. But in examples I have no right left spaces…
To get high rankings in Yahoo and MSN is all about links? I can get ranked easier in Google with links,
but the other two I have no clue.