#!/usr/bin/env python
# -----------------------------------------------------------------------
# Copyright (C) 2006 Will Uther
# http://www.cse.unsw.edu.au/~willu/
# and Indulis Bernsteins (channeltz patch)
# indulis 2 1 C mail c o m
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MER-
# CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# The full text of the GPL license is available here:
# http://www.gnu.org/licenses/gpl.html
#
"""Grab TV guide information from either the Australian Community-based
TV Guide, or the IceTV commercial guide.

Version $Date: 2008-02-26 22:24:14 +1100 (Tue, 26 Feb 2008) $.

See http://www.cse.unsw.edu.au/~willu/xmltv/tv_grab_au_reg.html for more
information on this script.

See http://tvguide.org.au/ and/or http://www.icetv.com.au/ for more
information on the guide data itself. You need to register to get data
from either source. The IceTV data is higher quality, because you pay to
have someone update it for you. The Community data covers more channels
and is cheaper.

This grabber is distributed under the GNU Public License. The guide data
is distributed under the license you agree to when you register with the
data provider (last I checked, either a Creative Commons license or the
IceTV license).

Usage: tv_grab_au_reg [options]

Options:

    -h / --help
        Print this message and exit.

    --version
        Print the date this script was last edited and exit.

    -v / --verbose
        Increase verbosity of output (to stderr) while grabbing.

    --debug
        Save incoming data to a file, error.xml, if we get a parse
        error. Inefficient and ugly code.

    -o / --output
        Output to FILE rather than standard output.

    -d / --days
        Grab N days. Defaults to grabbing 7 days of guide data.

    --offset
        Start grabbing N days in the future. Defaults to 0; starting
        grabbing with today's data.

    --configure
        Write a default configuration file. This file will need to be
        edited with appropriate information.

    -c / --config-file
        Read(/write) configuration information from FILE instead of
        from ~/.xmltv/tv_grab_au_reg.xml . Some Windows users have had
        problems with .xmltv as a directory. If you have a similar
        problem, this option is for you :).

    --force-explicit-timezone
        Force any implicitly UTC timestamps in the XMLTV input to have
        an explicit timezone of +0000 in the output.

    --quiet
        Suppress all progress information.

    --capabilities
        List XMLTV capabilities.

    --preferredmethod
        Tell the calling program that we prefer to return all the data
        at once.

    --description
        Tell everyone that we're an Australian grabber.

Configuration File:

    The config file is in XML. The config file includes your login
    information for each data source, and the list of channels to grab
    for each data source. See the default config file for a basic
    example.
"""

version = "tv_grab_au_reg/1.0 $Date: 2008-02-26 22:24:14 +1100 (Tue, 26 Feb 2008) $"

import sys
import os
import getopt
import time
import datetime
try:
    import urllib2
except ImportError:
    # Python 3 keeps the same opener/Request/HTTPError names here.
    import urllib.request as urllib2
from xml.dom import pulldom
from xml.sax.saxutils import escape, quoteattr
import re
import shutil
import gzip
import random
import tempfile


def _err(*parts):
    """Write ``parts`` to stderr as one space-separated line.

    Stands in for ``print >> sys.stderr, a, b`` so the script parses on
    both Python 2 and Python 3. Calling it with no arguments emits a
    blank line.
    """
    words = []
    for part in parts:
        # Only true on Python 3, where bytes and str are distinct types.
        if bytes is not str and isinstance(part, bytes):
            part = part.decode("utf-8", "replace")
        words.append("%s" % (part,))
    sys.stderr.write(" ".join(words) + "\n")


verbose = 0
debug = 0
# Stream the XMLTV document is written to; main() rebinds it for -o.
output = sys.stdout

try:
    time.tzset()
except AttributeError:
    # time.tzset is absent on platforms that do not provide it.
    _err("Unable to set timezone - normal on Windows, sigh.")

epoch = datetime.date(1970, 1, 1)
today = (datetime.date.today() - epoch).days
start_day_offset = 0
days = 7

use_tvguide = False
use_icetv = False
filter_tvguide = False
filter_icetv = False

# Relative to the home directory; main() joins the two.
config_file = os.path.join('.xmltv', 'tv_grab_au_reg.xml')

tvguide_root_url = 'http://minnie.tuhs.org/xmltv/channels.xml.gz'
tvguide_server = 'minnie.tuhs.org'
# HTTP basic-auth realm handed to add_password(); kept byte-for-byte.
tvguide_realm = 'Enter your WikiName: (First name and last name, no space, no dots, capitalized, e.g. JohnSmith). Cancel to register if you do not have one.'

