PyHDFS的方法操作详解

PyHDFS

[TOC]

安装

安装hadoop

关于hadoop的安装配置会在另一篇文章中介绍,这里只介绍pythonhdfs库的安装.

安装hdfs库

所有python的三方模块均采用pip来安装.

1
pip install hdfs

hdfs库的使用

下面将介绍hdfs库的方法列表,并会与hadoop自带的命令行工具进行比较

1
2
3
4
from hdfs.client import Client

Path = "http://192.168.150.150:9870"
client = Client(Path)

list

作用

​ list()会列出hdfs指定路径的所有文件信息,接收两个参数

参数

  1. hdfs_path 要列出的hdfs路径
  2. status 默认为False,是否显示详细信息

应用

​ 查看hdfs根目录下的文件信息,等同于hdfs dfs -ls /

1
2
3
4
5
6
7
8
9
10
11
list_true = client.list(hdfs_path="/", status=True)
list_false = client.list(hdfs_path="/", status=False)

print("hdfs中的目录的详细信息为:", list_true)
print("hdfs中的目录的概要信息为:", list_false)

# 结果如下:
"""
hdfs中的目录的详细信息为: [('app', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 1, 'fileId': 20871, 'group': 'supergroup', 'length': 0, 'modificationTime': 1630303896119, 'owner': 'root', 'pathSuffix': 'app', 'permission': '755', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('data', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 4, 'fileId': 21638, 'group': 'supergroup', 'length': 0, 'modificationTime': 1634027286302, 'owner': 'root', 'pathSuffix': 'data', 'permission': '755', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('demo', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 1, 'fileId': 16688, 'group': 'supergroup', 'length': 0, 'modificationTime': 1623153705295, 'owner': 'root', 'pathSuffix': 'demo', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('hbase', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 12, 'fileId': 19087, 'group': 'supergroup', 'length': 0, 'modificationTime': 1633768289083, 'owner': 'root', 'pathSuffix': 'hbase', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('kb12', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 1, 'fileId': 16927, 'group': 'supergroup', 'length': 0, 'modificationTime': 1623400472184, 'owner': 'root', 'pathSuffix': 'kb12', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('shop12', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 1, 'fileId': 20869, 'group': 'supergroup', 'length': 0, 'modificationTime': 1629700188802, 'owner': 'root', 'pathSuffix': 'shop12', 'permission': '755', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('test', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 9, 'fileId': 16397, 'group': 'supergroup', 'length': 0, 'modificationTime': 1623133555707, 'owner': 'root', 'pathSuffix': 'test', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('tmp', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 4, 'fileId': 16386, 'group': 'supergroup', 'length': 0, 'modificationTime': 1629422901494, 'owner': 'root', 'pathSuffix': 'tmp', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}), ('user', {'accessTime': 0, 'blockSize': 0, 'childrenNum': 4, 'fileId': 16392, 'group': 'supergroup', 'length': 0, 'modificationTime': 1630652216664, 'owner': 'root', 'pathSuffix': 'user', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'})]
hdfs中的目录的概要信息为: ['app', 'data', 'demo', 'hbase', 'kb12', 'shop12', 'test', 'tmp', 'user']
"""

status

作用

​ 查看文件或者目录状态

参数

  1. hdfs_path 要列出的hdfs路径
  2. strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise

应用

1
2
3
4
5
6
7
8
9
10
status_strict = client.status(hdfs_path="/data", strict=True)
status_noneStrict = client.status(hdfs_path="/dat1", strict=False)
print("文件存在的严格模式:", status_strict)
print("文件不存在的非严格模式:", status_noneStrict)

# 结果如下:
"""
文件存在的严格模式: {'accessTime': 0, 'blockSize': 0, 'childrenNum': 4, 'fileId': 21638, 'group': 'supergroup', 'length': 0, 'modificationTime': 1634027286302, 'owner': 'root', 'pathSuffix': '', 'permission': '755', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}
文件不存在的非严格模式: None
"""

checksum

作用

​ 获取hdfs文件的校验和

参数

  1. hdfs_path 要列出file的hdfs路径

应用

1
2
3
4
5
6
7
8
9
10
checksum_file_num = client.checksum(hdfs_path="/data/hive_test/test/row_to_col.txt")
print("文件下的文件数为:", checksum_file_num)

