本文介绍了在不使用pyspark中的数据透视的情况下进行分组的有效方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个查询,我需要使用pyspark计算内存利用率.我已经通过使用数据透视的python pandas实现了这一点,但是现在我需要在pyspark中做到这一点,而数据透视将是一项昂贵的功能,因此我想知道pyspark中是否有此解决方案的替代方案

I have a query where I need to calculate memory utilization using pyspark. I had achieved this with python pandas using pivot but now I need to do it in pyspark and pivoting would be an expensive function so I would like to know if there is any alternative in pyspark for this solution

time_stamp          Hostname    kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1     memory  Total       100
2019/08/17 10:01:06 Server1     memory  used        35
2019/08/17 10:01:09 Server1     memory  buffer      8
2019/08/17 10:02:04 Server1     memory  cached      10
2019/08/17 10:01:05 Server2     memory  Total       100
2019/08/17 10:01:06 Server2     memory  used        42
2019/08/17 10:01:09 Server2     memory  buffer      7
2019/08/17 10:02:04 Server2     memory  cached      9
2019/08/17 10:07:05 Server1     memory  Total       100
2019/08/17 10:07:06 Server1     memory  used        35
2019/08/17 10:07:09 Server1     memory  buffer      8
2019/08/17 10:07:04 Server1     memory  cached      10
2019/08/17 10:08:05 Server2     memory  Total       100
2019/08/17 10:08:06 Server2     memory  used        35
2019/08/17 10:08:09 Server2     memory  buffer      8
2019/08/17 10:08:04 Server2     memory  cached      10

需要转换为

time_stamp      Hostname    kpi Percentage
2019-08-17 10:05:00 Server1     memory  17
2019-08-17 10:05:00 Server2     memory  26
2019-08-17 10:10:00 Server1     memory  17
2019-08-17 10:10:00 Server2     memory  17

我使用的Python代码

Python code i used

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
ns5min=5*60*1000000000 
df3['time_stamp'] = pd.to_datetime(((df3['time_stamp'].astype(np.int64) // ns5min + 1 ) * ns5min))
df4 = df3.pivot_table('value_current' , ['time_stamp' , 'Hostname ' , 'kpi' ], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100

寻找一个在pyspark中复制它,并在python中将其作为枢轴的更有效方法是一项昂贵的操作,我需要每5分钟在一个非常大的数据集上执行一次

Looking for a to replicate this in pyspark and a more efficient way in python as pivot is an expensive operation and I need to perform this every 5 mins on a really large dataset

推荐答案

当转换为列的值列表未知时,数据透视很昂贵. Spark有一个重载的pivot方法,该方法将其作为参数.

Pivoting is expensive when the list of values that are translated to columns is unknown. Spark has an overloaded pivot method that takes them as an argument.

def pivot(pivotColumn: String, values: Seq[Any])

如果未知,Spark必须对数据集中的不同值进行排序和收集.否则,逻辑将非常简单明了并描述此处.

In case they aren't known Spark must sort and collect the distinct values from your dataset. Otherwise, the logic is pretty straightforward and described here.

例如,df.groupBy("A","B").pivot("C",Seq(小",大")).sum("D")将被翻译成df.groupBy("A","B").agg(expr("sum(if(C ='small',D,null))"),expr("sum(if (C ='大',D,null))))).您本可以完成此操作,但是它会变得很长,并且可能很快就会出错.

For example, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") would be translated into the equivalent of df.groupBy("A", "B").agg(expr("sum(if(C = ‘small’, D, null))"), expr("sum(if(C = ‘large’, D, null))")). You could have done this yourself but it would get long and possibly error prone quickly.

如果不进行枢纽操作,我会做类似的事情:

Without pivoting I would do something like that:

val in = spark.read.csv("input.csv")
      //cast to the unix timestamp
      .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
      .drop($"time_stamp")

现在,我们可以使用主机名按时间窗口对数据集进行分组,并将KPI指标收集到地图中.
answer 有一个很好的描述,

Now we can group our dataset by the time window with hostname and collect KPI metrics into a map.
There is an excellent answer describing just that.

val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }

val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
  .agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))

输出

+------------------------------------------+--------+-------------------------------------------------------------+
|window                                    |Hostname|metrics                                                      |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+

现在,我们定义一些别名和一个简单的select语句:

Now we define some aliases and a simple select statement:

val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")

val result = grouped.select($"window", $"Hostname",
          (total - ((total - used + buffer + cached) / total) * 100).as("percentage"))

然后我们开始:

+------------------------------------------+--------+----------+
|window                                    |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0      |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0      |
+------------------------------------------+--------+----------+

这篇关于在不使用pyspark中的数据透视的情况下进行分组的有效方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 21:11