Hadoop 客户端操作(HDFS client operations)

509

基本文件操作(Basic file operations via the FileSystem API)

package cn.medemede.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

public class HDFSClient {

	/** NameNode address used by every test in this class. */
	private static final String HDFS_URI = "hdfs://192.168.142.101:9000";
	/** Remote user the client acts as. */
	private static final String HDFS_USER = "root";

	/**
	 * Builds a fresh FileSystem handle against the configured NameNode.
	 * Callers are responsible for closing it; each test below does so with
	 * try-with-resources, which also covers the exception path (the original
	 * code skipped close() whenever the operation threw).
	 */
	private FileSystem connect() throws IOException, InterruptedException, URISyntaxException {
		// A default Configuration is enough here: URI and user are passed explicitly.
		return FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
	}

	// Print the FileSystem handle to verify connectivity.
	@Test
	public void getFileSystem() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			System.out.println(fileSystem.toString());
		}
	}

	// Upload a local file to HDFS.
	@Test
	public void putFileToHdfs() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			// copyFromLocalFile(delete source?, source, destination)
			fileSystem.copyFromLocalFile(false, new Path("D:/Projects/MyEclipse-Work/hadoop0508/testFile/test.txt"),
					new Path("/user/xcp/testFile"));
		}
	}

	// Download a file from HDFS to the local disk.
	@Test
	public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			// copyToLocalFile(delete source?, source, destination, use raw local FS — skips the .crc file)
			fileSystem.copyToLocalFile(false, new Path("/user/xcp/test.txt"),
					new Path("D:/Projects/MyEclipse-Work/hadoop0508/testFile/test.txt"), true);
		}
	}

	// Create a directory on HDFS.
	// NOTE(review): method name keeps its original typo ("midir") so external
	// references to this test by name keep working.
	@Test
	public void midirAtHDFS() throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			fileSystem.mkdirs(new Path("/user/xcp/testFile"));
		}
	}

	// Delete a path; the boolean enables recursive delete for directories.
	@Test
	public void deleteAtHDFS() throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			fileSystem.delete(new Path("/user/xcp/testFile/test.txt"), true);
		}
	}

	// Rename a file on HDFS.
	@Test
	public void reNameAtHDFS() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			fileSystem.rename(new Path("/user/xcp/testFile/test.txt"), new Path("/user/xcp/testFile/test2.txt"));
		}
	}

	// Recursively list every file under "/" and print its details plus block locations.
	@Test
	public void readFileAtHDFS() throws FileNotFoundException, IllegalArgumentException, IOException,
			InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
			while (listFiles.hasNext()) {
				LocatedFileStatus status = listFiles.next();

				System.out.println(status.getPath().getName()); // file name
				System.out.println(status.getBlockSize());      // block size
				System.out.println(status.getLen());            // content length
				System.out.println(status.getPermission());     // permissions

				// Per-block details: byte offset plus the hosts holding a replica.
				for (BlockLocation blockLocation : status.getBlockLocations()) {
					System.out.println(blockLocation.getOffset());
					for (String host : blockLocation.getHosts()) {
						System.out.print(host + " ");
					}
				}

				System.out.println("\n------------------------------");
			}
		}
	}

	// List the direct children of "/" and tag each entry as file (f) or directory (d).
	// Fix: the original leaked the FileSystem handle — it never called close().
	@Test
	public void readDirAtHDFS() throws FileNotFoundException, IllegalArgumentException, IOException,
			InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect()) {
			for (FileStatus fileStatus : fileSystem.listStatus(new Path("/"))) {
				if (fileStatus.isFile()) {
					System.out.println("f----" + fileStatus.getPath().getName());
				} else {
					System.out.println("d----" + fileStatus.getPath().getName());
				}
			}
		}
	}
}

利用 IO 流操作文件(File operations using raw IO streams)

