spark pyspark使用
Last updated on March 14, 2025
🧙 Questions
spark中执行pyspark
☄️ Ideas
from pyhive import hive
import pandas as pd

# Export a Hive table to CSV by pulling it into pandas over a PyHive
# connection. NOTE(review): the original snippet also built a SparkSession
# here, but it was never used (PyHive talks to HiveServer2 directly) and was
# never stopped, so it has been removed.
conn = hive.Connection(host='172.23.39.236', port=30115, username='dehoop', database='ispong_db')
try:
    query = 'select * from ispong_db.users'
    df = pd.read_sql(query, conn)
    df.to_csv('/tmp/output.csv', index=False)
finally:
    # Close the connection even if the query or the CSV write fails.
    conn.close()
from pyspark.sql import SparkSession

# Build a tiny two-row DataFrame and persist it as CSV under /tmp/csv.
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Sample rows and the column names they map to.
records = [('zhangsan', 13),
           ('lisi', 14)]
columns = ['username', 'age']

# Create and display the DataFrame.
students = spark.createDataFrame(data=records, schema=columns)
students.show()

# Overwrite any previous export at the same path; keep a header row.
target_dir = "/tmp/csv"
students.write.csv(target_dir, mode="overwrite", header=True)

# Release Spark resources.
spark.stop()
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import statsmodels.api as sm
from sklearn.metrics import accuracy_score, recall_score
from pyspark.sql import SparkSession

# Initialize the SparkSession. This snippet is meant to be launched via
# spark-submit; the modelling below runs with scikit-learn/statsmodels on
# the driver.
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Synthetic binary-classification data: 1000 samples, 20 features.
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
# Hold out 30% for evaluation; fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42)

# Fit a random forest and report accuracy/recall on the held-out set.
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
y_pred_rf = rf_model.predict(X_test)
accuracy_rf = accuracy_score(y_test, y_pred_rf)
recall_rf = recall_score(y_test, y_pred_rf)
print("Random Forest Model:")
print(f"Accuracy: {accuracy_rf:.4f}")
print(f"Recall: {recall_rf:.4f}")
print("Random Forest Feature Importances:")
# BUG FIX: the header above was printed but the importances themselves never
# were; print them so the output matches the announcement.
print(rf_model.feature_importances_)

# Fit a binomial GLM (logistic regression) on the same split for comparison.
# statsmodels needs an explicit intercept column, hence add_constant.
X_train_df = pd.DataFrame(X_train, columns=[f'feature_{i}' for i in range(X_train.shape[1])])
X_test_df = pd.DataFrame(X_test, columns=[f'feature_{i}' for i in range(X_test.shape[1])])
X_train_df = sm.add_constant(X_train_df)
X_test_df = sm.add_constant(X_test_df)
glm_model = sm.GLM(y_train, X_train_df, family=sm.families.Binomial()).fit()
print("\nGLM Model Summary:")
print(glm_model.summary())
# Threshold the predicted probabilities at 0.5 to obtain hard class labels.
y_pred_glm = glm_model.predict(X_test_df)
y_pred_glm_class = (y_pred_glm >= 0.5).astype(int)
accuracy_glm = accuracy_score(y_test, y_pred_glm_class)
recall_glm = recall_score(y_test, y_pred_glm_class)
print("\nGLM Model:")
print(f"Accuracy: {accuracy_glm:.4f}")
print(f"Recall: {recall_glm:.4f}")

# Release Spark resources (the DataFrame snippet in this doc does the same;
# the original left the session running).
spark.stop()
from scipy.integrate import quad
from pyspark.sql import SparkSession

# Initialize the SparkSession. The snippet demonstrates that ordinary SciPy
# code runs unchanged inside a job launched via spark-submit.
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()


def integrand(x):
    """Function under the integral: f(x) = x^2."""
    return x**2


# Definite integral of x^2 over [0, 1]; quad returns (value, abs error
# estimate). The exact answer is 1/3.
result, error = quad(integrand, 0, 1)
print(result, error)

# Release Spark resources (the original never stopped the session).
spark.stop()
from pyspark.sql import SparkSession

# Initialize the SparkSession — the minimal PySpark boilerplate used by the
# spark-submit examples that follow.
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()
提交到yarn
# Submit test.py to YARN in cluster mode: 4 executors x 2G, 1G driver,
# default parallelism of 2.
spark-submit \
--master yarn \
--deploy-mode cluster \
--name test \
--num-executors 4 \
--executor-memory 2G \
--driver-memory 1G \
--conf spark.default.parallelism=2 \
test.py
提交到standalone
# Submit to a standalone master, pinning the worker-side Python interpreter
# via spark.pyspark.python.
spark-submit \
--master spark://ispong-mac.local:7077 \
--conf spark.pyspark.python=/Users/ispong/.pyenv/versions/3.10.4/bin/python \
/Users/ispong/Downloads/test.py
standalone指定python环境
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 9) than that in driver 3.10, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
# Fix for the PYTHON_VERSION_MISMATCH error above: force the workers to use
# the same interpreter as the driver with spark.pyspark.python.
spark-submit \
--master spark://ispong-mac.local:7077 \
--conf spark.pyspark.python=/Users/ispong/.pyenv/versions/3.10.4/bin/python \
/Users/ispong/Downloads/test.py
yarn指定python环境
{
"spark.pyspark.python":"/Users/ispong/.pyenv/versions/3.10.4/bin/python"
}
🔗 Links
spark pyspark使用
https://ispong.isxcode.com/hadoop/spark/spark pyspark使用/