Mahout and Scalding for poker collusion detection
The authors of the book were using the well-known K-means clustering algorithm to find similar users on stackoverflow.com, where the criterion of similarity was the set of authors of the questions/answers a user was up-/downvoting.
In very simple terms, the K-means algorithm iteratively finds clusters of points/vectors that are located close to each other in a multidimensional space. Applied to the problem of finding similar users on StackOverflow, we assume that every axis in the multidimensional space is a user, and the distance from zero along that axis is the sum of points awarded to the questions/answers given by that user (such dimensions are also often called “features”, and the distance a “feature weight”).
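As a rough illustration of that space (the numbers and the choice of Mahout classes here are mine, not taken from the book's example), two users who award points to the same authors end up close to each other:

import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.mahout.math.SequentialAccessSparseVector

// illustrative only: each axis is an author, the value is the sum of points
// the user awarded to that author's questions/answers
val userA = new SequentialAccessSparseVector(5)
userA.set(0, 10.0) // user A awarded 10 points to author #0
userA.set(3, 4.0)

val userB = new SequentialAccessSparseVector(5)
userB.set(0, 8.0) // user B also favours author #0...
userB.set(3, 5.0) // ...and author #3, so A and B should land in the same cluster

// the smaller the distance, the more similar the users
val distance = new EuclideanDistanceMeasure().distance(userA, userB)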
Obviously, the same approach can be applied to one of the most sophisticated problems in massively multiplayer online poker – collusion detection. We make a simple assumption: if two or more players have played too many games with each other (taking into account that any of them could simply be an active player who plays a lot of games with everyone), they might be colluding.
We break a massive set of players into small, tight clusters (preferably with 2-8 players in each) using the K-means clustering algorithm. In the basic implementation that we will walk through below, every player is represented as a vector whose axes are the other players she has played with (and the weight of each feature is the number of games played together).
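To make this representation concrete before diving into the code, here is a small sketch of one such player vector, built with the same Mahout classes used in Stage 3 below (the IDs, indices and hand counts are made up for illustration):

import org.apache.hadoop.io.Text
import org.apache.mahout.math.{NamedVector, SequentialAccessSparseVector, VectorWritable}

val dictionarySize = 100 // total number of players in the dictionary (illustrative)

// player "1003" played 5 hands with the player at dictionary index 3,
// and 4 hands each with the players at indices 5, 6 and 9
val player = new NamedVector(new SequentialAccessSparseVector(dictionarySize), "1003")
player.set(3, 5.0)
player.set(5, 4.0)
player.set(6, 4.0)
player.set(9, 4.0)

// wrapped into a Writable, since K-means reads its input from Hadoop sequence files
val writable = new VectorWritable(player)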
Stage 1. Building a dictionary
As the first step, we need to build a dictionary/enumeration of all the players involved in the subset of hand history that we analyze:
// extract user ID from a hand history record
val userId = (playerHistory: PlayerHandHistory) =>
  new Text(playerHistory.getUserId.toString)

// Builds a basic dictionary (enumeration, in fact) of all the players that participated in the
// selected subset of hand history records
class Builder(args: Args) extends Job(args) {

  // input tap is an HTable with hand history entries: hand history id -> hand history record, serialized with ProtoBuf
  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  // output tap - plain text file with player IDs
  val output = TextLine(args("output"))

  input
    .read
    .flatMap('blob -> 'player) {
      // every hand history record contains the list of players that participated in the hand
      blob: Array[Byte] =>
        // at the first stage, we simply extract the list of IDs, and add it to the flat list
        HandHistory.parseFrom(blob).getPlayerList.map(userId)
    }
    .unique('player)  // remove duplicate user IDs
    .project('player) // leave only the 'player column in the tuple
    .write(output)

}
1003
1004
1005
1006
1007
...
Stage 2. Adding indices to the dictionary
Secondly, we map each user ID to the player's position/index in the vector.
class Indexer(args: Args) extends Job(args) {

  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable], 'userId -> 'idx)

  TextLine(args("input")).read
    .map(('offset -> 'line) -> ('userId -> 'idx)) {
      // dictionary lines are read with offsets by the TextLine source out of the box; 'offset is a
      // byte offset, and since every 4-digit ID plus the trailing newline occupies 5 bytes,
      // dividing by 5 turns it into a line index
      tuple: (Int, String) =>
        (new Text(tuple._2) -> new IntWritable(tuple._1 / 5))
    }
    .project(('userId -> 'idx)) // only the userId -> index tuple is passed to the output
    .write(output)

}
1003 0
1004 1
1005 2
1006 3
1007 4
...
Stage 3. Building vectors
Now we build the vectors that will be passed as input to the K-means clustering algorithm. As noted above, every position in a vector corresponds to another player that the player has played with:
/**
 * K-means clustering algorithm requires the input to be represented as vectors.
 * In our case, the vector, itself, represents the player, where the other users the player has
 * played with are the vector axes/features (the weight of a feature is the number of games
 * played together)
 * User: remeniuk
 */
class VectorBuilder(args: Args) extends Job(args) {

  import Dictionary._

  // initializes the dictionary pipe
  val dictionary = TextLine(args("dictionary"))
    .read
    .map(('offset -> 'line) -> ('userId -> 'dictionaryIdx)) {
      tuple: (Int, String) =>
        (tuple._2 -> tuple._1 / 5) // as in the Indexer, the byte offset is divided by 5 to get the index
    }
    .project(('userId -> 'dictionaryIdx))

  val input = new HBaseSource("hand", args("hbasehost"), 'handId, Array("d"), Array('blob))
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[VectorWritable], 'player1Id -> 'vector)

  input
    .read
    .flatMap('blob -> ('player1Id -> 'player2Id)) {
      // builds a flat list of pairs of users that played together
      blob: Array[Byte] =>
        val playerList = HandsHistoryCoreInternalDomain.HandHistory.parseFrom(blob).getPlayerList.map(userId)
        playerList.flatMap {
          playerId =>
            playerList.filterNot(_ == playerId).map(otherPlayerId => (playerId -> otherPlayerId.toString))
        }
    }
    // joins the list of pairs of users that played together with the dictionary, so that the second
    // member of the tuple (ID of the second player) is replaced with the index in the dictionary
    .joinWithSmaller('player2Id -> 'userId, dictionary)
    .groupBy('player1Id -> 'dictionaryIdx) {
      // groups the pairs of players that played together, counting the number of hands
      group => group.size
    }
    .map(('player1Id, 'dictionaryIdx, 'size) -> ('playerId, 'partialVector)) {
      tuple: (String, Int, Int) =>
        // turns a pair of users into a vector with a single feature
        val partialVector = new NamedVector(
          new SequentialAccessSparseVector(args("dictionarySize").toInt), tuple._1)
        partialVector.set(tuple._2, tuple._3)
        (new Text(tuple._1), new VectorWritable(partialVector))
    }
    .groupBy('player1Id) {
      // combines the partial vectors into one vector that represents the number of hands played
      // with other players
      group => group.reduce('partialVector -> 'vector) {
        (left: VectorWritable, right: VectorWritable) =>
          new VectorWritable(left.get.plus(right.get))
      }
    }
    .write(output)

}
1003 {3:5.0,5:4.0,6:4.0,9:4.0}
1004 {2:4.0,4:4.0,8:4.0,37:4.0}
1005 {1:4.0,4:5.0,8:4.0,37:4.0}
1006 {0:5.0,5:4.0,6:4.0,9:4.0}
1007 {1:4.0,2:5.0,8:4.0,37:4.0}
...
The entire workflow required to vectorize the input:
val conf = new Configuration
conf.set("io.serializations",
  "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization")

// the path where the vectors will be stored
val vectorsPath = new Path("job/vectors")
// enumeration of all users involved in a selected subset of hand history records
val dictionaryPath = new Path("job/dictionary")
// text file with the dictionary size
val dictionarySizePath = new Path("job/dictionary-size")
// indexed dictionary (every user ID in the dictionary is mapped to an index, starting from 0)
val indexedDictionaryPath = new Path("job/indexed-dictionary")

println("Building dictionary...")
// extracts the IDs of all the users participating in the selected subset of hand history records
Tool.main(Array(classOf[Dictionary.Builder].getName, "--hdfs",
  "--hbasehost", "localhost", "--output", dictionaryPath.toString))
// adds indices to the dictionary
Tool.main(Array(classOf[Dictionary.Indexer].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", indexedDictionaryPath.toString))
// calculates the dictionary size, and stores it to the FS
Tool.main(Array(classOf[Dictionary.Size].getName, "--hdfs",
  "--input", dictionaryPath.toString, "--output", dictionarySizePath.toString))

// reads the dictionary size back
val fs = FileSystem.get(dictionaryPath.toUri, conf)
val dictionarySize = new BufferedReader(
  new InputStreamReader(
    fs.open(new Path(dictionarySizePath, "part-00000"))
  )).readLine().toInt

println("Vectorizing...")
// builds the vectors (player -> other players in the game); the IDs of the other players
// (in the vectors) are replaced with indices taken from the dictionary
Tool.main(Array(classOf[VectorBuilder].getName, "--hdfs",
  "--dictionary", dictionaryPath.toString, "--hbasehost", "localhost",
  "--output", vectorsPath.toString, "--dictionarySize", dictionarySize.toString))
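Note that the Dictionary.Size job invoked above is not shown in the listings. A minimal Scalding sketch of what it might look like (this is my assumption, not the author's actual code) simply counts the dictionary lines and writes the count out as text:

class Size(args: Args) extends Job(args) {

  TextLine(args("input")).read
    .groupAll { group => group.size('size) } // count all the dictionary entries
    .project('size)
    .write(TextLine(args("output")))

}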
Stage 4. Generating n random clusters
Random clusters/centroids are the entry point for the K-means algorithm:
// randomly selected clusters that will be passed as an input to K-means
val inputClustersPath = new Path("job/input-clusters")
val distanceMeasure = new EuclideanDistanceMeasure

println("Making random seeds...")
// builds 30 initial random clusters/centroids
RandomSeedGenerator.buildRandom(conf, vectorsPath, inputClustersPath, 30, distanceMeasure)
Stage 5. Running the K-means algorithm
With every iteration, K-means finds better centroids and clusters. As a result, we get 30 clusters of players that played with each other most often:
// clustering results
val outputClustersPath = new Path("job/output-clusters")
// textual dump of the clustering results
val dumpPath = "job/dump"

println("Running K-means...")
// runs the K-means algorithm with up to 20 iterations, to find clusters of colluding players
// (the assumption of collusion is made on the basis of the number of hands played together with
// other player[s])
KMeansDriver.run(conf, vectorsPath, inputClustersPath, outputClustersPath,
  new CosineDistanceMeasure(), 0.01, 20, true, 0, false)

println("Printing results...")
// dumps the clusters to a text file
val clusterizationResult = finalClusterPath(conf, outputClustersPath, 20)
val clusteredPoints = new Path(outputClustersPath, "clusteredPoints")
val clusterDumper = new ClusterDumper(clusterizationResult, clusteredPoints)
clusterDumper.setNumTopFeatures(10)
clusterDumper.setOutputFile(dumpPath)
clusterDumper.setTermDictionary(new Path(indexedDictionaryPath, "part-00000").toString, "sequencefile")
clusterDumper.printClusters(null)
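The finalClusterPath helper used above is also missing from the post. Assuming the standard Mahout output layout (KMeansDriver writes every iteration to a clusters-<n> directory and marks the last one with a -final suffix), a sketch of it could look like this:

def finalClusterPath(conf: Configuration, output: Path, maxIterations: Int): Path = {
  val fs = FileSystem.get(output.toUri, conf)
  // scan backwards from the iteration limit for the directory K-means marked as final
  (maxIterations to 0 by -1)
    .map(i => new Path(output, "clusters-" + i + "-final"))
    .find(fs.exists(_))
    .getOrElse(sys.error("no final clusters found under " + output))
}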
Results
Let’s now go to “job/dump” – this file contains the textual dumps of all the clusters generated by K-means. Here’s a small fragment of the file:
VL-0{n=5 c=[1003:3.400, 1006:3.400, 1008:3.200, 1009:3.200, 1012:3.200] r=[1003:1.744, 1006:1.744, 1008:1.600, 1009:1.600, 1012:1.600]}
  Top Terms:
    1006 => 3.4
    1003 => 3.4
    1012 => 3.2
    1009 => 3.2
    1008 => 3.2

VL-15{n=1 c=[1016:4.000, 1019:3.000, 1020:3.000, 1021:3.000, 1022:3.000, 1023:3.000, 1024:3.000, 1025:3.000] r=[]}
  Top Terms:
    1016 => 4.0
    1025 => 3.0
    1024 => 3.0
    1023 => 3.0
    1022 => 3.0
    1021 => 3.0
    1020 => 3.0
    1019 => 3.0
As we can see, two clusters of players have been detected: one with 8 players that played a lot of games with each other, and a second with 5 players.
Reference: Poker collusion detection with Mahout and Scalding from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog.