ZooKeeper系列 (二) zookeeper shell操作

喵了个喵,我又遇到瓶颈了

[TOC]

zk自带命令行工具,bin目录下zkCli 脚本,linux下可通过执行 zkCli.sh 连接

命令: ./zkCli.sh [-server ip:port] 远程地址可选,不填情况下连接本地服务器

zk-shell基本操作

1
2
3
4
创建   create
更新 set
查询 get
删除 delete

创建

zk的四种节点类型

  1. 持久节点
  2. 持久顺序节点
  3. 临时节点
  4. 临时顺序节点

1.持久节点

数据节点创建后,一直存在,直到有删除操作主动清除

创建方式:create /zk-node data

2.持久顺序节点

节点一直存在,zk自动追加数字后缀做节点名,后缀上限 MAX(int)

创建方式:create -s /zk-node data

3.临时节点

生命周期和会话相同,客户端会话失效,则临时节点被清除

创建方式:create -e /zk-node-temp data

4.临时顺序节点

临时节点+顺序节点后缀

创建方式:create -s -e /zk-node-temp data

读取

  • ls

列出zk执行节点的所有子节点,只能看到第一级子节点

ls ${path}

  • get

获取zk指定节点数据内容和属性

get ${path}

更新

set ${path} ${data} [version]

version可选

删除

delete ${path} [version]

ZooKeeper系列 (安装篇) zookeeper的分布式安装

喵了个喵,我又遇到瓶颈了

[TOC]

下载地址

apache索引目录 Index of /dist

分布式部署

  1. 选择3台机器,这里选择了3台机器

    1
    2
    3
    172.23.7.9
    172.23.7.10
    172.23.7.12
  2. 创建一个data目录,存放zk数据 例如
    /home/hadoop/apache/zookeeper/data

  3. 在每台机器的data目录下,创建文件 myid,按机器编号写入1,2,3
    第一台机器 myid文件写入 1
    第二台机器 myid文件写入 2
    第三台机器 myid文件写入 3

  4. 在解压的zk文件中,复制配置文件,配置相关的集群情况

    1
    2
    3
    cd conf
    cp zoo_sample.cfg zoo.cfg
    vim zoo.cfg

配置data目录地址
dataDir=/home/hadoop/apache/zookeeper/data
文件末尾配置集群情况

1
2
3
server.1=172.23.7.9:2888:3888
server.2=172.23.7.10:2888:3888
server.3=172.23.7.12:2888:3888
  1. 针对每个机器,启动服务
    zkServer.sh

测试

查看状态

./zkServer.sh status

创建节点并查看

  1. 使用客户端
    ./zkCli.sh
  2. 创建节点和数据并查看
    1
    2
    3
    创建   create /testNode 'testValue'
    查看 ls /
    获取 get /testNode

EBCC5164-F288-4E14-881F-01C4A9782516

C3E795F7-75CB-49E4-94DA-E4D19182AD03

6511263F-C36F-4726-B16C-23229E1C7CBC

Zookeeper系列 (一) zookeeper的概念

喵了个喵,我又遇到瓶颈了

[TOC]

zookeeper是什么

定义:zookeeper是一个开源的分布式协调服务,一个典型的分布式数据一致性解决方案。

前世今生:Yahoo创建,最初为 hadoop的子项目,是google Chubby的开源实现,现为Apache的顶级项目。

分布式应用程序可以基于zookeeper实现数据发布订阅、负载均衡、命名服务、分布式锁、集群管理等一系列功能。

zookeeper提供什么能力

zookeeper搭建的集群可以保证以下分布式协议

  1. 顺序一致性

    从同一个客户端发起的多个事务请求,将会严格按照发起顺序应用到各个节点

  2. 原子性

    所有事务请求的处理结果在集群中所有机器的应用情况一致。所有机器要么全部应用,要么全部不应用。

  3. 单一视图

    无论连接的是哪个节点,效果都一样

  4. 可靠性

    一旦一个事务被应用,那么该事物状态会被保持

  5. 实时性

    zookeeper保证一定时间段后,客户端从服务端读取到最新状态

zookeeper的特点

###数据模型简单

