基本文件操作
package cn.medemede.hadoop;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;
public class HDFSClient {
// 打印文件系统
@Test
public void getFileSystem() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
System.out.println(fileSystem.toString());
fileSystem.close();
}
// 上传
@Test
public void putFileToHdfs() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
// copyFromLocalFile(是否删除源文件,源文件,目标路径)
fileSystem.copyFromLocalFile(false, new Path("D:/Projects/MyEclipse-Work/hadoop0508/testFile/test.txt"),
new Path("/user/xcp/testFile"));
// 关闭文件系统
fileSystem.close();
}
// 下载
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
// copyToLocalFile(是否删除源文件,源文件,目标路径,是否需要校验)
fileSystem.copyToLocalFile(false, new Path("/user/xcp/test.txt"),
new Path("D:/Projects/MyEclipse-Work/hadoop0508/testFile/test.txt"), true);
// 关闭文件系统
fileSystem.close();
}
// 创建目录
@Test
public void midirAtHDFS() throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
fileSystem.mkdirs(new Path("/user/xcp/testFile"));
// 关闭文件系统
fileSystem.close();
}
// 删除
@Test
public void deleteAtHDFS() throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
fileSystem.delete(new Path("/user/xcp/testFile/test.txt"), true);
// 关闭文件系统
fileSystem.close();
}
//重命名
@Test
public void reNameAtHDFS() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
fileSystem.rename(new Path("/user/xcp/testFile/test.txt"), new Path("/user/xcp/testFile/test2.txt"));
// 关闭文件系统
fileSystem.close();
}
// 查看文件详情
@Test
public void readFileAtHDFS() throws FileNotFoundException, IllegalArgumentException, IOException,
InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus locatedFileStatus = listFiles.next();
// 文件名
System.out.println(locatedFileStatus.getPath().getName());
// 块大小
System.out.println(locatedFileStatus.getBlockSize());
// 文件内容长度
System.out.println(locatedFileStatus.getLen());
// 文件权限
System.out.println(locatedFileStatus.getPermission());
// 文件块具体信息
BlockLocation[] blockLocations = locatedFileStatus.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
System.out.println(blockLocation.getOffset());
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.print(host + " ");
}
}
System.out.println("\n------------------------------");
}
// 关闭文件系统
fileSystem.close();
}
// 查看文件夹详情
@Test
public void readDirAtHDFS() throws FileNotFoundException, IllegalArgumentException, IOException,
InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()) {
System.out.println("f----" + fileStatus.getPath().getName());
} else {
System.out.println("d----" + fileStatus.getPath().getName());
}
}
}
}
利用IO流操作文件
package cn.medemede.hadoop;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class IOToHDFS {
// IO流方式上传
@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
// 输出流
FSDataOutputStream fsOut = fileSystem
.create(new Path("/user/xcp/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"));
// 输入流
FileInputStream fsIn = new FileInputStream(
new File("D:/video/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"));
// 流对接
try {
IOUtils.copyBytes(fsIn, fsOut, configuration);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
} finally {
// 关闭
IOUtils.closeStream(fsIn);
IOUtils.closeStream(fsOut);
fileSystem.close();
}
}
// IO流方式下载
@Test
public void downloadFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
// 输入流
FSDataInputStream fsInputStream = fileSystem
.open(new Path("/user/xcp/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"));
// 输出流
FileOutputStream fsOutputStream = new FileOutputStream(new File(
"D:/Projects/MyEclipse-Work/hadoop0508/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"));
try {
IOUtils.copyBytes(fsInputStream, fsOutputStream, configuration);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
} finally {
IOUtils.closeStream(fsInputStream);
IOUtils.closeStream(fsOutputStream);
fileSystem.close();
}
}
// 下载大文件的第一块数据
@Test
public void downloadFirstStack() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
// 输入流
FSDataInputStream fsInputStream = fileSystem
.open(new Path("/user/xcp/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"));
// 输出流
FileOutputStream fsOutputStream = new FileOutputStream(new File(
"D:/Projects/MyEclipse-Work/hadoop0508/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv.part1"));
// 流对接(只读取128M)
try {
byte[] buff = new byte[1024]; // 1KB的Buffer
// 读写1024*128次buff大小的内容,即128M
for (int i = 0; i < 1024 * 128; i++) {
fsInputStream.read(buff); // 每次读buff大小
fsOutputStream.write(buff); // 每次写buff大小
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// TODO: handle finally clause
IOUtils.closeStream(fsInputStream);
IOUtils.closeStream(fsOutputStream);
fileSystem.close();
}
}
// 下载指定位置之后的内容
/**
* 文件的合并可以使用cmd或linux终端 追加file.part2到file.part1之后:【type file.part2>>file.part1】
*/
@Test
public void downloadAfter() throws IOException, InterruptedException, URISyntaxException {
// 创建配置文件
Configuration configuration = new Configuration();
// 获取文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.142.101:9000"), configuration, "root");
// 输入流
FSDataInputStream fsInputStream = fileSystem
.open(new Path("/user/xcp/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv"));
// 输出流
FileOutputStream fsOutputStream = new FileOutputStream(new File(
"D:/Projects/MyEclipse-Work/hadoop0508/testFile/[BD影视分享bd-film.co]敦刻尔克.Dunkirk.2017.内地公映版.HD1080P.国英双语.中英双字.mkv.part2"));
// 流对接
try {
// 定位到第一块后
fsInputStream.seek(1024 * 1024 * 128);
// 将第一块后的内容全部下载到part2
IOUtils.copyBytes(fsInputStream, fsOutputStream, configuration);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
} finally {
// TODO: handle finally clause
IOUtils.closeStream(fsInputStream);
IOUtils.closeStream(fsOutputStream);
fileSystem.close();
}
}
}