Apache Storm vs Spark Streaming – A Feature-wise Comparison

1. Objective

Apache Storm is a stream-processing engine for real-time data, while Apache Spark is a general-purpose compute engine that provides Spark Streaming for handling streaming data. This article compares their strengths and weaknesses.

2. Apache Storm vs Spark Streaming

2.1. Processing Model

  • Storm: supports true stream processing through the core Storm layer.
  • Spark Streaming: is a variant of Spark batch processing (micro-batching).

2.2. Primitives

  • Storm: It provides a very rich set of primitives for tuple-level processing on a stream (filters, functions). Aggregations over messages in a stream are possible through group-by semantics. It supports left join, right join, and inner join (the default) across streams.
  • Spark Streaming: It provides two kinds of operators. The first are stream transformation operators, which transform one DStream into another DStream. The second are output operators, which write information to external systems.
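To make the two operator families concrete, here is a minimal Spark Streaming sketch in Scala (not from the original article; the socket source on localhost:9999 and the HDFS output prefix are illustrative assumptions): the transformation operators turn one DStream into another, and the output operators push the result to an external system.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OperatorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("operator-sketch")
    val ssc  = new StreamingContext(conf, Seconds(5))          // 5-second micro-batches

    // Source DStream (illustrative socket source)
    val lines = ssc.socketTextStream("localhost", 9999)

    // Stream transformation operators: each call produces a new DStream
    val counts = lines.flatMap(_.split("\\s+"))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    // Output operators: write to an external system (console and HDFS here)
    counts.print()
    counts.saveAsTextFiles("hdfs:///tmp/wordcounts")            // illustrative output prefix

    ssc.start()
    ssc.awaitTermination()
  }
}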

2.3. State Management

  • Storm: Core Storm by default doesn’t offer any framework-level support for storing intermediate bolt output (the result of a user operation) as state. Hence, any application has to create and update its own state as and when required.
  • Spark Streaming: The underlying Spark by default treats the output of every RDD operation (transformations and actions) as an intermediate state and stores it as an RDD. Spark Streaming permits maintaining and changing state via the updateStateByKey API. There is no pluggable way to store state in an external system.
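As a sketch of the updateStateByKey API mentioned above (a continuation of the previous sketch, so ssc and the counts DStream are assumed to exist; the checkpoint path is illustrative), Spark Streaming keeps per-key state across micro-batches once a checkpoint directory is set:

// Builds on the previous sketch: `ssc` is the StreamingContext and
// `counts` is a DStream[(String, Int)].
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   // required for stateful operators

// Fold the values seen in the current batch into the running count for each key
def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
  Some(running.getOrElse(0) + newValues.sum)

val runningCounts = counts.updateStateByKey[Int](updateCount _)
runningCounts.print()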

2.4. Message Delivery Guarantees (Handling message level failures)

  • Storm: It supports three message processing guarantees: at-least-once, at-most-once, and exactly-once. Storm’s reliability mechanisms are distributed, scalable, and fault-tolerant.
  • Spark Streaming: Apache Spark Streaming defines its fault-tolerance semantics as the guarantees provided by the receivers and output operators. As per the Apache Spark architecture, the incoming data is read and replicated across different Spark executor nodes. This introduces a failure scenario in which data has been received but may not yet be reflected in the results. It handles fault tolerance differently for worker failure and driver failure.
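To show what Storm's at-least-once guarantee looks like at the API level, here is a hedged sketch written in Scala against the Storm 2.x Java API (the bolt and its field names are made up for the example): the emitted tuple is anchored to its input so that it joins the tuple tree, and the input is acked only after processing succeeds, so a failed or timed-out tuple is replayed from the spout.

import java.util.{Map => JMap}

import org.apache.storm.task.{OutputCollector, TopologyContext}
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

// Illustrative bolt that upper-cases a word while preserving reliability semantics
class UppercaseBolt extends BaseRichBolt {
  private var collector: OutputCollector = _

  override def prepare(conf: JMap[String, AnyRef], context: TopologyContext,
                       collector: OutputCollector): Unit = {
    this.collector = collector
  }

  override def execute(input: Tuple): Unit = {
    try {
      // Anchoring: tie the new tuple to `input` so Storm can track the tuple tree
      collector.emit(input, new Values(input.getStringByField("word").toUpperCase))
      collector.ack(input)      // at-least-once: ack only after successful processing
    } catch {
      case _: Exception =>
        collector.fail(input)   // ask the spout to replay this tuple
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("word"))
}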

2.5. Fault Tolerance (Handling process/node level failures)

  • Storm: Storm is designed with fault tolerance at its core. Storm daemons (Nimbus and Supervisor) are made to be fail-fast (meaning the process self-destructs whenever an unexpected scenario is encountered) and stateless (all state is kept in ZooKeeper or on disk).
  • Spark Streaming: The driver node (the equivalent of the JobTracker) is a single point of failure. If the driver node fails, all executors are lost along with the data they have received and replicated in memory. Hence, Spark Streaming uses data checkpointing to recover from driver failure.
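A sketch of the checkpoint-based driver recovery mentioned above (Scala; the checkpoint path is illustrative and the DStream graph is elided): on restart after a driver failure, StreamingContext.getOrCreate rebuilds the context from the checkpoint instead of invoking the setup function again.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverRecoverySketch {
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // must be fault-tolerant storage

  // Invoked only when no checkpoint exists yet (first start)
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("driver-recovery-sketch")
    val ssc  = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... define the DStream graph here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // After a driver failure, the restarted driver recovers the DStream graph
    // and metadata from the checkpoint directory.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}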

2.6. Debuggability and Monitoring

  • Storm: The Apache Storm UI provides a visualization of every topology, with a complete break-up of its internal spouts and bolts. The UI additionally surfaces any errors occurring in tasks, along with fine-grained stats on the throughput and latency of every part of the running topology. It helps in debugging problems at a high level. Metric-based monitoring: Storm’s built-in metrics feature provides framework-level support for applications to emit any metrics, which can then be easily integrated with external metrics/monitoring systems.
  • Spark Streaming: The Spark web UI displays an extra Streaming tab that shows statistics of running receivers (whether receivers are active, the number of records received, receiver errors, and so on) and completed batches (batch processing times, queuing delays, and so on). It is useful for observing the execution of the application. The following two metrics in the Spark web UI are particularly important for tuning the batch size (see the listener sketch after this list):
  1. Processing Time – The time to process each batch of data.
  2. Scheduling Delay – The time a batch waits in a queue for previous batches to finish processing.
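The same two numbers can also be pulled programmatically rather than read off the web UI. A hedged Scala sketch using Spark Streaming's StreamingListener (the log format is illustrative; ssc is assumed to be an existing StreamingContext):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs processing time and scheduling delay for every completed micro-batch
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingMs = info.processingDelay.getOrElse(-1L)   // time to process the batch
    val schedulingMs = info.schedulingDelay.getOrElse(-1L)   // time spent waiting in the queue
    println(s"batch=${info.batchTime} processingMs=$processingMs schedulingMs=$schedulingMs")
  }
}

// Register on an existing StreamingContext before calling ssc.start()
// ssc.addStreamingListener(new BatchStatsListener)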

2.7. Auto Scaling

  • Storm: It supports configuring the initial parallelism at various levels per topology – the number of worker processes, executors, and tasks. Additionally, it supports dynamic rebalancing, which permits increasing or reducing the number of worker processes and executors without having to restart the cluster or the topology. However, the number of initial tasks configured stays constant throughout the life of the topology.
    Once all supervisor nodes are fully saturated with worker processes and there is a need to scale out, one simply has to start a new supervisor node and point it to the cluster-wide ZooKeeper.
    It is possible to automate the logic of monitoring the current resource consumption on every node in a Storm cluster and dynamically adding more resources. STORM-594 describes such an auto-scaling mechanism using a feedback system.
  • Spark Streaming: The community is currently working on dynamic scaling of streaming applications. At the moment, elastic scaling of Spark Streaming applications is not supported.
    Essentially, dynamic allocation is not meant to be used with Spark Streaming at the moment (1.4 or earlier). The reason is that the receiving topology is currently static: the number of receivers is fixed, and one receiver is allocated per instantiated DStream, each using one core in the cluster. Once the StreamingContext is started, this topology cannot be modified. Killing receivers results in stopping the topology.
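Given that, a streaming application on these versions typically pins its resources at submission time rather than relying on elastic scaling. A minimal hedged sketch (the executor counts are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Fix the resource footprint up front instead of relying on dynamic allocation,
// which the static receiver topology described above does not support.
val conf = new SparkConf()
  .setAppName("static-resources-sketch")
  .set("spark.dynamicAllocation.enabled", "false")   // be explicit: no elastic scaling
  .set("spark.executor.instances", "4")              // illustrative fixed executor count
  .set("spark.executor.cores", "2")                  // leave headroom: each receiver occupies a core

val ssc = new StreamingContext(conf, Seconds(5))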

2.8. Yarn Integration

  • Storm: It integrates with YARN through Apache Slider. Slider is a YARN application that deploys non-YARN distributed applications onto a YARN cluster.
  • Spark Streaming: The Spark framework provides native integration with YARN, and Spark Streaming, as a layer above Spark, merely leverages that integration. Every Spark Streaming application runs as an individual YARN application. The ApplicationMaster container runs the Spark driver and initializes the SparkContext. Every executor and receiver runs in containers managed by the ApplicationMaster, which then periodically submits one job per micro-batch to the YARN containers.

2.9. Isolation

  • Storm: Each worker process runs executors for a particular topology; that is, mixing tasks from different topologies is not allowed at the worker-process level, which provides topology-level runtime isolation. Further, every executor thread runs one or more tasks of the same component (spout or bolt); there is no mixing of tasks across components.
  • Spark Streaming: Each Spark application is a separate application running on the YARN cluster, where every executor runs in a separate YARN container. Thus, JVM-level isolation is provided by YARN, since two different topologies cannot execute in the same JVM. In addition, YARN provides resource-level isolation, so container-level resource constraints (CPU and memory limits) can be configured.

2.11. Open Source Apache Community

  • Storm: The Apache Storm powered-by page lists a healthy number of companies that are running Storm in production for many use cases. Many of them are large-scale web deployments that are pushing the boundaries of performance and scale. For instance, Yahoo’s deployment consists of 2,300 nodes running Storm for near-real-time event processing, with the largest topology spanning across 400 nodes.
  • Spark Streaming: Apache Spark Streaming is still emerging and has limited experience in production clusters. However, the overall umbrella Apache Spark community is easily one of the largest and most active open-source communities out there today, and the overall charter is rapidly evolving given the large developer base. This should lead to the maturation of Spark Streaming in the near future.

2.12. Ease of development

  • Storm: It provides extremely simple, rich, and intuitive APIs that easily describe the DAG nature of the processing flow (topology). The Storm tuples, which provide the abstraction of data flowing between nodes in the DAG, are dynamically typed. The motivation is to simplify the APIs for ease of use. Any new custom tuple can be plugged in after registering its Kryo serializer. Developers can start by writing topologies and running them in local cluster mode. In local mode, threads are used to simulate worker nodes, allowing the developer to set breakpoints, halt the execution, inspect variables, and profile the topology before deploying it to a distributed cluster, where all of this is far harder (a local-mode sketch follows this list).
  • Spark Streaming: It offers Scala and Java APIs that have more of a functional-programming flavor (transformations of data). As a result, the topology code is far more concise. There is a rich set of API documentation and illustrative samples available to the developer.
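A hedged sketch of the local-mode workflow described above, written in Scala against Storm's Java API (SentenceSpout is a hypothetical spout, and UppercaseBolt could be the bolt sketched in section 2.4): worker nodes are simulated by threads inside this JVM, so the whole topology can be stepped through in a debugger.

import org.apache.storm.{Config, LocalCluster}
import org.apache.storm.topology.TopologyBuilder

object LocalDebugRunner {
  def main(args: Array[String]): Unit = {
    // Describe the DAG: one spout feeding one bolt
    val builder = new TopologyBuilder()
    builder.setSpout("sentences", new SentenceSpout(), 1)   // hypothetical spout
    builder.setBolt("upper", new UppercaseBolt(), 2)
           .shuffleGrouping("sentences")

    val conf = new Config()
    conf.setDebug(true)   // log every emitted tuple

    // Local mode: the cluster is simulated with threads, so breakpoints,
    // variable inspection and profilers all behave as in a normal JVM process.
    val cluster = new LocalCluster()
    cluster.submitTopology("debug-topology", conf, builder.createTopology())
    Thread.sleep(30000)   // let the topology run briefly, then tear it down
    cluster.shutdown()
  }
}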

2.13. Ease of Operability

  • Storm: It is a little tricky to deploy/install Storm through the many available tools (Puppet, and so on) and to deploy the cluster. Apache Storm has a dependency on a ZooKeeper cluster, which it uses for coordination across the cluster and for storing state and statistics. It implements CLI support for actions like submit, activate, deactivate, list, and kill topology. Strong fault tolerance means that daemon downtime does not impact the executing topology.
    In standalone mode, Storm daemons are compelled to run in supervised mode. In YARN cluster mode, Storm daemons run as containers and are driven by the Application Master (Slider).
  • Spark Streaming: It uses Spark as the underlying execution framework. It should be easy to stand up a Spark cluster on YARN. There are several deployment requirements. Usually, checkpointing is enabled for fault tolerance of the application driver, which introduces a dependency on fault-tolerant storage (HDFS).

2.14. Language Options

  • Storm: We can create Storm applications in Java, Clojure, and Scala.
  • Spark Streaming: We can create Spark applications in Java, Scala, Python, and R.

Collecting MySQL binlogs into Hive [using Tungsten]

Source from: http://www.malinga.me/continuous-data-loading-from-mysql-to-hive-using-tungsten/

Getting started [can be skipped]

What is Hive

“The Apache Hive™ data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.”

~ Hive official site

Why Hive

Hive is used by many large companies and has demonstrated excellent large-scale data processing and near-real-time query capabilities. Hive now supports complex schemas and advanced operations. If you are running into the limits of a traditional data warehouse, Hive is a good choice. Below are some business scenarios in which companies are using it:

  • Bizo: used for reporting and ad hoc queries.
  • Chitika: used for data mining and analysis.
  • CNET: used for data mining, log analysis and ad hoc queries.
  • Digg: used for data mining, log analysis, R&D, reporting/analytics.
  • Grooveshark: used for user analytics, data cleaning, machine learning R&D.
  • Hi5: used for analytics, machine learning, social graph analysis.
  • HubSpot: used for near-real-time web analytics.
  • Last.fm: used for a variety of ad hoc queries.
  • Trending Topics: used for modeling log data and building sample datasets for trend analysis and R&D.
  • VideoEgg: used for analyzing usage data.

Next, let's walk through the whole process.

Process

Continuous data loading from MySQL to Hive

  1. Use Tungsten to capture the MySQL binlogs as CSV files
  2. Use the ddlscan tool to create the Hive staging and base table schemas
  3. Use a Map-Reduce job to do the initial load from the staging layer
  4. [Optional] Use the bc tool to verify data correctness between the MySQL tables and the Hive tables
  5. Set up the continuous loading process

Scope

This article assumes that you have already set up Tungsten and already have CSV files in Hadoop, and focuses on the remaining processing details. The scripts for the process above are available at https://github.com/continuent/continuent-tools-Hadoop.

Prerequisites

Detailed steps

Step 1 – Create the staging and base tables in Hive

Use the continuent/tungsten/tungsten-replicator/bin/ddlscan tool to create the staging and base table schemas. The command below creates the staging table schema; the base table schema is created in the same way with the base-table template.

Create the staging tables

ddlscan -user hive -url jdbc:mysql:thin://localhost:3306/DB_NAME -pass password -template ddl-mysql-hive-0.10-staging.vm -db DB_NAME -opt hdfsStagingDir /path/to/hadoop/csv | hive

  For ddl-mysql-hive-0.10-staging.vm, see https://docs.continuent.com/tungsten-replicator-3.0/deployment-hadoop-preparation.html.

You can also use load-reduce-check (https://github.com/continuent/continuent-tools-hadoop) to do everything. However, the invocation below serves the same purpose as the commands above, performing only the initial setup (initial load).

load-reduce-check -U jdbc:mysql:thin://localhost:3306/DB_NAME -D /path/to/hadoop/csv -r /path/to/continuent/dir -s DB_NAME -u tungsten -p password -v --no-compare --no-map-reduce --no-materialize --no-meta --no-sqoop

[Note] load-reduce-check is a complete script that covers everything from table creation to data loading.

Step 2 – Loading data to base tables

Here we run load-reduce-check to load data into the base tables, skipping the steps that have already been completed. It runs a Map-Reduce job for each staging table to populate the base tables.

load-reduce-check -U jdbc:mysql:thin://localhost:3306/DISCO -D /user/tungsten/staging/alpha -r /ebs/continuent -s DISCO -u tungsten -p password -v --no-base-ddl --no-compare --no-staging-ddl

Step 3 – Set up the continuous loading process

After completing step 2 you will see data in the base tables. You can run step 2 multiple times, but it will simply re-run Map-Reduce over the full data set each time. What we want instead is to run the Map-Reduce job only over the incremental data and append the new part to the Hive tables. The process for continuous loading is listed below:

  1. Stop Tungsten replication
  2. Run Map-Reduce
  3. Remove the full set of CSV files
  4. Start Tungsten replication

Conclusion

In summary, this article walked through the process of continuously loading data from MySQL into Hive. The scripts for creating the tables and loading the data can be found at https://github.com/continuent/continuent-tools-hadoop.git.

1) Initial load: Tungsten initial load of data -> HDFS (CSV files) -> run step 1: load data to staging tables -> run step 2: load data to base tables

2) Continuous loading: stop Tungsten -> run Map-Reduce -> clear the CSV files in HDFS -> start Tungsten

[HELP] load-reduce-check

  • -D, --staging-dir String       Directory within Hadoop for staging data (default=/user/tungsten/staging)
  • -l, --log String               Log file for detailed output
  • -m, --metadata String          Table metadata JSON file (/tmp/meta.json)
  • -P, --schema-prefix String     Prefix for schema names (defaults to replication service)
  • -p, --password String          MySQL password
  • -q, --sqoop-dir String         Directory within Hadoop for Sqooped table data (default=/user/tungsten/sqoop)
  • -r, --replicator String        Replicator home (/opt/continuent)
  • -S, --service String           Replicator service that generated data
  • -s, --schema String            DBMS schema
  • -t, --table String             Table within schema (default=all)
  • -U, --url String               MySQL DBMS JDBC url
  • -u, --user String              MySQL user
  • -v, --verbose                  Print verbose output
  •     --hive-ext-libs String     Location of Hive JDBC jar files
  •     --[no-]base-ddl            Load base table ddl
  •     --[no-]compare             Compare to source data
  •     --[no-]map-reduce          Materialize view for tables (deprecated)
  •     --[no-]materialize         Materialize view for tables
  •     --[no-]meta                Generate metadata for tables
  •     --[no-]sqoop               Generate Sqoop commands to provision data
  •     --[no-]staging-ddl         Load staging table ddl
  • -h, --help                     Displays help