zk的数据模型是一个共享的、树树型结构的名字空间。由一系列ZNode组成,ZNode被称为数据节点,具有层级关系。zk将全量数据存储在内存中,以此来实现提高服务器吞吐,减少延迟的目的

可构成集群

zk集群由一组机器构成,3~5台即可,组成zk集群的每台机器在内存中维护当前的服务器状态,并且每台机器之间都互相保持着通信。集群中只要超过一般的机器能正常工作,那整个集群就能正常对外服务

顺序访问

对于来自客户端的每个更新请求,zk都会分配一个全局唯一的递增编号,事务操作将按照这个编号按照先后顺序执行,且该特性可被实用于其他用途。

高性能

全量数据都存储在内存中,并直接服务于客户端的所有非事务请求,尤其适合读操作为主的场景。

zookeeper的基本概念

zk的概念此处只做基本介绍和简单讲解,后续做详细讲解

集群角色

  • Leader

一个,提供读写能力

  • Follower

多个,提供读能力,选举Leader能力

  • Observer

多个,提供读能力,不参与选举

会话

zk客户端和服务器之间是TCP长连接。sessionTimeout设置一个会话超时时间,连接异常断开时,只要在超时时间内连接上任意一台zk节点,之前的会话依然有效。

节点

1.机器节点

构成集群的机器

2.数据节点 ZNode

ZNode数据节点,数据模型中的数据单元。数据模型是一棵树,由斜杠(/)分割路径,保存数据内容和属性。可分为持久节点和临时节点两类

版本

每个ZNode上面都有一个stat的数据结构,记录3个数据版本:

  1. version:当前ZNode版本
  2. cversion:当前ZNode子节点的版本
  3. aversion:当前ZNode的ACL版本

ACL

(access control lists) 权限控制清单

  • CREATE: 创建子节点权限
  • READ: 获取节点数据和子节点列表的权限
  • WRITE: 更新节点数据的权限
  • DELETE: 删除子节点的权限
  • ADMIN: 设置节点ACL的权限

create和delete都是针对子节点的权限控制

Watcher

事件监听。zk的重要特性,zk允许用户注册事件到指定节点,当特定时间触发,事件通知会被发送到具体的客户端。

Hadoop系列(九)Hadoop三大核心之Yarn-资源调度详解

喵了个喵,我又遇到瓶颈了

[TOC]

Yarn的调度流程详解

image-20191104160107191

  1. Client端提交作业到ResourceManager中的ApplicationManager,申请JobID(唯一ID)

  2. RM返回一个作业ID,并且将一个临时hdfs路径返回给 Client,要求Client将要上传的文件发送到这个临时目录中。(任务结束后,该临时目录将被删除)

  3. Client将作业运行所需要的资源(jar包、配置文件和分片信息等)向指定的HDFS路径上传

  4. 上传成功后,向RM中的AM发送请求,执行作业

  5. AM将请求转发给调度器,调度器开始处理请求

  6. 调度器将任务放置队列中,当执行到请求的时候,则告知ApplicationManager 可以分配容器,告知NodeManager的信息用于开辟Container

  7. ApplicationManager命令NodeManager创建一个Container并运行作业的ApplicationMaster。NodeManager创建一个Container并启动作业的ApplicationMaster。ApplicationMaster将自己注册到ApplicationManager,使得ApplicationManager可以监控到Job的执行状态,Client也可以通过ApplicationManager对作业进行控制。

  8. ApplicationMaster查询临时hdfs路径,获取jar包信息,配置文件等,创建map和reduce任务

  9. ApplicationMaster请求调度器分配资源,开辟map,reduce任务资源

  10. 调度器返回执行信息,内含在哪些NodeManager可开辟资源信息

  11. ApplicationMaster通知NodeManager开辟资源池启动map和reduce任务

  12. NodeManager 启动自身资源池中的任务

  13. map、reduce任务查询临时hdfs路径的数据。开始执行。ApplicationMaster实时监控自己管理的map、reduce任务执行情况,如果失败,请求调度器在新节点中开辟资源池,执行失败的程序。ApplicationManager实时监控自己管理的ApplicationMaster执行情况,如果ApplicationMaster 宕机,创建一个新的ApplicationMaster,继续监控原有的map和reduce任务,在此期间,map、reduce任务不受影响

  14. 程序成功,释放资源