# 结果如下:
"""
获取hdfs文件的校验和为: {'algorithm': 'MD5-of-0MD5-of-512CRC32C', 'bytes': '000002000000000000000000b4e74bb6e7710bfe5101679b8814c6bf00000000', 'length': 28}
"""
algorithm:算法
bytes:
length:

parts

作用

​ 列出路径下的part file,接收三个参数

参数

  1. hdfs_path 要列出的hdfs路径
  2. parts 要显示的parts数量 默认全部显示,part名称相同,去重后显示
  3. status 默认为False,是否显示详细信息

应用

1
2
3
4
5
6
7
8
9
10
parts_files_all = client.parts(hdfs_path="/user/hive/warehouse/person")
parts_files = client.parts(hdfs_path="/user/hive/warehouse/person", parts=0, status=True)
print("显示当前路径下的文件数为:", parts_files_all)
print("显示当前路径下的部分文件数的详细信息为:", parts_files)

# 结果如下:
"""
显示当前路径下的文件数为: ['part-00000-bff48cd0-a534-495d-bd27-9fb6a623584c-c000', 'part-00001-bff48cd0-a534-495d-bd27-9fb6a623584c-c000']
显示当前路径下的部分文件数的详细信息为: [('part-00000-bff48cd0-a534-495d-bd27-9fb6a623584c-c000', {'accessTime': 1634209606115, 'blockSize': 134217728, 'childrenNum': 0, 'fileId': 23892, 'group': 'supergroup', 'length': 34, 'modificationTime': 1634003102519, 'owner': 'root', 'pathSuffix': 'part-00000-bff48cd0-a534-495d-bd27-9fb6a623584c-c000', 'permission': '755', 'replication': 1, 'storagePolicy': 0, 'type': 'FILE'}), ('part-00001-bff48cd0-a534-495d-bd27-9fb6a623584c-c000', {'accessTime': 1634003102020, 'blockSize': 134217728, 'childrenNum': 0, 'fileId': 23894, 'group': 'supergroup', 'length': 31, 'modificationTime': 1634003102519, 'owner': 'root', 'pathSuffix': 'part-00001-bff48cd0-a534-495d-bd27-9fb6a623584c-c000', 'permission': '755', 'replication': 1, 'storagePolicy': 0, 'type': 'FILE'})]
"""

content

作用

​ 列出目录或文件详情,接收两个参数

参数

  1. hdfs_path 要列出的hdfs路径
  2. strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise

应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
content_file_true = client.content(hdfs_path="/data/hive_test/test/row_to_col.txt", strict=True)
content_file_false = client.content(hdfs_path="/data/hive_test/test/row_to_col", strict=False)
content_dir_true = client.content(hdfs_path="/data", strict=True)
content_dir_false = client.content(hdfs_path="/dat1", strict=False)
print("hdfs中的文件的严格模式信息为:", content_file_true)
print("hdfs中的文件不存在的非严格模式信息为:", content_file_false)
print("hdfs中的目录的严格模式信息为:", content_dir_true)
print("hdfs中的目录不存在的非严格模式信息为:", content_dir_false)

# 结果如下:
"""
hdfs中的文件的严格模式信息为: {'directoryCount': 0, 'fileCount': 1, 'length': 110, 'quota': -1, 'spaceConsumed': 110, 'spaceQuota': -1, 'typeQuota': {}}
hdfs中的文件不存在的非严格模式信息为: None
hdfs中的目录的严格模式信息为: {'directoryCount': 9, 'fileCount': 7, 'length': 22839529, 'quota': -1, 'spaceConsumed': 22839529, 'spaceQuota': -1, 'typeQuota': {}}
hdfs中的目录不存在的非严格模式信息为: None
"""

makedirs

作用

​ 创建目录,同hdfs dfs -mkdirhdfs dfs -chmod的结合体,接收两个参数

参数

  1. hdfs_path hdfs路径
  2. permission 文件权限

应用

1
2
3
4
create_dir_777 = client.makedirs(hdfs_path="/data/test777", permission="777")
create_dir_111 = client.makedirs(hdfs_path="/data/test111", permission="111")

# 结果如下:

创建文件

rename

作用

​ 文件或目录重命名,接收两个参数

参数

  1. hdfs_src_path 原始路径或名称
  2. hdfs_dst_path 修改后的文件或路径

