· 13 phút đọc
Nếu bạn đang tìm kiếm một đường dẫn dữ liệu đơn giản, rẻ tiền để lấy một lượng nhỏ dữ liệu từ API ổn định và lưu trữ trong bộ lưu trữ đám mây, thìchức năng không có máy chủ
là một lựa chọn tốt. Bài đăng này nhằm mục đích trả lời các câu hỏi như những câu hỏi được hiển thị bên dưới
Công ty của tôi không có ngân sách để mua một công cụ như
nămtran
, Tôi nên sử dụng cái gì để lấy dữ liệu từ API?
Tôi có thực sự cần một công cụ điều phối như Airflow cho một lần lấy dữ liệu đơn giản từ API không?
Trong bài viết chúng tôi đề cập đến những gì mộtkhông có máy chủ
chức năng nào có thể và không thể làm, ưu và nhược điểm của nó là gì và hướng dẫn qua một dự án lấy dữ liệu API đơn giản.
Rất có thể là bạn đã nghe nói về thuật ngữkhông có máy chủ
. thuật ngữkhông có máy chủ
có thể gây nhầm lẫn. Trên thực tế, có một máy chủ mà mã của bạn sẽ chạy trên đó.không có máy chủ
đề cập đến thực tế là bạn không phải quản lý bất kỳ máy chủ nào mà chỉ quản lý mã. Chúng tôi sẽ sử dụngAWS Lambda
cho dự án đơn giản này.AWS Lambda
có thể được sử dụng để chạy một chức năng mà không cần phải cung cấp và bảo trì máy chủ.
Bạn viết một hàm và triển khai nó lên AWS Lambda. Ở đó bạn có thể thiết lập chức năng để chạy dựa trênkích hoạt bên ngoài. Hàm lambda sẽ thực thi logic và kết thúc. Lưu ý rằng bạn sẽ có một khoảng thời gian giới hạn (thời gian thực thi tối đa hiện tại để thực thi mã lambda là 15 phút) để logic mã của bạn hoàn tất. Ngoài ra còn có giới hạn về kích thước bộ nhớ (tối đa hiện tại là 3,008GB). Trong bài đăng này, chúng tôi sẽ
- Bắt đầu chức năng lambda.
- Tải dữ liệu từ API giả xuống hệ thống tệp cục bộ.
- Sao chép các tệp đã tải xuống vào AWS S3.
- Dừng chức năng lambda.
- Chức năng lambda sẽ được lên lịch để chạy cứ sau 5 phút.
Giả sử bạn làm việc cho một công ty muốn lấy một số dữ liệu từ API mà bạn có quyền truy cập và đánh giá chất lượng của dữ liệu đó. Trách nhiệm của bạn là lấy dữ liệu này và cung cấp dữ liệu đó thông qua lưu trữ đám mây để các hệ thống khác sử dụng và đánh giá chất lượng. Giả sử rằng đây là lần lấy dữ liệu đầu tiên và duy nhất của bạn (hiện tại). Không có đủ thời gian/tiền bạc để thiết lập/mua công cụ điều phối dữ liệu. Dự án này nhằm xác thực giá trị mà dữ liệu này có thể cung cấp, dựa vào đó công ty của bạn có thể chọn tiếp tục thanh toán cho dữ liệu này hay không.
điều kiện tiên quyết
- Tài khoản AWS
- AWSCLICài đặtVàcấu hình
Tài liệu API
Giả sử rằng API của chúng tôi có tài liệu sau.
- Điểm cuối API: NHẬN
http://jsonplaceholder.typicode.com/posts
để lấy dữ liệu bài đăng của người dùng. - Tham số truy vấn: Dữ liệu trả về từ API được sắp xếp theo user_id. sử dụng
start_user_id
Vàend_user_id
chúng tôi có thể truy vấn một loạt dữ liệu dựa trên id người dùng. - Điểm cuối siêu dữ liệu API: NHẬN
http://jsonplaceholder.typicode.com/number_of_users
, sẽ trả về tổng số bài đăng của người dùng.
Điểm cuối GET sẽ trả về danh sách JSON có định dạng
{"tên người dùng":1,"nhận dạng":1,"tiêu đề":"một số tiêu đề ở đây","thân hình":"một số bài kiểm tra ở đây"}
phân trang
Khi lấy “rất nhiều” dữ liệu từ một API, cách tốt nhất là thực hiện nhiều yêu cầu để lấy dữ liệu theo khối. Định nghĩa về rất nhiều dữ liệu phụ thuộc vào lượng dữ liệu được kéo và độ ổn định của API. Dựa vàoTài liệu API
trên, chúng ta cần
- Lấy tổng số bài viết của người dùng.
- Chia số thành N số cuộc gọi, tùy thuộc vào một được xác định trước
khúc
kích cỡ. Đó là lượng dữ liệu bạn muốn lấy cho mỗi lệnh gọi API. - Thực hiện cuộc gọi API, lấy dữ liệu và ghi dữ liệu vào bộ lưu trữ đĩa cục bộ. Bạn cũng có thể lưu trữ trong bộ nhớ nếu kích thước dữ liệu đủ nhỏ.
- Một số API có thể bị lỗi do mạng hoặc các sự cố khác, hãy thêm tham số thử lại để giải quyết vấn đề này. Trên thực tế, điều này sẽ phụ thuộc vào tính ổn định của API và mạng.
Ghi chú:Tùy thuộc vào kích thước của dữ liệu và bộ nhớ lambda được phân bổ, việc giữ dữ liệu trong bộ nhớ có thể hiệu quả hơn thay vì ghi vào đĩa rồi tải lên S3.
Hãy tạo một thư mục có têndữ liệuKéo
trong thư mục dự án của bạn và trong đó một tập lệnh python có tênlambda_function.py
, bắt đầu với nội dung bên dưới
CHUNK_SIZE= 10000 # được xác định dựa trên API, giới hạn bộ nhớ, thử nghiệmchắc chắn get_num_records():# Hàm giả, để sao chép lệnh gọi GET http://jsonplaceholder.typicode.com/number_of_users trở lại 100000chắc chắn lấy dữ liệu( start_user_id, end_user_id, get_path="http://jsonplaceholder.typicode.com/posts"): http=urllib3.dữ liệu PoolManager()={"tên người dùng": Không có,"nhận dạng": Không có,"tiêu đề": Không có,"thân hình": Không có}thử:r=http.lời yêu cầu("LẤY", get_path, thử lại=urllib3.sử dụng.Thử lại(3), lĩnh vực={"start_user_id": start_user_id,"end_user_id": end_user_id}, ) dữ liệu=json.tải (r.dữ liệu.giải mã ("utf8").thay thế("'",'"'))ngoại trừ Lỗi chính BẰNGe:in(f"Url định dạng sai {get_path}", đ)ngoại trừurllib3.ngoại lệ.MaxRetryErrorBẰNGe:in(f"API không khả dụng tại {get_path}", đ)trở lạidữ liệuchắc chắn parse_data(json_dữ liệu):trở lạif'{json_data.get("userId")},{json_data["id"]},"{json_data["title"]}"\N'chắc chắn write_to_local(ngày, phần, địa điểm="/tmp"): tên_tệp=lộc+ "/" +str(một phần)vớimở (tên_tệp,"w")BẰNGtài liệu:vìcon naiTRONGtập tin dữ liệu.ghi (parse_data (elt))trở lạitên_tệpchắc chắn download_data(N):vìTôiTRONGphạm vi(0, N, CHUNK_SIZE): dữ liệu=get_data(i, i+CHUNK_SIZE) write_to_local(dữ liệu, tôi//CHUNK_SIZE)
Trong đoạn mã trên, chúng ta có các chức năng
get_num_records
để mô phỏng lệnh gọi GET tới API của bạn để lấy tổng số bài đăng của người dùng.download_data
để thực hiện nhiều lệnh gọi API và chỉ truy cập 10.000 bản ghi dữ liệu cho mỗi lần lấy dữ liệu API. 10.000 này được định nghĩa là biến toàn cụcCHUNK_SIZE
. Trong dự án của mình, bạn sẽ cần xác định kích thước này là bao nhiêu tùy thuộc vào độ ổn định của API, giới hạn bộ nhớ, kết nối mạng và kích thước thực tiễn chung tốt nhất cho API đó.parse_data
để chuyển đổi dữ liệu json thành chuỗi định dạng hàng, được ghi vào tệp cục bộ.write_to_local
để ghi đoạn dữ liệu đã tải xuống vào hệ thống tệp cục bộ. Trong trường hợp của chúng tôi nếu chúng tôi có 100.000 bản ghi, chúng tôi sẽ tạo 10 tên tệp0, 1, 2, 3, 4, 5, 6, 7, 8, 9
với dữ liệu cho id người dùng0-10000, 10000-20000,...90000-100000
tương ứng. Lưu ý vị trí hệ thống tệp cục bộ/tmp
sẽ chỉ khả dụng trong khoảng thời gian của một lần chạy chức năng của bạn.
đồng thời
Trong đoạn mã trên, vì chúng tôi đang tải xuống các đoạn một cách độc lập, chúng tôi cũng có thể làm cho nó đồng thời bằng cách sử dụng pythonkhông đồng bộ
thư viện. Nếu bạn đang sử dụng đồng thời, hãy lưu ý đến số lượng kết nối mở/trình xử lý tệp, v.v. Bạn cũng có thể quản lý đồng thời ở cấp hàm lambda như được hiển thịđây.
Lưu ý rằng một số API có thể không cung cấp số lượng bản ghi chính xác, thay vào đó bạn sẽ phải truy vấn cho đến khi không còn dữ liệu nữa (thường sử dụng một số loạitìm_sau
tham số truy vấn). Trong những trường hợp như vậy sẽ khó đạt được đồng thời vì bạn sẽ cần đầu ra từ lệnh gọi trước để thực hiện lệnh gọi API tiếp theo. Một ví dụ về điều này làTìm kiếm đàn hồi
tìm_sau. Ngoài ra còn cótừ, kích thước
nhập các điểm cuối API yêu cầu kích thước dữ liệu cần lấy và id để bắt đầu lấy dữ liệu dưới dạng tham số truy vấn cho lệnh gọi API.
Vì các hàm lambda là tạm thời nên bộ nhớ cục bộ của chúng cũng vậy. Chúng tôi cần đảm bảo dữ liệu được lưu trữ cục bộ của mình (tại/tmp
) được lưu trữ trong một hệ thống lưu trữ liên tục nhưAWS S3
. Thêm mã dưới đây vàolambda_function.py
.
nhập khẩuboto3từngày giờnhập khẩungày giờ, múi giờnhập khẩujsontừhệ điều hànhnhập khẩulà một danh sáchtừos.pathnhập khẩuisfile, tham gianhập khẩuurllib3s3_client=boto3.khách hàng("s3")LOCAL_FILE_SYS= "/tmp"S3_BUCKET= "xô-s3 của bạn" # vui lòng thay thế bằng tên bộ chứa của bạnCHUNK_SIZE= 10000 # được xác định dựa trên API, giới hạn bộ nhớ, thử nghiệmchắc chắn _get_key(): dt_now=ngày giờ.bây giờ (tz=Múi giờ.utc) KHÓA=( dt_now.strftime("%Y-%m-%d")+ "/" +dt_now.strftime("%H")+ "/" +dt_now.strftime("%M")+ "/")trở lạiCHÌA KHÓAchắc chắn get_num_records():# Hàm giả, để sao chép lệnh gọi GET http://jsonplaceholder.typicode.com/number_of_users trở lại 100000chắc chắn lấy dữ liệu( start_user_id, end_user_id, get_path="http://jsonplaceholder.typicode.com/posts"): http=urllib3.dữ liệu PoolManager()={"tên người dùng": Không có,"nhận dạng": Không có,"tiêu đề": Không có,"thân hình": Không có}thử:r=http.lời yêu cầu("LẤY", get_path, thử lại=urllib3.sử dụng.Thử lại(3), lĩnh vực={"start_user_id": start_user_id,"end_user_id": end_user_id}, ) dữ liệu=json.tải (r.dữ liệu.giải mã ("utf8").thay thế("'",'"'))ngoại trừ Lỗi chính BẰNGe:in(f"Url định dạng sai {get_path}", đ)ngoại trừurllib3.ngoại lệ.MaxRetryErrorBẰNGe:in(f"API không khả dụng tại {get_path}", đ)trở lạidữ liệuchắc chắn parse_data(json_dữ liệu):trở lạif'{json_data.get("userId")},{json_data["id"]},"{json_data["title"]}"\N'chắc chắn write_to_local(ngày, phần, địa điểm=LOCAL_FILE_SYS): tên_tệp=lộc+ "/" +str(một phần)vớimở (tên_tệp,"w")BẰNGtài liệu:vìcon naiTRONGtập tin dữ liệu.ghi (parse_data (elt))trở lạitên_tệpchắc chắn download_data(N):vìTôiTRONGphạm vi(0, N, CHUNK_SIZE): dữ liệu=get_data(i, i+CHUNK_SIZE) write_to_local(dữ liệu, tôi//CHUNK_SIZE)chắc chắn lambda_handler(sự kiện, bối cảnh): N=khóa get_num_records() download_data(N)=_get_key() tập tin=[fvìfTRONGlistdir(LOCAL_FILE_SYS)nếu nhưisfile(tham gia(LOCAL_FILE_SYS, f))]vìfTRONGtập tin: s3_client.upload_file(LOCAL_FILE_SYS+ "/" +f, S3_BUCKET, khóa+f)
Trong đoạn mã trên, chúng ta đã thêm một số biến toàn cục và hàm
s3_client
: Đây là ứng dụng khách boto3 s3 được sử dụng để truy cập S3 theo chương trình.dt_now
: Ngày giờ hiện tại theo UTC.S3_BUCKET
: Tên bộ chứa S3 của bạn, vui lòng thay thế tên này bằng tên bộ chứa của bạn. Bạn sẽ tạo nhóm này trong phần tiếp theo._get_key()
: Điều này đại diện cho đường dẫn trong của bạnS3_BUCKET
nơi dữ liệu bạn kéo sẽ được lưu trữ. Chúng tôi sử dụngYYYY-mm-dd/HH/MM
định dạng, ví dụ: thời gian 2020-11-10 9:14 chiều sẽ chuyển thành cấu trúc thư mục của2020-11-10/21/14
và vì chúng tôi lấy dữ liệu cứ sau 5 phút nên điều này sẽ khác nhau đối với mỗi lần chạy lambda. Điều này đảm bảo rằng các lần chạy không ghi đè dữ liệu và làm cho mã của chúng tôi trở nên bình thường dựa trên thời gian chạy (miễn là API cũng là bình thường).lambda_handler
: là điểm vào để bắt đầu thực thi lambda. Ở đây chúng tôi kích hoạtdownload_data
chức năng tải dữ liệu từ API vào hệ thống tệp cục bộ của chúng tôi. Sau đó, mã sẽ tải các tệp cục bộ đã tải xuống lên bộ chứa S3 đã chỉ định.
Lưu ý rằng các biến toàn cục được lưu trong bộ nhớ cache giữa các lần chạy lambda. Đây là lý do tại sao chúng tôi có một_get_key()
chức năng thay vì nó là một biến toàn cục.
Bây giờ chúng ta đã có sẵn mã, hãy thiết lập cơ sở hạ tầng và cấp các quyền thích hợp. Lưu tập lệnh thiết lập bên dưới trong thư mục dự án của bạn dưới dạngsetup_infra.sh
#!/bin/bashnếu như [[$# -eq0 ]];sau đótiếng vang'Vui lòng nhập tên nhóm của bạn là ./setup_infra.sh your-bucket'lối ra0fiAWS_ID=$(aws sts get-caller-identity --query Account --output text | con mèo)AWS_REGION=$(cấu hình aws lấy vùng)tiếng vang"Tạo tệp cấu hình cục bộ"tiếng vang'{"Phiên bản": "17-10-2012","Tuyên bố": [{"Hiệu ứng": "Cho phép","Hoạt động": ["nhật ký:PutLogEvents","nhật ký:Tạo Nhóm Nhật ký","nhật ký:TạoLogStream"],"Tài nguyên": "arn:aws:log:*:*:*"},{"Hiệu ứng": "Cho phép","Hoạt động": ["s3:GetObject","s3:PutObject"],"Tài nguyên": "arn:aws:s3:::'$1'/*"}]}'> ./policyecho'{"Phiên bản": "17-10-2012","Tuyên bố": [{"Hiệu ứng": "Cho phép","Hiệu trưởng": {"Dịch vụ": "lambda.amazonaws.com"},"Hành động": "sts:AssumeRole"}]}'> ./trust-policy.jsonecho'[{"Id": "1","Arn": "arn:aws:lambda:'$AWS_REGION':'$AWS_ID':function:dataPull"}]'> ./target.jsonecho"Đóng gói lambda_function.py cục bộ"dữ liệu cdPullzip -r ../myDeploymentPackage.zip .cd ..echo"Tạo thùng"$1""aws s3api tạo thùng --acl public-read-write --bucket $1 --output text > setup.logecho"Tạo chính sách"aws iam create-policy --policy-name AWSLambdaS3Policy --policy-document file://policy --output text >> setup.logecho"Tạo vai trò"aws iam create-role --role-name lambda-s3-role --assume-role-policy-document file://trust-policy.json --output text >> setup.logecho"Đính kèm chính sách với vai trò"aws iam attachment-role-policy --role-name lambda-s3-role --policy-arn arn:aws:iam::$AWS_ID:policy/AWSLambdaS3Policy --văn bản đầu ra >> setup.logecho"Ngủ 10 giây để cho phép chính sách gắn vào vai trò"ngủ 10secho"Tạo hàm Lambda"aws lambda chức năng tạo --function-name dataPull --runtime python3.7 --role arn:aws:iam::$AWS_ID":"vai trò/lambda-s3-vai trò --handler lambda_function.lambda_handler --zip-file fileb://myDeploymentPackage.zip --timeout60--output văn bản >> setup.logecho"Tạo quy tắc cloudwatch để lên lịch lambda cứ sau 5 phút"sự kiện aws put-rule --name my-scheduled-rule --schedule-expression'tỷ lệ (5 phút)'--output văn bản >> setup.logecho"Đính kèm hàm lambda vào sự kiện rồi đến quy tắc"quyền bổ sung aws lambda --function-name dataPull --statement-id my-scheduled-event --action'lambda: Hàm gọi'--principal events.amazonaws.com --source-arn arn:aws:events:$AWS_REGION:$AWS_ID:rule/my-scheduled-rule --văn bản đầu ra >> setup.logaws sự kiện đặt mục tiêu --rule my- quy tắc theo lịch trình --tệp mục tiêu://mục tiêu.json --văn bản đầu ra >> setup.logecho"Xong"
Cấu trúc thư mục của bạn, sẽ giống như hình bên dưới
chmod755setup_infra.sh# cấp quyền thực thi cho tập lệnh của bạn./setup_infra.sh your-s3-bucket# vui lòng thay thế bằng tên bộ chứa của bạn
Trong kịch bản trên, chúng tôi làm như sau
- Đóng gói mã của chúng tôi
- Tạo một nhóm mà chúng tôi sẽ sử dụng cho dự án này
- Tạo chính sách (Hãy coi chính sách như một cách để cấp quyền cho một số dịch vụ nhất định)
- Tạo một Vai trò (Một Vai trò chứa nhiều chính sách)
- Đính kèm Chính sách từ bước 3 vào Vai trò từ bước 4
- Ngủ trong 10 giây để quá trình đính kèm hoàn tất
- Tải mã của bạn lên và tạo hàm lambda, với thời gian chạy tối đa là 60 giây
- Thiết lập lịch trình để chức năng lambda của bạn chạy 5 phút một lần
- Chúng tôi viết đầu ra của mỗi lệnh vào một tệp có tên
setup.log
trong thư mục của bạn.
Bạn có thể theo dõi các lần chạy lambda bằng cách truy cậpĐồng hồ đám mây AWS
Giao diện người dùng. Đăng nhập vào Bảng điều khiển AWS của bạn.
Đi đếnĐồng hồ đám mây AWS
Đi đếnSự kiện -> Quy tắc
Bấm vàoquy-tắc-của-tôi
Lưu ý các lời gọi lambda
Bạn cũng có thể đặt cảnh báo quaĐồng hồ đám mây AWS
. Bạn cũng có thể kiểm tra các tệp trong bộ chứa s3 của mình bằng lệnh bên dưới.
aws s3 ls s3://your-s3-bucket/ --recursive# vui lòng thay thế bằng tên bộ chứa của bạn
Hãy tạo một tập lệnh để phá bỏ cơ sở hạ tầng. Gọi nó đigiọt nước mắt_down_infra.sh
, với nội dung như bên dưới.
#!/bin/bashnếu như [[$# -eq0 ]];sau đótiếng vang'Vui lòng nhập tên nhóm của bạn là ./setup_infra.sh your-bucket'lối ra0fiAWS_ID=$(aws sts get-caller-identity --query Account --output text | con mèo)tiếng vang"Đang xóa quy tắc lịch biểu của Cloudwatch"sự kiện aws remove-targets --rule my-scheduled-rule --ids"1"--văn bản đầu ra > sự kiện xé_down.logaws quy tắc xóa --tên quy tắc đã lên lịch của tôi --văn bản đầu ra >> tear_down.logaws chức năng xóa lambda --tên hàm dữ liệuPull --văn bản đầu ra >> tear_down.logecho"Xóa vai trò và chính sách cho kết nối lambda - s3"aws iam detach-role-policy --role-name lambda-s3-role --policy-arn arn:aws:iam::$AWS_ID:policy/AWSLambdaS3Policy --xuất văn bản >>ear_down.logaws iam delete-role -- tên vai trò lambda-s3-role --văn bản đầu ra >>ear_down.logaws iam delete-policy --policy-arn arn:aws:iam::$AWS_ID:policy/AWSLambdaS3Policy --văn bản đầu ra >>ear_down.logecho"Xóa xô"$1""aws s3 rm s3://$1 --recursive --output text >>ear_down.logaws s3api delete-bucket --bucket $1 --output text >>ear_down.logecho"Xóa tệp cấu hình cục bộ"rm policyrm target.jsonrm myDeploymentPackage.ziprm trust-policy.jsonrm setup.logrm Tears_down.log
chmod755giọt nước mắt_down_infra.sh# cấp quyền thực thi cho tập lệnh của bạn./tear_down_infra.sh your-s3-bucket# vui lòng thay thế bằng tên bộ chứa của bạn
Cácgiọt nước mắt_down_infra.sh
tập lệnh là phá bỏ cơ sở hạ tầng đã được thiết lập bằng cách sử dụngsetup_infra.sh
. Đảm bảo sử dụng cùng một tên nhóm,
AWS Lambdas phù hợp để kéo dữ liệu nhỏ, nhanh và đơn giản. Tuy nhiên, nói chung nếu bạn đang lấy dữ liệu “lớn” từ một API thì bạn nên sử dụng công cụ điều phối hoặc dịch vụ AWS Batch hoặc AWS Step. Lưu ý rằng ở đây chúng ta chỉ nói về AWS Lambda như một công cụ để lấy dữ liệu từ API. Có các tình huống khác mà AWS Lambdakhông có máy chủ
chức năng là sự lựa chọn tốt nhất.
ưu
- Giá rẻ, vì chúng được tính phí trên 100 mili giây sử dụng.
- Không bảo trì máy chủ.
- Buộc bạn viết mã hiệu quả để giữ thời gian chạy thấp (Điều này nên được thực hiện bất kể).
- Cách nhanh chóng và dễ dàng để kiểm tra dữ liệu API được lên lịch, đơn giản kéo trên đám mây.
- Dễ dàng lên lịch, theo dõi và cảnh báo thông qua
Đồng hồ đám mây AWS
. - Bạn có thể chuyển đầu vào cho lambda với
sự kiện
, nhìn thấycái này.
Nhược điểm
- Giới hạn thời gian: Hiện tại, số phút tối đa mà hàm lambda có thể chạy là 15 phút. Nếu quá trình kéo dữ liệu của bạn mất nhiều thời gian hơn thì chức năng sẽ bị chấm dứt.
- Giới hạn dung lượng: Hiện tại, bộ nhớ tối đa khả dụng cho hàm lambda là 3,008 GB.
- Chèn lấp khéo léo: Chèn lấp dữ liệu sau khi dịch vụ bị gián đoạn với API sẽ yêu cầu can thiệp thủ công hoặc logic tùy chỉnh để xử lý. Trái ngược với các công cụ điều phối ETL khác như
luồng không khí
mà có thể được cấu hình dễ dàng hơn.
Hy vọng bài viết này cung cấp cho bạn một ý tưởng tốt về sự cân bằng liên quan đếnkhông có máy chủ
mô hình liên quan đến việc lấy dữ liệu từ API. Hãy cho tôi biết nếu bạn có bất kỳ câu hỏi/nhận xét nào trong phần bình luận bên dưới.