Yarn的调度策略

  • FIFO调度
  • 容量调度
  • 公平调度

1.FIFO先进先出调度

这种调度最简单,将应用放置在一个队列中,然后按照提交的顺序将所有的Applications放到队列中,先按照作业的优先级高低、再按照到达时间的先后,为每个app分配资源。如果第一个app需要的资源被满足了,如果还剩下了资源并且满足第二个app需要的资源,那么就为第二个app分配资源。

优点:简单,不需要配置。

缺点:不适合共享集群。如果有大的app需要很多资源,那么其他app可能会一直等待。

image-20191107004016198

2.Capacity 容量调度机制

容量调度机制适用于一个集群(集群被多个组织共享)中运行多个Application的情况,目标是最大化吞吐量和集群利用率。

容量调度允许将整个集群的资源分成多个部分,每个组织使用其中的一部分,即每个组织有一个专门的队列,每个组织的队列还可以进一步划分成层次结构(Hierarchical Queues),从而允许组织内部的不同用户组的使用。

每个队列内部,按照FIFO的方式调度Applications。当某个队列的资源空闲时,可以将它的剩余资源共享给其他队列。

image-20191107004427447

实例:

有一个专门的队列允许小的apps提交之后能够尽快执行,job1先提交到queueA,并没有占用系统的全部资源(假如job1需要100G内存,但是整个集群只有100G内存,那么只分配给job1 80G),job2可提交到queueB 中快速执行。

3.Fair 公平调度机制

FairScheduler允许应用在一个集群中公平地共享资源。默认情况下FairScheduler的公平调度只基于内存,也可以配置成基于memory & CPU。当集群中只有一个app时,它独占集群资源。当有新的app提交时,空闲的资源被新的app使用,这样最终每个app就会得到大约相同的资源。可以为不同的app设置优先级,决定每个app占用的资源百分比。FairScheduler可以让短的作业在合理的时间内完成,而不必一直等待长作业的完成。

Fair Sharing: Scheduler将apps组织成queues,将资源在这些queues之间公平分配。默认情况下,所有的apps都加入到名字为“default“的队列中。app也可以指定要加入哪个队列中。队列内部的默认调度策略是基于内存的共享策略,也可以配置成FIFO和multi-resource with Dominant Resource Fairness

Minimum Sharing:FairScheduller提供公平共享,还允许指定minimum shares to queues,从而保证所有的用户以及Apps都能得到足够的资源。如果有的app用不了指定的minimum的资源,那么可以将超出的资源分给别的app使用。

FairScheduler默认让所有的apps都运行,但是也可以通过配置文件配置每个queue中的分配权重,若权重1:1,当两个queue同时执行任务时,各分到一半资源。

image-20191107005019943

实例:

两个用户A和B。A提交job1时集群内没有正在运行的app,因此job1独占集群中的资源。用户B的job2提交时,job2在job1释放一半的containers之后,开始执行。job2还没执行完的时候,用户B提交了job3,job2释放它占用的一半containers之后,job3获得资源开始执行。

Hadoop系列(八)Hadoop三大核心之Yarn 资源调度初探

喵了个喵,我又遇到瓶颈了

[TOC]

0. Yarn的来源

​ hadoop 1.x的时代,并没有Yarn,hadoop核心组件只有HDFS和MapReduce。到了hadoop2.x才有了Yarn的诞生,组件包含HDFS,MapReduce和Yarn。

​ 诞生原因:hadoop 1.x存在的最大问题就是资源管理问题。技术的发展不再满足于hadoop集群中只使用MapReduce一个计算框架,人们更希望有一套合理的管理机制,来控制集群的资源管理问题。就此Yarn诞生。

1. YARN概述

全称 Yet Another Resource Negotiator。是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce,spark 等运算程序可以运行在YARN上,相当于应用程序运行于操作系统之上