应用

1
2
3
4
rename_file = client.rename(hdfs_src_path="/data/hive_test/test/row_to_col.txt", hdfs_dst_path="/data/hive_test/test/row_to_col.txt.bak")
rename_dir = client.rename(hdfs_src_path="/data/test111", hdfs_dst_path="/data/test111.bak")

# 结果如下:

文件或目录修改前

文件或目录修改后

resolve

作用

​ 返回绝对路径,接收一个参数hdfs_path

参数

  1. hdfs_path 要列出file的hdfs路径 ,若存在多个重名的文件,则返回路径深度最浅的路径

应用

1
2
3
4
5
6
7
8
9
10
resolve_file_path_1 = client.resolve(hdfs_path="/data/hive_test/test/row_to_col.txt.bak")
resolve_file_path_2 = client.resolve(hdfs_path="row_to_col.txt.bak")
print("文件的绝对路径为:", resolve_file_path_1)
print("文件的绝对路径为:", resolve_file_path_2)

# 结果如下:
"""
文件的绝对路径为: /data/hive_test/test/row_to_col.txt.bak
文件的绝对路径为: /user/root/row_to_col.txt.bak
"""

set_replication

作用

​ 设置文件在hdfs上的副本(datanode上)数量,接收两个参数,集群模式下的hadoop默认保存3份

参数

  1. hdfs_path hdfs路径
  2. replication 副本数量

应用

1
2
3
client.set_replication(hdfs_path="/row_to_col.txt.bak",replication=2)

# 结果如下:

read

作用

​ 读取文件信息 类似与 hdfs dfs -cat hfds_path,参数如下:

参数

  1. hdfs_path hdfs路径
  2. offset 读取位置
  3. length 读取长度
  4. buffer_size 设置buffer_size 不设置使用hdfs默认100MB 对于大文件 buffer够大的化 sort与shuffle都更快
  5. encoding 指定编码
  6. chunk_size 字节的生成器,必须和encodeing一起使用 满足chunk_size设置即 yield
  7. delimiter 设置分隔符 必须和encodeing一起设置
  8. progress 读取进度回调函数 读取一个chunk_size回调一次

应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    This method must be called using a `with` block:

.. code-block:: python

with client.read('foo') as reader:
content = reader.read()

