diff --git a/src/reducers/balance_by_script.rs b/src/reducers/balance_by_script.rs new file mode 100644 index 0000000..482cd04 --- /dev/null +++ b/src/reducers/balance_by_script.rs @@ -0,0 +1,180 @@ +// CRFA + +// used by crfa in prod, c1 key + +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, OutputRef}; +use serde::Deserialize; + +use crate::crosscut::epochs::block_epoch; +use crate::{crosscut, model, prelude::*}; + +#[derive(Deserialize, Copy, Clone)] +pub enum Projection { + Individual, + Total, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AddrType { + Hex, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, + pub aggr_by: Option, + pub key_addr_type: Option, + pub projection: Projection, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, address: String, epoch_no: u64) -> String { + let def_key_prefix = "balance_by_script"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, address, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, address), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + } + }; + } + + fn process_inbound_txo( + &mut self, + ctx: &model::BlockContext, + input: &OutputRef, + output: &mut super::OutputPort, + epoch_no: u64 + ) -> Result<(), gasket::error::Error> { + + let utxo = ctx.find_utxo(input).apply_policy(&self.policy).or_panic()?; + + let utxo = match utxo { + Some(x) => x, + None => return Ok(()) + }; + + let is_script_address = utxo.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let address = utxo.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + let key = self.config_key(address, epoch_no); + + let amount: i64 = (-1) * utxo.lovelace_amount() as i64; + + let crdt = match &self.config.projection { + Projection::Individual => model::CRDTCommand::GrowOnlySetAdd(key, format!("{}", amount)), + Projection::Total => model::CRDTCommand::PNCounter(key, amount), + }; + + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + fn process_outbound_txo( + &mut self, + tx_output: &MultiEraOutput, + output: &mut super::OutputPort, + epoch_no: u64, + ) -> Result<(), gasket::error::Error> { + let is_script_address = tx_output.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let address = tx_output.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + let key = self.config_key(address, epoch_no); + + let amount = tx_output.lovelace_amount() as i64; + + let crdt = match &self.config.projection { + Projection::Individual => model::CRDTCommand::GrowOnlySetAdd(key, format!("{}", amount)), + Projection::Total => model::CRDTCommand::PNCounter(key, amount), + }; + + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + + for consumed in tx.consumes().iter().map(|i| i.output_ref()) { + self.process_inbound_txo(&ctx, &consumed, output, epoch_no)?; + } + + for (_, produced) in tx.produces() { + self.process_outbound_txo(&produced, output, epoch_no)?; + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + policy: policy.clone(), + chain: chain.clone(), + }; + + super::Reducer::BalanceByScript(reducer) + } +} \ No newline at end of file diff --git a/src/reducers/fees_by_script.rs b/src/reducers/fees_by_script.rs new file mode 100644 index 0000000..19eaf9d --- /dev/null +++ b/src/reducers/fees_by_script.rs @@ -0,0 +1,187 @@ +// CRFA +// used by crfa in prod + +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, OutputRef}; +use serde::Deserialize; + +use crate::crosscut::epochs::block_epoch; +use crate::{crosscut, model, prelude::*}; + +use std::collections::HashSet; + +#[derive(Deserialize, Copy, Clone)] +pub enum Projection { + Individual, + Total, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AddrType { + Hex, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, + pub aggr_by: Option, + pub key_addr_type: Option, + pub projection: Projection, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, address: String, epoch_no: u64) -> String { + let def_key_prefix = "fees_by_script"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, address, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, address), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + } + }; + } + + fn process_inbound_txo( + &mut self, + ctx: &model::BlockContext, + seen: &mut HashSet, + input: &OutputRef, + ) -> Result<(), gasket::error::Error> { + + let utxo = ctx.find_utxo(input).apply_policy(&self.policy).or_panic()?; + + let utxo = match utxo { + Some(x) => x, + None => return Ok(()) + }; + + let is_script_address = utxo.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let address = utxo.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + seen.insert(address); + + Ok(()) + } + + fn process_outbound_txo( + &mut self, + seen: &mut HashSet, + tx_output: &MultiEraOutput, + ) -> Result<(), gasket::error::Error> { + let is_script_address = tx_output.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let address = tx_output.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + seen.insert(address); + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + let mut seen = HashSet::new(); + + for consumed in tx.consumes().iter().map(|i| i.output_ref()) { + self.process_inbound_txo(&ctx, &mut seen, &consumed)?; + } + + for (_, produced) in tx.produces() { + self.process_outbound_txo(&mut seen, &produced)?; + } + + let fee = tx.fee().unwrap_or(0); + + if fee == 0 { + return Ok(()); + } + + let seen_size = seen.len(); + + if seen_size == 0 { + return Ok(()); + } + + let fee_per_addr = fee / (seen_size as u64); + + for addr in seen.iter() { + let key = self.config_key(addr.to_string(), epoch_no); + + let crdt = match &self.config.projection { + Projection::Individual => model::CRDTCommand::GrowOnlySetAdd(key, format!("{}", fee)), + Projection::Total => model::CRDTCommand::PNCounter(key, fee_per_addr as i64), + }; + + output.send(gasket::messaging::Message::from(crdt))?; + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + policy: policy.clone(), + chain: chain.clone(), + }; + + super::Reducer::FeesByScript(reducer) + } +} diff --git a/src/reducers/mod.rs b/src/reducers/mod.rs index 9420dc5..6b161f9 100644 --- a/src/reducers/mod.rs +++ b/src/reducers/mod.rs @@ -1,8 +1,8 @@ +use serde::Deserialize; use std::time::Duration; use gasket::runtime::spawn_stage; use pallas_traverse::MultiEraBlock; -use serde::Deserialize; use crate::{bootstrap, crosscut, model}; @@ -42,6 +42,24 @@ pub mod utxo_by_stake; #[cfg(feature = "unstable")] pub mod utxos_by_asset; +// CRFA +#[cfg(feature = "unstable")] +pub mod volume_by_address; +#[cfg(feature = "unstable")] +pub mod unique_addresses_by_script; +#[cfg(feature = "unstable")] +pub mod tx_count_by_script; +#[cfg(feature = "unstable")] +pub mod balance_by_script; +#[cfg(feature = "unstable")] +pub mod fees_by_script; +#[cfg(feature = "unstable")] +pub mod transaction_size_by_script; +#[cfg(feature = "unstable")] +pub mod total_tx_count; +#[cfg(feature = "unstable")] +pub mod total_balance; + #[derive(Deserialize)] #[serde(tag = "type")] pub enum Config { @@ -75,6 +93,23 @@ pub enum Config { SupplyByAsset(supply_by_asset::Config), #[cfg(feature = "unstable")] AddressesByStake(addresses_by_stake::Config), + // CRFA + #[cfg(feature = "unstable")] + VolumeByAddress(volume_by_address::Config), + #[cfg(feature = "unstable")] + UniqueAddressesByScript(unique_addresses_by_script::Config), + #[cfg(feature = "unstable")] + TxCountByScript(tx_count_by_script::Config), + #[cfg(feature = "unstable")] + BalanceByScript(balance_by_script::Config), + #[cfg(feature = "unstable")] + FeesByScript(fees_by_script::Config), + #[cfg(feature = "unstable")] + TransactionSizeByScript(transaction_size_by_script::Config), + #[cfg(feature = "unstable")] + TotalTransactionsCount(total_tx_count::Config), + #[cfg(feature = "unstable")] + TotalBalance(total_balance::Config), } impl Config { @@ -114,6 +149,23 @@ impl Config { Config::SupplyByAsset(c) => c.plugin(policy), #[cfg(feature = "unstable")] Config::AddressesByStake(c) => c.plugin(policy), + // CRFA + #[cfg(feature = "unstable")] + Config::VolumeByAddress(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::UniqueAddressesByScript(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::TxCountByScript(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::BalanceByScript(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::FeesByScript(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::TransactionSizeByScript(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::TotalTransactionsCount(c) => c.plugin(chain, policy), + #[cfg(feature = "unstable")] + Config::TotalBalance(c) => c.plugin(chain, policy), } } } @@ -194,6 +246,23 @@ pub enum Reducer { SupplyByAsset(supply_by_asset::Reducer), #[cfg(feature = "unstable")] AddressesByStake(addresses_by_stake::Reducer), + // CRFA + #[cfg(feature = "unstable")] + VolumeByAddress(volume_by_address::Reducer), + #[cfg(feature = "unstable")] + UniqueAddressesByScript(unique_addresses_by_script::Reducer), + #[cfg(feature = "unstable")] + TxCountByScript(tx_count_by_script::Reducer), + #[cfg(feature = "unstable")] + BalanceByScript(balance_by_script::Reducer), + #[cfg(feature = "unstable")] + FeesByScript(fees_by_script::Reducer), + #[cfg(feature = "unstable")] + TransactionSizeByScript(transaction_size_by_script::Reducer), + #[cfg(feature = "unstable")] + TotalTransactionsCount(total_tx_count::Reducer), + #[cfg(feature = "unstable")] + TotalBalance(total_balance::Reducer), } impl Reducer { @@ -234,6 +303,23 @@ impl Reducer { Reducer::SupplyByAsset(x) => x.reduce_block(block, ctx, output), #[cfg(feature = "unstable")] Reducer::AddressesByStake(x) => x.reduce_block(block, ctx, output), + // CRFA + #[cfg(feature = "unstable")] + Reducer::VolumeByAddress(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::UniqueAddressesByScript(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::TxCountByScript(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::BalanceByScript(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::FeesByScript(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::TransactionSizeByScript(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::TotalTransactionsCount(x) => x.reduce_block(block, ctx, output), + #[cfg(feature = "unstable")] + Reducer::TotalBalance(x) => x.reduce_block(block, ctx, output), } } } diff --git a/src/reducers/total_balance.rs b/src/reducers/total_balance.rs new file mode 100644 index 0000000..a25d787 --- /dev/null +++ b/src/reducers/total_balance.rs @@ -0,0 +1,134 @@ +use pallas_traverse::{MultiEraBlock, OutputRef}; +use pallas_traverse::MultiEraOutput; +use serde::Deserialize; + +use crate::crosscut::epochs::block_epoch; + +use crate::{crosscut, model, prelude::*}; + +#[derive(Deserialize, Clone)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, + pub aggr_by: Option, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, epoch_no: u64) -> String { + let def_key_prefix = "total_balance"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, epoch_no), + None => format!("{}", def_key_prefix.to_string()), + }; + } + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}", prefix), + None => format!("{}", def_key_prefix.to_string()), + }; + } + }; + } + + fn process_inbound_txo( + &mut self, + ctx: &model::BlockContext, + epoch_no: u64, + input: &OutputRef, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + let utxo = ctx.find_utxo(input).apply_policy(&self.policy).or_panic()?; + + let utxo = match utxo { + Some(x) => x, + None => return Ok(()) + }; + + let is_script_address = utxo.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let key = self.config_key(epoch_no); + + let crdt = model::CRDTCommand::PNCounter(key, -1 * utxo.lovelace_amount() as i64); + + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + fn process_outbound_txo( + &mut self, + epoch_no: u64, + tx_output: &MultiEraOutput, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + let is_script_address = tx_output.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let key = self.config_key(epoch_no); + + let crdt = model::CRDTCommand::PNCounter(key, tx_output.lovelace_amount() as i64); + + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + + for consumed in tx.consumes().iter().map(|i| i.output_ref()) { + self.process_inbound_txo(&ctx, epoch_no, &consumed, output)?; + } + + for (_, produced) in tx.produces() { + self.process_outbound_txo(epoch_no, &produced, output)?; + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + policy: policy.clone(), + chain: chain.clone(), + }; + + super::Reducer::TotalBalance(reducer) + } +} \ No newline at end of file diff --git a/src/reducers/total_tx_count.rs b/src/reducers/total_tx_count.rs new file mode 100644 index 0000000..6adcbd6 --- /dev/null +++ b/src/reducers/total_tx_count.rs @@ -0,0 +1,114 @@ +use pallas_traverse::MultiEraBlock; +use serde::Deserialize; + +use crate::{crosscut, model, prelude::*}; + +use crate::crosscut::epochs::block_epoch; + +#[derive(Deserialize, Clone)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub key_prefix: Option, + pub aggr_by: Option, + pub filter: Option, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, + epoch_no: u64, + is_valid_trx: bool, + ) -> String { + let def_key_prefix = match is_valid_trx { + true => "total_tx_count.valid", + false => "total_tx_count.invalid" + }; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, epoch_no), + None => format!("{}", def_key_prefix.to_string()), + }; + } + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}", prefix), + None => format!("{}", def_key_prefix.to_string()), + }; + } + }; + } + + pub fn reduce_valid_tx( + &mut self, + epoch_no: u64, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + + let key = self.config_key(epoch_no, true); + + let crdt = model::CRDTCommand::PNCounter(key, 1); + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + pub fn reduce_invalid_tx<'b>( + &mut self, + epoch_no: u64, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + let key = self.config_key(epoch_no, false); + + let crdt = model::CRDTCommand::PNCounter(key, 1); + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + + if tx.is_valid() { + self.reduce_valid_tx(epoch_no, output)?; + } else { + self.reduce_invalid_tx(epoch_no, output)?; + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + chain: chain.clone(), + policy: policy.clone(), + }; + + super::Reducer::TotalTransactionsCount(reducer) + } +} \ No newline at end of file diff --git a/src/reducers/transaction_size_by_script.rs b/src/reducers/transaction_size_by_script.rs new file mode 100644 index 0000000..31af731 --- /dev/null +++ b/src/reducers/transaction_size_by_script.rs @@ -0,0 +1,179 @@ +// CRFA +// used by crfa in prod + +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, OutputRef}; +use serde::Deserialize; + +use crate::crosscut::epochs::block_epoch; +use crate::{crosscut, model, prelude::*}; + +use std::collections::HashSet; + +#[derive(Deserialize, Copy, Clone)] +pub enum Projection { + Individual, + Total, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AddrType { + Hex, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Clone)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, + pub aggr_by: Option, + pub key_addr_type: Option, + pub projection: Projection, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, address: String, epoch_no: u64) -> String { + let def_key_prefix = "trx_size_by_script"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, address, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, address), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + } + }; + } + + fn process_inbound_txo( + &mut self, + ctx: &model::BlockContext, + seen: &mut HashSet, + input: &OutputRef, + ) -> Result<(), gasket::error::Error> { + + let utxo = ctx.find_utxo(input).apply_policy(&self.policy).or_panic()?; + + let utxo = match utxo { + Some(x) => x, + None => return Ok(()) + }; + + let is_script_address = utxo.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let address = utxo.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + seen.insert(address); + + Ok(()) + } + + fn process_outbound_txo( + &mut self, + seen: &mut HashSet, + tx_output: &MultiEraOutput, + ) -> Result<(), gasket::error::Error> { + let is_script_address = tx_output.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(()); + } + + let address = tx_output.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + seen.insert(address); + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + let mut seen = HashSet::new(); + + for consumed in tx.consumes().iter().map(|i| i.output_ref()) { + self.process_inbound_txo(&ctx, &mut seen, &consumed)?; + } + + for (_, produced) in tx.produces() { + self.process_outbound_txo(&mut seen, &produced)?; + } + + let tx_len = tx.encode().len(); + + if tx_len == 0 { + return Ok(()); + } + + for addr in seen.iter() { + let key = self.config_key(addr.to_string(), epoch_no); + + let crdt = match &self.config.projection { + Projection::Individual => model::CRDTCommand::GrowOnlySetAdd(key, format!("{}", tx_len)), + Projection::Total => model::CRDTCommand::PNCounter(key, tx_len as i64), + }; + + output.send(gasket::messaging::Message::from(crdt))?; + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + policy: policy.clone(), + chain: chain.clone(), + }; + + super::Reducer::TransactionSizeByScript(reducer) + } +} \ No newline at end of file diff --git a/src/reducers/tx_count_by_script.rs b/src/reducers/tx_count_by_script.rs new file mode 100644 index 0000000..cecd02a --- /dev/null +++ b/src/reducers/tx_count_by_script.rs @@ -0,0 +1,167 @@ +// CRFA +// used by crfa in prod, c2, c3 key + +use std::collections::HashSet; + +use pallas_traverse::{Feature, MultiEraBlock, OutputRef}; +use serde::Deserialize; + +use crate::crosscut::epochs::block_epoch; + +use crate::{crosscut, model, prelude::*}; + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AddrType { + Hex, +} + +#[derive(Deserialize)] +pub struct Config { + pub key_prefix: Option, + pub aggr_by: Option, + pub key_addr_type: Option, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, address: String, epoch_no: u64) -> String { + let def_key_prefix = "trx_count"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, address, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, address), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + }; + } + + fn increment_for_address( + &mut self, + address: &str, + output: &mut super::OutputPort, + epoch_no: u64, + ) -> Result<(), gasket::error::Error> { + let key = self.config_key(address.to_string(), epoch_no); + + let crdt = model::CRDTCommand::PNCounter(key, 1); + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + fn find_address_from_output_ref( + &mut self, + ctx: &model::BlockContext, + input: &OutputRef, + ) -> Result, gasket::error::Error> { + let utxo = ctx.find_utxo(input).apply_policy(&self.policy).or_panic()?; + + let utxo = match utxo { + Some(x) => x, + None => return Result::Ok(None) + }; + + let is_script_address = utxo.address().map_or(false, |addr| addr.has_script()); + + if !is_script_address { + return Ok(None); + } + + let address = utxo.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + Ok(Some(address)) + } + + pub fn reduce_block( + &mut self, + block: &MultiEraBlock, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + if block.era().has_feature(Feature::SmartContracts) { + for tx in block.txs() { + let epoch_no = block_epoch(&self.chain, block); + + let input_addresses: Vec<_> = tx + .consumes() + .iter() + .filter_map(|mei| { + let output_ref = mei.output_ref(); + + let maybe_input_address = + self.find_address_from_output_ref(ctx, &output_ref); + + match maybe_input_address { + Ok(maybe_addr) => maybe_addr, + Err(_) => None + } + }) + .collect(); + + let output_addresses: Vec<_> = tx + .produces() + .iter() + .filter(|(_, meo)| meo.address().map_or(false, |a| a.has_script())) + .filter_map(|(_, meo)| meo.address().ok()) + .filter(|addr| addr.has_script()) + .map(|addr| -> String { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .collect(); + + let all_addresses = [&input_addresses[..], &output_addresses[..]].concat(); + let all_addresses_deduped: HashSet = + HashSet::from_iter(all_addresses.iter().cloned()); + + for address in all_addresses_deduped.iter() { + self.increment_for_address(address, output, epoch_no)?; + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + let reducer = Reducer { + config: self, + policy: policy.clone(), + chain: chain.clone(), + }; + + super::Reducer::TxCountByScript(reducer) + } +} \ No newline at end of file diff --git a/src/reducers/unique_addresses_by_script.rs b/src/reducers/unique_addresses_by_script.rs new file mode 100644 index 0000000..307f8f8 --- /dev/null +++ b/src/reducers/unique_addresses_by_script.rs @@ -0,0 +1,217 @@ +// CRFA + +use pallas_addresses::ShelleyDelegationPart; +use pallas_addresses::Address; + +use pallas_traverse::MultiEraOutput; +use pallas_traverse::MultiEraBlock; +use serde::Deserialize; + +use crate::{crosscut, model, prelude::*}; + +use crate::crosscut::epochs::block_epoch; + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AddrType { + Hex, +} + +#[derive(Deserialize, Copy, Clone)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Copy, Clone)] +pub enum AddressType { + Payment, Staking +} + +#[derive(Deserialize)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, + pub aggr_by: Option, + pub addr_type: AddressType, + pub key_addr_type: Option, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + fn config_key(&self, address: &String, epoch_no: u64) -> String { + let def_key_prefix = "unique_addresses_by_script"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, address, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, address), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + }; + } + + fn process_user_address_given_contract_address( + &mut self, + contract_address: &Address, + user_address: &Address, + epoch_no: u64, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + + let maybe_addr = match user_address { + Address::Shelley(shelley_addr) => { + match &self.config.addr_type { + AddressType::Staking => { + let delegation_part = shelley_addr.delegation(); + + match delegation_part { + ShelleyDelegationPart::Key(_) | ShelleyDelegationPart::Script(_) => { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => Some(delegation_part.to_hex()), + _ => delegation_part.to_bech32().ok() + } + }, + _ => None, + } + }, + AddressType::Payment => { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => Some(shelley_addr.to_hex()), + _ => shelley_addr.to_bech32().ok() + } + }, + } + }, + _ => None, + }; + + if let Some(addr) = &maybe_addr { + + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => { + let key = self.config_key(&contract_address.to_hex(), epoch_no); + + let crdt = model::CRDTCommand::GrowOnlySetAdd(key, addr.to_string()); + + output.send(gasket::messaging::Message::from(crdt))?; + } + _ => { + if let Some(contr_addr) = contract_address.to_bech32().ok() { + let key = self.config_key(&contr_addr, epoch_no); + + let crdt = model::CRDTCommand::GrowOnlySetAdd(key, addr.to_string()); + + output.send(gasket::messaging::Message::from(crdt))?; + } + } + } + } + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + + let enriched_inputs: Vec = tx.consumes().iter() + .flat_map(|mei| ctx.find_utxo(&mei.output_ref()).apply_policy(&self.policy).or_panic().ok()) + .filter_map(|maybe_multi_era_output| maybe_multi_era_output) + .collect(); + + let inputs_have_script = enriched_inputs.iter().find(|meo| { + match meo.address().ok() { + Some(addr) => addr.has_script(), + None => false + } + }); + + let enriched_outputs: Vec<(usize, MultiEraOutput)> = tx.produces(); + + let outputs_have_script = enriched_outputs.iter().find(|(_, meo)| { + match meo.address().ok() { + Some(addr) => addr.has_script(), + None => false + } + }); + + if let Some(meo) = inputs_have_script { + + if let Some(contract_address) = &meo.address().ok() { + + for meo in &enriched_inputs { + match meo.address().ok() { + Some(user_address) if !user_address.has_script() => self.process_user_address_given_contract_address(&contract_address, &user_address, epoch_no, output)?, + _ => (), + } + } + + for (_, meo) in &enriched_outputs { + match meo.address().ok() { + Some(user_address) if !user_address.has_script() => self.process_user_address_given_contract_address(&contract_address, &user_address, epoch_no, output)?, + _ => (), + } + } + + } + } + + if let Some((_, meo)) = outputs_have_script { + + if let Some(contract_address) = &meo.address().ok() { + + for meo in &enriched_inputs { + match meo.address().ok() { + Some(user_address) if !user_address.has_script() => self.process_user_address_given_contract_address(&contract_address, &user_address, epoch_no, output)?, + _ => (), + } + } + + for (_, meo) in &enriched_outputs { + match meo.address().ok() { + Some(user_address) if !user_address.has_script() => self.process_user_address_given_contract_address(&contract_address, &user_address, epoch_no, output)?, + _ => (), + } + } + + } + } + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy, + ) -> super::Reducer { + + let reducer = Reducer { + config: self, + chain: chain.clone(), + policy: policy.clone(), + }; + + super::Reducer::UniqueAddressesByScript(reducer) + } +} \ No newline at end of file diff --git a/src/reducers/volume_by_address.rs b/src/reducers/volume_by_address.rs new file mode 100644 index 0000000..73932a5 --- /dev/null +++ b/src/reducers/volume_by_address.rs @@ -0,0 +1,125 @@ +use pallas_traverse::MultiEraOutput; +use pallas_traverse::MultiEraBlock; +use serde::Deserialize; + +use crate::{crosscut, model, prelude::*}; + +use crate::crosscut::epochs::block_epoch; + +#[derive(Deserialize, Copy, Clone)] +pub enum Projection { + Individual, + Total, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AggrType { + Epoch, +} + +#[derive(Deserialize, Copy, Clone, PartialEq)] +pub enum AddrType { + Hex, +} + +#[derive(Deserialize)] +pub struct Config { + pub key_prefix: Option, + pub filter: Option, + pub aggr_by: Option, + pub key_addr_type: Option, + pub projection: Projection, +} + +pub struct Reducer { + config: Config, + policy: crosscut::policies::RuntimePolicy, + chain: crosscut::ChainWellKnownInfo, +} + +impl Reducer { + + fn config_key(&self, address: String, epoch_no: u64) -> String { + let def_key_prefix = "volume_by_address"; + + match &self.config.aggr_by { + Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, address, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + }, + _ => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}", prefix, address), + None => format!("{}.{}", def_key_prefix.to_string(), address), + }; + } + }; + } + + fn process_produced_txo( + &mut self, + tx_output: &MultiEraOutput, + output: &mut super::OutputPort, + epoch_no: u64 + ) -> Result<(), gasket::error::Error> { + + let address = tx_output.address() + .map(|addr| { + match &self.config.key_addr_type { + Some(addr_typ) if matches!(addr_typ, AddrType::Hex) => addr.to_hex(), + _ => addr.to_string() + } + }) + .or_panic()?; + + let key = self.config_key(address, epoch_no); + + let amount = tx_output.lovelace_amount() as i64; + + let crdt = match &self.config.projection { + Projection::Individual => model::CRDTCommand::GrowOnlySetAdd(key, format!("{}", amount)), + Projection::Total => model::CRDTCommand::PNCounter(key, amount as i64), + }; + + output.send(gasket::messaging::Message::from(crdt))?; + + Ok(()) + } + + pub fn reduce_block<'b>( + &mut self, + block: &'b MultiEraBlock<'b>, + ctx: &model::BlockContext, + output: &mut super::OutputPort, + ) -> Result<(), gasket::error::Error> { + for tx in block.txs().into_iter() { + if filter_matches!(self, block, &tx, ctx) { + let epoch_no = block_epoch(&self.chain, block); + + for (_, produced) in tx.produces() { + self.process_produced_txo(&produced, output, epoch_no)?; + } + + } + } + + Ok(()) + } +} + +impl Config { + pub fn plugin(self, + chain: &crosscut::ChainWellKnownInfo, + policy: &crosscut::policies::RuntimePolicy) -> super::Reducer { + + let reducer = Reducer { + config: self, + policy: policy.clone(), + chain: chain.clone(), + }; + + super::Reducer::VolumeByAddress(reducer) + } +} \ No newline at end of file