Friday, December 7, 2012

My Cassandra Templates

So I thought I would share some of my abstract adapter code for interacting with the Hector client, which in turn talks to Cassandra.

The idea is that it adapts any model/entity POJO class into a key and columns that fit the column family corresponding to that model. This happens through the marshal and unmarshal methods, which each implementor of this trait defines. Curious names—marshal and unmarshal—but I didn't want to call them map and unmap or pack and unpack, because in Scala those words already mean something else. Marshal transforms the model into, really, a Scala tuple of the key and a list of (column name, value) tuples; the persist methods in this adapter know how to work with that shape. Unmarshal transforms query results (a ColumnFamilyResult) into the model, and is used by the query methods.

As you can see, the adapter also functions as a high-level DAO, in that the user of the DAO doesn't need to know how to work with Hector/Cassandra.

Fine, I know the interaction between this trait and its subclasses gets roundabout, but that's just how it is for now. I don't want to spend too much time refining it yet.

Furthermore, there is an automatic Id field finder method (getKeyFieldOpt). It uses reflection on the model class to find the field that holds the Id (row key): the first declared field whose type is Id. In my case Id is a case class (e.g. var id = Id(9999)), and I think I use some implicit defs so that retrieving the id automatically converts it to its underlying value wherever relevant. But you get the gist.

This is a work in progress. At the moment all generic queries work to fetch the entire row of a column family and unmarshal its data to a model, but I'll be working on generic slices and ranges soon.

So here's the base trait:

import java.lang.reflect.Field
import scala.collection.JavaConversions._
import id.pronimo.watchlet.exception.NoExistingRowException
import id.pronimo.watchlet.exception.WrongModelException
import javax.validation.ConstraintViolation
import javax.validation.ConstraintViolationException
import javax.validation.Validation
import javax.validation.ValidationException
import me.prettyprint.cassandra.serializers.AsciiSerializer
import me.prettyprint.cassandra.serializers.StringSerializer
import me.prettyprint.cassandra.serializers.UUIDSerializer
import me.prettyprint.cassandra.serializers.CompositeSerializer
import me.prettyprint.cassandra.serializers.DateSerializer
import me.prettyprint.cassandra.service.template.ColumnFamilyResult
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate
import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater
import me.prettyprint.hector.api.exceptions.HectorException
import me.prettyprint.hector.api.factory.HFactory
import me.prettyprint.hector.api.mutation.Mutator
import me.prettyprint.hector.api.Keyspace
import id.pronimo.watchlet.model.metadata.Id
import me.prettyprint.cassandra.serializers.SerializerTypeInferer
import me.prettyprint.hector.api.Serializer

/**
 * Generic adapter/DAO between model records and a standard Cassandra column family accessed
 * through Hector.
 *
 * Implementors provide the record <-> column mapping ([[marshal]] / [[unmarshal]]), the Hector
 * template, the keyspace, the column-name serializer and the reflective key field. This trait
 * builds validated write/update/read/remove operations on top of them.
 *
 * @tparam R the record (model) type
 * @tparam K the row key type
 * @tparam N the column name type
 */
trait CommonStandardCFAdapter[R <: Serializable, K, N] {
  // Shared Hector serializers for implementors. Marked transient, presumably so they are skipped
  // when the adapter itself is serialized.
  @transient val (us, ss, as, ds, cs) = (UUIDSerializer.get, StringSerializer.get, AsciiSerializer.get, DateSerializer.get, CompositeSerializer.get)
  @transient val EMPTY_BYTE_ARRAY = Array.empty[Byte]
  // JSR-303 validator used by validate().
  @transient val validator = Validation.buildDefaultValidatorFactory.getValidator

  /** Picks a Hector serializer for `x` via Hector's SerializerTypeInferer. */
  def fs[F](x: F) = { SerializerTypeInferer.getSerializer[F](x) }

  /**
   * Builds a column stamped with a fresh keyspace clock, using this adapter's name serializer
   * (needed for composite column names) and a value serializer chosen by [[fs]].
   *
   * Every write/update path below goes through this function, so column names are always
   * encoded the same way.
   */
  val hcolumn = (n: N, v: Any) => {
    HFactory.createColumn(n, v, getKeyspace.createClock, getNameSerializer, fs(v))
  }

