Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apscheduler==3.6.0
certifi==2018.11.29
chardet==3.0.4
Click==7.0
firebase-admin==5.4.0
Flask==1.0.2
Flask-SQLAlchemy==2.3.2
Flask-Migrate==2.1.1
Expand Down
3 changes: 3 additions & 0 deletions timeline_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flask import Flask, request
from werkzeug.middleware.proxy_fix import ProxyFix
from rws_common import honeycomb
import firebase_admin

from .settings import config
from .api import init_api
Expand All @@ -16,6 +17,8 @@
init_app(app)
init_api(app) # Includes both private (timeline-sync) and public (timeline-api) APIs

default_app = firebase_admin.initialize_app()

@app.route('/heartbeat')
@app.route('/timeline-sync/heartbeat')
def heartbeat():
Expand Down
54 changes: 52 additions & 2 deletions timeline_sync/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid
import requests
from .models import db, SandboxToken, TimelinePin, UserTimeline, TimelineTopic, TimelineTopicSubscription, AppGlance
from .utils import get_uid, api_error, pin_valid, glance_valid
from .utils import get_uid, api_error, pin_valid, glance_valid, send_fcm_message, send_fcm_message_to_topics, subscribe_to_fcm_topic, unsubscribe_from_fcm_topic
from .settings import config

import beeline
Expand Down Expand Up @@ -79,6 +79,33 @@ def sync():
}
return jsonify(result)

@api.route('/user/fcm_token/<token>')
def fcm_token():
user_id = get_uid()

if request.method == 'PUT':
fcm_token_json = request.json

fcm_token = FcmToken.query.filter_by(user_id=user_id, token=token).one_or_none()
if fcm_token is None:
fcm_token = FcmToken.from_json(fcm_token_json, token, user_id)
if fcm_token is None:
return api_error(400)

db.session.add(fcm_token)
db.session.commit()
# TODO: Also subscribe to user's topics
# TODO: Resend UserTimeline for the pins about the topic

elif request.method == 'DELETE':
fcm_token = FcmToken.query.filter_by(user_id=user_id, token=token).first_or_404()
fcm_token.delete()

db.session.commit()
# TODO: Also unsubscribe from user's topics
# TODO: Send UserTimeline to delete pins the user no longer has a subscription for
return 'OK'


@api.route('/user/pins/<pin_id>', methods=['PUT', 'DELETE'])
def user_pin(pin_id):
Expand Down Expand Up @@ -107,6 +134,8 @@ def user_pin(pin_id):
db.session.add(pin)
db.session.add(user_timeline)
db.session.commit()

send_fcm_message(user_id, { 'type': 'timeline.pin.create' })
else: # update pin
try:
pin.update_from_json(pin_json)
Expand All @@ -122,6 +151,8 @@ def user_pin(pin_id):
db.session.add(pin)
db.session.add(user_timeline)
db.session.commit()

send_fcm_message(user_id, { 'type': 'timeline.pin.create' })
except (KeyError, ValueError):
beeline.add_context_field('timeline.failure.cause', 'update_pin')
return api_error(400)
Expand All @@ -138,6 +169,8 @@ def user_pin(pin_id):
pin=pin)
db.session.add(user_timeline)
db.session.commit()

send_fcm_message(user_id, { 'type': 'timeline.pin.delete' })
return 'OK'


Expand All @@ -152,7 +185,6 @@ def get_app_info(timeline_token):
return app_info['app_uuid'], f"uuid:{app_info['app_uuid']}"



@api.route('/shared/pins/<pin_id>', methods=['PUT', 'DELETE'])
def shared_pin(pin_id):
try:
Expand Down Expand Up @@ -198,6 +230,8 @@ def shared_pin(pin_id):
db.session.add(user_timeline)

db.session.commit()

send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.create' })
else: # update pin
try:
pin.update_from_json(pin_json)
Expand All @@ -217,12 +251,15 @@ def shared_pin(pin_id):
db.session.add(user_timeline)

db.session.commit()