icetv_base_url = 'http://iceguide.icetv.com.au/cgi-bin/epg/iceguide.cgi'
icetv_server = 'iceguide.icetv.com.au'
icetv_realm = 'cgi-bin-epg'

opener = None
authinfo = urllib2.HTTPBasicAuthHandler()

# Each configured channel is one dict shared between the list and the
# provider-id -> dict conversion map (see load_config).
tvguide_channels = []
tvguide_conversion = {}
icetv_channels = []
icetv_conversion = {}

force_explicit_timezone = False
# group(1): the digits of a timestamp; group(2): an optional
# whitespace-separated suffix (the timezone), None when absent.
timeRE = re.compile(r"(\d+)(\s+\S+)?")

# When true, annotate_tv_guide picks one base-url at random per channel
# instead of the first one listed.
useRandom = False

# NOTE(review): the markup inside the next three literals (and inside
# outputNewChannel / the closing write in grab) was missing from the
# received source. It has been rebuilt from the XMLTV document format
# and from the attributes load_config() reads - confirm against the
# upstream script.
XML_header = '<?xml version="1.0" encoding="UTF-8"?>\n'

XMLTV_Tag = """<!DOCTYPE tv SYSTEM "xmltv.dtd">
<tv generator-info-name="tv_grab_au_reg">
"""

defaultConfigFile = """<?xml version="1.0" encoding="UTF-8"?>
<config>
    <!-- One login tag per data provider you are registered with.
         provider is "tvguide" or "icetv". For icetv, filter="true"
         drops every channel that is not listed below. -->
    <login provider="tvguide" user="YourWikiName" password="your-password"/>
    <login provider="icetv" user="your-username" password="your-password" filter="true"/>

    <!-- One channel tag per channel, naming exactly one provider id
         (tvguide= or icetv=). Optional attributes:
           display    display name written for tvguide channels
           local      XMLTV channel id to use in the output
           channeltz  timezone forced onto start/stop times, e.g. +1000 -->
    <channel display="Example One" tvguide="PROVIDER-CHANNEL-ID"/>
    <channel display="Example Two" local="two.example" channeltz="+1000" icetv="PROVIDER-CHANNEL-ID"/>
</config>
"""

capability_string = """baseline
manualconfig
preferredmethod"""

preferredmethod_string = """allatonce"""

description_string = """Australia (tvguide.org.au and/or icetv.com.au)"""


def _write_output(data):
    """Write ``data`` (text or bytes) to the global ``output`` as UTF-8.

    minidom's ``toxml("utf-8")`` yields bytes while the hand-built tags
    are text, so everything is normalised to bytes and sent to the
    stream's binary layer when it has one (``sys.stdout.buffer`` on
    Python 3).
    """
    payload = data if isinstance(data, bytes) else data.encode("utf-8")
    stream = getattr(output, "buffer", output)
    try:
        stream.write(payload)
    except TypeError:
        # Text-only stream (e.g. io.StringIO): give it decoded text.
        stream.write(payload.decode("utf-8"))


def _write_node(node, echo_level):
    """Serialise ``node`` to the output, followed by a newline.

    When ``verbose`` exceeds ``echo_level`` the XML is echoed too. The
    echo goes to stderr so it cannot end up inside the XMLTV document
    when that is being written to stdout.
    """
    out = node.toxml("utf-8")
    _write_output(out)
    _write_output("\n")
    if verbose > echo_level:
        _err(out)
        _err("\n")


def getHomeDir():
    """Return the user's home directory, or the current directory.

    Tries ``os.path.expanduser("~")``, then ``$HOME``, then
    ``%USERPROFILE%`` and returns the first that exists on disk; falls
    back to ``os.getcwd()`` when none does.
    """
    candidates = [
        os.path.expanduser("~"),
        os.environ.get("HOME", ""),
        os.environ.get("USERPROFILE", ""),
    ]
    for path in candidates:
        if path and os.path.exists(path):
            return path
    return os.getcwd()


