Tag Archives: sqlite

ClickBank is an amazing service that allows anyone to easily either create and sell information products as a publisher, or sell other people’s products for a commission as an affiliate. ClickBank handles the credit card transactions and refunds, while affiliates can earn as much as 90% of a product’s price as commission. It’s a pretty easy to use system and I have used it both as a publisher and as an affiliate to make significant amounts of money online.

The script I have today is a Python program that uses Clickbank’s REST API to download the latest transactions for your affiliate IDs and stuffs the data into a database.

The reason for doing this is that it keeps the data in your control and allows you to more easily see all of the transactions for all your accounts in one place, without having to constantly go to clickbank.com and log in to your accounts. I’m going to be including this data in my Business Intelligence Dashboard Application.

One of the new things I did while writing this script was to make use of SQLAlchemy to abstract the database. This means that it should be trivial to convert it over to use MySQL – just change the connection string.
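
For example, the only change needed to switch from the SQLite file to a MySQL database is the connection string passed to create_engine (the user, password, and database name here are just placeholders, not values from the script):

CONNSTRING = 'sqlite:///clickbank_stats.sqlite'                  # SQLite file used in the script below
#CONNSTRING = 'mysql://db_user:db_password@localhost/clickbank'  # hypothetical MySQL equivalent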

Also you should note that to use this script you’ll need to get the “Clerk API Key” and the “Developer API Key” from your Clickbank account. To generate those keys go to the Account Settings tab from the account dashboard. If you have more than one affiliate ID then you’ll need one Clerk API Key per affiliate ID.

This is the biggest script I have shared on this site yet. I hope someone finds it useful.

Here’s the code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (C) 2009 HalOtis Marketing
# written by Matt Warren
# http://halotis.com/
 
import csv
import httplib
import logging
from datetime import datetime
 
from sqlalchemy import Table, Column, Integer, String, MetaData, Date, DateTime, Float
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
 
LOG_FILENAME = 'ClickbankLoader.log'
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG,filemode='w')
 
#generate these keys in the Account Settings area of ClickBank when you log in.
ACCOUNTS = [{'account':'YOUR_AFFILIATE_ID',  'API_key': 'YOUR_API_KEY' },]
DEV_API_KEY = 'YOUR_DEV_KEY'
 
CONNSTRING='sqlite:///clickbank_stats.sqlite'
 
Base = declarative_base()
class ClickBankList(Base):
    __tablename__ = 'clickbanklist'
    __table_args__ = (UniqueConstraint('date','receipt','item'),{})
 
    id                 = Column(Integer, primary_key=True)
    account            = Column(String)
    processedPayments  = Column(Integer)
    status             = Column(String)
    futurePayments     = Column(Integer)
    firstName          = Column(String)
    state              = Column(String)
    promo              = Column(String)
    country            = Column(String)
    receipt            = Column(String)
    pmtType            = Column(String)
    site               = Column(String)
    currency           = Column(String)
    item               = Column(String)
    amount             = Column(Float)
    txnType            = Column(String)
    affi               = Column(String)
    lastName           = Column(String)
    date               = Column(DateTime)
    rebillAmount       = Column(Float)
    nextPaymentDate    = Column(DateTime)
    email              = Column(String)
 
    format = '%Y-%m-%dT%H:%M:%S'
 
    def __init__(self, account, processedPayments, status, futurePayments, firstName, state, promo, country, receipt, pmtType, site, currency, item, amount , txnType, affi, lastName, date, rebillAmount, nextPaymentDate, email):
        self.account            = account
        if processedPayments != '':
            self.processedPayments  = processedPayments
        self.status             = status
        if futurePayments != '':
            self.futurePayments     = futurePayments
        self.firstName          = firstName
        self.state              = state
        self.promo              = promo
        self.country            = country
        self.receipt            = receipt
        self.pmtType            = pmtType
        self.site               = site
        self.currency           = currency
        self.item               = item
        if amount != '':
            self.amount             = amount
        self.txnType            = txnType
        self.affi               = affi
        self.lastName           = lastName
        self.date               = datetime.strptime(date[:19], self.format)
        if rebillAmount != '':
            self.rebillAmount       = rebillAmount
        if nextPaymentDate != '':
            self.nextPaymentDate    = datetime.strptime(nextPaymentDate[:19], self.format)
        self.email              = email
 
    def __repr__(self):
        return "<clickbank ('%s - %s - %s - %s')>" % (self.account, self.date, self.receipt, self.item)
 
def get_clickbank_list(API_key, DEV_key):
    conn = httplib.HTTPSConnection('api.clickbank.com')
    conn.putrequest('GET', '/rest/1.0/orders/list')
    conn.putheader("Accept", 'text/csv')
    conn.putheader("Authorization", DEV_key+':'+API_key)
    conn.endheaders()
    response = conn.getresponse()
 
    if response.status != 200:
        logging.error('HTTP error %s' % response)
        raise Exception(response)
 
    csv_data = response.read()
 
    return csv_data
 
def load_clickbanklist(csv_data, account, dbconnection=CONNSTRING, echo=False):
    engine = create_engine(dbconnection, echo=echo)
 
    metadata = Base.metadata
    metadata.create_all(engine) 
 
    Session = sessionmaker(bind=engine)
    session = Session()
 
    data = csv.DictReader(iter(csv_data.split('\n')))
 
    for d in data:
        item = ClickBankList(account, **d)
        #check for duplicates before inserting
        checkitem = session.query(ClickBankList).filter_by(date=item.date, receipt=item.receipt, item=item.item).all()
 
        if not checkitem:
            logging.info('inserting new transaction %s' % item)
            session.add(item)
 
    session.commit()
 
if __name__ == '__main__':
    try:
        for account in ACCOUNTS:
            csv_data = get_clickbank_list(account['API_key'], DEV_API_KEY)
            load_clickbanklist(csv_data, account['account'])
    except:
        logging.exception('Crashed')

There are a number of services out there such as Google Cash Detective that will run some searches on Google and then save the advertisements so you can track who is advertising for what keywords over time. It’s actually a very accurate technique for finding out what ads are profitable.

After tracking a keyword for several weeks it’s possible to see which ads have been running consistently over time. The nature of Pay Per Click is that only profitable advertisements will continue to run long term. So if you can identify which ads are profitable for which keywords, it should be possible to duplicate them and get some of that profitable traffic for yourself.

The following script is a Python program that perhaps breaks the Google terms of service, so consider it a guide for how this kind of HTML parsing could be done. It spoofs the User-agent to appear as though it is a real browser, and then runs a search for each of the keywords stored in an SQLite database and stores the ads displayed for each keyword in the database.

The script makes use of the awesome Beautiful Soup library. Beautiful Soup makes parsing HTML content really easy. But because of the nature of scraping the web it is very fragile: it makes several assumptions about the structure of the Google results page, and if Google changes their site then the script could break.
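
As a quick illustration of what the Beautiful Soup calls used below look like, here’s a tiny standalone snippet (the HTML in it is made up; it is not a real Google results page):

from BeautifulSoup import BeautifulSoup  # same library the script below uses
 
snippet = '<ul id="ads"><li><a href="http://example.com">An example ad</a></li></ul>'
soup = BeautifulSoup(snippet)
ad = soup.find(id='ads').findAll('li')[0]
print ad.find('a')['href']   # prints http://example.com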

#!/usr/bin/env python
 
import sys
import urllib2
import re
import sqlite3
import datetime
 
from BeautifulSoup import BeautifulSoup  # available at: http://www.crummy.com/software/BeautifulSoup/
 
conn = sqlite3.connect("espionage.sqlite")
conn.row_factory = sqlite3.Row
 
def get_google_search_results(keywordPhrase):
	"""make the GET request to Google.com for the keyword phrase and return the HTML text
	"""
	url='http://www.google.com/search?hl=en&q=' + '+'.join(keywordPhrase.split())
	req = urllib2.Request(url)
	req.add_header('User-agent', 'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/525.13 (KHTML, like Gecko) Chrome/0.2.149.29 Safari/525.13')
	page = urllib2.urlopen(req)
	HTML = page.read()
	return HTML
 
def scrape_ads(text, phraseID):
	"""Scrape the text as HTML, find and parse out all the ads and store them in a database
	"""
	soup = BeautifulSoup(text)
	#get the ads on the right hand side of the page
	ads = soup.find(id='rhsline').findAll('li')
	position = 0
	for ad in ads:
		position += 1
 
		#display url
		parts = ad.find('cite').findAll(text=True)
		site = ''.join([word.strip() for word in parts]).strip()
		ad.find('cite').replaceWith("")
 
		#the header line
		parts = ad.find('a').findAll(text=True)
		title = ' '.join([word.strip() for word in parts]).strip()
 
		#the destination URL
		href = ad.find('a')['href']
		start = href.find('&q=')
		if start != -1 :
			dest = href[start+3:]
		else :
			dest = None
			print 'error', href
 
		ad.find('a').replaceWith("")
 
		#body of ad
		brs = ad.findAll('br')
		for br in brs:
			br.replaceWith("%BR%")
		parts = ad.findAll(text=True)
		body = ' '.join([word.strip() for word in parts]).strip()
		line1 = body.split('%BR%')[0].strip()
		line2 = body.split('%BR%')[1].strip()
 
		#see if the ad is in the database
		c = conn.cursor()
		c.execute('SELECT adID FROM AdTable WHERE destination=? and title=? and line1=? and line2=? and site=? and phraseID=?', (dest, title, line1, line2, site, phraseID))
		result = c.fetchall() 
		if len(result) == 0:
			#NEW AD - insert into the table
			c.execute('INSERT INTO AdTable (`destination`, `title`, `line1`, `line2`, `site`, `phraseID`) VALUES (?,?,?,?,?,?)', (dest, title, line1, line2, site, phraseID))
			conn.commit()
			c.execute('SELECT adID FROM AdTable WHERE destination=? and title=? and line1=? and line2=? and site=? and phraseID=?', (dest, title, line1, line2, site, phraseID))
			result = c.fetchall()
		elif len(result) > 1:
			continue
 
		adID = result[0]['adID']
 
		c.execute('INSERT INTO ShowTime (`adID`,`date`,`time`, `position`) VALUES (?,?,?,?)', (adID, datetime.datetime.now(), datetime.datetime.now(), position))
 
	#commit the ShowTime rows once all the ads on the page have been recorded
	conn.commit()
 
 
def do_all_keywords():
	c = conn.cursor()
	c.execute('SELECT * FROM KeywordList')
	result = c.fetchall()
	for row in result:
		html = get_google_search_results(row['keywordPhrase'])
		scrape_ads(html, row['phraseID'])
 
if __name__ == '__main__' :
	do_all_keywords()

Another in my series of Python scripting blog posts. This time I’m sharing a script that can rip through RSS feeds and devour their content and stuff it into a database in a way that scales up to 1000s of feeds. To accomplish this the script is multi-threaded.

The big problem with scaling up a web script like this is that there is a huge amount of latency when requesting something over the internet. Due to the bandwidth as well as remote processing time it can take as long as a couple of seconds to get anything back. Requesting one feed after the other in series will waste a lot of time, and that makes this type of script a prime candidate for some threading.

I borrowed parts of this script from this post: Threaded data collection with Python, including examples

What could you do with all this content? Just off the top of my head I can think of many interesting things to do:

  • Create histograms of the publish times of posts to find out what the most/least popular days and times are for publishing (see the query sketch after this list)
  • Plot trends of certain words or phrases over time
  • Create your own aggregation website
  • Find the trending topics by counting the occurrence of words by day
  • Try writing some natural language processing algorithms
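
For instance, here’s a rough sketch of the histogram idea from the first bullet, run against the RSSEntries table that the script below creates (it assumes the dates are stored in the '%Y-%m-%d %H:%M:%S' format the script writes):

import sqlite3
 
conn = sqlite3.connect('rss.sqlite')
c = conn.cursor()
 
# count how many entries were published on each day of the week (0=Sunday ... 6=Saturday)
query = "SELECT strftime('%w', date) AS dow, COUNT(*) FROM RSSEntries GROUP BY dow ORDER BY dow"
for day, count in c.execute(query):
    print day, count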

This script is set to use 20 threads, but that really needs to be fine-tuned for the best performance. Depending on your bandwidth and the sites you want to grab you may want to tweak the THREAD_LIMIT value.

import sqlite3
import threading
import time
import Queue
from time import strftime
 
import feedparser     # available at http://feedparser.org
 
 
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)
 
DATABASE = "rss.sqlite"
 
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
c = conn.cursor()
 
#insert initial values into feed database
c.execute('CREATE TABLE IF NOT EXISTS RSSFeeds (id INTEGER PRIMARY KEY AUTOINCREMENT, url VARCHAR(1000));')
c.execute('CREATE TABLE IF NOT EXISTS RSSEntries (entry_id INTEGER PRIMARY KEY AUTOINCREMENT, id, url, title, content, date);')
#only seed the feed list with this URL if it isn't already in the table
c.execute("INSERT INTO RSSFeeds(url) SELECT 'http://www.halotis.com/feed/' WHERE NOT EXISTS (SELECT 1 FROM RSSFeeds WHERE url='http://www.halotis.com/feed/');")
 
feeds = c.execute('SELECT id, url FROM RSSFeeds').fetchall()
 
def store_feed_items(id, items):
    """ Takes a feed_id and a list of items and stored them in the DB """
    for entry in items:
        c.execute('SELECT entry_id from RSSEntries WHERE url=?', (entry.link,))
        if len(c.fetchall()) == 0:
            c.execute('INSERT INTO RSSEntries (id, url, title, content, date) VALUES (?,?,?,?,?)', (id, entry.link, entry.title, entry.summary, strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)))
 
def thread():
    while True:
        try:
            id, feed_url = jobs.get(False) # False = Don't wait
        except Queue.Empty:
            return
 
        entries = feedparser.parse(feed_url).entries
        rss_to_process.put((id, entries), True) # This will block if full
 
for info in feeds: # Queue them up
    jobs.put([info['id'], info['url']])
 
for n in xrange(THREAD_LIMIT):
    t = threading.Thread(target=thread)
    t.start()
 
while threading.activeCount() > 1 or not rss_to_process.empty():
    # That condition means we want to do this loop if there are threads
    # running OR there's stuff to process
    try:
        id, entries = rss_to_process.get(True, 1) # Block for up to a second
    except Queue.Empty:
        continue
 
    store_feed_items(id, entries)
 
conn.commit()

I was a little bored today and decided to write up a simple script that pushes RSS feed information out to Twitter and manages to keep track of the history so that tweets are not sent out more than once.

It was a very trivial little script to write, but it could actually be useful for something that I’m working on in the future.

The script makes use of an SQLite database to store history and bit.ly for shortening URLs. I’ve made heavy use of some really nice open source libraries to make for a very short and sweet little script.

Grab the necessary Python libraries:
python-twitter
python-bitly
feedparser

You’ll need to sign up for free accounts at Twitter and Bit.ly to use this script.

Hopefully someone out there can take this code example to do something really cool with Twitter and Python.

Update: I’ve added some bit.ly link tracking output to this script. After it twitters the RSS feed it will print out the click count information for every bit.ly link.

from time import strftime
import sqlite3
 
import twitter     #http://code.google.com/p/python-twitter/
import bitly       #http://code.google.com/p/python-bitly/
import feedparser  #available at feedparser.org
 
 
DATABASE = "tweets.sqlite"
 
BITLY_LOGIN = "bitlyUsername"
BITLY_API_KEY = "api key"
 
TWITTER_USER = "username"
TWITTER_PASSWORD = "password"
 
def print_stats():
	conn = sqlite3.connect(DATABASE)
	conn.row_factory = sqlite3.Row
	c = conn.cursor()
 
	b = bitly.Api(login=BITLY_LOGIN,apikey=BITLY_API_KEY)
 
	c.execute('SELECT title, url, short_url from RSSContent')
	all_links = c.fetchall()
 
	for row in all_links:
 
		short_url = row['short_url']
 
		if short_url is None:
			short_url = b.shorten(row['url'])
			c.execute('UPDATE RSSContent SET `short_url`=? WHERE `url`=?',(short_url,row['url']))
 
 
		stats = b.stats(short_url)
		print "%s - User clicks %s, total clicks: %s" % (row['title'], stats.user_clicks,stats.total_clicks)
 
	conn.commit()
 
def tweet_rss(url):
 
	conn = sqlite3.connect(DATABASE)
	conn.row_factory = sqlite3.Row
	c = conn.cursor()
 
	#create the table if it doesn't exist
	c.execute('CREATE TABLE IF NOT EXISTS RSSContent (`url`, `title`, `dateAdded`, `content`, `short_url`)')
 
	api = twitter.Api(username=TWITTER_USER, password=TWITTER_PASSWORD)
	b = bitly.Api(login=BITLY_LOGIN,apikey=BITLY_API_KEY)
 
	d = feedparser.parse(url)
 
	for entry in d.entries:
 
		#check for duplicates
		c.execute('select * from RSSContent where url=?', (entry.link,))
		if not c.fetchall():
 
			tweet_text = "%s - %s" % (entry.title, entry.summary)
 
			shortened_link = b.shorten(entry.link)
 
			t = (entry.link, entry.title, strftime("%Y-%m-%d %H:%M:%S", entry.updated_parsed), entry.summary, shortened_link)
			c.execute('insert into RSSContent (`url`, `title`,`dateAdded`, `content`, `short_url`) values (?,?,?,?,?)', t)
			print "%s.. %s" % (tweet_text[:115], shortened_link)
 
			api.PostUpdate("%s.. %s" % (tweet_text[:115], shortened_link))
 
	conn.commit()
 
if __name__ == '__main__':
  tweet_rss('http://www.halotis.com/feed/')
  print_stats()

I discovered this very handy trick for getting relevant YouTube videos in an RSS feed, and I have used it to build some very powerful blog posting scripts that grab relevant content for some blogs that I have. It could also be helpful to pull these feeds into an RSS reader to quickly skim the newest videos for a specific search. I thought I would share some of this with you, and hopefully you’ll be able to use these scripts to get ideas of your own.

To start with you need to create the URL of the RSS feed for the search.  To do that you can do the search on YouTube and click the RSS icon in the address bar.  The structure of the URL should be something like this:

http://gdata.youtube.com/feeds/base/videos?q=halotis%20marketing&client=ytapi-youtube-search&alt=rss&v=2
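
If you want to build that URL programmatically for an arbitrary phrase, a quick sketch looks like this (the query string parameters are just copied from the URL above):

import urllib
 
phrase = 'halotis marketing'
feed_url = 'http://gdata.youtube.com/feeds/base/videos?q=%s&client=ytapi-youtube-search&alt=rss&v=2' % urllib.quote_plus(phrase)
print feed_url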

The problem with the RSS feed is that it doesn’t include the HTML required to embed the video. You have to parse the RSS content and find the URL for the video, which can then be used to create the embed code so you can post the videos somewhere else.
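
The conversion itself is mostly string manipulation: the feed’s link points at a watch page (.../watch?v=VIDEOID), while the old-style Flash embed markup wants a .../v/VIDEOID URL. Here’s a minimal sketch of that step with a made-up video ID (the script below instead rewrites the stored link with a couple of str.replace calls):

link = 'http://www.youtube.com/watch?v=abc123XYZ'   # made-up example link
video_id = link.split('v=')[1].split('&')[0]        # grab the value of the v= parameter
embed_url = 'http://www.youtube.com/v/' + video_id
print embed_url   # -> http://www.youtube.com/v/abc123XYZ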

In my example code I have categorized each URL under a target keyword phrase. The code below is not a comprehensive program, just an idea of how to go about repurposing YouTube RSS content.

import feedparser  # available at: feedparser.org
import random
import time
from time import strftime
import sqlite3
 
DATABASE = "YouTubeMatches.sqlite"
 
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
c = conn.cursor()
 
def LoadIntoDatabase(phrase, url):
 
	d = feedparser.parse(url)
	for entry in d.entries:
 
		#check for duplicates with the url
		c.execute('select * from YoutubeResources where url=?', (entry.link,))
		if len(c.fetchall()) == 0:
			#only adding the resources that are not already in the table.
			t = (phrase,entry.link, 0, entry.title, strftime("%Y-%m-%d %H:%M:%S", entry.updated_parsed), entry.summary)
			c.execute('insert into YoutubeResources (`phrase`, `url`, `used`, `title`,`dateAdded`, `content`) values (?,?,?,?,?,?)', t)
 
	conn.commit()
 
def getYouTubeEmbedCode(phrase):
 
	c.execute("select * from YoutubeResources where phrase=? and used=0", (phrase,))
	result = c.fetchall()
	random.shuffle(result)
 
	content = result[0]
 
	contentString = content[3]
	url=content[1].replace('?', '').replace('=' , '/')
	embedCode='<div class="youtube-video"><object width="425" height="344"><param name="movie" value="%s&hl=en&fs=1"> </param><param name="allowFullScreen" value="true"> </param><param name="allowscriptaccess" value="always"> </param><embed src="%s&hl=en&fs=1" type="application/x-shockwave-flash" allowscriptaccess="always" allowfullscreen="true" width="425" height="344"> </embed></object></div>' % (url, url)
 
	t=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
	c.execute("UPDATE YoutubeResources SET used = '1', dateUsed = ? WHERE url = ? ", (t, url) )
 
	return embedCode
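
One thing to note: the snippet doesn’t create the YoutubeResources table, so you’d need something like this before calling the two functions (the column list is inferred from the insert and update statements above, so treat it as an assumption):

c.execute('CREATE TABLE IF NOT EXISTS YoutubeResources (`phrase`, `url`, `used`, `title`, `dateAdded`, `content`, `dateUsed`)')
conn.commit()
 
LoadIntoDatabase('halotis marketing', 'http://gdata.youtube.com/feeds/base/videos?q=halotis%20marketing&client=ytapi-youtube-search&alt=rss&v=2')
print getYouTubeEmbedCode('halotis marketing')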