Skip to main content

Providers

Providers are classes which receive data from a particlar location (network, file, in-process lib) and format that data into a map which matches the shape of the table that the provider is populating. They have a very simple interface:

Included below is an example of the metrics provider.


class MetricsTableProvider (table: DataTable, tableContainer: TableContainer)(implicit clock: Clock, lifecycleContainer: LifecycleContainer,
metrics: MetricsProvider ) extends Provider with StrictLogging {

private val runner = new LifeCycleRunner("metricsTableProvider", () => runOnce, minCycleTime = 1_000)

lifecycleContainer(this).dependsOn(runner)

override def subscribe(key: String): Unit = {}

override def doStart(): Unit = {}

override def doStop(): Unit = {}

override def doInitialize(): Unit = {}

override def doDestroy(): Unit = {}

override val lifecycleId: String = "metricsTableProvider"

def runOnce(): Unit ={

try {

val tables = tableContainer.getTables()

tables.foreach(tableDef => {

val counter = metrics.counter(tableDef.table + ".processUpdates.Counter");
val size = tableContainer.getTable(tableDef.table).size()

val meter = metrics.meter(tableDef.table + ".processUpdates.Meter")

val upMap = Map("table" -> (tableDef.module + "-" + tableDef.table), "updateCount" -> counter.getCount, "size" -> size, "updatesPerSecond" -> meter.getOneMinuteRate);

table.processUpdate(tableDef.table, RowWithData(tableDef.table, upMap), clock.now())

})

} catch {
case e: Exception =>
logger.error("Error occured in metrics", e)
}
}
}

As you can see from the code the important lines are:

  private val runner = new LifeCycleRunner("metricsTableProvider", () => runOnce, minCycleTime = 1_000)

lifecycleContainer(this).dependsOn(runner)

This sets up a thread (in this case a lifecycle aware thread, so that it shuts down happily when the process is killed)

And the runOnce() method:

  def runOnce(): Unit ={

try {

val tables = tableContainer.getTables()

tables.foreach(tableDef => {

//source the metric's information from the metrics api for a specific table
val counter = metrics.counter(tableDef.table + ".processUpdates.Counter");
val size = tableContainer.getTable(tableDef.table).size()
val meter = metrics.meter(tableDef.table + ".processUpdates.Meter")

//format the data into a map
val dataMap = Map("table" -> (tableDef.module + "-" + tableDef.table), "updateCount" -> counter.getCount, "size" -> size, "updatesPerSecond" -> meter.getOneMinuteRate);

//pass the data into the table as a RowWithData object, the map embedded within
table.processUpdate(tableDef.table, RowWithData(tableDef.table, dataMap), clock.now())

})

} catch {
case e: Exception =>
logger.error("Error occured in metrics", e)
}
}

As the code comments show the runOnce() method populates the table with data.