YARN 是 Hadoop2.x 版本中的一个新特性。它的出现是为了解决第一代 MapReduce 编程框架的不足,提高集群环境下的资源利用率,这些资源包括内存,磁盘,网络,IO等。Hadoop2.X 版本中重新设计的这个 YARN 集群,具有更好的扩展性,可用性,可靠性,向后兼容性,以及能支持除 MapReduce 以外的更多分布式计算程序

YARN的特点:

  1. YARN不清楚提交的程序的运行机制
  2. 只提供运算资源的调度,分配。用户申请就分配。
  3. 与运行的用户程序完全解耦。 YARN 上可以运行各种类型的分布式运算程序。比如 MapReduce、Storm 程序,Spark 程序等
  4. yarn 是一个通用的资源调度平台,企业中存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享

Yarn最大的特点是执行调度与Hadoop上运行的任务类型无关

2. YARN的重要组成部分

有两类长期运行的守护进程提供核心服务

  • ResourceManager(主节点) :全局资源管理器
  • NodeManager(从节点):节点资源管理器

主从结构如图

image-20191103184844628

2.1 ResourceManager

RM组件是负责资源管理的,整个系统有且只有一个 RM ,来负责资源的调度。ResourceManager 会为每一个 Application 启动一个 ApplicationMaster, 并且 ApplicationMaster 分散在各个 NodeManager 节点

RM里面还有两个重要组成部分:

  1. 应用程序管理器 Application Manager
  2. 资源调度器 Resource Scheduler

ResourceManager名字就是这两个词合并而来

Application Manager 应用程序管理器

应用程序管理器就是负责管理 Client 用户提交的应用的管理器

主要功能:

  1. 负责接收client端传输的job请求,为应用(MapReduce 程序)分配一个Container(资源池)来运行一个Application Master

  2. 负责监控Appication Master

  3. 并且在遇到失败的时候重启Application Master

Scheduler 资源调度器

Resource Scheduler即资源调度器,是让每一个节点都充分利用起来,合理分配和调度资源的一种管理器。

值得注意的是:调度器真的只是一个调度器,不参与任何具体的和应用程序相关的工作。

2.2 NodeManager

NodeManager 是 YARN 集群当中真正资源的提供者,是真正执行应用程序的容器的提供者, 监控应用程序的资源使用情况(CPU,内存,硬盘,网络),并通过心跳向集群资源调度器 ResourceManager 进行汇报以更新自己的健康状态。同时其也会监督 Container 的生命周期管理,监控每个 Container 的资源使用(内存、CPU 等)情况,追踪节点健康状况,管理日 志和不同应用程序用到的附属服务(auxiliary service)。

2.3 逻辑上的组件Application Master

ApplicationMaster 就是一个java程序,进程名:MRAppMaster

作用:负责监控Map、Reduce任务。用户提交的每一个程序都会产生一个ApplicationMaster,这个AM就是负责整个任务的一个管理者,由这个 AM去向ResourceManager 申请容器资源,获得资源后会将要运行的程序发送到容器上启动,然后进行分布式计算。

主要功能:

  1. 与调度器(Scheduler)协商,获取执行资源
  2. 与NodeManager通信,启动任务和停止任务
  3. 监控所有旗下Job的执行状态,重启失败任务

3. Container 资源池

Yarn中的资源抽象,封装了多维度资源: 内存,cpu,磁盘等。Container就是Scheduler进行资源分配的一个单位,也是运行各个任务的容器。

  • 容器由 NodeManager 启动和管理,并被它所监控。
  • 容器被 ResourceManager 进行调度。

image-20191104135136097

4. 小结

Yarn是Hadoop2.x之后引入的新组件。Yarn的架构采用了主从结构,一主多从(ResourceManager & NodeManager)。Yarn在Hadoop中的功能作用有两个,第一是负责Hadoop集群中的资源管理(resource management),第二是负责对任务进行调度和监控(scheduling/monitoring)。

Hadoop系列(七)Hadoop三大核心之MapReduce-程序编写

喵了个喵,我又遇到瓶颈了

[TOC]

接下来以一个简单的WordCount为例子,介绍Java版本的MapReduce的程序编写。

mapreduce程序主要分三部分:1.map部分,2.reduce部分,3.提交部分。

1. 准备部分

