问题背景

周四的时候突然写了点bug,就那么几行代码,怎么都看不出来问题。周六尝试做了几次实验,大概定位了问题,然后在stackoverflow上问了一下。一位热心的伙伴给回答了,然后再验证了一遍。stackoverflow的地址在:Spark rdd handle java date bug

问题简单的说,就是:我在spark的map里面通过simpleDateFormat将unix时间戳转为特定格式的字符串,存在一些转错了的情况。一开始我以为是java Date的问题,我又改成了Calendar,发现还是一样出错。然后,我一狠心,直接在一个map里面转换两次,看结果是否一样。果不其然,结果不一样!下面就是代码和现象:

val rdd1 = some_rdd1.map { x =>
  (x._1, x._2)
}

val rdd2 = some_rdd2.map { x =>
  (x._1, x._2)
}

val formatter2 = new SimpleDateFormat("yyyyMMddmmss")

val res_rdd = rdd1.join(rdd2).map { case(id, (tm, value)) =>
  val format_time = {
    //tm is a unix timestamp, long type
    val cur_time = new Date(tm*1000L)
    val time_format = formatter2.format(cur_time)
      time_format
  }
  val format_time2 = {
    //tm is a unix timestamp, long type
    val cur_time = new Date(tm*1000L)
    val time_format = formatter2.format(cur_time)
      time_format
  }

  if (format_time!= format_time2) {
        println(s"""format_time: ${format_time}, format_time2: ${format_time2}, click_time: ${tm}""")
      }

  (tm, format_time, format_time2, value)
}
//...
//some actions

我在executor上得到这样的结果:

format_time: 201810251433, format_time2: 201810251241, click_time: 1540442502
format_time: 201810250757, format_time2: 201810250924, click_time: 1540430650
format_time: 201810250738, format_time2: 201810250701, click_time: 1540422067
format_time: 201810251415, format_time2: 201810250738, click_time: 1540448140
format_time: 201810250503, format_time2: 201810251400, click_time: 1540447255
format_time: 201810250831, format_time2: 201810250836, click_time: 1540427797
format_time: 201810251203, format_time2: 201810250901, click_time: 1540429284
format_time: 201810250729, format_time2: 201810250910, click_time: 1540429821

经stackoverflow上的提醒,我猜知道问题是因为simpleDateFormat不是线程安全的。

root cause

前面提到线程不安全的问题,最基本的解释是这样:

两个线程在同时运行下面代码的时候:

class MyCounter {
    private static int counter = 0;

    public static int getCount() {
        return counter++;
    }
}

由于counter是static的,多个线程会共享这一变量。如果线程2在对counter进行++操作的时候,线程1要读counter变量。counter++不是一个原子操作,所以可能线程1读到的是没有自增的counter,从逻辑上来看这一结果是错误的。

回到我们的map的问题上来,为什么rdd的map会出现这个问题呢?如果Spark 每个executor只用一个core(executor-cores=1),那么不会不会出现什么问题。而如果大于1个core的话,代码就在一个executor上多线程执行。所以formatter2会被同一个executor上的多个线程共享,因而出现了读写顺序不一致的问题。

什么意思呢?val time_format = formatter2.format(cur_time)这句代码同时进行了读写操作,将formatter2进行重新赋值,然后将值按特定格式取出来,这就跟上面的getCount有些类似。而SimpleDateFormat不是线程安全的,所以取出来的数据可能是错误的。

解决方案

问题原因还是因为我对线程安全没有一个概念。知道了SimpleDateFormat线程不安全后,可以有若干种解决方案:

formatter2用作局部变量

之前formatter2是共享的,现在将它放到map里面,这样无论如何都不会出现问题了。也就是大概这样子:

val res_rdd = rdd1.join(rdd2).mapPartitions { join_res =>
    val formatter2 = new SimpleDateFormat("yyyyMMddHHmm")
    val format_time = {
        //tm is a unix timestamp, long type
        val cur_time = new Date(tm*1000L)
        val time_format = formatter2.format(cur_time)
        time_format
    }
//...
}

mapPartitions

如果感觉每次又要创建一个SimpleDateFormat太浪费,可以使用mapPartitions,这样每一个partition共享一个SimpleDateFormat。spark给每个partition创建一个task,这里的每个formatter2由每个partition共享,而每个partition是单线程,不存在race condition的情况。

val res_rdd = rdd1.join(rdd2).mapPartitions { iter_list =>
    val formatter2 = new SimpleDateFormat("yyyyMMddHHmm")
    iter_list.map { join_res => { case(id, (tm, value)) =>
    val format_time = {
    //tm is a unix timestamp, long type
    val cur_time = new Date(tm*1000L)
    val time_format = formatter2.format(cur_time)
      time_format
  }
  //...
}

ThreadLocal

object SafeFormat extends ThreadLocal[SimpleDateFormat] {
  override def initialValue = { 
    new SimpleDateFormat("yyyyMMddHHmm")
  }
}
// in map
val format_time = {
    val cur_time = new Date(tm*1000L)
    val time_format = SafeFormat.get.format(cur_time)
      time_format
  }

这个是解决多线程冲突的常见方法。使用ThreadLocal创建的变量只能被当前线程访问,其他线程则无法访问和修改。这种思路跟这个问题类似,源于这个解释

关于线程安全

前面提到counter++的线程安全的经典例子,还有一些经典的解决思路(refer),比如加入synchronized的关键字,解决race condition的问题;另外,前面的整数自增不是源自操作,可以使用语言自带的数据结构,如AtomicInteger实现线程安全。后续也许需要将Java多线程这一块的知识补充一下。

关于java的日期库

这个问题的本质是SimpleDateFormat线程不安全导致的(跟数据可变有很大关系),java的Date/Calendar也都是可变的,因而也往往存在相似的坑。我感觉这些与函数式编程语言格格不入,但java已经设计成这样了,scala貌似也没有重新实现一些标准的日期库,所以有些无奈。不过有些第三方库可能比java官方库好一些,Joda-Time是一个开源的java日期库。里面的DateTimeFormat 是线程安全的且是不可变的。以后也许可以将java标准库替换成joda。

import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.Date;

public class DateFormatTest {

  private final DateTimeFormatter fmt =
       DateTimeFormat.forPattern("yyyyMMdd");

  public Date convert(String source){
    DateTime d = fmt.parseDateTime(source);
    return d.toDate();
  }
}