package cn.medemede.hadoop;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class IOToHDFS {

	/** NameNode address used by every test in this class. */
	private static final String HDFS_URI = "hdfs://192.168.142.101:9000";
	/** Remote user the client acts as. */
	private static final String HDFS_USER = "root";
	/** The large test file on HDFS shared by all four tests. */
	private static final String REMOTE_FILE =
			"/user/xcp/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv";
	/** One HDFS block on this cluster: 128 MB. */
	private static final long BLOCK_SIZE = 128L * 1024 * 1024;

	/**
	 * Opens a FileSystem handle against the configured NameNode.
	 * Each test closes it via try-with-resources, which also covers the
	 * exception path (the original finally blocks did, but leaked nothing —
	 * this just centralizes the boilerplate).
	 */
	private FileSystem connect() throws IOException, InterruptedException, URISyntaxException {
		return FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
	}

	// Upload via raw streams instead of copyFromLocalFile.
	// Fix: the original caught Exception and only printed the stack trace, so a
	// failed upload still let the test pass; failures now propagate to JUnit.
	@Test
	public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fileSystem = connect();
				FSDataOutputStream fsOut = fileSystem.create(new Path(REMOTE_FILE));
				FileInputStream fsIn = new FileInputStream(
						new File("D:/video/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"))) {
			// copyBytes streams input to output using the configured buffer size.
			IOUtils.copyBytes(fsIn, fsOut, configuration);
		}
	}

	// Download the whole file via raw streams.
	@Test
	public void downloadFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fileSystem = connect();
				FSDataInputStream fsInputStream = fileSystem.open(new Path(REMOTE_FILE));
				FileOutputStream fsOutputStream = new FileOutputStream(new File(
						"D:/Projects/MyEclipse-Work/hadoop0508/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"))) {
			IOUtils.copyBytes(fsInputStream, fsOutputStream, configuration);
		}
	}

	// Download only the first HDFS block (128 MB) of the file.
	// Fix: the original ignored read()'s return value and always wrote the full
	// 1 KB buffer, so a short read emitted stale/garbage bytes and EOF was never
	// detected (a file smaller than 128 MB kept writing junk). The loop now
	// honors the actual byte count and stops at EOF or at the block boundary.
	@Test
	public void downloadFirstStack() throws IOException, InterruptedException, URISyntaxException {
		try (FileSystem fileSystem = connect();
				FSDataInputStream fsInputStream = fileSystem.open(new Path(REMOTE_FILE));
				FileOutputStream fsOutputStream = new FileOutputStream(new File(
						"D:/Projects/MyEclipse-Work/hadoop0508/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv.part1"))) {
			byte[] buffer = new byte[1024];
			long remaining = BLOCK_SIZE;
			while (remaining > 0) {
				// Never request more than the bytes still owed for this block.
				int n = fsInputStream.read(buffer, 0, (int) Math.min(buffer.length, remaining));
				if (n == -1) {
					break; // file ended before one full block
				}
				fsOutputStream.write(buffer, 0, n);
				remaining -= n;
			}
		}
	}

	// Download everything after the first block.
	/**
	 * The parts can be merged afterwards from cmd or a Linux shell, e.g.
	 * append file.part2 to file.part1 with: {@code type file.part2>>file.part1}
	 */
	@Test
	public void downloadAfter() throws IOException, InterruptedException, URISyntaxException {
		Configuration configuration = new Configuration();
		try (FileSystem fileSystem = connect();
				FSDataInputStream fsInputStream = fileSystem.open(new Path(REMOTE_FILE));
				FileOutputStream fsOutputStream = new FileOutputStream(new File(
						"D:/Projects/MyEclipse-Work/hadoop0508/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv.part2"))) {
			// Skip the first block, then stream the remainder into part2.
			fsInputStream.seek(BLOCK_SIZE);
			IOUtils.copyBytes(fsInputStream, fsOutputStream, configuration);
		}
	}

}