hadoop中,针对数据类型自成一体,与java的数据类型对应。封装在hadoop.io包中,主要分为基本类型和其它类型。

  • 基本数据类型
数据类型 hadoop数据类型 Java数据类型
布尔 BooleanWritable boolean
整型 IntWritable int
长整型 LongWritable long
浮点型 FloatWritable float
双精浮点 DoubleWritable double
字节 ByteWritable byte
  • 其它类型
数据类型 hadoop数据类型 Java数据类型
字符串 Text String
数组 ArrayWritable Array
Map MapWritable Map

2. jar包依赖

创建一个maven工程,pom.xml文件中,添加以下依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>

3. Map部分

映射部分,将数据逐条处理

首先,需要继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>类

四个泛型参数分别代表:输入key 输入value 输出key 输出value

然后重写mapper的map方法

key, VALUEIN value, Context context) throws IOException, InterruptedException```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

主体代码:

```java
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

/**
* 流程,输入key和value,map的结果写入到context中
*/
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取一行数据
String line = value.toString();
//因为英文字母是以“ ”为间隔的,因此使用“ ”分隔符将一行数据切成多个单词并存在数组中
String str[] = line.split(" ");
//循环迭代字符串,将一个单词变成<key,value>形式,及<"hello",1>
for (String s : str) {
context.write(new Text(s), new IntWritable(1));
}
}

}

4.Reduce部分

归并部分,将map处理和shuffle之后的数据进行归并。shuffle过程由hadoop控制

Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

四个泛型参数分别代表:输入key 输入value 输出key 输出value

然后重写reducer的reduce方法

key, Iterable values, Context context) throws IOException, InterruptedException```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

主体代码:

```java
public class WordCountReduce extends Reducer<Text,IntWritable,Text, IntWritable> {
/**
* 流程,输入key和value,map的结果写入到context中
*/
@Override
public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int count = 0;

for(IntWritable value: values) {
count++;
}
context.write(key,new IntWritable(count));
}
}

5.提交部分

mapreduce的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
// 构造一个job对象来封装本mapreduce业务到所有信息
Job mrJob = Job.getInstance(conf);
// 指定本job工作用到的jar包位置
mrJob.setJarByClass(WordCount.class);
// 指定本job用到的mapper类
mrJob.setMapperClass(WordCountMap.class);
// 指定本job用到的reducer类
mrJob.setReducerClass(WordCountReduce.class);

// 指定mapper输出的kv类型
mrJob.setMapOutputKeyClass(Text.class);
mrJob.setMapOutputValueClass(IntWritable.class);


// 指定reducer输出到kv数据类型(setOutputKeyClass 会对mapper和reducer都起作用,如果上面mapper不设置的话)
mrJob.setOutputKeyClass(Text.class);
mrJob.setOutputValueClass(IntWritable.class);


FileInputFormat.addInputPath(mrJob,new Path(args[0]));

//设置mapreduce程序的输出路径,MapReduce的结果都是输入到文件中
FileOutputFormat.setOutputPath(mrJob,new Path(args[1]));

// 最后提交任务
boolean waitForCompletion = mrJob.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}

6.打包提交

将程序打成jar包,提交到hadoop集群中,然后使用命令行进行任务提交

对于输入输出路径,均可接受本地路径和hdfs路径。

本地路径前缀:file://

hdfs路径前缀:hdfs://

1
/hadoop-3.1.2/bin/hadoop jar my_mapreduce-1.0-SNAPSHOT.jar com.breakthrough.wordcount.WordCount file:///home/hadoop/english.txt file:///home/hadoop/output

Hadoop系列(六)Hadoop三大核心之MapReduce 基础

喵了个喵,我又遇到瓶颈了

[TOC]

MapReduce背景

  在程序由单机版扩成分布式版时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架。

MapReduce是什么

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用” 的核心框架。

核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。

MapReduce将整个并行计算过程抽象到两个函数:

  Map(映射):对一些独立元素组成的列表的每一个元素进行制定的操作,可以高度并行。

  Reduce(归约):归约过程,把若干组映射结果进行汇总并输出。

一个简单的MapReduce程序只需要指定Map()、reduce()、input和output,剩下的事情由框架完成。

