From dbe2d553da776a0732af70200a0f515722a2b3f1 Mon Sep 17 00:00:00 2001 From: Yoo1tic <137816438+Yoo1tic@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:50:09 +0800 Subject: [PATCH] feat: update validation service to handle key writing. output invalid keys to different files. --- src/adapters/mod.rs | 2 +- src/adapters/output/local.rs | 27 +------ src/utils/mod.rs | 2 + src/utils/writer.rs | 16 +++++ src/validation/key_validator.rs | 4 +- src/validation/validation_service.rs | 101 +++++++++++++++++++-------- 6 files changed, 92 insertions(+), 60 deletions(-) create mode 100644 src/utils/writer.rs diff --git a/src/adapters/mod.rs b/src/adapters/mod.rs index de2d31e..5b6b95b 100644 --- a/src/adapters/mod.rs +++ b/src/adapters/mod.rs @@ -2,4 +2,4 @@ pub mod input; pub mod output; pub use input::load_keys_from_txt; -pub use output::write_validated_key_to_tier_writers; + diff --git a/src/adapters/output/local.rs b/src/adapters/output/local.rs index 78a6a57..3c577fd 100644 --- a/src/adapters/output/local.rs +++ b/src/adapters/output/local.rs @@ -1,34 +1,9 @@ use crate::error::ValidatorError; -use crate::types::{GeminiKey, KeyTier, ValidatedKey}; +use crate::types::GeminiKey; use std::{fs, io::Write}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; use toml::Value; use tracing::info; -// Write valid key to appropriate tier file -pub async fn write_validated_key_to_tier_writers( - free_writer: &mut W, - paid_writer: &mut W, - validated_key: &ValidatedKey, -) -> Result<(), ValidatorError> -where - W: AsyncWrite + Unpin, -{ - match validated_key.tier { - KeyTier::Free => { - free_writer - .write_all(format!("{}\n", validated_key.key.as_ref()).as_bytes()) - .await?; - } - KeyTier::Paid => { - paid_writer - .write_all(format!("{}\n", validated_key.key.as_ref()).as_bytes()) - .await?; - } - } - Ok(()) -} - // Write valid key to output file in Clewdr format pub fn write_keys_clewdr_format( file: &mut fs::File, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 7d723a4..1ca7416 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,5 @@ pub mod http; +pub mod writer; pub use http::{client_builder, send_request}; +pub use writer::write_key_into_file; diff --git a/src/utils/writer.rs b/src/utils/writer.rs new file mode 100644 index 0000000..e379ff2 --- /dev/null +++ b/src/utils/writer.rs @@ -0,0 +1,16 @@ +use crate::error::ValidatorError; +use crate::types::ValidatedKey; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +pub async fn write_key_into_file( + writer: &mut W, + validated_key: &ValidatedKey, +) -> Result<(), ValidatorError> +where + W: AsyncWrite + Unpin, +{ + writer + .write_all(format!("{}\n", validated_key.key.as_ref()).as_bytes()) + .await?; + Ok(()) +} diff --git a/src/validation/key_validator.rs b/src/validation/key_validator.rs index 51ef0e9..b40c09d 100644 --- a/src/validation/key_validator.rs +++ b/src/validation/key_validator.rs @@ -12,7 +12,7 @@ pub async fn test_generate_content_api( api_endpoint: impl IntoUrl, api_key: GeminiKey, config: KeyCheckerConfig, -) -> Result { +) -> Result<(), ValidatorError> { let api_endpoint = api_endpoint.into_url().unwrap(); match send_request( @@ -29,7 +29,7 @@ pub async fn test_generate_content_api( "BASIC API VALID - {}... - Passed generate content API test", &api_key.as_ref()[..10] ); - Ok(ValidatedKey::new(api_key)) + Ok(()) } Err(e) => match &e { ValidatorError::HttpBadRequest { .. } diff --git a/src/validation/validation_service.rs b/src/validation/validation_service.rs index 6ad6736..5a8fbde 100644 --- a/src/validation/validation_service.rs +++ b/src/validation/validation_service.rs @@ -1,9 +1,9 @@ use super::key_validator::{test_cache_content_api, test_generate_content_api}; -use crate::adapters::{load_keys_from_txt, write_validated_key_to_tier_writers}; +use crate::adapters::load_keys_from_txt; use crate::config::KeyCheckerConfig; use crate::error::ValidatorError; -use crate::types::GeminiKey; -use crate::utils::client_builder; +use crate::types::{GeminiKey, KeyTier, ValidatedKey}; +use crate::utils::{client_builder, write_key_into_file}; use async_stream::stream; use futures::{pin_mut, stream::StreamExt}; use indicatif::ProgressStyle; @@ -30,6 +30,19 @@ impl ValidationService { pub async fn validate_keys(&self, keys: Vec) -> Result<(), ValidatorError> { let total_keys = keys.len(); + // Create a progress bar to track validation progress + let progress_span = info_span!("key_checker"); + progress_span.pb_set_style( + &ProgressStyle::with_template( + "[{bar:60.cyan/blue}] {pos}/{len} ({percent}%) [{elapsed_precise}] ETA:{eta} Speed:{per_sec}", + ) + .unwrap(), + ); + progress_span.pb_set_length(total_keys as u64); + progress_span.pb_set_message("Validating keys..."); + progress_span.pb_set_finish_message("All items processed"); + let progress_span_enter = progress_span.enter(); + // Create channel for streaming keys from producer to consumer let (tx, mut rx) = mpsc::unbounded_channel::(); let stream = stream! { @@ -46,35 +59,38 @@ impl ValidationService { } } }); - // Create a progress bar to track validation progress - let progress_span = info_span!("key_checker"); - progress_span.pb_set_style( - &ProgressStyle::with_template( - "[{bar:60.cyan/blue}] {pos}/{len} ({percent}%) [{elapsed_precise}] ETA:{eta} Speed:{per_sec}", - ) - .unwrap(), - ); - progress_span.pb_set_length(total_keys as u64); - progress_span.pb_set_message("Validating keys..."); - progress_span.pb_set_finish_message("All items processed"); - let progress_span_enter = progress_span.enter(); - // Create stream to validate keys concurrently (two-stage pipeline) + let (invalid_tx, mut invalid_rx) = mpsc::unbounded_channel::(); + let invalid_stream = stream! { + while let Some(item) = invalid_rx.recv().await { + yield ValidatedKey::new(item); + } + }; + + // Create invalid keys stream let cache_api_url = self.config.cache_api_url(); let valid_keys_stream = stream .map(|key| async move { let result = test_generate_content_api( self.client.clone(), self.full_url.clone(), - key, + key.clone(), self.config.clone(), ) .await; Span::current().pb_inc(1); - result + (key, result) }) .buffer_unordered(self.config.concurrency) - .filter_map(|result| async { result.ok() }) + .filter_map(|(key, result)| async { + match result { + Ok(()) => Some(ValidatedKey::new(key)), + Err(_e) => { + let _ = invalid_tx.send(key); + None + } + } + }) .map(|validated_key| { test_cache_content_api(self.client.clone(), cache_api_url.clone(), validated_key) }) @@ -84,27 +100,50 @@ impl ValidationService { // Open output files for writing keys by tier (fixed filenames) let free_keys_path = "freekey.txt"; let paid_keys_path = "paidkey.txt"; + let invalid_keys_path = "invalidkey.txt"; let free_file = fs::File::create(&free_keys_path).await?; let paid_file = fs::File::create(&paid_keys_path).await?; + let invalid_file = fs::File::create(&invalid_keys_path).await?; let mut free_buffer_writer = tokio::io::BufWriter::new(free_file); let mut paid_buffer_writer = tokio::io::BufWriter::new(paid_file); + let mut invalid_buffer_writer = tokio::io::BufWriter::new(invalid_file); - // Process validated keys and write to appropriate tier files - while let Some(valid_key) = valid_keys_stream.next().await { - if let Err(e) = write_validated_key_to_tier_writers( - &mut free_buffer_writer, - &mut paid_buffer_writer, - &valid_key, - ) - .await - { - error!("Failed to write key to output file: {e}"); + // Spawn task to process invalid keys stream + tokio::spawn(async move { + let mut pinned_stream = Box::pin(invalid_stream); + while let Some(invalid_key) = pinned_stream.next().await { + if let Err(e) = write_key_into_file(&mut invalid_buffer_writer, &invalid_key).await + { + error!("Failed to write invalid key to file: {e}"); + } + } + if let Err(e) = invalid_buffer_writer.flush().await { + error!("Failed to flush invalid keys buffer: {e}"); + } + }); + + // Process all keys and write to appropriate tier files + while let Some(validated_key) = valid_keys_stream.next().await { + match validated_key.tier { + KeyTier::Free => { + if let Err(e) = + write_key_into_file(&mut free_buffer_writer, &validated_key).await + { + error!("Failed to write free key to file: {e}"); + } + } + KeyTier::Paid => { + if let Err(e) = + write_key_into_file(&mut paid_buffer_writer, &validated_key).await + { + error!("Failed to write paid key to file: {e}"); + } + } } } - - // Flush both buffers to ensure all data is written to files + // Flush buffers for valid keys free_buffer_writer.flush().await?; paid_buffer_writer.flush().await?;