This ensures that connections are always properly closed.
# 官方使用提示:
"""
此方法必须使用 `with` 块调用:

.. 代码块:: python

以 client.read('foo') 作为读者:
内容 = reader.read()

这可确保连接始终正确关闭。
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 读取200长度
with client.read("/data/rent/data.csv", length=200, encoding='utf-8') as reader:
content = reader.read()
print(content)
print("*"*100)
# 从20位置读取200长度
with client.read("/data/rent/data.csv", offset=20, length=200, encoding='utf-8') as reader:
content = reader.read()
print(content)
print("*"*100)

# 设置buffer为1024,读取
with client.read("/data/rent/data.csv", buffer_size=200, length=200, encoding='utf-8') as reader:
content = reader.read()
print(content)
print("*"*100)

# 设置分隔符为换行
# with client.read("/data/rent/data.csv", encoding='utf-8', length=200, delimiter='\n') as reader:
# content = reader.read()
# print(content)

# print("*"*100)

# # 设置读取每个块的大小为8
# with client.read("/data/rent/data.csv", encoding='utf-8', chunk_size=80) as reader:
# content = reader

# 结果如下:
"""
city_name,data_all,public_time,rent_address,rent_area,rent_money,rent_title,rent_type
南京,电梯:无-车位:暂无数据-用水:民水-用电:民电-燃气:有</-采暖暂无数据,2021-10-05,鼓
****************************************************************************************************
ublic_time,rent_address,rent_area,rent_money,rent_title,rent_type
南京,电梯:无-车位:暂无数据-用水:民水-用电:民电-燃气:有</-采暖暂无数据,2021-10-05,鼓楼-宁海路-随园,
****************************************************************************************************
city_name,data_all,public_time,rent_address,rent_area,rent_money,rent_title,rent_type
南京,电梯:无-车位:暂无数据-用水:民水-用电:民电-燃气:有</-采暖暂无数据,2021-10-05,鼓
****************************************************************************************************
"""

download

作用

​ 从hdfs下载文件到本地,参数列表如下.

参数

  1. hdfs_path hdfs路径
  2. local_path 下载到的本地路径
  3. overwrite 是否覆盖(如果有同名文件) 默认为Flase
  4. n_threads 启动线程数量,默认为1,不启用多线程
  5. temp_dir下载过程中文件的临时路径
  6. **kwargs其他属性

应用

1
2
3
4
5
6
download_file = client.download(hdfs_path="/data/hive_test/test/row_to_col.txt.bak", local_path="download/", overwrite=True)
print("下载文件结果:", download_file)
# 结果如下:
"""
下载文件结果: D:\Desktop\PyHadoop\PyHdfs\download
"""

下载文件

upload

作用

​ 上传文件到hdfs 同hdfs dfs -copyFromLocal local_file hdfs_path,参数列表如下:

参数

  1. hdfs_path, hdfs上位置
  2. local_path, 本地文件位置
  3. n_threads=1 并行线程数量 temp_dir=None, overwrite=True或者文件已存在的情况下的临时路径
  4. chunk_size=2 ** 16 块大小
  5. progress=None, 报告进度的回调函数 完成一个chunk_size回调一次 chunk_size可以设置大点 如果大文件的话
  6. cleanup=True, 上传错误时 是否删除已经上传的文件
  7. **kwargs 上传的一些关键字 一般设置为 overwrite 来覆盖上传

应用

1
2
3
4
5
6
7
8
9
10
11
12
13
def callback(filename, size):
print(filename, "完成了一个chunk上传", "当前大小:", size)
if size == -1:
print("文件上传完成")

# 上传成功返回 hdfs_path
client.upload(hdfs_path="/data", local_path="./download", chunk_size=2 << 19, progress=callback, cleanup=True)
# 结果如下:
"""
./download 完成了一个chunk上传 当前大小: 110
./download 完成了一个chunk上传 当前大小: -1
文件上传完成
"""

上传文件

delete

作用

​ 删除文件,接收三个参数【等同hdfs dfs -rm (-r)

参数

  1. hdfs_path
  2. recursive=False 是否递归删除
  3. skip_trash=True 是否移到垃圾箱而不是直接删除 hadoop 2.9+版本支持

应用

1
2
3
4
5
6
7
delete_file = client.delete("/data/download")
print("文件删除信息为:", delete_file)

# 结果如下:
"""
文件删除信息为: True
"""

删除文件

set_owner

作用

​ 类似与 hdfs dfs -chown root root hdfs_path修改目录或文件的所属用户,用户组,接收三个参数

参数

  1. hdfs_path hdfs路径
  2. owner 用户
  3. group 用户组

注意:对于默认用户,只能修改自己的文件.

应用

1
2
3
4
5
6
7
set_owner = client.set_owner(hdfs_path="/data/hive_test/test/row_to_col.txt.bak", owner="root", group="root")
print("设置文件拥有者的信息为:", set_owner)

# 结果如下:
"""
设置文件拥有者的信息为: None
"""

目录或文件的所属用户,用户组修改前

目录或文件的所属用户,用户组 修改后

set_permission

作用

​ 修改权限,类似于hdfs dfs -chmod 777 hdfs_path,接收两个参数

参数

  1. hdfs_path hdfs路径
  2. permission 权限

应用

1
2
3
4
5
6
7
set_owner = client.set_permission(hdfs_path="/data/hive_test/test/row_to_col.txt.bak", permission="777")
print("修改权限的信息为:", set_owner)

# 结果如下:
"""
修改权限的信息为: None
"""

文件权限修改前

文件权限修改后

set_acl与acl_status

​ 查看和修改访问权限控制 需要开启acl支持

set_times

作用

​ 设置文件时间,接收参数如下:

参数

  1. hdfs_path: hdfs路径.
  2. access_time: 最后访问时间 时间戳 毫秒
  3. modification_time: 最后修改时间 时间戳 毫秒

应用

1
2
3
4
5
6
7
8
9
10
11
import time

modify_time = client.set_times(hdfs_path="/data/hive_test/person/person.txt", access_time=int(time.time())*1000,
modification_time=int(time.time())*1000)

print("修改文件的时间的信息为:", modify_time)

# 结果如下:
"""
修改文件的时间的信息为: None
"""

文件操作时间修改前

文件操作时间修改后


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!