  /* CONCRETE METHODS [BEGIN] */
  /**
   * Adds a new record, or overwrites the columns of an existing row with the same key.
   *
   * The record is validated first. When `deleteFirst` is true, the existing row is removed with
   * a separate call before the update is sent, so the two steps are not submitted together.
   *
   * @param record      the record to persist
   * @param deleteFirst whether to delete the whole row before writing, dropping any columns the
   *                    record no longer maps
   * @throws ValidationException if the record violates its constraints
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def write(record: R, deleteFirst: Boolean) {
    validate(record)
    if (deleteFirst) remove(record)
    val thriftTemplate = this.getThriftTemplate
    val (k, c) = marshal(record)
    val updater = thriftTemplate.createUpdater(k)
    c.foreach { case (n, v) => updater.setColumn(hcolumn(n, v)) }
    thriftTemplate.update(updater)
  }

  /**
   * Adds or overwrites a single column of an existing row.
   *
   * @note Use with caution: this method allows adding a column not defined in the record type.
   * @param key     row key of the record
   * @param colName the column name
   * @param value   the column value
   * @tparam V the value type
   * @throws NoExistingRowException if no row with `key` has any columns
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def update[V](key: K, colName: N, value: V) {
    val thriftTemplate = this.getThriftTemplate
    requireExistingRow(thriftTemplate, key)

    val updater = thriftTemplate.createUpdater(key)
    updater.setColumn(hcolumn(colName, value))
    thriftTemplate.update(updater)
  }

  /**
   * Adds or overwrites several columns of an existing row.
   *
   * @note Use with caution: this method allows adding columns not defined in the record
   *       type/column family metadata.
   * @param key row key of the record
   * @param map column name -> column value
   * @throws NoExistingRowException if no row with `key` has any columns
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def update(key: K, map: Map[N, Any]) {
    val thriftTemplate = this.getThriftTemplate
    requireExistingRow(thriftTemplate, key)

    val updater = thriftTemplate.createUpdater(key)
    map.foreach { case (n, v) => updater.setColumn(hcolumn(n, v)) }
    thriftTemplate.update(updater)
  }

  /**
   * Adds multiple records, overwriting rows that already exist with the same keys.
   *
   * Every record is validated before anything is sent, so one invalid record aborts the whole
   * call. Row deletions (when `deleteFirst` is true) and column insertions are queued on a single
   * mutator and submitted with one `executeBatch` call.
   *
   * @param records     the records to persist
   * @param deleteFirst whether to delete each record's row before writing its columns
   * @throws ValidationException if any record violates its constraints
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def writeBatch(records: List[R], deleteFirst: Boolean) {
    val thriftTemplate = this.getThriftTemplate
    val cf = thriftTemplate.getColumnFamily

    records.foreach(validate)

    val mutator = thriftTemplate.createMutator
    records.foreach { record =>
      val (k, columns) = marshal(record)
      // Queue the deletion in the batch rather than executing it right away; previously the
      // rows were already gone if executeBatch later failed. The deletion is queued before the
      // insertions, so it is stamped with an earlier clock.
      if (deleteFirst) mutator.addDeletion(k, cf)
      columns.foreach { case (n, v) => mutator.addInsertion(k, cf, hcolumn(n, v)) }
    }
    thriftTemplate.executeBatch(mutator)
  }

  /**
   * Reads a record by key.
   *
   * @param key row key of the record
   * @return whatever [[unmarshal]] produces for the queried columns
   */
  @throws(classOf[HectorException])
  def read(key: K): Option[R] = {
    val thriftTemplate = this.getThriftTemplate
    val result = thriftTemplate.queryColumns(key)
    unmarshal(result)
  }

  /**
   * Removes an entire row by key.
   *
   * @param key row key of the record
   */
  @throws(classOf[HectorException])
  def remove(key: K) {
    this.getThriftTemplate().deleteRow(key)
  }

  /**
   * Removes an entire row, using the key read from the record's key field.
   *
   * @param record the record whose row should be removed
   */
  @throws(classOf[HectorException])
  def remove(record: R) {
    remove(getKey(record))
  }

  /**
   * Finds (via reflection) the first field declared directly on `E` whose type is `Id`.
   */
  protected def getKeyFieldOpt[E <: R: Manifest] = implicitly[Manifest[E]].erasure.getDeclaredFields.find(_.getType.equals(classOf[Id[K]]))

  /**
   * Resolves a column-name serializer from the runtime class of `E` (via reflection).
   */
  protected def getNSerializer[E <: N: Manifest](): Serializer[N] = SerializerTypeInferer.getSerializer[N](implicitly[Manifest[E]].erasure)

  /**
   * Reads the row key of `record` from the implementor's key field (via reflection).
   *
   * The field value is cast to `Id[K]`. This relies on an implicit conversion to `K` being
   * available from `Id`'s implicit scope (not visible here; assumed).
   *
   * @throws WrongModelException if the key field cannot be read as an `Id[K]` on this record
   */
  def getKey[E <: R: Manifest](record: R): K = {
    try { getKeyField.get(record).asInstanceOf[Id[K]] }
    catch {
      // Only wrap ordinary exceptions (cast/access/argument failures); JVM errors propagate.
      case e: Exception =>
        // `E` is usually inferred as Nothing (e.g. getKey(record)), so report the record's
        // runtime class when one is available.
        val modelClass: Class[_] =
          Option(record).map(_.getClass).getOrElse(implicitly[Manifest[E]].erasure)
        throw new WrongModelException(modelClass, "Key type specified in template does not match actual key type of record.", e)
    }
  }

