不学网

 找回密码
 立即注册

只需一步,快速开始

手机号码,快捷登录

查看: 80|回复: 0

[java] [Spark中移动平均法的实现]

[复制链接]
Ricardo 发表于 2018-6-30 17:26:55 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
一、 基本概念       移动平均法是一种简单平滑预测技术,它的基本思想是:根据时间序列资料、逐项推移,依次计算包含一定项数的序时平均值,以反映短期趋势的方法。因此,当时间序列的数值由于受周期变动和随机波动的影响,起伏较大,不易显示出事件的发展趋势时,使用移动平均法可以消除这些因素的影响,显示出事件的发展方向与趋势(即趋势线),然后依趋势线分析预测序列的中短期趋势。
       移动平均法的应用比较广泛,尤其在股票,金融,期货等方向应用较多,通过计算移动平均值去预测短期内未来的走势等。同时,在企业中,企业通过实际数据值来预测未来一期或几期内公司产品的需求量、公司产能等的一种常用方法。移动平均法适用于即期预测
二、 移动平均法的表示方式       移动平均法分为简单移动平均法和加权移动平均法,文本中主要介绍简单移动发在股票中的计算。
       首先要了解股票中的时间序列数据。时间序列数据表示一个变量在一段时间内的值,如1秒、1分钟、1小时、1天、1月、1季度或1年。我们可以以不严格地时间序列数据形式化表示为三元组序列:
            (k,t,v)
       这里的k是键(如股票代码),t是时间(天,小时,分钟或秒),v是相关联的值(如某一只骨片在时间点t的值)。一般地,只要在一段时间内记录相同的度量值,就会得到时间序列数据。例如,一个公司的股票的收盘价就是基于分钟、小时或天的时间序列数据。多个连续周期的时间序列数据平均值(按相同时间间隔得到的观察值,如每小时一次或每天一次)称为移动平均。
       简单移动平均算法的公式如下:
20180624212144566.png
三、移动平均法的Spark实现3.1测试数据     本文中的数据仅用于测试,数据不具有真实性,仅仅是为了实现移动平均法的计算使用。假设有股票的时间序列数据如下
  1. 股票代码,时间,收盘价

  2. AA,2017-1-7,10.8

  3. AA,2017-1-8,10.9

  4. AA,2017-1-9,11

  5. ...,...,...

  6. AA,2017-1-30,10.5

  7. BB,2017-1-31,10.7

  8. BB,2017-2-1,10.9

  9. BB,2017-2-2,11.1

  10. ...,...,...

  11. BB,2017-2-19,14.9
复制代码

3.2内存中排序实现移动平均
  1. /**
  2.   * 在内存中进行排序计算移动平均值
  3.   **/
  4. object MovingAverageInMemory {
  5.     def main(args: Array[String]): Unit = {
  6.         if (args.length < 3) {
  7.             println("Usage: MovingAverageInMemory <period> <input-path> <output-path>")
  8.             sys.exit(1)
  9.         }
  10.         //移动宽度
  11.         val period: Int = args(0).toInt
  12.         //文件输入路径
  13.         val inputPath: String = args(1)
  14.         //输出路径
  15.         val outputPath: String = args(2)

  16.         val sparkConf: SparkConf = new SparkConf()
  17.             .setMaster("local[1]")
  18.             .setAppName("MovingAverageInMemory")
  19.         //构建Spark上下文
  20.         val sc: SparkContext = SparkContext.getOrCreate(sparkConf)
  21.         //广播变量
  22.         val brodcastPeriod: Broadcast[Int] = sc.broadcast(period)
  23.         //读取文件原始数据
  24.         val rawData: RDD[String] = sc.textFile(inputPath)
  25.         val keyValue: RDD[(String, (String, Double))] = rawData.map(line => {
  26.             val tokens = line.split(",")
  27.             (tokens(0), (tokens(1), tokens(2).toDouble))
  28.         })
  29.         val groupValue: RDD[(String, List[(String, Double)])] = keyValue.combineByKey(
  30.             (v: (String, Double)) => List(v),
  31.             (c: List[(String, Double)], v: (String, Double)) => c :+ v,
  32.             (c1: List[(String, Double)], c2: List[(String, Double)]) => c1 ::: c2
  33.         )
  34.         val movingAverage: RDD[(String, Seq[(String, Double)])] = groupValue.mapValues(values => {
  35.             val dateFormat: SimpleDateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
  36.             // 在内存中排序,对于大型数据集谨慎使用这样的排序
  37.             val sortedValues: Seq[(Long, Double)] = values.map(s => (dateFormat.parse(s._1).getTime, s._2)).toSeq.sortBy(_._1)
  38.             val queue: mutable.Queue[Double] = new scala.collection.mutable.Queue[Double]()
  39.             for (tup <- sortedValues) yield {
  40.                 queue.enqueue(tup._2)
  41.                 if (queue.size > brodcastPeriod.value) {
  42.                     queue.dequeue
  43.                 }
  44.                 (dateFormat.format(new java.util.Date(tup._1)), (queue.sum / queue.size))
  45.             }
  46.         })

  47.         val formattedResult: RDD[String] = movingAverage.sortByKey().flatMap(kv => {
  48.             kv._2.map(v => (kv._1 + "," + v._1 + "," + v._2.toString()))
  49.         })

  50.         //保存结果
  51.         //formattedResult.saveAsTextFile(outputPath)
  52.         formattedResult.foreach(println)
  53.         sc.stop()
  54.     }
  55. }