基于MapReduce写出来的应用程序能够运行在大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。一个Map/Reduce 作业通常会把输入的数据集切分为若干独立的数据块,由 *map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

MapReduce的架构简单介绍

image-20191012085953019

Input:输入文件的存储位置。可以是hdfs文件位置,也可以是本地文件位置

Map阶段:自己编写映射逻辑

Shuffle阶段:是我们不需要编写的模块,但却是十分关键的模块。Shuffle 阶段需要从所有 map主机上把相同的 key 的 key value对组合在一起,传给 reduce主机, 作为输入进入 reduce函数里。

Reduce阶段:自己编写合并逻辑

Final result: 最终结果存储在hdfs

MapReduce 更深层次分析后续讲解

分布式系列 (二) 一致性协议 2PC和3PC

喵了个喵,我又遇到瓶颈了

[TOC]

为解决分布式问题,涌现了一大批经典的一致性协议和算法。最著名的就是2PC,3PC,和Paxos算法。

2PC和3PC

当一个事务操作需要跨越多个分布式节点的时候,为保持事务处理的ACID特性,需要引入一个协调者来统一调度所有分布式节点的执行逻辑,这些被调度的分布式节点被称为参与者。协调者负责调度参与者的行为,并最终决定这些参与者是否要把事务真正提交。由此衍生出2pc和3pc

2PC 二阶段提交协议

二阶段提交协议是把事务的提交过程分成两个阶段处理。

  • 阶段一:提交事务请求
  • 阶段二:执行事务提交

二阶段提交的核心思想就是对每一个事务都采用先尝试后提交的处理方式,因此也可以将二阶段提交看作是一个强一致性算法。

阶段1中,协调者发起一个提议,分别问询各参与者是否接受

![image-20190922233819996](分布式系列-二-一致性协议 2PC和3PC/image-20190922233819996.png)

在阶段2中,协调者根据参与者的反馈,提交或中止事务,如果参与者全部同意则提交,只要有一个参与者不同意就中止。

![image-20190922233839319](分布式系列-二-一致性协议 2PC和3PC/image-20190922233839319.png)

二阶段提交有几个优点:原理简单,容易实现。

二阶段提交有几个缺点:

  • 同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
  • 单点故障。由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
  • 数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据部一致性的现象。
  • 二阶段无法解决的问题:协调者再发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。

3PC 三阶段提交协议

三阶段提交(3PC),是二阶段提交(2PC)的改进。有两个改动点。

  • 引入超时机制。同时在协调者和参与者中都引入超时机制。
  • 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。

三阶段提交有以下3个阶段:

  • 阶段一:CanCommit。

    事务询问,参与者响应。协调者询问是否可以进行事务操作,参与者反馈

  • 阶段二:PreCommit。

    两种可能。

    1.协调者获得的所有反馈都是Yes,执行事务的预执行。

    • 发送预提交请求 协调者向参与者发送PreCommit请求,并进入Prepared阶段。
    • 事务预提交 参与者接收到PreCommit请求后,会执行事务操作,并将undo和redo信息记录到事务日志中。
    • 响应反馈 如果参与者成功的执行了事务操作,则返回ACK响应,同时开始等待最终指令。

    2.任一个参与者向协调者发送了No,或等待超时。执行事务的中断。

    • 发送中断请求 协调者向所有参与者发送abort请求。
    • 中断事务 参与者收到来自协调者的abort请求之后(或超时之后,仍未收到协调者的请求),执行事务的中断。
  • 阶段三:doCommit

    该阶段进行真正的事务提交,也分两种情况。

    1.执行提交

    • 发送提交请求 协调者收到所有Ack响应,将从预提交状态进入到提交状态。并向所有参与者发送doCommit请求。
    • 事务提交 参与者接收到doCommit请求之后,执行正式的事务提交。并在完成事务提交之后释放所有事务资源。
    • 响应反馈 事务提交完之后,向协调者发送Ack响应。
    • 完成事务 协调者接收到所有参与者的Ack响应之后,完成事务。

    2.中断事务

    协调者没有完全接收到所有的Ack响应,执行中断事务。

    • 协调者向所有参与者发送abort中断请求
    • 事务回滚。参与者接收到abort请求之后,利用其在阶段二记录的undo信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
    • 反馈结果。参与者完成事务回滚之后,向协调者发送Ack消息
    • 中断事务。协调者接收到参与者反馈的Ack消息之后,执行事务的中断。