def outputNewChannel(provider, channel):
    """Write a freshly built ``<channel>`` tag for a configured channel.

    provider -- key in ``channel`` holding the provider's channel id
                (grab() passes 'tvguide').
    channel  -- config dict; optional keys 'local' (XMLTV id to use
                instead of the provider id) and 'display' (display name).
    """
    if verbose > 1:
        _err("Generating channel tag for channel " + channel[provider] +
             " from provider " + provider + ".")
        if 'display' in channel:
            _err("Display name: " + channel['display'])
        if 'local' in channel:
            _err("Changing XMLTV name to: " + channel['local'])

    if 'local' in channel:
        channel_id = channel['local']
    else:
        channel_id = channel[provider]

    # Values come from parsed config attributes, so re-escape them.
    pieces = ['\n\t<channel id=', quoteattr(channel_id), '>']
    if 'display' in channel:
        pieces.extend(['<display-name>', escape(channel['display']),
                       '</display-name>'])
    pieces.append('</channel>\n')
    channelXMLstring = ''.join(pieces)

    _write_output(channelXMLstring)
    if verbose > 2:
        _err("channel string: ", channelXMLstring)


def outputOldChannel(node, this_channel, channel):
    """Pass a provider-supplied ``<channel>`` element through.

    node         -- expanded DOM element for the channel tag.
    this_channel -- the provider's id for it (used in messages only).
    channel      -- matching config dict, or None when the channel is
                    not configured; a 'local' entry replaces the id.
    """
    has_local = channel is not None and 'local' in channel
    if verbose > 1:
        _err("Processing channel tag for channel " + this_channel)
        if channel is not None and 'display' in channel:
            _err("Display name (will NOT be changed to): " + channel['display'])
        if has_local:
            _err("Changing XMLTV ID to: " + channel['local'])
    if has_local:
        node.setAttribute('id', channel['local'])
    _write_node(node, 2)


def outputProgramme(node, channel):
    """Rewrite and emit one ``<programme>`` element.

    node    -- expanded DOM element for the programme.
    channel -- matching config dict, or None for an unconfigured channel.

    A 'channeltz' entry overrides the timezone of start/stop. Otherwise,
    when ``force_explicit_timezone`` is set, timestamps without a
    timezone get " +0000" appended; this also applies to unconfigured
    channels, as the --force-explicit-timezone help text promises "any"
    timestamp. A 'local' entry replaces the channel attribute.
    Attributes the element does not carry (``stop`` is optional in
    XMLTV) are left absent rather than created empty.
    """
    if channel is not None and 'channeltz' in channel:
        replacement = r'\1 ' + channel['channeltz']
        for attr in ('start', 'stop'):
            if node.hasAttribute(attr):
                node.setAttribute(
                    attr, timeRE.sub(replacement, node.getAttribute(attr)))
    elif force_explicit_timezone:
        for attr in ('start', 'stop'):
            if not node.hasAttribute(attr):
                continue
            stamp = node.getAttribute(attr)
            match = timeRE.search(stamp)
            # No match means no digits at all; leave such a value alone.
            if match is not None and match.group(2) is None:
                node.setAttribute(attr, stamp + " +0000")

    if channel is not None and 'local' in channel:
        node.setAttribute('channel', channel['local'])
    _write_node(node, 4)


def outputData(conversionMap, filt, f):
    """Stream an XMLTV document from ``f`` to the output.

    conversionMap -- provider channel id -> config dict.
    filt          -- when true, channels/programmes whose id is not in
                     ``conversionMap`` are dropped; otherwise they are
                     passed through unchanged.
    f             -- file-like object (or filename) holding the XML.

    ``<message>`` elements are reported on stderr.
    """
    events = pulldom.parse(f)
    for (event, node) in events:
        if event != pulldom.START_ELEMENT:
            continue
        if node.nodeName == "channel":
            events.expandNode(node)
            this_channel = node.getAttribute('id')
            if this_channel in conversionMap:
                outputOldChannel(node, this_channel, conversionMap[this_channel])
            elif not filt:
                outputOldChannel(node, this_channel, None)
            elif verbose > 2:
                _err("Ignoring channel tag for", this_channel)
        elif node.nodeName == "programme":
            events.expandNode(node)
            this_channel = node.getAttribute('channel')
            if this_channel in conversionMap:
                outputProgramme(node, conversionMap[this_channel])
            elif not filt:
                outputProgramme(node, None)
            elif verbose > 3:
                _err("Ignoring programme tag for program on channel", this_channel)
        elif node.nodeName == "message":
            events.expandNode(node)
            _err("Message from server: ", node.toxml())


def _close_quietly(handle):
    """Close ``handle``, ignoring failures (cleanup must not mask errors)."""
    try:
        handle.close()
    except Exception:
        pass


