Skip to content
This repository has been archived by the owner on Mar 16, 2020. It is now read-only.

Use task names to reduce the risk of duplicate update-feed tasks. #319

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 76 additions & 7 deletions tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,60 @@ import (
"appengine/urlfetch"
)

func taskNameShouldEscape(c byte) bool {
switch {
case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
return false
case '-' == c:
return false
}
// We use underscore to escape other characters, so escape it.
return true
}

/*
Hex-escape characters that are not allowed in appengine task names.
This is like URL hex-escaping, but using '_' instead of '%'.
This means '_' must also be escaped.

https://cloud.google.com/appengine/docs/go/taskqueue#Go_Task_names
[0-9a-zA-Z\-\_]{1,500}

This function does not try to enforce length.

The golang implementation of taskqueue does not enforce any of these rules,
but it seems prudent to comply unless Google changes the documentation.
*/
func taskNameEscape(s string) string {
// Or we could trade memory for cycles and assume hexCount = len(s)
hexCount := 0
for i := 0; i < len(s); i++ {
c := s[i]
if taskNameShouldEscape(c) {
hexCount++
}
}
if hexCount == 0 {
return s
}

t := make([]byte, len(s)+2*hexCount)
j := 0
for i := 0; i < len(s); i++ {
switch c := s[i]; {
case taskNameShouldEscape(c):
t[j] = '_'
t[j+1] = "0123456789ABCDEF"[c>>4]
t[j+2] = "0123456789ABCDEF"[c&15]
j += 3
default:
t[j] = s[i]
j++
}
}
return string(t)
}

func ImportOpmlTask(c mpg.Context, w http.ResponseWriter, r *http.Request) {
gn := goon.FromContext(c)
userid := r.FormValue("user")
Expand Down Expand Up @@ -238,25 +292,40 @@ func SubscribeFeed(c mpg.Context, w http.ResponseWriter, r *http.Request) {
}

func UpdateFeeds(c mpg.Context, w http.ResponseWriter, r *http.Request) {
q := datastore.NewQuery("F").KeysOnly().Filter("n <=", time.Now())
q = q.Limit(10 * 60 * 2) // 10/s queue, 2 min cron
now := time.Now()
limit := 10 * 60 * 2 // 10/s queue, 2 min cron
q := datastore.NewQuery("F").Filter("n <=", now).Limit(limit)
it := q.Run(appengine.Timeout(c, time.Minute))
tc := make(chan *taskqueue.Task)
done := make(chan bool)
i := 0
u := routeUrl("update-feed")
var feed Feed
var id string

go taskSender(c, "update-feed", tc, done)
for {
k, err := it.Next(nil)
k, err := it.Next(&feed)
if err == datastore.Done {
break
} else if err != nil {
}
if err != nil {
c.Errorf("next error: %v", err.Error())
break
}
tc <- taskqueue.NewPOSTTask(u, url.Values{
"feed": {k.StringID()},
})
id = k.StringID()
// To guard against queuing duplicate feeds,
// use the feed id (URL) as the task name.
// https://cloud.google.com/appengine/docs/go/taskqueue#Go_Task_names
// This is not an absolute guarantee, but should help.
newTask := taskqueue.NewPOSTTask(u, url.Values{"feed": {id}})
// Include the NextUpdate time because task names endure 7D or so.
// The URL is hex-escaped but hopefully still human-readable.
newTask.Name = fmt.Sprintf("%v_%v",
feed.NextUpdate.UTC().Format("2006-01-02T15-04-05Z07-00"),
taskNameEscape(id))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just base64.URLEncoding.EncodeToString([]byte(id))?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted it to be human-readable, for debugging. Good tooling could fix that. Or if that isn't important, I also considered using a one-way hash.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we know there won't be any collisions? i.e., two different feeds that result in the same name?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task names look like 2014-12-16T16-58-34Z07-00_http_3A_2F_2Fxkcd_2Ecom_2Fatom_2Exml: the time to 1-sec precision plus the full URL in escaped form. I think 1-sec is plenty precise enough to avoid collisions, but we could add fractional seconds if needed.

With that format I don't see how we could generate collisions — so I must be missing something. Can you explain?

c.Debugf("queuing feed %v", newTask.Name)
tc <- newTask
i++
}
close(tc)
Expand Down
16 changes: 13 additions & 3 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func ParseFeed(c appengine.Context, contentType, origUrl, fetchUrl string, body
return parseFix(c, feed, stories, fetchUrl)
}

func parseAtom(c appengine.Context, body []byte, charsetReader func(string, io.Reader) (io.Reader, error)) (*Feed, []*Story, error) {
func parseAtom(c appengine.Context, body []byte, charsetReader func(string, io.Reader) (io.Reader, error)) (*Feed, []*Story, error) {
var f Feed
var s []*Story
var err error
Expand Down Expand Up @@ -822,8 +822,18 @@ func taskSender(c mpg.Context, queue string, tc chan *taskqueue.Task, done chan
const taskLimit = 100
tasks := make([]*taskqueue.Task, 0, taskLimit)
send := func() {
taskqueue.AddMulti(c, tasks, queue)
c.Infof("added %v tasks", len(tasks))
errorCount := 0
queued, err := taskqueue.AddMulti(c, tasks, queue)
if me, ok := err.(appengine.MultiError); ok {
for i, merr := range me {
if merr == nil {
continue
}
errorCount++
c.Warningf("error for task %v: %v", queued[i], merr)
}
}
c.Infof("added %v tasks with %v errors", len(tasks), errorCount)
tasks = tasks[0:0]
}
for t := range tc {
Expand Down