2024-10-25    2024-11-03    0 words  0 min
import requests
from typing import Dict, List, Optional
import getpass

class PocketBaseClient:
    def __init__(self, base_url: str):
        """
        Initialize PocketBase client with base URL
        
        Args:
            base_url (str): Base URL of the PocketBase instance
        """
        self.base_url = base_url.rstrip('/')
        self.token = None
        self.auth_record = None
    
    def authenticate(self, identity: Optional[str] = None, password: Optional[str] = None) -> Dict:
        """
        Authenticate user with PocketBase
        
        Args:
            identity (str, optional): Username or email
            password (str, optional): Password
            
        Returns:
            Dict: Authentication response including token and record
        """
        if identity is None:
            identity = input("Enter username or email: ")
        if password is None:
            password = getpass.getpass("Enter password: ")
            
        url = f"{self.base_url}/api/admins/auth-with-password"
        data = {
            "identity": identity,
            "password": password
        }
        
        try:
            response = requests.post(url, json=data)
            response.raise_for_status()
            
            auth_data = response.json()
            self.token = auth_data['token']
            
            return auth_data
            
        except requests.exceptions.RequestException as e:
            print(f"Authentication failed: {str(e)}")
            if hasattr(e.response, 'json'):
                print(f"Error details: {e.response.json()}")
            raise
    
    def fetch_records(self, filter_query: str, per_page: int = 100) -> List[Dict]:
        """
        Fetch all records matching the filter query
        
        Args:
            filter_query (str): Filter query string
            per_page (int): Number of records per page
            
        Returns:
            List[Dict]: List of matching records
        """
        if not self.token:
            raise ValueError("Not authenticated. Call authenticate() first.")
            
        headers = {
            "Authorization": self.token
        }
        
        url = f"{self.base_url}/api/collections/track_items/records"
        
        all_records = []
        page = 1
        
        while True:
            params = {
                "page": page,
                "perPage": per_page,
                "filter": filter_query,
                "skipTotal": True
            }
            
            try:
                response = requests.get(url, headers=headers, params=params)
                response.raise_for_status()
                
                data = response.json()
                records = data.get('items', [])
                
                if not records:
                    break
                    
                all_records.extend(records)
                page += 1
                
            except requests.exceptions.RequestException as e:
                print(f"Error fetching records: {str(e)}")
                if hasattr(e.response, 'json'):
                    print(f"Error details: {e.response.json()}")
                raise
                
        return all_records
FILTER_QUERY = """(app = "Arc" || app = "Zen Browser" || app = "Code" || app = "iTerm2") && begin_date < "2024-08-19" && begin_date > "2024-08-05" && (title ~ "Jupyter" || title ~ "stratz" || title ~ "shashank-sharma.github.io" || title ~ "dashboard" || title ~ "metadata" || title ~ "tmux")"""
base_url = "http://localhost:8090"
client = PocketBaseClient(base_url)

# Authenticate
try:
    auth_data = client.authenticate()
    print("Authentication successful!")
except Exception as e:
    print(f"Authentication failed: {e}")
Enter username or email: test@gmail.com
Enter password: 路路路路路路路路
Authentication successful!
# Fetch records using the filter
try:
    records = client.fetch_records(FILTER_QUERY)
    print(f"Found {len(records)} matching records")
except Exception as e:
    print(f"Error fetching records: {e}")
Found 2881 matching records
records[0]
from datetime import datetime
from typing import List, Dict

def calculate_durations(records: List[Dict]) -> List[Dict]:
    """
    Calculate duration for each record between begin_date and end_date
    
    Args:
        records (List[Dict]): List of records containing begin_date and end_date
        
    Returns:
        List[Dict]: Same records with added duration information
    """
    for record in records:
        try:
            begin = datetime.fromisoformat(record['begin_date'].replace('Z', '+00:00'))
            end = datetime.fromisoformat(record['end_date'].replace('Z', '+00:00'))
            
            duration = end - begin
            
            record['duration'] = {
                'total_seconds': duration.total_seconds(),
                'minutes': duration.total_seconds() / 60,
                'hours': duration.total_seconds() / 3600,
                'formatted': str(duration),
                'is_negative': duration.total_seconds() < 0
            }
            
        except (KeyError, ValueError) as e:
            record['duration'] = {
                'error': f"Could not calculate duration: {str(e)}",
                'total_seconds': None,
                'minutes': None,
                'hours': None,
                'formatted': None,
                'is_negative': None
            }
    
    return records

records_with_duration = calculate_durations(records)
print(records_with_duration[0]['duration'])
{'total_seconds': 0.0, 'minutes': 0.0, 'hours': 0.0, 'formatted': '0:00:00', 'is_negative': False}
total_duration = 0
for i in records_with_duration:
    total_duration += i['duration']['total_seconds']
total_duration / 60 / 60
from datetime import datetime
from typing import List, Dict
from collections import defaultdict

