Spark任务中空间数据的序列化

一、引言

Spark是目前主流的分布式计算框架,通过利用内存存储中间计算结果的方式,优化了MapReduce框架并不擅长的迭代式计算。同时,Spark使用有向无环图(Directed Acyclic Graph,DAG)统筹和优化整个计算流程。另外,Spark基于弹性分布式数据集RDD(Resilient Distributed Datasets)提供了丰富的数据分析算子,大大简化了分布式计算应用的开发难度。
序列化和反序列是Spark的一项基本操作。Spark在执行计算任务的过程中,需要在不同的机器之间交换数据,这一过程称为洗牌(Shuffle)。机器之间交换内存对象需要通过序列化和反序列化来实现,Spark使用Kryo序列化框架将内存中的Java对象序列化成字节数组,然后通过网络传输到另一台机器并反序列化成内存中的对象。序列化的效率直接影响着Spark任务的计算性能,应该尽量确保序列化过程快速高效且序列化后的字节数组尽可能小,以减少网络开销,加速计算效率。
空间对象是空间分析的主体,在基于Spark的空间分析中,空间对象的序列化是一个重要的优化环节,本文将介绍如何通过向Spark注册空间对象序列化器的方式优化基于Spark的空间分析性能。
二、优化方案
Kryo序列化机制支持用户注册自定义的序列化器,以供用户在开发过程中通过自定义序列化器的方法优化计算性能。如果直接使用Kryo序列化一个空间对象,Kryo则会在记录该对象字段值的基础上记录字段类型等一系列元信息,所以会降低序列化性能,并增加生成的字节数组。
JTS(Java Topology Suite)空间拓扑工具包提供了空间对象与WKB(Well-known Binary)格式之间的转换,WKB是OGC标准定义的空间对象的二进制表示格式,该格式用最少的字节数表示一个空间对象的完整信息。可以将空间对象与WKB之间的转换过程对应到空间对象的序列化和反序列化过程,并将通过Kryo序列化器的形式注册到Spark的序列化过程中,以优化基于Spark的空间分析性能。
三、代码实现
WKB的转换利用JTS提供的WKBReader和WKBWriter两个类来实现,这两个类是非线程安全的,为了在Spark的并发场景中重复利用,一般使用ThreadLocal对其进行缓存,Scala代码实现如下。
objectWKBUtils {
  private val readerPool = newThreadLocal[WKBReader]{
    override def initialValue:WKBReader =new WKBReader
  }
  private val writerPool = newThreadLocal[WKBWriter]{
    override def initialValue:WKBWriter =new WKBWriter
  }
  def read(bytes: Array[Byte]): Geometry= readerPool.get.read(bytes)
  def write(geom: Geometry): Array[Byte]= writerPool.get.write(geom)
}
为空间对象创建Kryo序列化器。序列化的实现通过调用WKBUtils的read和write方法来完成,Scala代码实现如下。
classGeometrySerializer extends Serializer[Geometry] {
	override def write(kryo: Kryo, output:Output, geom: Geometry): Unit= {
		val spatialBytes: Array[Byte] =WKBUtils.write(geom)
		output.writeInt(spatialBytes.length)
		output.writeBytes(spatialBytes)
	}
	override def read(kryo: Kryo, input:Input, clazz: Class[Geometry]):Geometry= {
		val length = input.readInt()
		WKBUtils.read(input.readBytes(length))
	}
}
向Spark注册创建好的Kryo序列化器。该过程通过继承Spark的KryoRegistrator类实现,Scala代码实现如下。因为Point、LineString、Polygon、MultiPoint、MultiLineString、MultiPolygon这六类空间数据类型都是Geometry的子类,都可以通过WKB的格式表示,因此共用同一个GeometrySerializer对象。
classGeometryKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo:Kryo): Unit = {
    val geometrySerializer = new GeometrySerializer

    kryo.register(classOf[Point],geometrySerializer)
    kryo.register(classOf[LineString],geometrySerializer)
    kryo.register(classOf[Polygon],geometrySerializer)
    kryo.register(classOf[MultiPoint],geometrySerializer)
    kryo.register(classOf[MultiLineString],geometrySerializer)
    kryo.register(classOf[MultiPolygon],geometrySerializer)
  }
}
将创建好的序列化注册器作为SparkConf的参数,在创建SparkContext的时候传入,便可在Spark的计算中对空间对象使用WKB的格式进行序列化和反序列化。
val conf= new SparkConf().set("spark.serializer",classOf[KryoSerializer].getName)
 .set("spark.kryo.registrator",classOf[GeometryKryoRegistrator].getName)
val spark = new SparkContext(conf)

四、实验与结论
为了验证向Spark注册空间对象序列化器的有效性,本文以10320条表示轨迹的LineString作为测试数据集,在PC机上用多线程并行计算的模式做了对比实验。对比原始的Kryo序列化器和自定义的Kryo序列化器,以序列化时长和序列化字节数为指标观察其性能差异,实验结果如下表所示:
由实验结果可知,原始的Kryo序列化机制耗时是自定义序列化器的3倍左右,序列化所得的字节数是自定义序列化器的1.7倍左右。总之,在使用Spark进行空间数据分析时,通过Kryo给Spark指定空间对象的WKB序列化机制可以提高序列化效率,并减少机器之间的数据传输量。
参考文献:
[1]     https://github.com/EsotericSoftware/kryo
[2]     https://github.com/locationtech/jts
[3]     http://spark.apache.org


0 条评论

    发表评论

    电子邮件地址不会被公开。 必填项已用 * 标注