HDFS File Upload

实现用户上传文件并将文件存储到大数据HDFS及Hive库中,

后端上传文件到HDFS:可以使用Python的HDFS库(如pyarrow、hdfs3等)来实现将文件上传到HDFS,需要配置HDFS的连接信息和权限,可以在flask中使用路由接口接收文件,然后调用HDFS库上传到HDFS中。   

后端上传文件到Hive:可以使用Python的Pyhive库来实现将文件上传到Hive中,需要配置Hive的连接信息和权限,可以在flask中使用路由接口接收文件,然后调用Pyhive库上传到Hive中。

前端上传文件:可以使用Vue的组件库(如Element-UI等)来实现文件上传功能,需要配置上传文件的接口地址和参数信息,可以使用axios库发送POST请求到后端接口。

后端Spark处理:可以使用PySpark来处理上传到HDFS或Hive中的数据,需要配置Spark的连接信息和权限,可以使用pyspark库来操作Spark,实现对数据的处理和分析。

安全性考虑:为了保证数据的安全性,需要对上传的文件进行权限校验和安全过滤,防止用户上传恶意文件和攻击行为,可以使用Python的安全库(如hashlib等)来实现文件的安全过滤。

代码逻辑具体设计:

  1. 定义后端API接口:在flask中定义API接口,包括上传文件接口和数据处理接口,上传文件接口接收前端传来的文件并存储到HDFS和Hive中,数据处理接口对Hive中的数据进行处理和分析。
  2. 实现上传文件逻辑:在上传文件接口中,需要对接收到的文件进行处理,将文件存储到HDFS和Hive中。具体实现步骤如下:
    a. 接收前端上传的文件:在上传文件接口中,使用flask的request对象获取前端传来的文件,并将文件保存到临时文件夹中。
    b. 将文件存储到HDFS中:使用Python的HDFS库连接HDFS,并将临时文件夹中的文件上传到HDFS中。
    c. 将文件存储到Hive中:使用Python的Pyhive库连接Hive,并将临时文件夹中的文件上传到Hive中。
    d. 删除临时文件:将上传到HDFS和Hive中的文件删除,释放服务器空间。
  3. 实现数据处理逻辑:在数据处理接口中,需要使用PySpark对Hive中的数据进行处理和分析。具体实现步骤如下:
    a. 连接Spark:使用Python的pyspark库连接Spark。
    b. 读取数据:使用Spark SQL从Hive中读取需要处理的数据。
    c. 处理数据:对数据进行处理和分析。
    d. 返回处理结果:将处理结果返回给前端。
  4. 实现安全性控制:对上传的文件进行安全性过滤和权限控制,防止恶意文件上传和攻击行为。
    a. 对上传的文件进行类型和大小的校验:使用Python的MIME类型库和文件大小库对上传的文件进行校验,判断是否符合规定的文件类型和大小。
    b. 对上传的文件进行安全过滤:使用Python的安全库对上传的文件进行安全过滤,防止恶意文件上传和攻击行为。
    c. 对上传的文件进行权限控制:使用Python的权限库对上传的文件进行权限控制,确保只有授权的用户可以上传文件。
    综上所述,后端代码逻辑的具体设计包括定义API接口、实现上传文件逻辑、实现数据处理逻辑和实现安全性控制。在实现过程中需要使用Python的相关库,并考虑数据安全性和权限控制等问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import os
import uuid
from flask import Blueprint, request, jsonify
from pyhdfs import HdfsClient
from pyhive import hive
from werkzeug.utils import secure_filename

your_blueprint = Blueprint('your_blueprint', __name__)

# HDFS 配置
HDFS_HOST = 'your_hdfs_host'
HDFS_PORT = your_hdfs_port
HDFS_USER = 'your_hdfs_user'
HDFS_ROOT = '/user/your_hdfs_user'

# Hive 配置
HIVE_HOST = 'your_hive_host'
HIVE_PORT = your_hive_port
HIVE_USER = 'your_hive_user'
HIVE_DATABASE = 'your_hive_database'