def _dump_error_xml(f):
    """Best-effort copy of ``f`` into error.xml for --debug.

    Lives in its own function so that a failure here (for instance an
    unseekable HTTP response) cannot replace the exception the caller is
    about to re-raise.
    """
    _err("Creating file error.xml with the offending xml")
    try:
        f.seek(0)
        errFile = open('error.xml', 'wb')
        try:
            shutil.copyfileobj(f, errFile)
        finally:
            errFile.close()
    except Exception:
        _err("Unable to write error.xml:", sys.exc_info()[1])


def _fetch_and_process(req, process):
    """Open ``req``, undo gzip encoding, and call ``process(stream)``.

    Shared by grabDataFromReq and annotate_tv_guide. In debug mode the
    decoded body is first copied to a temporary file so it can be saved
    as error.xml if anything fails. Any exception is reported on stderr
    and re-raised unchanged.
    """
    req.add_header('User-agent', version)
    req.add_header('Accept-encoding', 'gzip')
    if verbose > 2:
        _err("requesting url: ", req.get_full_url())

    opened = []
    debug_copy = None
    if debug > 0:
        debug_copy = tempfile.TemporaryFile()
        opened.append(debug_copy)
    f = debug_copy
    encoding = 'Default'
    try:
        f = urllib2.urlopen(req)
        opened.append(f)
        encoding = f.info().get('Content-Encoding')
        if encoding == 'gzip':
            # Spool to a real file before handing it to GzipFile; the
            # original did the same (presumably because the response is
            # not seekable).
            compressed = tempfile.TemporaryFile()
            opened.append(compressed)
            shutil.copyfileobj(f, compressed)
            compressed.seek(0)
            f = gzip.GzipFile(fileobj=compressed, mode='rb')
            opened.append(f)
        if debug_copy is not None:
            shutil.copyfileobj(f, debug_copy)
            debug_copy.seek(0)
            f = debug_copy
        process(f)
    except Exception:
        exc_type, exc_value = sys.exc_info()[:2]
        _err()
        _err()
        _err("Error requesting url: ", req.get_full_url())
        _err(exc_type, exc_value)
        _err()
        _err("HTTP reply content encoding: ", encoding)
        _err()
        _err("Common errors include bad proxy configuration, and")
        _err("not setting the username/password correctly in the config file.")
        _err()
        _err()
        if debug_copy is not None:
            _dump_error_xml(f)
        raise
    finally:
        for handle in opened:
            _close_quietly(handle)


def grabDataFromReq(conversionMap, filt, req):
    """Fetch ``req`` and stream its XMLTV content through outputData.

    conversionMap, filt -- passed to outputData unchanged.
    req                 -- urllib2 Request to fetch.

    Raises whatever the download or the parse raises, after describing
    the failure on stderr.
    """
    _fetch_and_process(req, lambda stream: outputData(conversionMap, filt, stream))


def IceTVDate(days_from_today):
    """Return the date ``days_from_today`` days away, formatted YYMMDD."""
    day = datetime.date.today() + datetime.timedelta(days_from_today)
    return day.strftime("%y%m%d")


def TvGuideDate(days_from_today):
    """Return the date ``days_from_today`` days away, as YYYY-MM-DD."""
    day = datetime.date.today() + datetime.timedelta(days_from_today)
    return day.strftime("%Y-%m-%d")


def _annotate_channels(conversionMap, f):
    """Record each configured channel's ``base-url`` from a channel list.

    For every ``<channel>`` in ``f`` whose id is in ``conversionMap``,
    one ``<base-url>`` child is chosen and its text stored under the
    'base-url' key of the config dict: the first one normally, or a
    uniformly random one when ``useRandom`` is set.
    """
    events = pulldom.parse(f)
    for (event, node) in events:
        if event != pulldom.START_ELEMENT:
            continue
        if node.nodeName == "channel":
            events.expandNode(node)
            this_channel = node.getAttribute('id')
            if this_channel in conversionMap:
                channel = conversionMap[this_channel]
                count = 0
                for child in node.childNodes:
                    if child.nodeName != "base-url":
                        continue
                    count = count + 1
                    # Keeping the k-th candidate with probability 1/k
                    # leaves every candidate equally likely overall.
                    if useRandom:
                        chosen = random.random() < 1.0 / count
                    else:
                        chosen = count == 1
                    if chosen:
                        child.normalize()
                        for tChild in child.childNodes:
                            if tChild.nodeType == tChild.TEXT_NODE:
                                channel['base-url'] = tChild.data
            elif verbose > 2:
                _err("Ignoring tv_guide channel tag for", this_channel)
        elif node.nodeName == "message":
            events.expandNode(node)
            _err("Message from server: ", node.toxml())


