欢迎来到军工软件开发人才培养基地——学到牛牛

pyhdfs模块

时间:2024-05-06 07:01:10 来源:学到牛牛

pyhdfs模块是Python提供的第三方库模块,它提供了直接对Hadoop中HDFS操作的能力,pyhdfs模块是HDFS的API和命令行接口。

1.安装PyHDFS库

在使用pyhdfs模块需要安装PyHDFS,在Python中所有的第三方模块均采用pip安装。

在Windows下使用pip安装模块有以下两种方式。

(1)命令行安装方式:运行→cmd→pip install pyhdfs,如图1所示。

图1 命令行安装方式安装pyhdfs模块

PyCharm安装方式:在Terminal界面输入pip install pyhdfs命令,如图2所示。

图2 PyCharm安装方式安装pyhdfs模块

验证PyHDFS是否安装成功:在控制台终端输入pip list,如果查看到安装的pyhdfs模块则说明安装成功,如图3所示。

图3 查看安装结果

 

2.连接HDFS

pyhdfs模块中的HdfsClient类非常关键。使用这个类可以实现连接HDFS的NameNode,对HDFS上的文件进行查、读、写操作等功能。

参数解析如下。

hosts:主机名或IP地址,与port之间需要逗号隔开,如:hosts="192.168.153.101:9000",支持高可用集群,例如:["192.168.153.101,9000","192.168.153.102,9000"]。

randomize_hosts:随机选择host进行连接,默认为True。

user_name:连接的Hadoop平台的用户名。

timeout:每个NameNode节点连接等待的秒数,默认为20sec。

max_tries:每个NameNode节点尝试连接的次数,默认为2次。

retry_delay:在尝试连接一个NameNode节点失败后,尝试连接下一个NameNode的时间间隔,默认为5s。

requests_session:设置连接HDFS的HTTPRequest请求的模式为session。

代码示例如下:

import pyhdfs

class HDFSTest2:

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    def test(self):

        print(self.client)

if __name__ == '__main__':

    h = HDFSTest2()

    h.test()

结果如图4所示。

图4 输出结果

注意,在Windons下使用PyCharm中pyhdfs模块连接HDFS时,需要设置Windons中的hosts文件,否则将无法在连接集群里根据节点名字找到从节点,如图5所示。

图5 设置hosts文件

 

3.get_home_directory()函数

get_home_directory()函数用于返回所连接集群的根目录。

get_home_directory()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101, 50070", user_name="hadoop")

# 返回这个用户的根目录

def get_home_directory(self):

    c = self.client

    print(c.get_home_directory())

if __name__ == '__main__':

    h = HDFSTest2()

    h.get_home_directory()

结果如图6所示。

 

图6 get_home_directory()函数示例结果

 

4.get_active_namenode()函数

get_active_namenode()函数用于返回当前活动的NameNode的地址。

get_active_namenode()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    # 返回可用的NameNode节点

    def get_active_namenode(self):

        c = self.client

        nameNode = c.get_active_namenode()

        print(nameNode)

if __name__ == '__main__':

    h = HDFSTest2()

    h.get_active_namenode()

结果如图7所示。

图7 get_active_namenode()函数示例结果

5.listdir()函数

listdir()函数用于返回指定目录下的所有文件,由于PyHDFS可以设置访问的用户,因此在操作HDFS中的文件时不需要设置其他用户的权限。listdir函数中path参数是指定的HDFS路径。

listdir()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

 

    # 查询HDFS中根目录下的所有文件

    def listdir(self, hdfsPath):

        c = self.client

        dir = c.listdir(path=hdfsPath)

        for d in dir:

            print(d, end="\t")

if __name__ == '__main__':

    h = HDFSTest2()

    h.listdir("/")

结果如图8所示。

图8 listdir()函数示例结果

6.open()函数

open()函数用于远程打开HDFS中的文件,返回IO[bytes]类型,利用read()函数读取指定文件的数据返回AnyStr类型。

open()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    # 打开hdfs中文件

    def open(self, filDir):

        c = self.client

        file = c.open(path=filDir)

        print(file.read().decode(encoding="utf8"))

if __name__ == '__main__':

    h = HDFSTest2()

    h.open("/input/data")

