Wikipedia Data in Spark and Scala

Who hasn't spent hours browsing Wikipedia, following link after link, going from the History of Cheese to the Roman Empire to Pig Wrestling (yes, that's an actual Wikipedia article)? Now that you're back from your Wikipedia binge, I have a question for you: have you ever wondered what data is available from Wikipedia from which you can pull fun insights? If you have, and you're a Spark user, then this is the post for you.

Wikipedia actually provides a few rich sources of data to play and experiment with; however, it's far from obvious how to load this data cleanly into Spark. This post focuses on exactly that, and all the code is in the wiki-parser github repo for others to use. Future posts will cover building data pipelines, machine learning training data, and then machine learning models on top of this data.

The data sources we'll be loading are:

  • Wikipedia Raw XML Dump: All the articles in the English Wikipedia as an XML dump of their database. The data is unprocessed and uses the raw markup tags you'd see if you were editing an article on Wikipedia. You can find more information here and here.
  • Wikipedia Page Views: The count of page views on Wikipedia articles, broken down by project and some additional areas such as mobile. You can find more information here.
  • Wikipedia Click Stream: A sample of clicks going to Wikipedia articles, with information on the referrer of each click. This includes both internal (other articles) and external (Google, Twitter, etc.) sources; however, the exact external referrer is hidden. You can find more information here and here.

Wikipedia Raw XML Dump

The parsing and loading are done 100% in Spark utilizing two pieces of code:

  • The parsing code from this blog post by tuxdna to break up the raw data into individual pages
  • The Java Wikipedia API library to parse the individual XML pages and then extract data from them

The biggest limitations are that the first step is single threaded (and takes hours due to the data size) and that templates are not expanded. The latter could be done in Spark; however, the effort needed was simply not seen as worth it at this time.

The Spark code for reading the raw data ended up being:

def readWikiDump(sc: SparkContext, file: String): RDD[(Long, String)] = {
  // Each record is the byte position of a <page> block in the dump and its raw XML
  val rdd = sc.hadoopFile[LongWritable, Text, WikiInputFormat](file)
  // Repartition since everything came out of a single non-splittable file
  rdd.map { case (k, v) => (k.get(), v.toString) }.repartition(100)
}
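
A quick sanity check might look like the following, assuming an existing SparkContext named sc; the dump filename is just a placeholder:

val rawPages = readWikiDump(sc, "enwiki-latest-pages-articles.xml.bz2")
println(s"Pages found: ${rawPages.count()}")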

This creates an RDD where each record is a single <page>...</page> entry from the original XML file. The Java Wikipedia API library could have been used here too; however, we wanted a simple and fast initial parser. This also lets us, for example, easily see how often the Java Wikipedia API library runs into parsing exceptions and fails to properly parse Wikipedia pages, without having to keep a lot of debugging code around.

The key piece of this step is the custom WikiInputFormat, a Hadoop InputFormat which wraps tuxdna's code and returns a single record per page. The InputFormat is very bare bones: since it runs on a single massive non-splittable file, a lot of the intelligence common in InputFormat classes was stripped out. The record key is the position in the compressed file where the page was found.
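
To make the shape of that InputFormat concrete, here is a rough structural sketch against the old mapred API that sc.hadoopFile expects. This is not the repo's implementation: the class names and method bodies below are illustrative assumptions, and the real WikiInputFormat wraps tuxdna's code to do the actual page scanning.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred._

// Structural sketch only; the real WikiInputFormat lives in the wiki-parser repo
class SketchWikiInputFormat extends FileInputFormat[LongWritable, Text] {
  // The dump is one huge compressed file, so it is never split
  override def isSplitable(fs: FileSystem, filename: Path): Boolean = false

  override def getRecordReader(split: InputSplit, job: JobConf,
                               reporter: Reporter): RecordReader[LongWritable, Text] =
    new SketchWikiRecordReader(split.asInstanceOf[FileSplit], job)
}

class SketchWikiRecordReader(split: FileSplit, job: JobConf)
    extends RecordReader[LongWritable, Text] {
  // A real implementation opens the (compressed) stream here and scans forward
  override def createKey(): LongWritable = new LongWritable()
  override def createValue(): Text = new Text()
  override def next(key: LongWritable, value: Text): Boolean = {
    // Find the next <page>...</page> block, store its offset in key and its
    // raw XML in value; return false once the file is exhausted
    false
  }
  override def getPos(): Long = 0L
  override def getProgress(): Float = 0f
  override def close(): Unit = ()
}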

The output of this is then fed into the next parsing step, which actually extracts structured information from the raw pages. This uses the Java Wikipedia API library:

case class Page(title: String, text: String, isCategory: Boolean, isFile: Boolean, isTemplate: Boolean)

def parsePages(rdd: RDD[(Long, String)]): RDD[(Long, Page)] = {
  rdd.mapValues{
    text => {
      val wrappedPage = new WrappedPage
      //The parser occasionally throws exceptions; we ignore those pages
      try {
        val parser = new WikiXMLParser(new ByteArrayInputStream(text.getBytes), new SetterArticleFilter(wrappedPage))
        parser.parse()
      } catch {
        case e: Exception =>
      }
      val page = wrappedPage.page
      if (page.getText != null && page.getTitle != null
        && page.getId != null && page.getRevisionId != null
        && page.getTimeStamp != null) {
        Some(Page(page.getTitle, page.getText, page.isCategory, page.isFile, page.isTemplate))
      } else {
        None
      }
    }
  }.filter(_._2.isDefined).mapValues(_.get)
}
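
Putting the two steps together, a full parse of the dump looks something like this (the path is again just a placeholder):

val pages = parsePages(readWikiDump(sc, "enwiki-latest-pages-articles.xml.bz2"))
pages.values.filter(!_.isTemplate).take(5).foreach(p => println(p.title))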

The WrappedPage object provides a serializable and writable class with a settable WikiArticle field. This allows both easy serialization of the object and easy reading of it back out of the WikiXMLParser. The WikiXMLParser doesn't return a parsed page but rather sets the page on the wrappedPage object via the SetterArticleFilter.
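
For readers curious what those two helpers might look like, here is a minimal sketch; the actual WrappedPage and SetterArticleFilter in the wiki-parser repo may differ, and these definitions are assumptions based on how they're used above:

import info.bliki.wiki.dump.{IArticleFilter, Siteinfo, WikiArticle}

// Serializable holder whose page field the parser fills in during parsing
class WrappedPage(var page: WikiArticle = new WikiArticle()) extends Serializable

// Article filter that simply stores the parsed article on the wrapper
class SetterArticleFilter(val wrappedPage: WrappedPage) extends IArticleFilter {
  override def process(page: WikiArticle, siteinfo: Siteinfo): Unit = {
    wrappedPage.page = page
  }
}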

Wikipedia Page Views

The page view files are hourly files containing all the page views that happened, in a whitespace-delimited format. Parsing them is fairly straightforward, although the page title has to be unescaped to be in line with how we're processing the other data sources:

case class PageCounts(project: String, pageTitle: String, views: Long)

def readPageCounts(sc: SparkContext, path: String): RDD[PageCounts] = {
  val rdd = sc.textFile(path + "/*")
  rdd.map(_.split(" ")).map(l => PageCounts(
    l(0),                                  // project
    StringEscapeUtils.unescapeHtml4(l(1)), // page title, HTML-unescaped
    l(2).toLong                            // view count
  ))
}
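
As a quick example of what you can do with the page view data, the following finds the most viewed titles under a given path (the path is a placeholder):

val pageCounts = readPageCounts(sc, "pagecounts")
pageCounts.map(pc => (pc.pageTitle, pc.views))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)
  .foreach(println)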

Wikipedia Click Stream

The click stream file provides information on the clicks which happened on Wikipedia, in a tab-delimited format. Parsing the file is fairly straightforward, although article titles had to be cleaned up a bit to be in line with the other data sources:

case class Clicks(prevId: String, currId: String, n: Long, prevTitle: String, currTitle: String, clickType: String)

def readClickSteam(sc: SparkContext, file: String) : RDD[Clicks] = {
  val rdd = sc.textFile(file)
  rdd.zipWithIndex().filter(_._2 != 0).map(_._1) // Drop the first line (the header)
    .repartition(10)
    .map(_.split('\t'))
    .map(l => Clicks(
      l(0),
      l(1),
      l(2).toLong,
      l(3).replace("_"," "), //Click stream uses _ for spaces while the dump parsing uses actual spaces
      l(4).replace("_"," "), //Click stream uses _ for spaces while the dump parsing uses actual spaces
      l(5))
    )
}
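
For example, to see which articles send the most readers to a given page, something like this works (the path and the article title are placeholders):

val clicks = readClickSteam(sc, "clickstream.tsv")
clicks.filter(_.currTitle == "Roman Empire")
  .map(c => (c.prevTitle, c.n))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)
  .foreach(println)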

Redirects

Wikipedia often redirects one article to another, which is very valuable information to know, so we want to be able to parse it out of articles. Wikipedia stores this information as an article itself, so we have to parse the text or XML to work out that an article is actually a redirect and what it redirects to. Thankfully the Java Wikipedia API library provides an easy way to parse out this information.

case class Redirect(pageTitle: String, redirectTitle: String)

def parseRedirects(rdd: RDD[Page]): RDD[Option[Redirect]] = {
  rdd.map {
    page =>
      val redirect =
        if (page.text != null && !page.isCategory && !page.isFile && !page.isTemplate) {
          Option(Redirect(page.title, WikipediaParser.parseRedirect(page.text, new WikiModel("", ""))))
        } else {
          None
        }
      redirect
  }
}
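
Downstream the Options still need to be flattened. Reusing the pages RDD from earlier, a title-to-target lookup could be built roughly like this, assuming parseRedirect returns null for pages that aren't redirects:

val redirects = parseRedirects(pages.values)
  .flatMap(r => r)                  // drop the Nones
  .filter(_.redirectTitle != null)  // drop pages that aren't redirects
  .map(r => (r.pageTitle, r.redirectTitle))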

Links

Another very useful piece of information in the data is the links between Wikipedia articles, and we want to extract these into a cleaner format. Parsing links is a bit more complicated, especially since we also ideally want to know the approximate position on the page where each link shows up. The approach taken was to first use the Java Wikipedia API library to convert the raw Wikipedia markup to HTML, and then to use the HtmlCleaner library to extract links. Links which go to special pages or external pages are ignored.

case class Link(pageTitle: String, linkTitle: String, row: Int, col: Int)

def parseInternalLinks(rdd: RDD[Page]): RDD[Link] = {
  rdd.flatMap {
    page =>
      if (page.text != null) {
        try {
          val html = WikiModel.toHtml(page.text)
          val cleaner = new HtmlCleaner
          val rootNode = cleaner.clean(html)
          val elements = rootNode.getElementsByName("a", true)
          val out = for (
            elem <- elements;
            classType = elem.getAttributeByName("class");
            title = elem.getAttributeByName("title")
            if (
              title != null
                && !title.startsWith("User:") && !title.startsWith("User talk:")
                && (classType == null || !classType.contains("external"))
              )
          ) yield {
            Link(page.title, StringEscapeUtils.unescapeHtml4(title), elem.getRow, elem.getCol)
          }
          out.toList
        }  catch {
          case e: Exception => Nil
        }
      } else {
        Nil
      }
  }
}
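
Finally, the extracted links make it easy to ask which articles are linked to most often, again reusing the pages RDD from earlier:

val links = parseInternalLinks(pages.values)
links.map(l => (l.linkTitle, 1L))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)
  .foreach(println)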