def annotate_tv_guide(conversionMap, req):
    """Fetch the tvguide channel list and fill in each 'base-url'.

    conversionMap -- provider channel id -> config dict, updated in place.
    req           -- urllib2 Request for the channel list.

    Raises whatever the download or the parse raises, after describing
    the failure on stderr.
    """
    _fetch_and_process(req, lambda stream: _annotate_channels(conversionMap, stream))


def grab():
    """Download all configured guide data and write the XMLTV document.

    Order of output: header, tvguide channel tags, the IceTV feed
    (channels then programmes), the per-channel per-day tvguide files,
    and the closing tag. Both providers cover ``start_day_offset``
    through ``start_day_offset + days`` inclusive.

    Raises urllib2.HTTPError for any HTTP failure except a 404 on a
    tvguide day file, which is reported and skipped.
    """
    global opener

    opener = urllib2.build_opener(authinfo)
    urllib2.install_opener(opener)

    _write_output(XML_header)
    _write_output(XMLTV_Tag)

    # output channel tags:
    if use_tvguide:
        req = urllib2.Request(tvguide_root_url)
        annotate_tv_guide(tvguide_conversion, req)
        for channel in tvguide_channels:
            outputNewChannel('tvguide', channel)

    if use_icetv:
        req = urllib2.Request(icetv_base_url + "?op=xmlguide" +
                              '&start_date=' + IceTVDate(start_day_offset) +
                              '&end_date=' + IceTVDate(start_day_offset + days))
        # output IceTV channels then programs
        grabDataFromReq(icetv_conversion, filter_icetv, req)

    _write_output("\n\n")

    # output tvguide data
    if use_tvguide:
        first_day = start_day_offset
        last_day = start_day_offset + days
        for channel in tvguide_channels:
            if 'base-url' not in channel:
                # The server's channel list had no base-url for this id.
                _err("No base-url known for tvguide channel",
                     channel['tvguide'], "- skipping it")
                continue
            url_base = channel['base-url'] + channel['tvguide'] + "_"
            for day in range(first_day, last_day + 1):
                url = url_base + TvGuideDate(day) + ".xml.gz"
                req = urllib2.Request(url)
                try:
                    grabDataFromReq(tvguide_conversion, filter_tvguide, req)
                except urllib2.HTTPError as e:
                    if e.code == 401:
                        _err('URL request not authorized')
                        raise
                    elif e.code == 404:
                        # A missing day file is not fatal; carry on.
                        _err('URL not found')
                    elif e.code == 503:
                        _err('service unavailable')
                        raise
                    else:
                        _err('unknown error: ', e.code)
                        raise

    _write_output("\n\n</tv>\n")


def load_config(cf):
    """Read logins and channels from the XML config file ``cf``.

    ``<login>`` tags register credentials with ``authinfo`` and switch
    the named provider on. ``<channel>`` tags become dicts (keys:
    display, local, channeltz, and the provider id under 'tvguide' or
    'icetv') stored in the matching channel list and conversion map.

    Exits with status 1 when a channel names no provider or more than
    one. A provider that filters but has no channels is switched off
    with a warning.
    """
    global filter_tvguide, filter_icetv, use_tvguide, use_icetv

    events = pulldom.parse(cf)
    for (event, node) in events:
        if event != pulldom.START_ELEMENT:
            continue
        if node.nodeName == "login":
            events.expandNode(node)
            provider = node.getAttribute('provider')
            user = node.getAttribute('user')
            passwd = node.getAttribute('password')
            filterAttr = node.getAttribute('filter')
            if provider == 'tvguide':
                authinfo.add_password(tvguide_realm, tvguide_server, user, passwd)
                # tvguide always filters; the filter attribute is not
                # consulted for this provider.
                filter_tvguide = True
                use_tvguide = True
            elif provider == 'icetv':
                authinfo.add_password(icetv_realm, icetv_server, user, passwd)
                filter_icetv = filterAttr.lower() == 'true'
                use_icetv = True
            else:
                _err("Unrecognised data provider in login tag:", provider)
        elif node.nodeName == 'channel':
            events.expandNode(node)
            convMap = {}
            for key in ('display', 'local', 'channeltz'):
                if node.hasAttribute(key):
                    convMap[key] = node.getAttribute(key)

            providerCount = 0
            if node.hasAttribute('tvguide'):
                convMap['tvguide'] = node.getAttribute('tvguide')
                tvguide_channels.append(convMap)
                tvguide_conversion[convMap['tvguide']] = convMap
                providerCount = providerCount + 1
            if node.hasAttribute('icetv'):
                convMap['icetv'] = node.getAttribute('icetv')
                icetv_channels.append(convMap)
                icetv_conversion[convMap['icetv']] = convMap
                providerCount = providerCount + 1

            if providerCount == 0:
                _err("No data provider in config channel tag:", node.toxml("utf-8"))
                sys.exit(1)
            elif providerCount > 1:
                _err("Multiple data providers in config channel tag:", node.toxml("utf-8"))
                sys.exit(1)

    if filter_tvguide and not tvguide_channels:
        if use_tvguide:
            _err("Warning: all tvguide channels being filtered out!")
        use_tvguide = False
    if filter_icetv and not icetv_channels:
        if use_icetv:
            _err("Warning: all IceTV channels being filtered out!")
        use_icetv = False