def aggregate_time_by_date(records: List[Dict]) -> Dict[str, Dict]:
    """
    Aggregate total time spent per date from records
    
    Args:
        records (List[Dict]): List of records with begin_date and duration info
        
    Returns:
        Dict[str, Dict]: Dictionary with date as key and time statistics as value
    """
    daily_stats = defaultdict(lambda: {
        'total_seconds': 0,
        'total_minutes': 0,
        'total_hours': 0,
        'records_count': 0,
        'apps': defaultdict(int),  # Track time per app
        'titles': []  # Track unique titles
    })
    
    for record in records:
        # Get the date part from begin_date
        date_str = datetime.fromisoformat(record['begin_date'].replace('Z', '+00:00')).strftime('%Y-%m-%d')

        # Calculate duration in seconds
        begin = datetime.fromisoformat(record['begin_date'].replace('Z', '+00:00'))
        end = datetime.fromisoformat(record['end_date'].replace('Z', '+00:00'))
        duration = (end - begin).total_seconds()

        # Skip negative durations
        if duration < 0:
            continue

        # Update statistics
        daily_stats[date_str]['total_seconds'] += duration
        daily_stats[date_str]['total_minutes'] += duration / 60
        daily_stats[date_str]['total_hours'] += duration / 3600
        daily_stats[date_str]['records_count'] += 1

        # Track time per app
        app_name = record.get('app', 'Unknown')
        daily_stats[date_str]['apps'][app_name] += duration

        # Track unique titles
        title = record.get('title')
        if title and title not in daily_stats[date_str]['titles']:
            daily_stats[date_str]['titles'].append(title)
    
    # Convert defaultdict to regular dict and format numbers
    formatted_stats = {}
    for date, stats in daily_stats.items():
        formatted_stats[date] = {
            'total_time': {
                'hours': round(stats['total_hours'], 2),
                'minutes': round(stats['total_minutes'], 2),
                'seconds': round(stats['total_seconds'], 2)
            },
            'records_count': stats['records_count'],
            'apps': {
                app: {
                    'hours': round(seconds / 3600, 2),
                    'minutes': round(seconds / 60, 2),
                    'seconds': round(seconds, 2),
                    'percentage': round((seconds / stats['total_seconds']) * 100, 2)
                }
                for app, seconds in stats['apps'].items()
            },
            'unique_titles_count': len(stats['titles']),
            'titles': stats['titles']
        }
    
    return dict(sorted(formatted_stats.items(), reverse=True))  # Sort by date descending

daily_summary = aggregate_time_by_date(records)
for date, stats in daily_summary.items():
    print(f"{date}: {stats['total_time']['minutes']} minutes")
2024-08-18: 93.17 minutes
2024-08-17: 227.47 minutes
2024-08-16: 84.87 minutes
2024-08-15: 91.62 minutes
2024-08-14: 182.38 minutes
2024-08-13: 3.08 minutes
2024-08-12: 51.1 minutes
2024-08-11: 251.5 minutes
2024-08-10: 233.12 minutes
2024-08-09: 67.88 minutes
2024-08-08: 265.03 minutes
2024-08-07: 231.15 minutes
2024-08-06: 14.9 minutes
2024-08-05: 48.82 minutes
date_hours = {}
for i in ddd2.keys():
    date_hours[i] = ddd2[i]['total_time']['hours']
import json
from datetime import datetime

def format_time_mountain_data(date_hours_dict, base_position=10, position_spread=6):
    """
    Convert date-hours dictionary to time-mountain shortcode format.
    
    Args:
        date_hours_dict: Dictionary with dates as keys and hours as values
        base_position: Starting position for first mountain
        position_spread: Spread between mountains for overlap
    
    Returns:
        Formatted string ready to use in Hugo shortcode
    """
    # Convert dates to datetime objects and sort in reverse order (most recent first)
    sorted_dates = sorted(date_hours_dict.items(), 
                         key=lambda x: datetime.strptime(x[0], '%Y-%m-%d'),
                         reverse=False)
        
    # Create the data list
    mountain_data = []
    for idx, (date_str, hours) in enumerate(sorted_dates):
        # Convert date string to datetime
        date = datetime.strptime(date_str, '%Y-%m-%d')
        
        # Format date as short day name
        day_name = date.strftime('%d %b')
        
        # Calculate position with some randomization for overlap
        # Earlier dates will have higher positions
        position = base_position + (idx * position_spread)
        
        # Only include days with non-zero hours
        if hours > 0:
            mountain_data.append({
                "date": day_name,
                "hours": round(hours, 2),
                "position": min(position, 90)  # Cap position to avoid overflow
            })
    
    # Convert to JSON string with escaped quotes
    json_str = json.dumps(mountain_data)
    
    escaped_json = json_str.replace('"', '\\"')
    
    # Format for Hugo shortcode
    hugo_shortcode = f'{{{{< time-mountain data="{escaped_json}" >}}}}'
    
    return hugo_shortcode

# Generate the shortcode
shortcode = format_time_mountain_data(date_hours)
print(shortcode)
{{< time-mountain data="[{\"date\": \"05 Aug\", \"hours\": 0.81, \"position\": 10}, {\"date\": \"06 Aug\", \"hours\": 0.25, \"position\": 16}, {\"date\": \"07 Aug\", \"hours\": 3.85, \"position\": 22}, {\"date\": \"08 Aug\", \"hours\": 4.42, \"position\": 28}, {\"date\": \"09 Aug\", \"hours\": 1.13, \"position\": 34}, {\"date\": \"10 Aug\", \"hours\": 3.89, \"position\": 40}, {\"date\": \"11 Aug\", \"hours\": 4.19, \"position\": 46}, {\"date\": \"12 Aug\", \"hours\": 0.85, \"position\": 52}, {\"date\": \"13 Aug\", \"hours\": 0.05, \"position\": 58}, {\"date\": \"14 Aug\", \"hours\": 3.04, \"position\": 64}, {\"date\": \"15 Aug\", \"hours\": 1.53, \"position\": 70}, {\"date\": \"16 Aug\", \"hours\": 1.41, \"position\": 76}, {\"date\": \"17 Aug\", \"hours\": 3.79, \"position\": 82}, {\"date\": \"18 Aug\", \"hours\": 1.55, \"position\": 88}]" >}}