
SPARK_ON_MINIKUBE example

Spark on minikube test

Start minikube

minikube delete
minikube start --driver docker --cpus 8 --memory 8g
nohup minikube dashboard >/dev/null 2>&1 &
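Before building anything, it is worth checking that the cluster is actually reachable (a quick sanity check, assuming the default minikube profile):

kubectl cluster-info
minikube ip    # node IP; the API server used below listens on this address at port 8443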

Build the Docker image

./bin/docker-image-tool.sh -r harbor.dc.servyou-it.com/geosmart-ops/spark -m -t spark:2.4.8 build
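The -m flag builds the image directly against minikube's Docker daemon, so nothing needs to be pushed for this local test. To confirm the image is visible to the cluster (the exact name and tag follow from the -r/-t values above):

eval $(minikube docker-env)
docker images | grep spark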

RBAC configuration

k create namespace spark
k create serviceaccount spark --namespace=spark
k create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
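A quick way to verify the binding is to ask the API server whether the spark service account may create pods, which the driver needs in order to launch executors:

k auth can-i create pods --namespace=spark --as=system:serviceaccount:spark:spark
# expected output: yes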

Run

export K8S_SERVER=$(k config view --output=jsonpath='{.clusters[].cluster.server}')
export K8S_SERVER=https://192.168.49.2:8443
export SPARK_HOME=/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/spark-2.4.8-bin-hadoop2.6

spark

$SPARK_HOME/bin/spark-submit \
--master k8s://$K8S_SERVER \
--deploy-mode cluster \
--name spark-pi \
--conf spark.executor.instances=2 \
--class org.apache.spark.examples.SparkPi \
--verbose \
local:///opt/spark/examples/jars/spark-examples_2.11-2.4.8.jar 10000
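The submit above assumes the Kubernetes container image, namespace and service account are already configured in spark-defaults.conf; if they are not, they are typically appended as extra --conf flags (the image reference below is a placeholder for whatever the build step earlier produced):

--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=<image built above>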

s3a

$SPARK_HOME/bin/spark-submit \
--master k8s://$K8S_SERVER \
--deploy-mode cluster \
--name spark-pi \
--conf spark.executor.instances=2 \
--class org.apache.spark.examples.SparkPi \
--verbose \
s3a://dboard/user/geosmart/spark-examples_2.11-2.4.8.jar 10000
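Reading the application jar from s3a additionally requires the hadoop-aws and AWS SDK jars to be present in the image, plus the s3a endpoint and credentials; a sketch of the usual extra flags (endpoint and keys are placeholders):

--conf spark.hadoop.fs.s3a.endpoint=<s3-endpoint> \
--conf spark.hadoop.fs.s3a.access.key=<access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<secret-key> \
--conf spark.hadoop.fs.s3a.path.style.access=true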

log
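In cluster mode the output below is the driver log; the same log can be tailed directly from the driver pod (the pod name is generated from the app name and a timestamp, so adjust accordingly):

k get pods -n spark
k logs -f <spark-pi-driver-pod> -n spark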

21/06/24 13:05:58 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/06/24 13:05:58 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-pi-1624539948750-driver-svc.spark-demo.svc, 7079, None)
21/06/24 13:05:58 INFO BlockManagerMasterEndpoint: Registering block manager spark-pi-1624539948750-driver-svc.spark-demo.svc:7079 with 413.9 MB RAM, BlockManagerId(driver, spark-pi-1624539948750-driver-svc.spark-demo.svc, 7079, None)
21/06/24 13:05:58 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-pi-1624539948750-driver-svc.spark-demo.svc, 7079, None)
21/06/24 13:05:58 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-pi-1624539948750-driver-svc.spark-demo.svc, 7079, None)
21/06/24 13:06:06 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.7:39840) with ID 2
21/06/24 13:06:07 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.6:49498) with ID 1
21/06/24 13:06:07 INFO KubernetesClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
21/06/24 13:06:07 INFO BlockManagerMasterEndpoint: Registering block manager 172.17.0.7:43535 with 413.9 MB RAM, BlockManagerId(2, 172.17.0.7, 43535, None)
21/06/24 13:06:08 INFO BlockManagerMasterEndpoint: Registering block manager 172.17.0.6:43419 with 413.9 MB RAM, BlockManagerId(1, 172.17.0.6, 43419, None)
21/06/24 13:06:08 INFO SparkContext: Starting job: reduce at SparkPi.scala:38
21/06/24 13:06:08 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 2 output partitions
21/06/24 13:06:08 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38)
21/06/24 13:06:08 INFO DAGScheduler: Parents of final stage: List()
21/06/24 13:06:08 INFO DAGScheduler: Missing parents: List()
21/06/24 13:06:08 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents
21/06/24 13:06:09 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.0 KB, free 413.9 MB)
21/06/24 13:06:09 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1381.0 B, free 413.9 MB)
21/06/24 13:06:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on spark-pi-1624539948750-driver-svc.spark-demo.svc:7079 (size: 1381.0 B, free: 413.9 MB)
21/06/24 13:06:09 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184
21/06/24 13:06:09 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) (first 15 tasks are for partitions Vector(0, 1))
21/06/24 13:06:09 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
21/06/24 13:06:09 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 172.17.0.7, executor 2, partition 0, PROCESS_LOCAL, 7885 bytes)
21/06/24 13:06:09 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 172.17.0.6, executor 1, partition 1, PROCESS_LOCAL, 7885 bytes)
21/06/24 13:06:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.17.0.7:43535 (size: 1381.0 B, free: 413.9 MB)
21/06/24 13:06:11 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2093 ms on 172.17.0.7 (executor 2) (1/2)
21/06/24 13:06:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.17.0.6:43419 (size: 1381.0 B, free: 413.9 MB)
21/06/24 13:06:11 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2443 ms on 172.17.0.6 (executor 1) (2/2)
21/06/24 13:06:11 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/06/24 13:06:11 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 3.020 s
21/06/24 13:06:11 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 3.205210 s
Pi is roughly 3.1459757298786495
21/06/24 13:06:11 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1624539948750-driver-svc.spark-demo.svc:4040
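After the job finishes, the driver pod stays around in Completed state; it can be listed and cleaned up using the spark-role label that Spark on Kubernetes puts on its pods:

k get pods -n spark -l spark-role=driver
k delete pods -n spark -l spark-role=driver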

pyspark test

import pyspark

conf = pyspark.SparkConf()
conf.setMaster("k8s://https://192.168.49.2:8443")

# Worker pods are created from the base Spark docker image.
# If you use another image, specify its name instead.
conf.set("spark.kubernetes.container.image", "harbor.dc.servyou-it.com/geosmart-ops/dolphinscheduler:latest")

# Authentication certificate and token (required to create worker pods):
conf.set("spark.kubernetes.authenticate.caCertFile", "/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/spark/conf/serviceaccount/ca.crt")
conf.set("spark.kubernetes.authenticate.oauthTokenFile", "/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/spark/conf/serviceaccount/token")

# Service account which should be used for the driver
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")

# 2 pods/workers will be created. Can be expanded for larger workloads.
conf.set("spark.executor.instances", "2")

# The DNS alias for the Spark driver. Required by executors to report status.
conf.set("spark.driver.host", "192.168.49.2")

# Port which the Spark shell should bind to and to which executors will report progress
conf.set("spark.driver.port", "20020")

# Initialize spark context, create executors
conf.setAppName('spark-iotest')
sc = pyspark.SparkContext(conf=conf)

# Initialize Spark Session
from pyspark.sql import SparkSession
spark = SparkSession(sc)
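If the context comes up, the two executor pods requested above should be visible from kubectl; spark-role=executor is the label Spark on Kubernetes applies to executor pods, and since spark.kubernetes.namespace is not set above they are created in the default namespace:

kubectl get pods -l spark-role=executor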