def usage():
    """Print the module docstring (the help text) to stderr."""
    _err(__doc__)


def _int_option(name, value):
    """Return ``value`` as an int, or exit(2) with a message."""
    try:
        return int(value)
    except ValueError:
        _err("Option", name, "needs a whole number, got:", value)
        sys.exit(2)


def main():
    """Parse the command line, load the config, and run the grab.

    Exits with status 2 for bad options or a missing config file, 1 for
    an invalid --days value, and 0 after the informational options and
    --configure.
    """
    global verbose, debug, output, today, start_day_offset, days, config_file, force_explicit_timezone

    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "hvo:d:c:",
            ["help", "version", "verbose", "quiet", "debug", "output=",
             "days=", "offset=", "configure", "config-file=",
             "force-explicit-timezone", "capabilities",
             "preferredmethod", "description"])
    except getopt.GetoptError as err:
        # print help information and exit:
        _err("Unrecognised option: ", err)
        usage()
        sys.exit(2)

    config_file = os.path.join(getHomeDir(), config_file)
    configure = False

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o == "--version":
            _err(version)
            sys.exit(0)
        elif o == "--capabilities":
            sys.stdout.write(capability_string + "\n")
            sys.exit(0)
        elif o == "--preferredmethod":
            sys.stdout.write(preferredmethod_string + "\n")
            sys.exit(0)
        elif o == "--description":
            sys.stdout.write(description_string + "\n")
            sys.exit(0)
        elif o in ("-v", "--verbose"):
            verbose = verbose + 1
            _err("Increasing verbosity")
        elif o == "--quiet":
            verbose = -1000
        elif o == "--debug":
            debug = 1
            _err("Debug mode")
        elif o in ("-o", "--output"):
            if verbose > 0:
                _err("Outputting to file: ", a)
            # Binary mode: everything is written as UTF-8 bytes.
            output = open(a, 'wb')
        elif o in ("-d", "--days"):
            days = _int_option(o, a)
            if days < 1:
                _err("Cannot grab less than one day of data!")
                sys.exit(1)
            if verbose > 0:
                _err("Grabbing", days, "days of data.")
        elif o == "--offset":
            start_day_offset = _int_option(o, a)
            if verbose > 0:
                _err("Starting grab offset", start_day_offset, "days.")
        elif o == "--configure":
            configure = True
        elif o in ("-c", "--config-file"):
            config_file = a
            if verbose > 0:
                _err("Using config file: ", config_file)
        elif o == "--force-explicit-timezone":
            force_explicit_timezone = True

    if configure:
        _err("Resetting config file to default: ", config_file)
        # The default location lives in ~/.xmltv, which may not exist yet.
        config_dir = os.path.dirname(config_file)
        if config_dir and not os.path.isdir(config_dir):
            os.makedirs(config_dir)
        f = open(config_file, "w")
        try:
            f.write(defaultConfigFile)
        finally:
            f.close()
        sys.exit(0)

    if os.path.isfile(config_file):
        if verbose > 1:
            _err("Reading config file: ", config_file)
        try:
            load_config(config_file)
        except Exception:
            _err()
            _err()
            _err("Error reading config file: ", config_file)
            _err()
            _err()
            raise
    else:
        _err("Aborting: Config file does not exist: ", config_file)
        _err("Do you need to run with --config-file or --configure?")
        sys.exit(2)

    try:
        grab()
    finally:
        if output != sys.stdout:
            output.close()
            output = sys.stdout


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        _err("User required program abort using Keyboard!")