结果如图9所示。

图9 open()函数示例结果

7.copy_from_local()函数

copy_from_local()函数用于从本地上传文件到集群,接收两个参数:localsrc参数用于设置本地文件路径;dest用于设置HDFS中文件路径,如果dest参数对应的路径不存在将创建一个新路径。

copy_from_local()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    # 从本地上传文件至集群

    def copy_from_local(self, local, hdfsPath):

        c = self.client

        c.copy_from_local(localsrc=local, dest=hdfsPath)

if __name__ == '__main__':

    h = HDFSTest2()

    h.copy_from_local("D:/tmp/test.txt", "/input/dd/newTest")

输出结果:

[hadoop@Slave003 ~]$ hadoop fs -ls /input

-rw-r--r--   3 hadoop supergroup         77 2020-07-23 04:20 /input/data

[hadoop@Slave003 ~]$ hadoop fs -ls /input

-rw-r--r--   3 hadoop supergroup         77 2020-07-23 04:20 /input/data

-rwxr-xr-x   3 hadoop supergroup         10 2020-07-24 06:12 /input/newTest

[hadoop@Slave003 ~]$ hadoop fs -ls -R /input

-rw-r--r--   3 hadoop supergroup         77 2020-07-23 04:20 /input/data

drwxr-xr-x   - hadoop supergroup          0 2020-07-24 06:14 /input/dd

-rwxr-xr-x   3 hadoop supergroup         10 2020-07-24 06:14 /input/dd/newTest

-rwxr-xr-x   3 hadoop supergroup         10 2020-07-24 06:12 /input/newTest

 

8.copy_to_local()函数

copy_to_local()函数用于从集群的HDFS中下载文件到本地,接收两个参数:src参数为hdfs中的文件路径;localdest参数为本地的文件存储路径。

copy_to_local()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    # 从集群下载文件到本地

    def copy_to_local(self, hdfsPath, local):

        c = self.client

        c.copy_to_local(src=hdfsPath, localdest=local)

if __name__ == '__main__':

    h = HDFSTest2()

    h.copy_to_local("/input/data", "D:/tmp")

 

9.mkdirs()函数

mkdirs()函数用于在集群的HDFS中创建新目录,path参数用于传入需要创建的路径。

mkdirs()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    # 创建新目录

    def mkdirs(self, hdfsPath):

        c = self.client

        c.mkdirs(path=hdfsPath)

if __name__ == '__main__':

    h = HDFSTest2()

    h.mkdirs("/input/tmp")

结果如图10所示。

图10 mkdirs()函数示例结果

10.exists()函数

exists()函数用于查看指定的文件或目录是否存在,如果存在则返回True,否则返回False。

exists()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    # 查看文件是否存在

    def exists(self, hdfsPath):

        c = self.client

        result = c.exists(path=hdfsPath)

        print("结果:", result)

if __name__ == '__main__':

    h = HDFSTest2()

    h.exists("/input")

结果如图11所示。

图11 exists()函数示例结果

11.get_file_status()函数

get_file_status()函数用于返回指定HDFS路径的路径对象,path参数为HDFS文件或目录路径。

get_file_status函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    def get_file_status(self, hdfsPath):

        c = self.client

        status = c.get_file_status(hdfsPath)

        print(status["type"])

        if status["type"] == "DIRECTORY":

            print(f"{hdfsPath}:该文件是目录!")

        elif status["type"] == "FILE":

            print(f"{hdfsPath}:该文件是文件!")

if __name__ == '__main__':

    h = HDFSTest2()

    h.get_file_status("/input/data")

结果如图12所示。

图12 运行结果

12.delete()函数

delete()函数用于删除HDFS文件函数,该函数只能删除文件或者空目录,如果删除的目录下有文件的目录,将抛出HdfsPathIsNotEmptyDirectoryException异常。

delete()函数示例如下:

class HDFSTest2:

    # 获取对HDFS操作的对象

    def __init__(self):

        self.client = pyhdfs.HdfsClient(hosts="192.168.153.101,50070", user_name="hadoop")

    def delete(self):

        c = self.client

        c.delete("/input")

if __name__ == '__main__':

    h = HDFSTest2()

    h.delet