send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.create' })
except (KeyError, ValueError):
beeline.add_context_field('timeline.failure.cause', 'update_pin')
return api_error(400)

elif request.method == 'DELETE':
pin = TimelinePin.query.filter_by(app_uuid=app_uuid, user_id=None, id=pin_id).first_or_404()
topics = pin.topics

# No need to post even old create events, since nobody will render
# them, after all.
Expand All @@ -236,6 +273,9 @@ def shared_pin(pin_id):
db.session.add(user_timeline)

db.session.commit()

send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.delete' })

return 'OK'


Expand Down Expand Up @@ -276,11 +316,17 @@ def user_subscriptions_manage(topic_string):

db.session.commit()

subscribe_to_fcm_topic(user_id, topic)
send_fcm_message(user_id, { 'type': 'timeline.topic.subscription' })

elif request.method == 'DELETE':
TimelineTopicSubscription.query.filter_by(user_id=user_id, topic=topic).delete()

db.session.commit()

unsubscribe_from_fcm_topic(user_id, topic)
send_fcm_message(user_id, { 'type': 'timeline.topic.unsubscription' })

return 'OK'


Expand All @@ -305,7 +351,11 @@ def user_app_glance():
return api_error(400)

db.session.add(glance)

db.session.commit()

send_fcm_message(user_id, { 'type': 'appglance.slice.create' })

return 'OK'


Expand Down
22 changes: 22 additions & 0 deletions timeline_sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ class SandboxToken(db.Model):

db.Index('sandbox_token_uid_appuuid_index', SandboxToken.user_id, SandboxToken.app_uuid, unique=True)

class FcmToken(db.Model):
__tablename__ = 'fcm_tokens'
token = db.Column(db.String, primary_key=True)
user_id = db.Column(db.Integer)
device_id = db.Column(db.String)
platform = db.Column(db.String)

@classmethod
def from_json(cls, fcm_token_json, token, user_id):
try:
fcm_token = cls(
token=token,
device_id=fcm_token_json['device_id'],
platform=fcm_token_json['platform'],
user_id=user_id,
)
return fcm_token
except (KeyError, ValueError):
return None


db.Index('fcm_token_uid_token_index', FcmToken.user_id, FcmToken.token, unique=True)

class TimelinePin(db.Model):
__tablename__ = 'timeline_pins'
Expand Down
66 changes: 66 additions & 0 deletions timeline_sync/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from flask import request, abort, jsonify
from .settings import config
import datetime
import firebase_admin

import beeline

Expand Down Expand Up @@ -122,3 +123,68 @@ def glance_valid(glance_json):
return False
return True


def send_fcm_message(user_id, data):
if user_id is None:
raise ValueError

fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id)
tokens = [fcm_token.token for fcm_token in fcm_tokens]

message = firebase_admin.messaging.Message(
data=data,
tokens=tokens,
)

response = firebase_admin.messaging.send_each_for_multicast(message)

if response.failure_count > 0:
responses = response.responses
for idx, resp in enumerate(responses):
if not resp.success:
FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()


def send_fcm_message_to_topics(topics, data):
condition = ' || '.join([f"'{str(topic.id)}' in topics" for topic in topics])

message = firebase_admin.messaging.Message(
data=data,
condition=condition,
)

response = firebase_admin.messaging.send(message)

if not response.success:
return api_error(400)


def subscribe_to_fcm_topic(user_id, topic):
if user_id is None:
raise ValueError

fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id)
tokens = [fcm_token.token for fcm_token in fcm_tokens]

response = firebase_admin.messaging.subscribe_to_topic(tokens, str(topic.id))

if response.failure_count > 0:
responses = response.responses
for idx, resp in enumerate(responses):
if not resp.success:
FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()

def unsubscribe_from_fcm_topic(user_id, topic):
if user_id is None:
raise ValueError

fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id)
tokens = [fcm_token.token for fcm_token in fcm_tokens]

response = firebase_admin.messaging.unsubscribe_from_topic(tokens, str(topic.id))

if response.failure_count > 0:
responses = response.responses
for idx, resp in enumerate(responses):
if not resp.success:
FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()