Asynchronous RPC with Finagle


Dev-Time, Kharkiv


By Mairbek Khadikov / @mairbek

How many of you are working on distributed systems?


  • Do you use databases?
  • Or caching servers?
  • Or integration with REST Service?

Most servers are also clients


  • Http server
  • Mysql client
  • Memcached client
  • Http client

Same principle - different implementations


  • Jdbc driver
  • Memcached
  • Http
  • Thrift
  • Protobuf
  • Etc.

Message passing


  • Send message
  • Wait for a while
  • Process response

Service


Is just a function


trait Service[Req, Res] = Req => Rep
					

Server

Something that implements Service

Client

Something that uses Service

Most of the services these days are I/O bound instead of CPU bound


  • If you care about performance
  • If you design for low latency and high throughput

Blocking I/O

Most of the time application waits for I/O operation to be completed

Non-blocking I/O

Use resources to do the job instead of waiting for I/O

Synchronous vs Asynchronous


Synchronous



interface UserService {
	User getUser(String email);
}
					

Synchronous vs Asynchronous


Hollywood principle


Don't call us, we'll call you!

Synchronous vs Asynchronous


Asynchronous with callback




interface UserCallback {

	void onComplete(User user);

	void onFailure(Throwable error);

}

interface UserService {
	void getUser(String email, UserCallback callback);
}

					

Callbacks don't compose well

Callbacks don't compose well

Try to implement sequence of asynchronous calls

Callbacks don't compose well

Try to combine result of concurrent asynchronous calls

Callback is a wrong abstraction

Languages are simply designed for serial execution

Asynchronous Service



trait Service[Req, Res] = Req => Future[Rep]
					

Future

Represents a computation that has not yet completed, which can either succeed or fail

Future

Usually computed concurrently

Future (Blocking)



trait Future[A]

// Get value. Blocks until completed
val f : Future[A]
println(f())

// Get value with timeout
val f : Future[A]
println(f(1.second))

// Can throw exception
f.get() match {
  case Return(res) => ...
  case Throw(exc) => ...
}

					

Future (Async)



// Good old callback

val f: Future[String]

f onSuccess { s =>
  log.info(s)
} onFailure { e =>
  log.error(e)
}
					

Futures compose well

Future is a Monad

Monad for Humans

Fancy control-flow

Future as a Functor




def map[B](f: A => B): Future[B]	

					




Future as a Functor




def map[B](f: A => B): Future[B]	

val f : Future[List[User]]
val fcount = f.map(_.size)
println(fcount())
					




Functor is cool, but



val fUser : Future[UserData] = getUserData(token)
val fFriends = fUser.map(user => getFriends(user))
println(fFriends()) // Future[Future[List[User]]]
					

flatMap to rescue



def flatMap[B](f: A => Future[B]): Future[B]
					

flatMap



val f = userIdGenerator.generateId flatMap {id => 
	facebookClient.getUserData(token) flatMap {userData=>
		userRepository.addUser(id, userData) flatMap {_=>
			notificationService.sendEmail(userData.email).map(_=>id)
		}
	}
}

println(f.get)
					




For comprehension



val f = for {
	id <- userIdGenerator.generateId
	userData <- facebookClient.getUserData(token)
	_ <- userRepository.addUser(id, userData)
	_ <- notificationService.sendEmail(userData.email)
} yield (id)

println(f.get)
					




Other Monads

  • Option
  • List
  • State
  • etc.

Concurrent compositions



def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]

def join(fs: Seq[Future[_]]): Future[Unit]

def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]

					




Promise


Writable future




Promise




val p = new Promise[String]

p.setValue("Hello")

p.setException(new RuntimeException)

					




Filters

Filter common use cases

  • Retries
  • Exception handling
  • Authentication
  • Tracing

Filter is just a function


(ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]
					

Composing filters and services


val authRequred: Filter[Req, Rep, Req, Rep]
val service: Service[Req, Rep]

val secureService = authRequred andThen service

Codec

Encodes and decodes protocols

Supported protocols

  • Http
  • Thrift
  • memcache
  • Http streaming
  • MySQL
  • PostgreSQL

Based on Netty

Codec

is a ChannelPipeline

Building Server


object SimpleHttp extends Service[HttpRequest, HttpResponse] {
  def apply(request: HttpRequest) = {
    val response = new DefaultHttpResponse(HTTP_1_1, OK)
    response.setContent(copiedBuffer("Hello #devtime", UTF_8))
    Future.value(response)
  }
}

Building Server


val server = ServerBuilder()
	.codec(Http)
	.bindTo(new InetSocketAddress(8080))
	.build(SimpleHttp)

Building Client


val client = ClientBuilder()
  .codec(Http())
  .hosts(new InetSocketAddress(8080))
  .hostConnectionLimit(1)
  .build()

val request = new DefaultHttpRequest(HTTP_1_1, Get, "/")
val response = client(request)

Service Discovery

Automatic registration and discovery of services

Cluster

  • Static cluster
  • Dynamic cluster
  • ZooKeeper cluster

Registering Service

							
val serviceAddress = new InetSocketAddress(...)
val server = ServerBuilder()
  .bindTo(serviceAddress)
  .build()

cluster.join(serviceAddress)

							
						

Using cluster from client

							
val client = ClientBuilder()
  .cluster(cluster)
  .hostConnectionLimit(1)
  .codec(new StringCodec)
  .build()
							
						

Other abstractions

  • Spool
  • Broker/Offer

Spool

Asynchronous stream

						
trait Spool[+A] {
	def head: A

	def tail: Future[Spool[A]]

	def foreach[B](f: A => B)
}

						
					

Broker/Offer

Communication mechanism

Broker

Channel of communication

						
trait Broker[T] {
  def send(msg: T): Offer[Unit]
  val recv: Offer[T]
}
						
					

Offer

Synchronization mechanism between sender and receiver

						
trait Offer[T] {
  def sync(): Future[T]
}
						
					

Offer

Synchronization mechanism between sender and receiver

						
val b: Broker[String]
val sendOf = b.send("Hello")
val recvOf = b.recv

// Sender process
sendOf.sync()

// Receiver process
recvOf.sync()
						
					

Broker/Offer

Composes well

						
val q0 = new Broker[Int]
val q1 = new Broker[Int]
val q2 = new Broker[Int]

val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)
						
					

Finatra

Sintara like web framework running on top of finagle

Finatra

  • Routing DSL
  • Templating
  • App Generator

Finatra

Define controller

							
class ExampleApp extends Controller {
  get("/") { request =>
    render.plain("hello world").toFuture
  }

  get("/user/:username") { request =>
    val username = request.routeParams.getOrElse("username", "default_user")
    render.plain("hello " + username).toFuture
  }
}
							
						

Finatra

Start the server

							
val app = new ExampleApp
def main(args: Array[String]) = {
  FinatraServer.register(app)
  FinatraServer.start()
}
							
						

Further reading


Scala school

Effective Scala

Finagle @ github

Finatra @ github

Thank You!


Mairbek Khadikov


mairbek.github.com

@mairbek

mkhadikov@gmail.com