![image-20190923233335948](分布式系列-二-一致性协议 2PC和3PC/image-20190923233335948.png)

2PC和3PC的区别

相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,参与者会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。

在2PC中一个参与者的状态只有它自己和协调者知晓,假如协调者提议后自身宕机,在协调者备份启用前一个参与者又宕机,其他参与者就会进入既不能回滚、又不能强制commit的阻塞状态,直到参与者宕机恢复。

参与者如果在不同阶段宕机,3PC如何应对:

  • 阶段1: 协调者或协调者备份未收到宕机参与者的vote,直接中止事务;宕机的参与者恢复后,读取logging发现未发出赞成vote,自行中止该次事务
  • 阶段2: 协调者未收到宕机参与者的precommit ACK,但因为之前已经收到了宕机参与者的赞成反馈(不然也不会进入到阶段2),协调者进行commit;协调者备份可以通过问询其他参与者获得这些信息,过程同理;宕机的参与者恢复后发现收到precommit或已经发出赞成vote,则自行commit该次事务
  • 阶段3: 即便协调者或协调者备份未收到宕机参与者t的commit ACK,也结束该次事务;宕机的参与者恢复后发现收到commit或者precommit,也将自行commit该次事务

分布式系列 (一) 分布式架构概念

喵了个喵,我又遇到瓶颈了

[TOC]

集中式系统架构与分布式系统架构

集中式系统:由卓越性能的大型主机单机组成的计算机系统,称为集中式系统。

特点。单机运算能力强劲,部署结构简单。但是,拥有单点故障,且单机价格昂贵。

分布式系统:一个硬件或者软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调的系统。

特点:

  1. 分布性。多台计算机在空间中任意分布,且分布情况随时变动。
  2. 对等性。分布式系统中计算机硬件没有主从之分。
  3. 并发性。并发的操作共享资源(分布式系统最大挑战之一)。
  4. 缺乏全局时钟。分布式系统中时间的先后,缺乏全局时钟序列控制。
  5. 故障总是发生。组成分布式系统的计算机,随时都有可能发生任何形式故障

分布式系统架构中的挑战

事务

定义:一系列对系统中数据进行访问与更新的操作所组成的一个程序执行单元。狭义上的事务特指数据库事务。

4大特性:ACID

原子性 Atomicity 一致性 Consistency 隔离性Isolation 持久性Durability

原子性指事务必须是一个原子的操作序列单元,只允许出现全部成功执行和全部不执行两个状态。

一致性指事务执行的结果必须是使系统从一个一致性状态变为另一个一致性状态。比如数据库事务中,出现故障事务失败,但是数据写入了部分,此为不一致状态。

隔离性指一个事务的执行不能被其它事务干扰。

持久性指事务一旦提交,对系统的变更是永久性的。比如数据库事务操作完毕,数据持久到磁盘。

分布式事务和数据一致性

一个分布式事务可看作由多个分布式的操作序列组成,由于在分布式事务中,各个子事务的执行是分散的,因此要实现一种能够保证ACID特性的分布式事务处理系统格外复杂。

举个典型的分布式事务场景:一个跨银行的转账操作涉及调用两个异地的银行服务,其中一个是本地银行提供的取款服务,另一个则是目标银行提供的存款服务,这两个服务本身是无状态并且相互独立的,共同构成了一个完整的分布式事物。如果从本地银行取款成功,但是因为某种原因存款服务失败了,那么就必须回滚到取款之前的状态,否则用户可能会发现自己的钱不翼而飞了。

CAP定理

一个经典的分布式系统理论。CAP理论:一个分布式系统不可能同时满足一致性(C:Consistency)、可用性(A:Availability)和分区容错性(P:Partition tolerance)这三个基本需求,最多只能同时满足其中两项。

C 一致性:一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态