复制代码

运行结果:
  1. AA,2017-01-01,10.2
  2. AA,2017-01-02,10.25
  3. AA,2017-01-03,10.299999999999999
  4. AA,2017-01-04,10.35
  5. AA,2017-01-05,10.4
  6. AA,2017-01-06,10.450000000000001
  7. AA,2017-01-07,10.5
  8. AA,2017-01-08,10.55
  9. AA,2017-01-09,10.600000000000001
  10. AA,2017-01-10,10.65
  11. AA,2017-01-11,10.75
  12. AA,2017-01-12,10.85
  13. AA,2017-01-29,12.28
  14. AA,2017-01-30,12.120000000000001
  15. BB,2017-01-31,10.7
  16. BB,2017-02-01,10.8
  17. BB,2017-02-02,10.9
  18. BB,2017-02-03,11.0
  19. BB,2017-02-04,11.1
  20. BB,2017-02-05,11.200000000000001
  21. BB,2017-02-06,11.3
  22. BB,2017-02-07,11.4
  23. BB,2017-02-08,11.422222222222222
  24. BB,2017-02-09,11.47
  25. BB,2017-02-10,11.620000000000001
  26. BB,2017-02-11,11.78
  27. BB,2017-02-12,11.95
  28. BB,2017-02-13,12.129999999999999
  29. BB,2017-02-14,12.32
  30. BB,2017-02-15,12.52
  31. BB,2017-02-16,12.73
  32. BB,2017-02-17,12.95
  33. BB,2017-02-18,13.25
  34. BB,2017-02-19,13.55
复制代码

四、 移动平均的特点及存在问题4.1 特点
    1、移动平均对原序列有修匀或平滑的作用,使得原序列的上下波动被削弱了,而且平均的时距项数N越大,对数列的修匀作用越强。
    2、移动平均时距项数N为奇数时,只需一次移动平均,其移动平均值作为移动平均项数的中间一期的趋势代表值;而当移动平均项数N为偶数时,移动平均值代表的是这偶数项的中间位置的水平,无法对正某一时期,则需要在进行一次相临两项平均值的移动平均,这才能使平均值对正某一时期,这称为移正平均,也成为中心化的移动平均数。
    3、当序列包含季节变动时,移动平均时距项数N应与季节变动长度一致,才能消除其季节变动;若序列包含周期变动时,平均时距项数N应和周期长度基本一致,才能较好的消除周期波动 [1][url=] [/url] 。
    4、移动平均的项数不宜过大。
4.2存在问题
    1、移动平均值并不能总是很好地反映出趋势。由于是平均值,预测值总是停留在过去的水平上而无法预计会导致将来更高或更低的波动;
    2、移动平均法要由大量的过去数据的记录;
    3、它通过引进愈来愈期的新数据,不断修改平均值,以之作为预测值。
    移动平均法的基本原理,是通过移动平均消除时间序列中的不规则变动和其他变动,从而揭示出时间序列的长期趋势。


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|手机版|小黑屋|不学网

GMT+8, 2018-7-23 23:49

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表