# 上传文件目录配置
UPLOAD_FOLDER = '/path/to/your/upload/folder'
ALLOWED_EXTENSIONS = {'txt', 'csv', 'json'} # 允许上传的文件类型


# 检查上传文件类型
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


# 获取 HDFS 客户端
def get_hdfs_client():
return HdfsClient(hosts=f"{HDFS_HOST}:{HDFS_PORT}", user_name=HDFS_USER)


# 获取 Hive 连接
def get_hive_conn():
return hive.connect(host=HIVE_HOST, port=HIVE_PORT, username=HIVE_USER, database=HIVE_DATABASE)


# 上传文件到 HDFS 和 Hive
@bp.route('/upload', methods=['POST'])
def upload_file():
# 检查文件是否存在
if 'file' not in request.files:
return jsonify({'message': 'No file part'}), 400

file = request.files['file']
if file.filename == '':
return jsonify({'message': 'No file selected for uploading'}), 400

# 检查文件类型是否被允许
if not allowed_file(file.filename):
return jsonify({'message': 'Invalid file type'}), 400

# 生成唯一的文件名,确保不会覆盖已存在的文件
filename = secure_filename(f"{str(uuid.uuid4())}_{file.filename}")

# 将文件保存到本地
file.save(os.path.join(UPLOAD_FOLDER, filename))

# 上传文件到 HDFS
hdfs_client = get_hdfs_client()
hdfs_path = f"{HDFS_ROOT}/{filename}"
with open(os.path.join(UPLOAD_FOLDER, filename), 'rb') as f:
hdfs_client.create(hdfs_path, f)

# 将文件存储到 Hive 表中
hive_conn = get_hive_conn()
hive_cursor = hive_conn.cursor()
hive_cursor.execute(f"LOAD DATA INPATH '{hdfs_path}' INTO TABLE your_hive_table")

return jsonify({'message': 'File uploaded successfully'}), 200

使用脚本服务器直接调脚本

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
from sqlalchemy import create_engine
import pandas as pd


def import_csv_to_hive(csv_file_path, database):
try:
# 配置下host和port {hive.host} {hive.port} 替换
db_uri = f'hive://hive:Hive-123@{hive.host}:{hive.port}/default'
hive_engine = create_engine(db_uri)
csv_files = os.listdir(csv_file_path)
csv_files = [f for f in csv_files if f.endswith('.csv')]
success_count = 0
start_index = 0
for i, csv_file in enumerate(csv_files):
table_name = os.path.splitext(csv_file)[0]
csv_file = os.path.join(csv_file_path, csv_file)
try:
df = pd.read_csv(csv_file)
df.columns = map(str.lower, df.columns)
df.head(0).to_sql(table_name, hive_engine, schema=database, index=False, if_exists='replace')
chunksize = 2000
df.to_sql(table_name, hive_engine, schema=database, index=False, if_exists='append', method='multi',
chunksize=chunksize)
hive_engine.execute('COMMIT')
row_count = hive_engine.execute(f'SELECT COUNT(*) FROM {database}.{table_name}').fetchone()[0]
success_count += 1
print(f'Successfully imported {row_count} rows from CSV file {csv_file}.')
except Exception as e:
hive_engine.execute('ROLLBACK')
hive_engine.execute('COMMIT')
error_msg = f'Error message: {e}'
print(error_msg)
print(f'Failed to import CSV file {csv_file}.')
if start_index == 0:
start_index = i
fail_files = csv_files[start_index:]
if start_index > 0:
print(f'Failed to import {len(fail_files)} CSV files: {", ".join(fail_files)}.')
print(f'Successfully imported {success_count} CSV files.')
except Exception as e:
hive_engine.execute('ROLLBACK')
hive_engine.execute('COMMIT')
error_msg = f'Error message: {e}'
print(error_msg)


def main():
csv_file_path = '/opt/NewMiner/data'
database = 'default'
import_csv_to_hive(csv_file_path, database)


if __name__ == '__main__':
main()


HDFS File Upload
https://www.prime.org.cn/2023/03/23/Python-Vue-HDFS-File-Upload/
Author
emroy
Posted on
March 23, 2023
Licensed under