  /**
   * Validates a record using JSR-303 Bean Validation.
   *
   * @param record the record to validate
   * @throws ConstraintViolationException listing every violation as "propertyPath message",
   *                                      joined with ", "
   */
  @throws(classOf[ValidationException])
  def validate(record: R) {
    val violations = validator.validate(record)

    if (!violations.isEmpty()) {
      val exceptionMessage = violations.iterator
        .map(it => it.getPropertyPath.toString + " " + it.getMessage)
        .mkString(", ")

      throw new ConstraintViolationException(exceptionMessage, violations.asInstanceOf[java.util.Set[ConstraintViolation[_]]])
    }
  }

  /** Throws NoExistingRowException when the row `key` has no columns. */
  private def requireExistingRow(template: ColumnFamilyTemplate[K, N], key: K) {
    if (!template.isColumnsExist(key)) {
      throw new NoExistingRowException(key, "update")
    }
  }
  /* CONCRETE METHODS [END] */

  /* ABSTRACT METHODS [BEGIN] */
  /**
   * Transforms a record into (key, List[(columnName, columnValue)]).
   * The returned key must be the record's row key: writeBatch also uses it for row deletion.
   */
  def marshal(record: R): (K, List[(N, Any)])

  /**
   * Transforms a ColumnFamilyResult into a record, or None when it does not describe one.
   */
  def unmarshal(columns: ColumnFamilyResult[K, N]): Option[R]

  /**
   * Underlying Thrift template used by this adapter.
   */
  def getThriftTemplate(): ColumnFamilyTemplate[K, N]

  /**
   * Column-name serializer used by this adapter.
   * Required for creating composite columns with the Hector client.
   */
  def getNameSerializer(): Serializer[N]

  /** The record field holding the row key; getKey reads it reflectively. */
  def getKeyField: Field

  /** Keyspace used for column clocks. */
  def getKeyspace: Keyspace
  /* ABSTRACT METHODS [END] */
}

And a sample implementation:
PS: Don't mind the CDI annotations. The transient annotations are there just so the class plays well with CDI.

/**
 * Column-family adapter for Item records: rows keyed by UUID, with String column names
 * ("label" and "url").
 *
 * Registered as the application-scoped CDI bean "ItemRepository". The Hector template is
 * created in initialise(), after the keyspace has been injected.
 */
@Named("ItemRepository")
@ApplicationScoped
class ItemCFAdapter extends CommonStandardCFAdapter[Item, UUID, String] with Serializable {

  /** The Item field typed as Id, made accessible so the key can be read reflectively. */
  @transient val keyField = {
    val idField = getKeyFieldOpt[Item].getOrElse(
      throw new WrongModelException(classOf[Item], "Cannot find field with Id metadata."))
    idField.setAccessible(true)
    idField
  }

  /** Column-name serializer resolved for the String name type. */
  @transient val nameSerializer = getNSerializer[String]()

  /** Keyspace supplied by CDI injection. */
  @Inject
  @transient
  var keyspace: Keyspace = _

  /** Hector template; assigned by initialise(). */
  var thriftTemplate: ThriftColumnFamilyTemplate[UUID, String] = _

  /**
   * Creates the template over CF.CF_NAME with the `us` key serializer and `as` name serializer.
   * NOTE(review): names are written with AsciiSerializer here, while nameSerializer resolves from
   * String - confirm both agree with the column family comparator.
   */
  @PostConstruct
  def initialise() {
    thriftTemplate = new ThriftColumnFamilyTemplate(keyspace, CF.CF_NAME, us, as)
  }

  /** Emits the row key plus the "label" and "url" columns. */
  override def marshal(record: Item): (UUID, List[(String, Any)]) = {
    val rowKey = getKey(record)
    val namedValues: List[(String, Any)] = List("label" -> record.label, "url" -> record.url)
    (rowKey, namedValues)
  }

  /** Rebuilds an Item only when both "label" and "url" are present; otherwise None. */
  override def unmarshal(columns: ColumnFamilyResult[UUID, String]): Option[Item] = {
    val rawLabel = columns.getString("label")
    val rawUrl = columns.getString("url")
    for {
      _ <- Option(rawLabel)
      _ <- Option(rawUrl)
    } yield new Item {
      id = columns.getKey
      label = rawLabel
      url = rawUrl
    }
  }

  override def getThriftTemplate() = thriftTemplate

  override def getKeyField() = keyField

  override def getNameSerializer() = nameSerializer

  override def getKeyspace() = keyspace

}

No comments:

Post a Comment