A 可用性:系统提供的服务必须一致处于可用的状态,对于用户的每一个操作请求总是能够在有限时间返回结果

P 分区容错性:分布式系统在遇到任何网络分区故障的时候,仍然需要能够对外满足一致性和可用性的服务,除非网络环境都发生故障

选择 说 明
CA 放弃分区容错性,加强一致性和可用性,其实就是传统的单机数据库的选择
AP 放弃一致性(追求最终一致性),追求分区容错性和可用性,这是很多分布式系统设计时的选择,例如很多NoSQL系统就是如此
CP 放弃可用性,追求一致性和分区容错性,基本不会选择,网络问题会直接让整个系统不可用

需要明确的一点是:对于一个分布式系统来说,分区容错性可以说是一个最基本的需求。因为既然是一个分布式系统,那么分布式系统中的组件必然需要部署在不同节点,因此会出现子网络,因此分区容错性成为了一个分布式系统必然要面对和解决的问题。因此系统设计师往往在一致性和可用性之间做平衡取舍。由此引发Base理论。

BASE理论

BASE是对CAP中一致性和可用性权衡的结果,核心思想是即使无法做到强一致性,但是每个应用都可以根据自身业务特点,采用适当的方式使系统达到最终一致性。

BASE理论三要素:

  1. 基本可用 Basically Available
  2. 软状态 Soft state
  3. 最终一致性 Eventually consistent

三要素详细解释:

基本可用

分布式系统出现不可预知故障的时候,允许损失部分可用性

例:集群中部分机器宕机,导致剩余机器压力过大,查询效率从0.5s降到了2s

弱状态

允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。

最终一致性

最终一致性强调在系统的所有数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。

总的来说。BASE 理论面向的是大型高可用可扩展的分布式系统,和传统事务的 ACID 是相反的,它完全不同于 ACID 的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间是不一致的。

Hadoop系列(五)Hadoop三大核心之HDFS 读写流程

喵了个喵,我又遇到瓶颈了

[TOC]

首先,再回顾一下HDFS的架构图

image-20190908154045344

HDFS写数据流程

image-20190908154424852

  1. 客户端发送请求,调用DistributedFileSystem API的create方法去请求namenode,并告诉namenode上传文件的文件名、文件大小、文件拥有者。
  2. namenode根据以上信息算出文件需要切成多少块block,以及block要存放在哪个datanode上,并将这些信息返回给客户端。
  3. 客户端调用FSDataInputStream API的write方法首先将其中一个block写在datanode上。
  4. 每一个block多个副本(默认3个),由已经上传了block的datanode产生新的线程,按照放置副本规则往其它datanode写副本。(并不是由客户端分别往3个datanode上写3份,这样的优势就是快。)
  5. 写完后返回给客户端一个信息,然后客户端在将信息反馈给namenode。更新元数据。

HDFS读流程

image-20190908155152510

  1. 客户端通过调用FileSystem对象中的open()方法来读取需要的数据
  2. DistributedFileSystem会通过RPC协议调用NameNode来查找文件块所在的位置

NameNode只会返回所调用文件中开始的几个块而不是全部返回。对于每个返回的块,都包含块所在的DataNode的地址。随后,这些返回的DataNode会按照Hadoop集群的拓扑结构得出客户端的距离,然后再进行排序。如果客户端本身就是DataNode,那么它就从本地读取文件。其次,DistributedFileSystem会向客户端返回一个支持定位的输入流对象FSDataInputStream,用于给客户端读取数据。FSDataInputStream包含一个DFSInputStream对象,这个对象来管理DataNode和NameNode之间的IO

  1. 当以上步骤完成时,客户端便会在这个输入流上调用read()方法

  2. DFSInputStream对象中包含文件开始部分数据块所在的DataNode地址,首先它会连接文件第一个块最近的DataNode,随后在数据流中重复调用read方法,直到这个块读完为止。

  3. 当第一个块读取完毕时,DFSInputStream会关闭连接,并查找存储下一个数据块距离客户端最近的DataNode,以上这些步骤对客户端来说都是透明的。

  4. 当完成所有块的读取时,客户端则会在DFSInputStream中调用close()方法。

|