Sunday, November 22, 2015

Presenting the File Tracker

This project's goal is to track changes in files and deliver those changes as byte arrays, asynchronously, using Akka actors and the Java NIO library.
This is done by registering a directory with the WatchService and filtering the files using a PathMatcher. For each change in a file, the requester receives a byte array reflecting that change.
Currently this project supports only appending to files, i.e. deleting characters from a file is not supported.
The Complete Source Code can be found here


Let's dive in.

The Ingredients:

FileSyncIo

This part is copied from the FileAsyncIo project with some adjustments, and it is very handy for reading files asynchronously.
In order to read the file we use Java NIO's AsynchronousFileChannel. Since we are only reading the file, we open the channel with the READ option. The AsynchronousFileChannel.read method accepts a destination buffer, a start position, an attachment (here the buffer itself), and a completion handler:
  val p = Promise[Array[Byte]]()
  val buffer = ByteBuffer.allocate(channel.size().toInt)
  channel.read(buffer, position, buffer, onComplete(channel, p))
I really like this implementation of the handler: it completes a promise with the bytes that were read, so the caller can consume the change through the promised future
private def onComplete(channel: AsynchronousFileChannel, p: Promise[Array[Byte]]) =
    new CompletionHandler[Integer, ByteBuffer]() {
      def completed(res: Integer, buffer: ByteBuffer): Unit = {
        p.complete(Try {
          buffer.array().take(res)
        })
        closeSafely(channel)
      }

      // fail the promise if the read fails, and still release the channel
      def failed(t: Throwable, buffer: ByteBuffer): Unit = {
        p.failure(t)
        closeSafely(channel)
      }
    }
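Putting the pieces together, here is a minimal, self-contained sketch of the same idea (simplified from the snippets above; `AsyncReadSketch` and `readWholeFile` are my names, not the project's):

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Files, Path, StandardOpenOption}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AsyncReadSketch {
  // Read a whole file asynchronously, completing a Promise from the handler
  def readWholeFile(path: Path): Future[Array[Byte]] = {
    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    val p = Promise[Array[Byte]]()
    val buffer = ByteBuffer.allocate(channel.size().toInt)
    channel.read(buffer, 0L, buffer, new CompletionHandler[Integer, ByteBuffer] {
      def completed(res: Integer, buf: ByteBuffer): Unit = {
        // keep only the bytes actually read, then release the channel
        p.complete(Try(buf.array().take(res)))
        channel.close()
      }
      def failed(t: Throwable, buf: ByteBuffer): Unit = {
        p.failure(t)
        channel.close()
      }
    })
    p.future
  }
}
```

The returned `Future[Array[Byte]]` can then be mapped, awaited, or piped to an actor.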

WatchServiceActor

The watch service actor uses the WatchService to register a directory and receive create, modify, and delete events.
  path.register(watchService, Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY))
The WatchServiceActor reads the events and notifies the registered actor. To avoid sacrificing a thread on a blocking call, the actor polls the service and sends a Poll message to itself periodically
def monitor: Receive = {
    case Poll =>
      // poll() returns null when no key is pending, unlike take(),
      // which would block the actor's thread until an event arrives
      Option(watchService.poll()) foreach { key =>
        key.pollEvents() foreach { event =>
          val relativePath = event.context.asInstanceOf[Path]
          val path = contextAbsolutePath(key, relativePath)
          event.kind match {
            case kind: WatchEvent.Kind[_] if monitoredTypes.contains(kind) =>
              notifyActor ! EventOccured(kind, path)
            case _ => // do nothing
          }
        }
        key.reset()
      }
      context.system.scheduler.scheduleOnce(500 millis, self, Poll)
    case Interrupt =>
      context.stop(self)
  }

FileMonitorActor

This actor's purpose is to process the byte stream received from the file and send it to the requester. In its constructor, it accepts the directory to monitor and a pattern identifying which files we want to track. Although the WatchService can handle an arbitrary number of directories, I chose to keep one FileMonitorActor per directory (and per pattern filtering the monitored files), each with its own single WatchServiceActor; I found it easier to manage. To filter the desired files, the FileMonitorActor uses a PathMatcher and accepts a text pattern as a glob (see this link). The actor registers with the watch service in order to be notified about the events. To monitor the files and their changes, it uses two mutable collections
val filePos = new mutable.HashMap[Path, Long]
val fileQueue = new mutable.HashMap[Path, mutable.Queue[Array[Byte]]] 
  • the first preserves the last position read from each file.
  • the second maps each file to a queue of byte chunks, one per event. Once requested, it dequeues the chunk of bytes reflecting the change (FIFO) and sends it to the requester.
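A minimal standalone sketch of this bookkeeping (the class and method names here are mine, not the project's): track the last read position per file and queue each new chunk of bytes per change.

```scala
import java.nio.file.{Path, Paths}
import scala.collection.mutable

object ChangeTracker {
  val filePos   = new mutable.HashMap[Path, Long]
  val fileQueue = new mutable.HashMap[Path, mutable.Queue[Array[Byte]]]

  // On a modify event: take everything after the last known position
  // and enqueue it as one chunk (here the file content is passed in directly).
  def onModify(path: Path, content: Array[Byte]): Unit = {
    val last  = filePos.getOrElse(path, 0L).toInt
    val chunk = content.drop(last)
    if (chunk.nonEmpty) {
      fileQueue.getOrElseUpdate(path, mutable.Queue.empty) += chunk
      filePos(path) = content.length.toLong
    }
  }

  // On request: hand over the oldest unconsumed chunk (FIFO), if any
  def nextChunk(path: Path): Option[Array[Byte]] =
    fileQueue.get(path).filter(_.nonEmpty).map(_.dequeue())
}
```

Note this sketch only handles appends, matching the project's current limitation: if the file shrinks, `content.drop(last)` simply yields nothing.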
The file monitor creates a WatchServiceActor in its constructor and subscribes to event changes
watchActor = Some(context.actorOf(WatchServiceActor(self)))
and sends watch request to the WatchServiceActor
watchActor foreach (_ ! Watch(dir))
Since the WatchService monitors all changes in the directory, the FileMonitorActor receives event messages for all files and keeps only the relevant ones using the PathMatcher
case EventOccured(event, path) if matcher.matches(path.getFileName) =>
      event match {
        case ENTRY_CREATE => addFileToQueue(path)
        case ENTRY_MODIFY => process(path)
        case ENTRY_DELETE => removeFileFromQueue(path)
      }
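For reference, this is how a glob PathMatcher behaves (a standalone snippet, not project code); it also shows why the actor matches against `path.getFileName` rather than the full path:

```scala
import java.nio.file.{FileSystems, Paths}

object GlobDemo {
  // "glob:*.txt" matches a bare file name ending in .txt;
  // '*' does not cross directory boundaries, so a path that
  // still contains directories will NOT match
  val matcher = FileSystems.getDefault.getPathMatcher("glob:*.txt")
}
```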

FileMonitoringAggregatorActor

This actor manages the FileMonitorActor instances and aggregates the changes from all monitors. It keeps a buffer containing every change reported by the monitors. Once requested, it sends the requester all accumulated changes and clears the buffer (which might cause memory issues if it is never consumed).
This actor accepts a list of directory/pattern pairs
case class PathPattern(path:String,pattern:String)
It spawns one FileMonitorActor per directory and keeps a map of actor per path (this will be useful for adding and removing paths, and prevents creating duplicates).
  val monitorActors = paths.collect {
    case p: PathPattern if Files.exists(Paths.get(p.path)) =>
      p.path -> context.actorOf(FileMonitorActor(p.path, p.pattern), removeSlashes(p.path))
  }(collection.breakOut): mutable.HashMap[String, ActorRef]
In order to start monitoring, it accepts an Init message with a boolean flag that determines whether to track existing files in the directory or only new ones. It also sends itself a periodic message to request the changes, accumulating the answers in a buffer. Once
case object GetNextBulks
is received, it returns all changes accumulated since the last request to the requester
    case GetNextBulks =>
      sender ! bulksBuffer.toList
      bulksBuffer.clear()
The FileMonitoringAggregatorActor manages the FileMonitorActors: on an AddPath command it simply spawns another FileMonitorActor for that path, and on a RemovePath command it sends a Stop message to the corresponding actor.
  def monitoring: Receive = {

    case GetNextBulks =>
      log.info("Sending back "+bulksBuffer)
      sender ! bulksBuffer.toList
      bulksBuffer.clear()
    case RequestNextBulk =>
      monitorActors.values foreach (_ ! GetBulk)
      context.system.scheduler.scheduleOnce(500 millis, self, RequestNextBulk)
    case bs: ByteBulks =>
      bulksBuffer += bs
    case Stop =>
      monitorActors.values foreach (_ ! Stop)
      context become ready
    case AddPath(p, i) =>
      if (monitorActors.contains(p.path))
        log.warning(s"Request to add ${p.path} is redundant because it is already monitored")
      else {
        if (Files.exists(Paths.get(p.path))) {
          val m = context.actorOf(FileMonitorActor(p.path, p.pattern), removeSlashes(p.path))
          monitorActors += p.path -> m
          m ! Init(i)
        } else
          log.error(s"Cannot add path. Reason: Directory ${p.path} does not exist")
      }
    case RemovePath(p) =>
      monitorActors.get(p) match {
        case Some(a) =>
          log.info(s"Remove $p from monitor")
          a ! Stop
          monitorActors -= p
        case None => log.warning(s"Cannot remove $p. Reason: Not Found")
      }
  }
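The buffer-and-drain pattern behind GetNextBulks can be sketched on its own (`BulkBuffer` is my name, not the project's):

```scala
import scala.collection.mutable.ListBuffer

// Accumulate changes as they arrive; hand everything over on request
// and clear the buffer, just like the GetNextBulks case above.
class BulkBuffer[A] {
  private val buf = ListBuffer.empty[A]
  def add(a: A): Unit = buf += a
  def drain(): List[A] = {
    val out = buf.toList
    buf.clear()
    out
  }
}
```

Because the buffer is cleared on every drain, a second request right after the first returns an empty list, which is the behavior the requester should expect.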

Summary

We can use the WatchService to register a directory and be notified of any file change in that directory. A PathMatcher is useful for filtering only the paths/files we care about. In this project we use Akka actors to keep operations non-blocking and to hold the state: the read position per file and the changes as byte arrays. The FileMonitoringAggregatorActor returns all changes accumulated since the last request (i.e. the GetNextBulks message).
Usage:
object ApplicationMain extends App {
  val system = ActorSystem("MyActorSystem")

  val pattern = "*.txt"
  val pathPattern1 = PathPattern("/tmp",pattern)
  val pathPattern2 = PathPattern("/home/avi/Downloads",pattern)
  implicit val timeout = Timeout(10 seconds)

  val monitorActor = system.actorOf(FileMonitoringAggregatorActor(List(pathPattern1,pathPattern2)))
  monitorActor ! Init(true)
}
Start making changes and you can see them reflected in the log. You can also send a request message to retrieve them
val changes = ask(monitorActor, GetNextBulks).mapTo[List[ByteBulks]]
Hope you enjoyed it.
The complete source code can be found here. Feedback and remarks are always welcome.

Acknowledgments: This project was inspired by:

Monday, October 19, 2015

Harness Scala Type Classes and Implicits

In my previous blog post, I presented an Expression ADT. In this article I will extend its functionality and serialize it using another ADT, while maintaining decoupling using Scala's magic, a.k.a. implicits and type classes. The full code is in this repository.
Let's start by building our JSON serializer ADT
sealed trait JSON
case class JSeq(elms:List[JSON]) extends JSON
case class JObj(bindings:Map[String,JSON]) extends JSON
case class JNum(num:Double) extends JSON
case class JStr(str:String) extends JSON
case class JBool(b:Boolean) extends JSON
case object JNull extends JSON
and now we can create our JSONWriter to convert JSON objects to a nice JSON string
object JSONWriter {
  def write(j: JSON): String = {
    j match {
      case JSeq(e) => e.map(write).mkString("[", ",", "]")
      case JObj(obj) =>
        obj.map(o => "\"" + o._1 + "\":" + write(o._2)).mkString("{", ",", "}")
      case JNum(n) => n.toString
      case JStr(s) => "\"" + s + "\""
      case JBool(b) => b.toString
      case JNull => "null"
    }
  }
}
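To see the writer in action, a quick sanity check (the ADT and writer are repeated here so the snippet runs standalone; the sample object is mine):

```scala
sealed trait JSON
case class JSeq(elms: List[JSON]) extends JSON
case class JObj(bindings: Map[String, JSON]) extends JSON
case class JNum(num: Double) extends JSON
case class JStr(str: String) extends JSON
case class JBool(b: Boolean) extends JSON
case object JNull extends JSON

object JSONWriter {
  def write(j: JSON): String = j match {
    case JSeq(e)   => e.map(write).mkString("[", ",", "]")
    case JObj(obj) => obj.map(o => "\"" + o._1 + "\":" + write(o._2)).mkString("{", ",", "}")
    case JNum(n)   => n.toString
    case JStr(s)   => "\"" + s + "\""
    case JBool(b)  => b.toString
    case JNull     => "null"
  }
}
```

Note that JNum is backed by a Double, so integers render as `1.0`; if that matters, a separate JInt case is an easy addition.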
In order to make this JSONWriter work, we need to add functionality to our Expression ADT that handles the conversion to a JSON object, which we can then send to JSONWriter.write. Something like this
sealed trait AST{
def asJSON:JSON
}
and then implement asJSON in every subclass e.g
sealed trait BooleanExpression extends AST
case class BooleanOperation(op: String, lhs: BooleanExpression, rhs: BooleanExpression) extends BooleanExpression {

  override def asJSON: JSON = JObj(
    Map(
      "Operation" -> JStr(op),
      "lhs" -> lhs.asJSON,
      "rhs" -> rhs.asJSON
    ))
}

// now we can call JSONWriter.write(BooleanOperation(..,..,..).asJSON)
That will do it, right? Wrong! We want to maintain decoupling as much as possible. Our AST shouldn't care about the JSONWriter; we do not want it polluting the whole namespace.

Let's Harness Type Classes.

A type class defines behavior in the form of operations that must be supported by a type T, and allows ad-hoc polymorphism.
trait JSONSerializer[T] {
  def toJSON(value:T):JSON
}
And now we can add a method to our JSONWriter object that uses this trait
...
 def write[A](value:A, j:JSONSerializer[A]):String = write(j.toJSON(value))
and use it like this
val astExpressionToJSON = new JSONSerializer[AST] {

      override def toJSON(value: AST):JSON = value match{

        case BooleanOperation(op, lhs, rhs) => JObj(
        Map(
        "Operation"-> JStr(op),
        "lhs" -> toJSON(lhs),
        "rhs" -> toJSON(rhs)
        ))
        case LRComparison ....

JSONWriter.write(BooleanOperation(..,..,..),astExpressionToJSON)
Well... that will do it, but you might ask yourself: so where is the Scala magic you talked about? We want to keep our code as clean and stylish as we can. Implicits can be very helpful here; we just need to change the write method in the JSONWriter object.

Implicits - the hidden gem

Just as a quick brush-up (I'm stepping aside here for a moment): we can define an implicit parameter and tell the compiler to search for it elsewhere in scope. For example:
implicit val i = 1 
//somewhere else in the code we can define a method that will use this Int 

def increment(x:Int)(implicit y:Int) = x+y
As long as the implicit definition is in scope, we can call the method without supplying the second parameter
scala> implicit val i = 1
i: Int = 1

scala> def increment(x:Int)(implicit y:Int) = x + y
increment: (x: Int)(implicit y: Int)Int

scala> increment(9)
res0: Int = 10
Warning! Do not abuse implicits. If not used wisely, implicits can seriously damage your code's readability. Sometimes you can simply add a default value instead; it always depends on how and what you want to express.
That said, let's get back on track.
...
def write[A](value:A)(implicit j:JSONSerializer[A]):String = write(j.toJSON(value))
...
and that's it (well, almost; more cool stuff ahead). With this defined, we need to create our implicit definition
   implicit val astExpressionToJSON = new JSONSerializer[AST] {

   override def toJSON(value: AST):JSON = value match{
   case BooleanOperation(op, lhs, rhs) => JObj(
       Map(
       "Operation"-> JStr(op),
       "lhs" -> toJSON(lhs),
       "rhs" -> toJSON(rhs)
       ))
   case  LRComparison (lhs: Variable, op, rhs: Constant) =>JObj( ....
and we are good to go, nice and clean.
val parser = new ConditionParser {}
val p:AST = parser.parse("foo = '2015-03-25' || bar = 8").get //I know that the "get" will succeed here
val s = JSONWriter.write(p)
println(s)
//will print {"Operation":"||","lhs":{"Operation":"=","lhs":"foo","rhs":"1422136980000"},"rhs":{"Operation":"=","lhs":"bar","rhs":8.0}}

Context Bound

Another way to achieve the same, in a stylish way, is a context bound: we state that A is a member of the type class (hence it must have the functionality we want) and pull the instance from the context with implicitly.
def toJSONString[A:JSONSerializer](value:A):String = {
    val j = implicitly[JSONSerializer[A]]//pull the implicit def of JSONSerializer
    write(j.toJSON(value))
    //we can replace the upper two lines with:  write(implicitly[JSONSerializer[A]].toJSON(value))
  }
and use it like this
val parser = new ConditionParser {}
val p:AST = parser.parse("foo = '2015-03-25' || bar = 8").get //I know that the get will succeed here
val s1 = JSONWriter.toJSONString(p)
println(s1)
//will print {"Operation":"||","lhs":{"Operation":"=","lhs":"foo","rhs":"1422136980000"},"rhs":{"Operation":"=","lhs":"bar","rhs":8.0}}
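A tiny self-contained illustration of the desugaring (`Pretty` and `describe` are made-up names for this example, not from the project):

```scala
// A : Pretty below desugars to an extra (implicit ev: Pretty[A]) parameter
trait Pretty[A] { def show(a: A): String }

object Pretty {
  // instance for Int, found automatically via the companion's implicit scope
  implicit val prettyInt: Pretty[Int] = new Pretty[Int] {
    def show(a: Int): String = s"Int($a)"
  }
}

object Demo {
  def describe[A: Pretty](a: A): String =
    implicitly[Pretty[A]].show(a) // pull the instance from the context
}
```

Calling `Demo.describe(42)` compiles only because a `Pretty[Int]` instance is in implicit scope; for a type with no instance, the call is rejected at compile time.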
We can use it everywhere (yay!). Let's say we have a third-party class, e.g. a Person class: we can add this functionality without changing the third-party class.
case class Person (name:String,age:Int)

object PersonImplicits{
  implicit val personToJSON = {
    new JSONSerializer[Person] {
       override def toJSON(value: Person): JSON =JObj( Map("name" -> JStr(value.name),
       "age" -> JNum(value.age)))
    }
  }
}

// and just bring that into scope 

    import PersonImplicits._
    val person = Person("Drakula",578)
    println(JSONWriter.write(person))
//will print {"name":"Drakula","age":578.0}

Implicit classes

Another cool use of implicits is adding functionality to an external class using an implicit class
object JSONConverterImplicits {
  implicit class ASTConverter(ast:AST){
    def asJSON: JSON = ast match{

      case BooleanOperation(op, lhs, rhs) => JObj(
        Map(
          "Operation"-> JStr(op),
          "lhs" -> lhs.asJSON,
          "rhs" -> rhs.asJSON
        ))
      case  LRComparison (lhs: Variable, op, rhs: Constant) =>JObj( ...
and now we can use it as if asJSON were part of the AST
    import JSONConverterImplicits._
    val p2 = parser.parse("foo = '2015-03-25' || bar = 8 ").get
    val s2 = JSONWriter.write(p2.asJSON)
How cool is that?!
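As a minimal standalone example of the mechanism (names invented for illustration):

```scala
object IntSyntax {
  // Wraps an Int so the extra method looks built-in at the call site;
  // extending AnyVal avoids allocating the wrapper at runtime
  implicit class RichCount(val n: Int) extends AnyVal {
    def stars: String = "*" * n
  }
}
```

Bringing `IntSyntax._` into scope lets you write `3.stars`, and the compiler inserts the wrapping for you.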

Summary 

Type classes and implicits are very powerful tools, especially when writing libraries that need to be extended while staying decoupled. We can add default implementations in a nice and natural way for the developers that use our library, and those implementations can easily be overridden and extended.

Hope you enjoyed it.
  • Comments and feedback are always welcome