From 31b31d215832c5dd8d5cffdcffa93039cb30362f Mon Sep 17 00:00:00 2001 From: Dustin Pianalto Date: Sat, 6 Mar 2021 23:08:12 -0900 Subject: [PATCH] gRPC snowflake server --- Cargo.toml | 8 +++++- build.rs | 4 +++ proto/snowflake.proto | 15 +++++++++++ src/lib.rs | 53 ++++++++++++++++++++++++++++++++++++-- src/main.rs | 59 ++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 build.rs create mode 100644 proto/snowflake.proto diff --git a/Cargo.toml b/Cargo.toml index cd068e1..7be2047 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "snowflake" +name = "snowflake_service" version = "0.1.0" authors = ["Dustin Pianalto "] edition = "2018" @@ -7,3 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tonic = "0.4" +prost = "0.7" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } + +[build-dependencies] +tonic-build = "0.4" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..2d89d31 --- /dev/null +++ b/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/snowflake.proto")?; + Ok(()) +} diff --git a/proto/snowflake.proto b/proto/snowflake.proto new file mode 100644 index 0000000..a3050be --- /dev/null +++ b/proto/snowflake.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package snowflake; + +message Empty { + +} + +message SnowflakeReply { + string id_str = 1; + uint64 id = 2; +} + +service Snowflake { + rpc GetSnowflake (Empty) returns (SnowflakeReply); +} diff --git a/src/lib.rs b/src/lib.rs index d7e09af..43006d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,52 @@ -pub fn run() { - println!("Hello, world!"); +use std::time; +use std::sync::Mutex; +use std::convert::TryInto; + +const EPOCH_TIME: time::Duration = time::Duration::from_millis(1609459200000); +const TIME_MASK: u64 = 0x1FFFFFFFFFF; + +#[derive(Debug, Default)] +pub struct Generator { + last_generated_time: Mutex, + counter: Mutex, + last_counter_rollover: Mutex, + worker_id: u64, +} + +impl Generator { + pub fn new(worker_id: u64) -> Generator { + if worker_id > 1023 { + panic!("Invalid worker id") + } + Generator { + last_generated_time: Mutex::new(0), + counter: Mutex::new(0), + last_counter_rollover: Mutex::new(0), + worker_id: worker_id << 12, + } + } + + pub fn generate_snowflake(&self) -> Result { + let time: u64 = match time::SystemTime::now().duration_since(time::SystemTime::UNIX_EPOCH + EPOCH_TIME) { + Ok(t) => t.as_millis().try_into().unwrap(), + Err(_) => return Err("The epoch appears to be in the past! Check the system time."), + }; + let mut last_generated_time = self.last_generated_time.lock().unwrap(); + let mut counter = self.counter.lock().unwrap(); + let mut last_counter_rollover = self.last_counter_rollover.lock().unwrap(); + if time < *last_generated_time { + return Err("The current time is less than the last generated time! Check the system time."); + } + if *counter == 4095 { + if *last_counter_rollover >= time { + return Err("Too many requests in the current ms"); + } + *last_counter_rollover = time; + *counter = 0; + } else { + *counter += 1; + } + *last_generated_time = time; + Ok(((time & TIME_MASK) << 22) + self.worker_id + *counter) + } } diff --git a/src/main.rs b/src/main.rs index f8205bb..0016043 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,58 @@ -use snowflake; +use snowflake_service; +use std::env; -fn main() { - snowflake::run(); +use tonic::{transport::Server, Request, Response, Status}; + +use snowflake::snowflake_server::{Snowflake, SnowflakeServer}; +use snowflake::{Empty, SnowflakeReply}; + +pub mod snowflake { + tonic::include_proto!("snowflake"); +} + +#[derive(Debug, Default)] +pub struct MySnowflake { + generator: snowflake_service::Generator, +} + +impl MySnowflake { + fn new(worker_id: u64) -> MySnowflake { + let s = snowflake_service::Generator::new(worker_id); + MySnowflake{ + generator: s, + } + } + +} + +#[tonic::async_trait] +impl Snowflake for MySnowflake { + async fn get_snowflake(&self, _request: Request) -> Result, Status> { + let id = match self.generator.generate_snowflake() { + Ok(i) => i, + Err(s) => return Err(Status::resource_exhausted(s)), + }; + let reply = SnowflakeReply { + id, + id_str: id.to_string(), + }; + Ok(Response::new(reply)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "0.0.0.0:50051".parse()?; + let worker_id: u64 = match env::var("WORKER_ID") { + Ok(id) => id.parse::().unwrap(), + Err(_) => panic!("No WORKER_ID env variable found"), + }; + let s = MySnowflake::new(worker_id); + + Server::builder() + .add_service(SnowflakeServer::new(s)) + .serve(addr) + .await?; + + Ok(()) }